holy changes
Some checks failed
Pylint / Pylint (3.12) (push) Failing after 36s

This commit is contained in:
Seaswimmer 2023-12-19 05:36:18 -05:00
parent 0c370cef84
commit 542bd06f09
Signed by: cswimr
GPG key ID: 1EBC234EEDA901AE
5 changed files with 330 additions and 20 deletions

View file

@ -3,3 +3,15 @@ class KwargConflict(Exception):
Raised when the keyword arguments passed to a function conflict. Raised when the keyword arguments passed to a function conflict.
""" """
pass pass
class HTTPFailure(Exception):
"""
Raised when an HTTP request fails.
"""
pass
class PyZiplineError(Exception):
"""
Raised when an error occurs in the PyZipline library.
"""
pass

205
pyzipline/models.py Normal file
View file

@ -0,0 +1,205 @@
from typing import List, Dict, Union
from datetime import datetime
class Embed:
def __init__(
self,
color: str,
title: str,
siteName: str,
description: str,
**kwargs
):
"""Embed object used for checking embeds
:param color: String of the embed's color
:param title: String of the embed's title
:param siteName: String of the embed's site name
:param description: String of the embed's description
"""
self.color = color
self.title = title
self.siteName = siteName
self.description = description
self.__dict__.update(kwargs)
class File:
def __init__(
self,
createdAt: datetime,
id: int,
mimetype: str,
views: int,
name: str,
size: int,
favorite: bool,
originalName: str = None,
url: str = None,
maxViews: int = None,
expiredAt: Union[datetime, None] = None,
thumbnail: str = None,
folderId: int = None,
**kwargs
):
"""File object used for uploading files to Zipline
:param createdAt: Datetime object of when the file was created
:param id: Integer ID of the file
:param mimetype: String of the file's mimetype
:param views: Integer of the number of views the file has
:param name: String of the file's name
:param size: Integer of the file's size in bytes
:param favorite: Boolean of whether the file is favorited
:param originalName: (optional) String of the file's original name
:param url: (optional) String of the file's URL
:param maxViews: (optional) Integer of the file's maximum number of views
:param expiredAt: (optional) Datetime object of when the file will expire
:param thumbnail: (optional) String of the file's thumbnail URL
:param folderId: (optional) Integer of the file's folder ID
"""
self.createdAt = createdAt
self.id = id
self.mimetype = mimetype
self.views = views
self.name = name
self.size = size
self.favorite = favorite
self.originalName = originalName
self.url = url
self.maxViews = maxViews
self.expiredAt = expiredAt
self.thumbnail = thumbnail
self.folderId = folderId
self.__dict__.update(kwargs)
class Result:
def __init__(self, status_code: int, message: str = '', data: List[Dict] = None):
"""Result returned from low-level RestAdapter
:param status_code: Standard HTTP Status code
:param message: Human readable result
:param data: Python List of Dictionaries (or maybe just a single Dictionary on error)
"""
self.status_code = int(status_code)
self.message = str(message)
self.data = data if data else []
class Invite:
def __init__(
self,
id: int,
code: str,
createdAt: datetime,
expiredAt: datetime,
used: bool,
createdById: int,
**kwargs
):
"""Invite object used for managing invites
:param id: Integer ID of the invite
:param code: String of the invite's code
:param createdAt: Datetime object of when the invite was created
:param expiredAt: Datetime object of when the invite will expire
:param used: Boolean of whether the invite has been used
:param createdById: Integer ID of the user who created the invite
"""
self.id = id
self.code = code
self.createdAt = createdAt
self.expiredAt = expiredAt
self.used = used
self.createdById = createdById
self.__dict__.update(kwargs)
class OAuth:
def __init__(
self,
id: int,
provider: str,
userId: int,
providerId: str,
username: str,
token: str,
refresh: str,
**kwargs
):
"""OAuth object used for managing OAuth
:param id: Integer ID of the OAuth
:param provider: String of the OAuth's provider, one of 'DISCORD', 'GITHUB', 'GOOGLE'
:param userId: Integer ID of the user who owns the OAuth
:param providerId: String of the OAuth's provider ID
:param username: String of the OAuth's connected account's username
:param token: String of the OAuth's access token
:param refresh: String of the OAuth's refresh token
"""
self.id = id
self.provider = provider
self.userId = userId
self.providerId = providerId
self.username = username
self.token = token
self.refresh = refresh
self.__dict__.update(kwargs)
class User:
def __init__(
self,
id: int,
uuid: str,
username: str,
avatar: str,
token: str,
administrator: bool,
superAdmin: bool,
systemTheme: str,
embed: Embed,
totpSecret: str,
domains: List[str],
oauth: Union[List['OAuth'], None] = None,
ratelimit: [datetime, None] = None,
**kwargs
):
"""User object used for managing users
:param id: Integer ID of the user
:param uuid: String of the user's UUID
:param username: String of the user's username
:param avatar: String of the user's avatar, base64 encoded
:param token: String of the user's token
:param administrator: Boolean of whether the user is an administrator
:param superAdmin: Boolean of whether the user is a super administrator
:param systemTheme: String of the user's system theme
:param embed: Embed object of the user's embed
:param totpSecret: String of the user's TOTP secret
:param domains: List of Strings of the user's domains
:param oauth: (optional) List of OAuth objects
:param ratelimit: (optional) Datetime object of when the user's ratelimit expires
"""
self.id = id
self.uuid = uuid
self.username = username
self.avatar = avatar
self.token = token
self.administrator = administrator
self.superAdmin = superAdmin
self.systemTheme = systemTheme
self.embed = embed
self.totpSecret = totpSecret
self.domains = domains
self.oauth = oauth
self.ratelimit = ratelimit
self.__dict__.update(kwargs)

View file

@ -1,33 +1,77 @@
from typing import Dict, List import logging
from typing import Dict
from json import JSONDecodeError
import requests import requests
from urllib3 import disable_warnings from urllib3 import disable_warnings
from .errors import KwargConflict from pyzipline.errors import KwargConflict, HTTPFailure, PyZiplineError
from pyzipline.models import Result
class RestAdapter: class RestAdapter:
def __init__(self, hostname: str, api_key: str = '', ssl: bool = True, enforced_signing: bool = True): def __init__(self, hostname: str, token: str = '', ssl: bool = True, enforced_signing: bool = True, logger: logging.Logger = None):
"""Create a new RestAdapter instance, to interact with the REST API of a Zipline server. """Constructor for RestAdapter
:param hostname: The hostname of the Zipline server :param hostname: The hostname of your Zipline instance, WITHOUT https or http.
:param api_key: The API key to use for authentication :param token: (optional) String used for authentication when making requests.
:param ssl: Whether to use SSL :param ssl: (optional) Normally set to True, but if your Zipline instance doesn't use SSL/TLS, set this to False.
:param enforced_signing: Whether to enforce SSL certificate signing :param enforced_signing: (optional) Normally set to True, but if having SSL/TLS cert validation issues, can turn off with False.
:param logger: (optional) If your app has a logger, pass it in here.
""" """
self.url = f"http{'s' if ssl else ''}://{hostname}/" self._url = f"http{'s' if ssl else ''}://{hostname}/api/"
self.api_key = api_key self._token = token
self.ssl = ssl self._ssl = ssl
self.enforced_signing = enforced_signing self._enforced_signing = enforced_signing
self._logger = logger or logging.getLogger(__name__)
if ssl is False and enforced_signing is True: if ssl is False and enforced_signing is True:
raise KwargConflict("Cannot enforce signing without SSL") raise KwargConflict("Cannot enforce signing without SSL")
if not ssl and not enforced_signing: if not ssl and not enforced_signing:
disable_warnings() disable_warnings()
def get(self, endpoint: str) -> List[Dict]: def _do(self, http_method: str, endpoint: str, params: Dict = None, data: Dict = None) -> Result:
full_url = self.url + endpoint """Make a request to the Zipline server."""
headers = {'Authorization': self.api_key} full_url = self._url + endpoint
response = requests.get(url=full_url, verify=self.enforced_signing, headers=headers) headers = {'Authorization': self._token}
log_line_pre = f"method={http_method}, url={full_url}, params={params}"
log_line_post = ', '.join((log_line_pre, "success={}, status_code={}, message={}"))
try: # Log HTTP params and perform an HTTP request, catching and re-raising any exceptions
self._logger.debug(msg=log_line_pre)
# will eventually refactor this to use asyncio/aiohttp instead for async operation
response = requests.request(method=http_method, url=full_url, verify=self._enforced_signing, params=params, headers=headers, json=data)
except requests.exceptions.RequestException as e:
self._logger.error(msg=(str(e)))
raise HTTPFailure("Could not connect to Zipline server") from e
try: # Deserialize JSON output to Python object, or return failed Result on exception
data_out = response.json() data_out = response.json()
if response.status_code >= 200 and response.status_code <= 299: # OK except (ValueError, JSONDecodeError) as e:
return data_out self._logger.error(msg=log_line_post.format(False, None, e))
raise Exception(data_out["message"]) # Todo: raise custom exception later raise PyZiplineError("Could not decode response from Zipline server") from e
# If status_code in 200-299 range, return success Result with data, otherwise raise exception
is_success = 299 >= response.status_code >= 200
log_line = log_line_post.format(is_success, response.status_code, response.reason)
if is_success:
self._logger.debug(msg=log_line_post.format(is_success, response.status_code, response.reason))
return Result(status_code=response.status_code, message=response.reason, data=data_out)
self._logger.error(msg=log_line)
raise PyZiplineError(f"{response.status_code}: {response.reason}")
def get(self, endpoint: str, params: Dict = None) -> Result:
"""Make a GET request to the Zipline server."""
return self._do(http_method='GET', endpoint=endpoint, params=params)
def post(self, endpoint: str, params: Dict = None, data: Dict = None) -> Result:
"""Make a POST request to the Zipline server."""
return self._do(http_method='POST', endpoint=endpoint, params=params, data=data)
def delete(self, endpoint: str, params: Dict = None, data: Dict = None) -> Result:
"""Make a DELETE request to the Zipline server."""
return self._do(http_method='DELETE', endpoint=endpoint, params=params, data=data)

9
pyzipline/utils.py Normal file
View file

@ -0,0 +1,9 @@
from datetime import datetime
def convert_str_to_datetime(date_string: str) -> datetime:
"""Converts a string to a datetime object
:param date_string: String to convert
:return: Datetime object
"""
return datetime.strptime(date_string, '%Y-%m-%dT%H:%M:%S.%fZ')

40
pyzipline/zipline.py Normal file
View file

@ -0,0 +1,40 @@
import logging
from pyzipline.rest_adapter import RestAdapter
from pyzipline.errors import PyZiplineError
from pyzipline.models import *
class ZiplineApi:
def __init__(
self,
hostname: str,
token: str = '',
ssl: bool = True,
enforced_signing: bool = True,
logger: logging.Logger = None
):
"""Constructor for ZiplineApi
:param hostname: The hostname of your Zipline instance, WITHOUT https or http.
:param token: (optional) String used for authentication when making requests.
:param ssl: (optional) Normally set to True, but if your Zipline instance doesn't use SSL/TLS, set this to False.
:param enforced_signing: (optional) Normally set to True, but if having SSL/TLS cert validation issues, can turn off with False.
:param logger: (optional) If your app has a logger, pass it in here.
"""
self._rest_adapter = RestAdapter(hostname=hostname, token=token, ssl=ssl, enforced_signing=enforced_signing, logger=logger)
def get_user(self, user_id: int) -> User:
"""Get a user by ID
:param user_id: Integer ID of the user
:return: User object
"""
result = self._rest_adapter.get(endpoint=f"user/{user_id}")
return User(**result.data)
def get_self(self) -> User:
"""Get the currently authenticated user
:return: User object
"""
result = self._rest_adapter.get(endpoint=f"user")
return User(**result.data)