SeaCogs/pterodactyl/pterodactyl.py

222 lines
12 KiB
Python

import json
import logging
import re
from typing import Optional, Union
import aiohttp
import discord
import websockets
from pydactyl import PterodactylClient, exceptions
from redbot.core import Config, commands
from redbot.core.bot import Red
class Pterodactyl(commands.Cog):
"""Pterodactyl allows you to manage your Pterodactyl Panel from Discord."""
def __init__(self, bot: Red):
self.bot = bot
self.config = Config.get_conf(self, identifier=457581387213637448123567, force_registration=True)
self.config.register_global(
base_url=None,
api_key=None,
server_id=None,
console_channel=None,
startup_jar=None,
startup_arguments=None,
power_action_in_progress=False,
chat_regex=r"\[(\d{2}:\d{2}:\d{2})\sINFO\]:\s<(\w+)>\s(.*)",
api_endpoint="minecraft",
chat_channel=None
)
self.logger = logging.getLogger('red.sea.pterodactyl')
self.client = None
self.task = None
self.websocket = None
async def establish_websocket_connection(self):
self.logger.debug("Establishing WebSocket connection")
base_url = await self.config.base_url()
api_key = await self.config.api_key()
server_id = await self.config.server_id()
try:
client = PterodactylClient(base_url, api_key, debug=True).client
self.client = client
websocket_credentials = client.servers.get_websocket(server_id)
self.logger.debug("""Websocket connection details retrieved:
Socket: %s
Token: %s...""",
websocket_credentials['data']['socket'],
websocket_credentials['data']['token'][:20]
)
#NOTE - The token is truncated to prevent it from being logged in its entirety, for security reasons
except exceptions.ClientConfigError as e:
self.logger.error('Failed to initialize Pterodactyl client: %s', e)
return
except exceptions.PterodactylApiError as e:
self.logger.error('Failed to retrieve Pterodactyl websocket: %s', e)
return
async for websocket in websockets.connect(websocket_credentials['data']['socket'], origin=base_url, ping_timeout=60):
try:
self.logger.debug("WebSocket connection established")
auth_message = json.dumps({"event": "auth", "args": [websocket_credentials['data']['token']]})
await websocket.send(auth_message)
self.logger.debug("Authentication message sent")
self.websocket = websocket
current_status = ''
while True:
message = await websocket.recv()
if json.loads(message)['event'] in ['token expiring', 'token expired']:
self.logger.debug("Received token expiring/expired event. Refreshing token.")
websocket_credentials = client.servers.get_websocket(server_id)
auth_message = json.dumps({"event": "auth", "args": [websocket_credentials['data']['token']]})
await websocket.send(auth_message)
self.logger.debug("Authentication message sent")
if json.loads(message)['event'] == 'auth success':
self.logger.debug("Authentication successful")
if json.loads(message)['event'] == 'console output' and await self.config.console_channel() is not None:
if current_status == 'running' or current_status == 'offline' or current_status == '':
channel = self.bot.get_channel(await self.config.console_channel())
if channel is not None:
content = self.remove_ansi_escape_codes(json.loads(message)['args'][0][:1900])
if content.startswith('['):
await channel.send(content=content)
#TODO - Add pagification for long messages to prevent Discord API errors
chat_message = await self.check_if_chat_message(content)
if chat_message:
info = await self.get_info(chat_message['username'])
if info is not None:
await self.send_chat_discord(chat_message['username'], chat_message['message'], info['data']['player']['avatar'])
if json.loads(message)['event'] == 'status':
current_status = json.loads(message)['args'][0]
console = self.bot.get_channel(await self.config.console_channel())
if console is not None:
await console.send(f"Server status changed! `{json.loads(message)['args'][0]}`")
except websockets.exceptions.ConnectionClosed as e:
self.logger.debug("WebSocket connection closed: %s", e)
websocket_credentials = client.servers.get_websocket(server_id)
continue
def remove_ansi_escape_codes(self, text: str) -> str:
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
#NOTE - https://chat.openai.com/share/d92f9acf-d776-4fd6-a53f-b14ac15dd540
return ansi_escape.sub('', text)
async def check_if_chat_message(self, text: str) -> Union[bool, dict]:
self.logger.debug("Checking if message is a chat message")
regex = await self.config.chat_regex()
match: Optional[re.Match[str]] = re.match(regex, text)
if match:
dict = {"time": match.group(1), "username": match.group(2), "message": match.group(3)}
self.logger.debug("Message is a chat message\n%s", json.dumps(dict))
return dict
self.logger.debug("Message is not a chat message")
return False
async def get_info(self, username: str) -> Optional[dict]:
self.logger.debug("Retrieving player info for %s", username)
endpoint = await self.config.api_endpoint()
async with aiohttp.ClientSession() as session:
async with session.get(f"https://playerdb.co/api/player/{endpoint}/{username}") as response:
if response.status == 200:
self.logger.debug("Player info retrieved for %s\n%s", username, json.dumps(await response.json()))
return await response.json()
else:
self.logger.error("Failed to retrieve player info for %s: %s", username, response.status)
return None
async def send_chat_discord(self, username: str, message: str, avatar_url: str) -> None:
self.logger.debug("Sending chat message to Discord")
channel = self.bot.get_channel(await self.config.chat_channel())
if channel is not None:
webhooks = await channel.webhooks()
webhook = discord.utils.get(webhooks, name="Pterodactyl Chat")
if webhook is None:
webhook = await channel.create_webhook(name="Pterodactyl Chat")
await webhook.send(content=message, username=username, avatar_url=avatar_url)
self.logger.debug("Chat message sent to Discord")
else:
self.logger.debug("Chat channel not set. Skipping sending chat message to Discord")
def get_tellraw_string(self, username: str, message: str):
return f'tellraw @a ["",{{"text":"{username} (DISCORD): ","color":"blue"}},{{"text":"{message}","color":"white"}}]'
def get_task(self):
return self.bot.loop.create_task(self.establish_websocket_connection(), name="Pterodactyl Websocket Connection")
async def cog_load(self):
self.task = self.get_task()
async def cog_unload(self):
self.task.cancel()
await self.client._session.close() # pylint: disable=protected-access
@commands.Cog.listener()
async def on_message(self, message: discord.Message):
if message.channel.id == await self.config.console_channel() and message.author.id != self.bot.user.id:
self.logger.debug("Received console command from %s: %s", message.author.id, message.content)
await message.channel.send(f"Received console command from {message.author.id}: {message.content[:1900]}")
await self.websocket.send(json.dumps({"event": "send command", "args": [message.content]}))
if message.channel.id == await self.config.chat_channel() and message.author.id != self.bot.user.id:
self.logger.debug("Received chat message from %s: %s", message.author.id, message.content)
channel = self.bot.get_channel(await self.config.console_channel())
if channel:
await channel.send(f"Received chat message from {message.author.id}: {message.content[:1900]}")
msg = json.dumps({"event": "send command", "args": [self.get_tellraw_string(message.author.name, message.content)]})
self.logger.debug("Sending chat message to server:\n%s", msg)
await self.websocket.send(msg)
@commands.group(autohelp = True, name = "pterodactyl", aliases = ["ptero"])
async def pterodactyl(self, ctx: commands.Context):
"""Pterodactyl allows you to manage your Pterodactyl Panel from Discord."""
@pterodactyl.group(autohelp = True, name = "config", aliases = ["settings", "set"])
async def pterodactyl_config(self, ctx: commands.Context):
"""Configure Pterodactyl settings."""
@pterodactyl_config.command(name = "url")
async def pterodactyl_config_base_url(self, ctx: commands.Context, base_url: str):
"""Set the base URL of your Pterodactyl Panel. Please include the protocol (http/https)."""
await self.config.base_url.set(base_url)
await ctx.send(f"Base URL set to {base_url}")
self.logger.debug("Configuration value set: base_url = %s\nRestarting task...", base_url)
self.task.cancel()
self.task = self.get_task()
@pterodactyl_config.command(name = "apikey")
async def pterodactyl_config_api_key(self, ctx: commands.Context, api_key: str):
"""Set the API key for your Pterodactyl Panel."""
await self.config.api_key.set(api_key)
await ctx.send(f"API key set to `{api_key[:5]}...{api_key[-4:]}`")
self.logger.debug("Configuration value set: api_key = %s\nRestarting task...", api_key)
self.task.cancel()
self.task = self.get_task()
@pterodactyl_config.command(name = "serverid")
async def pterodactyl_config_server_id(self, ctx: commands.Context, server_id: str):
"""Set the server ID for your Pterodactyl Panel."""
await self.config.server_id.set(server_id)
await ctx.send(f"Server ID set to {server_id}")
self.logger.debug("Configuration value set: server_id = %s\nRestarting task...", server_id)
self.task.cancel()
self.task = self.get_task()
@pterodactyl_config.command(name = "consolechannel")
async def pterodactyl_config_console_channel(self, ctx: commands.Context, channel: discord.TextChannel):
"""Set the channel to send console output to."""
await self.config.console_channel.set(channel.id)
await ctx.send(f"Console channel set to {channel.mention}")
@pterodactyl_config.command(name = "chatchannel")
async def pterodactyl_config_chat_channel(self, ctx: commands.Context, channel: discord.TextChannel):
"""Set the channel to send chat output to."""
await self.config.chat_channel.set(channel.id)
await ctx.send(f"Chat channel set to {channel.mention}")