SeaCogs/aurora/importers/aurora.py

142 lines
5.3 KiB
Python

# pylint: disable=duplicate-code
import json
import os
from time import time
from typing import Dict, List
from discord import ButtonStyle, File, Interaction, Message, ui
from redbot.core import commands, data_manager
from redbot.core.utils.chat_formatting import warning
from ..models.moderation import Moderation
from ..models.type import Type, type_registry
from ..utilities.json import dump
from ..utilities.utils import create_guild_table, timedelta_from_string
class ImportAuroraView(ui.View):
def __init__(self, timeout, ctx, message, data: List[Dict[str, any]]):
super().__init__()
self.ctx: commands.Context = ctx
self.message: Message = message
self.data: List[Dict[str, any]] = data
@ui.button(label="Yes", style=ButtonStyle.success)
async def import_button_y(
self, interaction: Interaction, button: ui.Button
): # pylint: disable=unused-argument
await self.message.delete()
await interaction.response.send_message(
"Deleting original table...", ephemeral=True
)
query = f"DROP TABLE IF EXISTS moderation_{self.ctx.guild.id};"
await Moderation.execute(query=query, return_obj=False)
await interaction.edit_original_response(content="Creating new table...")
await create_guild_table(self.ctx.guild)
await interaction.edit_original_response(content="Importing moderations...")
failed_cases = []
for case in self.data:
if case["moderation_id"] == 0:
continue
moderation_type: Type = type_registry[case["moderation_type"].lower()]
if "target_type" not in case or not case["target_type"]:
if moderation_type.channel:
case["target_type"] = "channel"
else:
case["target_type"] = "user"
if "role_id" not in case or not case["role_id"]:
case["role_id"] = None
else:
case["role_id"] = int(case["role_id"])
case["target_id"] = int(case["target_id"])
case["moderator_id"] = int(case["moderator_id"])
changes = case.get("changes", None)
if not changes:
changes = []
else:
if not isinstance(changes, list):
changes = json.loads(changes)
if isinstance(changes, str):
changes: list[dict] = json.loads(changes)
for change in changes:
if "bot" in change:
del change["bot"]
if "metadata" not in case:
metadata = {}
else:
if isinstance(case["metadata"], str):
metadata: Dict[str, any] = json.loads(case["metadata"])
else:
metadata = case["metadata"]
if not metadata.get("imported_from"):
metadata.update({"imported_from": "Aurora"})
metadata.update({"imported_timestamp": int(time())})
if case["duration"] != "NULL" and case["duration"] is not None:
duration = timedelta_from_string(case["duration"])
else:
duration = None
try:
await Moderation.log(
bot=interaction.client,
guild_id=self.ctx.guild.id,
moderator_id=case["moderator_id"],
moderation_type=moderation_type,
target_type=case["target_type"].lower(),
target_id=case["target_id"],
role_id=case["role_id"],
duration=duration,
reason=case["reason"],
timestamp=case["timestamp"],
resolved=case["resolved"],
resolved_by=case["resolved_by"],
resolved_reason=case["resolve_reason"],
expired=case["expired"],
changes=changes,
metadata=metadata,
return_obj=False
)
except Exception as e: # pylint: disable=broad-exception-caught
failed_cases.append(str(case["moderation_id"]) + f": {e}")
await interaction.edit_original_response(content="Import complete.")
if failed_cases:
filename = (
str(data_manager.cog_data_path(cog_instance=self))
+ str(os.sep)
+ f"failed_cases_{interaction.guild.id}.json"
)
with open(filename, "w", encoding="utf-8") as f:
dump(obj=failed_cases, fp=f, indent=2)
await interaction.channel.send(
content="Import complete.\n"
+ warning("Failed to import the following cases:\n"),
file=File(
filename, f"failed_cases_{interaction.guild.id}.json"
)
)
os.remove(filename)
@ui.button(label="No", style=ButtonStyle.danger)
async def import_button_n(
self, interaction: Interaction, button: ui.Button
): # pylint: disable=unused-argument
await self.message.edit(content="Import cancelled.", view=None)
await self.message.delete(10)
await self.ctx.message.delete(10)