PyZipline/pyzipline/zipline.py
SeaswimmerTheFsh 292911df8b
All checks were successful
Pylint / Pylint (3.12) (push) Successful in 39s
fix(docs): added requires administrator admonition to get_invites
2024-01-15 05:14:57 -05:00

497 lines
20 KiB
Python

"""This module contains the ZiplineApi class, which is the main class used to interact with the Zipline API."""
from datetime import datetime, timedelta
from typing import Union, List
import logging
from pyzipline.rest_adapter import RestAdapter
from pyzipline.exceptions import PyZiplineError, FeatureDisabledError, Forbidden, NotFound
from pyzipline.models import User, File, Result, Invite, Stats, Version
from pyzipline.utils import convert_datetime_to_str
# pylint: disable=not-a-mapping
class ZiplineApi:
    """Represents an instance of the Zipline API.

    All API requests should be made through this class.

    Args:
        hostname (str): The hostname of your Zipline instance, WITHOUT https or http.
        token (str): String used for authentication when making requests.
        ssl (bool): Normally set to True, but if your Zipline instance doesn't use SSL/TLS, set this to False.
        enforced_signing (bool): Normally set to True, but if having SSL/TLS cert validation issues, can turn off with False.
        logger (logging.Logger): If your app has a logger, pass it in here.
    """
def __init__(
    self,
    hostname: str,
    token: str = '',
    ssl: bool = True,
    enforced_signing: bool = True,
    logger: logging.Logger = None
):
    """Create the REST adapter that every API method on this object sends its requests through.

    All arguments are forwarded unchanged to `RestAdapter`.

    Args:
        hostname (str): The hostname of your Zipline instance, WITHOUT https or http.
        token (str): String used for authentication when making requests.
        ssl (bool): Forwarded to the adapter as `ssl`.
        enforced_signing (bool): Forwarded to the adapter as `enforced_signing`.
        logger (logging.Logger): Forwarded to the adapter as `logger`; defaults to None.
    """
    self._rest_adapter = RestAdapter(
        hostname=hostname,
        token=token,
        ssl=ssl,
        enforced_signing=enforced_signing,
        logger=logger,
    )
def create_invite(self, expiry: timedelta = timedelta(days=1), count: int = 1) -> Union[Invite, List[Invite]]:
    """Create one or more invite codes

    /// admonition | Requires Authentication
        type: warning
    ///

    /// admonition | Requires Administrator
        type: danger
    ///

    Args:
        expiry (timedelta): Timedelta object representing when the invite should expire
        count (int): Number of invites to create

    Raises:
        FeatureDisabledError: Raised when invites are disabled on the Zipline instance
        Forbidden: Raised if the authenticated user is not an administrator
        PyZiplineError: Raised if the API changes, causing a breaking change in this method
        ValueError: Raised when the expiry datetime is invalid or the count is less than 1

    Returns:
        Invite: A single invite when `count` is 1, otherwise a list of the newly created invites
    """
    # Reject a non-positive count locally instead of sending a request that
    # cannot produce an invite; this is the documented ValueError contract.
    if count < 1:
        raise ValueError('count must be greater than 0')

    # NOTE(review): datetime.now() is timezone-naive; confirm that
    # convert_datetime_to_str produces what the server expects.
    payload = {'expiresAt': 'date=' + convert_datetime_to_str(datetime.now() + expiry), 'count': count}
    result: Result = self._rest_adapter.post(endpoint="auth/invite", json=payload)

    if result.status_code == 200:
        if count > 1:
            return [Invite(**entry) for entry in result.data]
        # A single invite may arrive either bare or wrapped in a one-element list.
        data = result.data[0] if isinstance(result.data, list) else result.data
        return Invite(**data)

    if result.message == 'invites are disabled':
        raise FeatureDisabledError(result.message)
    if result.message == 'invalid date':
        raise ValueError(f"{result.status_code}: {result.message}\n{result.data}\n{payload}")
    if result.message == 'not an administrator':
        raise Forbidden(result.message)
    raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
def register_user(self, username: str, password: str, invite: str = None, admin: bool = False) -> User:
    """Register a new user

    /// admonition | Requires Authentication
        type: warning
    ///

    /// admonition | Parameter Requires Super Administrator
        type: danger

    The authenticated user must be a Super Administrator to use the `admin` parameter.
    ///

    /// admonition | Conditionally Requires Administrator
        type: danger

    The authenticated user must be an Administrator to register a user when registration is disabled.
    ///

    Args:
        username (str): Username to register
        password (str): Password for the new user
        invite (str): Invite code to register the new user with, only required if registration without invites is disabled and the authenticated user is not an administrator
        admin (bool): Whether or not the new user should be an administrator, authenticated user must be a super administrator to create an administrator

    Raises:
        FeatureDisabledError: Raised when:\n
            - registration or invites are disabled on the Zipline instance and the authenticated user is not an administrator
            - invite code is provided and invites are disabled
        Forbidden: Raised if the authenticated user is not a super administrator and attempts to create an administrator
        PyZiplineError: Raised if the API changes, causing a breaking change in this method
        ValueError: Raised when the username is already taken or if the invite code is invalid/expired

    Returns:
        User: The newly created user
    """
    payload = {'username': username, 'password': password}
    if invite is not None:
        payload['code'] = invite
    if admin:
        payload['admin'] = True

    result: Result = self._rest_adapter.post(endpoint="auth/register", json=payload)
    if result.status_code == 200:
        return User(**result.data)

    if result.message == 'This endpoint is unavailable due to current configurations':
        raise FeatureDisabledError('user registration or invites are disabled')
    if result.message == 'Bad Username/Password':
        # The server reports this for more than one cause, so probe whether the
        # name is taken. The invite is forwarded so the probe is made with the
        # same code the caller supplied rather than failing on a missing code.
        if self.check_user_exists(username, invite):
            raise ValueError('username already taken')
        raise FeatureDisabledError('invite code is provided and invites are disabled')
    if result.message == 'Bad invite':
        raise ValueError('invite code is invalid or expired')
    if result.message == 'not an administrator':
        raise Forbidden(result.message)
    raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
def check_user_exists(self, username: str, invite: str = None) -> bool:
    """Check if a user exists by username

    Args:
        username (str): Username to check
        invite (str): Invite code to use, only required if registration without invites is disabled

    Raises:
        FeatureDisabledError: Raised when registration or invites are disabled on the Zipline instance
        PyZiplineError: Raised if the API changes, causing a breaking change in this method
        ValueError: Raised when the username is not present, or the invite code is invalid/not present and invites are enabled

    Returns:
        bool: True if user exists, False if not
    """
    payload = {'username': username}
    if invite is not None:
        payload['code'] = invite

    result: Result = self._rest_adapter.post(endpoint="user/check", json=payload)

    # A 200 response means the check passed, i.e. the username is still free.
    if result.status_code == 200:
        return False
    if result.message == 'username already exists':
        return True
    if result.message == 'user registration is disabled':
        raise FeatureDisabledError('user registration or invites are disabled')
    if result.message == 'invalid invite code':
        # Separate the server message from the hint with a space so the two
        # do not run together in the rendered error.
        raise ValueError(result.message + " (most likely doesn't exist)")
    if result.message == 'no code':
        raise ValueError('invite code not provided')
    if result.message == 'no username':
        raise ValueError('username not provided')
    raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
def delete_invite(self, code: str) -> Invite:
    """Delete an invite code

    /// admonition | Requires Authentication
        type: warning
    ///

    /// admonition | Requires Administrator
        type: danger
    ///

    Args:
        code (str): Invite code to delete

    Raises:
        Forbidden: Raised if the authenticated user is not an administrator
        NotFound: Raised if the invite code does not exist
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        Invite: An object containing the deleted invite
    """
    response: Result = self._rest_adapter.delete(endpoint="auth/invite", params={'code': code})

    if response.status_code != 200:
        # Translate the known server messages into their dedicated exceptions;
        # anything unrecognised is reported as a generic API error.
        reason = response.message
        if reason == 'invite not found':
            raise NotFound(reason)
        if reason == 'not an administrator':
            raise Forbidden(reason)
        raise PyZiplineError(f"{response.status_code}: {reason}\n{response.data}")

    return Invite(**response.data)
def get_exif(self, file_id: int) -> dict:
    """Get the EXIF data for a file

    /// admonition | Requires Authentication
        type: warning
    ///

    Args:
        file_id (int): ID of the file to get EXIF data for

    Raises:
        Forbidden: The user is not authenticated
        NotFound: The file does not exist
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        dict: EXIF data for the file
    """
    # Endpoints are passed relative (no leading slash), matching the other
    # methods on this class.
    result: Result = self._rest_adapter.get(endpoint="exif", params={'id': file_id})

    if result.status_code == 200:
        return result.data
    if result.status_code == 401:
        raise Forbidden(result.message)
    if result.message == 'image not found':
        raise NotFound(result.message)
    if result.message == 'image not found on fs':
        raise NotFound('image not found on filesystem')
    raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
def get_files(self, favorite: bool = False, media_only: bool = False) -> list[File]:
    """Get a list of the files uploaded by the authenticated user

    /// admonition | Requires Authentication
        type: warning
    ///

    Args:
        favorite (bool): Whether or not to return only favorite files
        media_only (bool): Whether or not to return only media files

    Raises:
        Forbidden: The user is not authenticated
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        list[File]: List of files uploaded by the authenticated user
    """
    # Only send the filters that were actually requested.
    params = {}
    if favorite:
        params['favorite'] = favorite
    if media_only:
        params['media_only'] = media_only

    # Endpoints are passed relative (no leading slash), matching the other
    # methods on this class.
    result: Result = self._rest_adapter.get(endpoint="user/files", params=params)

    if result.status_code == 200:
        return [File(**entry) for entry in result.data]
    if result.status_code == 401:
        raise Forbidden(result.message)
    # Any other status is unexpected; surface it instead of returning None.
    raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
def get_password_protected_file(self, file_id: int, password: str) -> bytes:
    """Get a password protected file

    Args:
        file_id (int): ID of the file to get
        password (str): Password for the file

    Raises:
        Forbidden: The password is incorrect
        NotFound: The file does not exist
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        bytes: The file, in bytes
    """
    query = {'id': file_id, 'password': password}
    response: Result = self._rest_adapter.get(endpoint="auth/image", params=query)

    if response.status_code != 200:
        # Known failure messages get their own exception type.
        failure = response.message
        if failure == 'image not found':
            raise NotFound(failure)
        if failure == 'wrong password':
            raise Forbidden(failure)
        raise PyZiplineError(f"{response.status_code}: {failure}\n{response.data}")

    return response.data
def get_invites(self) -> list[Invite]:
    """Get a list of invites

    /// admonition | Requires Authentication
        type: warning
    ///

    /// admonition | Requires Administrator
        type: danger
    ///

    Raises:
        Forbidden: The user is not authenticated, or is not an administrator
        FeatureDisabledError: Invites are disabled on the Zipline instance
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        list[Invite]: List of invites
    """
    result = self._rest_adapter.get(endpoint="auth/invite")

    if result.status_code == 200:
        return [Invite(**entry) for entry in result.data]
    if result.status_code == 401:
        raise Forbidden(result.message)
    # Same mapping the other invite methods use for this endpoint's
    # administrator check.
    if result.message == 'not an administrator':
        raise Forbidden(result.message)
    if result.message == 'invites are disabled':
        raise FeatureDisabledError(result.message)
    raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
def get_self(self) -> User:
    """Get the currently authenticated user

    /// admonition | Requires Authentication
        type: warning
    ///

    Raises:
        Forbidden: The user is not authenticated
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        User: The currently authenticated user
    """
    response = self._rest_adapter.get(endpoint="user")
    status = response.status_code

    if status == 200:
        return User(**response.data)
    if status == 401:
        raise Forbidden(response.message)
    raise PyZiplineError(f"{status}: {response.message}\n{response.data}")
def shorten(self, url: str, vanity: str = None, max_views: int = None, zero_width: bool = False) -> str:
    """Shorten a URL

    /// admonition | Requires Authentication
        type: warning
    ///

    Args:
        url (str): URL to shorten
        vanity (str): Vanity string to use
        max_views (int): Maximum number of views before the URL expires
        zero_width (bool): Whether or not to use zero width characters in the shortened URL

    Raises:
        Forbidden: The user is not authenticated
        PyZiplineError: Raised if the API changes, causing a breaking change in this method
        ValueError: Raised if the vanity string already exists, if the vanity string is empty, or if the max views is invalid (less than 0)

    Returns:
        str: The shortened URL
    """
    # HTTP header values must be strings; passing an int or bool through to
    # the HTTP layer is not portable, so stringify them here.
    headers = {}
    if max_views is not None:
        headers['Max-Views'] = str(max_views)
    if zero_width:
        headers['Zws'] = 'true'

    payload = {'url': url}
    if vanity:
        payload['vanity'] = vanity

    result = self._rest_adapter.post(endpoint="shorten", json=payload, headers=headers)

    if result.status_code == 200:
        return result.data['url']
    if result.status_code == 400:
        raise ValueError(result.message)
    if result.status_code == 401:
        raise Forbidden(result.message)
    raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
def get_user(self, user_id: int) -> User:
    """Get a user by ID

    /// admonition | Requires Administrator
        type: danger
    ///

    Args:
        user_id (int): Integer ID of the user to retrieve

    Raises:
        Forbidden: Raised if the authenticated user is not an administrator
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        User: The user with the given ID
    """
    response = self._rest_adapter.get(endpoint=f"user/{user_id}")
    status = response.status_code

    if status == 200:
        return User(**response.data)
    if status == 403:
        raise Forbidden(response.message)
    raise PyZiplineError(f"{status}: {response.message}\n{response.data}")
def get_users(self) -> list[User]:
    """Get a list of users

    /// admonition | Requires Administrator
        type: danger
    ///

    Raises:
        Forbidden: Raised if the authenticated user is not an administrator
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        list[User]: List of users
    """
    response = self._rest_adapter.get(endpoint="users")

    if response.status_code == 200:
        # Each entry of the payload is expanded into a User model.
        return [User(**entry) for entry in response.data]
    if response.status_code == 403:
        raise Forbidden(response.message)
    raise PyZiplineError(f"{response.status_code}: {response.message}\n{response.data}")
def get_stats(self, amount: int = 1, force_update: bool = False) -> Union[Stats, List[Stats]]:
    """Get statistics about the Zipline instance

    /// admonition | Requires Authentication
        type: warning
    ///

    /// admonition | Parameter Requires Administrator
        type: danger

    The authenticated user must be an administrator to use the `force_update` argument.
    ///

    /// admonition | Configuration Varies
        type: note

    The endpoint this method uses, `/api/stats`, relies a lot on Zipline's [configuration](https://zipline.diced.sh/docs/config/website#website_show_files_per_user) to determine who can access the endpoint and what the endpoint returns depending on permission level.
    Please bear this in mind when using this method.
    ///

    Args:
        amount (int): Number of stats to retrieve
        force_update (bool): Force the Zipline instance to update its statistics before returning them, requires administrator

    Raises:
        Forbidden: The user is not authenticated, or the user is not an administrator and `force_update` is True
        PyZiplineError: Raised if the API changes, causing a breaking change in this method
        ValueError: Raised if amount is less than 1

    Returns:
        Stats: Statistics about the Zipline instance
    """
    if amount < 1:
        raise ValueError('amount must be greater than 0')

    # A forced refresh goes out as POST; a plain read goes out as GET.
    send = self._rest_adapter.post if force_update else self._rest_adapter.get
    response = send(endpoint="stats", params={'amount': amount})

    if response.status_code == 200:
        if amount > 1:
            return [Stats(**entry) for entry in response.data]
        # A single record may arrive either bare or wrapped in a list.
        record = response.data
        if isinstance(record, list):
            record = record[0]
        return Stats(**record)

    if response.status_code in (401, 403):
        raise Forbidden(response.message)
    raise PyZiplineError(f"{response.status_code}: {response.message}\n{response.data}")
def get_version(self) -> Version:
    """Get the version of the Zipline instance

    /// admonition | Requires Authentication
        type: warning
    ///

    Raises:
        Forbidden: The user is not authenticated
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        Version: The version of the Zipline instance
    """
    response = self._rest_adapter.get(endpoint="version")
    status = response.status_code

    if status == 200:
        return Version(**response.data)
    if status == 401:
        raise Forbidden(response.message)
    raise PyZiplineError(f"{status}: {response.message}\n{response.data}")