import asyncio import json from typing import Mapping, Optional, Union import discord import websockets from discord.ext import tasks from pydactyl import PterodactylClient from redbot.core import app_commands, commands from redbot.core.app_commands import Choice from redbot.core.bot import Red from redbot.core.utils.chat_formatting import box, error from redbot.core.utils.views import ConfirmView from pterodactyl import mcsrvstatus from pterodactyl.config import config, register_config from pterodactyl.logger import logger class Pterodactyl(commands.Cog): """Pterodactyl allows you to manage your Pterodactyl Panel from Discord.""" def __init__(self, bot: Red): self.bot = bot self.client: Optional[PterodactylClient] = None self.task: Optional[asyncio.Task] = None self.websocket: Optional[websockets.WebSocketClientProtocol] = None self.retry_counter: int = 0 register_config(config) self.task = self.get_task() self.update_topic.start() async def cog_unload(self) -> None: self.update_topic.cancel() self.task.cancel() self.retry_counter = 0 await self.client._session.close() # pylint: disable=protected-access def get_task(self) -> asyncio.Task: from pterodactyl.websocket import establish_websocket_connection task = self.bot.loop.create_task(establish_websocket_connection(self), name="Pterodactyl Websocket Connection") task.add_done_callback(self.error_callback) return task def error_callback(self, fut) -> None: #NOTE - Thanks flame442 and zephyrkul for helping me figure this out try: fut.result() except asyncio.CancelledError: logger.info("WebSocket task has been cancelled.") except Exception as e: # pylint: disable=broad-exception-caught logger.error("WebSocket task has failed: %s", e, exc_info=e) self.task.cancel() if self.retry_counter < 5: self.retry_counter += 1 logger.info("Retrying in %s seconds...", 5 * self.retry_counter) self.task = self.bot.loop.call_later(5 * self.retry_counter, self.get_task) else: logger.info("Retry limit reached. Stopping task.") @tasks.loop(minutes=6) async def update_topic(self): topic = await self.get_topic() console = self.bot.get_channel(await config.console_channel()) chat = self.bot.get_channel(await config.chat_channel()) if console: await console.edit(topic=topic) if chat: await chat.edit(topic=topic) @commands.Cog.listener() async def on_message_without_command(self, message: discord.Message) -> None: if message.channel.id == await config.console_channel() and message.author.bot is False: logger.debug("Received console command from %s: %s", message.author.id, message.content) await message.channel.send(f"Received console command from {message.author.id}: {message.content[:1900]}", allowed_mentions=discord.AllowedMentions.none()) try: await self.websocket.send(json.dumps({"event": "send command", "args": [message.content]})) except websockets.exceptions.ConnectionClosed as e: logger.error("WebSocket connection closed: %s", e) self.task.cancel() self.retry_counter = 0 self.task = self.get_task() if message.channel.id == await config.chat_channel() and message.author.bot is False: logger.debug("Received chat message from %s: %s", message.author.id, message.content) channel = self.bot.get_channel(await config.console_channel()) if channel: await channel.send(f"Received chat message from {message.author.id}: {message.content[:1900]}", allowed_mentions=discord.AllowedMentions.none()) msg = json.dumps({"event": "send command", "args": [await self.get_chat_command(message)]}) logger.debug("Sending chat message to server:\n%s", msg) try: await self.websocket.send(msg) except websockets.exceptions.ConnectionClosed as e: logger.error("WebSocket connection closed: %s", e) self.task.cancel() self.retry_counter = 0 self.task = self.get_task() async def get_topic(self) -> str: topic: str = await config.topic() placeholders = { "H": await config.topic_hostname() or "unset", "O": str(await config.topic_port()), } if await config.api_endpoint() == "minecraft": status, response = await mcsrvstatus.get_status(await config.topic_ip(), await config.topic_port()) if status: placeholders.update({ "I": response['ip'], "M": str(response['players']['max']), "P": str(response['players']['online']), "V": response['version'], "D": response['motd']['clean'][0] if response['motd']['clean'] else "unset", }) else: placeholders.update({ "I": response['ip'], "M": "0", "P": "0", "V": "Server Offline", "D": "Server Offline", }) for key, value in placeholders.items(): topic = topic.replace('.$' + key, value) return topic async def get_chat_command(self, message: discord.Message) -> str: command: str = await config.chat_command() placeholders = { "C": str(message.author.color), "D": message.author.discriminator, "I": str(message.author.id), "M": message.content.replace('"',''), "N": message.author.display_name, "U": message.author.name, "V": await config.invite() or "use [p]pterodactyl config invite to change me", } for key, value in placeholders.items(): command = command.replace('.$' + key, value) return command async def power(self, ctx: Union[discord.Interaction, commands.Context], action: str, action_ing: str, warning: str = '') -> None: if isinstance(ctx, discord.Interaction): author = ctx.user else: author = ctx.author current_status = await config.current_status() if current_status == action_ing: if isinstance(ctx, discord.Interaction): return await ctx.response.send_message(f"Server is already {action_ing}.", ephemeral=True) return await ctx.send(f"Server is already {action_ing}.") if current_status in ["starting", "stopping"] and action != "kill": if isinstance(ctx, discord.Interaction): return await ctx.response.send_message("Another power action is already in progress.", ephemeral=True) return await ctx.send("Another power action is already in progress.") view = ConfirmView(author, disable_buttons=True) if isinstance(ctx, discord.Interaction): await ctx.response.send_message(f"{warning}Are you sure you want to {action} the server?", view=view) else: message = await ctx.send(f"{warning}Are you sure you want to {action} the server?", view=view) await view.wait() if view.result is True: if isinstance(ctx, discord.Interaction): await ctx.edit_original_response(content=f"Sending websocket command to {action} server...", view=None) else: await message.edit(content=f"Sending websocket command to {action} server...", view=None) await self.websocket.send(json.dumps({"event": "set state", "args": [action]})) if isinstance(ctx, discord.Interaction): await ctx.edit_original_response(content=f"Server {action_ing}", view=None) else: await message.edit(content=f"Server {action_ing}", view=None) else: if isinstance(ctx, discord.Interaction): await ctx.edit_original_response(content="Cancelled.", view=None) else: await message.edit(content="Cancelled.", view=None) async def send_command(self, ctx: Union[discord.Interaction, commands.Context], command: str): channel = self.bot.get_channel(await config.console_channel()) if isinstance(ctx, discord.Interaction): if channel: await channel.send(f"Received console command from {ctx.user.id}: {command[:1900]}", allowed_mentions=discord.AllowedMentions.none()) try: await self.websocket.send(json.dumps({"event": "send command", "args": [command]})) await ctx.response.send_message(f"Command sent to server. {box(command, 'json')}", ephemeral=True) except websockets.exceptions.ConnectionClosed as e: logger.error("WebSocket connection closed: %s", e) await ctx.response.send_message(error("WebSocket connection closed.")) self.task.cancel() self.retry_counter = 0 self.task = self.get_task() else: if channel: await channel.send(f"Received console command from {ctx.author.id}: {command[:1900]}", allowed_mentions=discord.AllowedMentions.none()) try: await self.websocket.send(json.dumps({"event": "send command", "args": [command]})) await ctx.send(f"Command sent to server. {box(command, 'json')}") except websockets.exceptions.ConnectionClosed as e: logger.error("WebSocket connection closed: %s", e) await ctx.send(error("WebSocket connection closed.")) self.task.cancel() self.retry_counter = 0 self.task = self.get_task() @commands.Cog.listener() async def on_red_api_tokens_update(self, service_name: str, api_tokens: Mapping[str,str]): # pylint: disable=unused-argument if service_name == "pterodactyl": logger.info("Configuration value set: api_key\nRestarting task...") self.task.cancel() self.retry_counter = 0 self.task = self.get_task() slash_pterodactyl = app_commands.Group(name="pterodactyl", description="Pterodactyl allows you to manage your Pterodactyl Panel from Discord.") @slash_pterodactyl.command(name = "command", description = "Send a command to the server console.") async def slash_pterodactyl_command(self, interaction: discord.Interaction, command: str) -> None: """Send a command to the server console. Parameters: ----------- command: str The command to send to the server.""" return await self.send_command(interaction, command) @slash_pterodactyl.command(name = "power", description = "Send power actions to the server.") @app_commands.choices(action=[ Choice(name="Start", value="start"), Choice(name="Stop", value="stop"), Choice(name="Restart", value="restart"), Choice(name="⚠️ Kill ⚠️", value="kill") ]) async def slash_pterodactyl_power(self, interaction: discord.Interaction, action: app_commands.Choice[str]) -> None: """Send power actions to the server. Parameters: ----------- action: app_commands.Choice[str] The action to perform on the server.""" if action.value == "kill": return await self.power(interaction, action.value, "stopping... (forcefully killed)", warning="**⚠️ Forcefully killing the server process can corrupt data in some cases. ⚠️**\n") if action.value == "stop": return await self.power(interaction, action.value, "stopping...") return await self.power(interaction, action.value, f"{action.value}ing...") @commands.group(autohelp = True, name = "pterodactyl", aliases = ["ptero"]) async def pterodactyl(self, ctx: commands.Context) -> None: """Pterodactyl allows you to manage your Pterodactyl Panel from Discord.""" @pterodactyl.command(name = "command", aliases = ["cmd", "execute", "exec"]) @commands.admin() async def pterodactyl_command(self, ctx: commands.Context, *, command: str) -> None: """Send a command to the server console.""" return await self.send_command(ctx, command) @pterodactyl.group(autohelp = True, name = "power") @commands.admin() async def pterodactyl_power(self, ctx: commands.Context) -> None: """Send power actions to the server.""" @pterodactyl_power.command(name = "start") async def pterodactyl_power_start(self, ctx: commands.Context) -> Optional[discord.Message]: """Start the server.""" return await self.power(ctx, "start", "starting...") @pterodactyl_power.command(name = "stop") async def pterodactyl_power_stop(self, ctx: commands.Context) -> Optional[discord.Message]: """Stop the server.""" return await self.power(ctx, "stop", "stopping...") @pterodactyl_power.command(name = "restart") async def pterodactyl_power_restart(self, ctx: commands.Context) -> Optional[discord.Message]: """Restart the server.""" return await self.power(ctx, "restart", "restarting...") @pterodactyl_power.command(name = "kill") async def pterodactyl_power_kill(self, ctx: commands.Context) -> Optional[discord.Message]: """Kill the server.""" return await self.power(ctx, "kill", "stopping... (forcefully killed)", warning="**⚠️ Forcefully killing the server process can corrupt data in some cases. ⚠️**\n") @pterodactyl.group(autohelp = True, name = "config", aliases = ["settings", "set"]) @commands.is_owner() async def pterodactyl_config(self, ctx: commands.Context) -> None: """Configure Pterodactyl settings.""" @pterodactyl_config.command(name = "url") async def pterodactyl_config_base_url(self, ctx: commands.Context, *, base_url: str) -> None: """Set the base URL of your Pterodactyl Panel. Please include the protocol (http/https). Example: `https://panel.example.com`""" await config.base_url.set(base_url) await ctx.send(f"Base URL set to {base_url}") logger.info("Configuration value set: base_url = %s\nRestarting task...", base_url) self.task.cancel() self.retry_counter = 0 self.task = self.get_task() @pterodactyl_config.command(name = "serverid") async def pterodactyl_config_server_id(self, ctx: commands.Context, *, server_id: str) -> None: """Set the ID of your server.""" await config.server_id.set(server_id) await ctx.send(f"Server ID set to {server_id}") logger.info("Configuration value set: server_id = %s\nRestarting task...", server_id) self.task.cancel() self.retry_counter = 0 self.task = self.get_task() @pterodactyl_config.command(name = "consolechannel") async def pterodactyl_config_console_channel(self, ctx: commands.Context, channel: discord.TextChannel) -> None: """Set the channel to send console output to.""" await config.console_channel.set(channel.id) await ctx.send(f"Console channel set to {channel.mention}") @pterodactyl_config.command(name = "invite") async def pterodactyl_config_invite(self, ctx: commands.Context, invite: str) -> None: """Set the invite link for your server.""" await config.invite.set(invite) await ctx.send(f"Invite link set to {invite}") @pterodactyl_config.group(name = "topic") async def pterodactyl_config_topic(self, ctx: commands.Context): """Set the topic for the console and chat channels.""" @pterodactyl_config_topic.command(name = "host", aliases = ["hostname", "ip"]) async def pterodactyl_config_topic_host(self, ctx: commands.Context, host: str) -> None: """Set the hostname or IP address of your server.""" await config.topic_hostname.set(host) await ctx.send(f"Hostname/IP set to {host}") @pterodactyl_config_topic.command(name = "port") async def pterodactyl_config_topic_port(self, ctx: commands.Context, port: int) -> None: """Set the port of your server.""" await config.topic_port.set(port) await ctx.send(f"Port set to {port}") @pterodactyl_config_topic.command(name = "text") async def pterodactyl_config_topic_text(self, ctx: commands.Context, *, text: str) -> None: """Set the text for the console and chat channels. Available placeholders: - `.$H` (hostname) - `.$O` (port) Available for Minecraft servers: - `.$I` (ip) - `.$M` (max players) - `.$P` (players online) - `.$V` (version) - `.$D` (description / Message of the Day)""" await config.topic.set(text) await ctx.send(f"Topic set to:\n{box(text, 'yaml')}") @pterodactyl_config.group(name = "chat") async def pterodactyl_config_chat(self, ctx: commands.Context): """Configure chat settings.""" @pterodactyl_config_chat.command(name = "channel") async def pterodactyl_config_chat_channel(self, ctx: commands.Context, channel: discord.TextChannel) -> None: """Set the channel to send chat output to.""" await config.chat_channel.set(channel.id) await ctx.send(f"Chat channel set to {channel.mention}") @pterodactyl_config_chat.command(name = "command") async def pterodactyl_config_chat_command(self, ctx: commands.Context, *, command: str) -> None: """Set the command that will be used to send messages from Discord. Required placeholders: `.$U` (username), `.$M` (message), `.$C` (color) See [documentation](https://seacogs.coastalcommits.com/pterodactyl/setup/#changing-the-tellraw-command) for more information.""" await config.chat_command.set(command) await ctx.send(f"Chat command set to:\n{box(command, 'json')}") @pterodactyl_config.group(name = "regex") async def pterodactyl_config_regex(self, ctx: commands.Context) -> None: """Set regex patterns.""" @pterodactyl_config_regex.command(name = "chat") async def pterodactyl_config_regex_chat(self, ctx: commands.Context, *, regex: str) -> None: """Set the regex pattern to match chat messages on the server. See [documentation](https://seacogs.coastalcommits.com/pterodactyl/setup/#my-chat-messages-arent-detected) for more information.""" await config.chat_regex.set(regex) await ctx.send(f"Chat regex set to:\n{box(regex, 'regex')}") @pterodactyl_config_regex.command(name = "server") async def pterodactyl_config_regex_server(self, ctx: commands.Context, *, regex: str) -> None: """Set the regex pattern to match server messages on the server. See [documentation](https://seacogs.coastalcommits.com/pterodactyl/setup/#my-chat-messages-arent-detected) for more information.""" await config.server_regex.set(regex) await ctx.send(f"Server regex set to:\n{box(regex, 'regex')}") @pterodactyl_config_regex.command(name = "join") async def pterodactyl_config_regex_join(self, ctx: commands.Context, *, regex: str) -> None: """Set the regex pattern to match join messages on the server. See [documentation](https://seacogs.coastalcommits.com/pterodactyl/setup/#my-chat-messages-arent-detected) for more information.""" await config.join_regex.set(regex) await ctx.send(f"Join regex set to:\n{box(regex, 'regex')}") @pterodactyl_config_regex.command(name = "leave") async def pterodactyl_config_regex_leave(self, ctx: commands.Context, *, regex: str) -> None: """Set the regex pattern to match leave messages on the server. See [documentation](https://seacogs.coastalcommits.com/pterodactyl/setup/#my-chat-messages-arent-detected) for more information.""" await config.leave_regex.set(regex) await ctx.send(f"Leave regex set to:\n{box(regex, 'regex')}") @pterodactyl_config_regex.command(name = "achievement") async def pterodactyl_config_regex_achievement(self, ctx: commands.Context, *, regex: str) -> None: """Set the regex pattern to match achievement messages on the server. See [documentation](https://seacogs.coastalcommits.com/pterodactyl/setup/#my-chat-messages-arent-detected) for more information.""" await config.achievement_regex.set(regex) await ctx.send(f"Achievement regex set to:\n{box(regex, 'regex')}") @pterodactyl_config.group(name = "messages", aliases = ['msg', 'msgs', 'message']) async def pterodactyl_config_messages(self, ctx: commands.Context): """Configure message settings.""" @pterodactyl_config_messages.command(name = "startup") async def pterodactyl_config_messages_startup(self, ctx: commands.Context, *, message: str) -> None: """Set the message that will be sent when the server starts.""" await config.startup_msg.set(message) await ctx.send(f"Startup message set to: {message}") @pterodactyl_config_messages.command(name = "shutdown") async def pterodactyl_config_messages_shutdown(self, ctx: commands.Context, *, message: str) -> None: """Set the message that will be sent when the server stops.""" await config.shutdown_msg.set(message) await ctx.send(f"Shutdown message set to: {message}") @pterodactyl_config_messages.command(name = "join") async def pterodactyl_config_messages_join(self, ctx: commands.Context, *, message: str) -> None: """Set the message that will be sent when a user joins the server. This is only shown in embeds.""" await config.join_msg.set(message) await ctx.send(f"Join message set to: {message}") @pterodactyl_config_messages.command(name = "leave") async def pterodactyl_config_messages_leave(self, ctx: commands.Context, *, message: str) -> None: """Set the message that will be sent when a user leaves the server. This is only shown in embeds.""" await config.leave_msg.set(message) await ctx.send(f"Leave message set to: {message}") @pterodactyl_config.command(name = "ip") async def pterodactyl_config_mask_ip(self, ctx: commands.Context, mask: bool) -> None: """Mask the IP addresses of users in console messages.""" await config.mask_ip.set(mask) await ctx.send(f"IP masking set to {mask}") @pterodactyl_config.command(name = "api") async def pterodactyl_config_api(self, ctx: commands.Context, endpoint: str) -> None: """Set the API endpoint for retrieving user avatars. This is only used for retrieving user avatars for webhook messages. See [PlayerDB](https://playerdb.co/) for valid endpoints. Usually, you should leave this as default.""" await config.api_endpoint.set(endpoint) await ctx.send(f"API endpoint set to {endpoint}") @pterodactyl_config_regex.group(name = "blacklist", aliases = ['block', 'blocklist'],) async def pterodactyl_config_regex_blacklist(self, ctx: commands.Context): """Blacklist regex patterns.""" @pterodactyl_config_regex_blacklist.command(name = "add") async def pterodactyl_config_regex_blacklist_add(self, ctx: commands.Context, name: str, *, regex: str) -> None: """Add a regex pattern to the blacklist.""" async with config.regex_blacklist() as blacklist: blacklist: dict if name not in blacklist: blacklist.update({name: regex}) await ctx.send(f"Added `{name}` to the regex blacklist.\n{box(regex, 're')}") else: view = ConfirmView(ctx.author, disable_buttons=True) msg = await ctx.send(f"Name `{name}` already exists in the blacklist. Would you like to update it? Current value:\n{box(blacklist[name], 're')}", view=view) await view.wait() if view.result is True: blacklist.update({name: regex}) await msg.edit(f"Updated `{name}` in the regex blacklist.\n{box(regex, 're')}") else: await msg.edit(content="Cancelled.") @pterodactyl_config_regex_blacklist.command(name = "remove") async def pterodactyl_config_regex_blacklist_remove(self, ctx: commands.Context, name: str) -> None: """Remove a regex pattern from the blacklist.""" async with config.regex_blacklist() as blacklist: blacklist: dict if name in blacklist: view = ConfirmView(ctx.author, disable_buttons=True) msg = await ctx.send(f"Are you sure you want to remove `{name}` from the regex blacklist?\n{box(blacklist[name], 're')}", view=view) await view.wait() if view.result is True: del blacklist[name] await msg.edit(content="Removed `{name}` from the regex blacklist.") else: await msg.edit(content="Cancelled.") else: await ctx.send(f"Name `{name}` does not exist in the blacklist.") @pterodactyl_config.command(name = 'view', aliases = ['show']) async def pterodactyl_config_view(self, ctx: commands.Context) -> None: """View the current configuration.""" base_url = await config.base_url() server_id = await config.server_id() console_channel = await config.console_channel() chat_channel = await config.chat_channel() chat_command = await config.chat_command() chat_regex = await config.chat_regex() server_regex = await config.server_regex() join_regex = await config.join_regex() leave_regex = await config.leave_regex() achievement_regex = await config.achievement_regex() startup_msg = await config.startup_msg() shutdown_msg = await config.shutdown_msg() join_msg = await config.join_msg() leave_msg = await config.leave_msg() mask_ip = await config.mask_ip() api_endpoint = await config.api_endpoint() invite = await config.invite() regex_blacklist: dict = await config.regex_blacklist() embed = discord.Embed(color = await ctx.embed_color(), title="Pterodactyl Configuration") embed.description = f"""**Base URL:** {base_url} **Server ID:** `{server_id}` **Console Channel:** <#{console_channel}> **Chat Channel:** <#{chat_channel}> **Startup Message:** {startup_msg} **Shutdown Message:** {shutdown_msg} **Join Message:** {join_msg} **Leave Message:** {leave_msg} **Mask IP:** {self.get_bool_str(mask_ip)} **API Endpoint:** `{api_endpoint}` **Invite:** {invite} **Chat Command:** {box(chat_command, 'json')} **Chat Regex:** {box(chat_regex, 're')} **Server Regex:** {box(server_regex, 're')} **Join Regex:** {box(join_regex, 're')} **Leave Regex:** {box(leave_regex, 're')} **Achievement Regex:** {box(achievement_regex, 're')}""" await ctx.send(embed=embed) if not len(regex_blacklist) == 0: regex_blacklist_embed = discord.Embed(color = await ctx.embed_color(), title="Regex Blacklist") for name, regex in regex_blacklist.items(): regex_blacklist_embed.add_field(name=name, value=box(regex, 're'), inline=False) await ctx.send(embed=regex_blacklist_embed) def get_bool_str(self, inp: bool) -> str: """Return a string representation of a boolean.""" return "Enabled" if inp else "Disabled"