forked from blizzthewolf/SeaCogs
282 lines
15 KiB
Python
282 lines
15 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import re
|
|
from typing import Optional, Union
|
|
|
|
import aiohttp
|
|
import discord
|
|
import websockets
|
|
from pydactyl import PterodactylClient, exceptions
|
|
from redbot.core import Config, commands
|
|
from redbot.core.bot import Red
|
|
from redbot.core.utils.chat_formatting import box, pagify
|
|
|
|
|
|
class Pterodactyl(commands.Cog):
|
|
"""Pterodactyl allows you to manage your Pterodactyl Panel from Discord."""
|
|
|
|
def __init__(self, bot: Red):
|
|
self.bot = bot
|
|
self.config = Config.get_conf(self, identifier=457581387213637448123567, force_registration=True)
|
|
self.config.register_global(
|
|
base_url=None,
|
|
api_key=None,
|
|
server_id=None,
|
|
console_channel=None,
|
|
startup_jar=None,
|
|
startup_arguments=None,
|
|
power_action_in_progress=False,
|
|
chat_regex=r"\[(\d{2}:\d{2}:\d{2})\sINFO\]:\s<(\w+)>\s(.*)",
|
|
tellraw_json='tellraw @a ["",{"text":".$U ","color":".$C"},{"text":" (DISCORD): ","color":"blue"},{"text":".$M","color":"white"}]',
|
|
api_endpoint="minecraft",
|
|
chat_channel=None
|
|
)
|
|
self.logger = logging.getLogger('red.sea.pterodactyl')
|
|
self.client = None
|
|
self.task = None
|
|
self.websocket = None
|
|
|
|
async def establish_websocket_connection(self) -> None:
|
|
self.logger.debug("Establishing WebSocket connection")
|
|
base_url = await self.config.base_url()
|
|
api_key = await self.config.api_key()
|
|
server_id = await self.config.server_id()
|
|
|
|
try:
|
|
client = PterodactylClient(base_url, api_key, debug=True).client
|
|
self.client = client
|
|
websocket_credentials = client.servers.get_websocket(server_id)
|
|
self.logger.debug("""Websocket connection details retrieved:
|
|
Socket: %s
|
|
Token: %s...""",
|
|
websocket_credentials['data']['socket'],
|
|
websocket_credentials['data']['token'][:20]
|
|
)
|
|
#NOTE - The token is truncated to prevent it from being logged in its entirety, for security reasons
|
|
except exceptions.ClientConfigError as e:
|
|
return self.logger.error('Failed to initialize Pterodactyl client: %s', e)
|
|
except exceptions.PterodactylApiError as e:
|
|
return self.logger.error('Failed to retrieve Pterodactyl websocket: %s', e)
|
|
|
|
async with websockets.connect(websocket_credentials['data']['socket'], origin=base_url, ping_timeout=60) as websocket:
|
|
self.logger.info("WebSocket connection established")
|
|
|
|
auth_message = json.dumps({"event": "auth", "args": [websocket_credentials['data']['token']]})
|
|
await websocket.send(auth_message)
|
|
self.logger.debug("Authentication message sent")
|
|
|
|
self.websocket = websocket
|
|
current_status = ''
|
|
|
|
while True:
|
|
message = await websocket.recv()
|
|
if json.loads(message)['event'] in ['token expiring', 'token expired']:
|
|
self.logger.debug("Received token expiring/expired event. Refreshing token.")
|
|
websocket_credentials = client.servers.get_websocket(server_id)
|
|
auth_message = json.dumps({"event": "auth", "args": [websocket_credentials['data']['token']]})
|
|
await websocket.send(auth_message)
|
|
self.logger.debug("Authentication message sent")
|
|
|
|
if json.loads(message)['event'] == 'auth success':
|
|
self.logger.info("WebSocket authentication successful")
|
|
|
|
if json.loads(message)['event'] == 'console output' and await self.config.console_channel() is not None:
|
|
if current_status == 'running' or current_status == 'offline' or current_status == '':
|
|
content = self.remove_ansi_escape_codes(json.loads(message)['args'][0])
|
|
|
|
channel = self.bot.get_channel(await self.config.console_channel())
|
|
if channel is not None:
|
|
if content.startswith('['):
|
|
pagified_content = pagify(content, delims=[" ", "\n"])
|
|
for page in pagified_content:
|
|
await channel.send(content=page)
|
|
|
|
chat_message = await self.check_if_chat_message(content)
|
|
if chat_message:
|
|
info = await self.get_info(chat_message['username'])
|
|
if info is not None:
|
|
await self.send_chat_discord(chat_message['username'], chat_message['message'], info['data']['player']['avatar'])
|
|
else:
|
|
await self.send_chat_discord(chat_message['username'], chat_message['message'], 'https://seafsh.cc/u/j3AzqQ.png')
|
|
|
|
if json.loads(message)['event'] == 'status':
|
|
current_status = json.loads(message)['args'][0]
|
|
if await self.config.console_channel() is not None:
|
|
console = self.bot.get_channel(await self.config.console_channel())
|
|
if console is not None:
|
|
await console.send(f"Server status changed! `{json.loads(message)['args'][0]}`")
|
|
|
|
def remove_ansi_escape_codes(self, text: str) -> str:
|
|
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
|
|
#NOTE - https://chat.openai.com/share/d92f9acf-d776-4fd6-a53f-b14ac15dd540
|
|
return ansi_escape.sub('', text)
|
|
|
|
async def check_if_chat_message(self, text: str) -> Union[bool, dict]:
|
|
self.logger.debug("Checking if message is a chat message")
|
|
regex = await self.config.chat_regex()
|
|
match: Optional[re.Match[str]] = re.match(regex, text)
|
|
if match:
|
|
dict = {"time": match.group(1), "username": match.group(2), "message": match.group(3)}
|
|
self.logger.debug("Message is a chat message\n%s", json.dumps(dict))
|
|
return dict
|
|
self.logger.debug("Message is not a chat message")
|
|
return False
|
|
|
|
async def get_info(self, username: str) -> Optional[dict]:
|
|
self.logger.debug("Retrieving player info for %s", username)
|
|
endpoint = await self.config.api_endpoint()
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(f"https://playerdb.co/api/player/{endpoint}/{username}") as response:
|
|
if response.status == 200:
|
|
self.logger.debug("Player info retrieved for %s\n%s", username, json.dumps(await response.json()))
|
|
return await response.json()
|
|
else:
|
|
self.logger.error("Failed to retrieve player info for %s: %s", username, response.status)
|
|
return None
|
|
|
|
async def send_chat_discord(self, username: str, message: str, avatar_url: str) -> None:
|
|
self.logger.debug("Sending chat message to Discord")
|
|
channel = self.bot.get_channel(await self.config.chat_channel())
|
|
if channel is not None:
|
|
webhooks = await channel.webhooks()
|
|
webhook = discord.utils.get(webhooks, name="Pterodactyl Chat")
|
|
if webhook is None:
|
|
webhook = await channel.create_webhook(name="Pterodactyl Chat")
|
|
await webhook.send(content=message, username=username, avatar_url=avatar_url)
|
|
self.logger.debug("Chat message sent to Discord")
|
|
else:
|
|
self.logger.debug("Chat channel not set. Skipping sending chat message to Discord")
|
|
|
|
async def get_tellraw_string(self, username: str, message: str, color: discord.Color):
|
|
tellraw = await self.config.tellraw_json()
|
|
self.logger.debug("Generating tellraw string:\n%s", tellraw)
|
|
tellraw = tellraw.replace(".$U", username).replace(".$M", message).replace(".$C", str(color))
|
|
self.logger.debug("Tellraw string generated:\n%s", tellraw)
|
|
return tellraw
|
|
|
|
def get_task(self):
|
|
task = self.bot.loop.create_task(self.establish_websocket_connection(), name="Pterodactyl Websocket Connection")
|
|
task.add_done_callback(self.error_callback)
|
|
return task
|
|
|
|
def error_callback(self, fut): #NOTE - Thanks flame442 and zephyrkul for helping me figure this out
|
|
try:
|
|
fut.result()
|
|
except asyncio.CancelledError:
|
|
pass
|
|
except Exception as e:
|
|
self.logger.error("WebSocket task has failed: %s", e, exc_info=e)
|
|
self.task.cancel()
|
|
self.task = self.get_task()
|
|
|
|
async def cog_load(self):
|
|
self.task = self.get_task()
|
|
|
|
async def cog_unload(self):
|
|
self.task.cancel()
|
|
await self.client._session.close() # pylint: disable=protected-access
|
|
|
|
@commands.Cog.listener()
|
|
async def on_message_without_command(self, message: discord.Message):
|
|
if message.channel.id == await self.config.console_channel() and message.author.bot is False:
|
|
self.logger.debug("Received console command from %s: %s", message.author.id, message.content)
|
|
await message.channel.send(f"Received console command from {message.author.id}: {message.content[:1900]}")
|
|
try:
|
|
await self.websocket.send(json.dumps({"event": "send command", "args": [message.content]}))
|
|
except websockets.exceptions.ConnectionClosed as e:
|
|
self.logger.error("WebSocket connection closed: %s", e)
|
|
self.task.cancel()
|
|
self.task = self.get_task()
|
|
if message.channel.id == await self.config.chat_channel() and message.author.bot is False:
|
|
self.logger.debug("Received chat message from %s: %s", message.author.id, message.content)
|
|
channel = self.bot.get_channel(await self.config.console_channel())
|
|
if channel:
|
|
await channel.send(f"Received chat message from {message.author.id}: {message.content[:1900]}")
|
|
msg = json.dumps({"event": "send command", "args": [await self.get_tellraw_string(message.author.display_name, message.content, message.author.color)]})
|
|
self.logger.debug("Sending chat message to server:\n%s", msg)
|
|
try:
|
|
await self.websocket.send(msg)
|
|
except websockets.exceptions.ConnectionClosed as e:
|
|
self.logger.error("WebSocket connection closed: %s", e)
|
|
self.task.cancel()
|
|
self.task = self.get_task()
|
|
|
|
@commands.group(autohelp = True, name = "pterodactyl", aliases = ["ptero"])
|
|
async def pterodactyl(self, ctx: commands.Context):
|
|
"""Pterodactyl allows you to manage your Pterodactyl Panel from Discord."""
|
|
|
|
@pterodactyl.group(autohelp = True, name = "config", aliases = ["settings", "set"])
|
|
async def pterodactyl_config(self, ctx: commands.Context):
|
|
"""Configure Pterodactyl settings."""
|
|
|
|
@pterodactyl_config.command(name = "url")
|
|
async def pterodactyl_config_base_url(self, ctx: commands.Context, *, base_url: str = None) -> None:
|
|
"""Set the base URL of your Pterodactyl Panel. Please include the protocol (http/https)."""
|
|
if base_url is None:
|
|
base_url = await self.config.base_url()
|
|
return await ctx.send(f"Base URL is currently set to {base_url}")
|
|
await self.config.base_url.set(base_url)
|
|
await ctx.send(f"Base URL set to {base_url}")
|
|
self.logger.info("Configuration value set: base_url = %s\nRestarting task...", base_url)
|
|
self.task.cancel()
|
|
self.task = self.get_task()
|
|
|
|
@pterodactyl_config.command(name = "apikey")
|
|
async def pterodactyl_config_api_key(self, ctx: commands.Context, *, api_key: str) -> None:
|
|
"""Set the API key for your Pterodactyl Panel."""
|
|
await self.config.api_key.set(api_key)
|
|
await ctx.send(f"API key set to `{api_key[:5]}...{api_key[-4:]}`")
|
|
self.logger.info("Configuration value set: api_key = %s\nRestarting task...", api_key)
|
|
self.task.cancel()
|
|
self.task = self.get_task()
|
|
|
|
@pterodactyl_config.command(name = "serverid")
|
|
async def pterodactyl_config_server_id(self, ctx: commands.Context, *, server_id: str) -> None:
|
|
"""Set the server ID for your Pterodactyl Panel."""
|
|
await self.config.server_id.set(server_id)
|
|
await ctx.send(f"Server ID set to {server_id}")
|
|
self.logger.info("Configuration value set: server_id = %s\nRestarting task...", server_id)
|
|
self.task.cancel()
|
|
self.task = self.get_task()
|
|
|
|
@pterodactyl_config.command(name = "consolechannel")
|
|
async def pterodactyl_config_console_channel(self, ctx: commands.Context, channel: discord.TextChannel) -> None:
|
|
"""Set the channel to send console output to."""
|
|
await self.config.console_channel.set(channel.id)
|
|
await ctx.send(f"Console channel set to {channel.mention}")
|
|
|
|
@pterodactyl_config.group(name = "chat")
|
|
async def pterodactyl_config_chat(self, ctx: commands.Context):
|
|
"""Configure chat settings."""
|
|
|
|
@pterodactyl_config_chat.command(name = "channel")
|
|
async def pterodactyl_config_chat_channel(self, ctx: commands.Context, channel: discord.TextChannel) -> None:
|
|
"""Set the channel to send chat output to."""
|
|
await self.config.chat_channel.set(channel.id)
|
|
await ctx.send(f"Chat channel set to {channel.mention}")
|
|
|
|
@pterodactyl_config_chat.command(name = "regex")
|
|
async def pterodactyl_config_chat_regex(self, ctx: commands.Context, *, regex: str = None) -> None:
|
|
"""Set the regex pattern to match chat messages.
|
|
|
|
See [documentation]() for more information."""
|
|
#TODO - fix this link
|
|
if regex is None:
|
|
regex = await self.config.chat_regex()
|
|
return await ctx.send(f"Chat regex is currently set to:\n{box(regex, 'regex')}")
|
|
await self.config.chat_regex.set(regex)
|
|
await ctx.send(f"Chat regex set to:\n{box(regex, 'regex')}")
|
|
|
|
@pterodactyl_config_chat.command(name = "tellraw")
|
|
async def pterodactyl_config_chat_tellraw(self, ctx: commands.Context, *, tellraw: str = None) -> None:
|
|
"""Set the tellraw JSON to send chat messages to Discord.
|
|
|
|
Required placeholders: `.$U` (username), `.$M` (message), `.$C` (color)
|
|
See [documentation]() for more information."""
|
|
#TODO - fix this link
|
|
if tellraw is None:
|
|
tellraw = await self.config.tellraw_json()
|
|
return await ctx.send(f"Tellraw JSON is currently set to:\n{box(tellraw, 'json')}")
|
|
await self.config.tellraw_json.set(tellraw)
|
|
await ctx.send(f"Tellraw JSON set to:\n{box(tellraw, 'json')}")
|