import asyncio
import json
import re
from typing import Mapping, Optional

import discord
import websockets
from pydactyl import PterodactylClient
from redbot.core import commands
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import box

from pterodactyl.config import config, register_config
from pterodactyl.logger import logger

# Matches the chat-command placeholders (``.$C``, ``.$D``, ``.$I``, ``.$M``,
# ``.$N``, ``.$U``) so they can all be substituted in a single pass.
_PLACEHOLDER_PATTERN = re.compile(r"\.\$([CDIMNU])")


class Pterodactyl(commands.Cog):
    """Pterodactyl allows you to manage your Pterodactyl Panel from Discord."""

    # Maximum number of automatic reconnection attempts after a failure.
    MAX_RETRIES = 5
    # Base delay in seconds; the actual delay is this value times the attempt number.
    RETRY_DELAY = 5

    def __init__(self, bot: Red):
        self.bot = bot
        self.client: Optional[PterodactylClient] = None
        # The currently running websocket connection task, if any.
        self.task: Optional[asyncio.Task] = None
        self.websocket: Optional[websockets.WebSocketClientProtocol] = None
        self.retry_counter: int = 0
        # Timer for a pending reconnection attempt. Kept separate from
        # ``self.task`` so that ``self.task`` always refers to a real task.
        self._retry_handle: Optional[asyncio.TimerHandle] = None
        register_config(config)

    async def cog_load(self) -> None:
        """Reset the retry counter and start the websocket task."""
        self.retry_counter = 0
        self.task = self.get_task()

    async def cog_unload(self) -> None:
        """Stop the websocket task and close the API client session, if one exists."""
        self._stop_task()
        # The client stays None until something outside this class assigns it.
        if self.client is not None:
            await self.client._session.close()  # pylint: disable=protected-access

    def get_task(self) -> asyncio.Task:
        """Create the websocket connection task and attach the error callback to it."""
        # Imported here rather than at module level, as in the original code
        # (presumably to avoid a circular import; verify against pterodactyl.websocket).
        from pterodactyl.websocket import establish_websocket_connection

        task = self.bot.loop.create_task(
            establish_websocket_connection(self),
            name="Pterodactyl Websocket Connection",
        )
        task.add_done_callback(self.error_callback)
        return task

    def error_callback(self, fut: asyncio.Future) -> None:
        """Log the outcome of a finished websocket task and schedule a retry on failure.

        A failed task is retried up to ``MAX_RETRIES`` times with a linearly
        growing delay. Cancelled tasks are never retried.
        """
        # NOTE - Thanks flame442 and zephyrkul for helping me figure this out
        try:
            fut.result()
        except asyncio.CancelledError:
            logger.info("WebSocket task has been cancelled.")
        except Exception as e:  # pylint: disable=broad-exception-caught
            logger.error("WebSocket task has failed: %s", e, exc_info=e)
            if fut is not self.task:
                # A task that was already replaced failed late; the current
                # task is unaffected, so do not schedule another connection.
                return
            if self.retry_counter < self.MAX_RETRIES:
                self.retry_counter += 1
                delay = self.RETRY_DELAY * self.retry_counter
                logger.info("Retrying in %s seconds...", delay)
                # call_later returns a TimerHandle, not a task, so it is kept
                # in its own attribute; _retry stores the new task in self.task.
                self._retry_handle = self.bot.loop.call_later(delay, self._retry)
            else:
                logger.info("Retry limit reached. Stopping task.")

    def _retry(self) -> None:
        """Start a new websocket task once the retry timer fires."""
        self._retry_handle = None
        self.task = self.get_task()

    def _stop_task(self) -> None:
        """Cancel the running task and any pending retry, and reset the retry counter."""
        if self._retry_handle is not None:
            self._retry_handle.cancel()
            self._retry_handle = None
        if self.task is not None:
            self.task.cancel()
        self.retry_counter = 0

    def _restart_task(self) -> None:
        """Stop the current websocket task and start a fresh one."""
        self._stop_task()
        self.task = self.get_task()

    async def _send_payload(self, payload: str) -> bool:
        """Send a serialized event over the websocket.

        Returns True when the payload was sent. Returns False when there is no
        websocket yet, or when the connection was closed; in the latter case
        the websocket task is restarted.
        """
        if self.websocket is None:
            logger.error("Cannot send payload: WebSocket connection is not established.")
            return False
        try:
            await self.websocket.send(payload)
        except websockets.exceptions.ConnectionClosed as e:
            logger.error("WebSocket connection closed: %s", e)
            self._restart_task()
            return False
        return True

    @commands.Cog.listener()
    async def on_message_without_command(self, message: discord.Message) -> None:
        """Forward messages from the console and chat channels to the server."""
        if message.author.bot:
            return

        channel_id = message.channel.id

        if channel_id == await config.console_channel():
            logger.debug("Received console command from %s: %s", message.author.id, message.content)
            await message.channel.send(f"Received console command from {message.author.id}: {message.content[:1900]}")
            await self._send_payload(json.dumps({"event": "send command", "args": [message.content]}))

        if channel_id == await config.chat_channel():
            logger.debug("Received chat message from %s: %s", message.author.id, message.content)
            channel = self.bot.get_channel(await config.console_channel())
            if channel:
                await channel.send(f"Received chat message from {message.author.id}: {message.content[:1900]}")
            msg = json.dumps({"event": "send command", "args": [await self.get_chat_command(message)]})
            logger.debug("Sending chat message to server:\n%s", msg)
            await self._send_payload(msg)

    async def get_chat_command(self, message: discord.Message) -> str:
        """Build the server command for a Discord chat message.

        Fills the placeholders of the configured chat command:
        `.$C` (author color), `.$D` (discriminator), `.$I` (author ID),
        `.$M` (message content), `.$N` (display name), `.$U` (username).
        """
        command: str = await config.chat_command()
        # Double quotes are stripped as before. Line breaks are collapsed to
        # spaces so that untrusted chat text cannot add extra lines (and thus
        # extra commands) to what is sent to the server console.
        content = " ".join(message.content.replace('"', '').splitlines())
        placeholders = {
            "C": str(message.author.color),
            "D": message.author.discriminator,
            "I": str(message.author.id),
            "M": content,
            "N": message.author.display_name,
            "U": message.author.name,
        }
        # Substitute in one pass so that placeholder-like text inside an
        # inserted value (e.g. ".$U" typed in the message) is not expanded again.
        return _PLACEHOLDER_PATTERN.sub(lambda match: placeholders[match.group(1)], command)

    @commands.Cog.listener()
    async def on_red_api_tokens_update(self, service_name: str, api_tokens: Mapping[str, str]):  # pylint: disable=unused-argument
        """Restart the websocket task when the `pterodactyl` API tokens change."""
        if service_name == "pterodactyl":
            logger.info("Configuration value set: api_key\nRestarting task...")
            self._restart_task()

    @commands.group(autohelp=True, name="pterodactyl", aliases=["ptero"])
    async def pterodactyl(self, ctx: commands.Context) -> None:
        """Pterodactyl allows you to manage your Pterodactyl Panel from Discord."""

    @pterodactyl.group(autohelp=True, name="power")
    @commands.admin()
    async def pterodactyl_power(self, ctx: commands.Context) -> None:
        """Send power actions to the server."""

    async def _power_action(self, ctx: commands.Context, action: str, sending: str, done: str) -> None:
        """Send a `set state` event and report the outcome by editing the status message."""
        message = await ctx.send(sending)
        if await self._send_payload(json.dumps({"event": "set state", "args": [action]})):
            await message.edit(content=done)
        else:
            await message.edit(content="Could not reach the server: the websocket connection is not available.")

    @pterodactyl_power.command(name="start")
    async def pterodactyl_power_start(self, ctx: commands.Context) -> None:
        """Start the server."""
        current_status = await config.current_status()
        if current_status == "running":
            await ctx.send("Server is already running.")
            return
        if current_status in ["starting", "stopping"]:
            await ctx.send("Another power action is already in progress.")
            return
        await self._power_action(ctx, "start", "Sending websocket command to start server...", "Server starting...")

    @pterodactyl_power.command(name="stop")
    async def pterodactyl_power_stop(self, ctx: commands.Context) -> None:
        """Stop the server."""
        current_status = await config.current_status()
        if current_status == "stopped":
            await ctx.send("Server is already stopped.")
            return
        if current_status in ["starting", "stopping"]:
            await ctx.send("Another power action is already in progress.")
            return
        await self._power_action(ctx, "stop", "Sending websocket command to stop server...", "Server stopping...")

    @pterodactyl_power.command(name="restart")
    async def pterodactyl_power_restart(self, ctx: commands.Context) -> None:
        """Restart the server."""
        current_status = await config.current_status()
        if current_status in ["starting", "stopping"]:
            await ctx.send("Another power action is already in progress.")
            return
        await self._power_action(ctx, "restart", "Sending websocket command to restart server...", "Server restarting...")

    @pterodactyl.group(autohelp=True, name="config", aliases=["settings", "set"])
    @commands.is_owner()
    async def pterodactyl_config(self, ctx: commands.Context) -> None:
        """Configure Pterodactyl settings."""

    @pterodactyl_config.command(name="url")
    async def pterodactyl_config_base_url(self, ctx: commands.Context, *, base_url: str) -> None:
        """Set the base URL of your Pterodactyl Panel.

        Please include the protocol (http/https).
        Example: `https://panel.example.com`"""
        await config.base_url.set(base_url)
        await ctx.send(f"Base URL set to {base_url}")
        logger.info("Configuration value set: base_url = %s\nRestarting task...", base_url)
        self._restart_task()

    @pterodactyl_config.command(name="serverid")
    async def pterodactyl_config_server_id(self, ctx: commands.Context, *, server_id: str) -> None:
        """Set the ID of your server."""
        await config.server_id.set(server_id)
        await ctx.send(f"Server ID set to {server_id}")
        logger.info("Configuration value set: server_id = %s\nRestarting task...", server_id)
        self._restart_task()

    @pterodactyl_config.command(name="consolechannel")
    async def pterodactyl_config_console_channel(self, ctx: commands.Context, channel: discord.TextChannel) -> None:
        """Set the channel to send console output to."""
        await config.console_channel.set(channel.id)
        await ctx.send(f"Console channel set to {channel.mention}")

    @pterodactyl_config.group(name="chat")
    async def pterodactyl_config_chat(self, ctx: commands.Context):
        """Configure chat settings."""

    @pterodactyl_config_chat.command(name="channel")
    async def pterodactyl_config_chat_channel(self, ctx: commands.Context, channel: discord.TextChannel) -> None:
        """Set the channel to send chat output to."""
        await config.chat_channel.set(channel.id)
        await ctx.send(f"Chat channel set to {channel.mention}")

    @pterodactyl_config_chat.command(name="command")
    async def pterodactyl_config_chat_command(self, ctx: commands.Context, *, command: str) -> None:
        """Set the command that will be used to send messages from Discord.

        Required placeholders: `.$U` (username), `.$M` (message), `.$C` (color)
        See [documentation](https://seacogs.coastalcommits.com/pterodactyl/setup/#changing-the-tellraw-command) for more information."""
        await config.chat_command.set(command)
        await ctx.send(f"Chat command set to:\n{box(command, 'json')}")

    @pterodactyl_config.group(name="regex")
    async def pterodactyl_config_regex(self, ctx: commands.Context) -> None:
        """Set regex patterns."""

    @pterodactyl_config_regex.command(name="chat")
    async def pterodactyl_config_regex_chat(self, ctx: commands.Context, *, regex: str) -> None:
        """Set the regex pattern to match chat messages on the server.

        See [documentation](https://seacogs.coastalcommits.com/pterodactyl/setup/#my-chat-messages-arent-detected) for more information."""
        await config.chat_regex.set(regex)
        await ctx.send(f"Chat regex set to:\n{box(regex, 'regex')}")

    @pterodactyl_config_regex.command(name="server")
    async def pterodactyl_config_regex_server(self, ctx: commands.Context, *, regex: str) -> None:
        """Set the regex pattern to match server messages on the server.

        See [documentation](https://seacogs.coastalcommits.com/pterodactyl/setup/#my-chat-messages-arent-detected) for more information."""
        await config.server_regex.set(regex)
        await ctx.send(f"Server regex set to:\n{box(regex, 'regex')}")

    @pterodactyl_config_regex.command(name="join")
    async def pterodactyl_config_regex_join(self, ctx: commands.Context, *, regex: str) -> None:
        """Set the regex pattern to match join messages on the server.

        See [documentation](https://seacogs.coastalcommits.com/pterodactyl/setup/#my-chat-messages-arent-detected) for more information."""
        await config.join_regex.set(regex)
        await ctx.send(f"Join regex set to:\n{box(regex, 'regex')}")

    @pterodactyl_config_regex.command(name="leave")
    async def pterodactyl_config_regex_leave(self, ctx: commands.Context, *, regex: str) -> None:
        """Set the regex pattern to match leave messages on the server.

        See [documentation](https://seacogs.coastalcommits.com/pterodactyl/setup/#my-chat-messages-arent-detected) for more information."""
        await config.leave_regex.set(regex)
        await ctx.send(f"Leave regex set to:\n{box(regex, 'regex')}")

    @pterodactyl_config_regex.command(name="achievement")
    async def pterodactyl_config_regex_achievement(self, ctx: commands.Context, *, regex: str) -> None:
        """Set the regex pattern to match achievement messages on the server.

        See [documentation](https://seacogs.coastalcommits.com/pterodactyl/setup/#my-chat-messages-arent-detected) for more information."""
        await config.achievement_regex.set(regex)
        await ctx.send(f"Achievement regex set to:\n{box(regex, 'regex')}")

    @pterodactyl_config.group(name="messages", aliases=['msg', 'msgs', 'message'])
    async def pterodactyl_config_messages(self, ctx: commands.Context):
        """Configure message settings."""

    @pterodactyl_config_messages.command(name="startup")
    async def pterodactyl_config_messages_startup(self, ctx: commands.Context, *, message: str) -> None:
        """Set the message that will be sent when the server starts."""
        await config.startup_msg.set(message)
        await ctx.send(f"Startup message set to: {message}")

    @pterodactyl_config_messages.command(name="shutdown")
    async def pterodactyl_config_messages_shutdown(self, ctx: commands.Context, *, message: str) -> None:
        """Set the message that will be sent when the server stops."""
        await config.shutdown_msg.set(message)
        await ctx.send(f"Shutdown message set to: {message}")

    @pterodactyl_config_messages.command(name="join")
    async def pterodactyl_config_messages_join(self, ctx: commands.Context, *, message: str) -> None:
        """Set the message that will be sent when a user joins the server.

        This is only shown in embeds."""
        await config.join_msg.set(message)
        await ctx.send(f"Join message set to: {message}")

    @pterodactyl_config_messages.command(name="leave")
    async def pterodactyl_config_messages_leave(self, ctx: commands.Context, *, message: str) -> None:
        """Set the message that will be sent when a user leaves the server.

        This is only shown in embeds."""
        await config.leave_msg.set(message)
        await ctx.send(f"Leave message set to: {message}")

    @pterodactyl_config.command(name="ip")
    async def pterodactyl_config_mask_ip(self, ctx: commands.Context, mask: bool) -> None:
        """Mask the IP addresses of users in console messages."""
        await config.mask_ip.set(mask)
        await ctx.send(f"IP masking set to {mask}")

    @pterodactyl_config.command(name="api")
    async def pterodactyl_config_api(self, ctx: commands.Context, endpoint: str) -> None:
        """Set the API endpoint for retrieving user avatars.

        This is only used for retrieving user avatars for webhook messages.
        See [PlayerDB](https://playerdb.co/) for valid endpoints. Usually, you should leave this as default."""
        await config.api_endpoint.set(endpoint)
        await ctx.send(f"API endpoint set to {endpoint}")

    @pterodactyl_config.command(name='view', aliases=['show'])
    async def pterodactyl_config_view(self, ctx: commands.Context) -> None:
        """View the current configuration."""
        base_url = await config.base_url()
        server_id = await config.server_id()
        console_channel = await config.console_channel()
        chat_channel = await config.chat_channel()
        chat_command = await config.chat_command()
        chat_regex = await config.chat_regex()
        server_regex = await config.server_regex()
        join_regex = await config.join_regex()
        leave_regex = await config.leave_regex()
        achievement_regex = await config.achievement_regex()
        startup_msg = await config.startup_msg()
        shutdown_msg = await config.shutdown_msg()
        join_msg = await config.join_msg()
        leave_msg = await config.leave_msg()
        mask_ip = await config.mask_ip()
        api_endpoint = await config.api_endpoint()
        embed = discord.Embed(color=await ctx.embed_color(), title="Pterodactyl Configuration")
        # One entry per line; the boxed values render as code blocks.
        embed.description = "\n".join([
            f"**Base URL:** {base_url}",
            f"**Server ID:** `{server_id}`",
            f"**Console Channel:** <#{console_channel}>",
            f"**Chat Channel:** <#{chat_channel}>",
            f"**Startup Message:** {startup_msg}",
            f"**Shutdown Message:** {shutdown_msg}",
            f"**Join Message:** {join_msg}",
            f"**Leave Message:** {leave_msg}",
            f"**Mask IP:** {self.get_bool_str(mask_ip)}",
            f"**API Endpoint:** `{api_endpoint}`",
            f"**Chat Command:** {box(chat_command, 'json')}",
            f"**Chat Regex:** {box(chat_regex, 're')}",
            f"**Server Regex:** {box(server_regex, 're')}",
            f"**Join Regex:** {box(join_regex, 're')}",
            f"**Leave Regex:** {box(leave_regex, 're')}",
            f"**Achievement Regex:** {box(achievement_regex, 're')}",
        ])
        await ctx.send(embed=embed)

    def get_bool_str(self, inp: bool) -> str:
        """Return a string representation of a boolean."""
        return "Enabled" if inp else "Disabled"