140 lines
5.2 KiB
Python
140 lines
5.2 KiB
Python
# pylint: disable=duplicate-code
|
|
import json
|
|
import os
|
|
from time import time
|
|
from typing import Dict
|
|
|
|
from discord import ButtonStyle, File, Interaction, Message, ui
|
|
from redbot.core import commands, data_manager
|
|
from redbot.core.utils.chat_formatting import warning
|
|
|
|
from ..models.moderation import Moderation
|
|
from ..models.type import Type
|
|
from ..utilities.json import dump
|
|
from ..utilities.registry import type_registry
|
|
from ..utilities.utils import create_guild_table, timedelta_from_string
|
|
|
|
|
|
class ImportAuroraView(ui.View):
    """Yes/No confirmation prompt for importing an Aurora moderation export.

    "Yes" drops the guild's moderation table, recreates it and re-logs every
    case found in the JSON attachment of the invoking message. "No" cancels
    the import and cleans up the prompt and the invoking message.
    """

    def __init__(self, timeout, ctx, message):
        """Store the invoking context and the prompt message.

        Args:
            timeout: Seconds of inactivity before the view stops listening;
                forwarded to ``ui.View``.
            ctx: Context of the invoking command. Its message is expected to
                carry the JSON export as its first attachment.
            message: The prompt message this view is attached to.
        """
        # Forward the timeout; previously it was accepted but silently dropped.
        super().__init__(timeout=timeout)
        self.ctx: commands.Context = ctx
        self.message: Message = message

    @staticmethod
    def _prepare_case(case: dict) -> dict:
        """Turn one exported case into keyword arguments for ``Moderation.log``.

        Raises whatever the underlying conversions raise (``KeyError``,
        ``ValueError``, ``TypeError``, JSON decode errors, ...); the caller
        records those as a failed case.
        """
        moderation_type: Type = type_registry[case["moderation_type"].lower()]

        # Exports without a target type fall back to what the moderation
        # type itself says it targets.
        target_type = case.get("target_type")
        if not target_type:
            target_type = "channel" if moderation_type.channel else "user"

        role_id = int(case["role_id"]) if case.get("role_id") else None
        target_id = int(case["target_id"])
        moderator_id = int(case["moderator_id"])

        changes: list = []
        if case.get("changes"):
            changes = json.loads(case["changes"])
            if isinstance(changes, str):
                # The first decode yielded another JSON string, so decode again.
                changes = json.loads(changes)
            for change in changes:
                if change.get("bot"):
                    del change["bot"]

        if "metadata" not in case:
            # NOTE(review): cases without a metadata field are not stamped
            # with import information - confirm this is intended.
            metadata: dict = {}
        else:
            metadata = json.loads(case["metadata"])
            if not metadata.get("imported_from"):
                metadata.update({"imported_from": "Aurora"})
                metadata.update({"imported_timestamp": int(time())})

        raw_duration = case["duration"]
        if raw_duration != "NULL" and raw_duration is not None:
            duration = timedelta_from_string(raw_duration)
        else:
            duration = None

        return {
            "moderator_id": moderator_id,
            "moderation_type": moderation_type,
            "target_type": target_type.lower(),
            "target_id": target_id,
            "role_id": role_id,
            "duration": duration,
            "reason": case["reason"],
            "timestamp": case["timestamp"],
            "resolved": case["resolved"],
            "resolved_by": case["resolved_by"],
            "resolved_reason": case["resolve_reason"],
            "expired": case["expired"],
            "changes": changes,
            "metadata": metadata,
        }

    @ui.button(label="Yes", style=ButtonStyle.success)
    async def import_button_y(
        self, interaction: Interaction, button: ui.Button
    ):  # pylint: disable=unused-argument
        """Replace the guild's moderation table with the attached export.

        Progress is reported by editing the ephemeral interaction response.
        Cases that cannot be converted or logged are collected and sent to
        the channel as a JSON file once the import has finished.
        """
        await self.message.delete()
        await interaction.response.send_message(
            "Deleting original table...", ephemeral=True
        )

        # Table names cannot be bound as query parameters; the guild id is an
        # integer supplied by Discord, not user-controlled text.
        query = f"DROP TABLE IF EXISTS moderation_{self.ctx.guild.id};"
        await Moderation.execute(query=query, return_obj=False)

        await interaction.edit_original_response(content="Creating new table...")
        await create_guild_table(self.ctx.guild)

        await interaction.edit_original_response(content="Importing moderations...")

        file = await self.ctx.message.attachments[0].read()
        data: list[dict] = sorted(json.loads(file), key=lambda x: x["moderation_id"])

        failed_cases = []
        for case in data:
            if case["moderation_id"] == 0:
                continue

            # Conversion happens inside the try so that one malformed case is
            # reported as failed instead of aborting the import after the
            # original table has already been dropped.
            try:
                await Moderation.log(
                    bot=interaction.client,
                    guild_id=self.ctx.guild.id,
                    return_obj=False,
                    **self._prepare_case(case),
                )
            except Exception as e:  # pylint: disable=broad-exception-caught
                failed_cases.append(f"{case['moderation_id']}: {e}")

        await interaction.edit_original_response(content="Import complete.")

        if failed_cases:
            report_name = f"failed_cases_{interaction.guild.id}.json"
            filename = os.path.join(
                str(data_manager.cog_data_path(cog_instance=self)), report_name
            )

            try:
                with open(filename, "w", encoding="utf-8") as f:
                    dump(obj=failed_cases, fp=f, indent=2)

                await interaction.channel.send(
                    content="Import complete.\n"
                    + warning("Failed to import the following cases:\n"),
                    file=File(filename, report_name),
                )
            finally:
                # Remove the temporary report even if writing or sending failed.
                if os.path.exists(filename):
                    os.remove(filename)

    @ui.button(label="No", style=ButtonStyle.danger)
    async def import_button_n(
        self, interaction: Interaction, button: ui.Button
    ):  # pylint: disable=unused-argument
        """Cancel the import and schedule the prompt and command for deletion."""
        # Acknowledge the click so the client does not show
        # "This interaction failed".
        await interaction.response.defer()
        await self.message.edit(content="Import cancelled.", view=None)
        # ``delay`` is keyword-only on ``Message.delete``.
        await self.message.delete(delay=10)
        await self.ctx.message.delete(delay=10)
|