import asyncio
import inspect
import json
import logging
import re
from typing import Optional, Union

import aiohttp
import discord
import websockets
from pydactyl import PterodactylClient, exceptions
from redbot.core import Config, commands
from redbot.core.bot import Red

# Matches ANSI escape sequences so console output can be posted as plain text.
# Compiled once at import time instead of on every console line.
# NOTE - https://chat.openai.com/share/d92f9acf-d776-4fd6-a53f-b14ac15dd540
_ANSI_ESCAPE_RE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')


class Pterodactyl(commands.Cog):
    """Pterodactyl allows you to manage your Pterodactyl Panel from Discord."""

    def __init__(self, bot: Red):
        self.bot = bot
        self.config = Config.get_conf(self, identifier=457581387213637448123567, force_registration=True)
        self.config.register_global(
            base_url=None,
            api_key=None,
            server_id=None,
            console_channel=None,
            startup_jar=None,
            startup_arguments=None,
            power_action_in_progress=False,
            chat_regex=r"\[(\d{2}:\d{2}:\d{2})\sINFO\]:\s<(\w+)>\s(.*)",
            api_endpoint="minecraft",
            chat_channel=None
        )
        self.logger = logging.getLogger('red.sea.pterodactyl')
        # All three stay None until the background task has been started and
        # has successfully connected; every consumer must tolerate None.
        self.client = None
        self.task = None
        self.websocket = None

    async def establish_websocket_connection(self):
        """Connect to the server's console websocket and relay its events.

        Reads ``base_url``, ``api_key`` and ``server_id`` from config, fetches
        websocket credentials through the Pterodactyl client, then loops over
        ``websockets.connect`` so the connection is re-established after it
        closes. Returns early (after logging) when the client cannot be
        configured or the initial credentials cannot be retrieved.
        """
        self.logger.debug("Establishing WebSocket connection")
        base_url = await self.config.base_url()
        api_key = await self.config.api_key()
        server_id = await self.config.server_id()

        try:
            client = PterodactylClient(base_url, api_key, debug=True).client
            self.client = client
            websocket_credentials = client.servers.get_websocket(server_id)
            #NOTE - The token is truncated to prevent it from being logged in its entirety, for security reasons
            self.logger.debug(
                "Websocket connection details retrieved:\nSocket: %s\nToken: %s...",
                websocket_credentials['data']['socket'],
                websocket_credentials['data']['token'][:20]
            )
        except exceptions.ClientConfigError as e:
            self.logger.error('Failed to initialize Pterodactyl client: %s', e)
            return
        except exceptions.PterodactylApiError as e:
            self.logger.error('Failed to retrieve Pterodactyl websocket: %s', e)
            return

        async for websocket in websockets.connect(websocket_credentials['data']['socket'], origin=base_url, ping_timeout=60):
            try:
                self.logger.debug("WebSocket connection established")
                await self._send_auth(websocket, websocket_credentials['data']['token'])
                self.websocket = websocket
                current_status = ''

                while True:
                    message = await websocket.recv()
                    # Parse each frame exactly once; skip anything that is not
                    # a JSON object rather than letting it kill the task.
                    try:
                        payload = json.loads(message)
                    except json.JSONDecodeError:
                        self.logger.debug("Ignoring non-JSON websocket frame")
                        continue
                    if not isinstance(payload, dict):
                        continue
                    event = payload.get('event')

                    if event in ('token expiring', 'token expired'):
                        self.logger.debug("Received token expiring/expired event. Refreshing token.")
                        try:
                            websocket_credentials = client.servers.get_websocket(server_id)
                        except exceptions.PterodactylApiError as e:
                            # Keep the task alive; the reconnect path below
                            # retries once the panel closes the socket.
                            self.logger.error('Failed to refresh Pterodactyl websocket token: %s', e)
                            continue
                        await self._send_auth(websocket, websocket_credentials['data']['token'])
                    elif event == 'auth success':
                        self.logger.debug("Authentication successful")
                    elif event == 'console output':
                        await self._handle_console_output(payload, current_status)
                    elif event == 'status':
                        current_status = payload['args'][0]
                        console = self.bot.get_channel(await self.config.console_channel())
                        if console is not None:
                            await console.send(f"Server status changed! `{current_status}`")

            except websockets.exceptions.ConnectionClosed as e:
                self.logger.debug("WebSocket connection closed: %s", e)
                # Fetch a fresh token for the next connection attempt. On
                # failure keep the previous credentials instead of crashing.
                try:
                    websocket_credentials = client.servers.get_websocket(server_id)
                except exceptions.PterodactylApiError as err:
                    self.logger.error('Failed to retrieve Pterodactyl websocket: %s', err)
                continue

    async def _send_auth(self, websocket, token: str) -> None:
        """Send the panel ``auth`` event carrying *token* over *websocket*."""
        await websocket.send(json.dumps({"event": "auth", "args": [token]}))
        self.logger.debug("Authentication message sent")

    async def _handle_console_output(self, payload: dict, current_status: str) -> None:
        """Mirror one console line to Discord and relay it if it is player chat.

        Does nothing unless a console channel is configured and resolvable,
        and the last seen status is ``running``, ``offline`` or unknown (``''``).
        """
        console_channel_id = await self.config.console_channel()
        if console_channel_id is None:
            return
        if current_status not in ('running', 'offline', ''):
            return
        channel = self.bot.get_channel(console_channel_id)
        if channel is None:
            return

        # Truncated to stay under Discord's message length limit.
        content = self.remove_ansi_escape_codes(payload['args'][0][:1900])
        if content.startswith('['):
            await channel.send(content=content)
            #TODO - Add pagification for long messages to prevent Discord API errors

        chat_message = await self.check_if_chat_message(content)
        if not chat_message:
            return
        info = await self.get_info(chat_message['username'])
        if info is None:
            return
        try:
            avatar_url = info['data']['player']['avatar']
        except (KeyError, TypeError):
            self.logger.error("Unexpected player info payload for %s", chat_message['username'])
            return
        await self.send_chat_discord(chat_message['username'], chat_message['message'], avatar_url)

    def remove_ansi_escape_codes(self, text: str) -> str:
        """Return *text* with ANSI escape sequences removed."""
        return _ANSI_ESCAPE_RE.sub('', text)

    async def check_if_chat_message(self, text: str) -> Union[bool, dict]:
        """Match *text* against the configured ``chat_regex``.

        Returns a dict with ``time``, ``username`` and ``message`` (regex
        groups 1-3) when the line matches, otherwise ``False``.
        """
        self.logger.debug("Checking if message is a chat message")
        regex = await self.config.chat_regex()
        match = re.match(regex, text)
        if match:
            chat = {"time": match.group(1), "username": match.group(2), "message": match.group(3)}
            self.logger.debug("Message is a chat message\n%s", json.dumps(chat))
            return chat
        self.logger.debug("Message is not a chat message")
        return False

    async def get_info(self, username: str) -> Optional[dict]:
        """Look up *username* on playerdb.co using the configured ``api_endpoint``.

        Returns the decoded JSON body on HTTP 200, or ``None`` on any other
        status or on a network/decoding failure (which is logged).
        """
        self.logger.debug("Retrieving player info for %s", username)
        endpoint = await self.config.api_endpoint()
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(f"https://playerdb.co/api/player/{endpoint}/{username}") as response:
                    if response.status == 200:
                        data = await response.json()
                        self.logger.debug("Player info retrieved for %s\n%s", username, json.dumps(data))
                        return data
                    self.logger.error("Failed to retrieve player info for %s: %s", username, response.status)
                    return None
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # This runs inside the websocket task; a lookup failure must not
            # take the whole console relay down.
            self.logger.error("Failed to retrieve player info for %s: %s", username, e)
            return None

    async def send_chat_discord(self, username: str, message: str, avatar_url: str) -> None:
        """Post an in-game chat line to the chat channel through a webhook.

        Reuses the channel webhook named "Pterodactyl Chat", creating it when
        missing. Does nothing when no chat channel is configured/resolvable.
        """
        self.logger.debug("Sending chat message to Discord")
        channel = self.bot.get_channel(await self.config.chat_channel())
        if channel is None:
            self.logger.debug("Chat channel not set. Skipping sending chat message to Discord")
            return
        try:
            webhooks = await channel.webhooks()
            webhook = discord.utils.get(webhooks, name="Pterodactyl Chat")
            if webhook is None:
                webhook = await channel.create_webhook(name="Pterodactyl Chat")
            await webhook.send(content=message, username=username, avatar_url=avatar_url)
        except discord.HTTPException as e:
            # e.g. missing Manage Webhooks permission; log instead of
            # crashing the websocket task that called us.
            self.logger.error("Failed to send chat message to Discord: %s", e)
            return
        self.logger.debug("Chat message sent to Discord")

    def get_tellraw_string(self, username: str, message: str, color: discord.Color):
        """Build the ``tellraw @a`` command that shows a Discord message in-game.

        The text components are serialised with ``json.dumps`` so quotes,
        backslashes and newlines in the Discord-supplied *username* and
        *message* are escaped and cannot break out of the JSON or start a
        second console command. Compact separators and ``ensure_ascii=False``
        keep the output identical to plain interpolation for ordinary text.
        """
        components = [
            "",
            {"text": f"{username} ", "color": str(color)},
            {"text": " (DISCORD): ", "color": "blue"},
            {"text": message, "color": "white"},
        ]
        return f"tellraw @a {json.dumps(components, separators=(',', ':'), ensure_ascii=False)}"

    def get_task(self):
        """Schedule the websocket coroutine on the bot loop and return the task."""
        return self.bot.loop.create_task(self.establish_websocket_connection(), name="Pterodactyl Websocket Connection")

    def _restart_task(self) -> None:
        """Cancel the running websocket task (if any) and start a new one."""
        if self.task is not None:
            self.task.cancel()
        self.task = self.get_task()

    async def _send_to_server(self, payload: str) -> bool:
        """Send *payload* over the console websocket; return whether it was sent."""
        if self.websocket is None:
            self.logger.warning("WebSocket is not connected. Dropping outgoing message")
            return False
        try:
            await self.websocket.send(payload)
        except websockets.exceptions.ConnectionClosed as e:
            self.logger.warning("WebSocket connection closed while sending: %s", e)
            return False
        return True

    async def cog_load(self):
        """Start the background websocket task when the cog is loaded."""
        self.task = self.get_task()

    async def cog_unload(self):
        """Stop the background task and close the API client's session."""
        if self.task is not None:
            self.task.cancel()
        # The client only exists once a connection attempt got far enough.
        session = getattr(self.client, '_session', None)  # pylint: disable=protected-access
        if session is not None:
            closing = session.close()
            # Only await when close() actually returns an awaitable, so both
            # synchronous and asynchronous session objects are handled.
            if inspect.isawaitable(closing):
                await closing

    @commands.Cog.listener()
    async def on_message(self, message: discord.Message):
        """Forward console-channel messages as commands and chat-channel messages as tellraw."""
        if message.author.bot:
            return

        if message.channel.id == await self.config.console_channel():
            self.logger.debug("Received console command from %s: %s", message.author.id, message.content)
            await message.channel.send(f"Received console command from {message.author.id}: {message.content[:1900]}")
            await self._send_to_server(json.dumps({"event": "send command", "args": [message.content]}))

        if message.channel.id == await self.config.chat_channel():
            self.logger.debug("Received chat message from %s: %s", message.author.id, message.content)
            channel = self.bot.get_channel(await self.config.console_channel())
            if channel:
                await channel.send(f"Received chat message from {message.author.id}: {message.content[:1900]}")
            msg = json.dumps({"event": "send command", "args": [self.get_tellraw_string(message.author.name, message.content, message.author.color)]})
            self.logger.debug("Sending chat message to server:\n%s", msg)
            await self._send_to_server(msg)

    # NOTE: docstrings on the commands below are shown to users as help text.
    @commands.group(autohelp=True, name="pterodactyl", aliases=["ptero"])
    async def pterodactyl(self, ctx: commands.Context):
        """Pterodactyl allows you to manage your Pterodactyl Panel from Discord."""

    @pterodactyl.group(autohelp=True, name="config", aliases=["settings", "set"])
    async def pterodactyl_config(self, ctx: commands.Context):
        """Configure Pterodactyl settings."""

    @pterodactyl_config.command(name="url")
    async def pterodactyl_config_base_url(self, ctx: commands.Context, base_url: str):
        """Set the base URL of your Pterodactyl Panel.

        Please include the protocol (http/https)."""
        await self.config.base_url.set(base_url)
        await ctx.send(f"Base URL set to {base_url}")
        self.logger.debug("Configuration value set: base_url = %s\nRestarting task...", base_url)
        self._restart_task()

    @pterodactyl_config.command(name="apikey")
    async def pterodactyl_config_api_key(self, ctx: commands.Context, api_key: str):
        """Set the API key for your Pterodactyl Panel."""
        await self.config.api_key.set(api_key)
        # Never emit the full secret, neither to Discord nor to the log.
        masked_key = f"{api_key[:5]}...{api_key[-4:]}"
        await ctx.send(f"API key set to `{masked_key}`")
        self.logger.debug("Configuration value set: api_key = %s\nRestarting task...", masked_key)
        self._restart_task()

    @pterodactyl_config.command(name="serverid")
    async def pterodactyl_config_server_id(self, ctx: commands.Context, server_id: str):
        """Set the server ID for your Pterodactyl Panel."""
        await self.config.server_id.set(server_id)
        await ctx.send(f"Server ID set to {server_id}")
        self.logger.debug("Configuration value set: server_id = %s\nRestarting task...", server_id)
        self._restart_task()

    @pterodactyl_config.command(name="consolechannel")
    async def pterodactyl_config_console_channel(self, ctx: commands.Context, channel: discord.TextChannel):
        """Set the channel to send console output to."""
        await self.config.console_channel.set(channel.id)
        await ctx.send(f"Console channel set to {channel.mention}")

    @pterodactyl_config.command(name="chatchannel")
    async def pterodactyl_config_chat_channel(self, ctx: commands.Context, channel: discord.TextChannel):
        """Set the channel to send chat output to."""
        await self.config.chat_channel.set(channel.id)
        await ctx.send(f"Chat channel set to {channel.mention}")