374 lines
13 KiB
Python
374 lines
13 KiB
Python
import json
|
|
import sqlite3
|
|
from datetime import datetime, timedelta
|
|
from time import time
|
|
from typing import Dict, Iterable, List, Optional, Union
|
|
|
|
import discord
|
|
from discord import NotFound
|
|
from redbot.core.bot import Red
|
|
|
|
from ..utilities.logger import logger
|
|
from ..utilities.utils import get_next_case_number
|
|
from .base import AuroraGuildModel
|
|
from .change import Change
|
|
from .partials import PartialChannel, PartialRole, PartialUser
|
|
|
|
|
|
class Moderation(AuroraGuildModel):
|
|
moderation_id: int
|
|
timestamp: datetime
|
|
moderation_type: str
|
|
target_type: str
|
|
target_id: int
|
|
moderator_id: int
|
|
role_id: Optional[int] = None
|
|
duration: Optional[timedelta] = None
|
|
end_timestamp: Optional[datetime] = None
|
|
reason: Optional[str] = None
|
|
resolved: bool
|
|
resolved_by: Optional[int] = None
|
|
resolve_reason: Optional[str] = None
|
|
expired: bool
|
|
changes: List["Change"]
|
|
metadata: Dict
|
|
|
|
@property
|
|
def id(self) -> int:
|
|
return self.moderation_id
|
|
|
|
@property
|
|
def type(self) -> str:
|
|
return self.moderation_type
|
|
|
|
@property
|
|
def unix_timestamp(self) -> int:
|
|
return int(self.timestamp.timestamp())
|
|
|
|
async def get_moderator(self) -> "PartialUser":
|
|
return await PartialUser.from_id(self.bot, self.moderator_id)
|
|
|
|
async def get_target(self) -> Union["PartialUser", "PartialChannel"]:
|
|
if self.target_type == "USER":
|
|
return await PartialUser.from_id(self.bot, self.target_id)
|
|
return await PartialChannel.from_id(self.bot, self.target_id)
|
|
|
|
async def get_resolved_by(self) -> Optional["PartialUser"]:
|
|
if self.resolved_by:
|
|
return await PartialUser.from_id(self.bot, self.resolved_by)
|
|
return None
|
|
|
|
async def get_role(self) -> Optional["PartialRole"]:
|
|
if self.role_id:
|
|
return await PartialRole.from_id(self.bot, self.guild_id, self.role_id)
|
|
return None
|
|
|
|
def __str__(self):
|
|
return f"{self.moderation_type} {self.target_type} {self.target_id} {self.reason}"
|
|
|
|
async def resolve(self, resolved_by: int, reason: str):
|
|
if self.resolved:
|
|
raise ValueError("Case is already resolved!")
|
|
|
|
self.resolved = True
|
|
self.resolved_by = resolved_by
|
|
self.resolve_reason = reason
|
|
|
|
if self.type in ["UNMUTE", "UNBAN"]:
|
|
raise TypeError("Cannot resolve an unmute or unban case!")
|
|
|
|
if self.type == "MUTE":
|
|
try:
|
|
guild: discord.Guild = await self.bot.fetch_guild(self.guild_id)
|
|
member = await guild.fetch_member(self.target_id)
|
|
|
|
await member.timeout(
|
|
None, reason=f"Case {self.moderation_id} resolved by {resolved_by}{' for '+ reason if reason else ''}"
|
|
)
|
|
except NotFound:
|
|
pass
|
|
|
|
if self.type in ["BAN", "TEMPBAN"]:
|
|
try:
|
|
guild: discord.Guild = await self.bot.fetch_guild(self.guild_id)
|
|
await guild.unban(await self.get_target(), reason=f"Case {self.moderation_id} resolved by {resolved_by}{' for '+ reason if reason else ''}")
|
|
except NotFound:
|
|
pass
|
|
|
|
if not self.changes:
|
|
self.changes.append(Change.from_dict(self.bot, {
|
|
"type": "ORIGINAL",
|
|
"timestamp": self.timestamp,
|
|
"reason": self.reason,
|
|
"user_id": self.moderator_id,
|
|
"duration": self.duration,
|
|
"end_timestamp": self.end_timestamp,
|
|
}))
|
|
self.changes.append(Change.from_dict(self.bot, {
|
|
"type": "RESOLVE",
|
|
"timestamp": datetime.now(),
|
|
"reason": reason,
|
|
"user_id": resolved_by,
|
|
}))
|
|
|
|
self.update()
|
|
|
|
def update(self):
|
|
from ..utilities.database import connect
|
|
from ..utilities.json import dumps
|
|
query = f"UPDATE moderation_{self.guild_id} SET timestamp = ?, moderation_type = ?, target_type = ?, moderator_id = ?, role_id = ?, duration = ?, end_timestamp = ?, reason = ?, resolved = ?, resolved_by = ?, resolve_reason = ?, expired = ?, changes = ?, metadata = ? WHERE moderation_id = ?;"
|
|
|
|
with connect() as database:
|
|
database.execute(query, (
|
|
self.timestamp.timestamp(),
|
|
self.moderation_type,
|
|
self.target_type,
|
|
self.moderator_id,
|
|
self.role_id,
|
|
str(self.duration) if self.duration else None,
|
|
self.end_timestamp.timestamp() if self.end_timestamp else None,
|
|
self.reason,
|
|
self.resolved,
|
|
self.resolved_by,
|
|
self.resolve_reason,
|
|
self.expired,
|
|
dumps(self.changes),
|
|
dumps(self.metadata),
|
|
self.moderation_id,
|
|
))
|
|
|
|
logger.debug("Row updated in moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
|
|
self.moderation_id,
|
|
self.guild_id,
|
|
self.timestamp.timestamp(),
|
|
self.moderation_type,
|
|
self.target_type,
|
|
self.moderator_id,
|
|
self.role_id,
|
|
str(self.duration) if self.duration else None,
|
|
self.end_timestamp.timestamp() if self.end_timestamp else None,
|
|
self.reason,
|
|
self.resolved,
|
|
self.resolved_by,
|
|
self.resolve_reason,
|
|
self.expired,
|
|
dumps(self.changes),
|
|
dumps(self.metadata),
|
|
)
|
|
|
|
@classmethod
|
|
def from_dict(cls, bot: Red, data: dict) -> "Moderation":
|
|
return cls(bot=bot, **data)
|
|
|
|
@classmethod
|
|
def from_result(cls, bot: Red, result: Iterable, guild_id: int) -> "Moderation":
|
|
if result[7] is not None:
|
|
hours, minutes, seconds = map(int, result[7].split(':'))
|
|
duration = timedelta(hours=hours, minutes=minutes, seconds=seconds)
|
|
else:
|
|
duration = None
|
|
|
|
if result[14] is not None:
|
|
changes = json.loads(result[14])
|
|
change_obj_list = []
|
|
if changes:
|
|
for change in changes:
|
|
change_obj_list.append(Change.from_dict(bot=bot, data=change))
|
|
|
|
if result[15] is not None:
|
|
metadata = json.loads(result[15])
|
|
else:
|
|
metadata = {}
|
|
|
|
case = {
|
|
"moderation_id": int(result[0]),
|
|
"guild_id": int(guild_id),
|
|
"timestamp": datetime.fromtimestamp(result[1]),
|
|
"moderation_type": str(result[2]),
|
|
"target_type": str(result[3]),
|
|
"target_id": int(result[4]),
|
|
"moderator_id": int(result[5]),
|
|
"role_id": int(result[6]) if result[6] is not None else None,
|
|
"duration": duration,
|
|
"end_timestamp": datetime.fromtimestamp(result[8]) if result[8] is not None else None,
|
|
"reason": result[9],
|
|
"resolved": bool(result[10]),
|
|
"resolved_by": result[11],
|
|
"resolve_reason": result[12],
|
|
"expired": bool(result[13]),
|
|
"changes": change_obj_list,
|
|
"metadata": metadata if metadata else {},
|
|
}
|
|
return cls.from_dict(bot=bot, data=case)
|
|
|
|
@classmethod
|
|
def execute(cls, bot: Red, query: str, parameters: tuple | None = None) -> List["Moderation"]:
|
|
from ..utilities.database import connect
|
|
if not parameters:
|
|
parameters = ()
|
|
with connect() as database:
|
|
cursor = database.cursor()
|
|
cursor.execute(query, parameters=parameters)
|
|
results = cursor.fetchall()
|
|
cursor.close()
|
|
|
|
if results:
|
|
cases = []
|
|
for result in results:
|
|
case = cls.from_result(bot=bot, result=result)
|
|
if case.moderation_id != 0:
|
|
cases.append(case)
|
|
return cases
|
|
return []
|
|
|
|
@classmethod
|
|
def from_sql(cls, bot: Red, moderation_id: int, guild_id: int) -> "Moderation":
|
|
return cls.find_by_id(bot=bot, moderation_id=moderation_id, guild_id=guild_id)
|
|
|
|
@classmethod
|
|
def find_by_id(cls, bot: Red, moderation_id: int, guild_id: int) -> "Moderation":
|
|
query = f"SELECT * FROM moderation_{guild_id} WHERE moderation_id = ?;"
|
|
case = cls.execute(bot=bot, query=query, parameters=(moderation_id,))
|
|
if case:
|
|
return case[0]
|
|
raise ValueError(f"Case {moderation_id} not found in moderation_{guild_id}!")
|
|
|
|
@classmethod
|
|
def find_by_target(cls, bot: Red, guild_id: int, target: int, types: list | None = None) -> List["Moderation"]:
|
|
query = f"SELECT * FROM moderation_{guild_id} WHERE target_id = ?"
|
|
if types:
|
|
query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})"
|
|
query += " ORDER BY moderation_id DESC;"
|
|
|
|
return cls.execute(bot=bot, query=query, parameters=(target, *types) if types else (target,))
|
|
|
|
@classmethod
|
|
def find_by_moderator(cls, bot: Red, guild_id: int, moderator: int, types: list | None = None) -> List["Moderation"]:
|
|
query = f"SELECT * FROM moderation_{guild_id} WHERE moderator_id = ?"
|
|
if types:
|
|
query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})"
|
|
query += " ORDER BY moderation_id DESC;"
|
|
|
|
return cls.execute(bot=bot, query=query, parameters=(moderator, *types) if types else (moderator,))
|
|
|
|
@classmethod
|
|
def get_all_cases(cls, bot: Red, guild_id: int, types: list | None = None) -> List["Moderation"]:
|
|
query = f"SELECT * FROM moderation_{guild_id}"
|
|
if types:
|
|
query += f" WHERE moderation_type IN ({', '.join(['?' for _ in types])})"
|
|
query += " ORDER BY moderation_id DESC;"
|
|
return cls.execute(bot=bot, query=query, parameters=tuple(iterable=types) if types else None)
|
|
|
|
@classmethod
|
|
def log(
|
|
cls,
|
|
bot: Red,
|
|
guild_id: int,
|
|
moderator_id: int,
|
|
moderation_type: str,
|
|
target_type: str,
|
|
target_id: int,
|
|
role_id: int | None = None,
|
|
duration: timedelta | None = None,
|
|
reason: str | None = None,
|
|
database: sqlite3.Connection | None = None,
|
|
timestamp: datetime | None = None,
|
|
resolved: bool = False,
|
|
resolved_by: int | None = None,
|
|
resolved_reason: str | None = None,
|
|
expired: bool | None = None,
|
|
changes: list | None = None,
|
|
metadata: dict | None = None,
|
|
) -> "Moderation":
|
|
from ..utilities.database import connect
|
|
from ..utilities.json import dumps
|
|
if not timestamp:
|
|
timestamp = datetime.fromtimestamp(time())
|
|
elif not isinstance(timestamp, datetime):
|
|
timestamp = datetime.fromtimestamp(timestamp)
|
|
|
|
if duration == "NULL":
|
|
duration = None
|
|
|
|
if duration is not None:
|
|
end_timestamp = timestamp + duration
|
|
else:
|
|
duration = None
|
|
end_timestamp = None
|
|
|
|
if not expired:
|
|
if end_timestamp:
|
|
expired = bool(timestamp > end_timestamp)
|
|
else:
|
|
expired = False
|
|
|
|
if reason == "NULL":
|
|
reason = None
|
|
|
|
if resolved_by == "NULL":
|
|
resolved_by = None
|
|
|
|
if resolved_reason == "NULL":
|
|
resolved_reason = None
|
|
|
|
if role_id == 0:
|
|
role_id = None
|
|
|
|
if not database:
|
|
database = connect()
|
|
close_db = True
|
|
else:
|
|
close_db = False
|
|
cursor = database.cursor()
|
|
|
|
moderation_id = get_next_case_number(guild_id=guild_id, cursor=cursor)
|
|
|
|
case = {
|
|
"moderation_id": moderation_id,
|
|
"timestamp": timestamp.timestamp(),
|
|
"moderation_type": moderation_type,
|
|
"target_type": target_type,
|
|
"target_id": target_id,
|
|
"moderator_id": moderator_id,
|
|
"role_id": role_id,
|
|
"duration": str(duration) if duration else None,
|
|
"end_timestamp": end_timestamp.timestamp() if end_timestamp else None,
|
|
"reason": reason,
|
|
"resolved": resolved,
|
|
"resolved_by": resolved_by,
|
|
"resolve_reason": resolved_reason,
|
|
"expired": expired,
|
|
"changes": dumps(changes),
|
|
"metadata": dumps(metadata)
|
|
}
|
|
|
|
sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
|
|
cursor.execute(sql, tuple(case.values()))
|
|
|
|
cursor.close()
|
|
database.commit()
|
|
if close_db:
|
|
database.close()
|
|
|
|
logger.debug(
|
|
"Row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
|
|
guild_id,
|
|
case["moderation_id"],
|
|
case["timestamp"],
|
|
case["moderation_type"],
|
|
case["target_type"],
|
|
case["target_id"],
|
|
case["moderator_id"],
|
|
case["role_id"],
|
|
case["duration"],
|
|
case["end_timestamp"],
|
|
case["reason"],
|
|
case["resolved"],
|
|
case["resolved_by"],
|
|
case["resolve_reason"],
|
|
case["expired"],
|
|
case["changes"],
|
|
case["metadata"],
|
|
)
|
|
|
|
return cls.from_sql(bot=bot, moderation_id=moderation_id, guild_id=guild_id)
|