SeaCogs/pterodactyl/websocket.py

125 lines
6.1 KiB
Python
Raw Normal View History

import json
import re
from typing import Optional, Union
import aiohttp
import discord
import websockets
from pydactyl import PterodactylClient, exceptions
from redbot.core.utils.chat_formatting import pagify
from pterodactyl.config import config
from pterodactyl.logger import logger
from pterodactyl.pterodactyl import Pterodactyl
async def establish_websocket_connection(coginstance: Pterodactyl) -> None:
logger.debug("Establishing WebSocket connection")
base_url = await config.base_url()
api_key = await config.api_key()
server_id = await config.server_id()
try:
client = PterodactylClient(base_url, api_key, debug=True).client
coginstance.client = client
websocket_credentials = client.servers.get_websocket(server_id)
logger.debug("""Websocket connection details retrieved:
Socket: %s
Token: %s...""",
websocket_credentials['data']['socket'],
websocket_credentials['data']['token'][:20]
)
#NOTE - The token is truncated to prevent it from being logged in its entirety, for security reasons
except exceptions.ClientConfigError as e:
return logger.error('Failed to initialize Pterodactyl client: %s', e)
except exceptions.PterodactylApiError as e:
return logger.error('Failed to retrieve Pterodactyl websocket: %s', e)
async with websockets.connect(websocket_credentials['data']['socket'], origin=base_url, ping_timeout=60) as websocket:
logger.info("WebSocket connection established")
auth_message = json.dumps({"event": "auth", "args": [websocket_credentials['data']['token']]})
await websocket.send(auth_message)
logger.debug("Authentication message sent")
coginstance.websocket = websocket
current_status = ''
while True:
message = await websocket.recv()
if json.loads(message)['event'] in ('token expiring', 'token expired'):
logger.debug("Received token expiring/expired event. Refreshing token.")
websocket_credentials = client.servers.get_websocket(server_id)
auth_message = json.dumps({"event": "auth", "args": [websocket_credentials['data']['token']]})
await websocket.send(auth_message)
logger.debug("Authentication message sent")
if json.loads(message)['event'] == 'auth success':
logger.info("WebSocket authentication successful")
if json.loads(message)['event'] == 'console output' and await config.console_channel() is not None:
if current_status in ('running', 'offline', ''):
content = remove_ansi_escape_codes(json.loads(message)['args'][0])
channel = coginstance.bot.get_channel(await config.console_channel())
if channel is not None:
if content.startswith('['):
pagified_content = pagify(content, delims=[" ", "\n"])
for page in pagified_content:
await channel.send(content=page)
chat_message = await check_if_chat_message(content)
if chat_message:
info = await get_info(chat_message['username'])
if info is not None:
await send_chat_discord(coginstance, chat_message['username'], chat_message['message'], info['data']['player']['avatar'])
else:
await send_chat_discord(coginstance, chat_message['username'], chat_message['message'], 'https://seafsh.cc/u/j3AzqQ.png')
if json.loads(message)['event'] == 'status':
current_status = json.loads(message)['args'][0]
if await config.console_channel() is not None:
console = coginstance.bot.get_channel(await config.console_channel())
if console is not None:
await console.send(f"Server status changed! `{json.loads(message)['args'][0]}`")
def remove_ansi_escape_codes(text: str) -> str:
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
#NOTE - https://chat.openai.com/share/d92f9acf-d776-4fd6-a53f-b14ac15dd540
return ansi_escape.sub('', text)
async def check_if_chat_message(text: str) -> Union[bool, dict]:
logger.debug("Checking if message is a chat message")
regex = await config.chat_regex()
match: Optional[re.Match[str]] = re.match(regex, text)
if match:
groups = {"time": match.group(1), "username": match.group(2), "message": match.group(3)}
logger.debug("Message is a chat message\n%s", json.dumps(groups))
return groups
logger.debug("Message is not a chat message")
return False
async def get_info(username: str) -> Optional[dict]:
logger.debug("Retrieving player info for %s", username)
endpoint = await config.api_endpoint()
async with aiohttp.ClientSession() as session:
async with session.get(f"https://playerdb.co/api/player/{endpoint}/{username}") as response:
if response.status == 200:
logger.debug("Player info retrieved for %s\n%s", username, json.dumps(await response.json()))
return await response.json()
logger.error("Failed to retrieve player info for %s: %s", username, response.status)
return None
async def send_chat_discord(coginstance: Pterodactyl, username: str, message: str, avatar_url: str) -> None:
logger.debug("Sending chat message to Discord")
channel = coginstance.bot.get_channel(await config.chat_channel())
if channel is not None:
webhooks = await channel.webhooks()
webhook = discord.utils.get(webhooks, name="Pterodactyl Chat")
if webhook is None:
webhook = await channel.create_webhook(name="Pterodactyl Chat")
await webhook.send(content=message, username=username, avatar_url=avatar_url)
logger.debug("Chat message sent to Discord")
else:
logger.debug("Chat channel not set. Skipping sending chat message to Discord")