245 lines
13 KiB
Python
245 lines
13 KiB
Python
import os
|
|
import time
|
|
from datetime import datetime
|
|
import dotenv
|
|
import mysql.connector
|
|
import revolt
|
|
from pytimeparse2 import disable_dateutil, parse
|
|
from revolt import utils
|
|
from revolt.ext import commands
|
|
from utils.embed import CustomEmbed
|
|
|
|
# This code reads the variables set in the bot's '.env' file.
|
|
env = dotenv.find_dotenv()
|
|
dotenv.load_dotenv(env)
|
|
prefix = os.getenv('PREFIX')
|
|
db_host = os.getenv('DB_HOST')
|
|
db_user = os.getenv('DB_USER')
|
|
db_password = os.getenv('DB_PASSWORD')
|
|
db = os.getenv('DB')
|
|
required_role_id = os.getenv('REQUIRED_ROLE_ID')
|
|
|
|
class Moderation(commands.Cog):
|
|
def __init__(self, client):
|
|
self.client: revolt.Client = client
|
|
disable_dateutil()
|
|
|
|
def mysql_connect(self):
|
|
connection = mysql.connector.connect(host=db_host,user=db_user,password=db_password,database=db)
|
|
return connection
|
|
|
|
def create_server_table(self, server: revolt.Server):
|
|
database = Moderation.mysql_connect(self)
|
|
cursor = database.cursor()
|
|
try:
|
|
cursor.execute(f"SELECT * FROM `{server.id.lower()}_moderation`")
|
|
print(f"MySQL Table exists for server {server.name} ({server.id})")
|
|
except mysql.connector.errors.ProgrammingError:
|
|
query = f"""
|
|
CREATE TABLE `{server.id.lower()}_moderation` (
|
|
moderation_id INT UNIQUE PRIMARY KEY NOT NULL,
|
|
timestamp INT NOT NULL,
|
|
moderation_type LONGTEXT NOT NULL,
|
|
target_id LONGTEXT NOT NULL,
|
|
moderator_id LONGTEXT NOT NULL,
|
|
duration LONGTEXT,
|
|
end_timestamp INT,
|
|
reason LONGTEXT,
|
|
resolved BOOL NOT NULL,
|
|
resolve_reason LONGTEXT,
|
|
expired BOOL NOT NULL
|
|
)
|
|
"""
|
|
cursor.execute(query)
|
|
insert_query = f"""
|
|
INSERT INTO `{server.id.lower()}_moderation`
|
|
(moderation_id, timestamp, moderation_type, target_id, moderator_id, duration, end_timestamp, reason, resolved, resolve_reason, expired)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
"""
|
|
insert_values = (0, 0, "NULL", 0, 0, "NULL", 0, "NULL", 0, "NULL", 0)
|
|
cursor.execute(insert_query, insert_values)
|
|
database.commit()
|
|
database.close()
|
|
print(f"MySQL Table created for {server.name} ({server.id})\n{server.id.lower()}_moderation")
|
|
else:
|
|
database.close()
|
|
return
|
|
|
|
async def tempban_handler(self):
|
|
for server in self.client.servers:
|
|
database = Moderation.mysql_connect(self)
|
|
cursor = database.cursor()
|
|
bans = await server.fetch_bans()
|
|
|
|
for ServerBan in bans:
|
|
target_id = ServerBan.user_id
|
|
cursor.execute(f"SELECT moderation_id FROM `{server.id.lower()}_moderation` WHERE target_id = {target_id} AND moderation_type = 'Temporary Ban' AND end_timestamp < NOW() AND expired = 0")
|
|
result = cursor.fetchone()
|
|
if result is None:
|
|
continue
|
|
ServerBan.unban()
|
|
cursor.execute(f"UPDATE `{server.id.lower()}_moderation` SET expired = 1 WHERE moderation_id = {result[1]}")
|
|
database.commit()
|
|
|
|
cursor.close()
|
|
print("Tempban Handler run successful!")
|
|
|
|
def mysql_log(self, ctx: commands.Context, moderation_type: str, target_id: str, duration, reason:str ):
|
|
timestamp = int(time.time())
|
|
if duration != "NULL":
|
|
end_timedelta = datetime.fromtimestamp(timestamp) + duration
|
|
end_timestamp = int(end_timedelta.timestamp())
|
|
else:
|
|
end_timestamp = 0
|
|
database = Moderation.mysql_connect(self)
|
|
cursor = database.cursor()
|
|
cursor.execute(f"SELECT moderation_id FROM `{ctx.server.id.lower()}_moderation` ORDER BY moderation_id DESC LIMIT 1")
|
|
moderation_id = cursor.fetchone()[0] + 1
|
|
sql = f"INSERT INTO `{ctx.server.id.lower()}_moderation` (moderation_id, timestamp, moderation_type, target_id, moderator_id, duration, end_timestamp, reason, resolved, resolve_reason, expired) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
|
|
val = (moderation_id, timestamp, moderation_type, target_id, ctx.author.id, duration, end_timestamp, f"{reason}", 0, "NULL", 0)
|
|
cursor.execute(sql, val)
|
|
database.commit()
|
|
database.close()
|
|
print(f"MySQL row inserted into {ctx.server.id.lower()}_moderation!\n{moderation_id}, {timestamp}, {moderation_type}, {target_id}, {ctx.author.id}, {duration}, {end_timestamp}, {reason}, 0, NULL")
|
|
|
|
@commands.command(name="timeout", aliases=["mute"])
|
|
async def timeout(self, ctx: commands.Context, target: commands.MemberConverter, duration: str, *, reason: str):
|
|
required_role = utils.get(ctx.server.roles, id=required_role_id)
|
|
if required_role not in ctx.author.roles:
|
|
await ctx.message.reply("You do not have permission to use this command!")
|
|
return
|
|
try:
|
|
parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
|
|
except ValueError:
|
|
await ctx.message.reply(f"Please provide a valid duration!\nSee `{prefix}tdc`")
|
|
return
|
|
if not reason:
|
|
await ctx.message.reply("Please provide a reason!")
|
|
return
|
|
await target.timeout(parsed_time)
|
|
response = await ctx.message.reply(f"{target.mention} has been timed out for {str(parsed_time)}!\n**Reason** - `{reason}`")
|
|
try:
|
|
embed = CustomEmbed(title="Timed Out", description=f"You have been timed out for `{str(parsed_time)}` in {ctx.server.name}.\n### Reason\n`{reason}`", color="#5d82d1")
|
|
await target.send(embed=embed)
|
|
except revolt.errors.HTTPError:
|
|
await response.edit(content=f"{response.content}\n*Failed to send DM, user likely has the bot blocked.*")
|
|
Moderation.mysql_log(self, ctx, moderation_type='Timeout', target_id=target.id, duration=parsed_time, reason=reason)
|
|
|
|
@commands.command(name="untimeout", aliases=["unmute"])
|
|
async def untimeout(self, ctx: commands.Context, target: commands.MemberConverter, *, reason: str = "No reason provided."):
|
|
required_role = utils.get(ctx.server.roles, id=required_role_id)
|
|
if required_role not in ctx.author.roles:
|
|
await ctx.message.reply("You do not have permission to use this command!")
|
|
return
|
|
parsed_time = parse(sval="0s", as_timedelta=True, raise_exception=True)
|
|
await target.timeout(parsed_time)
|
|
response = await ctx.message.reply(f"{target.mention} has had their timeout removed!\n**Reason** - `{reason}`")
|
|
try:
|
|
embed = CustomEmbed(title="Timeout Removed", description=f"Your timeout has been removed in {ctx.server.name}.\n### Reason\n`{reason}`", color="#5d82d1")
|
|
await target.send(embed=embed)
|
|
except revolt.errors.HTTPError:
|
|
await response.edit(content=f"{response.content}\n*Failed to send DM, user likely has the bot blocked.*")
|
|
Moderation.mysql_log(self, ctx, moderation_type='Untimeout', target_id=target.id, duration='NULL', reason=reason)
|
|
|
|
@commands.command()
|
|
async def warn(self, ctx: commands.Context, target: commands.MemberConverter, *, reason: str):
|
|
required_role = utils.get(ctx.server.roles, id=required_role_id)
|
|
if required_role not in ctx.author.roles:
|
|
await ctx.message.reply("You do not have permission to use this command!")
|
|
return
|
|
if not reason:
|
|
await ctx.message.reply("Please include a reason!")
|
|
return
|
|
response = await ctx.message.reply(f"{target.mention} has been warned!\n**Reason** - `{reason}`")
|
|
try:
|
|
embed = CustomEmbed(title="Warned", description=f"You have been warned in {ctx.server.name}!\n### Reason\n`{reason}`", color="#5d82d1")
|
|
await target.send(embed=embed)
|
|
except revolt.errors.HTTPError:
|
|
await response.edit(content=f"{response.content}\n*Failed to send DM, user likely has the bot blocked.*")
|
|
Moderation.mysql_log(self, ctx, moderation_type='Warning', target_id=target.id, duration='NULL', reason=reason)
|
|
|
|
@commands.command()
|
|
async def kick(self, ctx: commands.Context, target: commands.MemberConverter, *, reason: str):
|
|
required_role = utils.get(ctx.server.roles, id=required_role_id)
|
|
if required_role not in ctx.author.roles:
|
|
await ctx.message.reply("You do not have permission to use this command!")
|
|
return
|
|
if not reason:
|
|
await ctx.message.reply("Please include a reason!")
|
|
return
|
|
try:
|
|
await target.kick()
|
|
except revolt.errors.HTTPError:
|
|
await ctx.message.reply("User is not in the server!")
|
|
return
|
|
response = await ctx.message.reply(f"{target.mention} has been kicked!\n**Reason** - `{reason}`")
|
|
try:
|
|
embed = CustomEmbed(title="Warned", description=f"You have been kicked from {ctx.server.name}!\n### Reason\n`{reason}`", color="#5d82d1")
|
|
await target.send(embed=embed)
|
|
except revolt.errors.HTTPError:
|
|
await response.edit(content=f"{response.content}\n*Failed to send DM, user likely has the bot blocked.*")
|
|
Moderation.mysql_log(self, ctx, moderation_type='Kick', target_id=target.id, duration='NULL', reason=reason)
|
|
|
|
@commands.command()
|
|
async def ban(self, ctx: commands.Context, target: commands.MemberConverter, *, reason: str):
|
|
required_role = utils.get(ctx.server.roles, id=required_role_id)
|
|
if required_role not in ctx.author.roles:
|
|
await ctx.message.reply("You do not have permission to use this command!")
|
|
return
|
|
if not reason:
|
|
await ctx.message.reply("Please include a reason!")
|
|
return
|
|
try:
|
|
await target.ban(reason=reason)
|
|
response = await ctx.message.reply(f"{target.mention} has been banned!\n**Reason** - `{reason}`")
|
|
try:
|
|
embed = CustomEmbed(title="Banned", description=f"You have been banned from `{ctx.server.name}`.\n### Reason\n`{reason}`", color="#5d82d1")
|
|
await target.send(embed=embed)
|
|
except revolt.errors.HTTPError:
|
|
await response.edit(content=f"{response.content}\n*Failed to send DM, user likely has the bot blocked.*")
|
|
Moderation.mysql_log(self, ctx, moderation_type='Ban', target_id=target.id, duration='NULL', reason=reason)
|
|
except revolt.errors.HTTPError:
|
|
await ctx.message.reply(f"{target.mention} is already banned!")
|
|
|
|
@commands.command()
|
|
async def unban(self, ctx: commands.Context, target: commands.UserConverter, *, reason: str):
|
|
required_role = utils.get(ctx.server.roles, id=required_role_id)
|
|
if required_role not in ctx.author.roles:
|
|
await ctx.message.reply("You do not have permission to use this command!")
|
|
return
|
|
if ctx.channel.channel_type is not revolt.ChannelType.text_channel:
|
|
await ctx.message.reply("You cannot use moderation commands in direct messages!")
|
|
return
|
|
if not reason:
|
|
await ctx.message.reply("Please include a reason!")
|
|
return
|
|
bans = await ctx.server.fetch_bans()
|
|
for ban in bans:
|
|
if ban.user_id == target.id:
|
|
await ban.unban()
|
|
response = await ctx.message.reply(f"{target.mention} has been unbanned!\n**Reason** - `{reason}`")
|
|
try:
|
|
embed = CustomEmbed(title="Unbanned", description=f"You have been unbanned from `{ctx.server.name}`.\n### Reason\n`{reason}`", color="#5d82d1")
|
|
await target.send(embed=embed)
|
|
except revolt.errors.HTTPError:
|
|
await response.edit(content=f"{response.content}\n*Failed to send DM, user likely has the bot blocked.*")
|
|
Moderation.mysql_log(self, ctx, moderation_type='Unban', target_id=target.id, duration='NULL', reason=str(reason))
|
|
return
|
|
await ctx.message.reply(f"{target.mention} is not banned!")
|
|
|
|
|
|
@commands.command(aliases=["tdc"])
|
|
async def timedeltaconvert(self, ctx: commands.Context, *, duration: str):
|
|
required_role = utils.get(ctx.server.roles, id=required_role_id)
|
|
if required_role not in ctx.author.roles:
|
|
await ctx.message.reply("You do not have permission to use this command!")
|
|
return
|
|
if not duration:
|
|
embeds = [CustomEmbed(description=f"## timedeltaconvert\nThis command converts a duration to a `timedelta` Python object.\n### Example Usage\n`{prefix}timedeltaconvert 1 day 15hr 82 minutes 52 s`\n### Output\n`1 day, 16:22:52`", color="#5d82d1")]
|
|
await ctx.message.reply(embeds=embeds)
|
|
else:
|
|
try:
|
|
parsed_time = parse(duration, as_timedelta=True, raise_exception=True)
|
|
await ctx.message.reply(f"`{str(parsed_time)}`")
|
|
except ValueError:
|
|
await ctx.message.reply("Please provide a convertible value!")
|