352 lines
19 KiB
Python
352 lines
19 KiB
Python
import logging
|
|
import time
|
|
from datetime import datetime, timedelta, timezone
|
|
import discord
|
|
import humanize
|
|
import mysql.connector
|
|
from pytimeparse2 import disable_dateutil, parse
|
|
from redbot.core import app_commands, checks, Config, commands
|
|
|
|
|
|
class Moderation(commands.Cog):
|
|
"""Custom cog moderation cog, meant to copy GalacticBot.
|
|
Developed by SeaswimmerTheFsh."""
|
|
|
|
def __init__(self, bot):
|
|
self.bot = bot
|
|
self.config = Config.get_conf(self, identifier=481923957134912)
|
|
self.config.register_global(
|
|
mysql_address= " ",
|
|
mysql_database = " ",
|
|
mysql_username = " ",
|
|
mysql_password = " ",
|
|
ignore_other_bots = True
|
|
)
|
|
disable_dateutil()
|
|
|
|
async def cog_load(self):
|
|
"""This method prepares the database schema for all of the guilds the bot is currently in."""
|
|
conf = await self.check_conf([
|
|
'mysql_address',
|
|
'mysql_database',
|
|
'mysql_username',
|
|
'mysql_password'
|
|
])
|
|
if conf:
|
|
logging.fatal("Failed to create tables, due to MySQL connection configuration being unset.")
|
|
return
|
|
guilds: list[discord.Guild] = self.bot.guilds
|
|
try:
|
|
for guild in guilds:
|
|
await self.create_guild_table(guild)
|
|
except ConnectionRefusedError:
|
|
return
|
|
|
|
@commands.Cog.listener('on_guild_join')
|
|
async def db_generate_guild_join(self, guild: discord.Guild):
|
|
"""This method prepares the database schema whenever the bot joins a guild."""
|
|
conf = await self.check_conf([
|
|
'mysql_address',
|
|
'mysql_database',
|
|
'mysql_username',
|
|
'mysql_password'
|
|
])
|
|
if conf:
|
|
logging.error("Failed to create a table for %s, due to MySQL connection configuration being unset.", guild.id)
|
|
return
|
|
try:
|
|
await self.create_guild_table(guild)
|
|
except ConnectionRefusedError:
|
|
return
|
|
|
|
@commands.Cog.listener('on_audit_log_entry_create')
|
|
async def autologger(self, entry: discord.AuditLogEntry):
|
|
"""This method automatically logs moderations done by users manually ("right clicks")."""
|
|
if await self.config.ignore_other_bots() is True:
|
|
if entry.user.bot or entry.target.bot:
|
|
return
|
|
else:
|
|
if entry.user.id == self.bot.user.id:
|
|
return
|
|
duration = "NULL"
|
|
if entry.reason:
|
|
reason = entry.reason + " (This action was performed without the bot.)"
|
|
else:
|
|
reason = "This action was performed without the bot."
|
|
if entry.action == discord.AuditLogAction.kick:
|
|
moderation_type = 'KICK'
|
|
elif entry.action == discord.AuditLogAction.ban:
|
|
moderation_type = 'BAN'
|
|
elif entry.action == discord.AuditLogAction.unban:
|
|
moderation_type = 'UNBAN'
|
|
elif entry.action == discord.AuditLogAction.member_update:
|
|
if entry.after.timed_out_until is not None:
|
|
timed_out_until_aware = entry.after.timed_out_until.replace(tzinfo=timezone.utc)
|
|
duration_datetime = timed_out_until_aware - datetime.now(tz=timezone.utc)
|
|
minutes = round(duration_datetime.total_seconds() / 60)
|
|
duration = timedelta(minutes=minutes)
|
|
moderation_type = 'MUTE'
|
|
else:
|
|
moderation_type = 'UNMUTE'
|
|
else:
|
|
return
|
|
await self.mysql_log(entry.guild.id, entry.user.id, moderation_type, entry.target.id, duration, reason)
|
|
|
|
async def connect(self):
|
|
"""Connects to the MySQL database, and returns a connection object."""
|
|
conf = await self.check_conf([
|
|
'mysql_address',
|
|
'mysql_database',
|
|
'mysql_username',
|
|
'mysql_password'
|
|
])
|
|
if conf:
|
|
raise LookupError("MySQL connection details not set properly!")
|
|
try:
|
|
connection = mysql.connector.connect(
|
|
host=await self.config.mysql_address(),
|
|
user=await self.config.mysql_username(),
|
|
password=await self.config.mysql_password(),
|
|
database=await self.config.mysql_database()
|
|
)
|
|
return connection
|
|
except mysql.connector.ProgrammingError as e:
|
|
logging.fatal("Unable to access the MySQL database!\nError:\n%s", e.msg)
|
|
raise ConnectionRefusedError(f"Unable to access the MySQL Database!\n{e.msg}") from e
|
|
|
|
async def create_guild_table(self, guild: discord.Guild):
|
|
database = await self.connect()
|
|
cursor = database.cursor()
|
|
try:
|
|
cursor.execute(f"SELECT * FROM `moderation_{guild.id}`")
|
|
logging.info("MySQL Table exists for server %s (%s)", guild.name, guild.id)
|
|
except mysql.connector.errors.ProgrammingError:
|
|
query = f"""
|
|
CREATE TABLE `moderation_{guild.id}` (
|
|
moderation_id INT UNIQUE PRIMARY KEY NOT NULL,
|
|
timestamp INT NOT NULL,
|
|
moderation_type LONGTEXT NOT NULL,
|
|
target_id LONGTEXT NOT NULL,
|
|
moderator_id LONGTEXT NOT NULL,
|
|
duration LONGTEXT,
|
|
end_timestamp INT,
|
|
reason LONGTEXT,
|
|
resolved BOOL NOT NULL,
|
|
resolve_reason LONGTEXT,
|
|
expired BOOL NOT NULL
|
|
)
|
|
"""
|
|
cursor.execute(query)
|
|
insert_query = f"""
|
|
INSERT INTO `moderation_{guild.id}`
|
|
(moderation_id, timestamp, moderation_type, target_id, moderator_id, duration, end_timestamp, reason, resolved, resolve_reason, expired)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
"""
|
|
insert_values = (0, 0, "NULL", 0, 0, "NULL", 0, "NULL", 0, "NULL", 0)
|
|
cursor.execute(insert_query, insert_values)
|
|
database.commit()
|
|
database.close()
|
|
logging.info("MySQL Table (moderation_%s) created for %s (%s)", guild.id, guild.name, guild.id)
|
|
else:
|
|
database.close()
|
|
return
|
|
|
|
async def check_conf(self, config: list):
|
|
"""Checks if any required config options are not set."""
|
|
not_found_list = []
|
|
for item in config:
|
|
if await self.config.item() == " ":
|
|
not_found_list.append(item)
|
|
return not_found_list
|
|
|
|
async def mysql_log(self, guild_id: str, author_id: str, moderation_type: str, target_id: int, duration, reason: str):
|
|
timestamp = int(time.time())
|
|
if duration != "NULL":
|
|
end_timedelta = datetime.fromtimestamp(timestamp) + duration
|
|
end_timestamp = int(end_timedelta.timestamp())
|
|
else:
|
|
end_timestamp = 0
|
|
database = await self.connect()
|
|
cursor = database.cursor()
|
|
cursor.execute(f"SELECT moderation_id FROM `moderation_{guild_id}` ORDER BY moderation_id DESC LIMIT 1")
|
|
moderation_id = cursor.fetchone()[0] + 1
|
|
sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_id, moderator_id, duration, end_timestamp, reason, resolved, resolve_reason, expired) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
|
|
val = (moderation_id, timestamp, moderation_type, target_id, author_id, duration, end_timestamp, f"{reason}", 0, "NULL", 0)
|
|
cursor.execute(sql, val)
|
|
database.commit()
|
|
database.close()
|
|
logging.debug("MySQL row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, 0, NULL", guild_id, moderation_id, timestamp, moderation_type, target_id, author_id, duration, end_timestamp, reason)
|
|
|
|
async def get_next_case_number(self, guild_id: str):
|
|
"""This method returns the next case number from the MySQL table for a specific guild."""
|
|
database = await self.connect()
|
|
cursor = database.cursor()
|
|
cursor.execute(f"SELECT moderation_id FROM `moderation_{guild_id}` ORDER BY moderation_id DESC LIMIT 1")
|
|
return cursor.fetchone()[0] + 1
|
|
|
|
@app_commands.command(name="warn")
|
|
async def warn(self, interaction: discord.Interaction, target: discord.Member, reason: str):
|
|
"""Warn a user."""
|
|
response = await interaction.response.send_message(content=f"{target.mention} has been warned!\n**Reason** - `{reason}`")
|
|
try:
|
|
embed = discord.Embed(title="Warned", description=f"You have been warned in [{interaction.guild.name}]({response.jump_url}).", color=await self.bot.get_embed_color(None), timestamp=datetime.now())
|
|
embed.add_field(name='Reason', value=f"`{reason}`")
|
|
embed.set_footer(text=f"Case #{await self.get_next_case_number(interaction.guild.id)}", icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&")
|
|
await target.send(embed=embed)
|
|
except discord.errors.HTTPException:
|
|
await response.edit(content=f"{response.content}\n*Failed to send DM, user likely has the bot blocked.*")
|
|
await self.mysql_log(interaction.guild.id, interaction.user.id, 'WARN', target.id, 'NULL', reason)
|
|
|
|
@app_commands.command(name="mute", aliases=['timeout', 'tm'])
|
|
async def mute(self, interaction: discord.Interaction, target: discord.Member, duration: str, reason: str):
|
|
"""Mute a user."""
|
|
if target.is_timed_out() is True:
|
|
await interaction.response.send_message(f"{target.mention} is already muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True)
|
|
return
|
|
try:
|
|
parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
|
|
except ValueError:
|
|
await interaction.response.send_message(f"Please provide a valid duration!", ephemeral=True)
|
|
return
|
|
if parsed_time.total_seconds() / 1000 > 2419200000:
|
|
await interaction.response.send_message("Please provide a duration that is less than 28 days.")
|
|
return
|
|
await target.timeout(parsed_time)
|
|
response = await interaction.response.send_message(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`")
|
|
try:
|
|
embed = discord.Embed(title="Muted", description=f"You have been muted for {humanize.precisedelta(parsed_time)} in [{interaction.guild.name}]({response.jump_url}).", color=await self.bot.get_embed_color(None), timestamp=datetime.now())
|
|
embed.add_field(name='Reason', value=f"`{reason}`")
|
|
embed.set_footer(text=f"Case #{await self.get_next_case_number(interaction.guild.id)}", icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&")
|
|
await target.send(embed=embed)
|
|
except discord.errors.HTTPException:
|
|
await response.edit(content=f"{response.content}\n*Failed to send DM, user likely has the bot blocked.*")
|
|
await self.mysql_log(interaction.guild.id, interaction.user.id, 'MUTE', target.id, parsed_time, reason)
|
|
|
|
@app_commands.command(name="unmute", aliases=['untimeout', 'utm'])
|
|
async def unmute(self, interaction: discord.Interaction, target: discord.Member, reason: str = None):
|
|
"""Unmute a user."""
|
|
if target.is_timed_out() is False:
|
|
await interaction.response.send_message(f"{target.mention} is not muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True)
|
|
return
|
|
if reason:
|
|
await target.timeout(None, reason=f"Unmuted by {interaction.user.id} for: {reason}")
|
|
else:
|
|
await target.timeout(None)
|
|
reason = "No reason given."
|
|
response = await interaction.response.send_message(content=f"{target.mention} has been unmuted!\n**Reason** - `{reason}`")
|
|
try:
|
|
embed = discord.Embed(title="Unmuted", description=f"You have been unmuted in [{interaction.guild.name}]({response.jump_url}).", color=await self.bot.get_embed_color(None), timestamp=datetime.now())
|
|
embed.add_field(name='Reason', value=f"`{reason}`")
|
|
embed.set_footer(text=f"Case #{await self.get_next_case_number(interaction.guild.id)}", icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&")
|
|
await target.send(embed=embed)
|
|
except discord.errors.HTTPException:
|
|
await response.edit(content=f"{response.content}\n*Failed to send DM, user likely has the bot blocked.*")
|
|
await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNMUTE', target.id, 'NULL', reason)
|
|
|
|
@app_commands.command(name="kick")
|
|
async def kick(self, interaction: discord.Interaction, target: discord.Member, reason: str):
|
|
"""Kick a user."""
|
|
response = await interaction.response.send_message(content=f"{target.mention} has been kicked!\n**Reason** - `{reason}`")
|
|
try:
|
|
embed = discord.Embed(title="Warned", description=f"You have been kicked from {interaction.guild.name}.", color=await self.bot.get_embed_color(None), timestamp=datetime.now())
|
|
embed.add_field(name='Reason', value=f"`{reason}`")
|
|
embed.set_footer(text=f"Case #{await self.get_next_case_number(interaction.guild.id)}", icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&")
|
|
await target.send(embed=embed)
|
|
except discord.errors.HTTPException:
|
|
await response.edit(content=f"{response.content}\n*Failed to send DM, user likely has the bot blocked.*")
|
|
await target.kick(f"Kicked by {interaction.user.id} for: {reason}")
|
|
await self.mysql_log(interaction.guild.id, interaction.user.id, 'KICK', target.id, 'NULL', reason)
|
|
|
|
@commands.group(autohelp=True)
|
|
@checks.admin()
|
|
async def moderationset(self, ctx: commands.Context):
|
|
"""Manage moderation commands."""
|
|
|
|
@moderationset.command(name="ignorebots")
|
|
@checks.admin()
|
|
async def moderationset_ignorebots(self, ctx: commands.Context):
|
|
await self.config.ignore_other_bots.set(not await self.config.ignore_other_bots())
|
|
await ctx.send(f"Ignore bots setting set to {await self.config.ignore_other_bots()}")
|
|
|
|
@moderationset.command(name="mysql")
|
|
@checks.is_owner()
|
|
async def moderationset_mysql(self, ctx: commands.Context):
|
|
"""Configure MySQL connection details."""
|
|
await ctx.message.add_reaction("✅")
|
|
await ctx.author.send(content="Click the button below to configure your MySQL connection details.", view=self.ConfigButtons(60))
|
|
|
|
class ConfigButtons(discord.ui.View):
|
|
def __init__(self, timeout):
|
|
super().__init__()
|
|
self.config = Config.get_conf(None, cog_name='Moderation', identifier=481923957134912)
|
|
|
|
@discord.ui.button(label="Edit", style=discord.ButtonStyle.success)
|
|
async def config_button(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument
|
|
await interaction.response.send_modal(Moderation.MySQLConfigModal(self.config))
|
|
|
|
class MySQLConfigModal(discord.ui.Modal, title="MySQL Database Configuration"):
|
|
def __init__(self, config):
|
|
super().__init__()
|
|
self.config = config
|
|
address = discord.ui.TextInput(
|
|
label="Address",
|
|
placeholder="Input your MySQL address here.",
|
|
style=discord.TextStyle.short,
|
|
required=False,
|
|
max_length=300
|
|
)
|
|
database = discord.ui.TextInput(
|
|
label="Database",
|
|
placeholder="Input the name of your database here.",
|
|
style=discord.TextStyle.short,
|
|
required=False,
|
|
max_length=300
|
|
)
|
|
username = discord.ui.TextInput(
|
|
label="Username",
|
|
placeholder="Input your MySQL username here.",
|
|
style=discord.TextStyle.short,
|
|
required=False,
|
|
max_length=300
|
|
)
|
|
password = discord.ui.TextInput(
|
|
label="Password",
|
|
placeholder="Input your MySQL password here.",
|
|
style=discord.TextStyle.short,
|
|
required=False,
|
|
max_length=300
|
|
)
|
|
|
|
async def on_submit(self, interaction: discord.Interaction):
|
|
message = ""
|
|
if self.address.value != "":
|
|
await self.config.mysql_address.set(self.address.value)
|
|
message += f"- Address set to\n - `{self.address.value}`\n"
|
|
if self.database.value != "":
|
|
await self.config.mysql_database.set(self.database.value)
|
|
message += f"- Database set to\n - `{self.database.value}`\n"
|
|
if self.username.value != "":
|
|
await self.config.mysql_username.set(self.username.value)
|
|
message += f"- Username set to\n - `{self.username.value}`\n"
|
|
if self.password.value != "":
|
|
await self.config.mysql_password.set(self.password.value)
|
|
trimmed_password = self.password.value[:8]
|
|
message += f"- Password set to\n - `{trimmed_password}` - Trimmed for security\n"
|
|
if message == "":
|
|
trimmed_password = str(await self.config.mysql_password())[:8]
|
|
send = f"No changes were made.\nCurrent configuration:\n- Address:\n - `{await self.config.mysql_address()}`\n- Database:\n - `{await self.config.mysql_database()}`\n- Username:\n - `{await self.config.mysql_username()}`\n- Password:\n - `{trimmed_password}` - Trimmed for security"
|
|
else:
|
|
send = f"Configuration changed:\n{message}"
|
|
await interaction.response.send_message(send, ephemeral=True)
|
|
|
|
@commands.command(aliases=["tdc"])
|
|
async def timedeltaconvert(self, ctx: commands.Context, *, duration: str = None):
|
|
if not duration:
|
|
embed = discord.Embed(description=f"## timedeltaconvert\nThis command converts a duration to a `timedelta` Python object.\n### Example Usage\n`{ctx.prefix}timedeltaconvert 1 day 15hr 82 minutes 52 s`\n### Output\n`1 day, 16:22:52`", color=await self.bot.get_embed_color(None))
|
|
await ctx.send(embed=embed)
|
|
else:
|
|
try:
|
|
parsed_time = parse(duration, as_timedelta=True, raise_exception=True)
|
|
await ctx.send(f"`{str(parsed_time)}`")
|
|
except ValueError:
|
|
await ctx.send("Please provide a convertible value!")
|