# SeaCogs/aurora/models/moderation.py
# 395 lines, 14 KiB, Python

import json
import sqlite3
from datetime import datetime, timedelta
from sqlite3 import Cursor
from time import time
from typing import Dict, Iterable, List, Optional, Tuple, Union
import discord
from discord import NotFound
from redbot.core.bot import Red
from ..utilities.logger import logger
from ..utilities.utils import timedelta_to_string
from .base import AuroraGuildModel
from .change import Change
from .partials import PartialChannel, PartialRole, PartialUser
def _parse_duration(raw) -> Optional[timedelta]:
    """Convert a stored duration string back into a ``timedelta``.

    Accepts the ``H:M:S`` form and, for tolerance of rows that were written
    with ``str(timedelta)``, the ``"N day(s), H:MM:SS[.ffffff]"`` form.

    Args:
        raw: The raw column value; ``None`` or the literal ``"NULL"`` mean
            "no duration".

    Returns:
        The parsed duration, or ``None`` when no duration is stored.

    Raises:
        ValueError: If the text is not in one of the accepted forms.
    """
    if raw is None or raw == "NULL":
        return None
    text = str(raw)
    days = 0
    if "," in text:
        # ``str(timedelta)`` prefixes whole days as "N day(s), ".
        day_part, text = text.split(",", 1)
        days = int(day_part.split()[0])
    hours, minutes, seconds = text.strip().split(":")
    # float() also accepts the fractional seconds that str(timedelta) can emit.
    return timedelta(days=days, hours=int(hours), minutes=int(minutes), seconds=float(seconds))


def _type_filter(types: Optional[Iterable]) -> Tuple[str, tuple]:
    """Build a parameterised ``moderation_type IN (...)`` SQL fragment.

    The iterable is materialised exactly once so that one-shot iterables
    (generators) produce matching placeholders and parameters.

    Args:
        types: Moderation type names to filter on, or ``None``/empty for no filter.

    Returns:
        ``(fragment, params)``; ``("", ())`` when there is nothing to filter on.
    """
    values = tuple(types) if types else ()
    if not values:
        return "", ()
    placeholders = ", ".join("?" for _ in values)
    return f"moderation_type IN ({placeholders})", values


class Moderation(AuroraGuildModel):
    """A single moderation case stored in a guild's ``moderation_<guild_id>`` table.

    Instances are built from database rows (``from_result``) or plain dicts
    (``from_dict``), can be written back with ``update`` and are created with
    ``log``. ``bot`` and ``guild_id`` are used by the methods below but are not
    declared here; they are expected to come from ``AuroraGuildModel``.
    """

    moderation_id: int
    timestamp: datetime
    moderation_type: str
    target_type: str  # "USER" targets resolve to a user; anything else to a channel (see get_target)
    target_id: int
    moderator_id: int
    role_id: Optional[int] = None
    duration: Optional[timedelta] = None
    end_timestamp: Optional[datetime] = None
    reason: Optional[str] = None
    resolved: bool
    resolved_by: Optional[int] = None
    resolve_reason: Optional[str] = None
    expired: bool
    changes: List["Change"]
    metadata: Dict

    @property
    def id(self) -> int:
        """Alias for ``moderation_id``."""
        return self.moderation_id

    @property
    def type(self) -> str:
        """Alias for ``moderation_type``."""
        return self.moderation_type

    @property
    def unix_timestamp(self) -> int:
        """The case timestamp as whole seconds since the epoch."""
        return int(self.timestamp.timestamp())

    async def get_moderator(self) -> "PartialUser":
        """Look up the moderator of this case via ``PartialUser.from_id``."""
        return await PartialUser.from_id(self.bot, self.moderator_id)

    async def get_target(self) -> Union["PartialUser", "PartialChannel"]:
        """Look up the target: a user when ``target_type`` is ``"USER"``, else a channel."""
        if self.target_type == "USER":
            return await PartialUser.from_id(self.bot, self.target_id)
        return await PartialChannel.from_id(self.bot, self.target_id)

    async def get_resolved_by(self) -> Optional["PartialUser"]:
        """Look up the user who resolved this case, or ``None`` if nobody is recorded."""
        if self.resolved_by:
            return await PartialUser.from_id(self.bot, self.resolved_by)
        return None

    async def get_role(self) -> Optional["PartialRole"]:
        """Look up the role attached to this case, or ``None`` if there is none."""
        if self.role_id:
            return await PartialRole.from_id(self.bot, self.guild_id, self.role_id)
        return None

    def __str__(self) -> str:
        return f"{self.moderation_type} {self.target_type} {self.target_id} {self.reason}"

    def __int__(self) -> int:
        return self.moderation_id

    async def resolve(self, resolved_by: int, reason: str) -> None:
        """Resolve this case, undo its Discord-side effect and persist the change.

        ``MUTE`` cases clear the member's timeout and ``BAN``/``TEMPBAN`` cases
        unban the target. A ``RESOLVE`` entry is appended to ``changes`` (with
        an ``ORIGINAL`` entry first if the case had no change history) and the
        row is written with ``update``.

        All validation happens before any state is modified, and the instance
        is only marked resolved once the Discord step did not raise, so a
        rejected or failed resolve leaves the object untouched.

        Args:
            resolved_by: ID of the user resolving the case.
            reason: Why the case is being resolved; may be empty.

        Raises:
            ValueError: If the case is already resolved.
            TypeError: If the case is an ``UNMUTE`` or ``UNBAN`` case.
        """
        if self.resolved:
            raise ValueError("Case is already resolved!")
        if self.type in ("UNMUTE", "UNBAN"):
            raise TypeError("Cannot resolve an unmute or unban case!")

        audit_reason = f"Case {self.moderation_id} resolved by {resolved_by}{' for ' + reason if reason else ''}"

        if self.type == "MUTE":
            try:
                guild: discord.Guild = await self.bot.fetch_guild(self.guild_id)
                member = await guild.fetch_member(self.target_id)
                await member.timeout(None, reason=audit_reason)
            except NotFound:
                # Best effort: nothing to undo if the guild/member cannot be found.
                pass
        elif self.type in ("BAN", "TEMPBAN"):
            try:
                guild: discord.Guild = await self.bot.fetch_guild(self.guild_id)
                await guild.unban(await self.get_target(), reason=audit_reason)
            except NotFound:
                # Best effort: nothing to undo if the guild/ban cannot be found.
                pass

        self.resolved = True
        self.resolved_by = resolved_by
        self.resolve_reason = reason

        if not self.changes:
            # Keep a snapshot of the case as it was before its first change.
            self.changes.append(Change.from_dict(self.bot, {
                "type": "ORIGINAL",
                "timestamp": self.timestamp,
                "reason": self.reason,
                "user_id": self.moderator_id,
                "duration": self.duration,
                "end_timestamp": self.end_timestamp,
            }))
        self.changes.append(Change.from_dict(self.bot, {
            "type": "RESOLVE",
            "timestamp": datetime.now(),
            "reason": reason,
            "user_id": resolved_by,
        }))
        self.update()

    def update(self) -> None:
        """Write this case's current field values back to its database row.

        The duration is serialised with ``timedelta_to_string`` — the same
        encoder ``log`` uses — so that ``from_result`` can read it back. The
        connection is committed and then always closed.
        """
        from ..utilities.database import connect
        from ..utilities.json import dumps

        # NOTE(review): target_id is not part of the SET list, so changes to it
        # are never persisted — confirm that this is intentional.
        query = f"UPDATE moderation_{self.guild_id} SET timestamp = ?, moderation_type = ?, target_type = ?, moderator_id = ?, role_id = ?, duration = ?, end_timestamp = ?, reason = ?, resolved = ?, resolved_by = ?, resolve_reason = ?, expired = ?, changes = ?, metadata = ? WHERE moderation_id = ?;"
        values = (
            self.timestamp.timestamp(),
            self.moderation_type,
            self.target_type,
            self.moderator_id,
            self.role_id,
            timedelta_to_string(self.duration) if self.duration else None,
            self.end_timestamp.timestamp() if self.end_timestamp else None,
            self.reason,
            self.resolved,
            self.resolved_by,
            self.resolve_reason,
            self.expired,
            dumps(self.changes),
            dumps(self.metadata),
            self.moderation_id,
        )

        database = connect()
        try:
            # The connection's context manager commits or rolls back, but it
            # does not close the connection; that is done in ``finally``.
            with database:
                database.execute(query, values)
        finally:
            database.close()

        logger.debug(
            "Row updated in moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
            self.guild_id,
            self.moderation_id,
            *values[:-1],
        )

    @classmethod
    def from_dict(cls, bot: Red, data: dict) -> "Moderation":
        """Build a case from a dict of field values."""
        return cls(bot=bot, **data)

    @classmethod
    def from_result(cls, bot: Red, result: Iterable, guild_id: int) -> "Moderation":
        """Build a case from a database row.

        Args:
            bot: The bot instance handed to the model.
            result: An indexable row whose columns are, in order:
                moderation_id, timestamp, moderation_type, target_type,
                target_id, moderator_id, role_id, duration, end_timestamp,
                reason, resolved, resolved_by, resolve_reason, expired,
                changes (JSON), metadata (JSON).
            guild_id: The guild whose table the row came from.

        Returns:
            The populated case. A NULL ``changes`` column yields an empty list
            and a NULL ``metadata`` column an empty dict.
        """
        raw_changes = json.loads(result[14]) if result[14] is not None else None
        change_obj_list = (
            [Change.from_dict(bot=bot, data=change) for change in raw_changes]
            if raw_changes
            else []
        )
        metadata = json.loads(result[15]) if result[15] is not None else {}

        case = {
            "moderation_id": int(result[0]),
            "guild_id": int(guild_id),
            "timestamp": datetime.fromtimestamp(result[1]),
            "moderation_type": str(result[2]),
            "target_type": str(result[3]),
            "target_id": int(result[4]),
            "moderator_id": int(result[5]),
            "role_id": int(result[6]) if result[6] is not None else None,
            "duration": _parse_duration(result[7]),
            "end_timestamp": datetime.fromtimestamp(result[8]) if result[8] is not None else None,
            "reason": result[9],
            "resolved": bool(result[10]),
            "resolved_by": result[11],
            "resolve_reason": result[12],
            "expired": bool(result[13]),
            "changes": change_obj_list,
            "metadata": metadata if metadata else {},
        }
        return cls.from_dict(bot=bot, data=case)

    @classmethod
    def execute(cls, bot: Red, guild_id: int, query: str, parameters: tuple | None = None, cursor: Cursor | None = None) -> Tuple["Moderation", ...]:
        """Run a SELECT and convert every returned row into a case.

        Args:
            bot: The bot instance handed to each model.
            guild_id: The guild the rows belong to.
            query: The SQL to run; its rows must match ``from_result``'s layout.
            parameters: Values for the query's placeholders.
            cursor: Cursor to run the query on. When omitted, a connection and
                cursor are opened for this call and closed again, even if the
                query raises.

        Returns:
            The matching cases; rows whose ``moderation_id`` is 0 are skipped.
        """
        from ..utilities.database import connect

        logger.trace("Executing query: %s", query)
        logger.trace("With parameters: %s", parameters)
        if not parameters:
            parameters = ()

        if cursor is not None:
            cursor.execute(query, parameters)
            results = cursor.fetchall()
        else:
            database = connect()
            try:
                own_cursor = database.cursor()
                try:
                    own_cursor.execute(query, parameters)
                    results = own_cursor.fetchall()
                finally:
                    own_cursor.close()
            finally:
                database.close()

        cases = (cls.from_result(bot=bot, result=row, guild_id=guild_id) for row in results)
        return tuple(case for case in cases if case.moderation_id != 0)

    @classmethod
    def get_latest(cls, bot: Red, guild_id: int, limit: int | None = None, offset: int = 0, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation", ...]:
        """Return a guild's cases, newest (highest ``moderation_id``) first.

        Args:
            bot: The bot instance handed to each model.
            guild_id: The guild to query.
            limit: Maximum number of rows; ``None`` or 0 means no limit.
            offset: Number of leading rows to skip.
            types: If given, only cases of these moderation types are returned.
            cursor: Optional cursor to reuse (see ``execute``).
        """
        clause, type_params = _type_filter(types)
        params = list(type_params)

        # Clause order matters in SQL: WHERE, then ORDER BY, then LIMIT/OFFSET.
        query = f"SELECT * FROM moderation_{guild_id}"
        if clause:
            query += f" WHERE {clause}"
        query += " ORDER BY moderation_id DESC"
        if limit:
            query += " LIMIT ? OFFSET ?"
            params.extend((limit, offset))
        elif offset:
            # SQLite needs a LIMIT to accept OFFSET; a negative limit is unbounded.
            query += " LIMIT -1 OFFSET ?"
            params.append(offset)
        query += ";"
        return cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=tuple(params), cursor=cursor)

    @classmethod
    def get_next_case_number(cls, bot: Red, guild_id: int, cursor: Cursor | None = None) -> int:
        """Return the latest case's ID plus one, or 1 if ``get_latest`` finds no case."""
        result = cls.get_latest(bot=bot, guild_id=guild_id, cursor=cursor, limit=1)
        return (result[0].moderation_id + 1) if result else 1

    @classmethod
    def find_by_id(cls, bot: Red, moderation_id: int, guild_id: int, cursor: Cursor | None = None) -> "Moderation":
        """Return the case with the given ID.

        Raises:
            ValueError: If no such case is found in the guild's table.
        """
        query = f"SELECT * FROM moderation_{guild_id} WHERE moderation_id = ?;"
        case = cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderation_id,), cursor=cursor)
        if case:
            return case[0]
        raise ValueError(f"Case {moderation_id} not found in moderation_{guild_id}!")

    @classmethod
    def find_by_target(cls, bot: Red, guild_id: int, target: int, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation", ...]:
        """Return all cases against ``target``, newest first, optionally filtered by type."""
        clause, type_params = _type_filter(types)
        query = f"SELECT * FROM moderation_{guild_id} WHERE target_id = ?"
        if clause:
            query += f" AND {clause}"
        query += " ORDER BY moderation_id DESC;"
        return cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(target, *type_params), cursor=cursor)

    @classmethod
    def find_by_moderator(cls, bot: Red, guild_id: int, moderator: int, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation", ...]:
        """Return all cases issued by ``moderator``, newest first, optionally filtered by type."""
        clause, type_params = _type_filter(types)
        query = f"SELECT * FROM moderation_{guild_id} WHERE moderator_id = ?"
        if clause:
            query += f" AND {clause}"
        query += " ORDER BY moderation_id DESC;"
        return cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderator, *type_params), cursor=cursor)

    @classmethod
    def log(
        cls,
        bot: Red,
        guild_id: int,
        moderator_id: int,
        moderation_type: str,
        target_type: str,
        target_id: int,
        role_id: int | None = None,
        duration: timedelta | None = None,
        reason: str | None = None,
        database: sqlite3.Connection | None = None,
        timestamp: datetime | None = None,
        resolved: bool = False,
        resolved_by: int | None = None,
        resolved_reason: str | None = None,
        expired: bool | None = None,
        changes: list | None = None,
        metadata: dict | None = None,
        return_obj: bool = True,
    ) -> Union["Moderation", None]:
        """Insert a new case into the guild's table under the next free case number.

        Args:
            bot: The bot instance handed to the returned model.
            guild_id: The guild whose table receives the case.
            moderator_id: ID of the moderator who took the action.
            moderation_type: The kind of action taken.
            target_type: The kind of target.
            target_id: ID of the target.
            role_id: Related role ID; 0 is stored as NULL.
            duration: How long the action lasts; the literal ``"NULL"`` is
                treated as no duration.
            reason: Reason text; the literal ``"NULL"`` is stored as NULL.
            database: Connection to use. It is committed but left open. When
                omitted, a connection is opened for this call and closed again.
            timestamp: When the action happened, as a ``datetime`` or a Unix
                timestamp; defaults to the current time.
            resolved: Whether the case is already resolved.
            resolved_by: ID of the resolver; ``"NULL"`` and ``"?"`` are stored as NULL.
            resolved_reason: Why it was resolved; ``"NULL"`` is stored as NULL.
            expired: Whether the case has expired; computed when falsy.
            changes: Change history, serialised to JSON.
            metadata: Extra data, serialised to JSON.
            return_obj: When true, re-read the inserted row and return it.

        Returns:
            The stored case when ``return_obj`` is true, otherwise ``None``.
        """
        from ..utilities.database import connect
        from ..utilities.json import dumps

        if not timestamp:
            timestamp = datetime.fromtimestamp(time())
        elif not isinstance(timestamp, datetime):
            timestamp = datetime.fromtimestamp(timestamp)

        # Callers may pass placeholder strings instead of None; normalise them.
        if duration == "NULL":
            duration = None
        if reason == "NULL":
            reason = None
        if resolved_by == "NULL" or resolved_by == "?":
            resolved_by = None
        if resolved_reason == "NULL":
            resolved_reason = None
        if role_id == 0:
            role_id = None

        end_timestamp = timestamp + duration if duration is not None else None

        if not expired:
            # NOTE(review): this compares the case's own timestamp with its end
            # timestamp, which is only true for a negative duration; comparing
            # against the current time may have been intended — confirm.
            if end_timestamp:
                expired = bool(timestamp > end_timestamp)
            else:
                expired = False

        close_db = database is None
        if close_db:
            database = connect()

        try:
            cursor = database.cursor()
            try:
                moderation_id = cls.get_next_case_number(bot=bot, guild_id=guild_id, cursor=cursor)

                # Key order must match the column list of the INSERT below.
                case = {
                    "moderation_id": moderation_id,
                    "timestamp": timestamp.timestamp(),
                    "moderation_type": moderation_type,
                    "target_type": target_type,
                    "target_id": target_id,
                    "moderator_id": moderator_id,
                    "role_id": role_id,
                    "duration": timedelta_to_string(duration) if duration else None,
                    "end_timestamp": end_timestamp.timestamp() if end_timestamp else None,
                    "reason": reason,
                    "resolved": resolved,
                    "resolved_by": resolved_by,
                    "resolve_reason": resolved_reason,
                    "expired": expired,
                    "changes": dumps(changes),
                    "metadata": dumps(metadata),
                }

                sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
                cursor.execute(sql, tuple(case.values()))
            finally:
                cursor.close()
            database.commit()
        finally:
            # Only close connections this call opened itself.
            if close_db:
                database.close()

        logger.debug(
            "Row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
            guild_id,
            *case.values(),
        )

        if return_obj:
            return cls.find_by_id(bot=bot, moderation_id=moderation_id, guild_id=guild_id)
        return None