SeaswimmerTheFsh
f313a3b992
All checks were successful
Pylint / Pylint (3.12) (push) Successful in 35s
426 lines
17 KiB
Python
426 lines
17 KiB
Python
"""This module contains the ZiplineApi class, which is the main class used to interact with the Zipline API."""
|
|
from datetime import datetime, timedelta
|
|
from typing import Union, List
|
|
import logging
|
|
from pyzipline.rest_adapter import RestAdapter
|
|
from pyzipline.exceptions import PyZiplineError, FeatureDisabledError, Forbidden, NotFound
|
|
from pyzipline.models import User, File, Result, Invite, Stats, Version
|
|
from pyzipline.utils import convert_datetime_to_str
|
|
|
|
# pylint: disable=not-a-mapping
|
|
class ZiplineApi:
|
|
"""Represents an instance of the Zipline API.
|
|
|
|
All API requests should be made through this class.
|
|
|
|
Args:
|
|
hostname (str): The hostname of your Zipline instance, WITHOUT https or http.
|
|
token (str): String used for authentication when making requests.
|
|
ssl (bool): Normally set to True, but if your Zipline instance doesn't use SSL/TLS, set this to False.
|
|
enforced_signing (bool): Normally set to True, but if having SSL/TLS cert validation issues, can turn off with False.
|
|
logger (logging.Logger): If your app has a logger, pass it in here.
|
|
"""
|
|
def __init__(
    self,
    hostname: str,
    token: str = '',
    ssl: bool = True,
    enforced_signing: bool = True,
    logger: logging.Logger = None
):
    """Build the REST adapter that every API call on this instance goes through.

    Args:
        hostname (str): The hostname of your Zipline instance, WITHOUT https or http.
        token (str): String used for authentication when making requests.
        ssl (bool): Set to False if your Zipline instance doesn't use SSL/TLS.
        enforced_signing (bool): Set to False to work around SSL/TLS certificate validation issues.
        logger (logging.Logger): Logger of the calling application, if it has one.
    """
    # Every setting is forwarded to the adapter unchanged, by keyword.
    adapter_settings = {
        'hostname': hostname,
        'token': token,
        'ssl': ssl,
        'enforced_signing': enforced_signing,
        'logger': logger,
    }
    self._rest_adapter = RestAdapter(**adapter_settings)
|
|
|
|
def create_invite(self, expiry: timedelta = timedelta(days=1), count: int = 1) -> Union[Invite, List[Invite]]:
    """Create one or more invite codes

    /// admonition | Requires Authentication
        type: warning
    ///

    /// admonition | Requires Administrator
        type: danger
    ///

    Args:
        expiry (timedelta): How long from now the invite(s) stay valid; it is added to `datetime.now()` to build the expiry date
        count (int): Number of invites to create, must be at least 1

    Raises:
        FeatureDisabledError: Raised when invites are disabled on the Zipline instance
        Forbidden: Raised if the authenticated user is not an administrator
        PyZiplineError: Raised if the API changes, causing a breaking change in this method
        ValueError: Raised when the count is less than 1 or the API reports the expiry date as invalid

    Returns:
        Invite | list[Invite]: A single invite when `count` is 1, otherwise a list of the newly created invites
    """
    # Reject a count below 1 locally so the documented ValueError is raised before any request is made.
    if count < 1:
        raise ValueError('count must be greater than 0')

    json = {'expiresAt': 'date=' + convert_datetime_to_str(datetime.now() + expiry), 'count': count}
    result: Result = self._rest_adapter.post(endpoint="auth/invite", json=json)

    if result.status_code == 200:
        if count > 1:
            return [Invite(**invite) for invite in result.data]
        # For a single invite the payload is unwrapped if it arrives as a list.
        data = result.data[0] if isinstance(result.data, list) else result.data
        return Invite(**data)

    # Failures are told apart by the message text returned with the response.
    if result.message == 'invites are disabled':
        raise FeatureDisabledError(result.message)
    if result.message == 'invalid date':
        raise ValueError(f"{result.status_code}: {result.message}\n{result.data}\n{json}")
    if result.message == 'not an administrator':
        raise Forbidden(result.message)
    raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
|
|
|
|
def register_user(self, username: str, password: str, invite: str = None, admin: bool = False) -> User:
    """Register a new user

    /// admonition | Requires Authentication
        type: warning
    ///

    /// admonition | Parameter Requires Super Administrator
        type: danger
    The authenticated user must be a Super Administrator to use the `admin` parameter.
    ///

    /// admonition | Conditionally Requires Administrator
        type: danger
    The authenticated user must be an Administrator to register a user when registration is disabled.
    ///

    Args:
        username (str): Username to register
        password (str): Password for the new user
        invite (str): Invite code to register with; only sent when it is not None
        admin (bool): Whether the new user should be an administrator; only sent when True

    Raises:
        FeatureDisabledError: Raised when registration or invites are disabled on the Zipline instance, or when
            the request is refused with 'Bad Username/Password' although the username is still free
        Forbidden: Raised if the authenticated user is not a super administrator and attempts to create an administrator
        PyZiplineError: Raised if the API changes, causing a breaking change in this method
        ValueError: Raised when the username is already taken or if the invite code is invalid/expired

    Returns:
        User: The newly created user
    """
    # Optional fields are added to the request body only when the caller supplied them.
    payload = {'username': username, 'password': password}
    if invite is not None:
        payload.update(code=invite)
    if admin:
        payload.update(admin=True)

    result: Result = self._rest_adapter.post(endpoint="auth/register", json=payload)
    if result.status_code == 200:
        return User(**result.data)

    if result.message == 'This endpoint is unavailable due to current configurations':
        raise FeatureDisabledError('user registration or invites are disabled')

    if result.message == 'Bad Username/Password':
        # This message is ambiguous, so a second request decides which error to report.
        # NOTE(review): the invite code is not forwarded to check_user_exists here - confirm that is intended.
        username_taken = self.check_user_exists(username)
        if username_taken:
            raise ValueError('username already taken')
        raise FeatureDisabledError('invite code is provided and invites are disabled')

    if result.message == 'Bad invite':
        raise ValueError('invite code is invalid or expired')

    if result.message == 'not an administrator':
        raise Forbidden(result.message)

    raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
|
|
|
|
def check_user_exists(self, username: str, invite: str = None) -> bool:
    """Check if a user exists by username

    Args:
        username (str): Username to check
        invite (str): Invite code to use, only required if registration without invites is disabled; only sent when it is not None

    Raises:
        FeatureDisabledError: Raised when registration or invites are disabled on the Zipline instance
        PyZiplineError: Raised if the API changes, causing a breaking change in this method
        ValueError: Raised when the username is not present, or the invite code is invalid/not present and invites are enabled

    Returns:
        bool: True if user exists, False if not
    """
    json = {'username': username} if invite is None else {'username': username, 'code': invite}
    result: Result = self._rest_adapter.post(endpoint="user/check", json=json)

    # A 200 response means the check passed, i.e. the username is not taken.
    if result.status_code == 200:
        return False
    if result.message == 'username already exists':
        return True

    if result.message == 'user registration is disabled':
        raise FeatureDisabledError('user registration or invites are disabled')
    if result.message == 'invalid invite code':
        # The hint is separated from the API message by a space so the text reads correctly.
        raise ValueError(result.message + " (most likely doesn't exist)")
    if result.message == 'no code':
        raise ValueError('invite code not provided')
    if result.message == 'no username':
        raise ValueError('username not provided')
    raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
|
|
|
|
def delete_invite(self, code: str) -> Invite:
    """Delete an invite code

    /// admonition | Requires Authentication
        type: warning
    ///

    /// admonition | Requires Administrator
        type: danger
    ///

    Args:
        code (str): Invite code to delete

    Raises:
        Forbidden: Raised if the authenticated user is not an administrator
        NotFound: Raised if the invite code does not exist
        PyZiplineError: Raised if the API changes, causing a breaking change in this method
        ValueError: Raised if the invite code is not provided

    Returns:
        Invite: An object containing the deleted invite
    """
    result: Result = self._rest_adapter.delete(endpoint="auth/invite", params={'code': code})
    if result.status_code == 200:
        return Invite(**result.data)

    # Failures are told apart by the message text returned with the response.
    if result.message == 'invite not found':
        raise NotFound(result.message)
    if result.message == 'no code':
        raise ValueError('invite code not provided')
    # Same message check create_invite uses, so the documented Forbidden is actually raised.
    if result.message == 'not an administrator':
        raise Forbidden(result.message)
    raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
|
|
|
|
def get_exif(self, file_id: int) -> dict:
    """Get the EXIF data for a file

    /// admonition | Requires Authentication
        type: warning
    ///

    Args:
        file_id (int): ID of the file to get EXIF data for

    Raises:
        Forbidden: The user is not authenticated
        NotFound: The file does not exist, either in Zipline or on its filesystem
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        dict: EXIF data for the file
    """
    # No leading slash, matching sibling endpoints such as "auth/invite" and "user".
    result: Result = self._rest_adapter.get(endpoint="exif", params={'id': file_id})
    if result.status_code == 200:
        return result.data
    if result.status_code == 401:
        raise Forbidden(result.message)

    if result.message == 'image not found':
        raise NotFound(result.message)
    if result.message == 'image not found on fs':
        # The abbreviated API message is spelled out for the caller.
        raise NotFound('image not found on filesystem')
    raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
|
|
|
|
def get_files(self, favorite: bool = False, media_only: bool = False) -> list[File]:
    """Get a list of the files uploaded by the authenticated user

    /// admonition | Requires Authentication
        type: warning
    ///

    Args:
        favorite (bool): Whether or not to return only favorite files
        media_only (bool): Whether or not to return only media files

    Raises:
        Forbidden: The user is not authenticated
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        list[File]: Files uploaded by the authenticated user
    """
    # Only filters that are switched on are sent; an unset filter is left out of the query entirely.
    params = {}
    if favorite:
        params['favorite'] = favorite
    if media_only:
        params['media_only'] = media_only

    # No leading slash, matching sibling endpoints such as "auth/invite" and "user".
    result: Result = self._rest_adapter.get(endpoint="user/files", params=params)
    if result.status_code == 200:
        return [File(**file) for file in result.data]
    if result.status_code == 401:
        raise Forbidden(result.message)
    # Any other status previously fell off the end and returned None; report it as the documented error instead.
    raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
|
|
|
|
def get_invites(self) -> list[Invite]:
    """Get a list of invites

    /// admonition | Requires Authentication
        type: warning
    ///

    Raises:
        Forbidden: The user is not authenticated
        FeatureDisabledError: Invites are disabled on the Zipline instance
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        list[Invite]: One Invite per entry returned by the API
    """
    result = self._rest_adapter.get(endpoint="auth/invite")

    # Error paths first; the success path is the fall-through at the bottom.
    if result.status_code != 200:
        if result.status_code == 401:
            raise Forbidden(result.message)
        if result.message == 'invites are disabled':
            raise FeatureDisabledError(result.message)
        raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")

    return [Invite(**invite) for invite in result.data]
|
|
|
|
|
|
def get_self(self) -> User:
    """Get the currently authenticated user

    /// admonition | Requires Authentication
        type: warning
    ///

    Raises:
        Forbidden: The user is not authenticated
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        User: The currently authenticated user
    """
    result = self._rest_adapter.get(endpoint="user")

    # A 401 is reported as Forbidden; anything else that is not a 200 is treated as unexpected.
    if result.status_code == 401:
        raise Forbidden(result.message)
    if result.status_code != 200:
        raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
    return User(**result.data)
|
|
|
|
def get_user(self, user_id: int) -> User:
    """Get a user by ID

    /// admonition | Requires Administrator
        type: danger
    ///

    Args:
        user_id (int): Integer ID of the user to retrieve

    Raises:
        Forbidden: Raised if the authenticated user is not an administrator
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        User: The user with the given ID
    """
    # The ID is part of the endpoint path rather than a query parameter.
    result = self._rest_adapter.get(endpoint=f"user/{user_id}")

    if result.status_code == 403:
        raise Forbidden(result.message)
    if result.status_code != 200:
        raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
    return User(**result.data)
|
|
|
|
def get_users(self) -> list[User]:
    """Get a list of users

    /// admonition | Requires Administrator
        type: danger
    ///

    Raises:
        Forbidden: Raised if the authenticated user is not an administrator
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        list[User]: One User per entry returned by the API
    """
    result = self._rest_adapter.get(endpoint="users")

    # Error paths first; the success path is the fall-through at the bottom.
    if result.status_code != 200:
        if result.status_code == 403:
            raise Forbidden(result.message)
        raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")

    return [User(**user) for user in result.data]
|
|
|
|
def get_stats(self, amount: int = 1, force_update: bool = False) -> Union[Stats, List[Stats]]:
    """Get statistics about the Zipline instance

    /// admonition | Requires Authentication
        type: warning
    ///

    /// admonition | Parameter Requires Administrator
        type: danger
    The authenticated user must be an administrator to use the `force_update` argument.
    ///

    /// admonition | Configuration Varies
        type: note
    The endpoint this method uses, `/api/stats`, relies a lot on Zipline's [configuration](https://zipline.diced.sh/docs/config/website#website_show_files_per_user) to determine who can access the endpoint and what the endpoint returns depending on permission level.
    Please bear this in mind when using this method.
    ///

    Args:
        amount (int): Number of stats to retrieve, must be at least 1
        force_update (bool): Force the Zipline instance to update its statistics before returning them, requires administrator

    Raises:
        Forbidden: The user is not authenticated, or the user is not an administrator and `force_update` is True
        PyZiplineError: Raised if the API changes, causing a breaking change in this method
        ValueError: Raised if amount is less than 1

    Returns:
        Stats | list[Stats]: A single Stats object when `amount` is 1, otherwise a list of Stats
    """
    if amount < 1:
        raise ValueError('amount must be greater than 0')

    # A forced refresh is sent as a POST, a plain read as a GET; both use the same endpoint and query.
    send = self._rest_adapter.post if force_update else self._rest_adapter.get
    result = send(endpoint="stats", params={'amount': amount})

    if result.status_code in (401, 403):
        raise Forbidden(result.message)
    if result.status_code != 200:
        raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")

    if amount > 1:
        return [Stats(**stats) for stats in result.data]
    # For a single entry the payload is unwrapped if it arrives as a list.
    data = result.data[0] if isinstance(result.data, list) else result.data
    return Stats(**data)
|
|
|
|
def get_version(self) -> Version:
    """Get the version of the Zipline instance

    /// admonition | Requires Authentication
        type: warning
    ///

    Raises:
        Forbidden: The user is not authenticated
        PyZiplineError: Raised if the API changes, causing a breaking change in this method

    Returns:
        Version: The version of the Zipline instance
    """
    result = self._rest_adapter.get(endpoint="version")

    # A 401 is reported as Forbidden; anything else that is not a 200 is treated as unexpected.
    if result.status_code == 401:
        raise Forbidden(result.message)
    if result.status_code != 200:
        raise PyZiplineError(f"{result.status_code}: {result.message}\n{result.data}")
    return Version(**result.data)
|