SeaCogs/aurora/importers/aurora.py

150 lines
5.5 KiB
Python

# pylint: disable=duplicate-code
import json
import os
from time import time
from typing import Dict
from discord import ButtonStyle, File, Interaction, Message, ui
from redbot.core import commands, data_manager
from redbot.core.utils.chat_formatting import warning
from ..models.moderation import Moderation
from ..utilities.database import connect, create_guild_table
from ..utilities.json import dump
from ..utilities.utils import timedelta_from_string
class ImportAuroraView(ui.View):
def __init__(self, timeout, ctx, message):
super().__init__()
self.ctx: commands.Context = ctx
self.message: Message = message
@ui.button(label="Yes", style=ButtonStyle.success)
async def import_button_y(
self, interaction: Interaction, button: ui.Button
): # pylint: disable=unused-argument
await self.message.delete()
await interaction.response.send_message(
"Deleting original table...", ephemeral=True
)
database = await connect()
query = f"DROP TABLE IF EXISTS moderation_{self.ctx.guild.id};"
await database.execute(query)
await database.commit()
await interaction.edit_original_response(content="Creating new table...")
await create_guild_table(self.ctx.guild)
await interaction.edit_original_response(content="Importing moderations...")
file = await self.ctx.message.attachments[0].read()
data: list[dict] = sorted(json.loads(file), key=lambda x: x["moderation_id"])
user_mod_types = ["NOTE", "WARN", "ADDROLE", "REMOVEROLE", "MUTE", "UNMUTE", "KICK", "TEMPBAN", "BAN", "UNBAN"]
channel_mod_types = ["SLOWMODE", "LOCKDOWN"]
failed_cases = []
for case in data:
if case["moderation_id"] == 0:
continue
if "target_type" not in case or not case["target_type"]:
if case["moderation_type"] in user_mod_types:
case["target_type"] = "USER"
elif case["moderation_type"] in channel_mod_types:
case["target_type"] = "CHANNEL"
else:
case["target_type"] = "USER"
if "role_id" not in case or not case["role_id"]:
case["role_id"] = None
else:
case["role_id"] = int(case["role_id"])
case["target_id"] = int(case["target_id"])
case["moderator_id"] = int(case["moderator_id"])
if "changes" not in case or not case["changes"]:
changes = []
else:
changes = json.loads(case["changes"])
if isinstance(changes, str):
changes: list[dict] = json.loads(changes)
for change in changes:
if change.get("bot"):
del change["bot"]
if "metadata" not in case:
metadata = {}
else:
metadata: Dict[str, any] = json.loads(case["metadata"])
if not metadata.get("imported_from"):
metadata.update({"imported_from": "Aurora"})
metadata.update({"imported_timestamp": int(time())})
if case["duration"] != "NULL" and case["duration"] is not None:
duration = timedelta_from_string(case["duration"])
else:
duration = None
try:
await Moderation.log(
bot=interaction.client,
guild_id=self.ctx.guild.id,
moderator_id=case["moderator_id"],
moderation_type=case["moderation_type"],
target_type=case["target_type"],
target_id=case["target_id"],
role_id=case["role_id"],
duration=duration,
reason=case["reason"],
timestamp=case["timestamp"],
resolved=case["resolved"],
resolved_by=case["resolved_by"],
resolved_reason=case["resolve_reason"],
expired=case["expired"],
changes=changes,
metadata=metadata,
database=database,
return_obj=False
)
except Exception as e: # pylint: disable=broad-exception-caught
failed_cases.append(str(case["moderation_id"]) + f": {e}")
await database.commit()
await database.close()
await interaction.edit_original_response(content="Import complete.")
if failed_cases:
filename = (
str(data_manager.cog_data_path(cog_instance=self))
+ str(os.sep)
+ f"failed_cases_{interaction.guild.id}.json"
)
with open(filename, "w", encoding="utf-8") as f:
dump(obj=failed_cases, fp=f, indent=2)
await interaction.channel.send(
content="Import complete.\n"
+ warning("Failed to import the following cases:\n"),
file=File(
filename, f"failed_cases_{interaction.guild.id}.json"
)
)
os.remove(filename)
@ui.button(label="No", style=ButtonStyle.danger)
async def import_button_n(
self, interaction: Interaction, button: ui.Button
): # pylint: disable=unused-argument
await self.message.edit(content="Import cancelled.", view=None)
await self.message.delete(10)
await self.ctx.message.delete(10)