SeaCogs/aurora/models/moderation.py
Seaswimmer 1990b97518
All checks were successful
Actions / Build Documentation (MkDocs) (pull_request) Successful in 49s
Actions / Lint Code (Ruff & Pylint) (pull_request) Successful in 1m13s
fix(aurora): ignore moderation cases where the moderation_id is 0
2024-06-02 23:59:17 -04:00

353 lines
12 KiB
Python

import json
import sqlite3
from datetime import datetime, timedelta
from time import time
from typing import Dict, Iterable, List, Optional, Union
import discord
from discord import NotFound
from redbot.core.bot import Red
from ..utilities.logger import logger
from ..utilities.utils import get_next_case_number
from .base import AuroraGuildModel
from .change import Change
from .partials import PartialChannel, PartialRole, PartialUser
class Moderation(AuroraGuildModel):
moderation_id: int
timestamp: datetime
moderation_type: str
target_type: str
target_id: int
moderator_id: int
role_id: Optional[int] = None
duration: Optional[timedelta] = None
end_timestamp: Optional[datetime] = None
reason: Optional[str] = None
resolved: bool
resolved_by: Optional[int] = None
resolve_reason: Optional[str] = None
expired: bool
changes: List["Change"]
metadata: Dict
@property
def id(self) -> int:
return self.moderation_id
@property
def type(self) -> str:
return self.moderation_type
@property
def unix_timestamp(self) -> int:
return int(self.timestamp.timestamp())
async def get_moderator(self) -> "PartialUser":
return await PartialUser.from_id(self.bot, self.moderator_id)
async def get_target(self) -> Union["PartialUser", "PartialChannel"]:
if self.target_type == "USER":
return await PartialUser.from_id(self.bot, self.target_id)
return await PartialChannel.from_id(self.bot, self.target_id)
async def get_resolved_by(self) -> Optional["PartialUser"]:
if self.resolved_by:
return await PartialUser.from_id(self.bot, self.resolved_by)
return None
async def get_role(self) -> Optional["PartialRole"]:
if self.role_id:
return await PartialRole.from_id(self.bot, self.guild_id, self.role_id)
return None
def __str__(self):
return f"{self.moderation_type} {self.target_type} {self.target_id} {self.reason}"
async def resolve(self, resolved_by: int, reason: str):
if self.resolved:
raise ValueError("Case is already resolved!")
self.resolved = True
self.resolved_by = resolved_by
self.resolve_reason = reason
if self.type in ["UNMUTE", "UNBAN"]:
raise TypeError("Cannot resolve an unmute or unban case!")
if self.type == "MUTE":
try:
guild: discord.Guild = await self.bot.fetch_guild(self.guild_id)
member = await guild.fetch_member(self.target_id)
await member.timeout(
None, reason=f"Case {self.moderation_id} resolved by {resolved_by}{' for '+ reason if reason else ''}"
)
except NotFound:
pass
if self.type in ["BAN", "TEMPBAN"]:
try:
guild: discord.Guild = await self.bot.fetch_guild(self.guild_id)
await guild.unban(await self.get_target(), reason=f"Case {self.moderation_id} resolved by {resolved_by}{' for '+ reason if reason else ''}")
except NotFound:
pass
if not self.changes:
self.changes.append(Change.from_dict(self.bot, {
"type": "ORIGINAL",
"timestamp": self.timestamp,
"reason": self.reason,
"user_id": self.moderator_id,
"duration": self.duration,
"end_timestamp": self.end_timestamp,
}))
self.changes.append(Change.from_dict(self.bot, {
"type": "RESOLVE",
"timestamp": datetime.now(),
"reason": reason,
"user_id": resolved_by,
}))
self.update()
def update(self):
from ..utilities.database import connect
from ..utilities.json import dumps
query = f"UPDATE moderation_{self.guild_id} SET timestamp = ?, moderation_type = ?, target_type = ?, moderator_id = ?, role_id = ?, duration = ?, end_timestamp = ?, reason = ?, resolved = ?, resolved_by = ?, resolve_reason = ?, expired = ?, changes = ?, metadata = ? WHERE moderation_id = ?;"
with connect() as database:
database.execute(query, (
self.timestamp.timestamp(),
self.moderation_type,
self.target_type,
self.moderator_id,
self.role_id,
str(self.duration) if self.duration else None,
self.end_timestamp.timestamp() if self.end_timestamp else None,
self.reason,
self.resolved,
self.resolved_by,
self.resolve_reason,
self.expired,
dumps(self.changes),
dumps(self.metadata),
self.moderation_id,
))
logger.debug("Row updated in moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
self.moderation_id,
self.guild_id,
self.timestamp.timestamp(),
self.moderation_type,
self.target_type,
self.moderator_id,
self.role_id,
str(self.duration) if self.duration else None,
self.end_timestamp.timestamp() if self.end_timestamp else None,
self.reason,
self.resolved,
self.resolved_by,
self.resolve_reason,
self.expired,
dumps(self.changes),
dumps(self.metadata),
)
@classmethod
def from_dict(cls, bot: Red, data: dict) -> "Moderation":
return cls(bot=bot, **data)
@classmethod
def from_result(cls, bot: Red, result: Iterable, guild_id: int) -> "Moderation":
if result[7] is not None:
hours, minutes, seconds = map(int, result[7].split(':'))
duration = timedelta(hours=hours, minutes=minutes, seconds=seconds)
else:
duration = None
if result[14] is not None:
changes = json.loads(result[14])
change_obj_list = []
if changes:
for change in changes:
change_obj_list.append(Change.from_dict(bot=bot, data=change))
if result[15] is not None:
metadata = json.loads(result[15])
else:
metadata = {}
case = {
"moderation_id": int(result[0]),
"guild_id": int(guild_id),
"timestamp": datetime.fromtimestamp(result[1]),
"moderation_type": str(result[2]),
"target_type": str(result[3]),
"target_id": int(result[4]),
"moderator_id": int(result[5]),
"role_id": int(result[6]) if result[6] is not None else None,
"duration": duration,
"end_timestamp": datetime.fromtimestamp(result[8]) if result[8] is not None else None,
"reason": result[9],
"resolved": bool(result[10]),
"resolved_by": result[11],
"resolve_reason": result[12],
"expired": bool(result[13]),
"changes": change_obj_list,
"metadata": metadata if metadata else {},
}
return cls.from_dict(bot=bot, data=case)
@classmethod
def from_sql(cls, bot: Red, moderation_id: int, guild_id: int) -> "Moderation":
from ..utilities.database import connect
query = f"SELECT * FROM moderation_{guild_id} WHERE moderation_id = ?;"
with connect() as database:
cursor = database.cursor()
cursor.execute(query, (moderation_id,))
result = cursor.fetchone()
cursor.close()
if result and not moderation_id == 0:
return cls.from_result(bot, result, guild_id)
raise ValueError(f"Case {moderation_id} not found in moderation_{guild_id}!")
@classmethod
def from_sql_all(cls, bot: Red, guild_id: int) -> List["Moderation"]:
from ..utilities.database import connect
query = f"SELECT * FROM moderation_{guild_id};"
with connect() as database:
cursor = database.cursor()
cursor.execute(query)
results = cursor.fetchall()
cursor.close()
if results:
cases = []
for result in results:
case = cls.from_result(bot, result, guild_id)
if case.moderation_id != 0:
cases.append(case)
return cases
return []
@classmethod
def log(
cls,
bot: Red,
guild_id: int,
moderator_id: int,
moderation_type: str,
target_type: str,
target_id: int,
role_id: int | None = None,
duration: timedelta | None = None,
reason: str | None = None,
database: sqlite3.Connection | None = None,
timestamp: datetime | None = None,
resolved: bool = False,
resolved_by: int | None = None,
resolved_reason: str | None = None,
expired: bool | None = None,
changes: list | None = None,
metadata: dict | None = None,
) -> "Moderation":
from ..utilities.database import connect
from ..utilities.json import dumps
if not timestamp:
timestamp = datetime.fromtimestamp(time())
elif not isinstance(timestamp, datetime):
timestamp = datetime.fromtimestamp(timestamp)
if duration == "NULL":
duration = None
if duration is not None:
end_timestamp = timestamp + duration
else:
duration = None
end_timestamp = None
if not expired:
if end_timestamp:
expired = bool(timestamp > end_timestamp)
else:
expired = False
if reason == "NULL":
reason = None
if resolved_by == "NULL":
resolved_by = None
if resolved_reason == "NULL":
resolved_reason = None
if role_id == 0:
role_id = None
if not database:
database = connect()
close_db = True
else:
close_db = False
cursor = database.cursor()
moderation_id = get_next_case_number(guild_id=guild_id, cursor=cursor)
case = {
"moderation_id": moderation_id,
"timestamp": timestamp.timestamp(),
"moderation_type": moderation_type,
"target_type": target_type,
"target_id": target_id,
"moderator_id": moderator_id,
"role_id": role_id,
"duration": str(duration) if duration else None,
"end_timestamp": end_timestamp.timestamp() if end_timestamp else None,
"reason": reason,
"resolved": resolved,
"resolved_by": resolved_by,
"resolve_reason": resolved_reason,
"expired": expired,
"changes": dumps(changes),
"metadata": dumps(metadata)
}
sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
cursor.execute(sql, tuple(case.values()))
cursor.close()
database.commit()
if close_db:
database.close()
logger.debug(
"Row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
guild_id,
case["moderation_id"],
case["timestamp"],
case["moderation_type"],
case["target_type"],
case["target_id"],
case["moderator_id"],
case["role_id"],
case["duration"],
case["end_timestamp"],
case["reason"],
case["resolved"],
case["resolved_by"],
case["resolve_reason"],
case["expired"],
case["changes"],
case["metadata"],
)
return cls.from_sql(bot=bot, moderation_id=moderation_id, guild_id=guild_id)