477 lines
19 KiB
Python
477 lines
19 KiB
Python
"""This module contains the ZiplineApi class, which is the main class used to interact with the Zipline API."""
|
|
from datetime import datetime, timedelta
|
|
from typing import List, Union
|
|
|
|
from pyzipline.exceptions import (FeatureDisabledError, Forbidden, NotFound,
|
|
PyZiplineError)
|
|
from pyzipline.models import (File, Invite, Result, Stats, User, Version,
|
|
ZiplineApiConfig)
|
|
from pyzipline.rest_adapter import RestAdapter
|
|
from pyzipline.utils import convert_datetime_to_str
|
|
|
|
|
|
# pylint: disable=not-a-mapping
|
|
class ZiplineApi:
|
|
"""Represents an instance of the Zipline API.
|
|
|
|
All API requests should be made through this class.
|
|
|
|
Args:
|
|
config (ZiplineApiConfig): Configuration object for the ZiplineApi class
|
|
"""
|
|
def __init__(self, config: ZiplineApiConfig):
    """Build the REST adapter that every method of this instance sends its requests through.

    Args:
        config (ZiplineApiConfig): Configuration object for the ZiplineApi class
    """
    self._rest_adapter = RestAdapter(config)
|
|
|
|
def create_invite(self, expiry: timedelta = timedelta(days=1), count: int = 1) -> Union[Invite, List[Invite]]:
    """Create one or more invite codes

    /// admonition | Requires Authentication
        type: warning
    ///

    /// admonition | Requires Administrator
        type: danger
    ///

    Args:
        expiry (timedelta): Timedelta object representing when the invite should expire
        count (int): Number of invites to create

    Raises:
        FeatureDisabledError: Raised when invites are disabled on the Zipline instance
        Forbidden: Raised if the authenticated user is not an administrator
        PyZiplineError: Raised if the API changes, causing a breaking change in this method
        ValueError: Raised when the expiry datetime is invalid or the count is less than 1

    Returns:
        Union[Invite, List[Invite]]: A single invite when `count` is 1, otherwise a list of the newly created invites
    """
    # Validate locally so a non-positive count never reaches the server; this
    # also keeps the single-invite branch below from indexing an empty list.
    if count < 1:
        raise ValueError('count must be greater than 0')

    # The expiry is sent as an absolute timestamp computed from the current local time.
    json = {'expiresAt': 'date=' + convert_datetime_to_str(datetime.now() + expiry), 'count': count}
    result: Result = self._rest_adapter.post(endpoint="auth/invite", json=json)
    if result.status_code == 200:
        if count > 1:
            return [Invite(**invite) for invite in result.data]
        # A single invite may come back either bare or wrapped in a one-element list.
        data = result.data[0] if isinstance(result.data, list) else result.data
        return Invite(**data)
    if result.message == 'invites are disabled':
        raise FeatureDisabledError(result.message)
    if result.message == 'invalid date':
        raise ValueError(f"{result.status_code}: {result.message}\n{result.data}\n{json}")
    if result.message == 'not an administrator':
        raise Forbidden(result.message)
    raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
|
|
|
|
def register_user(self, username: str, password: str, invite: str = None, admin: bool = False) -> User:
    """Register a new user account

    /// admonition | Requires Authentication
        type: warning
    ///

    /// admonition | Parameter Requires Super Administrator
        type: danger
    The authenticated user must be a Super Administrator to use the `admin` parameter.
    ///

    /// admonition | Conditionally Requires Administrator
        type: danger
    The authenticated user must be an Administrator to register a user when registration is disabled.
    ///

    Args:
        username (str): Username to register
        password (str): Password for the new user
        invite (str): Invite code to register the new user with, only required if registration without invites is disabled and the authenticated user is not an administrator
        admin (bool): Whether or not the new user should be an administrator, authenticated user must be a super administrator to create an administrator

    Raises:
        FeatureDisabledError: Raised when:\n
            - registration or invites are disabled on the Zipline instance and the authenticated user is not an administrator
            - invite code is provided and invites are disabled
        Forbidden: Raised if the authenticated user is not a super administrator and attempts to create an administrator
        PyZiplineError: Raised if the API changes, causing a breaking change in this method
        ValueError: Raised when the username is already taken or if the invite code is invalid/expired

    Returns:
        User: The newly created user
    """
    # Optional keys are only sent when the caller actually supplied them.
    payload = {'username': username, 'password': password}
    if invite is not None:
        payload['code'] = invite
    if admin:
        payload['admin'] = True

    response: Result = self._rest_adapter.post(endpoint="auth/register", json=payload)
    if response.status_code == 200:
        return User(**response.data)

    reason = response.message
    if reason == 'This endpoint is unavailable due to current configurations':
        raise FeatureDisabledError('user registration or invites are disabled')

    if reason == 'Bad Username/Password':
        # The same message covers two situations, so a follow-up lookup is
        # used to tell a taken username apart from the invite case.
        if not self.check_user_exists(username):
            raise FeatureDisabledError('invite code is provided and invites are disabled')
        raise ValueError('username already taken')

    if reason == 'Bad invite':
        raise ValueError('invite code is invalid or expired')

    if reason == 'not an administrator':
        raise Forbidden(reason)

    raise PyZiplineError(f"{response.status_code}: {reason}\n{response.data}")
|
|
|
|
def check_user_exists(self, username: str, invite: str = None) -> bool:
    """Check if a user exists by username

    Args:
        username (str): Username to check
        invite (str): Invite code to use, only required if registration without invites is disabled

    Raises:
        FeatureDisabledError: Raised when registration or invites are disabled on the Zipline instance
        PyZiplineError: Raised if the API changes, causing a breaking change in this method
        ValueError: Raised when the username is not present, or the invite code is invalid/not present and invites are enabled

    Returns:
        bool: True if user exists, False if not
    """
    json = {'username': username}
    if invite is not None:
        json['code'] = invite

    result: Result = self._rest_adapter.post(endpoint="user/check", json=json)
    # A 200 response is treated as "username is free"; an existing user is
    # reported through the error message instead.
    if result.status_code == 200:
        return False
    if result.message == 'username already exists':
        return True
    if result.message == 'user registration is disabled':
        raise FeatureDisabledError('user registration or invites are disabled')
    if result.message == 'invalid invite code':
        # Leading space keeps the hint from running into the server message.
        raise ValueError(result.message + " (most likely doesn't exist)")
    if result.message == 'no code':
        raise ValueError('invite code not provided')
    if result.message == 'no username':
        raise ValueError('username not provided')
    raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
|
|
|
|
def delete_invite(self, code: str) -> Invite:
    """Remove an invite code from the Zipline instance

    /// admonition | Requires Authentication
        type: warning
    ///

    /// admonition | Requires Administrator
        type: danger
    ///

    Args:
        code (str): Invite code to delete

    Raises:
        Forbidden: Raised if the authenticated user is not an administrator
        NotFound: Raised if the invite code does not exist
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        Invite: An object containing the deleted invite
    """
    response: Result = self._rest_adapter.delete(endpoint="auth/invite", params={'code': code})
    if response.status_code == 200:
        return Invite(**response.data)

    # Known failures are identified by the server's message text.
    reason = response.message
    if reason == 'invite not found':
        raise NotFound(reason)
    if reason == 'not an administrator':
        raise Forbidden(reason)
    raise PyZiplineError(f"{response.status_code}: {reason}\n{response.data}")
|
|
|
|
def get_exif(self, file_id: int) -> dict:
    """Fetch the EXIF metadata of a file

    /// admonition | Requires Authentication
        type: warning
    ///

    Args:
        file_id (int): ID of the file to get EXIF data for

    Raises:
        Forbidden: The user is not authenticated
        NotFound: The file does not exist
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        dict: EXIF data for the file
    """
    response: Result = self._rest_adapter.get(endpoint="/exif", params={'id': file_id})
    status = response.status_code
    if status == 200:
        return response.data
    if status == 401:
        raise Forbidden(response.message)

    # Both "not found" variants map to NotFound; the filesystem one gets a
    # spelled-out message.
    reason = response.message
    if reason == 'image not found':
        raise NotFound(reason)
    if reason == 'image not found on fs':
        raise NotFound('image not found on filesystem')
    raise PyZiplineError(f"{status}: {reason}\n{response.data}")
|
|
|
|
def get_files(self, favorite: bool = False, media_only: bool = False) -> list[File]:
    """Get a list of the files uploaded by the authenticated user

    /// admonition | Requires Authentication
        type: warning
    ///

    Args:
        favorite (bool): Whether or not to return only favorite files
        media_only (bool): Whether or not to return only media files

    Raises:
        Forbidden: The user is not authenticated
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        list[File]: List of files uploaded by the authenticated user
    """
    # Filters are only sent when enabled, so the default call carries no query parameters.
    params = {}
    if favorite:
        params['favorite'] = favorite
    if media_only:
        params['media_only'] = media_only

    result: Result = self._rest_adapter.get(endpoint="/user/files", params=params)
    if result.status_code == 200:
        return [File(**file) for file in result.data]
    if result.status_code == 401:
        raise Forbidden(result.message)
    # Any other status is unexpected; fail loudly instead of returning None.
    raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
|
|
|
|
def get_password_protected_file(self, file_id: int, password: str) -> bytes:
    """Download a file that is protected by a password

    Args:
        file_id (int): ID of the file to get
        password (str): Password for the file

    Raises:
        Forbidden: The password is incorrect
        NotFound: The file does not exist
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        bytes: The file, in bytes
    """
    query = {'id': file_id, 'password': password}
    response: Result = self._rest_adapter.get(endpoint="auth/image", params=query)
    if response.status_code == 200:
        return response.data

    reason = response.message
    if reason == 'image not found':
        raise NotFound(reason)
    if reason == 'wrong password':
        raise Forbidden(reason)
    raise PyZiplineError(f"{response.status_code}: {reason}\n{response.data}")
|
|
|
|
def get_invites(self) -> list[Invite]:
    """Retrieve every invite known to the Zipline instance

    /// admonition | Requires Authentication
        type: warning
    ///

    /// admonition | Requires Administrator
        type: danger
    ///

    Raises:
        Forbidden: The user is not authenticated
        FeatureDisabledError: Invites are disabled on the Zipline instance
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        list[Invite]: List of invites
    """
    response = self._rest_adapter.get(endpoint="auth/invite")
    status = response.status_code
    if status == 200:
        invites = []
        for entry in response.data:
            invites.append(Invite(**entry))
        return invites
    if status == 401:
        raise Forbidden(response.message)
    if response.message == 'invites are disabled':
        raise FeatureDisabledError(response.message)
    raise PyZiplineError(f"{status}: {response.message}\n{response.data}")
|
|
|
|
|
|
def get_self(self) -> User:
    """Retrieve the user that the current credentials belong to

    /// admonition | Requires Authentication
        type: warning
    ///

    Raises:
        Forbidden: The user is not authenticated
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        User: The currently authenticated user
    """
    response = self._rest_adapter.get(endpoint="user")
    status = response.status_code
    if status == 401:
        raise Forbidden(response.message)
    if status != 200:
        raise PyZiplineError(f"{status}: {response.message}\n{response.data}")
    return User(**response.data)
|
|
|
|
def shorten(self, url: str, vanity: str = None, max_views: int = None, zero_width: bool = False) -> str:
    """Shorten a URL

    /// admonition | Requires Authentication
        type: warning
    ///

    Args:
        url (str): URL to shorten
        vanity (str): Vanity string to use
        max_views (int): Maximum number of views before the URL expires
        zero_width (bool): Whether or not to use zero width characters in the shortened URL

    Raises:
        Forbidden: The user is not authenticated
        PyZiplineError: Raised if the API changes, causing a breaking change in this method
        ValueError: Raised if the vanity string already exists, if the vanity string is empty, or if the max views is invalid (less than 0)

    Returns:
        str: The shortened URL
    """
    # Header values travel as text, and HTTP clients such as `requests` reject
    # int/bool header values, so both options are converted to strings here.
    # NOTE(review): assumes RestAdapter forwards `headers` unchanged - confirm.
    headers = {}
    if max_views is not None:
        headers['Max-Views'] = str(max_views)
    if zero_width:
        headers['Zws'] = 'true'

    # A falsy vanity (None or empty string) is simply left out of the payload.
    json = {'url': url} if not vanity else {'url': url, 'vanity': vanity}

    result = self._rest_adapter.post(endpoint="shorten", json=json, headers=headers)

    if result.status_code == 200:
        return result.data['url']

    if result.status_code == 400:
        raise ValueError(result.message)

    if result.status_code == 401:
        raise Forbidden(result.message)

    raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
|
|
|
|
def get_user(self, user_id: int) -> User:
    """Look up a single user by their ID

    /// admonition | Requires Administrator
        type: danger
    ///

    Args:
        user_id (int): Integer ID of the user to retrieve

    Raises:
        Forbidden: Raised if the authenticated user is not an administrator
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        User: The user with the given ID
    """
    response = self._rest_adapter.get(endpoint=f"user/{user_id}")
    status = response.status_code
    if status == 403:
        raise Forbidden(response.message)
    if status != 200:
        raise PyZiplineError(f"{status}: {response.message}\n{response.data}")
    return User(**response.data)
|
|
|
|
def get_users(self) -> list[User]:
    """Retrieve every user registered on the Zipline instance

    /// admonition | Requires Administrator
        type: danger
    ///

    Raises:
        Forbidden: Raised if the authenticated user is not an administrator
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        list[User]: List of users
    """
    response = self._rest_adapter.get(endpoint="users")
    status = response.status_code
    if status == 200:
        users = []
        for entry in response.data:
            users.append(User(**entry))
        return users
    if status == 403:
        raise Forbidden(response.message)
    raise PyZiplineError(f"{status}: {response.message}\n{response.data}")
|
|
|
|
def get_stats(self, amount: int = 1, force_update: bool = False) -> Union[Stats, List[Stats]]:
    """Retrieve statistics about the Zipline instance

    /// admonition | Requires Authentication
        type: warning
    ///

    /// admonition | Parameter Requires Administrator
        type: danger
    The authenticated user must be an administrator to use the `force_update` argument.
    ///

    /// admonition | Configuration Varies
        type: note
    The endpoint this method uses, `/api/stats`, relies a lot on Zipline's [configuration](https://zipline.diced.sh/docs/config/website#website_show_files_per_user) to determine who can access the endpoint and what the endpoint returns depending on permission level.
    Please bear this in mind when using this method.
    ///

    Args:
        amount (int): Number of stats to retrieve
        force_update (bool): Force the Zipline instance to update its statistics before returning them, requires administrator

    Raises:
        Forbidden: The user is not authenticated, or the user is not an administrator and `force_update` is True
        PyZiplineError: Raised if the API changes, causing a breaking change in this method
        ValueError: Raised if amount is less than 1

    Returns:
        List[Stats]: One Stats object per entry in the response data
    """
    if amount < 1:
        raise ValueError('amount must be greater than 0')

    # A forced refresh goes out as POST, a plain read as GET; both hit the
    # same endpoint with the same query.
    send = self._rest_adapter.post if force_update else self._rest_adapter.get
    response = send(endpoint="stats", params={'amount': amount})

    status = response.status_code
    if status == 200:
        snapshots = []
        for entry in response.data:
            snapshots.append(Stats(**entry))
        return snapshots
    if status in (401, 403):
        raise Forbidden(response.message)
    raise PyZiplineError(f"{status}: {response.message}\n{response.data}")
|
|
|
|
def get_version(self) -> Version:
    """Retrieve version information for the Zipline instance

    /// admonition | Requires Authentication
        type: warning
    ///

    Raises:
        Forbidden: The user is not authenticated
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        Version: The version of the Zipline instance
    """
    response = self._rest_adapter.get(endpoint="version")
    status = response.status_code
    if status == 401:
        raise Forbidden(response.message)
    if status != 200:
        raise PyZiplineError(f"{status}: {response.message}\n{response.data}")
    return Version(**response.data)
|