# IndiumRevolt/cogs/moderation.py
# SeaswimmerTheFsh 60493fbd77
# All checks were successful
# Pylint / Pylint (push) Successful in 50s
# misc(moderation): improved code structure
# 2023-09-24 16:33:13 -04:00
#
# 245 lines
# 13 KiB
# Python

import os
import time
from datetime import datetime
import dotenv
import mysql.connector
import revolt
from pytimeparse2 import disable_dateutil, parse
from revolt import utils
from revolt.ext import commands
from utils.embed import CustomEmbed
# This code reads the variables set in the bot's '.env' file.
# find_dotenv() locates the '.env' file and load_dotenv() loads its entries
# into the process environment, so the os.getenv() calls below can see them.
# os.getenv() returns None for any variable that is not set.
env = dotenv.find_dotenv()
dotenv.load_dotenv(env)
# Command prefix; interpolated into the usage hints the commands reply with.
prefix = os.getenv('PREFIX')
# MySQL connection settings, consumed by Moderation.mysql_connect().
db_host = os.getenv('DB_HOST')
db_user = os.getenv('DB_USER')
db_password = os.getenv('DB_PASSWORD')
db = os.getenv('DB')
# Id of the role a member must hold to run the moderation commands.
required_role_id = os.getenv('REQUIRED_ROLE_ID')
class Moderation(commands.Cog):
    """Moderation commands backed by a per-server MySQL log.

    Every command first checks that the invoking member holds the role whose
    id is stored in ``required_role_id``.  Each successful action is written
    to the ``<server id>_moderation`` table through :meth:`mysql_log`.

    NOTE(review): the MySQL calls are synchronous and are made from inside
    coroutines, so they block while they run.
    """

    def __init__(self, client):
        self.client: revolt.Client = client
        # pytimeparse2 switch, called once at cog construction.
        disable_dateutil()

    def mysql_connect(self):
        """Open and return a new MySQL connection built from the DB_* settings."""
        return mysql.connector.connect(host=db_host, user=db_user, password=db_password, database=db)

    def create_server_table(self, server: revolt.Server):
        """Create the ``<server id>_moderation`` table for ``server`` if it is missing.

        A freshly created table is seeded with a placeholder row whose
        ``moderation_id`` is 0, so that :meth:`mysql_log` always finds a
        previous id to increment.
        """
        # Table names cannot be bound as query parameters, so the server id is
        # interpolated (inside backticks) into the statement text.
        table = f"{server.id.lower()}_moderation"
        database = self.mysql_connect()
        try:
            cursor = database.cursor()
            try:
                # Probing a table that does not exist raises ProgrammingError.
                # Only one row is requested and it is consumed immediately, so
                # no unread result is left on the connection.
                cursor.execute(f"SELECT 1 FROM `{table}` LIMIT 1")
                cursor.fetchall()
                print(f"MySQL Table exists for server {server.name} ({server.id})")
            except mysql.connector.errors.ProgrammingError:
                query = f"""
                    CREATE TABLE `{table}` (
                        moderation_id INT UNIQUE PRIMARY KEY NOT NULL,
                        timestamp INT NOT NULL,
                        moderation_type LONGTEXT NOT NULL,
                        target_id LONGTEXT NOT NULL,
                        moderator_id LONGTEXT NOT NULL,
                        duration LONGTEXT,
                        end_timestamp INT,
                        reason LONGTEXT,
                        resolved BOOL NOT NULL,
                        resolve_reason LONGTEXT,
                        expired BOOL NOT NULL
                    )
                """
                cursor.execute(query)
                insert_query = f"""
                    INSERT INTO `{table}`
                    (moderation_id, timestamp, moderation_type, target_id, moderator_id, duration, end_timestamp, reason, resolved, resolve_reason, expired)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """
                insert_values = (0, 0, "NULL", 0, 0, "NULL", 0, "NULL", 0, "NULL", 0)
                cursor.execute(insert_query, insert_values)
                database.commit()
                print(f"MySQL Table created for {server.name} ({server.id})\n{table}")
        finally:
            # Close the connection on every path, including unexpected errors.
            database.close()

    async def tempban_handler(self):
        """Lift temporary bans whose end time has passed, on every server the client knows.

        For each currently banned user, the server's moderation table is
        searched for unexpired 'Temporary Ban' rows whose ``end_timestamp`` is
        in the past.  If any exist, the user is unbanned and those rows are
        marked ``expired``.
        """
        for server in self.client.servers:
            table = f"{server.id.lower()}_moderation"
            database = self.mysql_connect()
            try:
                cursor = database.cursor()
                bans = await server.fetch_bans()
                # end_timestamp holds Unix seconds (see mysql_log), so it is
                # compared with Unix seconds rather than with SQL NOW().
                now = int(time.time())
                for server_ban in bans:
                    cursor.execute(
                        f"SELECT moderation_id FROM `{table}` "
                        "WHERE target_id = %s AND moderation_type = 'Temporary Ban' "
                        "AND end_timestamp < %s AND expired = 0",
                        (server_ban.user_id, now),
                    )
                    # Fetch every match so no unread result blocks the next query.
                    expired_rows = cursor.fetchall()
                    if not expired_rows:
                        continue
                    await server_ban.unban()
                    cursor.executemany(
                        f"UPDATE `{table}` SET expired = 1 WHERE moderation_id = %s",
                        expired_rows,
                    )
                    # Commit right away: once unbanned, the user no longer
                    # appears in fetch_bans(), so a lost update would never be retried.
                    database.commit()
            finally:
                database.close()
        print("Tempban Handler run successful!")

    def mysql_log(self, ctx: commands.Context, moderation_type: str, target_id: str, duration, reason: str):
        """Insert one moderation record into the invoking server's moderation table.

        Parameters
        ----------
        ctx:
            Command context; supplies the server id and the moderator's id.
        moderation_type:
            Label stored in the ``moderation_type`` column (e.g. ``'Kick'``).
        target_id:
            Id of the user the action was taken against.
        duration:
            A ``timedelta`` for timed actions, or the string ``"NULL"`` when
            the action has no duration.
        reason:
            Free-text reason stored with the record.
        """
        timestamp = int(time.time())
        if duration != "NULL":
            # Plain second arithmetic; no round trip through local wall-clock time.
            end_timestamp = timestamp + int(duration.total_seconds())
        else:
            end_timestamp = 0
        table = f"{ctx.server.id.lower()}_moderation"
        database = self.mysql_connect()
        try:
            cursor = database.cursor()
            # The next id is one above the highest existing id.
            cursor.execute(f"SELECT moderation_id FROM `{table}` ORDER BY moderation_id DESC LIMIT 1")
            moderation_id = cursor.fetchone()[0] + 1
            sql = (
                f"INSERT INTO `{table}` "
                "(moderation_id, timestamp, moderation_type, target_id, moderator_id, duration, end_timestamp, reason, resolved, resolve_reason, expired) "
                "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
            )
            val = (moderation_id, timestamp, moderation_type, target_id, ctx.author.id, duration, end_timestamp, str(reason), 0, "NULL", 0)
            cursor.execute(sql, val)
            database.commit()
        finally:
            database.close()
        print(f"MySQL row inserted into {table}!\n{moderation_id}, {timestamp}, {moderation_type}, {target_id}, {ctx.author.id}, {duration}, {end_timestamp}, {reason}, 0, NULL")

    async def _has_required_role(self, ctx: commands.Context) -> bool:
        """Return True if the author holds the required role; otherwise reply with a refusal and return False."""
        required_role = utils.get(ctx.server.roles, id=required_role_id)
        if required_role in ctx.author.roles:
            return True
        await ctx.message.reply("You do not have permission to use this command!")
        return False

    async def _send_dm(self, target, response, title: str, description: str):
        """DM ``target`` an embed; if that fails, append a note to the bot's reply ``response``."""
        try:
            embed = CustomEmbed(title=title, description=description, color="#5d82d1")
            await target.send(embed=embed)
        except revolt.errors.HTTPError:
            await response.edit(content=f"{response.content}\n*Failed to send DM, user likely has the bot blocked.*")

    @commands.command(name="timeout", aliases=["mute"])
    async def timeout(self, ctx: commands.Context, target: commands.MemberConverter, duration: str, *, reason: str):
        """Time out a member for the given duration."""
        if not await self._has_required_role(ctx):
            return
        try:
            parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
        except ValueError:
            await ctx.message.reply(f"Please provide a valid duration!\nSee `{prefix}tdc`")
            return
        if not reason:
            await ctx.message.reply("Please provide a reason!")
            return
        await target.timeout(parsed_time)
        response = await ctx.message.reply(f"{target.mention} has been timed out for {parsed_time}!\n**Reason** - `{reason}`")
        await self._send_dm(
            target,
            response,
            title="Timed Out",
            description=f"You have been timed out for `{parsed_time}` in {ctx.server.name}.\n### Reason\n`{reason}`",
        )
        self.mysql_log(ctx, moderation_type='Timeout', target_id=target.id, duration=parsed_time, reason=reason)

    @commands.command(name="untimeout", aliases=["unmute"])
    async def untimeout(self, ctx: commands.Context, target: commands.MemberConverter, *, reason: str = "No reason provided."):
        """Remove a member's timeout."""
        if not await self._has_required_role(ctx):
            return
        # The timeout is cleared by applying a zero-length one.
        parsed_time = parse(sval="0s", as_timedelta=True, raise_exception=True)
        await target.timeout(parsed_time)
        response = await ctx.message.reply(f"{target.mention} has had their timeout removed!\n**Reason** - `{reason}`")
        await self._send_dm(
            target,
            response,
            title="Timeout Removed",
            description=f"Your timeout has been removed in {ctx.server.name}.\n### Reason\n`{reason}`",
        )
        self.mysql_log(ctx, moderation_type='Untimeout', target_id=target.id, duration='NULL', reason=reason)

    @commands.command()
    async def warn(self, ctx: commands.Context, target: commands.MemberConverter, *, reason: str):
        """Warn a member."""
        if not await self._has_required_role(ctx):
            return
        if not reason:
            await ctx.message.reply("Please include a reason!")
            return
        response = await ctx.message.reply(f"{target.mention} has been warned!\n**Reason** - `{reason}`")
        await self._send_dm(
            target,
            response,
            title="Warned",
            description=f"You have been warned in {ctx.server.name}!\n### Reason\n`{reason}`",
        )
        self.mysql_log(ctx, moderation_type='Warning', target_id=target.id, duration='NULL', reason=reason)

    @commands.command()
    async def kick(self, ctx: commands.Context, target: commands.MemberConverter, *, reason: str):
        """Kick a member from the server."""
        if not await self._has_required_role(ctx):
            return
        if not reason:
            await ctx.message.reply("Please include a reason!")
            return
        try:
            await target.kick()
        except revolt.errors.HTTPError:
            await ctx.message.reply("User is not in the server!")
            return
        response = await ctx.message.reply(f"{target.mention} has been kicked!\n**Reason** - `{reason}`")
        await self._send_dm(
            target,
            response,
            title="Kicked",
            description=f"You have been kicked from {ctx.server.name}!\n### Reason\n`{reason}`",
        )
        self.mysql_log(ctx, moderation_type='Kick', target_id=target.id, duration='NULL', reason=reason)

    @commands.command()
    async def ban(self, ctx: commands.Context, target: commands.MemberConverter, *, reason: str):
        """Ban a member from the server."""
        if not await self._has_required_role(ctx):
            return
        if not reason:
            await ctx.message.reply("Please include a reason!")
            return
        # Only the ban call is guarded, so failures in the later reply or log
        # steps are not misreported as "already banned".
        try:
            await target.ban(reason=reason)
        except revolt.errors.HTTPError:
            await ctx.message.reply(f"{target.mention} is already banned!")
            return
        response = await ctx.message.reply(f"{target.mention} has been banned!\n**Reason** - `{reason}`")
        await self._send_dm(
            target,
            response,
            title="Banned",
            description=f"You have been banned from `{ctx.server.name}`.\n### Reason\n`{reason}`",
        )
        self.mysql_log(ctx, moderation_type='Ban', target_id=target.id, duration='NULL', reason=reason)

    @commands.command()
    async def unban(self, ctx: commands.Context, target: commands.UserConverter, *, reason: str):
        """Unban a user from the server."""
        # Check for direct messages first: the role check below reads ctx.server.
        if ctx.channel.channel_type is not revolt.ChannelType.text_channel:
            await ctx.message.reply("You cannot use moderation commands in direct messages!")
            return
        if not await self._has_required_role(ctx):
            return
        if not reason:
            await ctx.message.reply("Please include a reason!")
            return
        bans = await ctx.server.fetch_bans()
        matching_ban = next((entry for entry in bans if entry.user_id == target.id), None)
        if matching_ban is None:
            await ctx.message.reply(f"{target.mention} is not banned!")
            return
        await matching_ban.unban()
        response = await ctx.message.reply(f"{target.mention} has been unbanned!\n**Reason** - `{reason}`")
        await self._send_dm(
            target,
            response,
            title="Unbanned",
            description=f"You have been unbanned from `{ctx.server.name}`.\n### Reason\n`{reason}`",
        )
        self.mysql_log(ctx, moderation_type='Unban', target_id=target.id, duration='NULL', reason=str(reason))

    @commands.command(aliases=["tdc"])
    async def timedeltaconvert(self, ctx: commands.Context, *, duration: str):
        """Convert a duration string to a Python timedelta."""
        if not await self._has_required_role(ctx):
            return
        if not duration:
            embeds = [CustomEmbed(description=f"## timedeltaconvert\nThis command converts a duration to a `timedelta` Python object.\n### Example Usage\n`{prefix}timedeltaconvert 1 day 15hr 82 minutes 52 s`\n### Output\n`1 day, 16:22:52`", color="#5d82d1")]
            await ctx.message.reply(embeds=embeds)
            return
        try:
            parsed_time = parse(duration, as_timedelta=True, raise_exception=True)
        except ValueError:
            await ctx.message.reply("Please provide a convertible value!")
            return
        await ctx.message.reply(f"`{parsed_time}`")