159 lines
8.1 KiB
Python
159 lines
8.1 KiB
Python
# pylint: disable=cyclic-import
|
|
import json
|
|
import re
|
|
from typing import Optional, Union
|
|
|
|
import aiohttp
|
|
import discord
|
|
import websockets
|
|
from pydactyl import PterodactylClient, exceptions
|
|
from redbot.core.utils.chat_formatting import pagify
|
|
|
|
from pterodactyl.config import config
|
|
from pterodactyl.logger import logger
|
|
from pterodactyl.pterodactyl import Pterodactyl
|
|
|
|
|
|
async def establish_websocket_connection(coginstance: Pterodactyl) -> None:
    """Connect to the panel WebSocket, authenticate, and relay its events to Discord.

    Credentials come from ``retrieve_websocket_credentials``; the open socket is
    stored on ``coginstance.websocket``. The receive loop has no exit condition
    of its own, so it only ends when one of the awaited calls raises or the
    surrounding task is cancelled. No exception is caught here.

    Args:
        coginstance: The cog instance; ``coginstance.bot`` is used to resolve
            Discord channels and ``coginstance.websocket`` is assigned.
    """
    base_url = await config.base_url()

    logger.debug("Establishing WebSocket connection")

    # NOTE(review): retrieve_websocket_credentials is annotated Optional[dict];
    # a None result fails on the subscript below with TypeError. Left as-is so
    # the caller sees the same failure as before.
    websocket_credentials = await retrieve_websocket_credentials(coginstance)

    async with websockets.connect(websocket_credentials['data']['socket'], origin=base_url, ping_timeout=60) as websocket:
        logger.info("WebSocket connection established")

        await _send_auth(websocket, websocket_credentials['data']['token'])

        coginstance.websocket = websocket

        while True:
            # Decode each frame exactly once; every branch works from the same payload.
            payload = json.loads(await websocket.recv())
            event = payload['event']

            if event in ('token expiring', 'token expired'):
                logger.debug("Received token expiring/expired event. Refreshing token.")
                websocket_credentials = await retrieve_websocket_credentials(coginstance)
                await _send_auth(websocket, websocket_credentials['data']['token'])
            elif event == 'auth success':
                logger.info("WebSocket authentication successful")
            elif event == 'console output':
                await _handle_console_output(coginstance, payload)
            elif event == 'status':
                await _handle_status_change(coginstance, payload)


# Avatar used for relayed chat messages when the player lookup returns nothing.
_FALLBACK_AVATAR_URL = 'https://seafsh.cc/u/j3AzqQ.png'


async def _send_auth(websocket, token: str) -> None:
    """Send the ``auth`` event carrying ``token`` over ``websocket``."""
    await websocket.send(json.dumps({"event": "auth", "args": [token]}))
    logger.debug("Authentication message sent")


async def _handle_console_output(coginstance: Pterodactyl, payload: dict) -> None:
    """Relay one ``console output`` event to the console and chat channels."""
    console_channel_id = await config.console_channel()
    if console_channel_id is None:
        return
    if await config.current_status() not in ('running', 'offline', ''):
        return

    content = remove_ansi_escape_codes(payload['args'][0])

    # Only lines starting with '[' are mirrored to the console channel.
    console = coginstance.bot.get_channel(console_channel_id)
    if console is not None and content.startswith('['):
        for page in pagify(content, delims=[" ", "\n"]):
            await console.send(content=page)

    chat_message = await check_if_chat_message(content)
    if chat_message:
        info = await get_info(chat_message['username'])
        avatar_url = info['data']['player']['avatar'] if info is not None else _FALLBACK_AVATAR_URL
        await send_chat_discord(coginstance, chat_message['username'], chat_message['message'], avatar_url)

    server_message = await check_if_server_message(content)
    if server_message:
        chat = coginstance.bot.get_channel(await config.chat_channel())
        if chat is not None:
            # Truncate to 2000 characters, ending in an ellipsis when cut.
            await chat.send(server_message if len(server_message) < 2000 else server_message[:1997] + '...')


async def _handle_status_change(coginstance: Pterodactyl, payload: dict) -> None:
    """Persist a changed server status and announce it in the configured channels."""
    old_status = await config.current_status()
    current_status = payload['args'][0]
    if old_status == current_status:
        return

    await config.current_status.set(current_status)

    console_channel_id = await config.console_channel()
    if console_channel_id is not None:
        console = coginstance.bot.get_channel(console_channel_id)
        if console is not None:
            await console.send(f"Server status changed! `{current_status}`")

    chat_channel_id = await config.chat_channel()
    if chat_channel_id is None:
        return

    # Only the 'running' and 'stopping' transitions have a chat announcement.
    if current_status == 'running':
        announcement = await config.startup_msg()
    elif current_status == 'stopping':
        announcement = await config.shutdown_msg()
    else:
        return
    if announcement is None:
        return

    chat = coginstance.bot.get_channel(chat_channel_id)
    if chat is not None:
        await chat.send(announcement)
|
|
|
|
async def retrieve_websocket_credentials(coginstance: Pterodactyl) -> Optional[dict]:
    """Build a Pterodactyl client and request WebSocket credentials for the configured server.

    The newly created client is stored on ``coginstance.client`` as a side effect.

    Args:
        coginstance: The cog instance that receives the client.

    Returns:
        The mapping returned by ``client.servers.get_websocket`` (this module
        reads ``['data']['socket']`` and ``['data']['token']`` from it), or
        ``None`` when client construction or the API call raises one of the
        handled pydactyl exceptions.
    """
    base_url = await config.base_url()
    api_key = await config.api_key()
    server_id = await config.server_id()

    # Keep the try body limited to the calls the handlers below are meant for.
    try:
        client = PterodactylClient(base_url, api_key, debug=True).client
        coginstance.client = client
        # NOTE(review): this looks like a synchronous call inside a coroutine - confirm
        # whether it should be moved off the event loop.
        websocket_credentials = client.servers.get_websocket(server_id)
    except exceptions.ClientConfigError as e:
        logger.error('Failed to initialize Pterodactyl client: %s', e)
        return None
    except exceptions.PterodactylApiError as e:
        logger.error('Failed to retrieve Pterodactyl websocket: %s', e)
        return None

    #NOTE - The token is truncated to prevent it from being logged in its entirety, for security reasons
    logger.debug(
        "Websocket connection details retrieved:\nSocket: %s\nToken: %s...",
        websocket_credentials['data']['socket'],
        websocket_credentials['data']['token'][:20],
    )
    return websocket_credentials
|
|
|
|
# Matches an ESC byte followed by either a single character in the ranges
# @-Z / \-_ or a '['-introduced sequence (parameter characters, intermediate
# characters, one final character). Compiled once at import time instead of on
# every console line.
#NOTE - https://chat.openai.com/share/d92f9acf-d776-4fd6-a53f-b14ac15dd540
_ANSI_ESCAPE_RE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')


def remove_ansi_escape_codes(text: str) -> str:
    """Return ``text`` with every ANSI escape sequence removed.

    Args:
        text: The raw console line.

    Returns:
        The input with all matches of the escape pattern deleted; text without
        escape sequences is returned unchanged.
    """
    return _ANSI_ESCAPE_RE.sub('', text)
|
|
|
|
async def check_if_server_message(text: str) -> Union[bool, str]:
    """Test ``text`` against the configured server-message regex.

    Args:
        text: A console line.

    Returns:
        The first capture group of the match when the pattern matches at the
        start of ``text``, otherwise ``False``.
    """
    logger.debug("Checking if message is a server message")
    pattern = await config.server_regex()
    found = re.match(pattern, text)

    # Guard clause: bail out first, so the success path reads straight through.
    if found is None:
        logger.debug("Message is not a server message")
        return False

    logger.debug("Message is a server message")
    return found.group(1)
|
|
|
|
async def check_if_chat_message(text: str) -> Union[bool, dict]:
    """Test ``text`` against the configured chat regex and split out its parts.

    Args:
        text: A console line.

    Returns:
        A dict with the keys ``time``, ``username`` and ``message`` taken from
        capture groups 1, 2 and 3 when the pattern matches at the start of
        ``text``, otherwise ``False``.
    """
    logger.debug("Checking if message is a chat message")
    pattern = await config.chat_regex()
    found = re.match(pattern, text)

    if found is None:
        logger.debug("Message is not a chat message")
        return False

    timestamp, author, body = found.group(1, 2, 3)
    parsed = {"time": timestamp, "username": author, "message": body}
    logger.debug("Message is a chat message\n%s", json.dumps(parsed))
    return parsed
|
|
|
|
async def get_info(username: str) -> Optional[dict]:
    """Look up a player's profile on playerdb.co.

    Args:
        username: Player name as captured from the console chat line.

    Returns:
        The decoded JSON body when the API answers with HTTP 200 (callers in
        this module read ``['data']['player']['avatar']`` from it), or ``None``
        when the status is not 200 or the request fails with an
        ``aiohttp.ClientError``.
    """
    # Local import keeps this block self-contained without touching the module imports.
    from urllib.parse import quote  # pylint: disable=import-outside-toplevel

    logger.debug("Retrieving player info for %s", username)
    endpoint = await config.api_endpoint()

    # The username originates from game chat, so escape it fully (including '/')
    # before placing it in the URL path.
    url = f"https://playerdb.co/api/player/{endpoint}/{quote(username, safe='')}"

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status != 200:
                    logger.error("Failed to retrieve player info for %s: %s", username, response.status)
                    return None
                # Decode the body once and reuse it for both logging and the return value.
                player_info = await response.json()
    except aiohttp.ClientError as e:
        # A failed lookup should degrade to "no info" rather than propagate
        # into the WebSocket receive loop that calls this function.
        logger.error("Failed to retrieve player info for %s: %s", username, e)
        return None

    logger.debug("Player info retrieved for %s\n%s", username, json.dumps(player_info))
    return player_info
|
|
|
|
async def send_chat_discord(coginstance: Pterodactyl, username: str, message: str, avatar_url: str) -> None:
    """Post a chat line to the configured chat channel through a webhook.

    Uses the channel's webhook named "Pterodactyl Chat", creating it when it
    does not exist yet, so the message carries the given name and avatar.

    Args:
        coginstance: The cog instance; ``coginstance.bot`` resolves the channel.
        username: Display name for the webhook message.
        message: Message content.
        avatar_url: Avatar URL for the webhook message.
    """
    hook_name = "Pterodactyl Chat"

    logger.debug("Sending chat message to Discord")
    target = coginstance.bot.get_channel(await config.chat_channel())

    if target is None:
        logger.debug("Chat channel not set. Skipping sending chat message to Discord")
        return

    hook = discord.utils.get(await target.webhooks(), name=hook_name)
    if hook is None:
        hook = await target.create_webhook(name=hook_name)

    await hook.send(content=message, username=username, avatar_url=avatar_url)
    logger.debug("Chat message sent to Discord")
|