feat(pterodactyl): fixed slash commands
Some checks failed
Actions / Lint Code (Ruff & Pylint) (push) Failing after 21s
Actions / Build Documentation (MkDocs) (push) Successful in 25s

This commit is contained in:
Seaswimmer 2024-03-07 00:56:50 -05:00
parent f033f6a483
commit bbb54f0f55
Signed by: cswimr
GPG key ID: B8953EC01E5C4063

View file

@ -1,11 +1,12 @@
import asyncio import asyncio
import json import json
from typing import Mapping, Optional from typing import Mapping, Optional, Union
import discord import discord
import websockets import websockets
from pydactyl import PterodactylClient from pydactyl import PterodactylClient
from redbot.core import commands from redbot.core import app_commands, commands
from redbot.core.app_commands import Choice
from redbot.core.bot import Red from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import box, error from redbot.core.utils.chat_formatting import box, error
from redbot.core.utils.views import ConfirmView from redbot.core.utils.views import ConfirmView
@ -97,6 +98,80 @@ class Pterodactyl(commands.Cog):
command = command.replace('.$' + key, value) command = command.replace('.$' + key, value)
return command return command
async def power(self, ctx: Union[discord.Interaction, commands.Context], action: str, action_ing: str, warning: str = '') -> None:
if isinstance(ctx, discord.Interaction):
author = ctx.user
else:
author = ctx.author
current_status = await config.current_status()
if current_status == action_ing:
if isinstance(ctx, discord.Interaction):
return await ctx.response.send_message(f"Server is already {action_ing}.", ephemeral=True)
else:
return await ctx.send(f"Server is already {action_ing}.")
if current_status in ["starting", "stopping"]:
if isinstance(ctx, discord.Interaction):
return await ctx.response.send_message("Another power action is already in progress.", ephemeral=True)
return await ctx.send("Another power action is already in progress.")
view = ConfirmView(author, disable_buttons=True)
if isinstance(ctx, discord.Interaction):
await ctx.response.send_message(f"{warning}Are you sure you want to {action} the server?", view=view)
else:
message = await ctx.send(f"{warning}Are you sure you want to {action} the server?", view=view)
await view.wait()
if view.result is True:
if isinstance(ctx, discord.Interaction):
await ctx.response.edit_message(content=f"Sending websocket command to {action} server...", view=None)
else:
await message.edit(content=f"Sending websocket command to {action} server...", view=None)
await self.websocket.send(json.dumps({"event": "set state", "args": [action]}))
if isinstance(ctx, discord.Interaction):
await ctx.response.edit_message(content=f"Server {action_ing}", view=None)
else:
await message.edit(content=f"Server {action_ing}", view=None)
else:
if isinstance(ctx, discord.Interaction):
await ctx.response.edit_message(content="Cancelled.", view=None)
else:
await message.edit(content="Cancelled.", view=None)
async def send_command(self, ctx: Union[discord.Interaction, commands.Context], command: str):
channel = self.bot.get_channel(await config.console_channel())
if isinstance(ctx, discord.Interaction):
if channel:
await channel.send(f"Received console command from {ctx.user.id}: {command[:1900]}")
try:
await self.websocket.send(json.dumps({"event": "send command", "args": [command]}))
await ctx.response.send_message(f"Command sent to server. {box(command, 'json')}", ephemeral=True)
except websockets.exceptions.ConnectionClosed as e:
logger.error("WebSocket connection closed: %s", e)
await ctx.response.send_message(error("WebSocket connection closed."))
self.task.cancel()
self.retry_counter = 0
self.task = self.get_task()
else:
if channel:
await channel.send(f"Received console command from {ctx.author.id}: {command[:1900]}")
try:
await self.websocket.send(json.dumps({"event": "send command", "args": [command]}))
await ctx.send(f"Command sent to server. {box(command, 'json')}")
except websockets.exceptions.ConnectionClosed as e:
logger.error("WebSocket connection closed: %s", e)
await ctx.send(error("WebSocket connection closed."))
self.task.cancel()
self.retry_counter = 0
self.task = self.get_task()
@commands.Cog.listener() @commands.Cog.listener()
async def on_red_api_tokens_update(self, service_name: str, api_tokens: Mapping[str,str]): # pylint: disable=unused-argument async def on_red_api_tokens_update(self, service_name: str, api_tokens: Mapping[str,str]): # pylint: disable=unused-argument
if service_name == "pterodactyl": if service_name == "pterodactyl":
@ -105,7 +180,38 @@ class Pterodactyl(commands.Cog):
self.retry_counter = 0 self.retry_counter = 0
self.task = self.get_task() self.task = self.get_task()
@commands.hybrid_group(autohelp = True, name = "pterodactyl", aliases = ["ptero"]) slash_pterodactyl = app_commands.Group(name="pterodactyl", description="Pterodactyl allows you to manage your Pterodactyl Panel from Discord.")
@slash_pterodactyl.command(name = "command", description = "Send a command to the server console.")
async def slash_pterodactyl_command(self, interaction: discord.Interaction, command: str) -> None:
"""Send a command to the server console.
Parameters:
-----------
command: str
The command to send to the server."""
return await self.send_command(interaction, command)
@slash_pterodactyl.command(name = "power", description = "Send power actions to the server.")
@app_commands.choices(action=[
Choice(name="Start", value="start"),
Choice(name="Stop", value="stop"),
Choice(name="Restart", value="restart"),
Choice(name="⚠️ Kill ⚠️", value="kill")
])
@app_commands.describe("action", "The action to perform on the server.")
async def slash_pterodactyl_power(self, interaction: discord.Interaction, action: app_commands.Choice[str]) -> None:
"""Send power actions to the server.
Parameters:
-----------
action: app_commands.Choice[str]
The action to perform on the server."""
if action.value == "kill":
return await self.power(interaction, action.value, "stopping... (forcefully killed)", warning="**⚠️ Forcefully killing the server process can corrupt data in some cases. ⚠️**\n")
return await self.power(interaction, action.value, f"{action.value}ing...")
@commands.group(autohelp = True, name = "pterodactyl", aliases = ["ptero"])
async def pterodactyl(self, ctx: commands.Context) -> None: async def pterodactyl(self, ctx: commands.Context) -> None:
"""Pterodactyl allows you to manage your Pterodactyl Panel from Discord.""" """Pterodactyl allows you to manage your Pterodactyl Panel from Discord."""
@ -113,18 +219,7 @@ class Pterodactyl(commands.Cog):
@commands.admin() @commands.admin()
async def pterodactyl_command(self, ctx: commands.Context, *, command: str) -> None: async def pterodactyl_command(self, ctx: commands.Context, *, command: str) -> None:
"""Send a command to the server console.""" """Send a command to the server console."""
channel = self.bot.get_channel(await config.console_channel()) return await self.send_command(ctx, command)
if channel:
await channel.send(f"Received console command from {ctx.author.id}: {command[:1900]}")
try:
await self.websocket.send(json.dumps({"event": "send command", "args": [command]}))
await ctx.send(f"Command sent to server. {box(command, 'json')}")
except websockets.exceptions.ConnectionClosed as e:
logger.error("WebSocket connection closed: %s", e)
await ctx.send(error("WebSocket connection closed."))
self.task.cancel()
self.retry_counter = 0
self.task = self.get_task()
@pterodactyl.group(autohelp = True, name = "power") @pterodactyl.group(autohelp = True, name = "power")
@commands.admin() @commands.admin()
@ -134,72 +229,24 @@ class Pterodactyl(commands.Cog):
@pterodactyl_power.command(name = "start") @pterodactyl_power.command(name = "start")
async def pterodactyl_power_start(self, ctx: commands.Context) -> Optional[discord.Message]: async def pterodactyl_power_start(self, ctx: commands.Context) -> Optional[discord.Message]:
"""Start the server.""" """Start the server."""
current_status = await config.current_status() return await self.power(ctx, "start", "starting...")
if current_status == "running":
return await ctx.send("Server is already running.")
if current_status in ["starting", "stopping"]:
return await ctx.send("Another power action is already in progress.")
view = ConfirmView(ctx.author, disable_buttons=True)
message = await ctx.send("Are you sure you want to start the server?", view=view)
await view.wait()
if view.result is True:
await message.edit(content="Sending websocket command to start server...", view=None)
await self.websocket.send(json.dumps({"event": "set state", "args": ["start"]}))
await message.edit(content="Server starting...", view=None)
else:
await message.edit(content="Cancelled.", view=None)
@pterodactyl_power.command(name = "stop") @pterodactyl_power.command(name = "stop")
async def pterodactyl_power_stop(self, ctx: commands.Context) -> Optional[discord.Message]: async def pterodactyl_power_stop(self, ctx: commands.Context) -> Optional[discord.Message]:
"""Stop the server.""" """Stop the server."""
current_status = await config.current_status() return await self.power(ctx, "stop", "stopping...")
if current_status == "stopped":
return await ctx.send("Server is already stopped.")
if current_status in ["starting", "stopping"]:
return await ctx.send("Another power action is already in progress.")
view = ConfirmView(ctx.author, disable_buttons=True)
message = await ctx.send("Are you sure you want to stop the server?", view=view)
await view.wait()
if view.result is True:
await message.edit(content="Sending websocket command to stop server...", view=None)
await self.websocket.send(json.dumps({"event": "set state", "args": ["stop"]}))
await message.edit(content="Server stopping...", view=None)
else:
await message.edit(content="Cancelled.", view=None)
@pterodactyl_power.command(name = "restart") @pterodactyl_power.command(name = "restart")
async def pterodactyl_power_restart(self, ctx: commands.Context) -> Optional[discord.Message]: async def pterodactyl_power_restart(self, ctx: commands.Context) -> Optional[discord.Message]:
"""Restart the server.""" """Restart the server."""
current_status = await config.current_status() return await self.power(ctx, "restart", "restarting...")
if current_status in ["starting", "stopping"]:
return await ctx.send("Another power action is already in progress.")
view = ConfirmView(ctx.author, disable_buttons=True)
message = await ctx.send("Are you sure you want to restart the server?", view=view)
await view.wait()
if view.result is True:
await message.edit(content="Sending websocket command to restart server...", view=None)
await self.websocket.send(json.dumps({"event": "set state", "args": ["restart"]}))
await message.edit(content="Server restarting...", view=None)
else:
await message.edit(content="Cancelled.", view=None)
@pterodactyl_power.command(name = "kill") @pterodactyl_power.command(name = "kill")
async def pterodactyl_power_kill(self, ctx: commands.Context) -> Optional[discord.Message]: async def pterodactyl_power_kill(self, ctx: commands.Context) -> Optional[discord.Message]:
"""Kill the server.""" """Kill the server."""
current_status = await config.current_status() return await self.power(ctx, "kill", "stopping... (forcefully killed)", warning="**⚠️ Forcefully killing the server process can corrupt data in some cases. ⚠️**\n")
if current_status == 'stopped':
return await ctx.send("Server is already stopped.")
view = ConfirmView(ctx.author, disable_buttons=True)
message = await ctx.send("**⚠️ Forcefully killing the server process can corrupt data in some cases. ⚠️**\nAre you sure you want to kill the server?", view=view)
await view.wait()
if view.result is True:
await message.edit(content="Sending websocket command to kill server...", view=None)
await self.websocket.send(json.dumps({"event": "set state", "args": ["kill"]}))
await message.edit(content="Server stopping... (forcefully killed)", view=None)
else:
await message.edit(content="Cancelled.", view=None)
@pterodactyl.hybrid_group(autohelp = True, name = "config", aliases = ["settings", "set"], with_app_command = False) @pterodactyl.group(autohelp = True, name = "config", aliases = ["settings", "set"])
@commands.is_owner() @commands.is_owner()
async def pterodactyl_config(self, ctx: commands.Context) -> None: async def pterodactyl_config(self, ctx: commands.Context) -> None:
"""Configure Pterodactyl settings.""" """Configure Pterodactyl settings."""
@ -239,7 +286,7 @@ class Pterodactyl(commands.Cog):
await config.invite.set(invite) await config.invite.set(invite)
await ctx.send(f"Invite link set to {invite}") await ctx.send(f"Invite link set to {invite}")
@pterodactyl_config.hybrid_group(name = "chat", with_app_command = False) @pterodactyl_config.group(name = "chat")
async def pterodactyl_config_chat(self, ctx: commands.Context): async def pterodactyl_config_chat(self, ctx: commands.Context):
"""Configure chat settings.""" """Configure chat settings."""
@ -258,7 +305,7 @@ class Pterodactyl(commands.Cog):
await config.chat_command.set(command) await config.chat_command.set(command)
await ctx.send(f"Chat command set to:\n{box(command, 'json')}") await ctx.send(f"Chat command set to:\n{box(command, 'json')}")
@pterodactyl_config.hybrid_group(name = "regex", with_app_command = False) @pterodactyl_config.group(name = "regex")
async def pterodactyl_config_regex(self, ctx: commands.Context) -> None: async def pterodactyl_config_regex(self, ctx: commands.Context) -> None:
"""Set regex patterns.""" """Set regex patterns."""
@ -302,7 +349,7 @@ class Pterodactyl(commands.Cog):
await config.achievement_regex.set(regex) await config.achievement_regex.set(regex)
await ctx.send(f"Achievement regex set to:\n{box(regex, 'regex')}") await ctx.send(f"Achievement regex set to:\n{box(regex, 'regex')}")
@pterodactyl_config.hybrid_group(name = "messages", aliases = ['msg', 'msgs', 'message'], with_app_command = False) @pterodactyl_config.group(name = "messages", aliases = ['msg', 'msgs', 'message'])
async def pterodactyl_config_messages(self, ctx: commands.Context): async def pterodactyl_config_messages(self, ctx: commands.Context):
"""Configure message settings.""" """Configure message settings."""
@ -345,7 +392,7 @@ class Pterodactyl(commands.Cog):
await config.api_endpoint.set(endpoint) await config.api_endpoint.set(endpoint)
await ctx.send(f"API endpoint set to {endpoint}") await ctx.send(f"API endpoint set to {endpoint}")
@pterodactyl_config_regex.hybrid_group(name = "blacklist", aliases = ['block', 'blocklist'], with_app_command = False) @pterodactyl_config_regex.group(name = "blacklist", aliases = ['block', 'blocklist'],)
async def pterodactyl_config_regex_blacklist(self, ctx: commands.Context): async def pterodactyl_config_regex_blacklist(self, ctx: commands.Context):
"""Blacklist regex patterns.""" """Blacklist regex patterns."""