# pylint: disable=cyclic-import import json import re from logging import getLogger from typing import Optional, Union import aiohttp import discord import websockets from pydactyl import PterodactylClient from redbot.core.utils.chat_formatting import bold, pagify from pterodactyl.config import config from pterodactyl.logger import logger from pterodactyl.pterodactyl import Pterodactyl async def establish_websocket_connection(coginstance: Pterodactyl) -> None: base_url = await config.base_url() base_url = base_url[:-1] if base_url.endswith('/') else base_url logger.info("Establishing WebSocket connection") websocket_credentials = await retrieve_websocket_credentials(coginstance) async with websockets.connect(websocket_credentials['data']['socket'], origin=base_url, ping_timeout=60, logger=getLogger("red.sea.pterodactyl.websocket")) as websocket: logger.info("WebSocket connection established") auth_message = json.dumps({"event": "auth", "args": [websocket_credentials['data']['token']]}) await websocket.send(auth_message) logger.info("Authentication message sent") coginstance.websocket = websocket while True: # pylint: disable=too-many-nested-blocks message = await websocket.recv() if json.loads(message)['event'] in ('token expiring', 'token expired'): logger.info("Received token expiring/expired event. Refreshing token.") websocket_credentials = await retrieve_websocket_credentials(coginstance) auth_message = json.dumps({"event": "auth", "args": [websocket_credentials['data']['token']]}) await websocket.send(auth_message) logger.info("Authentication message sent") if json.loads(message)['event'] == 'auth success': logger.info("WebSocket authentication successful") if json.loads(message)['event'] == 'console output' and await config.console_channel() is not None: if await config.current_status() in ('running', 'offline', ''): content = remove_ansi_escape_codes(json.loads(message)['args'][0]) if await config.mask_ip() is True: content = mask_ip(content) channel = coginstance.bot.get_channel(await config.console_channel()) if channel is not None: if content.startswith('['): pagified_content = pagify(content, delims=[" ", "\n"]) for page in pagified_content: await channel.send(content=page) server_message = await check_if_server_message(content) if server_message: channel = coginstance.bot.get_channel(await config.chat_channel()) if channel is not None: await channel.send(server_message if len(server_message) < 2000 else server_message[:1997] + '...', allowed_mentions=discord.AllowedMentions.none()) chat_message = await check_if_chat_message(content) if chat_message: info = await get_info(chat_message['username']) if info is not None: await send_chat_discord(coginstance, chat_message['username'], chat_message['message'], info['data']['player']['avatar']) else: await send_chat_discord(coginstance, chat_message['username'], chat_message['message'], 'https://seafsh.cc/u/j3AzqQ.png') join_message = await check_if_join_message(content) if join_message: channel = coginstance.bot.get_channel(await config.chat_channel()) if channel is not None: if coginstance.bot.embed_requested(channel): await channel.send(embed=await generate_join_leave_embed(join_message, True)) else: await channel.send(f"{join_message} joined the game", allowed_mentions=discord.AllowedMentions.none()) leave_message = await check_if_leave_message(content) if leave_message: channel = coginstance.bot.get_channel(await config.chat_channel()) if channel is not None: if coginstance.bot.embed_requested(channel): await channel.send(embed=await generate_join_leave_embed(leave_message, False)) else: await channel.send(f"{leave_message} left the game", allowed_mentions=discord.AllowedMentions.none()) achievement_message = await check_if_achievement_message(content) if achievement_message: channel = coginstance.bot.get_channel(await config.chat_channel()) if channel is not None: if coginstance.bot.embed_requested(channel): await channel.send(embed=await generate_achievement_embed(achievement_message['username'], achievement_message['achievement'], achievement_message['challenge'])) else: await channel.send(f"{achievement_message['username']} has {'completed the challenge' if achievement_message['challenge'] else 'made the advancement'} {achievement_message['achievement']}") if json.loads(message)['event'] == 'status': old_status = await config.current_status() current_status = json.loads(message)['args'][0] if old_status != current_status: await config.current_status.set(current_status) if await config.console_channel() is not None: console = coginstance.bot.get_channel(await config.console_channel()) if console is not None: await console.send(f"Server status changed! `{current_status}`") if await config.chat_channel() is not None: if current_status == 'running' and await config.startup_msg() is not None: chat = coginstance.bot.get_channel(await config.chat_channel()) if chat is not None: await chat.send(await config.startup_msg()) if current_status == 'stopping' and await config.shutdown_msg() is not None: chat = coginstance.bot.get_channel(await config.chat_channel()) if chat is not None: await chat.send(await config.shutdown_msg()) async def retrieve_websocket_credentials(coginstance: Pterodactyl) -> Optional[dict]: pterodactyl_keys = await coginstance.bot.get_shared_api_tokens("pterodactyl") api_key = pterodactyl_keys.get("api_key") if api_key is None: coginstance.task.cancel() raise ValueError("Pterodactyl API key not set. Please set it using `[p]set api`.") base_url = await config.base_url() if base_url is None: coginstance.task.cancel() raise ValueError("Pterodactyl base URL not set. Please set it using `[p]pterodactyl config url`.") server_id = await config.server_id() if server_id is None: coginstance.task.cancel() raise ValueError("Pterodactyl server ID not set. Please set it using `[p]pterodactyl config serverid`.") client = PterodactylClient(base_url, api_key, debug=True).client coginstance.client = client websocket_credentials = client.servers.get_websocket(server_id) logger.debug("""Websocket connection details retrieved: Socket: %s Token: %s...""", websocket_credentials['data']['socket'], websocket_credentials['data']['token'][:20] ) return websocket_credentials #NOTE - The token is truncated to prevent it from being logged in its entirety, for security reasons def remove_ansi_escape_codes(text: str) -> str: ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') #NOTE - https://chat.openai.com/share/d92f9acf-d776-4fd6-a53f-b14ac15dd540 return ansi_escape.sub('', text) async def check_if_server_message(text: str) -> Union[bool, str]: logger.debug("Checking if message is a server message") regex = await config.server_regex() match: Optional[re.Match[str]] = re.match(regex, text) if match: logger.debug("Message is a server message") return match.group(1) logger.debug("Message is not a server message") return False async def check_if_chat_message(text: str) -> Union[bool, dict]: logger.debug("Checking if message is a chat message") regex = await config.chat_regex() match: Optional[re.Match[str]] = re.match(regex, text) if match: groups = {"username": match.group(1), "message": match.group(2)} logger.debug("Message is a chat message\n%s", json.dumps(groups)) return groups logger.debug("Message is not a chat message") return False async def check_if_join_message(text: str) -> Union[bool, str]: logger.debug("Checking if message is a join message") regex = await config.join_regex() match: Optional[re.Match[str]] = re.match(regex, text) if match: logger.debug("Message is a join message") return match.group(1) logger.debug("Message is not a join message") return False async def check_if_leave_message(text: str) -> Union[bool, str]: logger.debug("Checking if message is a leave message") regex = await config.leave_regex() match: Optional[re.Match[str]] = re.match(regex, text) if match: logger.debug("Message is a leave message") return match.group(1) logger.debug("Message is not a leave message") return False async def check_if_achievement_message(text: str) -> Union[bool, dict]: logger.debug("Checking if message is an achievement message") regex = await config.achievement_regex() match: Optional[re.Match[str]] = re.match(regex, text) if match: groups = {"username": match.group(1), "achievement": match.group(3)} if match.group(2) == "completed the challenge": groups["challenge"] = True else: groups["challenge"] = False logger.debug("Message is an achievement message\n%s", json.dumps(groups)) return groups logger.debug("Message is not an achievement message") return False async def get_info(username: str) -> Optional[dict]: logger.debug("Retrieving player info for %s", username) endpoint = await config.api_endpoint() async with aiohttp.ClientSession() as session: async with session.get(f"https://playerdb.co/api/player/{endpoint}/{username}") as response: if response.status == 200: logger.debug("Player info retrieved for %s\n%s", username, json.dumps(await response.json())) return await response.json() logger.error("Failed to retrieve player info for %s: %s", username, response.status) return None async def send_chat_discord(coginstance: Pterodactyl, username: str, message: str, avatar_url: str) -> None: logger.debug("Sending chat message to Discord") channel = coginstance.bot.get_channel(await config.chat_channel()) if channel is not None: webhooks = await channel.webhooks() webhook = discord.utils.get(webhooks, name="Pterodactyl Chat") if webhook is None: webhook = await channel.create_webhook(name="Pterodactyl Chat") await webhook.send(content=message, username=username, avatar_url=avatar_url, allowed_mentions=discord.AllowedMentions.none()) logger.debug("Chat message sent to Discord") else: logger.debug("Chat channel not set. Skipping sending chat message to Discord") async def generate_join_leave_embed(username: str, join: bool) -> discord.Embed: embed = discord.Embed() embed.color = discord.Color.green() if join else discord.Color.red() embed.description = await config.join_msg() if join else await config.leave_msg() info = await get_info(username) if info: embed.set_author(name=username, icon_url=info['data']['player']['avatar']) else: embed.set_author(name=username, icon_url='https://seafsh.cc/u/j3AzqQ.png') embed.timestamp = discord.utils.utcnow() return embed async def generate_achievement_embed(username: str, achievement: str, challenge: bool) -> discord.Embed: embed = discord.Embed() embed.color = discord.Color.dark_purple() if challenge else discord.Color.brand_green() embed.description = f"{bold(username)} has {'completed the challenge' if challenge else 'made the advancement'} {bold(achievement)}" info = await get_info(username) if info: embed.set_author(name=username, icon_url=info['data']['player']['avatar']) else: embed.set_author(name=username, icon_url='https://seafsh.cc/u/j3AzqQ.png') embed.timestamp = discord.utils.utcnow() return embed def mask_ip(string: str) -> str: def check(match): ip = match.group(0) masked_ip = '.'.join(r'\*' * len(octet) for octet in ip.split('.')) return masked_ip return re.sub(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', check, string)