import json
import time
from datetime import datetime

import mysql.connector
from discord import Guild

from .utils import generate_dict, get_next_case_number, check_conf
from .config import config
from .logger import logger


async def connect():
    """Open a new connection to the configured MySQL database.

    Returns:
        The connection object returned by ``mysql.connector.connect``. The
        caller is responsible for closing it.

    Raises:
        LookupError: If ``check_conf`` returns a truthy value for the required
            MySQL settings (presumably the settings that are missing).
        ConnectionRefusedError: If the connector raises ``ProgrammingError``
            while connecting. The original error is chained as the cause.
    """
    conf = await check_conf(
        ["mysql_address", "mysql_database", "mysql_username", "mysql_password"]
    )
    if conf:
        raise LookupError("MySQL connection details not set properly!")

    try:
        connection = mysql.connector.connect(
            host=await config.mysql_address(),
            user=await config.mysql_username(),
            password=await config.mysql_password(),
            database=await config.mysql_database(),
        )
        return connection
    except mysql.connector.ProgrammingError as e:
        logger.error("Unable to access the MySQL database!\nError:\n%s", e.msg)
        raise ConnectionRefusedError(
            f"Unable to access the MySQL Database!\n{e.msg}"
        ) from e


async def create_guild_table(guild: Guild):
    """Ensure the ``moderation_<guild id>`` table exists for *guild*.

    If probing the table raises ``ProgrammingError`` it is treated as missing:
    the table and its three indexes are created, a placeholder row with
    ``moderation_id`` 0 is inserted, and the transaction is committed.

    Args:
        guild: The guild whose ``id`` names the table and whose ``name`` is
            used in log messages.
    """
    database = await connect()
    # Close the connection even when one of the statements below raises.
    try:
        cursor = database.cursor()
        table = f"moderation_{guild.id}"

        try:
            # Only existence matters: fetch at most one row and consume it so
            # no unread result is left pending on the connection.
            cursor.execute(f"SELECT * FROM `{table}` LIMIT 1")
            cursor.fetchall()
        except mysql.connector.errors.ProgrammingError:
            table_exists = False
        else:
            table_exists = True

        if table_exists:
            logger.debug(
                "MySQL Table exists for server %s (%s)", guild.name, guild.id
            )
            return

        cursor.execute(
            f"""
            CREATE TABLE `{table}` (
                moderation_id INT UNIQUE PRIMARY KEY NOT NULL,
                timestamp INT NOT NULL,
                moderation_type LONGTEXT NOT NULL,
                target_id LONGTEXT NOT NULL,
                moderator_id LONGTEXT NOT NULL,
                role_id LONGTEXT,
                duration LONGTEXT,
                end_timestamp INT,
                reason LONGTEXT,
                resolved BOOL NOT NULL,
                resolved_by LONGTEXT,
                resolve_reason LONGTEXT,
                expired BOOL NOT NULL,
                changes JSON NOT NULL,
                metadata JSON NOT NULL
            )
            """
        )

        # Identifiers cannot be bound as query parameters, so the table name
        # is interpolated (backtick-quoted) like in the other statements.
        for index_name, indexed_column in (
            ("idx_target_id", "target_id(25)"),
            ("idx_moderator_id", "moderator_id(25)"),
            ("idx_moderation_id", "moderation_id"),
        ):
            cursor.execute(
                f"CREATE INDEX {index_name} ON `{table}`({indexed_column});"
            )

        # Placeholder row with moderation_id 0 (presumably the baseline that
        # get_next_case_number counts up from -- confirm against utils).
        insert_query = f"""
            INSERT INTO `{table}`
            (moderation_id, timestamp, moderation_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
        insert_values = (
            0,
            0,
            "NULL",
            0,
            0,
            0,
            "NULL",
            0,
            "NULL",
            0,
            "NULL",
            "NULL",
            0,
            json.dumps([]),
            json.dumps({}),
        )
        cursor.execute(insert_query, insert_values)
        database.commit()

        logger.debug(
            "MySQL Table (moderation_%s) created for %s (%s)",
            guild.id,
            guild.name,
            guild.id,
        )
    finally:
        database.close()


async def mysql_log(
    guild_id: str,
    author_id: str,
    moderation_type: str,
    target_id: int,
    role_id: int,
    duration,
    reason: str,
    database: mysql.connector.MySQLConnection = None,
    timestamp: int = None,
    resolved: bool = False,
    resolved_by: str = None,
    resolved_reason: str = None,
    expired: bool = None,
    changes: list = None,
    metadata: dict = None,
):
    """Insert a moderation case into ``moderation_<guild_id>``.

    Args:
        guild_id: Identifier used to build the table name.
        author_id: Stored in the ``moderator_id`` column.
        moderation_type: Stored in the ``moderation_type`` column.
        target_id: Stored in the ``target_id`` column.
        role_id: Stored in the ``role_id`` column.
        duration: Either the string ``"NULL"`` (no duration) or a value that
            can be added to a ``datetime`` (presumably a ``timedelta``).
        reason: Stored in the ``reason`` column.
        database: Existing connection to reuse. When falsy, a new connection
            is opened and closed by this function.
        timestamp: Unix timestamp of the case; the current time is used when
            falsy.
        resolved: Stored as an integer flag.
        resolved_by: Stored as the string ``"NULL"`` when ``None``.
        resolved_reason: Stored as the string ``"NULL"`` when ``None``.
        expired: When falsy, it is derived by comparing the current time with
            the computed end timestamp.
        changes: JSON-serialisable list; defaults to an empty list.
        metadata: JSON-serialisable dict; defaults to an empty dict.

    Returns:
        The case number obtained from ``get_next_case_number``.
    """
    # None sentinels replace the former mutable defaults, which would have
    # been shared between calls.
    if changes is None:
        changes = []
    if metadata is None:
        metadata = {}

    if not timestamp:
        timestamp = int(time.time())

    if duration != "NULL":
        end_timedelta = datetime.fromtimestamp(timestamp) + duration
        end_timestamp = int(end_timedelta.timestamp())
    else:
        end_timestamp = 0

    if not expired:
        # NOTE: with no duration end_timestamp is 0, so this evaluates to 1.
        expired = 1 if int(time.time()) > end_timestamp else 0

    if resolved_by is None:
        resolved_by = "NULL"
    if resolved_reason is None:
        resolved_reason = "NULL"

    # Only close the connection if this function opened it.
    close_db = not database
    if close_db:
        database = await connect()

    try:
        cursor = database.cursor()
        try:
            moderation_id = await get_next_case_number(
                guild_id=guild_id, cursor=cursor
            )

            sql = (
                f"INSERT INTO `moderation_{guild_id}` "
                "(moderation_id, timestamp, moderation_type, target_id, "
                "moderator_id, role_id, duration, end_timestamp, reason, "
                "resolved, resolved_by, resolve_reason, expired, changes, "
                "metadata) "
                "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, "
                "%s, %s)"
            )
            val = (
                moderation_id,
                timestamp,
                moderation_type,
                target_id,
                author_id,
                role_id,
                duration,
                end_timestamp,
                reason,
                int(resolved),
                resolved_by,
                resolved_reason,
                expired,
                json.dumps(changes),
                json.dumps(metadata),
            )
            cursor.execute(sql, val)
        finally:
            cursor.close()

        database.commit()
    finally:
        if close_db:
            database.close()

    logger.debug(
        "MySQL row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
        guild_id,
        moderation_id,
        timestamp,
        moderation_type,
        target_id,
        author_id,
        role_id,
        duration,
        end_timestamp,
        reason,
        int(resolved),
        resolved_by,
        resolved_reason,
        expired,
        changes,
        metadata,
    )

    return moderation_id


async def fetch_case(moderation_id: int, guild_id: str):
    """Fetch one case row and return it converted by ``generate_dict``.

    Args:
        moderation_id: Case number to look up.
        guild_id: Numeric guild identifier (``int`` or digit string) used to
            build the table name.

    Returns:
        Whatever ``generate_dict`` returns for the fetched row. The row is
        ``None`` when no case matches.

    Raises:
        ValueError: If *guild_id* is not numeric.
    """
    # A table name cannot be bound as a parameter: a string value would be
    # quoted by the connector and yield invalid SQL. Coercing to int keeps
    # the interpolated identifier injection-safe.
    table = f"moderation_{int(guild_id)}"

    database = await connect()
    try:
        cursor = database.cursor()
        try:
            query = f"SELECT * FROM `{table}` WHERE moderation_id = %s;"
            cursor.execute(query, (moderation_id,))
            result = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        database.close()

    return generate_dict(result)