version 1.0.0
This commit is contained in:
parent
796b57b2d2
commit
2c7ff37d60
17 changed files with 1975 additions and 0 deletions
54
.forgejo/workflows/actions.yaml
Normal file
54
.forgejo/workflows/actions.yaml
Normal file
|
@ -0,0 +1,54 @@
|
|||
name: Actions
|
||||
on:
|
||||
push:
|
||||
|
||||
jobs:
|
||||
Build:
|
||||
runs-on: docker
|
||||
container: www.coastalcommits.com/seaswimmerthefsh/actionscontainers-seacogs:latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
|
||||
- name: Install dependencies
|
||||
run: python3 -m venv .venv && ./.venv/bin/pip install -r ./requirements.txt
|
||||
|
||||
- name: Build the package
|
||||
run: ./.venv/bin/python -m build
|
||||
|
||||
- name: Upload the package
|
||||
uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: seautils
|
||||
path: ./dist/*
|
||||
|
||||
- name: Publish to CoastalCommits
|
||||
if: startsWith(github.ref, 'refs/tags/')
|
||||
uses: actions/pypi-publish@v1.8.14
|
||||
with:
|
||||
username: ${{ github.repository_owner }}
|
||||
password: ${{ secrets.COASTALCOMMITSTOKEN }}
|
||||
repository_url: https://www.coastalcommits.com/api/packages/${{ github.repository_owner }}/pypi
|
||||
|
||||
# - name: Publish to PyPi
|
||||
# if: startsWith(github.ref, 'refs/tags/')
|
||||
# uses: actions/pypi-publish@v1.8.14
|
||||
# with:
|
||||
# password: ${{ secrets.PYPI_API_TOKEN }}
|
||||
|
||||
Lint Code (Ruff & Pylint):
|
||||
runs-on: docker
|
||||
container: www.coastalcommits.com/seaswimmerthefsh/actionscontainers-seacogs:latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
|
||||
- name: Install dependencies
|
||||
run: python3 -m venv .venv && ./.venv/bin/pip install -r ./requirements.txt
|
||||
|
||||
- name: Analysing code with Ruff
|
||||
run: ./.venv/bin/ruff check $(git ls-files '*.py')
|
||||
continue-on-error: true
|
||||
|
||||
- name: Analysing code with Pylint
|
||||
run: ./.venv/bin/pylint --rcfile=.forgejo/workflows/config/.pylintrc $(git ls-files '*.py')
|
22
pyproject.toml
Normal file
22
pyproject.toml
Normal file
|
@ -0,0 +1,22 @@
|
|||
[project]
|
||||
name = "seautils"
|
||||
version = "1.0.0"
|
||||
authors = [
|
||||
{ name="Seaswimmer", email="seaswimmerthefsh@gmail.com" },
|
||||
]
|
||||
description = "A Python library for utility functions, used in SeaCogs and for third party integrations with the Aurora cog."
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.10, <3.12"
|
||||
classifiers = [
|
||||
"Programming Language :: Python :: 3",
|
||||
"License :: OSI Approved :: MPL-2.0 License",
|
||||
"Operating System :: OS Independent",
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
Homepage = "https://www.coastalcommits.com/Seaswimmer/SeaUtils"
|
||||
Issues = "https://www.coastalcommits.com/Seaswimmer/SeaUtils/issues"
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling", "semver"]
|
||||
build-backend = "hatchling.build"
|
45
requirements.txt
Normal file
45
requirements.txt
Normal file
|
@ -0,0 +1,45 @@
|
|||
aiohttp==3.9.5
|
||||
aiohttp-json-rpc==0.13.3
|
||||
aiosignal==1.3.1
|
||||
aiosqlite==0.20.0
|
||||
annotated-types==0.7.0
|
||||
apsw==3.46.0.1
|
||||
astroid==3.2.4
|
||||
attrs==24.1.0
|
||||
Babel==2.15.0
|
||||
Brotli==1.1.0
|
||||
click==8.1.7
|
||||
dill==0.3.8
|
||||
discord.py==2.4.0
|
||||
distro==1.9.0
|
||||
frozenlist==1.4.1
|
||||
idna==3.7
|
||||
isort==5.13.2
|
||||
Markdown==3.6
|
||||
markdown-it-py==3.0.0
|
||||
mccabe==0.7.0
|
||||
mdurl==0.1.2
|
||||
multidict==6.0.5
|
||||
orjson==3.10.6
|
||||
packaging==24.1
|
||||
phx-class-registry==4.1.0
|
||||
platformdirs==4.2.2
|
||||
psutil==6.0.0
|
||||
pydantic==2.8.2
|
||||
pydantic_core==2.20.1
|
||||
Pygments==2.18.0
|
||||
pylint==3.2.6
|
||||
python-dateutil==2.9.0.post0
|
||||
PyYAML==6.0.1
|
||||
rapidfuzz==3.9.5
|
||||
Red-Commons==1.0.0
|
||||
Red-DiscordBot==3.5.12
|
||||
Red-Lavalink==0.11.0
|
||||
rich==13.7.1
|
||||
ruff==0.5.7
|
||||
schema==0.7.7
|
||||
six==1.16.0
|
||||
tomlkit==0.13.1
|
||||
typing_extensions==4.12.2
|
||||
uvloop==0.19.0
|
||||
yarl==1.9.4
|
0
seautils/__init__.py
Normal file
0
seautils/__init__.py
Normal file
0
seautils/aurora/__init__.py
Normal file
0
seautils/aurora/__init__.py
Normal file
2
seautils/aurora/models/__init__.py
Normal file
2
seautils/aurora/models/__init__.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
from .moderation_types import * # noqa: F403
|
||||
# This just imports all the built-in moderation types so they can be registered, as they aren't imported anywhere else.
|
31
seautils/aurora/models/base.py
Normal file
31
seautils/aurora/models/base.py
Normal file
|
@ -0,0 +1,31 @@
|
|||
from typing import Any, Optional
|
||||
|
||||
from discord import Guild
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
from redbot.core.bot import Red
|
||||
|
||||
|
||||
class AuroraBaseModel(BaseModel):
|
||||
"""Base class for all models in Aurora."""
|
||||
model_config = ConfigDict(ignored_types=(Red,), arbitrary_types_allowed=True)
|
||||
bot: Red
|
||||
|
||||
def dump(self) -> dict:
|
||||
return self.model_dump(exclude={"bot"})
|
||||
|
||||
def to_json(self, indent: int | None = None, file: Any | None = None, **kwargs) -> str:
|
||||
from ..utils.json import dump, dumps # pylint: disable=cyclic-import
|
||||
return dump(self.dump(), file, indent=indent, **kwargs) if file else dumps(self.dump(), indent=indent, **kwargs)
|
||||
|
||||
class AuroraGuildModel(AuroraBaseModel):
|
||||
"""Subclass of AuroraBaseModel that includes a guild_id attribute and a guild attribute, and a modified to_json() method to match."""
|
||||
model_config = ConfigDict(ignored_types=(Red, Guild), arbitrary_types_allowed=True)
|
||||
guild_id: int
|
||||
guild: Optional[Guild] = None
|
||||
|
||||
def dump(self) -> dict:
|
||||
return self.model_dump(exclude={"bot", "guild_id", "guild"})
|
||||
|
||||
def to_json(self, indent: int | None = None, file: Any | None = None, **kwargs) -> str:
|
||||
from ..utils.json import dump, dumps # pylint: disable=cyclic-import
|
||||
return dump(self.dump(), file, indent=indent, **kwargs) if file else dumps(self.dump(), indent=indent, **kwargs)
|
63
seautils/aurora/models/change.py
Normal file
63
seautils/aurora/models/change.py
Normal file
|
@ -0,0 +1,63 @@
|
|||
import json
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Literal, Optional
|
||||
|
||||
from redbot.core.bot import Red
|
||||
|
||||
from seautils.aurora.models.base import AuroraBaseModel
|
||||
from seautils.aurora.models.partials import PartialUser
|
||||
from seautils.aurora.utils.misc import timedelta_from_string
|
||||
|
||||
|
||||
class Change(AuroraBaseModel):
|
||||
type: Literal["ORIGINAL", "RESOLVE", "EDIT"]
|
||||
timestamp: datetime
|
||||
user_id: int
|
||||
reason: Optional[str] = "No reason provided"
|
||||
duration: Optional[timedelta] = None
|
||||
end_timestamp: Optional[datetime] = None
|
||||
|
||||
@property
|
||||
def unix_timestamp(self) -> int:
|
||||
return int(self.timestamp.timestamp())
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.type} {self.user_id} {self.reason}"
|
||||
|
||||
async def get_user(self) -> "PartialUser":
|
||||
return await PartialUser.from_id(self.bot, self.user_id)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, bot: Red, data: dict) -> "Change":
|
||||
if isinstance(data, str):
|
||||
data = json.loads(data)
|
||||
if "duration" in data and data["duration"] and not isinstance(data["duration"], timedelta) and not data["duration"] == "NULL":
|
||||
duration = timedelta_from_string(data["duration"])
|
||||
elif "duration" in data and isinstance(data["duration"], timedelta):
|
||||
duration = data["duration"]
|
||||
else:
|
||||
duration = None
|
||||
|
||||
if "end_timestamp" in data and data["end_timestamp"] and not isinstance(data["end_timestamp"], datetime):
|
||||
end_timestamp = datetime.fromtimestamp(data["end_timestamp"])
|
||||
elif "end_timestamp" in data and isinstance(data["end_timestamp"], datetime):
|
||||
end_timestamp = data["end_timestamp"]
|
||||
else:
|
||||
end_timestamp = None
|
||||
|
||||
if not isinstance(data["timestamp"], datetime):
|
||||
timestamp = datetime.fromtimestamp(data["timestamp"])
|
||||
else:
|
||||
timestamp = data["timestamp"]
|
||||
|
||||
try:
|
||||
data["user_id"] = int(data["user_id"])
|
||||
except ValueError:
|
||||
data["user_id"] = 0
|
||||
|
||||
data.update({
|
||||
"timestamp": timestamp,
|
||||
"end_timestamp": end_timestamp,
|
||||
"duration": duration
|
||||
})
|
||||
return cls(bot=bot, **data)
|
538
seautils/aurora/models/moderation.py
Normal file
538
seautils/aurora/models/moderation.py
Normal file
|
@ -0,0 +1,538 @@
|
|||
import json
|
||||
import sqlite3
|
||||
from datetime import datetime, timedelta
|
||||
from time import time
|
||||
from typing import Dict, Iterable, List, Optional, Tuple, Union
|
||||
|
||||
import discord
|
||||
from aiosqlite import Connection, Cursor, OperationalError, Row
|
||||
from aiosqlite import connect as aiosqlite_connect
|
||||
from redbot.core import data_manager
|
||||
from redbot.core.bot import Red
|
||||
|
||||
from seautils.aurora.models.base import AuroraGuildModel
|
||||
from seautils.aurora.models.change import Change
|
||||
from seautils.aurora.models.partials import PartialChannel, PartialRole, PartialUser
|
||||
from seautils.aurora.models.type import Type, type_registry
|
||||
from seautils.aurora.utils.logger import logger
|
||||
from seautils.aurora.utils.misc import timedelta_to_string
|
||||
|
||||
|
||||
class Moderation(AuroraGuildModel):
|
||||
"""This class represents a moderation case in the database.
|
||||
|
||||
Attributes:
|
||||
bot (Red): The bot instance.
|
||||
guild (discord.Guild): The guild the case belongs to.
|
||||
moderation_id (int): The ID of the moderation case.
|
||||
timestamp (datetime): The timestamp of the case.
|
||||
moderation_type (Type): The type of moderation case.
|
||||
target_type (str): The type of target. Should be either `user` or `channel`.
|
||||
target_id (int): The ID of the target.
|
||||
moderator_id (int): The ID of the moderator who issued the case.
|
||||
role_id (int): The ID of the role, if applicable.
|
||||
duration (timedelta): The duration of the case, if applicable.
|
||||
end_timestamp (datetime): The end timestamp of the case, if applicable.
|
||||
reason (str): The reason for the case.
|
||||
resolved (bool): Whether the case is resolved.
|
||||
resolved_by (int): The ID of the user who resolved the case.
|
||||
resolve_reason (str): The reason the case was resolved.
|
||||
expired (bool): Whether the case is expired.
|
||||
changes (List[Change]): A list of changes to the case.
|
||||
metadata (Dict): A dictionary of metadata stored with the case.
|
||||
|
||||
Properties:
|
||||
id (int): The ID of the case.
|
||||
type (Type): The type of the case.
|
||||
unix_timestamp (int): The timestamp of the case as a Unix timestamp.
|
||||
|
||||
Methods:
|
||||
get_moderator: Gets the moderator who issued the case.
|
||||
get_target: Gets the target of the case.
|
||||
get_resolved_by: Gets the user who resolved the case.
|
||||
get_role: Gets the role, if applicable.
|
||||
resolve: Resolves the case.
|
||||
update: Updates the case in the database.
|
||||
|
||||
Class Methods:
|
||||
from_dict: Creates a `Moderation` object from a dictionary.
|
||||
from_result: Creates a `Moderation` object from a database result.
|
||||
execute: Executes a query on the database.
|
||||
get_latest: Gets the latest cases from the database.
|
||||
get_next_case_number: Gets the next case number to use.
|
||||
find_by_id: Finds a case by its ID.
|
||||
find_by_target: Finds cases by the target.
|
||||
find_by_moderator: Finds cases by the moderator.
|
||||
log: Logs a moderation case in the database.
|
||||
|
||||
Static Methods:
|
||||
connect: Connects to the SQLite database.
|
||||
"""
|
||||
|
||||
moderation_id: int
|
||||
timestamp: datetime
|
||||
moderation_type: Type
|
||||
target_type: str
|
||||
target_id: int
|
||||
moderator_id: int
|
||||
role_id: Optional[int] = None
|
||||
duration: Optional[timedelta] = None
|
||||
end_timestamp: Optional[datetime] = None
|
||||
reason: Optional[str] = None
|
||||
resolved: bool
|
||||
resolved_by: Optional[int] = None
|
||||
resolve_reason: Optional[str] = None
|
||||
expired: bool
|
||||
changes: List["Change"]
|
||||
metadata: Dict
|
||||
|
||||
@property
|
||||
def id(self) -> int:
|
||||
return self.moderation_id
|
||||
|
||||
@property
|
||||
def type(self) -> Type:
|
||||
return self.moderation_type
|
||||
|
||||
@property
|
||||
def unix_timestamp(self) -> int:
|
||||
return int(self.timestamp.timestamp())
|
||||
|
||||
async def get_moderator(self) -> "PartialUser":
|
||||
return await PartialUser.from_id(self.bot, self.moderator_id)
|
||||
|
||||
async def get_target(self) -> Union["PartialUser", "PartialChannel"]:
|
||||
if self.target_type.lower() == "user":
|
||||
return await PartialUser.from_id(self.bot, self.target_id)
|
||||
return await PartialChannel.from_id(self.bot, self.target_id, self.guild)
|
||||
|
||||
async def get_resolved_by(self) -> Optional["PartialUser"]:
|
||||
if self.resolved_by:
|
||||
return await PartialUser.from_id(self.bot, self.resolved_by)
|
||||
return None
|
||||
|
||||
async def get_role(self) -> Optional["PartialRole"]:
|
||||
if self.role_id:
|
||||
return await PartialRole.from_id(self.bot, self.guild, self.role_id)
|
||||
return None
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"{self.moderation_type} {self.target_type} {self.target_id} {self.reason}"
|
||||
|
||||
def __int__(self) -> int:
|
||||
return self.moderation_id
|
||||
|
||||
async def resolve(self, resolved_by: int, reason: str) -> Tuple[bool, str]:
|
||||
if self.resolved:
|
||||
raise ValueError("Case is already resolved!")
|
||||
|
||||
self.resolved = True
|
||||
self.resolved_by = resolved_by
|
||||
self.resolve_reason = reason
|
||||
|
||||
success, msg = await self.type.resolve_handler(moderation=self, reason=reason)
|
||||
|
||||
if not self.changes:
|
||||
self.changes.append(Change.from_dict(self.bot, {
|
||||
"type": "ORIGINAL",
|
||||
"timestamp": self.timestamp,
|
||||
"reason": self.reason,
|
||||
"user_id": self.moderator_id,
|
||||
"duration": self.duration,
|
||||
"end_timestamp": self.end_timestamp,
|
||||
}))
|
||||
self.changes.append(Change.from_dict(self.bot, {
|
||||
"type": "RESOLVE",
|
||||
"timestamp": datetime.now(),
|
||||
"reason": reason,
|
||||
"user_id": resolved_by,
|
||||
}))
|
||||
|
||||
await self.update()
|
||||
return success, msg
|
||||
|
||||
async def update(self) -> None:
|
||||
from seautils.aurora.utils.json import dumps
|
||||
query = f"UPDATE moderation_{self.guild_id} SET timestamp = ?, moderation_type = ?, target_type = ?, moderator_id = ?, role_id = ?, duration = ?, end_timestamp = ?, reason = ?, resolved = ?, resolved_by = ?, resolve_reason = ?, expired = ?, changes = ?, metadata = ? WHERE moderation_id = ?;"
|
||||
|
||||
await self.execute(query, (
|
||||
self.timestamp.timestamp(),
|
||||
self.moderation_type.key,
|
||||
self.target_type,
|
||||
self.moderator_id,
|
||||
self.role_id,
|
||||
str(self.duration) if self.duration else None,
|
||||
self.end_timestamp.timestamp() if self.end_timestamp else None,
|
||||
self.reason,
|
||||
self.resolved,
|
||||
self.resolved_by,
|
||||
self.resolve_reason,
|
||||
self.expired,
|
||||
dumps(self.changes),
|
||||
dumps(self.metadata),
|
||||
self.moderation_id,
|
||||
))
|
||||
|
||||
logger.verbose("Row updated in moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
|
||||
self.moderation_id,
|
||||
self.guild_id,
|
||||
self.timestamp.timestamp(),
|
||||
self.moderation_type.key,
|
||||
self.target_type,
|
||||
self.moderator_id,
|
||||
self.role_id,
|
||||
str(self.duration) if self.duration else None,
|
||||
self.end_timestamp.timestamp() if self.end_timestamp else None,
|
||||
self.reason,
|
||||
self.resolved,
|
||||
self.resolved_by,
|
||||
self.resolve_reason,
|
||||
self.expired,
|
||||
dumps(self.changes),
|
||||
dumps(self.metadata),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def from_dict(cls, bot: Red, data: dict) -> "Moderation":
|
||||
if data.get("guild_id"):
|
||||
try:
|
||||
guild = bot.get_guild(data["guild_id"])
|
||||
if not guild:
|
||||
guild = await bot.fetch_guild(data["guild_id"])
|
||||
except (discord.Forbidden, discord.HTTPException):
|
||||
guild = None
|
||||
data.update({"guild": guild})
|
||||
return cls(bot=bot, **data)
|
||||
|
||||
@classmethod
|
||||
async def from_result(cls, bot: Red, result: Iterable, guild_id: int) -> "Moderation":
|
||||
if result[7] is not None and result[7] != "NULL":
|
||||
hours, minutes, seconds = map(int, result[7].split(':'))
|
||||
duration = timedelta(hours=hours, minutes=minutes, seconds=seconds)
|
||||
else:
|
||||
duration = None
|
||||
|
||||
if result[14] is not None:
|
||||
changes = json.loads(result[14])
|
||||
change_obj_list = []
|
||||
if changes:
|
||||
for change in changes:
|
||||
change_obj_list.append(Change.from_dict(bot=bot, data=change))
|
||||
|
||||
if result[15] is not None:
|
||||
metadata = json.loads(result[15])
|
||||
else:
|
||||
metadata = {}
|
||||
|
||||
moderation_type = str.lower(result[2])
|
||||
if moderation_type in type_registry:
|
||||
moderation_type = type_registry[moderation_type]
|
||||
else:
|
||||
logger.error("Unknown moderation type in case %s: %s", result[0], result[2])
|
||||
|
||||
case = {
|
||||
"moderation_id": int(result[0]),
|
||||
"guild_id": int(guild_id),
|
||||
"timestamp": datetime.fromtimestamp(result[1]),
|
||||
"moderation_type": moderation_type,
|
||||
"target_type": str(result[3]),
|
||||
"target_id": int(result[4]),
|
||||
"moderator_id": int(result[5]),
|
||||
"role_id": int(result[6]) if result[6] is not None else None,
|
||||
"duration": duration,
|
||||
"end_timestamp": datetime.fromtimestamp(result[8]) if result[8] is not None else None,
|
||||
"reason": result[9],
|
||||
"resolved": bool(result[10]),
|
||||
"resolved_by": result[11],
|
||||
"resolve_reason": result[12],
|
||||
"expired": bool(result[13]),
|
||||
"changes": change_obj_list,
|
||||
"metadata": metadata if metadata else {},
|
||||
}
|
||||
return await cls.from_dict(bot=bot, data=case)
|
||||
|
||||
@staticmethod
|
||||
async def connect() -> Connection:
|
||||
"""Connects to the SQLite database, and returns a connection object."""
|
||||
try:
|
||||
connection = await aiosqlite_connect(
|
||||
database=data_manager.cog_data_path(raw_name="Aurora") / "aurora.db"
|
||||
)
|
||||
return connection
|
||||
|
||||
except OperationalError as e:
|
||||
logger.error("Unable to access the SQLite database!\nError:\n%s", e.msg)
|
||||
raise ConnectionRefusedError(
|
||||
f"Unable to access the SQLite Database!\n{e.msg}"
|
||||
) from e
|
||||
|
||||
@classmethod
|
||||
async def execute(cls, query: str, parameters: tuple | None = None, bot: Red | None = None, guild_id: int | None = None, cursor: Cursor | None = None, return_obj: bool = True) -> Union[Tuple["Moderation"], Iterable[Row]]:
|
||||
"""Executes a query on the database.
|
||||
|
||||
Arguments:
|
||||
query (str): The query to execute.
|
||||
parameters (tuple): The parameters to pass to the query.
|
||||
bot (Red): The bot instance.
|
||||
guild_id (int): The ID of the guild to execute the query on.
|
||||
cursor (Cursor): The cursor to use for the query.
|
||||
return_obj (bool): Whether to return the case object(s). Defaults to `True`. If `False`, returns a `Iterable` of `aiosqlite.Row` objects.
|
||||
Returns: The result of the query, either as a `Tuple` of `Moderation` objects or an `Iterable` of `aiosqlite.Row` objects.
|
||||
"""
|
||||
logger.trace("Executing query: \"%s\" with parameters \"%s\"", query, parameters)
|
||||
if not parameters:
|
||||
parameters = ()
|
||||
if not cursor:
|
||||
no_cursor = True
|
||||
database = await cls.connect()
|
||||
cursor = await database.cursor()
|
||||
else:
|
||||
no_cursor = False
|
||||
|
||||
try:
|
||||
await cursor.execute(query, parameters)
|
||||
except OperationalError as e:
|
||||
logger.error("Error executing query: \"%s\" with parameters \"%s\"\nError:\n%s",
|
||||
query, parameters, e)
|
||||
raise OperationalError(f"Error executing query: \"{query}\" with parameters \"{parameters}\"") from e
|
||||
results = await cursor.fetchall()
|
||||
await database.commit()
|
||||
if no_cursor:
|
||||
await cursor.close()
|
||||
await database.close()
|
||||
|
||||
if results and return_obj and bot and guild_id:
|
||||
cases = []
|
||||
for result in results:
|
||||
if result[0] == 0:
|
||||
continue
|
||||
case = await cls.from_result(bot=bot, result=result, guild_id=guild_id)
|
||||
cases.append(case)
|
||||
return tuple(cases)
|
||||
return results
|
||||
|
||||
@classmethod
|
||||
async def get_latest(cls, bot: Red, guild_id: int, before: datetime | None = None, after: datetime | None = None, limit: int | None = None, offset: int = 0, types: Iterable[Type] | None = None, expired: bool | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
|
||||
params = []
|
||||
query = f"SELECT * FROM moderation_{guild_id}"
|
||||
conditions = []
|
||||
|
||||
if types:
|
||||
conditions.append(f"moderation_type IN ({', '.join(['?' for _ in types])})")
|
||||
params.extend([t.key for t in types])
|
||||
if before:
|
||||
conditions.append("timestamp < ?")
|
||||
params.append(int(before.timestamp()))
|
||||
if after:
|
||||
conditions.append("timestamp > ?")
|
||||
params.append(int(after.timestamp()))
|
||||
if expired is not None:
|
||||
conditions.append("expired = ?")
|
||||
params.append(int(expired))
|
||||
|
||||
if conditions:
|
||||
query += " WHERE " + " AND ".join(conditions)
|
||||
|
||||
query += " ORDER BY moderation_id DESC"
|
||||
|
||||
if limit:
|
||||
query += " LIMIT ? OFFSET ?"
|
||||
params.extend((limit, offset))
|
||||
query += ";"
|
||||
return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=tuple(params) if params else (), cursor=cursor)
|
||||
|
||||
@classmethod
|
||||
async def get_next_case_number(cls, bot: Red, guild_id: int, cursor: Cursor | None = None) -> int:
|
||||
result = await cls.get_latest(bot=bot, guild_id=guild_id, cursor=cursor, limit=1)
|
||||
return (result[0].moderation_id + 1) if result else 1
|
||||
|
||||
@classmethod
|
||||
async def find_by_id(cls, bot: Red, moderation_id: int, guild_id: int, cursor: Cursor | None = None) -> "Moderation":
|
||||
query = f"SELECT * FROM moderation_{guild_id} WHERE moderation_id = ?;"
|
||||
case = await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderation_id,), cursor=cursor)
|
||||
if case:
|
||||
return case[0]
|
||||
raise ValueError(f"Case {moderation_id} not found in moderation_{guild_id}!")
|
||||
|
||||
@classmethod
|
||||
async def find_by_target(cls, bot: Red, guild_id: int, target: int, before: datetime = None, after: datetime = None, types: Iterable[Type] | None = None, expired: bool | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
|
||||
query = f"SELECT * FROM moderation_{guild_id} WHERE target_id = ?"
|
||||
params = [target]
|
||||
if types:
|
||||
query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})"
|
||||
for t in types:
|
||||
params.append(t.key)
|
||||
if before:
|
||||
query += " AND timestamp < ?"
|
||||
params.append(int(before.timestamp()))
|
||||
if after:
|
||||
query += " AND timestamp > ?"
|
||||
params.append(int(after.timestamp()))
|
||||
if expired is not None:
|
||||
query += " AND expired = ?"
|
||||
params.append(int(expired))
|
||||
|
||||
query += " ORDER BY moderation_id DESC;"
|
||||
|
||||
return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=params, cursor=cursor)
|
||||
|
||||
@classmethod
|
||||
async def find_by_moderator(cls, bot: Red, guild_id: int, moderator: int, before: datetime = None, after: datetime = None, types: Iterable[Type] | None = None, expired: bool | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
|
||||
query = f"SELECT * FROM moderation_{guild_id} WHERE moderator_id = ?"
|
||||
params = [moderator]
|
||||
if types:
|
||||
query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})"
|
||||
for t in types:
|
||||
params.append(t.key)
|
||||
if before:
|
||||
query += " AND timestamp < ?"
|
||||
params.append(int(before.timestamp()))
|
||||
if after:
|
||||
query += " AND timestamp > ?"
|
||||
params.append(int(after.timestamp()))
|
||||
if expired is not None:
|
||||
query += " AND expired = ?"
|
||||
params.append(int(expired))
|
||||
|
||||
query += " ORDER BY moderation_id DESC;"
|
||||
|
||||
return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=params, cursor=cursor)
|
||||
|
||||
@classmethod
|
||||
async def log(
|
||||
cls,
|
||||
bot: Red,
|
||||
guild_id: int,
|
||||
moderator_id: int,
|
||||
moderation_type: Type,
|
||||
target_type: str,
|
||||
target_id: int,
|
||||
role_id: int | None = None,
|
||||
duration: timedelta | None = None,
|
||||
reason: str | None = None,
|
||||
database: sqlite3.Connection | None = None,
|
||||
timestamp: datetime | None = None,
|
||||
resolved: bool = False,
|
||||
resolved_by: int | None = None,
|
||||
resolved_reason: str | None = None,
|
||||
expired: bool | None = None,
|
||||
changes: list | None = None,
|
||||
metadata: dict | None = None,
|
||||
return_obj: bool = True,
|
||||
) -> Union["Moderation", int]:
|
||||
"""Logs a moderation case in the database.
|
||||
|
||||
Args:
|
||||
bot (Red): The bot instance.
|
||||
guild_id (int): The ID of the guild to log the case in.
|
||||
moderator_id (int): The ID of the moderator who issued the case.
|
||||
moderation_type (Type): The type of moderation case. See `aurora.models.moderation_types` for the built-in options.
|
||||
target_type (str): The type of target. Should be either `user` or `channel`.
|
||||
target_id (int): The ID of the target.
|
||||
role_id (int): The ID of the role, if applicable.
|
||||
duration (timedelta): The duration of the case, if applicable.
|
||||
reason (str): The reason for the case.
|
||||
database (sqlite3.Connection): The database connection to use to log the case. A connection will be automatically created if not provided.
|
||||
timestamp (datetime): The timestamp of the case. Will be automatically generated if not provided.
|
||||
resolved (bool): Whether the case is resolved.
|
||||
resolved_by (int): The ID of the user who resolved the case.
|
||||
resolved_reason (str): The reason the case was resolved.
|
||||
expired (bool): Whether the case is expired.
|
||||
changes (list): A list of changes to log. You usually shouldn't pass this, as it's automatically generated by the `/edit` and `/resolve` commands.
|
||||
metadata (dict): A dictionary of metadata to store with the case.
|
||||
return_obj (bool): Whether to return the case object. Defaults to `True`. If `False`, returns the case ID.
|
||||
|
||||
Returns:
|
||||
Union[Moderation, int]: The `Moderation` object if `return_obj` is `True`, otherwise the case ID.
|
||||
"""
|
||||
from seautils.aurora.utils.json import dumps
|
||||
if not timestamp:
|
||||
timestamp = datetime.fromtimestamp(time())
|
||||
elif not isinstance(timestamp, datetime):
|
||||
timestamp = datetime.fromtimestamp(timestamp)
|
||||
|
||||
if duration == "NULL":
|
||||
duration = None
|
||||
|
||||
if duration is not None:
|
||||
end_timestamp = timestamp + duration
|
||||
else:
|
||||
duration = None
|
||||
end_timestamp = None
|
||||
|
||||
if not expired:
|
||||
if end_timestamp:
|
||||
expired = bool(timestamp > end_timestamp)
|
||||
else:
|
||||
expired = False
|
||||
|
||||
if reason == "NULL":
|
||||
reason = None
|
||||
|
||||
if resolved_by in ["NULL", "?"]:
|
||||
resolved_by = None
|
||||
|
||||
if resolved_reason == "NULL":
|
||||
resolved_reason = None
|
||||
|
||||
if role_id == 0:
|
||||
role_id = None
|
||||
|
||||
if not database:
|
||||
database = await cls.connect()
|
||||
close_db = True
|
||||
else:
|
||||
close_db = False
|
||||
|
||||
moderation_id = await cls.get_next_case_number(bot=bot, guild_id=guild_id)
|
||||
|
||||
case = {
|
||||
"moderation_id": moderation_id,
|
||||
"timestamp": timestamp.timestamp(),
|
||||
"moderation_type": moderation_type.key,
|
||||
"target_type": target_type,
|
||||
"target_id": target_id,
|
||||
"moderator_id": moderator_id,
|
||||
"role_id": role_id,
|
||||
"duration": timedelta_to_string(duration) if duration else None,
|
||||
"end_timestamp": end_timestamp.timestamp() if end_timestamp else None,
|
||||
"reason": reason,
|
||||
"resolved": resolved,
|
||||
"resolved_by": resolved_by,
|
||||
"resolve_reason": resolved_reason,
|
||||
"expired": expired,
|
||||
"changes": dumps(changes),
|
||||
"metadata": dumps(metadata)
|
||||
}
|
||||
|
||||
sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
|
||||
await database.execute(sql, tuple(case.values()))
|
||||
|
||||
await database.commit()
|
||||
if close_db:
|
||||
await database.close()
|
||||
|
||||
logger.verbose(
|
||||
"Row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
|
||||
guild_id,
|
||||
case["moderation_id"],
|
||||
case["timestamp"],
|
||||
case["moderation_type"],
|
||||
case["target_type"],
|
||||
case["target_id"],
|
||||
case["moderator_id"],
|
||||
case["role_id"],
|
||||
case["duration"],
|
||||
case["end_timestamp"],
|
||||
case["reason"],
|
||||
case["resolved"],
|
||||
case["resolved_by"],
|
||||
case["resolve_reason"],
|
||||
case["expired"],
|
||||
case["changes"],
|
||||
case["metadata"],
|
||||
)
|
||||
|
||||
if return_obj:
|
||||
return await cls.find_by_id(bot=bot, moderation_id=moderation_id, guild_id=guild_id)
|
||||
return moderation_id
|
76
seautils/aurora/models/partials.py
Normal file
76
seautils/aurora/models/partials.py
Normal file
|
@ -0,0 +1,76 @@
|
|||
from discord import ChannelType, Forbidden, Guild, HTTPException, InvalidData, NotFound
|
||||
from redbot.core.bot import Red
|
||||
|
||||
from .base import AuroraBaseModel, AuroraGuildModel
|
||||
|
||||
|
||||
class PartialUser(AuroraBaseModel):
|
||||
id: int
|
||||
username: str
|
||||
discriminator: int
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
return f"{self.username}#{self.discriminator}" if self.discriminator != 0 else self.username
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
@classmethod
|
||||
async def from_id(cls, bot: Red, user_id: int) -> "PartialUser":
|
||||
user = bot.get_user(user_id)
|
||||
if not user:
|
||||
try:
|
||||
user = await bot.fetch_user(user_id)
|
||||
return cls(bot=bot, id=user.id, username=user.name, discriminator=user.discriminator)
|
||||
except NotFound:
|
||||
return cls(bot=bot, id=user_id, username="Deleted User", discriminator=0)
|
||||
return cls(bot=bot, id=user.id, username=user.name, discriminator=user.discriminator)
|
||||
|
||||
|
||||
class PartialChannel(AuroraGuildModel):
|
||||
id: int
|
||||
name: str
|
||||
type: ChannelType
|
||||
|
||||
@property
|
||||
def mention(self):
|
||||
if self.name in ["Deleted Channel", "Forbidden Channel"]:
|
||||
return self.name
|
||||
return f"<#{self.id}>"
|
||||
|
||||
def __str__(self):
|
||||
return self.mention
|
||||
|
||||
@classmethod
|
||||
async def from_id(cls, bot: Red, channel_id: int, guild: Guild) -> "PartialChannel":
|
||||
channel = bot.get_channel(channel_id)
|
||||
if not channel:
|
||||
try:
|
||||
channel = await bot.fetch_channel(channel_id)
|
||||
return cls(bot=bot, guild_id=channel.guild.id, guild=guild, id=channel.id, name=channel.name, type=channel.type)
|
||||
except (NotFound, InvalidData, HTTPException, Forbidden) as e:
|
||||
if e == Forbidden:
|
||||
return cls(bot=bot, guild_id=0, id=channel_id, name="Forbidden Channel")
|
||||
return cls(bot=bot, guild_id=0, id=channel_id, name="Deleted Channel", type=ChannelType.text)
|
||||
return cls(bot=bot, guild_id=channel.guild.id, guild=guild, id=channel.id, name=channel.name, type=channel.type)
|
||||
|
||||
class PartialRole(AuroraGuildModel):
|
||||
id: int
|
||||
name: str
|
||||
|
||||
@property
|
||||
def mention(self):
|
||||
if self.name in ["Deleted Role", "Forbidden Role"]:
|
||||
return self.name
|
||||
return f"<@&{self.id}>"
|
||||
|
||||
def __str__(self):
|
||||
return self.mention
|
||||
|
||||
@classmethod
|
||||
async def from_id(cls, bot: Red, guild: Guild, role_id: int) -> "PartialRole":
|
||||
role = guild.get_role(role_id)
|
||||
if not role:
|
||||
return cls(bot=bot, guild_id=guild.id, id=role_id, name="Deleted Role")
|
||||
return cls(bot=bot, guild_id=guild.id, id=role.id, name=role.name)
|
86
seautils/aurora/models/type.py
Normal file
86
seautils/aurora/models/type.py
Normal file
|
@ -0,0 +1,86 @@
|
|||
from abc import abstractmethod
|
||||
from typing import Any, Dict, Tuple
|
||||
|
||||
from class_registry import AutoRegister, ClassRegistry
|
||||
from discord import Interaction, Member, User
|
||||
from discord.abc import Messageable
|
||||
from redbot.core import commands
|
||||
|
||||
type_registry: Dict['str', 'Type'] = ClassRegistry(attr_name='key', unique=True)
|
||||
|
||||
class Type(metaclass=AutoRegister(type_registry)):
|
||||
"""This is a base class for moderation types.
|
||||
|
||||
Attributes:
|
||||
key (str): The key to use for this type. This should be unique, as this is how the type is registered internally. Changing this key will break existing cases with this type.
|
||||
string (str): The string to display for this type.
|
||||
verb (str): The verb to use for this type.
|
||||
embed_desc (str): The string to use for embed descriptions.
|
||||
channel (bool): Whether this type targets channels or users. If this is `true` in a subclass, its overriden handler methods should be typed with `discord.abc.Messageable` instead of `discord.Member | discord.User`.
|
||||
|
||||
Properties:
|
||||
name (str): The string to display for this type. This is the same as the `string` attribute.
|
||||
"""
|
||||
|
||||
key = "type"
|
||||
string = "type"
|
||||
verb = "typed"
|
||||
embed_desc = "been "
|
||||
channel = False
|
||||
|
||||
@abstractmethod
|
||||
def void(self) -> Any:
|
||||
"""This method should be overridden by any child classes. This is a placeholder to allow for automatic class registration."""
|
||||
raise NotImplementedError
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
"""Alias for the `string` attribute."""
|
||||
return self.string
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.string
|
||||
|
||||
@classmethod
|
||||
async def handler(cls, ctx: commands.Context, target: Member | User | Messageable, silent: bool, **kwargs) -> 'Type': # pylint: disable=unused-argument
|
||||
"""This method should be overridden by any child classes, but should retain the same starting keyword arguments.
|
||||
|
||||
Arguments:
|
||||
ctx (commands.Context): The context of the command.
|
||||
target (discord.Member | discord.User | discord.abc.Messageable): The target of the moderation.
|
||||
silent (bool): Whether details about the moderation should be DM'ed to the target of the moderation.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
async def resolve_handler(cls, moderation, reason: str) -> Tuple[bool, str]: # pylint: disable=unused-argument
|
||||
"""This method should be overridden by any resolvable child classes, but should retain the same keyword arguments.
|
||||
If your moderation type should not be resolvable, do not override this.
|
||||
|
||||
Arguments:
|
||||
moderation (aurora.models.Moderation): The moderation to resolve.
|
||||
reason (str): The reason for resolving the moderation.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
async def expiry_handler(cls, moderation) -> int: # pylint: disable=unused-argument
|
||||
"""This method should be overridden by any expirable child classes, but should retain the same keyword arguments and return an integer.
|
||||
If your moderation type should not expire, do not override this, but also do not set an `end_timestamp` when you log your moderation.
|
||||
|
||||
Arguments:
|
||||
moderation (aurora.models.Moderation): The moderation that is expiring.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
async def duration_edit_handler(cls, interaction: Interaction, old_moderation, new_moderation) -> bool: # pylint: disable=unused-argument
|
||||
"""This method should be overridden by any child classes with editable durations, but should retain the same keyword arguments and should return True if the duration was successfully modified, or False if it was not.
|
||||
If your moderation type's duration should not be editable, do not override this.
|
||||
|
||||
Arguments:
|
||||
interaction (discord.Interaction): The interaction that triggered the duration edit.
|
||||
old_moderation (aurora.models.Moderation): The old moderation, from before the `/edit` command was invoked.
|
||||
new_moderation (aurora.models.Moderation): The current state of the moderation.
|
||||
"""
|
||||
raise NotImplementedError
|
0
seautils/aurora/utils/__init__.py
Normal file
0
seautils/aurora/utils/__init__.py
Normal file
39
seautils/aurora/utils/config.py
Normal file
39
seautils/aurora/utils/config.py
Normal file
|
@ -0,0 +1,39 @@
|
|||
from redbot.core import Config
|
||||
|
||||
config: Config = Config.get_conf(None, identifier=481923957134912, cog_name="Aurora")
|
||||
|
||||
|
||||
def register_config(config_obj: Config):
|
||||
config_obj.register_guild(
|
||||
show_moderator=True,
|
||||
use_discord_permissions=True,
|
||||
respect_hierarchy=True,
|
||||
ignore_modlog=True,
|
||||
ignore_other_bots=True,
|
||||
dm_users=True,
|
||||
log_channel=" ",
|
||||
immune_roles=[],
|
||||
history_ephemeral=False,
|
||||
history_inline=False,
|
||||
history_pagesize=5,
|
||||
history_inline_pagesize=6,
|
||||
auto_evidenceformat=False,
|
||||
addrole_whitelist=[],
|
||||
)
|
||||
config_obj.register_user(
|
||||
history_ephemeral=None,
|
||||
history_inline=None,
|
||||
history_pagesize=None,
|
||||
history_inline_pagesize=None,
|
||||
auto_evidenceformat=None,
|
||||
)
|
||||
|
||||
moderation_type = {
|
||||
"show_in_history": True,
|
||||
"show_moderator": None,
|
||||
"use_discord_permissions": None,
|
||||
"dm_users": None,
|
||||
}
|
||||
|
||||
config_obj.init_custom("types", 2)
|
||||
config_obj.register_custom("types", **moderation_type)
|
594
seautils/aurora/utils/factory.py
Normal file
594
seautils/aurora/utils/factory.py
Normal file
|
@ -0,0 +1,594 @@
|
|||
# pylint: disable=cyclic-import
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Union
|
||||
|
||||
from discord import Color, Embed, Guild, Interaction, Member, Message, Role, User
|
||||
from redbot.core import commands
|
||||
from redbot.core.bot import Red
|
||||
from redbot.core.utils.chat_formatting import (
|
||||
bold,
|
||||
box,
|
||||
error,
|
||||
humanize_timedelta,
|
||||
warning,
|
||||
)
|
||||
|
||||
from seautils.aurora.models.moderation import Moderation
|
||||
from seautils.aurora.models.partials import PartialUser
|
||||
from seautils.aurora.models.type import Type
|
||||
from seautils.aurora.utils.config import config
|
||||
from seautils.aurora.utils.misc import get_bool_emoji, get_pagesize_str
|
||||
|
||||
|
||||
async def message_factory(
|
||||
bot: Red,
|
||||
color: Color,
|
||||
guild: Guild,
|
||||
reason: str,
|
||||
moderation_type: Type,
|
||||
moderator: Union[Member, User] | None = None,
|
||||
duration: timedelta | None = None,
|
||||
response: Message | None = None,
|
||||
case: bool = True,
|
||||
) -> Embed:
|
||||
"""This function creates a message from set parameters, meant for contacting the moderated user.
|
||||
|
||||
Args:
|
||||
bot (Red): The bot instance.
|
||||
color (Color): The color of the embed.
|
||||
guild (Guild): The guild the moderation occurred in.
|
||||
reason (str): The reason for the moderation.
|
||||
moderation_type (Type): The type of moderation.
|
||||
moderator (Union[Member, User], optional): The moderator who performed the moderation. Defaults to None.
|
||||
duration (timedelta, optional): The duration of the moderation. Defaults to None.
|
||||
response (Message, optional): The response message. Defaults to None.
|
||||
case (bool, optional): Whether the message is for a moderation case. Defaults to True.
|
||||
|
||||
|
||||
Returns:
|
||||
embed: The message embed.
|
||||
"""
|
||||
if response is not None and moderation_type.key not in [
|
||||
"kick",
|
||||
"ban",
|
||||
"tempban",
|
||||
"unban",
|
||||
]:
|
||||
guild_name = f"[{guild.name}]({response.jump_url})"
|
||||
else:
|
||||
guild_name = guild.name
|
||||
|
||||
if duration:
|
||||
embed_duration = f" for {humanize_timedelta(timedelta=duration)}"
|
||||
else:
|
||||
embed_duration = ""
|
||||
|
||||
embed = Embed(
|
||||
title=str.title(moderation_type.verb),
|
||||
description=f"You have {moderation_type.embed_desc}{moderation_type.verb}{embed_duration} in {guild_name}.",
|
||||
color=color,
|
||||
timestamp=datetime.now(),
|
||||
)
|
||||
|
||||
show_moderator = await config.custom("types", guild.id, moderation_type.key).show_moderator()
|
||||
if show_moderator is None:
|
||||
show_moderator = await config.guild(guild).show_moderator()
|
||||
|
||||
if show_moderator and moderator is not None:
|
||||
embed.add_field(
|
||||
name="Moderator", value=f"`{moderator.name} ({moderator.id})`", inline=False
|
||||
)
|
||||
|
||||
embed.add_field(name="Reason", value=f"`{reason}`", inline=False)
|
||||
|
||||
if guild.icon.url is not None:
|
||||
embed.set_author(name=guild.name, icon_url=guild.icon.url)
|
||||
else:
|
||||
embed.set_author(name=guild.name)
|
||||
|
||||
if case:
|
||||
embed.set_footer(
|
||||
text=f"Case #{await Moderation.get_next_case_number(bot=bot, guild_id=guild.id):,}",
|
||||
icon_url="attachment://arrow.png",
|
||||
)
|
||||
|
||||
return embed
|
||||
|
||||
async def resolve_factory(moderation: Moderation, reason: str) -> Embed:
|
||||
"""This function creates a resolved embed from set parameters, meant for contacting the moderated user.
|
||||
|
||||
Args:
|
||||
moderation (aurora.models.Moderation): The moderation object.
|
||||
reason (str): The reason for resolving the moderation.
|
||||
Returns: `discord.Embed`
|
||||
"""
|
||||
|
||||
embed = Embed(
|
||||
title=str.title(moderation.type.name) + " Resolved",
|
||||
description=f"Your {moderation.type.name} in {moderation.guild.name} has been resolved.",
|
||||
color=await moderation.bot.get_embed_color(moderation.guild.channels[0]),
|
||||
timestamp=datetime.now(),
|
||||
)
|
||||
|
||||
embed.add_field(name="Reason", value=f"`{reason}`", inline=False)
|
||||
|
||||
if moderation.guild.icon.url is not None:
|
||||
embed.set_author(name=moderation.guild.name, icon_url=moderation.guild.icon.url)
|
||||
else:
|
||||
embed.set_author(name=moderation.guild.name)
|
||||
embed.set_footer(
|
||||
text=f"Case #{moderation.id:,}",
|
||||
icon_url="attachment://arrow.png",
|
||||
)
|
||||
|
||||
return embed
|
||||
|
||||
async def log_factory(
|
||||
ctx: commands.Context, moderation: Moderation, resolved: bool = False
|
||||
) -> Embed:
|
||||
"""This function creates a log embed from set parameters, meant for moderation logging.
|
||||
|
||||
Args:
|
||||
ctx (commands.Context): The ctx object.
|
||||
moderation (aurora.models.Moderation): The moderation object.
|
||||
resolved (bool, optional): Whether the case is resolved or not. Defaults to False.
|
||||
"""
|
||||
target = await moderation.get_target()
|
||||
moderator = await moderation.get_moderator()
|
||||
if resolved:
|
||||
embed = Embed(
|
||||
title=f"📕 Case #{moderation.id:,} Resolved",
|
||||
color=await ctx.bot.get_embed_color(ctx.channel),
|
||||
)
|
||||
|
||||
resolved_by = await moderation.get_resolved_by()
|
||||
embed.description = f"**Type:** {str.title(moderation.type.string)}\n**Target:** {target.name} ({target.id})\n**Moderator:** {moderator.name} ({moderator.id})\n**Timestamp:** <t:{moderation.unix_timestamp}> | <t:{moderation.unix_timestamp}:R>"
|
||||
|
||||
if moderation.duration is not None:
|
||||
duration_embed = (
|
||||
f"{humanize_timedelta(timedelta=moderation.duration)} | <t:{moderation.end_timestamp}:R>"
|
||||
if not moderation.expired
|
||||
else str(humanize_timedelta(timedelta=moderation.duration))
|
||||
)
|
||||
embed.description = (
|
||||
embed.description
|
||||
+ f"\n**Duration:** {duration_embed}\n**Expired:** {moderation.expired}"
|
||||
)
|
||||
|
||||
if moderation.metadata.items():
|
||||
for key, value in moderation.metadata.items():
|
||||
embed.description += f"\n**{key.title()}:** {value}"
|
||||
|
||||
embed.add_field(name="Reason", value=box(moderation.reason), inline=False)
|
||||
|
||||
embed.add_field(
|
||||
name="Resolve Reason",
|
||||
value=f"Resolved by `{resolved_by.name}` ({resolved_by.id}) for:\n"
|
||||
+ box(moderation.resolve_reason),
|
||||
inline=False,
|
||||
)
|
||||
else:
|
||||
embed = Embed(
|
||||
title=f"📕 Case #{moderation.id:,}",
|
||||
color=await ctx.bot.get_embed_color(ctx.channel),
|
||||
)
|
||||
embed.description = f"**Type:** {str.title(moderation.type.string)}\n**Target:** {target.name} ({target.id})\n**Moderator:** {moderator.name} ({moderator.id})\n**Timestamp:** <t:{moderation.unix_timestamp}> | <t:{moderation.unix_timestamp}:R>"
|
||||
|
||||
if moderation.duration:
|
||||
embed.description = (
|
||||
embed.description
|
||||
+ f"\n**Duration:** {humanize_timedelta(timedelta=moderation.duration)} | <t:{moderation.unix_timestamp}:R>"
|
||||
)
|
||||
|
||||
if moderation.metadata.items():
|
||||
for key, value in moderation.metadata.items():
|
||||
embed.description += f"\n**{key.title()}:** {value}"
|
||||
|
||||
embed.add_field(name="Reason", value=box(moderation.reason), inline=False)
|
||||
return embed
|
||||
|
||||
|
||||
async def case_factory(interaction: Interaction, moderation: Moderation) -> Embed:
|
||||
"""This function creates a case embed from set parameters.
|
||||
|
||||
Args:
|
||||
interaction (discord.Interaction): The interaction object.
|
||||
moderation (aurora.models.Moderation): The moderation object.
|
||||
"""
|
||||
target = await moderation.get_target()
|
||||
moderator = await moderation.get_moderator()
|
||||
|
||||
embed = Embed(
|
||||
title=f"📕 Case #{moderation.id:,}",
|
||||
color=await interaction.client.get_embed_color(interaction.channel),
|
||||
)
|
||||
embed.description = f"**Type:** {str.title(moderation.type.string)}\n**Target:** `{target.name}` ({target.id})\n**Moderator:** `{moderator.name}` ({moderator.id})\n**Resolved:** {moderation.resolved}\n**Timestamp:** <t:{moderation.unix_timestamp}> | <t:{moderation.unix_timestamp}:R>"
|
||||
|
||||
if moderation.duration:
|
||||
duration_embed = (
|
||||
f"{humanize_timedelta(timedelta=moderation.duration)} | <t:{moderation.unix_timestamp}:R>"
|
||||
if moderation.expired is False
|
||||
else str(humanize_timedelta(timedelta=moderation.duration))
|
||||
)
|
||||
embed.description += f"\n**Duration:** {duration_embed}\n**Expired:** {moderation.expired}"
|
||||
|
||||
embed.description += (
|
||||
f"\n**Changes:** {len(moderation.changes) - 1}"
|
||||
if moderation.changes
|
||||
else "\n**Changes:** 0"
|
||||
)
|
||||
|
||||
if moderation.role_id:
|
||||
role = await moderation.get_role()
|
||||
embed.description += f"\n**Role:** {role.name}"
|
||||
|
||||
if moderation.metadata:
|
||||
if moderation.metadata.get("imported_from"):
|
||||
embed.description += (
|
||||
f"\n**Imported From:** {moderation.metadata['imported_from']}"
|
||||
)
|
||||
moderation.metadata.pop("imported_from")
|
||||
if moderation.metadata.get("imported_timestamp"):
|
||||
embed.description += (
|
||||
f"\n**Imported Timestamp:** <t:{moderation.metadata['imported_timestamp']}> | <t:{moderation.metadata['imported_timestamp']}:R>"
|
||||
)
|
||||
moderation.metadata.pop("imported_timestamp")
|
||||
if moderation.metadata.items():
|
||||
for key, value in moderation.metadata.items():
|
||||
embed.description += f"\n**{key.title()}:** {value}"
|
||||
|
||||
embed.add_field(name="Reason", value=box(moderation.reason), inline=False)
|
||||
|
||||
if moderation.resolved:
|
||||
resolved_user = await moderation.get_resolved_by()
|
||||
embed.add_field(
|
||||
name="Resolve Reason",
|
||||
value=f"Resolved by `{resolved_user.name}` ({resolved_user.id}) for:\n{box(moderation.resolve_reason)}",
|
||||
inline=False,
|
||||
)
|
||||
|
||||
return embed
|
||||
|
||||
|
||||
async def changes_factory(interaction: Interaction, moderation: Moderation) -> Embed:
|
||||
"""This function creates a changes embed from set parameters.
|
||||
|
||||
Args:
|
||||
interaction (discord.Interaction): The interaction object.
|
||||
moderation (aurora.models.Moderation): The moderation object.
|
||||
"""
|
||||
embed = Embed(
|
||||
title=f"📕 Case #{moderation.id:,} Changes",
|
||||
color=await interaction.client.get_embed_color(interaction.channel),
|
||||
)
|
||||
|
||||
memory_dict = {}
|
||||
|
||||
if moderation.changes:
|
||||
for change in moderation.changes:
|
||||
if change.user_id not in memory_dict:
|
||||
memory_dict[str(change.user_id)] = await change.get_user()
|
||||
|
||||
user: PartialUser = memory_dict[str(change.user_id)]
|
||||
|
||||
timestamp = f"<t:{change.unix_timestamp}> | <t:{change.unix_timestamp}:R>"
|
||||
|
||||
if change.type == "ORIGINAL":
|
||||
embed.add_field(
|
||||
name="Original",
|
||||
value=f"**User:** `{user.name}` ({user.id})\n**Reason:** {change.reason}\n**Timestamp:** {timestamp}",
|
||||
inline=False,
|
||||
)
|
||||
|
||||
elif change.type == "EDIT":
|
||||
embed.add_field(
|
||||
name="Edit",
|
||||
value=f"**User:** `{user.name}` ({user.id})\n**Reason:** {change.reason}\n**Timestamp:** {timestamp}",
|
||||
inline=False,
|
||||
)
|
||||
|
||||
elif change.type == "RESOLVE":
|
||||
embed.add_field(
|
||||
name="Resolve",
|
||||
value=f"**User:** `{user.name}` ({user.id})\n**Reason:** {change.reason}\n**Timestamp:** {timestamp}",
|
||||
inline=False,
|
||||
)
|
||||
|
||||
else:
|
||||
embed.description = "*No changes have been made to this case.* 🙁"
|
||||
|
||||
return embed
|
||||
|
||||
|
||||
async def evidenceformat_factory(moderation: Moderation) -> str:
|
||||
"""This function creates a codeblock in evidence format from set parameters.
|
||||
|
||||
Args:
|
||||
interaction (discord.Interaction): The interaction object.
|
||||
moderation (aurora.models.Moderation): The moderation object.
|
||||
"""
|
||||
target = await moderation.get_target()
|
||||
moderator = await moderation.get_moderator()
|
||||
|
||||
content = f"Case: {moderation.id:,} ({str.title(moderation.type.string)})\nTarget: {target.name} ({target.id})\nModerator: {moderator.name} ({moderator.id})"
|
||||
|
||||
if moderation.duration is not None:
|
||||
content += f"\nDuration: {humanize_timedelta(timedelta=moderation.duration)}"
|
||||
|
||||
if moderation.role_id:
|
||||
role = await moderation.get_role()
|
||||
content += "\nRole: " + (role.name if role is not None else moderation.role_id)
|
||||
|
||||
content += f"\nReason: {moderation.reason}"
|
||||
|
||||
for key, value in moderation.metadata.items():
|
||||
content += f"\n{key.title()}: {value}"
|
||||
|
||||
return box(content, "prolog")
|
||||
|
||||
|
||||
########################################################################################################################
|
||||
### Configuration Embeds #
|
||||
########################################################################################################################
|
||||
|
||||
|
||||
async def _config(ctx: commands.Context) -> Embed:
|
||||
"""Generates the core embed for configuration menus to use."""
|
||||
e = Embed(title="Aurora Configuration Menu", color=await ctx.embed_color())
|
||||
e.set_thumbnail(url=ctx.bot.user.display_avatar.url)
|
||||
return e
|
||||
|
||||
|
||||
async def overrides_embed(ctx: commands.Context) -> Embed:
|
||||
"""Generates a configuration menu embed for a user's overrides."""
|
||||
|
||||
override_settings = {
|
||||
"ephemeral": await config.user(ctx.author).history_ephemeral(),
|
||||
"inline": await config.user(ctx.author).history_inline(),
|
||||
"inline_pagesize": await config.user(ctx.author).history_inline_pagesize(),
|
||||
"pagesize": await config.user(ctx.author).history_pagesize(),
|
||||
"auto_evidenceformat": await config.user(ctx.author).auto_evidenceformat(),
|
||||
}
|
||||
|
||||
override_str = [
|
||||
"- "
|
||||
+ bold("Auto Evidence Format: ")
|
||||
+ get_bool_emoji(override_settings["auto_evidenceformat"]),
|
||||
"- " + bold("Ephemeral: ") + get_bool_emoji(override_settings["ephemeral"]),
|
||||
"- " + bold("History Inline: ") + get_bool_emoji(override_settings["inline"]),
|
||||
"- "
|
||||
+ bold("History Inline Pagesize: ")
|
||||
+ get_pagesize_str(override_settings["inline_pagesize"]),
|
||||
"- "
|
||||
+ bold("History Pagesize: ")
|
||||
+ get_pagesize_str(override_settings["pagesize"]),
|
||||
]
|
||||
override_str = "\n".join(override_str)
|
||||
|
||||
e = await _config(ctx)
|
||||
e.title += ": User Overrides"
|
||||
e.description = (
|
||||
"""
|
||||
Use the buttons below to manage your user overrides.
|
||||
These settings will override the relevant guild settings.\n
|
||||
"""
|
||||
+ override_str
|
||||
)
|
||||
return e
|
||||
|
||||
|
||||
async def guild_embed(ctx: commands.Context) -> Embed:
|
||||
"""Generates a configuration menu field value for a guild's settings."""
|
||||
|
||||
guild_settings = {
|
||||
"show_moderator": await config.guild(ctx.guild).show_moderator(),
|
||||
"use_discord_permissions": await config.guild(
|
||||
ctx.guild
|
||||
).use_discord_permissions(),
|
||||
"ignore_modlog": await config.guild(ctx.guild).ignore_modlog(),
|
||||
"ignore_other_bots": await config.guild(ctx.guild).ignore_other_bots(),
|
||||
"dm_users": await config.guild(ctx.guild).dm_users(),
|
||||
"log_channel": await config.guild(ctx.guild).log_channel(),
|
||||
"history_ephemeral": await config.guild(ctx.guild).history_ephemeral(),
|
||||
"history_inline": await config.guild(ctx.guild).history_inline(),
|
||||
"history_pagesize": await config.guild(ctx.guild).history_pagesize(),
|
||||
"history_inline_pagesize": await config.guild(
|
||||
ctx.guild
|
||||
).history_inline_pagesize(),
|
||||
"auto_evidenceformat": await config.guild(ctx.guild).auto_evidenceformat(),
|
||||
"respect_hierarchy": await config.guild(ctx.guild).respect_hierarchy(),
|
||||
}
|
||||
|
||||
channel = ctx.guild.get_channel(guild_settings["log_channel"])
|
||||
if channel is None:
|
||||
channel = warning("Not Set")
|
||||
else:
|
||||
channel = channel.mention
|
||||
|
||||
guild_str = [
|
||||
"- "
|
||||
+ bold("Show Moderator: ")
|
||||
+ get_bool_emoji(guild_settings["show_moderator"]),
|
||||
"- "
|
||||
+ bold("Use Discord Permissions: ")
|
||||
+ get_bool_emoji(guild_settings["use_discord_permissions"]),
|
||||
"- "
|
||||
+ bold("Respect Hierarchy: ")
|
||||
+ get_bool_emoji(guild_settings["respect_hierarchy"]),
|
||||
"- "
|
||||
+ bold("Ignore Modlog: ")
|
||||
+ get_bool_emoji(guild_settings["ignore_modlog"]),
|
||||
"- "
|
||||
+ bold("Ignore Other Bots: ")
|
||||
+ get_bool_emoji(guild_settings["ignore_other_bots"]),
|
||||
"- " + bold("DM Users: ") + get_bool_emoji(guild_settings["dm_users"]),
|
||||
"- "
|
||||
+ bold("Auto Evidence Format: ")
|
||||
+ get_bool_emoji(guild_settings["auto_evidenceformat"]),
|
||||
"- "
|
||||
+ bold("Ephemeral: ")
|
||||
+ get_bool_emoji(guild_settings["history_ephemeral"]),
|
||||
"- "
|
||||
+ bold("History Inline: ")
|
||||
+ get_bool_emoji(guild_settings["history_inline"]),
|
||||
"- "
|
||||
+ bold("History Pagesize: ")
|
||||
+ get_pagesize_str(guild_settings["history_pagesize"]),
|
||||
"- "
|
||||
+ bold("History Inline Pagesize: ")
|
||||
+ get_pagesize_str(guild_settings["history_inline_pagesize"]),
|
||||
"- " + bold("Log Channel: ") + channel,
|
||||
]
|
||||
guild_str = "\n".join(guild_str)
|
||||
|
||||
e = await _config(ctx)
|
||||
e.title += ": Server Configuration"
|
||||
e.description = (
|
||||
"""
|
||||
Use the buttons below to manage Aurora's server configuration.\n
|
||||
"""
|
||||
+ guild_str
|
||||
)
|
||||
return e
|
||||
|
||||
|
||||
async def addrole_embed(ctx: commands.Context) -> Embed:
|
||||
"""Generates a configuration menu field value for a guild's addrole whitelist."""
|
||||
|
||||
roles = []
|
||||
async with config.guild(ctx.guild).addrole_whitelist() as whitelist:
|
||||
for role in whitelist:
|
||||
evalulated_role = ctx.guild.get_role(role) or error(f"`{role}` (Not Found)")
|
||||
if isinstance(evalulated_role, Role):
|
||||
roles.append({
|
||||
"id": evalulated_role.id,
|
||||
"mention": evalulated_role.mention,
|
||||
"position": evalulated_role.position
|
||||
})
|
||||
else:
|
||||
roles.append({
|
||||
"id": role,
|
||||
"mention": error(f"`{role}` (Not Found)"),
|
||||
"position": 0
|
||||
})
|
||||
|
||||
if roles:
|
||||
roles = sorted(roles, key=lambda x: x["position"], reverse=True)
|
||||
roles = [role["mention"] for role in roles]
|
||||
whitelist_str = "\n".join(roles)
|
||||
else:
|
||||
whitelist_str = warning("No roles are on the addrole whitelist!")
|
||||
|
||||
e = await _config(ctx)
|
||||
e.title += ": Addrole Whitelist"
|
||||
e.description = (
|
||||
"Use the select menu below to manage this guild's addrole whitelist."
|
||||
)
|
||||
|
||||
if len(whitelist_str) > 4000 and len(whitelist_str) < 5000:
|
||||
lines = whitelist_str.split("\n")
|
||||
chunks = []
|
||||
chunk = ""
|
||||
for line in lines:
|
||||
if len(chunk) + len(line) > 1024:
|
||||
chunks.append(chunk)
|
||||
chunk = line
|
||||
else:
|
||||
chunk += "\n" + line if chunk else line
|
||||
chunks.append(chunk)
|
||||
|
||||
for chunk in chunks:
|
||||
e.add_field(name="", value=chunk)
|
||||
else:
|
||||
e.description += "\n\n" + whitelist_str
|
||||
|
||||
return e
|
||||
|
||||
|
||||
async def immune_embed(ctx: commands.Context) -> Embed:
|
||||
"""Generates a configuration menu embed for a guild's immune roles."""
|
||||
|
||||
roles = []
|
||||
async with config.guild(ctx.guild).immune_roles() as immune_roles:
|
||||
for role in immune_roles:
|
||||
evalulated_role = ctx.guild.get_role(role) or error(f"`{role}` (Not Found)")
|
||||
if isinstance(evalulated_role, Role):
|
||||
roles.append({
|
||||
"id": evalulated_role.id,
|
||||
"mention": evalulated_role.mention,
|
||||
"position": evalulated_role.position
|
||||
})
|
||||
else:
|
||||
roles.append({
|
||||
"id": role,
|
||||
"mention": error(f"`{role}` (Not Found)"),
|
||||
"position": 0
|
||||
})
|
||||
|
||||
if roles:
|
||||
roles = sorted(roles, key=lambda x: x["position"], reverse=True)
|
||||
roles = [role["mention"] for role in roles]
|
||||
immune_str = "\n".join(roles)
|
||||
else:
|
||||
immune_str = warning("No roles are set as immune roles!")
|
||||
|
||||
e = await _config(ctx)
|
||||
e.title += ": Immune Roles"
|
||||
e.description = "Use the select menu below to manage this guild's immune roles."
|
||||
|
||||
if len(immune_str) > 4000 and len(immune_str) < 5000:
|
||||
lines = immune_str.split("\n")
|
||||
chunks = []
|
||||
chunk = ""
|
||||
for line in lines:
|
||||
if len(chunk) + len(line) > 1024:
|
||||
chunks.append(chunk)
|
||||
chunk = line
|
||||
else:
|
||||
chunk += "\n" + line if chunk else line
|
||||
chunks.append(chunk)
|
||||
|
||||
for chunk in chunks:
|
||||
e.add_field(name="", value=chunk)
|
||||
else:
|
||||
e.description += "\n\n" + immune_str
|
||||
|
||||
return e
|
||||
|
||||
async def type_embed(ctx: commands.Context, moderation_type = Type) -> Embed:
|
||||
"""Generates a configuration menu field value for a guild's settings."""
|
||||
|
||||
type_settings = {
|
||||
"show_in_history": await config.custom("types", ctx.guild.id, moderation_type.key).show_in_history(),
|
||||
"show_moderator": await config.custom("types", ctx.guild.id, moderation_type.key).show_moderator(),
|
||||
"use_discord_permissions": await config.custom("types", ctx.guild.id, moderation_type.key).use_discord_permissions(),
|
||||
"dm_users": await config.custom("types", ctx.guild.id, moderation_type.key).dm_users(),
|
||||
}
|
||||
|
||||
guild_str = [
|
||||
"- "
|
||||
+ bold("Show in History: ")
|
||||
+ get_bool_emoji(type_settings["show_in_history"]),
|
||||
"- "
|
||||
+ bold("Show Moderator: ")
|
||||
+ get_bool_emoji(type_settings["show_moderator"]),
|
||||
"- "
|
||||
+ bold("Use Discord Permissions: ")
|
||||
+ get_bool_emoji(type_settings["use_discord_permissions"]),
|
||||
"- "
|
||||
+ bold("DM Users: ")
|
||||
+ get_bool_emoji(type_settings["dm_users"]),
|
||||
]
|
||||
guild_str = "\n".join(guild_str)
|
||||
|
||||
e = await _config(ctx)
|
||||
e.title += f": {moderation_type.string.title()} Configuration"
|
||||
e.description = (
|
||||
f"""
|
||||
Use the buttons below to manage Aurora's configuration for the {bold(moderation_type.string)} moderation type.
|
||||
If an option has a question mark (\N{BLACK QUESTION MARK ORNAMENT}\N{VARIATION SELECTOR-16}) next to it, Aurora will default to the guild level setting instead.
|
||||
See `{ctx.prefix}aurora set guild` for more information.\n
|
||||
"""
|
||||
+ guild_str
|
||||
)
|
||||
return e
|
134
seautils/aurora/utils/json.py
Normal file
134
seautils/aurora/utils/json.py
Normal file
|
@ -0,0 +1,134 @@
|
|||
import json
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any
|
||||
|
||||
from redbot.core.bot import Red
|
||||
|
||||
from ..models.base import AuroraBaseModel
|
||||
from ..models.type import Type
|
||||
|
||||
|
||||
class JSONEncoder(json.JSONEncoder):
|
||||
def default(self, o) -> Any:
|
||||
match o:
|
||||
case datetime():
|
||||
return int(o.timestamp())
|
||||
case timedelta():
|
||||
return str(o)
|
||||
case AuroraBaseModel():
|
||||
return o.dump()
|
||||
case Type():
|
||||
return o.key
|
||||
case Red():
|
||||
return None
|
||||
case _:
|
||||
return super().default(o)
|
||||
|
||||
|
||||
# This is a wrapper around the json module's dumps function that uses our custom JSONEncoder class
|
||||
def dumps(obj, *, skipkeys=False, ensure_ascii=True, check_circular=True,
|
||||
allow_nan=True, indent=None, separators=None,
|
||||
default=None, sort_keys=False, **kw) -> str:
|
||||
"""Serialize ``obj`` to a JSON formatted ``str``.
|
||||
|
||||
If ``skipkeys`` is true then ``dict`` keys that are not basic types
|
||||
(``str``, ``int``, ``float``, ``bool``, ``None``) will be skipped
|
||||
instead of raising a ``TypeError``.
|
||||
|
||||
If ``ensure_ascii`` is false, then the return value can contain non-ASCII
|
||||
characters if they appear in strings contained in ``obj``. Otherwise, all
|
||||
such characters are escaped in JSON strings.
|
||||
|
||||
If ``check_circular`` is false, then the circular reference check
|
||||
for container types will be skipped and a circular reference will
|
||||
result in an ``RecursionError`` (or worse).
|
||||
|
||||
If ``allow_nan`` is false, then it will be a ``ValueError`` to
|
||||
serialize out of range ``float`` values (``nan``, ``inf``, ``-inf``) in
|
||||
strict compliance of the JSON specification, instead of using the
|
||||
JavaScript equivalents (``NaN``, ``Infinity``, ``-Infinity``).
|
||||
|
||||
If ``indent`` is a non-negative integer, then JSON array elements and
|
||||
object members will be pretty-printed with that indent level. An indent
|
||||
level of 0 will only insert newlines. ``None`` is the most compact
|
||||
representation.
|
||||
|
||||
If specified, ``separators`` should be an ``(item_separator, key_separator)``
|
||||
tuple. The default is ``(', ', ': ')`` if *indent* is ``None`` and
|
||||
``(',', ': ')`` otherwise. To get the most compact JSON representation,
|
||||
you should specify ``(',', ':')`` to eliminate whitespace.
|
||||
|
||||
``default(obj)`` is a function that should return a serializable version
|
||||
of obj or raise TypeError. The default simply raises TypeError.
|
||||
|
||||
If *sort_keys* is true (default: ``False``), then the output of
|
||||
dictionaries will be sorted by key.
|
||||
"""
|
||||
return json.dumps(
|
||||
obj,
|
||||
cls=JSONEncoder,
|
||||
skipkeys=skipkeys,
|
||||
ensure_ascii=ensure_ascii,
|
||||
check_circular=check_circular,
|
||||
allow_nan=allow_nan,
|
||||
indent=indent,
|
||||
separators=separators,
|
||||
default=default,
|
||||
sort_keys=sort_keys,
|
||||
**kw
|
||||
)
|
||||
|
||||
# This is a wrapper around the json module's dump function that uses our custom JSONEncoder class
|
||||
def dump(obj, fp, *, skipkeys=False, ensure_ascii=True, check_circular=True,
|
||||
allow_nan=True, indent=None, separators=None,
|
||||
default=None, sort_keys=False, **kw) -> str:
|
||||
"""Serialize ``obj`` as a JSON formatted stream to ``fp`` (a
|
||||
``.write()``-supporting file-like object).
|
||||
|
||||
If ``skipkeys`` is true then ``dict`` keys that are not basic types
|
||||
(``str``, ``int``, ``float``, ``bool``, ``None``) will be skipped
|
||||
instead of raising a ``TypeError``.
|
||||
|
||||
If ``ensure_ascii`` is false, then the strings written to ``fp`` can
|
||||
contain non-ASCII characters if they appear in strings contained in
|
||||
``obj``. Otherwise, all such characters are escaped in JSON strings.
|
||||
|
||||
If ``check_circular`` is false, then the circular reference check
|
||||
for container types will be skipped and a circular reference will
|
||||
result in an ``RecursionError`` (or worse).
|
||||
|
||||
If ``allow_nan`` is false, then it will be a ``ValueError`` to
|
||||
serialize out of range ``float`` values (``nan``, ``inf``, ``-inf``)
|
||||
in strict compliance of the JSON specification, instead of using the
|
||||
JavaScript equivalents (``NaN``, ``Infinity``, ``-Infinity``).
|
||||
|
||||
If ``indent`` is a non-negative integer, then JSON array elements and
|
||||
object members will be pretty-printed with that indent level. An indent
|
||||
level of 0 will only insert newlines. ``None`` is the most compact
|
||||
representation.
|
||||
|
||||
If specified, ``separators`` should be an ``(item_separator, key_separator)``
|
||||
tuple. The default is ``(', ', ': ')`` if *indent* is ``None`` and
|
||||
``(',', ': ')`` otherwise. To get the most compact JSON representation,
|
||||
you should specify ``(',', ':')`` to eliminate whitespace.
|
||||
|
||||
``default(obj)`` is a function that should return a serializable version
|
||||
of obj or raise TypeError. The default simply raises TypeError.
|
||||
|
||||
If *sort_keys* is true (default: ``False``), then the output of
|
||||
dictionaries will be sorted by key.
|
||||
"""
|
||||
return json.dump(
|
||||
obj,
|
||||
fp,
|
||||
cls=JSONEncoder,
|
||||
skipkeys=skipkeys,
|
||||
ensure_ascii=ensure_ascii,
|
||||
check_circular=check_circular,
|
||||
allow_nan=allow_nan,
|
||||
indent=indent,
|
||||
separators=separators,
|
||||
default=default,
|
||||
sort_keys=sort_keys,
|
||||
**kw
|
||||
)
|
3
seautils/aurora/utils/logger.py
Normal file
3
seautils/aurora/utils/logger.py
Normal file
|
@ -0,0 +1,3 @@
|
|||
from red_commons.logging import getLogger
|
||||
|
||||
logger = getLogger("red.SeaCogs.Aurora")
|
288
seautils/aurora/utils/misc.py
Normal file
288
seautils/aurora/utils/misc.py
Normal file
|
@ -0,0 +1,288 @@
|
|||
# pylint: disable=cyclic-import
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional, Tuple, Union
|
||||
|
||||
import aiosqlite
|
||||
from dateutil.relativedelta import relativedelta as rd
|
||||
from discord import File, Guild, Interaction, Member, SelectOption, TextChannel, User
|
||||
from discord.errors import Forbidden
|
||||
from redbot.core import commands, data_manager
|
||||
from redbot.core.utils.chat_formatting import error
|
||||
|
||||
from seautils.aurora.models.type import Type
|
||||
from seautils.aurora.utils.config import config
|
||||
from seautils.aurora.utils.json import dumps
|
||||
from seautils.aurora.utils.logger import logger
|
||||
|
||||
|
||||
def check_permissions(
|
||||
user: User,
|
||||
permissions: Tuple[str],
|
||||
ctx: commands.Context | Interaction | None = None,
|
||||
guild: Guild | None = None,
|
||||
) -> Union[bool, str]:
|
||||
"""Checks if a user has a specific permission (or a list of permissions) in a channel."""
|
||||
if ctx:
|
||||
member = ctx.guild.get_member(user.id)
|
||||
resolved_permissions = ctx.channel.permissions_for(member)
|
||||
|
||||
elif guild:
|
||||
member = guild.get_member(user.id)
|
||||
resolved_permissions = member.guild_permissions
|
||||
|
||||
else:
|
||||
raise (KeyError)
|
||||
|
||||
for permission in permissions:
|
||||
if (
|
||||
not getattr(resolved_permissions, permission, False)
|
||||
and resolved_permissions.administrator is not True
|
||||
):
|
||||
return permission
|
||||
|
||||
return False
|
||||
|
||||
|
||||
async def check_moddable(
|
||||
target: Union[User, Member, TextChannel], ctx: commands.Context, permissions: Tuple[str], moderation_type: Type,
|
||||
) -> bool:
|
||||
"""Checks if a moderator can moderate a target."""
|
||||
is_channel = isinstance(target, TextChannel)
|
||||
|
||||
use_discord_permissions = await config.custom("types", ctx.guild.id, moderation_type.key).use_discord_permissions()
|
||||
if use_discord_permissions is None:
|
||||
use_discord_permissions = await config.guild(ctx.guild).use_discord_permissions()
|
||||
|
||||
if check_permissions(ctx.bot.user, permissions, guild=ctx.guild):
|
||||
await ctx.send(
|
||||
error(
|
||||
f"I do not have the `{permissions}` permission, required for this action."
|
||||
),
|
||||
ephemeral=True,
|
||||
)
|
||||
return False
|
||||
|
||||
if use_discord_permissions is True:
|
||||
if check_permissions(ctx.author, permissions, guild=ctx.guild):
|
||||
await ctx.send(
|
||||
error(
|
||||
f"You do not have the `{permissions}` permission, required for this action."
|
||||
),
|
||||
ephemeral=True,
|
||||
)
|
||||
return False
|
||||
|
||||
if ctx.author.id == target.id:
|
||||
await ctx.send(
|
||||
content="You cannot moderate yourself!", ephemeral=True
|
||||
)
|
||||
return False
|
||||
|
||||
if not is_channel and target.bot:
|
||||
await ctx.send(
|
||||
content="You cannot moderate bots!", ephemeral=True
|
||||
)
|
||||
return False
|
||||
|
||||
if isinstance(target, Member):
|
||||
if ctx.author.top_role <= target.top_role and await config.guild(ctx.guild).respect_hierarchy() is True:
|
||||
await ctx.send(
|
||||
content=error(
|
||||
"You cannot moderate members with a higher role than you!"
|
||||
),
|
||||
ephemeral=True,
|
||||
)
|
||||
return False
|
||||
|
||||
if (
|
||||
ctx.guild.get_member(ctx.bot.user.id).top_role
|
||||
<= target.top_role
|
||||
):
|
||||
await ctx.send(
|
||||
content=error(
|
||||
"You cannot moderate members with a role higher than the bot!"
|
||||
),
|
||||
ephemeral=True,
|
||||
)
|
||||
return False
|
||||
|
||||
immune_roles = await config.guild(target.guild).immune_roles()
|
||||
|
||||
for role in target.roles:
|
||||
if role.id in immune_roles:
|
||||
await ctx.send(
|
||||
content=error("You cannot moderate members with an immune role!"),
|
||||
ephemeral=True,
|
||||
)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
async def log(ctx: commands.Context, moderation_id: int, resolved: bool = False) -> None:
|
||||
"""This function sends a message to the guild's configured logging channel when an infraction takes place."""
|
||||
from seautils.aurora.models.moderation import Moderation
|
||||
from seautils.aurora.utils.factory import log_factory
|
||||
|
||||
logging_channel_id = await config.guild(ctx.guild).log_channel()
|
||||
if logging_channel_id != " ":
|
||||
logging_channel = ctx.guild.get_channel(logging_channel_id)
|
||||
|
||||
try:
|
||||
moderation = await Moderation.find_by_id(ctx.bot, moderation_id, ctx.guild.id)
|
||||
embed = await log_factory(
|
||||
ctx=ctx, moderation=moderation, resolved=resolved
|
||||
)
|
||||
try:
|
||||
await logging_channel.send(embed=embed)
|
||||
except Forbidden:
|
||||
return
|
||||
except ValueError:
|
||||
return
|
||||
|
||||
|
||||
async def send_evidenceformat(ctx: commands.Context, moderation_id: int) -> None:
|
||||
"""This function sends an ephemeral message to the moderator who took the moderation action, with a pre-made codeblock for use in the mod-evidence channel."""
|
||||
from seautils.aurora.models.moderation import Moderation
|
||||
from seautils.aurora.utils.factory import evidenceformat_factory
|
||||
|
||||
send_evidence_bool = (
|
||||
await config.user(ctx.author).auto_evidenceformat()
|
||||
or await config.guild(guild=ctx.guild).auto_evidenceformat()
|
||||
or False
|
||||
)
|
||||
if send_evidence_bool is True:
|
||||
moderation = await Moderation.find_by_id(ctx.bot, moderation_id, ctx.guild.id)
|
||||
content = await evidenceformat_factory(moderation=moderation)
|
||||
if not ctx.interaction:
|
||||
await ctx.author.send(content=content)
|
||||
else:
|
||||
await ctx.send(content=content, ephemeral=True)
|
||||
|
||||
|
||||
def get_bool_emoji(value: Optional[bool]) -> str:
|
||||
"""Returns a unicode emoji based on a boolean value."""
|
||||
match value:
|
||||
case True:
|
||||
return "\N{WHITE HEAVY CHECK MARK}"
|
||||
case False:
|
||||
return "\N{NO ENTRY SIGN}"
|
||||
case _:
|
||||
return "\N{BLACK QUESTION MARK ORNAMENT}\N{VARIATION SELECTOR-16}"
|
||||
|
||||
|
||||
def get_pagesize_str(value: Union[int, None]) -> str:
|
||||
"""Returns a string based on a pagesize value."""
|
||||
if value is None:
|
||||
return "\N{BLACK QUESTION MARK ORNAMENT}\N{VARIATION SELECTOR-16}"
|
||||
return str(value) + " cases per page"
|
||||
|
||||
|
||||
def create_pagesize_options() -> list[SelectOption]:
|
||||
"""Returns a list of SelectOptions for pagesize configuration."""
|
||||
options = []
|
||||
options.append(
|
||||
SelectOption(
|
||||
label="Default",
|
||||
value="default",
|
||||
description="Reset the pagesize to the default value.",
|
||||
)
|
||||
)
|
||||
for i in range(1, 21):
|
||||
options.append(
|
||||
SelectOption(
|
||||
label=str(i),
|
||||
value=str(i),
|
||||
description=f"Set the pagesize to {i}.",
|
||||
)
|
||||
)
|
||||
return options
|
||||
|
||||
def timedelta_from_relativedelta(relativedelta: rd) -> timedelta:
|
||||
"""Converts a relativedelta object to a timedelta object."""
|
||||
now = datetime.now()
|
||||
then = now - relativedelta
|
||||
return now - then
|
||||
|
||||
def timedelta_from_string(string: str) -> timedelta:
|
||||
"""Converts a string to a timedelta object."""
|
||||
hours, minutes, seconds = map(int, string.split(":"))
|
||||
return timedelta(hours=hours, minutes=minutes, seconds=seconds)
|
||||
|
||||
def timedelta_to_string(td: timedelta) -> str:
|
||||
"""Converts a timedelta object to a string."""
|
||||
days = td.days * 24
|
||||
hours, remainder = divmod(td.seconds, 3600)
|
||||
minutes, seconds = divmod(remainder, 60)
|
||||
return f"{days + hours}:{minutes:02}:{seconds:02}"
|
||||
|
||||
def get_footer_image(coginstance: commands.Cog) -> File:
|
||||
"""Returns the footer image for the embeds."""
|
||||
image_path = data_manager.bundled_data_path(coginstance) / "arrow.png"
|
||||
return File(image_path, filename="arrow.png", description="arrow")
|
||||
|
||||
async def create_guild_table(guild: Guild) -> None:
|
||||
from seautils.aurora.models.moderation import Moderation
|
||||
|
||||
try:
|
||||
await Moderation.execute(f"SELECT * FROM `moderation_{guild.id}`", return_obj=False)
|
||||
logger.trace("SQLite Table exists for server %s (%s)", guild.name, guild.id)
|
||||
|
||||
except aiosqlite.OperationalError:
|
||||
query = f"""
|
||||
CREATE TABLE `moderation_{guild.id}` (
|
||||
moderation_id INTEGER PRIMARY KEY,
|
||||
timestamp INTEGER NOT NULL,
|
||||
moderation_type TEXT NOT NULL,
|
||||
target_type TEXT NOT NULL,
|
||||
target_id INTEGER NOT NULL,
|
||||
moderator_id INTEGER NOT NULL,
|
||||
role_id INTEGER,
|
||||
duration TEXT,
|
||||
end_timestamp INTEGER,
|
||||
reason TEXT,
|
||||
resolved INTEGER NOT NULL,
|
||||
resolved_by TEXT,
|
||||
resolve_reason TEXT,
|
||||
expired INTEGER NOT NULL,
|
||||
changes JSON NOT NULL,
|
||||
metadata JSON NOT NULL
|
||||
)
|
||||
"""
|
||||
await Moderation.execute(query=query, return_obj=False)
|
||||
|
||||
index_query_1 = f"CREATE INDEX IF NOT EXISTS idx_target_id ON moderation_{guild.id}(target_id);"
|
||||
await Moderation.execute(query=index_query_1, return_obj=False)
|
||||
|
||||
index_query_2 = f"CREATE INDEX IF NOT EXISTS idx_moderator_id ON moderation_{guild.id}(moderator_id);"
|
||||
await Moderation.execute(query=index_query_2, return_obj=False)
|
||||
|
||||
index_query_3 = f"CREATE INDEX IF NOT EXISTS idx_moderation_id ON moderation_{guild.id}(moderation_id);"
|
||||
await Moderation.execute(query=index_query_3, return_obj=False)
|
||||
|
||||
insert_query = f"""
|
||||
INSERT INTO `moderation_{guild.id}`
|
||||
(moderation_id, timestamp, moderation_type, target_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
"""
|
||||
insert_values = (
|
||||
0,
|
||||
0,
|
||||
"NULL",
|
||||
"NULL",
|
||||
0,
|
||||
0,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
0,
|
||||
None,
|
||||
None,
|
||||
0,
|
||||
dumps([]),
|
||||
dumps({}),
|
||||
)
|
||||
await Moderation.execute(query=insert_query, parameters=insert_values, return_obj=False)
|
||||
|
||||
logger.trace("SQLite Table created for server %s (%s)", guild.name, guild.id)
|
Loading…
Reference in a new issue