PyZipline/pyzipline/zipline.py

498 lines
20 KiB
Python
Raw Normal View History

2023-12-20 23:51:31 -05:00
"""This module contains the ZiplineApi class, which is the main class used to interact with the Zipline API."""
2023-12-22 13:51:42 -05:00
from datetime import datetime, timedelta
from typing import Union, List
2023-12-19 05:36:18 -05:00
import logging
from pyzipline.rest_adapter import RestAdapter
2023-12-22 13:51:42 -05:00
from pyzipline.exceptions import PyZiplineError, FeatureDisabledError, Forbidden, NotFound
from pyzipline.models import User, File, Result, Invite, Stats, Version
from pyzipline.utils import convert_datetime_to_str
2023-12-19 05:36:18 -05:00
2023-12-20 23:51:31 -05:00
# pylint: disable=not-a-mapping
2023-12-19 05:36:18 -05:00
class ZiplineApi:
"""Represents an instance of the Zipline API.
All API requests should be made through this class.
Args:
hostname (str): The hostname of your Zipline instance, WITHOUT https or http.
2023-12-20 23:51:47 -05:00
token (str): String used for authentication when making requests.
ssl (bool): Normally set to True, but if your Zipline instance doesn't use SSL/TLS, set this to False.
enforced_signing (bool): Normally set to True, but if having SSL/TLS cert validation issues, can turn off with False.
logger (logging.Logger): If your app has a logger, pass it in here.
"""
2023-12-19 05:36:18 -05:00
def __init__(
self,
hostname: str,
token: str = '',
ssl: bool = True,
enforced_signing: bool = True,
logger: logging.Logger = None
):
self._rest_adapter = RestAdapter(hostname=hostname, token=token, ssl=ssl, enforced_signing=enforced_signing, logger=logger)
2023-12-22 13:50:31 -05:00
def create_invite(self, expiry: timedelta = timedelta(days=1), count: int = 1) -> Union[Invite, List[Invite]]:
"""Create an invite code
/// admonition | Requires Authentication
type: warning
///
/// admonition | Requires Administrator
type: danger
///
Args:
expiry (timedelta): Timedelta object representing when the invite should expire
count (int): Number of invites to create
Raises:
FeatureDisabledError: Raised when invites are disabled on the Zipline instance
Forbidden: Raised if the authenticated user is not an administrator
PyZiplineError: Raised if the API changes, causing a breaking change in this method
ValueError: Raised when the expiry datetime is invalid or the count is less than 1
Returns:
Invite: The newly created invite code(s)
"""
json = {'expiresAt': 'date=' + convert_datetime_to_str(datetime.now() + expiry), 'count': count}
result: Result = self._rest_adapter.post(endpoint="auth/invite", json=json)
if result.status_code == 200:
if count > 1:
invite_list = []
for invite in result.data:
i = Invite(**invite)
invite_list.append(i)
return invite_list
data = result.data[0] if isinstance(result.data, list) else result.data
return Invite(**data)
if result.message == 'invites are disabled':
raise FeatureDisabledError(result.message)
if result.message == 'invalid date':
raise ValueError(f"{result.status_code}: {result.message}\n{result.data}\n{json}")
if result.message == 'not an administrator':
raise Forbidden(result.message)
raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
2023-12-20 23:51:57 -05:00
def register_user(self, username: str, password: str, invite: str = None, admin: bool = False) -> User:
"""Register a new user
2023-12-20 20:48:45 -05:00
2023-12-21 15:34:04 -05:00
/// admonition | Requires Authentication
type: warning
///
2023-12-22 13:50:09 -05:00
/// admonition | Parameter Requires Super Administrator
type: danger
The authenticated user must be a Super Administrator to use the `admin` parameter.
///
2023-12-21 15:34:04 -05:00
/// admonition | Conditionally Requires Administrator
type: danger
2023-12-22 13:50:09 -05:00
The authenticated user must be an Administrator to register a user when registration is disabled.
2023-12-21 15:34:04 -05:00
///
Args:
2023-12-20 23:51:57 -05:00
username (str): Username to register
password (str): Password for the new user
invite (str): Invite code to register the new user with, only required if registration without invites is disabled and the authenticated user is not an administrator
admin (bool): Whether or not the new user should be an administrator, authenticated user must be a super administrator to create an administrator
2023-12-19 05:36:18 -05:00
2023-12-20 23:51:57 -05:00
Raises:
FeatureDisabledError: Raised when:\n
2023-12-22 13:50:09 -05:00
- registration or invites are disabled on the Zipline instance and the authenticated user is not an administrator
- invite code is provided and invites are disabled
Forbidden: Raised if the authenticated user is not an super administrator and attempts to create an administrator
2023-12-20 23:51:57 -05:00
PyZiplineError: Raised if the API changes, causing a breaking change in this method
ValueError: Raised when the username is already taken or if the invite code is invalid/expired
2023-12-20 20:48:45 -05:00
Returns:
2023-12-20 23:51:57 -05:00
User: The newly created user
2023-12-19 05:36:18 -05:00
"""
2023-12-22 13:50:09 -05:00
json = {'username': username, 'password': password}
2023-12-20 23:51:57 -05:00
if invite is not None:
2023-12-22 13:50:09 -05:00
json['code'] = invite
2023-12-20 23:51:57 -05:00
if admin:
2023-12-22 13:50:09 -05:00
json['admin'] = True
2023-12-20 23:51:57 -05:00
2023-12-22 13:50:09 -05:00
result: Result = self._rest_adapter.post(endpoint="auth/register", json=json)
2023-12-20 20:48:45 -05:00
if result.status_code == 200:
return User(**result.data)
2023-12-20 23:51:57 -05:00
if result.message == 'This endpoint is unavailable due to current configurations':
raise FeatureDisabledError('user registration or invites are disabled')
if result.message =='Bad Username/Password':
if self.check_user_exists(username):
raise ValueError('username already taken')
raise FeatureDisabledError('invite code is provided and invites are disabled')
if result.message == 'Bad invite':
raise ValueError('invite code is invalid or expired')
if result.message == 'not an administrator':
raise Forbidden(result.message)
raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
def check_user_exists(self, username: str, invite: str = None) -> bool:
"""Check if a user exists by username
Args:
username (str): Username to check
2023-12-21 15:34:04 -05:00
invite (str): Invite code to use, only required if registration without invites is disabled
Raises:
FeatureDisabledError: Raised when registration or invites are disabled on the Zipline instance
2023-12-20 20:48:45 -05:00
PyZiplineError: Raised if the API changes, causing a breaking change in this method
ValueError: Raised when the username is not present, or the invite code is invalid/not present and invites are enabled
Returns:
bool: True if user exists, False if not
"""
2023-12-22 13:50:09 -05:00
json = {'username': username} if invite is None else {'username': username, 'code': invite}
result: Result = self._rest_adapter.post(endpoint="user/check", json=json)
if result.status_code == 200:
2023-12-20 20:48:45 -05:00
return False
2023-12-20 23:51:31 -05:00
if result.message == 'username already exists':
return True
2023-12-20 23:51:31 -05:00
if result.message == 'user registration is disabled':
2023-12-20 20:48:45 -05:00
raise FeatureDisabledError('user registration or invites are disabled')
2023-12-20 23:51:31 -05:00
if result.message == 'invalid invite code':
2023-12-20 20:48:45 -05:00
raise ValueError(result.message + "(most likely doesn't exist)")
2023-12-20 23:51:31 -05:00
if result.message == 'no code':
2023-12-20 20:48:45 -05:00
raise ValueError('invite code not provided')
2023-12-20 23:51:31 -05:00
if result.message == 'no username':
2023-12-20 20:48:45 -05:00
raise ValueError('username not provided')
2023-12-20 23:51:31 -05:00
raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
2023-12-22 15:03:19 -05:00
def delete_invite(self, code: str) -> Invite:
"""Delete an invite code
/// admonition | Requires Authentication
type: warning
///
/// admonition | Requires Administrator
type: danger
///
Args:
code (str): Invite code to delete
Raises:
Forbidden: Raised if the authenticated user is not an administrator
NotFound: Raised if the invite code does not exist
PyZiplineError: Raised if the API changes, causing a breaking change in this method
Returns:
Invite: An object containing the deleted invite
"""
result: Result = self._rest_adapter.delete(endpoint="auth/invite", params={'code': code})
if result.status_code == 200:
return Invite(**result.data)
if result.message == 'invite not found':
raise NotFound(result.message)
if result.message == 'not an administrator':
raise Forbidden(result.message)
2023-12-22 15:03:19 -05:00
raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
2023-12-22 13:50:45 -05:00
def get_exif(self, file_id: int) -> dict:
"""Get the EXIF data for a file
/// admonition | Requires Authentication
type: warning
///
Args:
file_id (int): ID of the file to get EXIF data for
Raises:
Forbidden: The user is not authenticated
NotFound: The file does not exist
PyZiplineError: Raised if the API changes, causing a breaking change in this method
Returns:
dict: EXIF data for the file
"""
result: Result = self._rest_adapter.get(endpoint="/exif", params={'id': file_id})
if result.status_code == 200:
return result.data
if result.status_code == 401:
raise Forbidden(result.message)
if result.message == 'image not found':
raise NotFound(result.message)
if result.message == 'image not found on fs':
raise NotFound('image not found on filesystem')
raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
2023-12-22 13:50:57 -05:00
def get_files(self, favorite: bool = False, media_only: bool = False) -> list[File]:
"""Get a list of the files uploaded by the authenticated user
/// admonition | Requires Authentication
type: warning
///
Args:
favorite (bool): Whether or not to return only favorite files
media_only (bool): Whether or not to return only media files
Raises:
Forbidden: The user is not authenticated
PyZiplineError: Raised if the API changes, causing a breaking change in this method
Returns:
File: List of files uploaded by the authenticated user
"""
params = {}
if favorite:
params['favorite'] = favorite
if media_only:
params['media_only'] = media_only
result: Result = self._rest_adapter.get(endpoint="/user/files", params=params)
if result.status_code == 200:
files = []
for file in result.data:
f = File(**file)
files.append(f)
return files
if result.status_code == 401:
raise Forbidden(result.message)
def get_password_protected_file(self, file_id: int, password: str) -> bytes:
"""Get a password protected file
Args:
file_id (int): ID of the file to get
password (str): Password for the file
Raises:
Forbidden: The password is incorrect
NotFound: The file does not exist
PyZiplineError: Raised if the API changes, causing a breaking change in this method
Returns:
bytes: The file, in bytes
"""
2023-12-22 21:05:17 -05:00
result: Result = self._rest_adapter.get(endpoint="auth/image", params={'id': file_id, 'password': password})
if result.status_code == 200:
return result.data
if result.message == 'image not found':
raise NotFound(result.message)
if result.message == 'wrong password':
raise Forbidden(result.message)
raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
2023-12-22 13:51:15 -05:00
def get_invites(self) -> list[Invite]:
"""Get a list of invites
/// admonition | Requires Authentication
type: warning
///
/// admonition | Requires Administrator
type: danger
///
2023-12-22 13:51:15 -05:00
Raises:
Forbidden: The user is not authenticated
FeatureDisabledError: Invites are disabled on the Zipline instance
PyZiplineError: Raised if the API changes, causing a breaking change in this method
Returns:
Invite: List of invites
"""
2023-12-22 15:03:19 -05:00
result = self._rest_adapter.get(endpoint="auth/invite")
2023-12-22 13:51:15 -05:00
if result.status_code == 200:
invites = []
for invite in result.data:
i = Invite(**invite)
invites.append(i)
return invites
if result.status_code == 401:
raise Forbidden(result.message)
if result.message == 'invites are disabled':
raise FeatureDisabledError(result.message)
raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
2023-12-20 23:51:31 -05:00
def get_self(self) -> User:
"""Get the currently authenticated user
/// admonition | Requires Authentication
type: warning
///
Raises:
2023-12-21 14:08:09 -05:00
Forbidden: The user is not authenticated
2023-12-20 23:51:31 -05:00
PyZiplineError: Raised if the API changes, causing a breaking change in this method
Returns:
User: The currently authenticated user
"""
result = self._rest_adapter.get(endpoint="user")
if result.status_code == 200:
return User(**result.data)
if result.status_code == 401:
raise Forbidden(result.message)
raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
2023-12-23 11:14:11 -05:00
def shorten(self, url: str, vanity: str = None, max_views: int = None, zero_width: bool = False) -> str:
"""Shorten a URL
/// admonition | Requires Authentication
type: warning
///
Args:
url (str): URL to shorten
vanity (str): Vanity string to use
max_views (int): Maximum number of views before the URL expires
zero_width (bool): Whether or not to use zero width characters in the shortened URL
Raises:
Forbidden: The user is not authenticated
PyZiplineError: Raised if the API changes, causing a breaking change in this method
ValueError: Raised if the vanity string already exists, if the vanity string is empty, or if the max views is invalid (less than 0)
Returns:
str: The shortened URL
"""
headers = {}
if max_views is not None:
headers['Max-Views'] = max_views
if zero_width:
headers['Zws'] = True
json = {'url': url} if not vanity else {'url': url, 'vanity': vanity}
result = self._rest_adapter.post(endpoint="shorten", json=json, headers=headers)
if result.status_code == 200:
return result.data['url']
if result.status_code == 400:
raise ValueError(result.message)
if result.status_code == 401:
raise Forbidden(result.message)
raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
2023-12-20 23:51:31 -05:00
def get_user(self, user_id: int) -> User:
"""Get a user by ID
/// admonition | Requires Administrator
type: danger
///
Args:
user_id (int): Integer ID of the user to retrieve
Raises:
Forbidden: Raised if the authenticated user is not an administrator
PyZiplineError: Raised if the API changes, causing a breaking change in this method
Returns:
User: The user with the given ID
"""
result = self._rest_adapter.get(endpoint=f"user/{user_id}")
if result.status_code == 200:
return User(**result.data)
if result.status_code == 403:
raise Forbidden(result.message)
2023-12-22 13:51:42 -05:00
raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
def get_users(self) -> list[User]:
"""Get a list of users
/// admonition | Requires Administrator
type: danger
///
Raises:
Forbidden: Raised if the authenticated user is not an administrator
PyZiplineError: Raised if the API changes, causing a breaking change in this method
Returns:
list[User]: List of users
"""
result = self._rest_adapter.get(endpoint="users")
if result.status_code == 200:
users = []
for user in result.data:
u = User(**user)
users.append(u)
return users
if result.status_code == 403:
raise Forbidden(result.message)
2023-12-20 23:51:31 -05:00
raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
def get_stats(self, amount: int = 1, force_update: bool = False) -> Union[Stats, List[Stats]]:
"""Get statistics about the Zipline instance
/// admonition | Requires Authentication
type: warning
///
/// admonition | Parameter Requires Administrator
type: danger
The authenticated user must be an administrator to use the `force_update` argument.
///
/// admonition | Configuration Varies
type: note
The endpoint this method uses, `/api/stats`, relies a lot on Zipline's [configuration](https://zipline.diced.sh/docs/config/website#website_show_files_per_user) to determine who can access the endpoint and what the endpoint returns depending on permission level.
Please bear this in mind when using this method.
///
Args:
amount (int ): Number of stats to retrieve
force_update (bool): Force the Zipline instance to update its statistics before returning them, requires administrator
Raises:
Forbidden: The user is not authenticated, or the user is not an administrator and `force_update` is True
PyZiplineError: Raised if the API changes, causing a breaking change in this method
ValueError: Raised if amount is less than 1
Returns:
Stats: Statistics about the Zipline instance
"""
if amount < 1:
raise ValueError('amount must be greater than 0')
if force_update:
result = self._rest_adapter.post(endpoint="stats", params={'amount': amount})
else:
result = self._rest_adapter.get(endpoint="stats", params={'amount': amount})
if result.status_code == 200:
if amount > 1:
stats_list = []
for stats in result.data:
s = Stats(**stats)
stats_list.append(s)
return stats_list
data = result.data[0] if isinstance(result.data, list) else result.data
return Stats(**data)
2023-12-22 13:50:09 -05:00
if result.status_code in (401, 403):
raise Forbidden(result.message)
raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
def get_version(self) -> Version:
"""Get the version of the Zipline instance
/// admonition | Requires Authentication
type: warning
///
Raises:
Forbidden: The user is not authenticated
PyZiplineError: Raised if the API changes, causing a breaking change in this method
Returns:
Version: The version of the Zipline instance
"""
result = self._rest_adapter.get(endpoint="version")
if result.status_code == 200:
return Version(**result.data)
if result.status_code == 401:
raise Forbidden(result.message)
raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")