140 lines
7.8 KiB
Python
140 lines
7.8 KiB
Python
"""This module contains the RestAdapter class, which is used to make requests to the Zipline server."""""
|
|
import logging
|
|
from json import JSONDecodeError
|
|
|
|
import requests
|
|
from urllib3 import disable_warnings
|
|
|
|
from pyzipline.exceptions import HTTPFailure, PyZiplineError
|
|
from pyzipline.models import Result
|
|
|
|
class RestAdapter:
|
|
"""Constructor for RestAdapter
|
|
|
|
Args:
|
|
hostname (str): The hostname of your Zipline instance, WITHOUT https or http.
|
|
token (str = None): String used for authentication when making requests.
|
|
ssl (bool = True): Normally set to True, but if your Zipline instance doesn't use SSL/TLS, set this to False.
|
|
enforced_signing (bool = True): Normally set to True, but if having SSL/TLS cert validation issues, can turn off with False.
|
|
logger (logging.Logger = None): If your app has a logger, pass it in here.
|
|
|
|
Raises:
|
|
ValueError: Raised when the keyword arguments passed to the class constructor conflict.
|
|
"""
|
|
def __init__(self, hostname: str, token: str = '', ssl: bool = True, enforced_signing: bool = True, logger: logging.Logger = None):
|
|
self._url = f"http{'s' if ssl else ''}://{hostname}/api/"
|
|
self._token = token
|
|
self._ssl = ssl
|
|
self._enforced_signing = enforced_signing
|
|
self._logger = logger or logging.getLogger(__name__)
|
|
|
|
if ssl is False and enforced_signing is True:
|
|
raise ValueError("Cannot enforce signing without SSL")
|
|
|
|
if not ssl and not enforced_signing:
|
|
disable_warnings()
|
|
|
|
def _do(self, http_method: str, endpoint: str, headers: dict = None, params: dict = None, json: dict = None, files: dict = None, timeout: float = 60) -> Result:
|
|
"""Internal method to make a request to the Zipline server. You shouldn't use this directly.
|
|
|
|
Args:
|
|
http_method (str): The [HTTP method](https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods) to use
|
|
endpoint (str): The endpoint to make the request to.
|
|
headers (dict): Python dictionary of headers to send with the request.
|
|
params (dict): Python dictionary of query parameters to send with the request.
|
|
json (dict): Python dictionary of JSON-serializable data to send with the request.
|
|
files (dict): Python dictionary of files to send with the request.
|
|
timeout (float): Number of seconds to wait for the request to complete.
|
|
|
|
Returns:
|
|
Result: A Result object containing the status code, message, and data from the request.
|
|
|
|
Raises:
|
|
HTTPError: Raised when an HTTP request fails.
|
|
PyZiplineError: Raised when an error occurs in the PyZipline library.
|
|
"""
|
|
full_url = self._url + endpoint
|
|
if not headers:
|
|
headers = {}
|
|
headers.update({'Authorization': self._token})
|
|
|
|
try: # Perform an HTTP request, catching and re-raising any exceptions
|
|
# will eventually refactor this to use asyncio/aiohttp instead for async operation
|
|
response = requests.request(method=http_method, url=full_url, verify=self._enforced_signing, params=params, headers=headers, json=json, files=files, timeout=timeout)
|
|
except requests.exceptions.RequestException as e:
|
|
self._logger.error(msg=str(e))
|
|
raise HTTPFailure("Could not connect to Zipline server") from e
|
|
|
|
try: # Deserialize JSON output to Python object, or return failed Result on exception
|
|
data_out = response.json()
|
|
except (ValueError, JSONDecodeError) as e:
|
|
if response.headers.get('Content-Type') == 'application/octet-stream':
|
|
# If the response contains a valid file, return success Result with file content
|
|
return Result(success=True, status_code=response.status_code, message=response.reason, data=response.content)
|
|
raise PyZiplineError("Could not decode response from Zipline server") from e
|
|
|
|
# If status_code in 200-299 range, return success Result with data, otherwise return failed Result with message
|
|
is_success = 299 >= response.status_code >= 200
|
|
|
|
if is_success:
|
|
return Result(success=is_success, status_code=response.status_code, message=response.reason, data=data_out)
|
|
|
|
return Result(success=is_success, status_code=response.status_code, message=data_out['error'], data=data_out)
|
|
|
|
def delete(self, endpoint: str, headers: dict = None, params: dict = None, json: dict = None, timeout: float = 60) -> Result:
|
|
"""Make a DELETE request to the Zipline server. You should almost never have to use this directly.
|
|
|
|
Args:
|
|
endpoint (str): The endpoint to make the request to.
|
|
params (dict): Python dictionary of query parameters to send with the request.
|
|
json (dict): Python dictionary of JSON-serializable data to send with the request.
|
|
timeout (float): Number of seconds to wait for the request to complete.
|
|
|
|
Returns:
|
|
Result: A Result object containing the status code, message, and data from the request.
|
|
"""
|
|
return self._do(http_method='DELETE', endpoint=endpoint, headers=headers, params=params, json=json, timeout=timeout)
|
|
|
|
def get(self, endpoint: str, headers: dict = None, params: dict = None, timeout: float = 60) -> Result:
|
|
"""Make a GET request to the Zipline server. You should almost never have to use this directly.
|
|
|
|
Args:
|
|
endpoint (str): The endpoint to make the request to.
|
|
params (dict): Python dictionary of query parameters to send with the request.
|
|
timeout (float): Number of seconds to wait for the request to complete.
|
|
|
|
Returns:
|
|
Result: A Result object containing the status code, message, and data from the request.
|
|
"""
|
|
return self._do(http_method='GET', endpoint=endpoint, headers=headers, params=params, timeout=timeout)
|
|
|
|
def patch(self, endpoint: str, headers: dict = None, params: dict = None, json: dict = None, files: dict = None, timeout: float = 60) -> Result:
|
|
"""Make a PATCH request to the Zipline server. You should almost never have to use this directly.
|
|
|
|
Args:
|
|
endpoint (str): The endpoint to make the request to.
|
|
params (dict): Python dictionary of query parameters to send with the request.
|
|
json (dict): Python dictionary of JSON-serializable data to send with the request.
|
|
files (dict): Python dictionary of files to send with the request.
|
|
timeout (float): Number of seconds to wait for the request to complete.
|
|
|
|
Returns:
|
|
Result: A Result object containing the status code, message, and data from the request.
|
|
"""
|
|
return self._do(http_method='PATCH', endpoint=endpoint, headers=headers, params=params, json=json, files=files, timeout=timeout)
|
|
|
|
def post(self, endpoint: str, headers: dict = None, params: dict = None, json: dict = None, files: dict = None, timeout: float = 60) -> Result:
|
|
"""Make a POST request to the Zipline server. You should almost never have to use this directly.
|
|
|
|
Args:
|
|
endpoint (str): The endpoint to make the request to.
|
|
headers (dict): Python dictionary of headers to send with the request.
|
|
params (dict): Python dictionary of query parameters to send with the request.
|
|
json (dict): Python dictionary of JSON-serializable data to send with the request.
|
|
files (dict): Python dictionary of files to send with the request.
|
|
timeout (float): Number of seconds to wait for the request to complete.
|
|
|
|
Returns:
|
|
Result: A Result object containing the status code, message, and data from the request.
|
|
"""
|
|
return self._do(http_method='POST', endpoint=endpoint, headers=headers, params=params, json=json, files=files, timeout=timeout)
|