SeaCogs/pterodactyl/websocket.py

125 lines
6.1 KiB
Python

import json
import re
from typing import Optional, Union
import aiohttp
import discord
import websockets
from pydactyl import PterodactylClient, exceptions
from redbot.core.utils.chat_formatting import pagify
from pterodactyl.config import config
from pterodactyl.logger import logger
from pterodactyl.pterodactyl import Pterodactyl
async def establish_websocket_connection(coginstance: Pterodactyl) -> None:
logger.debug("Establishing WebSocket connection")
base_url = await config.base_url()
api_key = await config.api_key()
server_id = await config.server_id()
try:
client = PterodactylClient(base_url, api_key, debug=True).client
coginstance.client = client
websocket_credentials = client.servers.get_websocket(server_id)
logger.debug("""Websocket connection details retrieved:
Socket: %s
Token: %s...""",
websocket_credentials['data']['socket'],
websocket_credentials['data']['token'][:20]
)
#NOTE - The token is truncated to prevent it from being logged in its entirety, for security reasons
except exceptions.ClientConfigError as e:
return logger.error('Failed to initialize Pterodactyl client: %s', e)
except exceptions.PterodactylApiError as e:
return logger.error('Failed to retrieve Pterodactyl websocket: %s', e)
async with websockets.connect(websocket_credentials['data']['socket'], origin=base_url, ping_timeout=60) as websocket:
logger.info("WebSocket connection established")
auth_message = json.dumps({"event": "auth", "args": [websocket_credentials['data']['token']]})
await websocket.send(auth_message)
logger.debug("Authentication message sent")
coginstance.websocket = websocket
current_status = ''
while True:
message = await websocket.recv()
if json.loads(message)['event'] in ('token expiring', 'token expired'):
logger.debug("Received token expiring/expired event. Refreshing token.")
websocket_credentials = client.servers.get_websocket(server_id)
auth_message = json.dumps({"event": "auth", "args": [websocket_credentials['data']['token']]})
await websocket.send(auth_message)
logger.debug("Authentication message sent")
if json.loads(message)['event'] == 'auth success':
logger.info("WebSocket authentication successful")
if json.loads(message)['event'] == 'console output' and await config.console_channel() is not None:
if current_status in ('running', 'offline', ''):
content = remove_ansi_escape_codes(json.loads(message)['args'][0])
channel = coginstance.bot.get_channel(await config.console_channel())
if channel is not None:
if content.startswith('['):
pagified_content = pagify(content, delims=[" ", "\n"])
for page in pagified_content:
await channel.send(content=page)
chat_message = await check_if_chat_message(content)
if chat_message:
info = await get_info(chat_message['username'])
if info is not None:
await send_chat_discord(coginstance, chat_message['username'], chat_message['message'], info['data']['player']['avatar'])
else:
await send_chat_discord(coginstance, chat_message['username'], chat_message['message'], 'https://seafsh.cc/u/j3AzqQ.png')
if json.loads(message)['event'] == 'status':
current_status = json.loads(message)['args'][0]
if await config.console_channel() is not None:
console = coginstance.bot.get_channel(await config.console_channel())
if console is not None:
await console.send(f"Server status changed! `{json.loads(message)['args'][0]}`")
def remove_ansi_escape_codes(text: str) -> str:
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
#NOTE - https://chat.openai.com/share/d92f9acf-d776-4fd6-a53f-b14ac15dd540
return ansi_escape.sub('', text)
async def check_if_chat_message(text: str) -> Union[bool, dict]:
logger.debug("Checking if message is a chat message")
regex = await config.chat_regex()
match: Optional[re.Match[str]] = re.match(regex, text)
if match:
groups = {"time": match.group(1), "username": match.group(2), "message": match.group(3)}
logger.debug("Message is a chat message\n%s", json.dumps(groups))
return groups
logger.debug("Message is not a chat message")
return False
async def get_info(username: str) -> Optional[dict]:
logger.debug("Retrieving player info for %s", username)
endpoint = await config.api_endpoint()
async with aiohttp.ClientSession() as session:
async with session.get(f"https://playerdb.co/api/player/{endpoint}/{username}") as response:
if response.status == 200:
logger.debug("Player info retrieved for %s\n%s", username, json.dumps(await response.json()))
return await response.json()
logger.error("Failed to retrieve player info for %s: %s", username, response.status)
return None
async def send_chat_discord(coginstance: Pterodactyl, username: str, message: str, avatar_url: str) -> None:
logger.debug("Sending chat message to Discord")
channel = coginstance.bot.get_channel(await config.chat_channel())
if channel is not None:
webhooks = await channel.webhooks()
webhook = discord.utils.get(webhooks, name="Pterodactyl Chat")
if webhook is None:
webhook = await channel.create_webhook(name="Pterodactyl Chat")
await webhook.send(content=message, username=username, avatar_url=avatar_url)
logger.debug("Chat message sent to Discord")
else:
logger.debug("Chat channel not set. Skipping sending chat message to Discord")