forked from cswimr/SeaCogs
314 lines
16 KiB
Python
314 lines
16 KiB
Python
import asyncio
|
|
import json
|
|
from typing import Mapping, Optional
|
|
|
|
import discord
|
|
import websockets
|
|
from pydactyl import PterodactylClient
|
|
from redbot.core import commands
|
|
from redbot.core.bot import Red
|
|
from redbot.core.utils.chat_formatting import box
|
|
|
|
from pterodactyl.config import config, register_config
|
|
from pterodactyl.logger import logger
|
|
|
|
|
|
class Pterodactyl(commands.Cog):
|
|
"""Pterodactyl allows you to manage your Pterodactyl Panel from Discord."""
|
|
|
|
def __init__(self, bot: Red):
|
|
self.bot = bot
|
|
self.client: Optional[PterodactylClient] = None
|
|
self.task: Optional[asyncio.Task] = None
|
|
self.websocket: Optional[websockets.WebSocketClientProtocol] = None
|
|
self.retry_counter: int = 0
|
|
register_config(config)
|
|
|
|
async def cog_load(self) -> None:
|
|
self.retry_counter = 0
|
|
self.task = self.get_task()
|
|
|
|
async def cog_unload(self) -> None:
|
|
self.task.cancel()
|
|
self.retry_counter = 0
|
|
await self.client._session.close() # pylint: disable=protected-access
|
|
|
|
def get_task(self) -> asyncio.Task:
|
|
from pterodactyl.websocket import establish_websocket_connection
|
|
task = self.bot.loop.create_task(establish_websocket_connection(self), name="Pterodactyl Websocket Connection")
|
|
task.add_done_callback(self.error_callback)
|
|
return task
|
|
|
|
def error_callback(self, fut) -> None: #NOTE - Thanks flame442 and zephyrkul for helping me figure this out
|
|
try:
|
|
fut.result()
|
|
except asyncio.CancelledError:
|
|
logger.info("WebSocket task has been cancelled.")
|
|
except Exception as e: # pylint: disable=broad-exception-caught
|
|
logger.error("WebSocket task has failed: %s", e, exc_info=e)
|
|
self.task.cancel()
|
|
if self.retry_counter < 5:
|
|
self.retry_counter += 1
|
|
logger.info("Retrying in %s seconds...", 5 * self.retry_counter)
|
|
self.task = self.bot.loop.call_later(5 * self.retry_counter, self.get_task)
|
|
else:
|
|
logger.info("Retry limit reached. Stopping task.")
|
|
|
|
@commands.Cog.listener()
|
|
async def on_message_without_command(self, message: discord.Message) -> None:
|
|
if message.channel.id == await config.console_channel() and message.author.bot is False:
|
|
logger.debug("Received console command from %s: %s", message.author.id, message.content)
|
|
await message.channel.send(f"Received console command from {message.author.id}: {message.content[:1900]}")
|
|
try:
|
|
await self.websocket.send(json.dumps({"event": "send command", "args": [message.content]}))
|
|
except websockets.exceptions.ConnectionClosed as e:
|
|
logger.error("WebSocket connection closed: %s", e)
|
|
self.task.cancel()
|
|
self.retry_counter = 0
|
|
self.task = self.get_task()
|
|
if message.channel.id == await config.chat_channel() and message.author.bot is False:
|
|
logger.debug("Received chat message from %s: %s", message.author.id, message.content)
|
|
channel = self.bot.get_channel(await config.console_channel())
|
|
if channel:
|
|
await channel.send(f"Received chat message from {message.author.id}: {message.content[:1900]}")
|
|
msg = json.dumps({"event": "send command", "args": [await self.get_chat_command(message)]})
|
|
logger.debug("Sending chat message to server:\n%s", msg)
|
|
try:
|
|
await self.websocket.send(msg)
|
|
except websockets.exceptions.ConnectionClosed as e:
|
|
logger.error("WebSocket connection closed: %s", e)
|
|
self.task.cancel()
|
|
self.retry_counter = 0
|
|
self.task = self.get_task()
|
|
|
|
async def get_chat_command(self, message: discord.Message) -> str:
|
|
command: str = await config.chat_command()
|
|
placeholders = {
|
|
"C": str(message.author.color),
|
|
"D": message.author.discriminator,
|
|
"I": str(message.author.id),
|
|
"M": message.content,
|
|
"N": message.author.display_name,
|
|
"U": message.author.name,
|
|
}
|
|
for key, value in placeholders.items():
|
|
command = command.replace('.$' + key, value)
|
|
return command
|
|
|
|
@commands.Cog.listener()
|
|
async def on_red_api_tokens_update(self, service_name: str, api_tokens: Mapping[str,str]): # pylint: disable=unused-argument
|
|
if service_name == "pterodactyl":
|
|
logger.info("Configuration value set: api_key\nRestarting task...")
|
|
self.task.cancel()
|
|
self.retry_counter = 0
|
|
self.task = self.get_task()
|
|
|
|
@commands.group(autohelp = True, name = "pterodactyl", aliases = ["ptero"])
|
|
async def pterodactyl(self, ctx: commands.Context) -> None:
|
|
"""Pterodactyl allows you to manage your Pterodactyl Panel from Discord."""
|
|
|
|
@pterodactyl.group(autohelp = True, name = "power")
|
|
@commands.admin()
|
|
async def pterodactyl_power(self, ctx: commands.Context) -> None:
|
|
"""Send power actions to the server."""
|
|
|
|
@pterodactyl_power.command(name = "start")
|
|
async def pterodactyl_power_start(self, ctx: commands.Context) -> None:
|
|
"""Start the server."""
|
|
current_status = await config.current_status()
|
|
if current_status == "running":
|
|
return await ctx.send("Server is already running.")
|
|
elif current_status in ["starting", "stopping"]:
|
|
return await ctx.send("Another power action is already in progress.")
|
|
message = await ctx.send("Sending websocket command to start server...")
|
|
await self.websocket.send(json.dumps({"event": "set state", "args": ["start"]}))
|
|
await message.edit(content="Server starting...")
|
|
|
|
@pterodactyl_power.command(name = "stop")
|
|
async def pterodactyl_power_stop(self, ctx: commands.Context) -> None:
|
|
"""Stop the server."""
|
|
current_status = await config.current_status()
|
|
if current_status == "stopped":
|
|
return await ctx.send("Server is already stopped.")
|
|
elif current_status in ["starting", "stopping"]:
|
|
return await ctx.send("Another power action is already in progress.")
|
|
message = await ctx.send("Sending websocket command to stop server...")
|
|
await self.websocket.send(json.dumps({"event": "set state", "args": ["stop"]}))
|
|
await message.edit(content="Server stopping...")
|
|
|
|
@pterodactyl_power.command(name = "restart")
|
|
async def pterodactyl_power_restart(self, ctx: commands.Context) -> None:
|
|
"""Restart the server."""
|
|
current_status = await config.current_status()
|
|
if current_status in ["starting", "stopping"]:
|
|
return await ctx.send("Another power action is already in progress.")
|
|
message = await ctx.send("Sending websocket command to restart server...")
|
|
await self.websocket.send(json.dumps({"event": "set state", "args": ["restart"]}))
|
|
await message.edit(content="Server restarting...")
|
|
|
|
@pterodactyl.group(autohelp = True, name = "config", aliases = ["settings", "set"])
|
|
@commands.is_owner()
|
|
async def pterodactyl_config(self, ctx: commands.Context) -> None:
|
|
"""Configure Pterodactyl settings."""
|
|
|
|
@pterodactyl_config.command(name = "url")
|
|
async def pterodactyl_config_base_url(self, ctx: commands.Context, *, base_url: str = None) -> None:
|
|
"""Set the base URL of your Pterodactyl Panel.
|
|
|
|
Please include the protocol (http/https).
|
|
Example: `https://panel.example.com`"""
|
|
if base_url is None:
|
|
base_url = await config.base_url()
|
|
return await ctx.send(f"Base URL is currently set to {base_url}")
|
|
await config.base_url.set(base_url)
|
|
await ctx.send(f"Base URL set to {base_url}")
|
|
logger.info("Configuration value set: base_url = %s\nRestarting task...", base_url)
|
|
self.task.cancel()
|
|
self.retry_counter = 0
|
|
self.task = self.get_task()
|
|
|
|
@pterodactyl_config.command(name = "serverid")
|
|
async def pterodactyl_config_server_id(self, ctx: commands.Context, *, server_id: str) -> None:
|
|
"""Set the server ID for your Pterodactyl Panel."""
|
|
await config.server_id.set(server_id)
|
|
await ctx.send(f"Server ID set to {server_id}")
|
|
logger.info("Configuration value set: server_id = %s\nRestarting task...", server_id)
|
|
self.task.cancel()
|
|
self.retry_counter = 0
|
|
self.task = self.get_task()
|
|
|
|
@pterodactyl_config.command(name = "consolechannel")
|
|
async def pterodactyl_config_console_channel(self, ctx: commands.Context, channel: discord.TextChannel) -> None:
|
|
"""Set the channel to send console output to."""
|
|
await config.console_channel.set(channel.id)
|
|
await ctx.send(f"Console channel set to {channel.mention}")
|
|
|
|
@pterodactyl_config.group(name = "chat")
|
|
async def pterodactyl_config_chat(self, ctx: commands.Context):
|
|
"""Configure chat settings."""
|
|
|
|
@pterodactyl_config_chat.command(name = "channel")
|
|
async def pterodactyl_config_chat_channel(self, ctx: commands.Context, channel: discord.TextChannel) -> None:
|
|
"""Set the channel to send chat output to."""
|
|
await config.chat_channel.set(channel.id)
|
|
await ctx.send(f"Chat channel set to {channel.mention}")
|
|
|
|
@pterodactyl_config_chat.command(name = "command")
|
|
async def pterodactyl_config_chat_command(self, ctx: commands.Context, *, command: str = None) -> None:
|
|
"""Set the command that will be used to send messages from Discord.
|
|
|
|
Required placeholders: `.$U` (username), `.$M` (message), `.$C` (color)
|
|
See [documentation](https://seacogs.coastalcommits.com/pterodactyl/setup/#changing-the-tellraw-command) for more information."""
|
|
if command is None:
|
|
command = await config.chat_command()
|
|
return await ctx.send(f"Chat command is currently set to:\n{box(command, 'json')}")
|
|
await config.chat_command.set(command)
|
|
await ctx.send(f"Chat command set to:\n{box(command, 'json')}")
|
|
|
|
@pterodactyl_config.group(name = "regex")
|
|
async def pterodactyl_config_regex(self, ctx: commands.Context) -> None:
|
|
"""Set regex patterns."""
|
|
|
|
@pterodactyl_config_regex.command(name = "chat")
|
|
async def pterodactyl_config_regex_chat(self, ctx: commands.Context, *, regex: str = None) -> None:
|
|
"""Set the regex pattern to match chat messages on the server.
|
|
|
|
See [documentation](https://seacogs.coastalcommits.com/pterodactyl/setup/#my-chat-messages-arent-detected) for more information."""
|
|
if regex is None:
|
|
regex = await config.chat_regex()
|
|
return await ctx.send(f"Chat regex is currently set to:\n{box(regex, 'regex')}")
|
|
await config.chat_regex.set(regex)
|
|
await ctx.send(f"Chat regex set to:\n{box(regex, 'regex')}")
|
|
|
|
@pterodactyl_config_regex.command(name = "server")
|
|
async def pterodactyl_config_regex_server(self, ctx: commands.Context, *, regex: str = None) -> None:
|
|
"""Set the regex pattern to match server messages on the server.
|
|
|
|
See [documentation](https://seacogs.coastalcommits.com/pterodactyl/setup/#my-chat-messages-arent-detected) for more information."""
|
|
if regex is None:
|
|
regex = await config.server_regex()
|
|
return await ctx.send(f"Server regex is currently set to:\n{box(regex, 'regex')}")
|
|
await config.server_regex.set(regex)
|
|
await ctx.send(f"Server regex set to:\n{box(regex, 'regex')}")
|
|
|
|
@pterodactyl_config_regex.command(name = "join")
|
|
async def pterodactyl_config_regex_join(self, ctx: commands.Context, *, regex: str = None) -> None:
|
|
"""Set the regex pattern to match join messages on the server.
|
|
|
|
See [documentation](https://seacogs.coastalcommits.com/pterodactyl/setup/#my-chat-messages-arent-detected) for more information."""
|
|
if regex is None:
|
|
regex = await config.join_regex()
|
|
return await ctx.send(f"Join regex is currently set to:\n{box(regex, 'regex')}")
|
|
await config.join_regex.set(regex)
|
|
await ctx.send(f"Join regex set to:\n{box(regex, 'regex')}")
|
|
|
|
@pterodactyl_config_regex.command(name = "leave")
|
|
async def pterodactyl_config_regex_leave(self, ctx: commands.Context, *, regex: str = None) -> None:
|
|
"""Set the regex pattern to match leave messages on the server.
|
|
|
|
See [documentation](https://seacogs.coastalcommits.com/pterodactyl/setup/#my-chat-messages-arent-detected) for more information."""
|
|
if regex is None:
|
|
regex = await config.leave_regex()
|
|
return await ctx.send(f"Leave regex is currently set to:\n{box(regex, 'regex')}")
|
|
await config.leave_regex.set(regex)
|
|
await ctx.send(f"Leave regex set to:\n{box(regex, 'regex')}")
|
|
|
|
@pterodactyl_config_regex.command(name = "achievement")
|
|
async def pterodactyl_config_regex_achievement(self, ctx: commands.Context, *, regex: str = None) -> None:
|
|
"""Set the regex pattern to match achievement messages on the server.
|
|
|
|
See [documentation](https://seacogs.coastalcommits.com/pterodactyl/setup/#my-chat-messages-arent-detected) for more information."""
|
|
if regex is None:
|
|
regex = await config.achievement_regex()
|
|
return await ctx.send(f"Achievement regex is currently set to:\n{box(regex, 'regex')}")
|
|
await config.achievement_regex.set(regex)
|
|
await ctx.send(f"Achievement regex set to:\n{box(regex, 'regex')}")
|
|
|
|
@pterodactyl_config.group(name = "messages", aliases = ['msg', 'msgs', 'message'])
|
|
async def pterodactyl_config_messages(self, ctx: commands.Context):
|
|
"""Configure message settings."""
|
|
|
|
@pterodactyl_config_messages.command(name = "startup")
|
|
async def pterodactyl_config_messages_startup(self, ctx: commands.Context, *, message: str = None) -> None:
|
|
"""Set the message that will be sent when the server starts."""
|
|
if message is None:
|
|
message = await config.startup_msg()
|
|
return await ctx.send(f"Startup message is currently set to: {message}")
|
|
await config.startup_msg.set(message)
|
|
await ctx.send(f"Startup message set to: {message}")
|
|
|
|
@pterodactyl_config_messages.command(name = "shutdown")
|
|
async def pterodactyl_config_messages_shutdown(self, ctx: commands.Context, *, message: str = None) -> None:
|
|
"""Set the message that will be sent when the server stops."""
|
|
if message is None:
|
|
message = await config.shutdown_msg()
|
|
return await ctx.send(f"Shutdown message is currently set to: {message}")
|
|
await config.shutdown_msg.set(message)
|
|
await ctx.send(f"Shutdown message set to: {message}")
|
|
|
|
@pterodactyl_config_messages.command(name = "join")
|
|
async def pterodactyl_config_messages_join(self, ctx: commands.Context, *, message: str = None) -> None:
|
|
"""Set the message that will be sent when a user joins the server. This is only shown in embeds."""
|
|
if message is None:
|
|
message = await config.join_msg()
|
|
return await ctx.send(f"Join message is currently set to: {message}")
|
|
await config.join_msg.set(message)
|
|
await ctx.send(f"Join message set to: {message}")
|
|
|
|
@pterodactyl_config_messages.command(name = "leave")
|
|
async def pterodactyl_config_messages_leave(self, ctx: commands.Context, *, message: str = None) -> None:
|
|
"""Set the message that will be sent when a user leaves the server. This is only shown in embeds."""
|
|
if message is None:
|
|
message = await config.leave_msg()
|
|
return await ctx.send(f"Leave message is currently set to: {message}")
|
|
await config.leave_msg.set(message)
|
|
await ctx.send(f"Leave message set to: {message}")
|
|
|
|
@pterodactyl_config.command(name = "ip")
|
|
async def pterodactyl_config_mask_ip(self, ctx: commands.Context, mask: bool = None) -> None:
|
|
"""Mask the IP addresses of users in console messages."""
|
|
if mask is None:
|
|
mask = await config.mask_ip()
|
|
return await ctx.send(f"IP masking is currently set to {mask}")
|
|
await config.mask_ip.set(mask)
|
|
await ctx.send(f"IP masking set to {mask}")
|