# SeaCogs/pterodactyl/pterodactyl.py
# (repository viewer header: 281 lines, 15 KiB, Python)

import json
import logging
import re
from typing import Optional, Union
import aiohttp
import discord
import websockets
from pydactyl import PterodactylClient, exceptions
from redbot.core import Config, commands
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import box, pagify
class Pterodactyl(commands.Cog):
    """Pterodactyl allows you to manage your Pterodactyl Panel from Discord."""

    # Avatar used for relayed chat messages when the player lookup yields nothing usable.
    _DEFAULT_AVATAR_URL = 'https://seafsh.cc/u/j3AzqQ.png'

    def __init__(self, bot: Red):
        self.bot = bot
        self.config = Config.get_conf(self, identifier=457581387213637448123567, force_registration=True)
        self.config.register_global(
            base_url=None,
            api_key=None,
            server_id=None,
            console_channel=None,
            startup_jar=None,
            startup_arguments=None,
            power_action_in_progress=False,
            chat_regex=r"\[(\d{2}:\d{2}:\d{2})\sINFO\]:\s<(\w+)>\s(.*)",
            # Single braces on purpose: placeholders are substituted textually (never via
            # str.format), so doubled braces would reach the server verbatim as invalid JSON.
            tellraw_json='tellraw @a ["",{"text":".$U ","color":".$C"},{"text":" (DISCORD): ","color":"blue"},{"text":".$M","color":"white"}]',
            api_endpoint="minecraft",
            chat_channel=None,
        )
        self.logger = logging.getLogger('red.sea.pterodactyl')
        self.client = None     # pydactyl client object, set once a connection attempt succeeds
        self.task = None       # background task running establish_websocket_connection()
        self.websocket = None  # most recently authenticated websocket connection

    async def establish_websocket_connection(self) -> None:
        """Connect to the server's console websocket and process its events until cancelled.

        Reads base URL, API key and server ID from config, fetches websocket
        credentials through pydactyl, then iterates over ``websockets.connect`` so
        that a closed connection leads to a new connection attempt. Returns early
        (after logging) when the client cannot be configured or credentials cannot
        be retrieved.
        """
        self.logger.debug("Establishing WebSocket connection")
        base_url = await self.config.base_url()
        api_key = await self.config.api_key()
        server_id = await self.config.server_id()

        try:
            client = PterodactylClient(base_url, api_key, debug=True).client
            self.client = client
            # NOTE(review): get_websocket is called without await, so it presumably blocks
            # the event loop for the duration of the HTTP request - confirm.
            websocket_credentials = client.servers.get_websocket(server_id)
            #NOTE - The token is truncated to prevent it from being logged in its entirety, for security reasons
            self.logger.debug(
                "Websocket connection details retrieved:\nSocket: %s\nToken: %s...",
                websocket_credentials['data']['socket'],
                websocket_credentials['data']['token'][:20],
            )
        except exceptions.ClientConfigError as e:
            self.logger.error('Failed to initialize Pterodactyl client: %s', e)
            return
        except exceptions.PterodactylApiError as e:
            self.logger.error('Failed to retrieve Pterodactyl websocket: %s', e)
            return

        async for websocket in websockets.connect(websocket_credentials['data']['socket'], origin=base_url, ping_timeout=60):
            try:
                self.logger.info("WebSocket connection established")
                await self._authenticate(websocket, websocket_credentials)
                self.websocket = websocket
                current_status = ''

                while True:
                    # Decode each frame exactly once instead of once per comparison.
                    payload = json.loads(await websocket.recv())
                    event = payload.get('event')
                    args = payload.get('args') or []

                    if event in ('token expiring', 'token expired'):
                        self.logger.debug("Received token expiring/expired event. Refreshing token.")
                        websocket_credentials = client.servers.get_websocket(server_id)
                        await self._authenticate(websocket, websocket_credentials)
                    elif event == 'auth success':
                        self.logger.info("WebSocket authentication successful")
                    elif event == 'console output':
                        # Console lines are only relayed while the status is one of these values.
                        if args and current_status in ('running', 'offline', ''):
                            await self._handle_console_output(args[0])
                    elif event == 'status' and args:
                        current_status = args[0]
                        await self._announce_status(current_status)
            except websockets.exceptions.ConnectionClosed as e:
                self.logger.info("WebSocket connection closed: %s", e)
                try:
                    # Fetch a fresh token for the authentication on the next connection.
                    websocket_credentials = client.servers.get_websocket(server_id)
                except exceptions.PterodactylApiError as api_error:
                    self.logger.error('Failed to retrieve Pterodactyl websocket: %s', api_error)
                    return
                continue
            except exceptions.PterodactylApiError as e:
                # Raised by the token refresh above; without credentials there is nothing to retry.
                self.logger.error('Failed to retrieve Pterodactyl websocket: %s', e)
                return

    async def _authenticate(self, websocket, websocket_credentials: dict) -> None:
        """Send the ``auth`` event carrying the token from *websocket_credentials*."""
        auth_message = json.dumps({"event": "auth", "args": [websocket_credentials['data']['token']]})
        await websocket.send(auth_message)
        self.logger.debug("Authentication message sent")

    async def _handle_console_output(self, raw_line: str) -> None:
        """Relay one console line to the console channel and forward it as chat if it matches.

        Does nothing when no console channel is configured or it cannot be resolved.
        """
        console_channel_id = await self.config.console_channel()
        if console_channel_id is None:
            return
        channel = self.bot.get_channel(console_channel_id)
        if channel is None:
            return

        content = self.remove_ansi_escape_codes(raw_line[:1900])
        if content.startswith('['):
            # Iterate the pages without rebinding `content`: the chat check below needs the
            # original string, and re.match raises TypeError when handed the pagify generator.
            for page in pagify(content, delims=[" ", "\n"]):
                await channel.send(content=page)

        chat_message = await self.check_if_chat_message(content)
        if chat_message:
            await self._relay_chat_to_discord(chat_message)

    async def _relay_chat_to_discord(self, chat_message: dict) -> None:
        """Look up the player's avatar and post the chat message to the chat channel."""
        avatar_url = self._DEFAULT_AVATAR_URL
        info = await self.get_info(chat_message['username'])
        if info is not None:
            try:
                avatar_url = info['data']['player']['avatar']
            except (KeyError, TypeError):
                self.logger.debug("Player info for %s has no avatar; using default", chat_message['username'])
        await self.send_chat_discord(chat_message['username'], chat_message['message'], avatar_url)

    async def _announce_status(self, status: str) -> None:
        """Post a status-change notice to the console channel, if one is configured."""
        console_channel_id = await self.config.console_channel()
        if console_channel_id is None:
            return
        console = self.bot.get_channel(console_channel_id)
        if console is not None:
            await console.send(f"Server status changed! `{status}`")

    def remove_ansi_escape_codes(self, text: str) -> str:
        """Return *text* with ANSI escape sequences removed."""
        #NOTE - https://chat.openai.com/share/d92f9acf-d776-4fd6-a53f-b14ac15dd540
        ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
        return ansi_escape.sub('', text)

    async def check_if_chat_message(self, text: str) -> Union[bool, dict]:
        """Match *text* against the configured chat regex.

        Returns a dict with ``time``, ``username`` and ``message`` (regex groups 1-3)
        on a match, otherwise ``False``. An invalid pattern, or one with fewer than
        three groups, is logged and treated as "no match" so that a bad setting
        cannot take down the websocket task.
        """
        self.logger.debug("Checking if message is a chat message")
        regex = await self.config.chat_regex()
        self.logger.debug(regex)

        try:
            match = re.match(regex, text)
        except re.error as e:
            self.logger.error("Configured chat regex is invalid: %s", e)
            return False

        if match is None:
            self.logger.debug("Message is not a chat message")
            return False

        try:
            chat_message = {"time": match.group(1), "username": match.group(2), "message": match.group(3)}
        except IndexError:
            self.logger.error("Configured chat regex must define three capture groups (time, username, message)")
            return False

        self.logger.debug("Message is a chat message\n%s", json.dumps(chat_message))
        return chat_message

    async def get_info(self, username: str) -> Optional[dict]:
        """Fetch player info for *username* from playerdb.co.

        Returns the decoded JSON body on HTTP 200, otherwise ``None``. Request and
        decoding failures are logged and also yield ``None``.
        """
        self.logger.debug("Retrieving player info for %s", username)
        endpoint = await self.config.api_endpoint()
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(f"https://playerdb.co/api/player/{endpoint}/{username}") as response:
                    if response.status != 200:
                        self.logger.error("Failed to retrieve player info for %s: %s", username, response.status)
                        return None
                    info = await response.json()
        except (aiohttp.ClientError, ValueError) as e:
            self.logger.error("Failed to retrieve player info for %s: %s", username, e)
            return None
        self.logger.debug("Player info retrieved for %s\n%s", username, json.dumps(info))
        return info

    async def send_chat_discord(self, username: str, message: str, avatar_url: str) -> None:
        """Post *message* to the chat channel through the "Pterodactyl Chat" webhook.

        The webhook is created on first use. Nothing is sent when the chat channel
        is unset or cannot be resolved.
        """
        self.logger.debug("Sending chat message to Discord")
        channel = self.bot.get_channel(await self.config.chat_channel())
        if channel is None:
            self.logger.debug("Chat channel not set. Skipping sending chat message to Discord")
            return

        webhook = discord.utils.get(await channel.webhooks(), name="Pterodactyl Chat")
        if webhook is None:
            webhook = await channel.create_webhook(name="Pterodactyl Chat")
        await webhook.send(content=message, username=username, avatar_url=avatar_url)
        self.logger.debug("Chat message sent to Discord")

    @staticmethod
    def _escape_json_text(text: str) -> str:
        """Return *text* escaped for placement inside a double-quoted JSON string."""
        # json.dumps yields a quoted JSON string; strip the surrounding quotes.
        return json.dumps(text, ensure_ascii=False)[1:-1]

    async def get_tellraw_string(self, username: str, message: str, color: discord.Color) -> str:
        """Build the console command from the configured tellraw template.

        ``.$U``, ``.$M`` and ``.$C`` are replaced by the username, the message and
        ``str(color)``. Username and message come from Discord users, so they are
        JSON-escaped: quotes, backslashes and newlines cannot break out of the
        string they are placed in.
        """
        tellraw = await self.config.tellraw_json()
        self.logger.debug("Generating tellraw string:\n%s", tellraw)
        substitutions = {
            "U": self._escape_json_text(username),
            "M": self._escape_json_text(message),
            "C": str(color),
        }
        # One pass over the template, so placeholder-looking text inside a username or
        # message is never substituted a second time.
        tellraw = re.sub(r"\.\$([UMC])", lambda found: substitutions[found.group(1)], tellraw)
        self.logger.debug("Tellraw string generated:\n%s", tellraw)
        return tellraw

    def get_task(self):
        """Schedule establish_websocket_connection() on the bot's loop and return the task."""
        return self.bot.loop.create_task(self.establish_websocket_connection(), name="Pterodactyl Websocket Connection")

    def _restart_task(self) -> None:
        """Cancel the running websocket task (if any) and start a new one."""
        if self.task is not None:
            self.task.cancel()
        self.task = self.get_task()

    async def cog_load(self):
        """Start the websocket task."""
        self.task = self.get_task()

    async def cog_unload(self):
        """Cancel the websocket task and close the client's session, if either exists."""
        if self.task is not None:
            self.task.cancel()
        # The client stays None when no connection attempt got as far as creating it.
        if self.client is not None:
            await self.client._session.close() # pylint: disable=protected-access

    async def _send_to_server(self, payload: str) -> None:
        """Send *payload* over the websocket; restart the task if the connection is closed."""
        if self.websocket is None:
            self.logger.warning("WebSocket is not connected yet; message to server dropped")
            return
        try:
            await self.websocket.send(payload)
        except websockets.exceptions.ConnectionClosed as e:
            self.logger.error("WebSocket connection closed: %s", e)
            self._restart_task()

    @commands.Cog.listener()
    async def on_message(self, message: discord.Message):
        """Forward messages from the console and chat channels to the server.

        Console-channel messages are sent as raw commands; chat-channel messages
        are wrapped in the configured tellraw command. Bot authors are ignored.
        """
        if message.author.bot:
            return

        if message.channel.id == await self.config.console_channel():
            self.logger.debug("Received console command from %s: %s", message.author.id, message.content)
            await message.channel.send(f"Received console command from {message.author.id}: {message.content[:1900]}")
            await self._send_to_server(json.dumps({"event": "send command", "args": [message.content]}))

        if message.channel.id == await self.config.chat_channel():
            self.logger.debug("Received chat message from %s: %s", message.author.id, message.content)
            channel = self.bot.get_channel(await self.config.console_channel())
            if channel:
                await channel.send(f"Received chat message from {message.author.id}: {message.content[:1900]}")
            command = await self.get_tellraw_string(message.author.display_name, message.content, message.author.color)
            msg = json.dumps({"event": "send command", "args": [command]})
            self.logger.debug("Sending chat message to server:\n%s", msg)
            await self._send_to_server(msg)

    @commands.group(autohelp=True, name="pterodactyl", aliases=["ptero"])
    async def pterodactyl(self, ctx: commands.Context):
        """Pterodactyl allows you to manage your Pterodactyl Panel from Discord."""

    # Settings are registered globally and include the panel API key, so only the
    # bot owner may read or change them.
    @pterodactyl.group(autohelp=True, name="config", aliases=["settings", "set"])
    @commands.is_owner()
    async def pterodactyl_config(self, ctx: commands.Context):
        """Configure Pterodactyl settings."""

    @pterodactyl_config.command(name="url")
    async def pterodactyl_config_base_url(self, ctx: commands.Context, *, base_url: Optional[str] = None) -> None:
        """Set the base URL of your Pterodactyl Panel. Please include the protocol (http/https)."""
        if base_url is None:
            base_url = await self.config.base_url()
            await ctx.send(f"Base URL is currently set to {base_url}")
            return
        await self.config.base_url.set(base_url)
        await ctx.send(f"Base URL set to {base_url}")
        self.logger.info("Configuration value set: base_url = %s\nRestarting task...", base_url)
        self._restart_task()

    @pterodactyl_config.command(name="apikey")
    async def pterodactyl_config_api_key(self, ctx: commands.Context, *, api_key: str) -> None:
        """Set the API key for your Pterodactyl Panel."""
        await self.config.api_key.set(api_key)
        masked_key = f"{api_key[:5]}...{api_key[-4:]}"
        await ctx.send(f"API key set to `{masked_key}`")
        # Log only the masked form; the full key must never reach the log files.
        self.logger.info("Configuration value set: api_key = %s\nRestarting task...", masked_key)
        self._restart_task()

    @pterodactyl_config.command(name="serverid")
    async def pterodactyl_config_server_id(self, ctx: commands.Context, *, server_id: str) -> None:
        """Set the server ID for your Pterodactyl Panel."""
        await self.config.server_id.set(server_id)
        await ctx.send(f"Server ID set to {server_id}")
        self.logger.info("Configuration value set: server_id = %s\nRestarting task...", server_id)
        self._restart_task()

    @pterodactyl_config.command(name="consolechannel")
    async def pterodactyl_config_console_channel(self, ctx: commands.Context, channel: discord.TextChannel) -> None:
        """Set the channel to send console output to."""
        await self.config.console_channel.set(channel.id)
        await ctx.send(f"Console channel set to {channel.mention}")

    @pterodactyl_config.group(name="chat")
    async def pterodactyl_config_chat(self, ctx: commands.Context):
        """Configure chat settings."""

    @pterodactyl_config_chat.command(name="channel")
    async def pterodactyl_config_chat_channel(self, ctx: commands.Context, channel: discord.TextChannel) -> None:
        """Set the channel to send chat output to."""
        await self.config.chat_channel.set(channel.id)
        await ctx.send(f"Chat channel set to {channel.mention}")

    @pterodactyl_config_chat.command(name="regex")
    async def pterodactyl_config_chat_regex(self, ctx: commands.Context, *, regex: Optional[str] = None) -> None:
        """Set the regex pattern to match chat messages.

        See [documentation]() for more information."""
        #TODO - fix this link
        if regex is None:
            regex = await self.config.chat_regex()
            await ctx.send(f"Chat regex is currently set to:\n{box(regex, 'regex')}")
            return
        try:
            # Reject patterns that cannot compile instead of storing a setting that
            # would fail on every console line.
            re.compile(regex)
        except re.error as e:
            await ctx.send(f"Invalid regex, not saved: {e}")
            return
        await self.config.chat_regex.set(regex)
        await ctx.send(f"Chat regex set to:\n{box(regex, 'regex')}")

    @pterodactyl_config_chat.command(name="tellraw")
    async def pterodactyl_config_chat_tellraw(self, ctx: commands.Context, *, tellraw: Optional[str] = None) -> None:
        """Set the tellraw JSON used to send Discord chat messages to the server.

        Required placeholders: `.$U` (username), `.$M` (message), `.$C` (color)

        See [documentation]() for more information."""
        #TODO - fix this link
        if tellraw is None:
            tellraw = await self.config.tellraw_json()
            await ctx.send(f"Tellraw JSON is currently set to:\n{box(tellraw, 'json')}")
            return
        await self.config.tellraw_json.set(tellraw)
        await ctx.send(f"Tellraw JSON set to:\n{box(tellraw, 'json')}")