# pylint: disable=duplicate-code import json import os from time import time from typing import Dict from discord import ButtonStyle, File, Interaction, Message, ui from redbot.core import commands, data_manager from redbot.core.utils.chat_formatting import warning from ..models.moderation import Moderation from ..utilities.database import connect, create_guild_table from ..utilities.json import dump from ..utilities.utils import timedelta_from_string class ImportAuroraView(ui.View): def __init__(self, timeout, ctx, message): super().__init__() self.ctx: commands.Context = ctx self.message: Message = message @ui.button(label="Yes", style=ButtonStyle.success) async def import_button_y( self, interaction: Interaction, button: ui.Button ): # pylint: disable=unused-argument await self.message.delete() await interaction.response.send_message( "Deleting original table...", ephemeral=True ) database = await connect() query = f"DROP TABLE IF EXISTS moderation_{self.ctx.guild.id};" database.execute(query) database.commit() await interaction.edit_original_response(content="Creating new table...") await create_guild_table(self.ctx.guild) await interaction.edit_original_response(content="Importing moderations...") file = await self.ctx.message.attachments[0].read() data: list[dict] = sorted(json.loads(file), key=lambda x: x["moderation_id"]) user_mod_types = ["NOTE", "WARN", "ADDROLE", "REMOVEROLE", "MUTE", "UNMUTE", "KICK", "TEMPBAN", "BAN", "UNBAN"] channel_mod_types = ["SLOWMODE", "LOCKDOWN"] failed_cases = [] for case in data: if case["moderation_id"] == 0: continue if "target_type" not in case or not case["target_type"]: if case["moderation_type"] in user_mod_types: case["target_type"] = "USER" elif case["moderation_type"] in channel_mod_types: case["target_type"] = "CHANNEL" else: case["target_type"] = "USER" if "role_id" not in case or not case["role_id"]: case["role_id"] = None else: case["role_id"] = int(case["role_id"]) case["target_id"] = int(case["target_id"]) case["moderator_id"] = int(case["moderator_id"]) if "changes" not in case or not case["changes"]: changes = [] else: changes = json.loads(case["changes"]) if isinstance(changes, str): changes: list[dict] = json.loads(changes) for change in changes: if change.get("bot"): del change["bot"] if "metadata" not in case: metadata = {} else: metadata: Dict[str, any] = json.loads(case["metadata"]) if not metadata.get("imported_from"): metadata.update({"imported_from": "Aurora"}) metadata.update({"imported_timestamp": int(time())}) if case["duration"] != "NULL" and case["duration"] is not None: duration = timedelta_from_string(case["duration"]) else: duration = None try: await Moderation.log( bot=interaction.client, guild_id=self.ctx.guild.id, moderator_id=case["moderator_id"], moderation_type=case["moderation_type"], target_type=case["target_type"], target_id=case["target_id"], role_id=case["role_id"], duration=duration, reason=case["reason"], timestamp=case["timestamp"], resolved=case["resolved"], resolved_by=case["resolved_by"], resolved_reason=case["resolve_reason"], expired=case["expired"], changes=changes, metadata=metadata, database=database, return_obj=False ) except Exception as e: # pylint: disable=broad-exception-caught failed_cases.append(str(case["moderation_id"]) + f": {e}") await database.commit() await database.close() await interaction.edit_original_response(content="Import complete.") if failed_cases: filename = ( str(data_manager.cog_data_path(cog_instance=self)) + str(os.sep) + f"failed_cases_{interaction.guild.id}.json" ) with open(filename, "w", encoding="utf-8") as f: dump(obj=failed_cases, fp=f, indent=2) await interaction.channel.send( content="Import complete.\n" + warning("Failed to import the following cases:\n"), file=File( filename, f"failed_cases_{interaction.guild.id}.json" ) ) os.remove(filename) @ui.button(label="No", style=ButtonStyle.danger) async def import_button_n( self, interaction: Interaction, button: ui.Button ): # pylint: disable=unused-argument await self.message.edit(content="Import cancelled.", view=None) await self.message.delete(10) await self.ctx.message.delete(10)