import os import time from datetime import datetime import dotenv import mysql.connector import revolt from pytimeparse2 import disable_dateutil, parse from revolt import utils from revolt.ext import commands from utils.embed import CustomEmbed # This code reads the variables set in the bot's '.env' file. env = dotenv.find_dotenv() dotenv.load_dotenv(env) prefix = os.getenv('PREFIX') db_host = os.getenv('DB_HOST') db_user = os.getenv('DB_USER') db_password = os.getenv('DB_PASSWORD') db = os.getenv('DB') required_role_id = os.getenv('REQUIRED_ROLE_ID') class Moderation(commands.Cog): def __init__(self, client): self.client: revolt.Client = client disable_dateutil() def mysql_connect(self): connection = mysql.connector.connect(host=db_host,user=db_user,password=db_password,database=db) return connection def create_server_table(self, server): database = Moderation.mysql_connect(self) cursor = database.cursor() try: cursor.execute(f"SELECT * FROM `{server.id.lower()}_moderation`") print(f"MySQL Table exists for server {server.name} ({server.id})") except mysql.connector.errors.ProgrammingError: query = f""" CREATE TABLE `{server.id.lower()}_moderation` ( moderation_id INT UNIQUE PRIMARY KEY NOT NULL, timestamp INT NOT NULL, moderation_type LONGTEXT NOT NULL, target_id LONGTEXT NOT NULL, moderator_id LONGTEXT NOT NULL, duration LONGTEXT, end_timestamp INT, reason LONGTEXT, resolved BOOL NOT NULL, resolve_reason LONGTEXT, expired BOOL NOT NULL ) """ cursor.execute(query) insert_query = f""" INSERT INTO `{server.id.lower()}_moderation` (moderation_id, timestamp, moderation_type, target_id, moderator_id, duration, end_timestamp, reason, resolved, resolve_reason, expired) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ insert_values = (0, 0, "NULL", 0, 0, "NULL", 0, "NULL", 0, "NULL", 0) cursor.execute(insert_query, insert_values) database.commit() database.close() print(f"MySQL Table created for {server.name} ({server.id})\n{server.id.lower()}_moderation") else: database.close() return async def tempban_handler(self): for server in self.client.servers: database = Moderation.mysql_connect(self) cursor = database.cursor() bans = await server.fetch_bans() for ServerBan in bans: target_id = ServerBan.user_id cursor.execute(f"SELECT moderation_id FROM `{server.id.lower()}_moderation` WHERE target_id = {target_id} AND moderation_type = 'Temporary Ban' AND end_timestamp < NOW() AND expired = 0") result = cursor.fetchone() if result is None: continue ServerBan.unban() cursor.execute(f"UPDATE `{server.id.lower()}_moderation` SET expired = 1 WHERE moderation_id = {result[1]}") database.commit() cursor.close() print("Tempban Handler run successful!") def mysql_log(self, ctx: commands.Context, moderation_type, target_id, duration, reason): timestamp = int(time.time()) if duration != "NULL": end_timedelta = datetime.fromtimestamp(timestamp) + duration end_timestamp = int(end_timedelta.timestamp()) else: end_timestamp = 0 database = Moderation.mysql_connect(self) cursor = database.cursor() cursor.execute(f"SELECT moderation_id FROM `{ctx.server.id.lower()}_moderation` ORDER BY moderation_id DESC LIMIT 1") moderation_id = cursor.fetchone()[0] + 1 sql = f"INSERT INTO `{ctx.server.id.lower()}_moderation` (moderation_id, timestamp, moderation_type, target_id, moderator_id, duration, end_timestamp, reason, resolved, resolve_reason, expired) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" val = (moderation_id, timestamp, moderation_type, target_id, ctx.author.id, duration, end_timestamp, f"{reason}", 0, "NULL", 0) cursor.execute(sql, val) database.commit() database.close() print(f"MySQL row inserted into {ctx.server.id.lower()}_moderation!\n{moderation_id}, {timestamp}, {moderation_type}, {target_id}, {ctx.author.id}, {duration}, {end_timestamp}, {reason}, 0, NULL") @commands.command(name="timeout", aliases=["mute"]) async def timeout(self, ctx: commands.Context, target: commands.MemberConverter, duration: str, *, reason: str): required_role = utils.get(ctx.server.roles, id=required_role_id) if required_role not in ctx.author.roles: await ctx.message.reply("You do not have permission to use this command!") return try: parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True) except ValueError: await ctx.message.reply(f"Please provide a valid duration!\nSee `{prefix}tdc`") return if not reason: await ctx.message.reply("Please provide a reason!") return await target.timeout(parsed_time) response = await ctx.message.reply(f"{target.mention} has been timed out for {str(parsed_time)}!\n**Reason** - `{reason}`") try: embed = CustomEmbed(title="Timed Out", description=f"You have been timed out for `{str(parsed_time)}` in {ctx.server.name}.\n### Reason\n`{reason}`", color="#5d82d1") await target.send(embed=embed) except revolt.errors.HTTPError: await response.edit(content=f"{response.content}\n*Failed to send DM, user likely has the bot blocked.*") Moderation.mysql_log(self, ctx, moderation_type='Timeout', target_id=target.id, duration=parsed_time, reason=reason) @commands.command(name="untimeout", aliases=["unmute"]) async def untimeout(self, ctx: commands.Context, target: commands.MemberConverter, *, reason: str = "No reason provided."): required_role = utils.get(ctx.server.roles, id=required_role_id) if required_role not in ctx.author.roles: await ctx.message.reply("You do not have permission to use this command!") return parsed_time = parse(sval="0s", as_timedelta=True, raise_exception=True) await target.timeout(parsed_time) response = await ctx.message.reply(f"{target.mention} has had their timeout removed!\n**Reason** - `{reason}`") try: embed = CustomEmbed(title="Timeout Removed", description=f"Your timeout has been removed in {ctx.server.name}.\n### Reason\n`{reason}`", color="#5d82d1") await target.send(embed=embed) except revolt.errors.HTTPError: await response.edit(content=f"{response.content}\n*Failed to send DM, user likely has the bot blocked.*") Moderation.mysql_log(self, ctx, moderation_type='Untimeout', target_id=target.id, duration='NULL', reason=reason) @commands.command() async def warn(self, ctx: commands.Context, target: commands.MemberConverter, *, reason: str): required_role = utils.get(ctx.server.roles, id=required_role_id) if required_role not in ctx.author.roles: await ctx.message.reply("You do not have permission to use this command!") return if not reason: await ctx.message.reply("Please include a reason!") return response = await ctx.message.reply(f"{target.mention} has been warned!\n**Reason** - `{reason}`") try: embed = CustomEmbed(title="Warned", description=f"You have been warned in {ctx.server.name}!\n### Reason\n`{reason}`", color="#5d82d1") await target.send(embed=embed) except revolt.errors.HTTPError: await response.edit(content=f"{response.content}\n*Failed to send DM, user likely has the bot blocked.*") Moderation.mysql_log(self, ctx, moderation_type='Warning', target_id=target.id, duration='NULL', reason=reason) @commands.command() async def kick(self, ctx: commands.Context, target: commands.MemberConverter, *, reason: str): required_role = utils.get(ctx.server.roles, id=required_role_id) if required_role not in ctx.author.roles: await ctx.message.reply("You do not have permission to use this command!") return if not reason: await ctx.message.reply("Please include a reason!") return try: await target.kick() except revolt.errors.HTTPError: await ctx.message.reply("User is not in the server!") return response = await ctx.message.reply(f"{target.mention} has been kicked!\n**Reason** - `{reason}`") try: embed = CustomEmbed(title="Warned", description=f"You have been kicked from {ctx.server.name}!\n### Reason\n`{reason}`", color="#5d82d1") await target.send(embed=embed) except revolt.errors.HTTPError: await response.edit(content=f"{response.content}\n*Failed to send DM, user likely has the bot blocked.*") Moderation.mysql_log(self, ctx, moderation_type='Kick', target_id=target.id, duration='NULL', reason=reason) @commands.command() async def ban(self, ctx: commands.Context, target: commands.MemberConverter, *, reason: str): required_role = utils.get(ctx.server.roles, id=required_role_id) if required_role not in ctx.author.roles: await ctx.message.reply("You do not have permission to use this command!") return if not reason: await ctx.message.reply("Please include a reason!") return try: await target.ban(reason=reason) response = await ctx.message.reply(f"{target.mention} has been banned!\n**Reason** - `{reason}`") try: embed = CustomEmbed(title="Banned", description=f"You have been banned from `{ctx.server.name}`.\n### Reason\n`{reason}`", color="#5d82d1") await target.send(embed=embed) except revolt.errors.HTTPError: await response.edit(content=f"{response.content}\n*Failed to send DM, user likely has the bot blocked.*") Moderation.mysql_log(self, ctx, moderation_type='Ban', target_id=target.id, duration='NULL', reason=reason) except revolt.errors.HTTPError: await ctx.message.reply(f"{target.mention} is already banned!") @commands.command() async def unban(self, ctx: commands.Context, target: commands.UserConverter, *, reason: str): required_role = utils.get(ctx.server.roles, id=required_role_id) if required_role not in ctx.author.roles: await ctx.message.reply("You do not have permission to use this command!") return if ctx.channel.channel_type is not revolt.ChannelType.text_channel: await ctx.message.reply("You cannot use moderation commands in direct messages!") return if not reason: await ctx.message.reply("Please include a reason!") return bans = await ctx.server.fetch_bans() for ban in bans: if ban.user_id == target.id: await ban.unban() response = await ctx.message.reply(f"{target.mention} has been unbanned!\n**Reason** - `{reason}`") try: embed = CustomEmbed(title="Unbanned", description=f"You have been unbanned from `{ctx.server.name}`.\n### Reason\n`{reason}`", color="#5d82d1") await target.send(embed=embed) except revolt.errors.HTTPError: await response.edit(content=f"{response.content}\n*Failed to send DM, user likely has the bot blocked.*") Moderation.mysql_log(self, ctx, moderation_type='Unban', target_id=target.id, duration='NULL', reason=str(reason)) return await ctx.message.reply(f"{target.mention} is not banned!") @commands.command(aliases=["tdc"]) async def timedeltaconvert(self, ctx, *, duration): required_role = utils.get(ctx.server.roles, id=required_role_id) if required_role not in ctx.author.roles: await ctx.message.reply("You do not have permission to use this command!") return if not duration: embeds = [CustomEmbed(description=f"## timedeltaconvert\nThis command converts a duration to a `timedelta` Python object.\n### Example Usage\n`{prefix}timedeltaconvert 1 day 15hr 82 minutes 52 s`\n### Output\n`1 day, 16:22:52`", color="#5d82d1")] await ctx.message.reply(embeds=embeds) else: try: parsed_time = parse(duration, as_timedelta=True, raise_exception=True) await ctx.message.reply(f"`{str(parsed_time)}`") except ValueError: await ctx.message.reply("Please provide a convertible value!")