forked from cswimr/SeaCogs
212 lines
11 KiB
Python
212 lines
11 KiB
Python
import json
|
|
import logging
|
|
import re
|
|
from typing import Optional, Union
|
|
|
|
import aiohttp
|
|
import discord
|
|
import websockets
|
|
from pydactyl import PterodactylClient, exceptions
|
|
from redbot.core import Config, commands
|
|
from redbot.core.bot import Red
|
|
|
|
|
|
class Pterodactyl(commands.Cog):
|
|
"""Pterodactyl allows you to manage your Pterodactyl Panel from Discord."""
|
|
|
|
def __init__(self, bot: Red):
|
|
self.bot = bot
|
|
self.config = Config.get_conf(self, identifier=457581387213637448123567, force_registration=True)
|
|
self.config.register_global(
|
|
base_url=None,
|
|
api_key=None,
|
|
server_id=None,
|
|
console_channel=None,
|
|
startup_jar=None,
|
|
startup_arguments=None,
|
|
power_action_in_progress=False,
|
|
chat_regex=r"\[(\d{2}:\d{2}:\d{2})\sINFO\]:\s<(\w+)>\s(.*)",
|
|
api_endpoint="minecraft",
|
|
chat_channel=None
|
|
)
|
|
self.logger = logging.getLogger('red.sea.pterodactyl')
|
|
self.client = None
|
|
self.task = None
|
|
self.websocket = None
|
|
|
|
async def establish_websocket_connection(self):
|
|
self.logger.debug("Establishing WebSocket connection")
|
|
base_url = await self.config.base_url()
|
|
api_key = await self.config.api_key()
|
|
server_id = await self.config.server_id()
|
|
|
|
try:
|
|
client = PterodactylClient(base_url, api_key, debug=True).client
|
|
self.client = client
|
|
websocket_credentials = client.servers.get_websocket(server_id)
|
|
self.logger.debug("""Websocket connection details retrieved:
|
|
Socket: %s
|
|
Token: %s...""",
|
|
websocket_credentials['data']['socket'],
|
|
websocket_credentials['data']['token'][:20]
|
|
)
|
|
#NOTE - The token is truncated to prevent it from being logged in its entirety, for security reasons
|
|
except exceptions.ClientConfigError as e:
|
|
self.logger.error('Failed to initialize Pterodactyl client: %s', e)
|
|
return
|
|
except exceptions.PterodactylApiError as e:
|
|
self.logger.error('Failed to retrieve Pterodactyl websocket: %s', e)
|
|
return
|
|
|
|
async for websocket in websockets.connect(websocket_credentials['data']['socket'], origin=base_url, ping_timeout=60):
|
|
try:
|
|
self.logger.debug("WebSocket connection established")
|
|
|
|
auth_message = json.dumps({"event": "auth", "args": [websocket_credentials['data']['token']]})
|
|
await websocket.send(auth_message)
|
|
self.logger.debug("Authentication message sent")
|
|
|
|
self.websocket = websocket
|
|
current_status = ''
|
|
|
|
while True:
|
|
message = await websocket.recv()
|
|
if json.loads(message)['event'] in ['token expiring', 'token expired']:
|
|
self.logger.debug("Received token expiring/expired event. Refreshing token.")
|
|
websocket_credentials = client.servers.get_websocket(server_id)
|
|
auth_message = json.dumps({"event": "auth", "args": [websocket_credentials['data']['token']]})
|
|
await websocket.send(auth_message)
|
|
self.logger.debug("Authentication message sent")
|
|
|
|
if json.loads(message)['event'] == 'auth success':
|
|
self.logger.debug("Authentication successful")
|
|
|
|
if json.loads(message)['event'] == 'console output' and await self.config.console_channel() is not None:
|
|
if current_status == 'running' or current_status == 'offline' or current_status == '':
|
|
channel = self.bot.get_channel(await self.config.console_channel())
|
|
if channel is not None:
|
|
content = self.remove_ansi_escape_codes(json.loads(message)['args'][0][:1900])
|
|
if content.startswith('['):
|
|
await channel.send(content=content)
|
|
#TODO - Add pagification for long messages to prevent Discord API errors
|
|
chat_message = await self.check_if_chat_message(content)
|
|
if chat_message:
|
|
info = await self.get_info(chat_message['username'])
|
|
if info is not None:
|
|
await self.send_chat_discord(chat_message['username'], chat_message['message'], info['data']['player']['avatar'])
|
|
|
|
if json.loads(message)['event'] == 'status':
|
|
current_status = json.loads(message)['args'][0]
|
|
console = self.bot.get_channel(await self.config.console_channel())
|
|
if console is not None:
|
|
await console.send(f"Server status changed! `{json.loads(message)['args'][0]}`")
|
|
except websockets.exceptions.ConnectionClosed as e:
|
|
self.logger.debug("WebSocket connection closed: %s", e)
|
|
websocket_credentials = client.servers.get_websocket(server_id)
|
|
continue
|
|
|
|
def remove_ansi_escape_codes(self, text: str) -> str:
|
|
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
|
|
#NOTE - https://chat.openai.com/share/d92f9acf-d776-4fd6-a53f-b14ac15dd540
|
|
return ansi_escape.sub('', text)
|
|
|
|
async def check_if_chat_message(self, text: str) -> Union[bool, dict]:
|
|
self.logger.debug("Checking if message is a chat message")
|
|
regex = await self.config.chat_regex()
|
|
match: Optional[re.Match[str]] = re.match(regex, text)
|
|
if match:
|
|
dict = {"time": match.group(1), "username": match.group(2), "message": match.group(3)}
|
|
self.logger.debug("Message is a chat message\n%s", json.dumps(dict))
|
|
return dict
|
|
self.logger.debug("Message is not a chat message")
|
|
return False
|
|
|
|
async def get_info(self, username: str) -> Optional[dict]:
|
|
self.logger.debug("Retrieving player info for %s", username)
|
|
endpoint = await self.config.api_endpoint()
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(f"https://playerdb.co/api/player/{endpoint}/{username}") as response:
|
|
if response.status == 200:
|
|
self.logger.debug("Player info retrieved for %s\n%s", username, json.dumps(await response.json()))
|
|
return await response.json()
|
|
else:
|
|
self.logger.error("Failed to retrieve player info for %s: %s", username, response.status)
|
|
return None
|
|
|
|
async def send_chat_discord(self, username: str, message: str, avatar_url: str) -> None:
|
|
self.logger.debug("Sending chat message to Discord")
|
|
channel = self.bot.get_channel(await self.config.chat_channel())
|
|
if channel is not None:
|
|
webhooks = await channel.webhooks()
|
|
webhook = discord.utils.get(webhooks, name="Pterodactyl Chat")
|
|
if webhook is None:
|
|
webhook = await channel.create_webhook(name="Pterodactyl Chat")
|
|
await webhook.send(content=message, username=username, avatar_url=avatar_url)
|
|
self.logger.debug("Chat message sent to Discord")
|
|
else:
|
|
self.logger.debug("Chat channel not set. Skipping sending chat message to Discord")
|
|
|
|
def get_task(self):
|
|
return self.bot.loop.create_task(self.establish_websocket_connection(), name="Pterodactyl Websocket Connection")
|
|
|
|
async def cog_load(self):
|
|
self.task = self.get_task()
|
|
|
|
async def cog_unload(self):
|
|
self.task.cancel()
|
|
await self.client._session.close() # pylint: disable=protected-access
|
|
|
|
@commands.Cog.listener()
|
|
async def on_message(self, message: discord.Message):
|
|
if message.channel.id == await self.config.console_channel() and message.author.id != self.bot.user.id:
|
|
await message.channel.send(f"Received message from {message.author.id}: {message.content}")
|
|
await self.websocket.send(json.dumps({"event": "send command", "args": [message.content]}))
|
|
if message.channel.id == await self.config.chat_channel() and message.author.id != self.bot.user.id:
|
|
await self.websocket.send(json.dumps({"event": "send command", "args": [f"""tellraw @a ["",{"text":"{message.author.display_name} (DISCORD): ","color":"blue"},{"text":"{message.content}","color":"white"}]"""]}))
|
|
|
|
@commands.group(autohelp = True, name = "pterodactyl", aliases = ["ptero"])
|
|
async def pterodactyl(self, ctx: commands.Context):
|
|
"""Pterodactyl allows you to manage your Pterodactyl Panel from Discord."""
|
|
|
|
@pterodactyl.group(autohelp = True, name = "config", aliases = ["settings", "set"])
|
|
async def pterodactyl_config(self, ctx: commands.Context):
|
|
"""Configure Pterodactyl settings."""
|
|
|
|
@pterodactyl_config.command(name = "url")
|
|
async def pterodactyl_config_base_url(self, ctx: commands.Context, base_url: str):
|
|
"""Set the base URL of your Pterodactyl Panel. Please include the protocol (http/https)."""
|
|
await self.config.base_url.set(base_url)
|
|
await ctx.send(f"Base URL set to {base_url}")
|
|
self.logger.debug("Configuration value set: base_url = %s\nRestarting task...", base_url)
|
|
self.task.cancel()
|
|
self.task = self.get_task()
|
|
|
|
@pterodactyl_config.command(name = "apikey")
|
|
async def pterodactyl_config_api_key(self, ctx: commands.Context, api_key: str):
|
|
"""Set the API key for your Pterodactyl Panel."""
|
|
await self.config.api_key.set(api_key)
|
|
await ctx.send(f"API key set to `{api_key[:5]}...{api_key[-4:]}`")
|
|
self.logger.debug("Configuration value set: api_key = %s\nRestarting task...", api_key)
|
|
self.task.cancel()
|
|
self.task = self.get_task()
|
|
|
|
@pterodactyl_config.command(name = "serverid")
|
|
async def pterodactyl_config_server_id(self, ctx: commands.Context, server_id: str):
|
|
"""Set the server ID for your Pterodactyl Panel."""
|
|
await self.config.server_id.set(server_id)
|
|
await ctx.send(f"Server ID set to {server_id}")
|
|
self.logger.debug("Configuration value set: server_id = %s\nRestarting task...", server_id)
|
|
self.task.cancel()
|
|
self.task = self.get_task()
|
|
|
|
@pterodactyl_config.command(name = "consolechannel")
|
|
async def pterodactyl_config_console_channel(self, ctx: commands.Context, channel: discord.TextChannel):
|
|
"""Set the channel to send console output to."""
|
|
await self.config.console_channel.set(channel.id)
|
|
await ctx.send(f"Console channel set to {channel.mention}")
|
|
|
|
@pterodactyl_config.command(name = "chatchannel")
|
|
async def pterodactyl_config_chat_channel(self, ctx: commands.Context, channel: discord.TextChannel):
|
|
"""Set the channel to send chat output to."""
|
|
await self.config.chat_channel.set(channel.id)
|
|
await ctx.send(f"Chat channel set to {channel.mention}")
|