Compare commits

..

1 commit

Author SHA1 Message Date
e90dc60643
misc(pterodactyl): added some logging calls 2024-08-09 19:46:17 -04:00
53 changed files with 3230 additions and 3113 deletions

View file

@ -1,38 +0,0 @@
FROM ghcr.io/astral-sh/uv:0.5.24@sha256:2381d6aa60c326b71fd40023f921a0a3b8f91b14d5db6b90402e65a635053709 AS uv
FROM python:3.11-slim@sha256:6ed5bff4d7d377e2a27d9285553b8c21cfccc4f00881de1b24c9bc8d90016e82 AS python
FROM code.forgejo.org/forgejo/runner:6.2.1@sha256:fecc96a111a15811a6887ce488e75718089f24599e613e93db8e54fe70b706e8 AS forgejo-runner
FROM mcr.microsoft.com/vscode/devcontainers/base:bookworm@sha256:6155a486f236fd5127b76af33086029d64f64cf49dd504accb6e5f949098eb7e
LABEL repository="www.coastalcommits.com/cswimr/SeaCogs"
LABEL maintainer="cswimr <seaswimmerthefsh@gmail.com>"
RUN apt-get update; \
apt-get install -y --no-install-recommends \
# Red-DiscordBot
build-essential \
git \
# PyNaCl
libsodium-dev \
# CFFI
libffi-dev \
# SSH repository support
openssh-client \
# Cog dependencies
# Audio
openjdk-17-jre-headless \
# PyLav
libaio1 \
libaio-dev \
# SeaUtils
dnsutils; \
apt-get clean; \
rm -rf /var/lib/apt/lists/*
COPY --from=uv --chown=vscode: /uv /uvx /bin/
COPY --from=python --chown=vscode: /usr/local /usr/local
COPY --from=forgejo-runner --chown=vscode: /bin/forgejo-runner /bin/forgejo-runner
COPY --chown=vscode: .devcontainer/home/* /home/vscode/
RUN ln -s /usr/local/bin/python3.11 /usr/local/bin/python; \
python --version; \
python -m ensurepip

View file

@ -1,52 +0,0 @@
{
"name": "Red-DiscordBot: SeaCogs",
"build": {
"context": "..",
"dockerfile": "Dockerfile"
},
"features": {
"ghcr.io/devcontainers/features/docker-in-docker:2": {}
},
"customizations": {
"vscode": {
"settings": {
"python.terminal.activateEnvInCurrentTerminal": true,
"python.terminal.activateEnvironment": true,
"terminal.integrated.defaultProfile.linux": "zsh",
"terminal.integrated.profiles.linux": {
"zsh": {
"path": "/bin/zsh"
}
}
},
"extensions": [
"charliermarsh.ruff",
"ms-azuretools.vscode-docker",
"ms-python.python",
"tekumara.typos-vscode",
"tamasfe.even-better-toml",
"redhat.vscode-yaml",
"DavidAnson.vscode-markdownlint",
"yy0931.vscode-sqlite3-editor",
"aaron-bond.better-comments",
"donjayamanne.githistory",
"eamodio.gitlens"
]
}
},
"containerEnv": {
"DISPLAY": "dummy",
"PYTHONUNBUFFERED": "True",
"UV_LINK_MODE": "copy",
"UV_PYTHON_PREFERENCE": "only-system",
"UV_PYTHON_DOWNLOADS": "never",
"PROJECT_DIR": "/workspaces/SeaCogs"
},
"mounts": [
"source=seacogs-persistent-data,target=/workspaces/SeaCogs/.data,type=volume"
],
"postCreateCommand": {
"Setup Virtual Environment": "uv sync --frozen && sudo chown -R vscode:vscode /workspaces/SeaCogs/.data && uv run redbot-setup --no-prompt --instance-name=local --data-path=/workspaces/SeaCogs/.data --backend=json"
},
"remoteUser": "vscode"
}

View file

@ -1,3 +0,0 @@
#!/usr/bin/env bash
alias runactions="forgejo-runner exec --default-actions-url=https://www.coastalcommits.com --gitea-instance=https://www.coastalcommits.com"

View file

@ -10,7 +10,7 @@ Aurora is a fully-featured moderation system. It is heavily inspired by Galactic
## Installation
```bash
[p]repo add seacogs https://www.coastalcommits.com/cswimr/SeaCogs
[p]repo add seacogs https://www.coastalcommits.com/SeaswimmerTheFsh/SeaCogs
[p]cog install seacogs aurora
[p]load aurora
[p]cog load aurora
```

View file

@ -5,14 +5,14 @@ Backup allows you to export a JSON list of all of your installed repositories an
## Installation
```bash
[p]repo add seacogs https://www.coastalcommits.com/cswimr/SeaCogs
[p]repo add seacogs https://www.coastalcommits.com/SeaswimmerTheFsh/SeaCogs
[p]cog install seacogs backup
[p]load backup
[p]cog load backup
```
## Version Compatibility
As of commit [1edb08a](https://www.coastalcommits.com/cswimr/SeaCogs/commit/1edb08a1271f12098ca0bed11a735f7162cedd14), the Backup cog no longer supports Red versions older than 3.5.6. If you want to use the cog on an earlier version (3.5.0 - 3.5.5), install the cog pinned to this commit: `43464db6a7c51bc69282b1ae3dc507a4aae851de`.
As of commit [1edb08a](https://www.coastalcommits.com/SeaswimmerTheFsh/SeaCogs/commit/1edb08a1271f12098ca0bed11a735f7162cedd14), the Backup cog no longer supports Red versions older than 3.5.6. If you want to use the cog on an earlier version (3.5.0 - 3.5.5), install the cog pinned to this commit: `43464db6a7c51bc69282b1ae3dc507a4aae851de`.
```bash
[p]cog installversion sea-cogs 43464db6a7c51bc69282b1ae3dc507a4aae851de backup

View file

@ -6,9 +6,9 @@ This cog does require an api key to work.
## Installation
```bash
[p]repo add seacogs https://www.coastalcommits.com/cswimr/SeaCogs
[p]repo add seacogs https://www.coastalcommits.com/SeaswimmerTheFsh/SeaCogs
[p]cog install seacogs bible
[p]load bible
[p]cog load bible
```
## Setup
@ -21,9 +21,8 @@ Then, you can use `[p]set api` to set the API key. Make sure your formatting mat
## Commands
### bible passage
- Usage: `[p]bible passage <book> <passage>`
- Aliases: `verse`
- Usage: `[p]bible passage <book> <passage>`
- Aliases: `verse`
Get a Bible passage.
@ -32,7 +31,6 @@ Example usage:
`[p]bible passage John 3:16-3:17`
### bible random
- Usage: `[p]bible random`
- Usage: `[p]bible random`
Get a random Bible verse.

View file

@ -5,9 +5,9 @@ EmojiInfo allows you to retrieve information about an emoji.
## Installation
```bash
[p]repo add seacogs https://www.coastalcommits.com/cswimr/SeaCogs
[p]repo add seacogs https://www.coastalcommits.com/SeaswimmerTheFsh/SeaCogs
[p]cog install seacogs emojiinfo
[p]load emojiinfo
[p]cog load emojiinfo
```
## Commands

View file

@ -1,26 +0,0 @@
# HotReload
HotReload automatically reloads cogs in local cog paths on file change.
This is useful for development, as it allows you to make changes to your cogs and see the changes reflected in Discord immediately, without having to manually `[p]reload` the cog.
## Installation
```bash
[p]repo add seacogs https://www.coastalcommits.com/cswimr/SeaCogs
[p]cog install seacogs hotreload
[p]load hotreload
```
## Commands
### hotreload compile
Determines if the cog should try to compile a modified Python file before reloading the associated cog. Useful for catching syntax errors. Disabled by default.
### hotreload notifychannel
Set the channel where hotreload will send notifications when a cog is reloaded.
### hotreload list
Debugging command that shows the list of currently active observers. May be expanded in the future to show watched file paths.

View file

@ -5,9 +5,9 @@ Nerdify allows you to nerdify other people's text.
## Installation
```bash
[p]repo add seacogs https://www.coastalcommits.com/cswimr/SeaCogs
[p]repo add seacogs https://www.coastalcommits.com/SeaswimmerTheFsh/SeaCogs
[p]cog install seacogs nerdify
[p]load nerdify
[p]cog load nerdify
```
## Commands

View file

@ -28,7 +28,7 @@ The Downloader cog allows you to add Git repositories to your bot in order to do
Now, use Downloader to add my repository to your bot:
```
[p]repo add sea-cogs https://www.coastalcommits.com/cswimr/SeaCogs
[p]repo add sea-cogs https://www.coastalcommits.com/SeaswimmerTheFsh/SeaCogs
```
Now, install the Pterodactyl cog:

View file

@ -10,7 +10,7 @@ Pterodactyl allows for connecting to a Pterodactyl server through websockets. It
## Installation
```bash
[p]repo add seacogs https://www.coastalcommits.com/cswimr/SeaCogs
[p]repo add seacogs https://www.coastalcommits.com/SeaswimmerTheFsh/SeaCogs
[p]cog install seacogs pterodactyl
[p]load pterodactyl
[p]cog load aurora
```

View file

@ -10,7 +10,7 @@ There are a few caveats to running an instance of Red on Pterodactyl.
- You will not receive any support from the Red developers.
- The built-in Audio cog will not work.
- Depending on your host, you might have to request a [`tmpfs` size increase](https://github.com/pelican-eggs/eggs/tree/master/bots/discord/redbot#additional-requirements).
- Depending on your host, you might have to request a [`tmpfs` size increase](https://github.com/ign-gg/Pterodactyl-Eggs/tree/master/bots/discord/redbot#additional-requirements).
If these are unacceptable to you, you should [install Red normally](https://docs.discord.red/en/stable/install_guides/index.html).
///
@ -64,7 +64,7 @@ Red is quite a large bot, so I'll focus on the specifics of getting the bot work
```
2. Add my repository to the bot
```bash
[p]repo add sea-cogs https://www.coastalcommits.com/cswimr/SeaCogs
[p]repo add sea-cogs https://www.coastalcommits.com/SeaswimmerTheFsh/SeaCogs
```
3. Install and load the Pterodactyl cog
```bash

View file

@ -1,6 +1,8 @@
name: Bug Report
about: File a bug report
title: "[Cog Name] "
labels: [bug]
ref: master
body:
- type: markdown
attributes:
@ -11,7 +13,7 @@ body:
attributes:
label: Please confirm that;
options:
- label: I have checked that this bug does not already have an opened/closed [issue](https://www.coastalcommits.com/cswimr/SeaCogs/issues) or [pull request](https://www.coastalcommits.com/cswimr/SeaCogs/pulls) associated with it.
- label: I have checked that this bug does not already have an opened/closed [issue](https://www.coastalcommits.com/SeaswimmerTheFsh/SeaCogs/issues) or [pull request](https://www.coastalcommits.com/SeaswimmerTheFsh/SeaCogs/pulls) associated with it.
required: true
- label: I have checked that I am on the latest version of [Red-DiscordBot](https://github.com/CogCreators/Red-DiscordBot), and SeaCogs.
required: true

View file

@ -1,6 +1,8 @@
name: Suggestion
about: Trying to suggest something for SeaCogs? Use this.
labels: [enhancement]
title: "[Cog Name] "
labels: enhancement
ref: master
body:
- type: markdown
attributes:
@ -11,7 +13,7 @@ body:
attributes:
label: What cog is your feature request for?
description: Specify the cog within the repository.
placeholder: E.g., Pterodactyl
placeholder: E.g., ModerationCog
validations:
required: true
- type: textarea

View file

@ -2,5 +2,5 @@
<!-- Create a new issue, if it doesn't exist yet -->
- [ ] By submitting this pull request, I permit [cswimr](https://www.coastalcommits.com/cswimr) to license my work under
the [Mozilla Public License Version 2.0](https://www.coastalcommits.com/cswimr/SeaCogs/src/branch/main/LICENSE).
- [ ] By submitting this pull request, I permit SeaswimmerTheFsh to license my work under
the [Mozilla Public License Version 2.0](https://www.coastalcommits.com/SeaswimmerTheFsh/SeaCogs/src/branch/main/LICENSE).

View file

@ -12,7 +12,6 @@
too-many-locals,
too-many-public-methods,
too-many-statements,
too-many-positional-arguments,
arguments-differ,
too-many-return-statements,
import-outside-toplevel,

View file

@ -2,46 +2,38 @@ name: Actions
on:
push:
branches:
- main
- 'main'
pull_request:
jobs:
lint:
name: Lint Code (Ruff & Pylint)
Lint Code (Ruff & Pylint):
runs-on: docker
container: www.coastalcommits.com/cswimr/actions:uv@sha256:211aaf7d9ac98087579ebf9fab87a9122f51b2697e3a3649ac9f4bd3b03b8e5d
container: www.coastalcommits.com/seaswimmerthefsh/actionscontainers-seacogs:latest
steps:
- name: Checkout
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4
- name: Install python
run: uv python install 3.11
uses: actions/checkout@v3
- name: Install dependencies
run: uv sync
run: poetry install --with dev --no-root
- name: Analysing code with Ruff
run: uv run ruff check $(git ls-files '*.py')
run: ./.venv/bin/ruff check $(git ls-files '*.py')
continue-on-error: true
- name: Analysing code with Pylint
run: uv run pylint --rcfile=.forgejo/workflows/config/.pylintrc $(git ls-files '*.py')
run: ./.venv/bin/pylint --rcfile=.forgejo/workflows/config/.pylintrc $(git ls-files '*.py')
docs:
name: Build Documentation (MkDocs)
Build Documentation (MkDocs):
runs-on: docker
container: www.coastalcommits.com/cswimr/actions:docs@sha256:e405cd6b9b1182a570ddee32ed8dd1b2f899edc625d006c8b4b2f18c100e724f
container: www.coastalcommits.com/seaswimmerthefsh/actionscontainers-seacogs:latest
steps:
- name: Checkout
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Install python
run: uv python install 3.11
- name: Install dependencies
run: uv sync --no-dev --extra=documentation
run: poetry install --with docs --no-root
- name: Set environment variables
uses: actions/env@v2
@ -50,7 +42,7 @@ jobs:
run: |
export SITE_URL="https://$CI_ACTION_REF_NAME_SLUG.seacogs.coastalcommits.com"
export EDIT_URI="src/branch/$CI_ACTION_REF_NAME/.docs"
uv run mkdocs build -v
./.venv/bin/mkdocs build -v
- name: Deploy documentation
run: |
@ -66,7 +58,7 @@ jobs:
npx -p "@getmeli/cli" meli upload ./site \
--url "https://pages.coastalcommits.com" \
--site "${{ vars.MELI_SITE_ID }}" \
--token "${{ secrets.MELI_TOKEN }}" \
--token "${{ secrets.MELI_SECRET }}" \
--release "$CI_ACTION_REF_NAME_SLUG/${{ env.GITHUB_SHA }}" \
--branch "$CI_ACTION_REF_NAME_SLUG"

4
.gitignore vendored
View file

@ -1,7 +1,5 @@
.cache
.vscode
site
.venv
.data
__pycache__
.mypy_cache/
.ruff_cache/

15
.vscode/launch.json vendored
View file

@ -1,15 +0,0 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Red-DiscordBot",
"type": "debugpy",
"request": "launch",
"module": "redbot",
"args": ["local", "--dev", "-vvv", "--load-cogs=hotreload"]
}
]
}

32
.vscode/settings.json vendored
View file

@ -1,32 +0,0 @@
{
"[python]": {
"editor.codeActionsOnSave": {
"source.fixAll": "explicit"
},
"editor.defaultFormatter": "charliermarsh.ruff"
},
"[json]": {
"editor.defaultFormatter": "vscode.json-language-features"
},
"[jsonc]": {
"editor.defaultFormatter": "vscode.json-language-features"
},
"files.exclude": {
"**/.git": true,
"**/__pycache__": true,
"**/.ruff_cache": true,
"**/.mypy_cache": true
},
"python.analysis.diagnosticSeverityOverrides": {
"reportAttributeAccessIssue": false, // disabled because `commands.group.command` is listed as Any / Unknown for some reason
"reportCallIssue": "information"
},
"python.analysis.diagnosticMode": "workspace",
"python.analysis.supportDocstringTemplate": true,
"python.analysis.typeCheckingMode": "basic",
"python.analysis.typeEvaluation.enableReachabilityAnalysis": true,
"python.analysis.typeEvaluation.strictDictionaryInference": true,
"python.analysis.typeEvaluation.strictListInference": true,
"python.analysis.typeEvaluation.strictSetInference": true,
"editor.formatOnSave": true,
}

View file

@ -11,7 +11,7 @@ My assorted cogs for Red-DiscordBot.
To get started with a development environment, first clone this repository.
```sh
git clone https://coastalcommits.com/cswimr/SeaCogs.git
git clone https://coastalcommits.com/SeaswimmerTheFsh/SeaCogs.git
```
Then, install Poetry.

View file

@ -9,15 +9,14 @@ import discord
from red_commons.logging import getLogger
from redbot.core import commands
from redbot.core.bot import Config, Red
from redbot.core.utils.chat_formatting import bold, humanize_list
from redbot.core.utils.chat_formatting import humanize_list
class AntiPolls(commands.Cog):
"""AntiPolls deletes messages that contain polls, with a configurable per-guild role and channel whitelist and support for default Discord permissions (Manage Messages)."""
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
__version__ = "1.0.3"
__author__ = ["SeaswimmerTheFsh"]
__version__ = "1.0.0"
__documentation__ = "https://seacogs.coastalcommits.com/antipolls/"
def __init__(self, bot: Red):
@ -39,17 +38,17 @@ class AntiPolls(commands.Cog):
n = "\n" if "\n\n" not in pre_processed else ""
text = [
f"{pre_processed}{n}",
f"{bold('Cog Version:')} [{self.__version__}]({self.__git__})",
f"{bold('Author:')} {humanize_list(self.__author__)}",
f"{bold('Documentation:')} {self.__documentation__}",
f"Cog Version: **{self.__version__}**",
f"Author: {humanize_list(self.__author__)}",
f"Documentation: {self.__documentation__}",
]
return "\n".join(text)
async def red_delete_data_for_user(self, **kwargs): # pylint: disable=unused-argument
async def red_delete_data_for_user(self, **kwargs): # pylint: disable=unused-argument
"""Nothing to delete."""
return
@commands.Cog.listener("on_message")
@commands.Cog.listener('on_message')
async def polls_listener(self, message: discord.Message) -> None:
if message.guild is None:
return self.logger.verbose("Message in direct messages ignored")
@ -62,13 +61,13 @@ class AntiPolls(commands.Cog):
guild_config = await self.config.guild(message.guild).all()
if guild_config["manage_messages"] is True and message.author.guild_permissions.manage_messages:
if guild_config['manage_messages'] is True and message.author.guild_permissions.manage_messages:
return self.logger.verbose("Message from user with Manage Messages permission ignored")
if message.channel.id in guild_config["channel_whitelist"]:
if message.channel.id in guild_config['channel_whitelist']:
return self.logger.verbose("Message in whitelisted channel %s ignored", message.channel.id)
if any(role.id in guild_config["role_whitelist"] for role in message.author.roles):
if any(role.id in guild_config['role_whitelist'] for role in message.author.roles):
return self.logger.verbose("Message from whitelisted role %s ignored", message.author.roles)
if not message.content and not message.embeds and not message.attachments and not message.stickers:
@ -80,9 +79,9 @@ class AntiPolls(commands.Cog):
return self.logger.error("Failed to delete message: %s", e)
return self.logger.trace("Deleted poll message %s", message.id)
return self.logger.verbose("Message %s is not a poll, ignoring", message.id)
self.logger.verbose("Message %s is not a poll, ignoring", message.id)
@commands.group(name="antipolls", aliases=["ap"]) # type: ignore
@commands.group(name="antipolls", aliases=["ap"])
@commands.guild_only()
@commands.admin_or_permissions(manage_guild=True)
async def antipolls(self, ctx: commands.Context) -> None:
@ -95,8 +94,6 @@ class AntiPolls(commands.Cog):
@antipolls_roles.command(name="add")
async def antipolls_roles_add(self, ctx: commands.Context, *roles: discord.Role) -> None:
"""Add roles to the whitelist."""
assert ctx.guild is not None # using `assert` here and in the rest of this file to satisfy typecheckers
# this is safe because the commands are part of a guild-only command group
async with self.config.guild(ctx.guild).role_whitelist() as role_whitelist:
role_whitelist: list
failed: list[discord.Role] = []
@ -112,7 +109,6 @@ class AntiPolls(commands.Cog):
@antipolls_roles.command(name="remove")
async def antipolls_roles_remove(self, ctx: commands.Context, *roles: discord.Role) -> None:
"""Remove roles from the whitelist."""
assert ctx.guild is not None
async with self.config.guild(ctx.guild).role_whitelist() as role_whitelist:
role_whitelist: list
failed: list[discord.Role] = []
@ -126,14 +122,13 @@ class AntiPolls(commands.Cog):
await ctx.send(f"The following roles were not in the whitelist: {humanize_list([role.mention for role in failed])}", delete_after=10)
@antipolls_roles.command(name="list")
async def antipolls_roles_list(self, ctx: commands.Context) -> discord.Message:
async def antipolls_roles_list(self, ctx: commands.Context) -> None:
"""List roles in the whitelist."""
assert ctx.guild is not None
role_whitelist = await self.config.guild(ctx.guild).role_whitelist()
if not role_whitelist:
return await ctx.send("No roles in the whitelist.")
roles = [role for role in (ctx.guild.get_role(role) for role in role_whitelist) if role is not None]
return await ctx.send(humanize_list([role.mention for role in roles]))
roles = [ctx.guild.get_role(role) for role in role_whitelist]
await ctx.send(humanize_list([role.mention for role in roles]))
@antipolls.group(name="channels")
async def antipolls_channels(self, ctx: commands.Context) -> None:
@ -142,7 +137,6 @@ class AntiPolls(commands.Cog):
@antipolls_channels.command(name="add")
async def antipolls_channels_add(self, ctx: commands.Context, *channels: discord.TextChannel) -> None:
"""Add channels to the whitelist."""
assert ctx.guild is not None
async with self.config.guild(ctx.guild).channel_whitelist() as channel_whitelist:
channel_whitelist: list
failed: list[discord.TextChannel] = []
@ -158,7 +152,6 @@ class AntiPolls(commands.Cog):
@antipolls_channels.command(name="remove")
async def antipolls_channels_remove(self, ctx: commands.Context, *channels: discord.TextChannel) -> None:
"""Remove channels from the whitelist."""
assert ctx.guild is not None
async with self.config.guild(ctx.guild).channel_whitelist() as channel_whitelist:
channel_whitelist: list
failed: list[discord.TextChannel] = []
@ -172,21 +165,16 @@ class AntiPolls(commands.Cog):
await ctx.send(f"The following channels were not in the whitelist: {humanize_list([channel.mention for channel in failed])}", delete_after=10)
@antipolls_channels.command(name="list")
async def antipolls_channels_list(self, ctx: commands.Context) -> discord.Message:
async def antipolls_channels_list(self, ctx: commands.Context) -> None:
"""List channels in the whitelist."""
assert ctx.guild is not None
channel_whitelist = await self.config.guild(ctx.guild).channel_whitelist()
if not channel_whitelist:
return await ctx.send("No channels in the whitelist.")
channels = [channel for channel in (ctx.guild.get_channel(channel) for channel in channel_whitelist) if channel is not None]
for c in channels:
if not c:
channels.remove(c)
return await ctx.send(humanize_list([channel.mention for channel in channels]))
channels = [ctx.guild.get_channel(channel) for channel in channel_whitelist]
await ctx.send(humanize_list([channel.mention for channel in channels]))
@antipolls.command(name="managemessages")
async def antipolls_managemessages(self, ctx: commands.Context, enabled: bool) -> None:
"""Toggle Manage Messages permission check."""
assert ctx.guild is not None
await self.config.guild(ctx.guild).manage_messages.set(enabled)
await ctx.tick()

View file

@ -1,6 +1,6 @@
{
"author" : ["cswimr"],
"install_msg" : "Thank you for installing AntiPolls!\nYou can find the source code of this cog [here](https://coastalcommits.com/cswimr/SeaCogs).",
"author" : ["SeaswimmerTheFsh (seasw.)"],
"install_msg" : "Thank you for installing AntiPolls!\nYou can find the source code of this cog [here](https://coastalcommits.com/SeaswimmerTheFsh/SeaCogs).",
"name" : "AntiPolls",
"short" : "AntiPolls deletes messages that contain polls.",
"description" : "AntiPolls deletes messages that contain polls, with a configurable per-guild role and channel whitelist and support for default Discord permissions (Manage Messages).",

View file

@ -39,7 +39,7 @@ class Aurora(commands.Cog):
It is heavily inspired by GalacticBot, and is designed to be a more user-friendly alternative to Red's core Mod cogs.
This cog stores all of its data in an SQLite database."""
__author__ = ["cswimr"]
__author__ = ["SeaswimmerTheFsh"]
__version__ = "2.1.3"
__documentation__ = "https://seacogs.coastalcommits.com/aurora/"

View file

@ -1,6 +1,6 @@
{
"author" : ["cswimr"],
"install_msg" : "Thank you for installing Aurora!\nMost of this cog's functionality requires enabling slash commands.\nYou can find the source code of this cog [here](https://coastalcommits.com/cswimr/SeaCogs).",
"author" : ["SeaswimmerTheFsh (seasw.)"],
"install_msg" : "Thank you for installing Aurora!\nMost of this cog's functionality requires enabling slash commands.\nYou can find the source code of this cog [here](https://coastalcommits.com/SeaswimmerTheFsh/SeaCogs).",
"name" : "Aurora",
"short" : "A full replacement for Red's core Mod cogs.",
"description" : "Aurora is a fully-featured moderation system. It is heavily inspired by GalacticBot, and is designed to be a more user-friendly alternative to Red's core Mod cogs. This cog stores all of its data in an SQLite database.",

View file

@ -14,19 +14,15 @@ from redbot.cogs.downloader import errors
from redbot.cogs.downloader.converters import InstalledCog
from redbot.core import commands
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import bold, error, humanize_list, text_to_file
from redbot.core.utils.chat_formatting import error, humanize_list, text_to_file
# Disable Ruff & Pylint complaining about accessing private members
# That's kind of necessary for this cog to function because the Downloader cog has a limited public API
# ruff: noqa: SLF001 # Private member access
# pylint: disable=protected-access
class Backup(commands.Cog):
"""A utility to make reinstalling repositories and cogs after migrating the bot far easier."""
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
__version__ = "1.1.3"
__author__ = ["SeaswimmerTheFsh"]
__version__ = "1.1.0"
__documentation__ = "https://seacogs.coastalcommits.com/backup/"
def __init__(self, bot: Red):
@ -39,25 +35,28 @@ class Backup(commands.Cog):
n = "\n" if "\n\n" not in pre_processed else ""
text = [
f"{pre_processed}{n}",
f"{bold('Cog Version:')} [{self.__version__}]({self.__git__})",
f"{bold('Author:')} {humanize_list(self.__author__)}",
f"{bold('Documentation:')} {self.__documentation__}",
f"Cog Version: **{self.__version__}**",
f"Author: {humanize_list(self.__author__)}",
f"Documentation: {self.__documentation__}",
]
return "\n".join(text)
@commands.group(autohelp=True) # type: ignore
@commands.group(autohelp=True)
@commands.is_owner()
async def backup(self, ctx: commands.Context) -> None:
async def backup(self, ctx: commands.Context):
"""Backup your installed cogs."""
pass
@backup.command(name="export")
@commands.is_owner()
async def backup_export(self, ctx: commands.Context) -> None:
async def backup_export(self, ctx: commands.Context):
"""Export your installed repositories and cogs to a file."""
downloader = ctx.bot.get_cog("Downloader")
if downloader is None:
await ctx.send(error(f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again."))
await ctx.send(
error(
f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again."
)
)
return
all_repos = list(downloader._repo_manager.repos)
@ -78,7 +77,7 @@ class Backup(commands.Cog):
if cog.repo_name == repo.name:
cog_dict = {
"name": cog.name,
# "loaded": cog.name in ctx.bot.extensions.keys(), # noqa: ERA001
# "loaded": cog.name in ctx.bot.extensions.keys(),
# this functionality was planned but never implemented due to Red limitations
# and the possibility of restoration functionality being added to Core
"pinned": cog.pinned,
@ -88,24 +87,30 @@ class Backup(commands.Cog):
export_data.append(repo_dict)
await ctx.send(file=text_to_file(json.dumps(export_data, indent=4), "backup.json"))
await ctx.send(
file=text_to_file(json.dumps(export_data, indent=4), "backup.json")
)
@backup.command(name="import")
@commands.is_owner()
async def backup_import(self, ctx: commands.Context) -> None:
async def backup_import(self, ctx: commands.Context):
"""Import your installed repositories and cogs from an export file."""
try:
export = json.loads(await ctx.message.attachments[0].read())
except (json.JSONDecodeError, IndexError):
try:
export = json.loads(await ctx.message.reference.resolved.attachments[0].read()) # type: ignore - this is fine to let error because it gets handled
export = json.loads(await ctx.message.reference.resolved.attachments[0].read())
except (json.JSONDecodeError, IndexError, AttributeError):
await ctx.send(error("Please provide a valid JSON export file."))
return
downloader = ctx.bot.get_cog("Downloader")
if downloader is None:
await ctx.send(error(f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again."))
await ctx.send(
error(
f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again."
)
)
return
repo_s = []
@ -127,20 +132,32 @@ class Backup(commands.Cog):
repo_e.append("PyLav cogs are not supported.")
continue
if name.startswith(".") or name.endswith("."):
repo_e.append(f"Invalid repository name: {name}\nRepository names cannot start or end with a dot.")
repo_e.append(
f"Invalid repository name: {name}\nRepository names cannot start or end with a dot."
)
continue
if re.match(r"^[a-zA-Z0-9_\-\.]+$", name) is None:
repo_e.append(f"Invalid repository name: {name}\nRepository names may only contain letters, numbers, underscores, hyphens, and dots.")
repo_e.append(
f"Invalid repository name: {name}\nRepository names may only contain letters, numbers, underscores, hyphens, and dots."
)
continue
try:
repository = await downloader._repo_manager.add_repo(url, name, branch)
repo_s.append(f"Added repository {name} from {url} on branch {branch}.")
self.logger.debug("Added repository %s from %s on branch %s", name, url, branch)
repository = await downloader._repo_manager.add_repo(
url, name, branch
)
repo_s.append(
f"Added repository {name} from {url} on branch {branch}."
)
self.logger.debug(
"Added repository %s from %s on branch %s", name, url, branch
)
except errors.ExistingGitRepo:
repo_e.append(f"Repository {name} already exists.")
repository = downloader._repo_manager.get_repo(name)
repository = downloader._repo_manager.get_repo(
name
)
self.logger.debug("Repository %s already exists", name)
except errors.AuthenticationError as err:
@ -154,7 +171,9 @@ class Backup(commands.Cog):
continue
except errors.CloningError as err:
repo_e.append(f"Cloning error while adding repository {name}. See logs for more information.")
repo_e.append(
f"Cloning error while adding repository {name}. See logs for more information."
)
self.logger.exception(
"Something went wrong whilst cloning %s (to revision %s)",
url,
@ -164,7 +183,9 @@ class Backup(commands.Cog):
continue
except OSError:
repo_e.append(f"OS error while adding repository {name}. See logs for more information.")
repo_e.append(
f"OS error while adding repository {name}. See logs for more information."
)
self.logger.exception(
"Something went wrong trying to add repo %s under name %s",
url,
@ -175,7 +196,7 @@ class Backup(commands.Cog):
cog_modules = []
for cog in cogs:
# If you're forking this cog, make sure to change these strings!
if cog["name"] == "backup" and "cswimr/SeaCogs" in url:
if cog["name"] == "backup" and "SeaswimmerTheFsh/SeaCogs" in url:
continue
try:
cog_module = await InstalledCog.convert(ctx, cog["name"])
@ -184,19 +205,23 @@ class Backup(commands.Cog):
continue
cog_modules.append(cog_module)
for cog in {cog.name for cog in cog_modules}:
for cog in set(cog.name for cog in cog_modules):
poss_installed_path = (await downloader.cog_install_path()) / cog
if poss_installed_path.exists():
with contextlib.suppress(commands.ExtensionNotLoaded):
await ctx.bot.unload_extension(cog)
await ctx.bot.remove_loaded_package(cog)
await downloader._delete_cog(poss_installed_path)
await downloader._delete_cog(
poss_installed_path
)
uninstall_s.append(f"Uninstalled {cog}")
self.logger.debug("Uninstalled %s", cog)
else:
uninstall_e.append(f"Failed to uninstall {cog}")
self.logger.warning("Failed to uninstall %s", cog)
await downloader._remove_from_installed(cog_modules)
await downloader._remove_from_installed(
cog_modules
)
for cog in cogs:
cog_name = cog["name"]
@ -207,18 +232,28 @@ class Backup(commands.Cog):
commit = None
# If you're forking this cog, make sure to change these strings!
if cog_name == "backup" and "cswimr/SeaCogs" in url:
if cog_name == "backup" and "SeaswimmerTheFsh/SeaCogs" in url:
continue
async with repository.checkout(commit, exit_to_rev=repository.branch):
cogs_c, message = await downloader._filter_incorrect_cogs_by_names(repository, [cog_name])
async with repository.checkout(
commit, exit_to_rev=repository.branch
):
cogs_c, message = (
await downloader._filter_incorrect_cogs_by_names(
repository, [cog_name]
)
)
if not cogs_c:
install_e.append(message)
self.logger.error(message)
continue
failed_reqs = await downloader._install_requirements(cogs_c)
failed_reqs = await downloader._install_requirements(
cogs_c
)
if failed_reqs:
install_e.append(f"Failed to install {cog_name} due to missing requirements: {failed_reqs}")
install_e.append(
f"Failed to install {cog_name} due to missing requirements: {failed_reqs}"
)
self.logger.error(
"Failed to install %s due to missing requirements: %s",
cog_name,
@ -226,37 +261,51 @@ class Backup(commands.Cog):
)
continue
installed_cogs, failed_cogs = await downloader._install_cogs(cogs_c)
installed_cogs, failed_cogs = await downloader._install_cogs(
cogs_c
)
if repository.available_libraries:
installed_libs, failed_libs = await repository.install_libraries(
target_dir=downloader.SHAREDLIB_PATH,
req_target_dir=downloader.LIB_PATH,
installed_libs, failed_libs = (
await repository.install_libraries(
target_dir=downloader.SHAREDLIB_PATH,
req_target_dir=downloader.LIB_PATH,
)
)
else:
installed_libs = None
failed_libs = None
if cog_pinned:
for cog in installed_cogs: # noqa: PLW2901
for cog in installed_cogs:
cog.pinned = True
await downloader._save_to_installed(installed_cogs + installed_libs if installed_libs else installed_cogs)
await downloader._save_to_installed(
installed_cogs + installed_libs
if installed_libs
else installed_cogs
)
if installed_cogs:
installed_cog_name = installed_cogs[0].name
install_s.append(f"Installed {installed_cog_name}")
self.logger.debug("Installed %s", installed_cog_name)
if installed_libs:
for lib in installed_libs:
install_s.append(f"Installed {lib.name} required for {cog_name}")
self.logger.debug("Installed %s required for %s", lib.name, cog_name)
install_s.append(
f"Installed {lib.name} required for {cog_name}"
)
self.logger.debug(
"Installed %s required for %s", lib.name, cog_name
)
if failed_cogs:
failed_cog_name = failed_cogs[0].name
install_e.append(f"Failed to install {failed_cog_name}")
self.logger.error("Failed to install %s", failed_cog_name)
if failed_libs:
for lib in failed_libs:
install_e.append(f"Failed to install {lib.name} required for {cog_name}")
install_e.append(
f"Failed to install {lib.name} required for {cog_name}"
)
self.logger.error(
"Failed to install %s required for %s",
lib.name,

View file

@ -1,21 +1,15 @@
{
"author": [
"cswimr"
],
"install_msg": "Thank you for installing Backup!\nYou can find the source code of this cog [here](https://coastalcommits.com/cswimr/SeaCogs).",
"name": "Backup",
"short": "A utility to make reinstalling repositories and cogs after migrating the bot far easier.",
"description": "A utility to make reinstalling repositories and cogs after migrating the bot far easier.",
"end_user_data_statement": "This cog does not store end user data.",
"author" : ["SeaswimmerTheFsh (seasw.)"],
"install_msg" : "Thank you for installing Backup!\nYou can find the source code of this cog [here](https://coastalcommits.com/SeaswimmerTheFsh/SeaCogs).",
"name" : "Backup",
"short" : "A utility to make reinstalling repositories and cogs after migrating the bot far easier.",
"description" : "A utility to make reinstalling repositories and cogs after migrating the bot far easier.",
"end_user_data_statement" : "This cog does not store end user data.",
"hidden": false,
"disabled": false,
"min_bot_version": "3.5.6",
"max_bot_version": "3.5.14",
"min_python_version": [
3,
9,
0
],
"max_bot_version": "3.5.10",
"min_python_version": [3, 9, 0],
"tags": [
"utility",
"backup",

View file

@ -6,7 +6,6 @@
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
import random
from asyncio import create_task
from io import BytesIO
import aiohttp
@ -16,7 +15,7 @@ from PIL import Image
from red_commons.logging import getLogger
from redbot.core import Config, commands, data_manager
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import bold, error, humanize_list
from redbot.core.utils.chat_formatting import error, humanize_list
import bible.errors
from bible.models import Version
@ -25,31 +24,29 @@ from bible.models import Version
class Bible(commands.Cog):
"""Retrieve Bible verses from the API.bible API."""
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
__version__ = "1.1.4"
__documentation__ = "https://seacogs.coastalcommits.com/pterodactyl/"
__author__ = ["SeaswimmerTheFsh"]
__version__ = "1.1.0"
__documentation__ = "https://seacogs.coastalcommits.com/bible/"
def __init__(self, bot: Red):
super().__init__()
self.bot = bot
self.session = aiohttp.ClientSession()
self.config = Config.get_conf(self, identifier=481923957134912, force_registration=True)
self.config = Config.get_conf(
self, identifier=481923957134912, force_registration=True
)
self.logger = getLogger("red.SeaCogs.Bible")
self.config.register_global(bible="de4e12af7f28f599-02")
self.config.register_user(bible=None)
async def cog_unload(self):
create_task(self.session.close())
def format_help_for_context(self, ctx: commands.Context) -> str:
pre_processed = super().format_help_for_context(ctx) or ""
n = "\n" if "\n\n" not in pre_processed else ""
text = [
f"{pre_processed}{n}",
f"{bold('Cog Version:')} [{self.__version__}]({self.__git__})",
f"{bold('Author:')} {humanize_list(self.__author__)}",
f"{bold('Documentation:')} {self.__documentation__}",
f"Cog Version: **{self.__version__}**",
f"Author: {humanize_list(self.__author__)}",
f"Documentation: {self.__documentation__}",
]
return "\n".join(text)
@ -59,7 +56,7 @@ class Bible(commands.Cog):
image = Image.open(image_path)
image = image.convert("RGBA")
data = np.array(image)
red, green, blue, alpha = data.T # pylint: disable=unused-variable
red, green, blue, alpha = data.T # pylint: disable=unused-variable
white_areas = (red == 255) & (blue == 255) & (green == 255)
data[..., :-1][white_areas.T] = color.to_rgb()
image = Image.fromarray(data)
@ -71,7 +68,9 @@ class Bible(commands.Cog):
async def translate_book_name(self, bible_id: str, book_name: str) -> str:
"""Translate a book name to a book ID."""
book_name_list = [w.lower() if w.lower() == "of" else w.title() for w in book_name.split()]
book_name_list = [
w.lower() if w.lower() == "of" else w.title() for w in book_name.split()
]
book_name = " ".join(book_name_list)
books = await self._get_books(bible_id)
for book in books:
@ -91,20 +90,20 @@ class Bible(commands.Cog):
response.status,
)
if response.status == 401:
raise bible.errors.UnauthorizedError
raise bible.errors.Unauthorized()
if response.status == 403:
raise bible.errors.BibleAccessError
raise bible.errors.BibleAccessError()
if response.status == 503:
raise bible.errors.ServiceUnavailableError
raise bible.errors.ServiceUnavailable()
return Version(
bible_id=bible_id,
abbreviation=data["data"]["abbreviation"],
language=data["data"]["language"]["name"],
abbreviation_local=data["data"]["abbreviationLocal"],
language_local=data["data"]["language"]["nameLocal"],
description=data["data"]["description"],
description_local=data["data"]["descriptionLocal"],
version_copyright=data["data"]["copyright"],
bible_id,
data["data"]["abbreviation"],
data["data"]["language"]["name"],
data["data"]["abbreviationLocal"],
data["data"]["language"]["nameLocal"],
data["data"]["description"],
data["data"]["descriptionLocal"],
data["data"]["copyright"],
)
async def _get_passage(
@ -135,17 +134,16 @@ class Bible(commands.Cog):
response.status,
)
if response.status == 400:
raise bible.errors.InexplicableError
raise bible.errors.InexplicableError()
if response.status == 401:
raise bible.errors.UnauthorizedError
raise bible.errors.Unauthorized()
if response.status == 403:
raise bible.errors.BibleAccessError
raise bible.errors.BibleAccessError()
if response.status == 404:
raise bible.errors.NotFoundError
raise bible.errors.NotFound()
if response.status == 503:
raise bible.errors.ServiceUnavailableError
raise bible.errors.ServiceUnavailable()
assert self.bot.user is not None # bot will always be logged in
fums_url = "https://fums.api.bible/f3"
fums_params = {
"t": data["meta"]["fumsToken"],
@ -177,11 +175,11 @@ class Bible(commands.Cog):
response.status,
)
if response.status == 401:
raise bible.errors.UnauthorizedError
raise bible.errors.Unauthorized()
if response.status == 403:
raise bible.errors.BibleAccessError
raise bible.errors.BibleAccessError()
if response.status == 503:
raise bible.errors.ServiceUnavailableError
raise bible.errors.ServiceUnavailable()
return data["data"]
async def _get_chapters(self, bible_id: str, book_id: str) -> dict:
@ -196,11 +194,11 @@ class Bible(commands.Cog):
response.status,
)
if response.status == 401:
raise bible.errors.UnauthorizedError
raise bible.errors.Unauthorized()
if response.status == 403:
raise bible.errors.BibleAccessError
raise bible.errors.BibleAccessError()
if response.status == 503:
raise bible.errors.ServiceUnavailableError
raise bible.errors.ServiceUnavailable()
return data["data"]
async def _get_verses(self, bible_id: str, book_id: str, chapter: int) -> dict:
@ -215,11 +213,11 @@ class Bible(commands.Cog):
response.status,
)
if response.status == 401:
raise bible.errors.UnauthorizedError
raise bible.errors.Unauthorized()
if response.status == 403:
raise bible.errors.BibleAccessError
raise bible.errors.BibleAccessError()
if response.status == 503:
raise bible.errors.ServiceUnavailableError
raise bible.errors.ServiceUnavailable()
return data["data"]
@commands.group(autohelp=True)
@ -247,34 +245,41 @@ class Bible(commands.Cog):
from_verse, to_verse = passage.replace(":", ".").split("-")
if "." not in to_verse:
to_verse = f"{from_verse.split('.')[0]}.{to_verse}"
retrieved_passage = await self._get_passage(ctx, bible_id, f"{book_id}.{from_verse}-{book_id}.{to_verse}", True)
passage = await self._get_passage(
ctx, bible_id, f"{book_id}.{from_verse}-{book_id}.{to_verse}", True
)
else:
retrieved_passage = await self._get_passage(ctx, bible_id, f"{book_id}.{passage.replace(':', '.')}", False)
passage = await self._get_passage(
ctx, bible_id, f"{book_id}.{passage.replace(':', '.')}", False
)
except (
bible.errors.BibleAccessError,
bible.errors.NotFoundError,
bible.errors.NotFound,
bible.errors.InexplicableError,
bible.errors.ServiceUnavailableError,
bible.errors.UnauthorizedError,
bible.errors.ServiceUnavailable,
bible.errors.Unauthorized,
) as e:
await ctx.send(e.message)
return
if len(retrieved_passage["content"]) > 4096:
if len(passage["content"]) > 4096:
await ctx.send("The passage is too long to send.")
return
if await ctx.embed_requested():
icon = self.get_icon(await ctx.embed_color())
embed = Embed(
title=f"{retrieved_passage['reference']}",
description=retrieved_passage["content"].replace("", ""),
title=f"{passage['reference']}",
description=passage["content"].replace("", ""),
color=await ctx.embed_color(),
)
embed.set_footer(text=f"{ctx.prefix}bible passage - Powered by API.Bible - {version.abbreviation_local} ({version.language_local}, {version.description_local})", icon_url="attachment://icon.png")
embed.set_footer(
text=f"{ctx.prefix}bible passage - Powered by API.Bible - {version.abbreviationLocal} ({version.languageLocal}, {version.descriptionLocal})",
icon_url="attachment://icon.png"
)
await ctx.send(embed=embed, file=icon)
else:
await ctx.send(f"## {retrieved_passage['reference']}\n{retrieved_passage['content']}")
await ctx.send(f"## {passage['reference']}\n{passage['content']}")
@bible.command(name="random")
async def bible_random(self, ctx: commands.Context):
@ -295,10 +300,10 @@ class Bible(commands.Cog):
passage = await self._get_passage(ctx, bible_id, verse, False)
except (
bible.errors.BibleAccessError,
bible.errors.NotFoundError,
bible.errors.NotFound,
bible.errors.InexplicableError,
bible.errors.ServiceUnavailableError,
bible.errors.UnauthorizedError,
bible.errors.ServiceUnavailable,
bible.errors.Unauthorized,
) as e:
await ctx.send(e.message)
return
@ -310,7 +315,10 @@ class Bible(commands.Cog):
description=passage["content"].replace("", ""),
color=await ctx.embed_color(),
)
embed.set_footer(text=f"{ctx.prefix}bible random - Powered by API.Bible - {version.abbreviation_local} ({version.language_local}, {version.description_local})", icon_url="attachment://icon.png")
embed.set_footer(
text=f"{ctx.prefix}bible random - Powered by API.Bible - {version.abbreviationLocal} ({version.languageLocal}, {version.descriptionLocal})",
icon_url="attachment://icon.png"
)
await ctx.send(embed=embed, file=icon)
else:
await ctx.send(f"## {passage['reference']}\n{passage['content']}")

View file

@ -4,22 +4,26 @@ from redbot.core.utils.chat_formatting import error
class BibleAccessError(Exception):
def __init__(
self,
message: str = error("The provided API key cannot retrieve sections from the configured Bible. Please report this to the bot owner."),
message: str = error(
"The provided API key cannot retrieve sections from the configured Bible. Please report this to the bot owner."
),
):
super().__init__(message)
self.message = message
class UnauthorizedError(Exception):
class Unauthorized(Exception):
def __init__(
self,
message: str = error("The API key for API.Bible is missing or invalid. Please report this to the bot owner.\nIf you are the bot owner, please check the documentation [here](<https://seacogs.coastalcommits.com/bible/#setup>)."),
message: str = error(
"The API key for API.Bible is missing or invalid. Please report this to the bot owner.\nIf you are the bot owner, please check the documentation [here](<https://seacogs.coastalcommits.com/bible/#setup>)."
),
):
super().__init__(message)
self.message = message
class NotFoundError(Exception):
class NotFound(Exception):
def __init__(
self,
message: str = error("The requested passage was not found."),
@ -28,7 +32,7 @@ class NotFoundError(Exception):
self.message = message
class ServiceUnavailableError(Exception):
class ServiceUnavailable(Exception):
def __init__(
self,
message: str = error("The API.Bible service is currently unavailable."),
@ -40,7 +44,9 @@ class ServiceUnavailableError(Exception):
class InexplicableError(Exception):
def __init__(
self,
message: str = error("An inexplicable 'Bad Request' error occurred. This error happens occasionally with the API.Bible service. Please try again. If the error persists, please report this to the bot owner."),
message: str = error(
"An inexplicable 'Bad Request' error occurred. This error happens occassionally with the API.Bible service. Please try again. If the error persists, please report this to the bot owner."
),
):
super().__init__(message)
self.message = message

View file

@ -1,6 +1,6 @@
{
"author" : ["cswimr"],
"install_msg" : "Thank you for installing Bible!\nThis cog requires setting an API key for API.Bible. Please read the [documentation](https://seacogs.coastalcommits.com/bible/#setup) for more information.\nYou can find the source code of this cog [here](https://coastalcommits.com/cswimr/SeaCogs).",
"author" : ["SeaswimmerTheFsh (seasw.)"],
"install_msg" : "Thank you for installing Bible!\nThis cog requires setting an API key for API.Bible. Please read the [documentation](https://seacogs.coastalcommits.com/bible/#setup) for more information.\nYou can find the source code of this cog [here](https://coastalcommits.com/SeaswimmerTheFsh/SeaCogs).",
"name" : "Bible",
"short" : "Retrieve Bible verses from API.Bible.",
"description" : "Retrieve Bible verses from the API.Bible API. This cog requires an API.Bible api key.",

View file

@ -4,23 +4,23 @@ class Version:
bible_id,
abbreviation,
language,
abbreviation_local,
language_local,
abbreviationLocal,
languageLocal,
description,
description_local,
descriptionLocal,
version_copyright,
):
self.bible_id = bible_id
self.abbreviation = abbreviation
self.language = language
self.abbreviation_local = abbreviation_local
self.language_local = language_local
self.abbreviationLocal = abbreviationLocal
self.languageLocal = languageLocal
self.description = description
self.description_local = description_local
self.descriptionLocal = descriptionLocal
self.copyright = version_copyright
def __str__(self):
return self.abbreviation_local
return self.abbreviationLocal
def __repr__(self):
return f'bible.models.Version("{self.bible_id}", "{self.abbreviation}", "{self.language}", "{self.abbreviation_local}", "{self.language_local}", "{self.description}", "{self.description_local}", "{self.copyright}")'
return f'bible.models.Version("{self.bible_id}", "{self.abbreviation}", "{self.language}", "{self.abbreviationLocal}", "{self.languageLocal}", "{self.description}", "{self.descriptionLocal}", "{self.copyright}")'

View file

@ -1,4 +1,5 @@
import io
from typing import Any, Literal
import aiohttp
import discord
@ -14,24 +15,23 @@ from .model import PartialEmoji
class EmojiInfo(commands.Cog):
"""Retrieve information about emojis."""
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
__version__ = "1.0.3"
__documentation__ = "https://seacogs.coastalcommits.com/emojiinfo/"
__author__: list[str] = ["SeaswimmerTheFsh"]
__version__: str = "1.0.0"
__documentation__: str = "https://seacogs.coastalcommits.com/emojiinfo/"
def __init__(self, bot: Red) -> None:
super().__init__()
self.bot: Red = bot
self.logger: RedTraceLogger = getLogger(name="red.SeaCogs.EmojiInfo")
self.logger: RedTraceLogger = getLogger(name="red.SeaCogs.Emoji")
def format_help_for_context(self, ctx: commands.Context) -> str:
pre_processed = super().format_help_for_context(ctx) or ""
n = "\n" if "\n\n" not in pre_processed else ""
text = [
pre_processed: Any | Literal[''] = super().format_help_for_context(ctx) or ""
n: Literal['\n'] | Literal[''] = "\n" if "\n\n" not in pre_processed else ""
text: list[str] = [
f"{pre_processed}{n}",
f"{bold('Cog Version:')} [{self.__version__}]({self.__git__})",
f"{bold('Author:')} {humanize_list(self.__author__)}",
f"{bold('Documentation:')} {self.__documentation__}",
f"Cog Version: **{self.__version__}**",
f"Author: {humanize_list(items=self.__author__)}",
f"Documentation: {self.__documentation__}",
]
return "\n".join(text)
@ -40,7 +40,8 @@ class EmojiInfo(commands.Cog):
emoji_codepoint = "-".join([hex(ord(char))[2:] for char in unicode_emoji])
segments = emoji_codepoint.split("-")
valid_segments = [seg for seg in segments if len(seg) >= 4]
return f"{base_url}{valid_segments[0]}.png"
emoji_url = f"{base_url}{valid_segments[0]}.png"
return emoji_url
async def fetch_primary_color(self, emoji_url: str) -> discord.Color | None:
async with aiohttp.ClientSession() as session:
@ -49,7 +50,8 @@ class EmojiInfo(commands.Cog):
return None
image = await response.read()
dominant_color = ColorThief(io.BytesIO(image)).get_color(quality=1)
return discord.Color.from_rgb(*dominant_color)
color = discord.Color.from_rgb(*dominant_color)
return color
async def get_emoji_info(self, emoji: PartialEmoji) -> tuple[str, str]:
if emoji.is_unicode_emoji():
@ -69,51 +71,59 @@ class EmojiInfo(commands.Cog):
else:
emoji_id = ""
markdown = f"`{emoji}`"
name = f"{bold('Name:')} {emoji.aliases.pop(0) if emoji.aliases else emoji.name}\n"
name = f"{bold('Name:')} {emoji.aliases.pop(0)}\n"
aliases = f"{bold('Aliases:')} {', '.join(emoji.aliases)}\n" if emoji.aliases else ""
group = f"{bold('Group:')} {emoji.group}\n"
return (f"{name}{emoji_id}{bold('Native:')} {emoji.is_unicode_emoji()}\n{group}{aliases}{bold('Animated:')} {emoji.animated}\n{bold('Markdown:')} {markdown}\n{bold('URL:')} [Click Here]({emoji_url})"), emoji_url
return (
f"{name}"
f"{emoji_id}"
f"{bold('Native:')} {emoji.is_unicode_emoji()}\n"
f"{group}"
f"{aliases}"
f"{bold('Animated:')} {emoji.animated}\n"
f"{bold('Markdown:')} {markdown}\n"
f"{bold('URL:')} [Click Here]({emoji_url})"
), emoji_url
@app_commands.command(name="emoji")
@app_commands.describe(emoji="What emoji would you like to get information on?", ephemeral="Would you like the response to be hidden?")
@app_commands.describe(
emoji="What emoji would you like to get information on?",
ephemeral="Would you like the response to be hidden?"
)
async def emoji_slash(self, interaction: discord.Interaction, emoji: str, ephemeral: bool = True) -> None:
"""Retrieve information about an emoji."""
await interaction.response.defer(ephemeral=ephemeral)
try:
retrieved_emoji: PartialEmoji = PartialEmoji.from_str(self, value=emoji)
string, emoji_url = await self.get_emoji_info(retrieved_emoji)
emoji: PartialEmoji = PartialEmoji.from_str(self, value=emoji)
string, emoji_url, = await self.get_emoji_info(emoji)
self.logger.verbose(f"Emoji:\n{string}")
except (IndexError, UnboundLocalError):
return await interaction.followup.send("Please provide a valid emoji!")
assert isinstance(interaction.channel, discord.TextChannel)
if await self.bot.embed_requested(channel=interaction.channel):
embed = discord.Embed(title="Emoji Information", description=string, color=await self.fetch_primary_color(emoji_url) or await self.bot.get_embed_color(interaction.channel))
embed = embed = discord.Embed(title="Emoji Information", description=string, color = await self.fetch_primary_color(emoji_url) or await self.bot.get_embed_color(interaction.channel))
embed.set_thumbnail(url=emoji_url)
await interaction.followup.send(embed=embed)
return None
await interaction.followup.send(content=string)
return None
else:
await interaction.followup.send(content=string)
@commands.command(name="emoji")
async def emoji(self, ctx: commands.Context, *, emoji: str) -> None:
"""Retrieve information about an emoji."""
try:
retrieved_emoji: PartialEmoji = PartialEmoji.from_str(self, value=emoji)
string, emoji_url = await self.get_emoji_info(retrieved_emoji)
emoji: PartialEmoji = PartialEmoji.from_str(self, value=emoji)
string, emoji_url, = await self.get_emoji_info(emoji)
self.logger.verbose(f"Emoji:\n{string}")
except (IndexError, UnboundLocalError):
await ctx.send("Please provide a valid emoji!")
return
return await ctx.send("Please provide a valid emoji!")
if await ctx.embed_requested():
embed = discord.Embed(title="Emoji Information", description=string, color=await self.fetch_primary_color(emoji_url) or await ctx.embed_color())
embed = embed = discord.Embed(title="Emoji Information", description=string, color = await self.fetch_primary_color(emoji_url) or await ctx.embed_color)
embed.set_thumbnail(url=emoji_url)
await ctx.send(embed=embed)
return
await ctx.send(content=string)
return
else:
await ctx.send(content=string)

View file

@ -1,5 +1,5 @@
{
"author" : ["cswimr"],
"author" : ["SeaswimmerTheFsh (seasw.)"],
"install_msg" : "Thank you for installing Emoji!",
"name" : "Emoji",
"short" : "Retrieve information about emojis.",

View file

@ -39,7 +39,7 @@ class PartialEmoji(discord.PartialEmoji):
The group name of the emoji if it is a native emoji.
"""
def __init__(self, *, name: str, animated: bool = False, id: int | None = None, group: str | None = None, aliases: list | None = None) -> None: # pylint: disable=redefined-builtin # noqa: A002
def __init__(self, *, name: str, animated: bool = False, id: int | None = None, group: str | None = None, aliases: list | None = None) -> None: # pylint: disable=redefined-builtin
super().__init__(name=name, animated=animated, id=id)
self.group = group
self.aliases = aliases
@ -72,12 +72,12 @@ class PartialEmoji(discord.PartialEmoji):
match = cls._CUSTOM_EMOJI_RE.match(value)
if match is not None:
groups = match.groupdict()
animated = bool(groups["animated"])
emoji_id = int(groups["id"])
name = groups["name"]
animated = bool(groups['animated'])
emoji_id = int(groups['id'])
name = groups['name']
return cls(name=name, animated=animated, id=emoji_id)
path = data_manager.bundled_data_path(coginstance) / "emojis.json"
path: data_manager.Path = data_manager.bundled_data_path(coginstance) / "emojis.json"
with open(path, "r", encoding="UTF-8") as file:
emojis: dict = json.load(file)
emoji_aliases = []

View file

@ -1,5 +0,0 @@
from .hotreload import HotReload
async def setup(bot):
await bot.add_cog(HotReload(bot))

View file

@ -1,191 +0,0 @@
import py_compile
from asyncio import run_coroutine_threadsafe
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Generator, List, Sequence
import discord
from red_commons.logging import RedTraceLogger, getLogger
from redbot.core import Config, checks, commands
from redbot.core.bot import Red
from redbot.core.core_commands import CoreLogic
from redbot.core.utils.chat_formatting import bold, box, humanize_list
from typing_extensions import override
from watchdog.events import FileSystemEvent, FileSystemMovedEvent, RegexMatchingEventHandler
from watchdog.observers import Observer
from watchdog.observers.api import BaseObserver
class HotReload(commands.Cog):
"""Automatically reload cogs in local cog paths on file change."""
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
__version__ = "1.4.1"
__documentation__ = "https://seacogs.coastalcommits.com/hotreload/"
def __init__(self, bot: Red) -> None:
super().__init__()
self.bot: Red = bot
self.config: Config = Config.get_conf(cog_instance=self, identifier=294518358420750336, force_registration=True)
self.logger: RedTraceLogger = getLogger(name="red.SeaCogs.HotReload")
self.observers: List[BaseObserver] = []
self.config.register_global(notify_channel=None, compile_before_reload=False)
watchdog_loggers = [getLogger(name="watchdog.observers.inotify_buffer")]
for watchdog_logger in watchdog_loggers:
watchdog_logger.setLevel("INFO") # SHUT UP!!!!
@override
async def cog_load(self) -> None:
"""Start the observer when the cog is loaded."""
_ = self.bot.loop.create_task(self.start_observer())
@override
async def cog_unload(self) -> None:
"""Stop the observer when the cog is unloaded."""
for observer in self.observers:
observer.stop()
observer.join()
self.logger.info("Stopped observer. No longer watching for file changes.")
@override
def format_help_for_context(self, ctx: commands.Context) -> str:
pre_processed = super().format_help_for_context(ctx) or ""
n = "\n" if "\n\n" not in pre_processed else ""
text = [
f"{pre_processed}{n}",
f"{bold('Cog Version:')} [{self.__version__}]({self.__git__})",
f"{bold('Author:')} {humanize_list(self.__author__)}",
f"{bold('Documentation:')} {self.__documentation__}",
]
return "\n".join(text)
async def get_paths(self) -> Generator[Path]:
"""Retrieve user defined paths."""
cog_manager = self.bot._cog_mgr # noqa: SLF001 # We have to use this private method because there is no public API to get user defined paths
cog_paths = await cog_manager.user_defined_paths()
return (Path(path) for path in cog_paths)
async def start_observer(self) -> None:
"""Start the observer to watch for file changes."""
self.observers.append(Observer())
paths = await self.get_paths()
is_first = True
for observer in self.observers:
if not is_first:
observer.stop()
observer.join()
self.logger.debug("Stopped hanging observer.")
continue
for path in paths:
if not path.exists():
self.logger.warning("Path %s does not exist. Skipping.", path)
continue
self.logger.debug("Adding observer schedule for path %s.", path)
observer.schedule(event_handler=HotReloadHandler(cog=self, path=path), path=str(path), recursive=True)
observer.start()
self.logger.info("Started observer. Watching for file changes.")
is_first = False
@checks.is_owner()
@commands.group(name="hotreload")
async def hotreload_group(self, ctx: commands.Context) -> None:
"""HotReload configuration commands."""
pass
@hotreload_group.command(name="notifychannel")
async def hotreload_notifychannel(self, ctx: commands.Context, channel: discord.TextChannel) -> None:
"""Set the channel to send notifications to."""
await self.config.notify_channel.set(channel.id)
await ctx.send(f"Notifications will be sent to {channel.mention}.")
@hotreload_group.command(name="compile") # type: ignore
async def hotreload_compile(self, ctx: commands.Context, compile_before_reload: bool) -> None:
"""Set whether to compile modified files before reloading."""
await self.config.compile_before_reload.set(compile_before_reload)
await ctx.send(f"I {'will' if compile_before_reload else 'will not'} compile modified files before hotreloading cogs.")
@hotreload_group.command(name="list") # type: ignore
async def hotreload_list(self, ctx: commands.Context) -> None:
"""List the currently active observers."""
if not self.observers:
await ctx.send("No observers are currently active.")
return
await ctx.send(f"Currently active observers (If there are more than one of these, report an issue): {box(humanize_list([str(o) for o in self.observers], style='unit'))}")
class HotReloadHandler(RegexMatchingEventHandler):
"""Handler for file changes."""
def __init__(self, cog: HotReload, path: Path) -> None:
super().__init__(regexes=[r".*\.py$"])
self.cog: HotReload = cog
self.path: Path = path
self.logger: RedTraceLogger = getLogger(name="red.SeaCogs.HotReload.Observer")
def on_any_event(self, event: FileSystemEvent) -> None:
"""Handle filesystem events."""
if event.is_directory:
return
allowed_events = ("moved", "deleted", "created", "modified")
if event.event_type not in allowed_events:
return
relative_src_path = Path(str(event.src_path)).relative_to(self.path)
src_package_name = relative_src_path.parts[0]
cogs_to_reload = [src_package_name]
if isinstance(event, FileSystemMovedEvent):
dest = f" to {event.dest_path}"
relative_dest_path = Path(str(event.dest_path)).relative_to(self.path)
dest_package_name = relative_dest_path.parts[0]
if dest_package_name != src_package_name:
cogs_to_reload.append(dest_package_name)
else:
dest = ""
self.logger.info("File %s has been %s%s.", event.src_path, event.event_type, dest)
run_coroutine_threadsafe(
coro=self.reload_cogs(
cog_names=cogs_to_reload,
paths=[Path(str(p)) for p in (event.src_path, getattr(event, "dest_path", None)) if p],
),
loop=self.cog.bot.loop,
)
async def reload_cogs(self, cog_names: Sequence[str], paths: Sequence[Path]) -> None:
"""Reload modified cogs."""
if not self.compile_modified_files(cog_names, paths):
return
core_logic = CoreLogic(bot=self.cog.bot)
self.logger.info("Reloading cogs: %s", humanize_list(cog_names, style="unit"))
await core_logic._reload(pkg_names=cog_names) # noqa: SLF001 # We have to use this private method because there is no public API to reload other cogs
self.logger.info("Reloaded cogs: %s", humanize_list(cog_names, style="unit"))
channel = self.cog.bot.get_channel(await self.cog.config.notify_channel())
if channel and isinstance(channel, discord.TextChannel):
await channel.send(f"Reloaded cogs: {humanize_list(cog_names, style='unit')}")
def compile_modified_files(self, cog_names: Sequence[str], paths: Sequence[Path]) -> bool:
"""Compile modified files to ensure they are valid Python files."""
for path in paths:
if not path.exists() or path.suffix != ".py":
self.logger.debug("Path %s does not exist or does not point to a Python file. Skipping compilation step.", path)
continue
try:
with NamedTemporaryFile() as temp_file:
self.logger.debug("Attempting to compile %s", path)
py_compile.compile(file=str(path), cfile=temp_file.name, doraise=True)
self.logger.debug("Successfully compiled %s", path)
except py_compile.PyCompileError as e:
e.__suppress_context__ = True
self.logger.exception("%s failed to compile. Not reloading cogs %s.", path, humanize_list(cog_names, style="unit"))
return False
except OSError:
self.logger.exception("Failed to create tempfile for compilation step. Skipping.")
return True

View file

@ -1,25 +0,0 @@
{
"author": [
"cswimr"
],
"install_msg": "Thank you for installing HotReload! Please see the [documentation](https://seacogs.coastalcommits.com/hotreload) to get started.",
"name": "HotReload",
"short": "Automatically reload cogs in local cog paths on file change.",
"description": "Automatically reload cogs in local cog paths on file change.",
"end_user_data_statement": "This cog does not store end user data.",
"hidden": false,
"disabled": false,
"min_bot_version": "3.5.0",
"min_python_version": [
3,
8,
0
],
"requirements": [
"watchdog"
],
"tags": [
"utility",
"development"
]
}

View file

@ -1,9 +1,9 @@
{
"author": [
"cswimr"
"SeaswimmerTheFsh (seasw.)"
],
"install_msg": "Thanks for installing my repo!\n\nIf you have any issues with any of the cogs, please create an issue [here](https://coastalcommits.com/cswimr/SeaCogs/issues) or join my [Discord Server](https://discord.gg/eMUMe77Yb8 ).",
"install_msg": "Thanks for installing my repo!\n\nIf you have any issues with any of the cogs, please create an issue [here](https://coastalcommits.com/SeaswimmerTheFsh/SeaCogs/issues) or join my [Discord Server](https://discord.gg/eMUMe77Yb8 ).",
"index_name": "sea-cogs",
"short": "Various cogs for Red, by cswimr",
"description": "Various cogs for Red, by cswimr"
"short": "Various cogs for Red, by SeaswimmerTheFsh (seasw.)",
"description": "Various cogs for Red, by SeaswimmerTheFsh (seasw.)"
}

View file

@ -1,37 +1,36 @@
site_name: SeaCogs Documentation
site_url: !ENV [SITE_URL, "https://seacogs.coastalcommits.com"]
site_url: !ENV [SITE_URL, 'https://seacogs.coastalcommits.com']
repo_name: CoastalCommits
repo_url: https://coastalcommits.com/cswimr/SeaCogs
edit_uri: !ENV [EDIT_URI, "src/branch/main/.docs"]
copyright: Copyright &copy; 2023-2024, cswimr
repo_url: https://coastalcommits.com/SeaswimmerTheFsh/SeaCogs
edit_uri: !ENV [EDIT_URI, 'src/branch/main/.docs']
copyright: Copyright &copy; 2023-2024, SeaswimmerTheFsh
docs_dir: .docs
site_author: cswimr
site_author: SeaswimmerTheFsh
site_description: Documentation for my Red-DiscordBot Cogs.
nav:
- Home: index.md
- Aurora:
- aurora/index.md
- Moderation Commands: aurora/moderation-commands.md
- Case Commands: aurora/case-commands.md
- Configuration: aurora/configuration.md
- aurora/index.md
- Moderation Commands: aurora/moderation-commands.md
- Case Commands: aurora/case-commands.md
- Configuration: aurora/configuration.md
- Bible: bible.md
- Backup: backup.md
- EmojiInfo: emojiinfo.md
- HotReload: hotreload.md
- Nerdify: nerdify.md
- Pterodactyl:
- pterodactyl/index.md
- Installing Red: pterodactyl/installing-red.md
- Getting Started: pterodactyl/getting-started.md
- Configuration: pterodactyl/configuration.md
- Regex Examples: pterodactyl/regex.md
- pterodactyl/index.md
- Installing Red: pterodactyl/installing-red.md
- Getting Started: pterodactyl/getting-started.md
- Configuration: pterodactyl/configuration.md
- Regex Examples: pterodactyl/regex.md
plugins:
- git-authors
- search
- social
#- social
- git-revision-date-localized:
enable_creation_date: true
type: timeago
@ -73,7 +72,7 @@ markdown_extensions:
theme:
name: material
palette:
- media: "(prefers-color-scheme: light)"
- media: '(prefers-color-scheme: light)'
scheme: default
primary: white
accent: light blue
@ -81,7 +80,7 @@ theme:
icon: material/toggle-switch
name: Switch to dark mode
- media: "(prefers-color-scheme: dark)"
- media: '(prefers-color-scheme: dark)'
scheme: slate
primary: black
accent: light blue
@ -114,5 +113,3 @@ watch:
- ./bible
- ./nerdify
- ./pterodactyl
- ./emojiinfo
- ./antipolls

View file

@ -1,6 +1,6 @@
{
"author" : ["cswimr"],
"install_msg" : "Thank you for installing Nerdify!\nYou can find the source code of this cog [here](https://coastalcommits.com/cswimr/SeaCogs). Based off of PhasecoreX's [UwU](<https://github.com/PhasecoreX/PCXCogs/tree/master/uwu>) cog.",
"author" : ["SeaswimmerTheFsh (seasw.)"],
"install_msg" : "Thank you for installing Nerdify!\nYou can find the source code of this cog [here](https://coastalcommits.com/SeaswimmerTheFsh/SeaCogs). Based off of PhasecoreX's [UwU](<https://github.com/PhasecoreX/PCXCogs/tree/master/uwu>) cog.",
"name" : "Nerdify",
"short" : "Nerdify your text!",
"description" : "Nerdify your text!",

View file

@ -12,15 +12,13 @@ from typing import Any, Optional, Union
import discord
from redbot.core import commands
from redbot.core.utils import chat_formatting, common_filters
from redbot.core.utils.chat_formatting import bold, humanize_list
class Nerdify(commands.Cog):
"""Nerdify your text."""
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
__version__ = "1.3.5"
__author__ = ["SeaswimmerTheFsh"]
__version__ = "1.3.4"
__documentation__ = "https://seacogs.coastalcommits.com/nerdify/"
def __init__(self, bot):
@ -31,26 +29,21 @@ class Nerdify(commands.Cog):
n = "\n" if "\n\n" not in pre_processed else ""
text = [
f"{pre_processed}{n}",
f"{bold('Cog Version:')} [{self.__version__}]({self.__git__})",
f"{bold('Author:')} {humanize_list(self.__author__)}",
f"{bold('Documentation:')} {self.__documentation__}",
f"Cog Version: **{self.__version__}**",
f"Author: {chat_formatting.humanize_list(self.__author__)}",
f"Documentation: {self.__documentation__}"
]
return "\n".join(text)
@commands.command(aliases=["nerd"])
async def nerdify(
self,
ctx: commands.Context,
*,
text: Optional[str] = None,
self, ctx: commands.Context, *, text: Optional[str] = None
) -> None:
"""Nerdify the replied to message, previous message, or your own text."""
if not text:
if hasattr(ctx.message, "reference") and ctx.message.reference:
with suppress(
discord.Forbidden,
discord.NotFound,
discord.HTTPException,
discord.Forbidden, discord.NotFound, discord.HTTPException
):
message_id = ctx.message.reference.message_id
if message_id:
@ -66,9 +59,7 @@ class Nerdify(commands.Cog):
ctx.channel,
self.nerdify_text(text),
allowed_mentions=discord.AllowedMentions(
everyone=False,
users=False,
roles=False,
everyone=False, users=False, roles=False
),
)
@ -83,10 +74,7 @@ class Nerdify(commands.Cog):
return f'"{text}" 🤓'
async def type_message(
self,
destination: discord.abc.Messageable,
content: str,
**kwargs: Any,
self, destination: discord.abc.Messageable, content: str, **kwargs: Any
) -> Union[discord.Message, None]:
"""Simulate typing and sending a message to a destination.

2501
poetry.lock generated Normal file

File diff suppressed because it is too large Load diff

Binary file not shown.

Before

Width:  |  Height:  |  Size: 65 KiB

View file

@ -1,6 +1,6 @@
{
"author" : ["cswimr"],
"install_msg" : "Thank you for installing Pterodactyl!\nYou can find the source code of this cog [here](https://coastalcommits.com/cswimr/SeaCogs).\nDocumentation can be found [here](https://seacogs.coastalcommits.com/pterodactyl ).",
"author" : ["SeaswimmerTheFsh (seasw.)"],
"install_msg" : "Thank you for installing Pterodactyl!\nYou can find the source code of this cog [here](https://coastalcommits.com/SeaswimmerTheFsh/SeaCogs).\nDocumentation can be found [here](https://seacogs.coastalcommits.com/pterodactyl ).",
"name" : "Pterodactyl",
"short" : "Interface with Pterodactyl through websockets.",
"description" : "Interface with Pterodactyl through websockets.",
@ -9,7 +9,7 @@
"disabled": false,
"min_bot_version": "3.5.0",
"min_python_version": [3, 8, 0],
"requirements": ["git+https://github.com/cswimr/pydactyl", "websockets"],
"requirements": ["git+https://github.com/SeaswimmerTheFsh/pydactyl", "websockets"],
"tags": [
"pterodactyl",
"minecraft",

View file

@ -1,8 +1,8 @@
from red_commons import logging
from red_commons.logging import getLogger
logger = getLogger("red.SeaCogs.Pterodactyl")
websocket_logger = getLogger("red.SeaCogs.Pterodactyl.Websocket")
logger = getLogger('red.SeaCogs.Pterodactyl')
websocket_logger = getLogger('red.SeaCogs.Pterodactyl.websocket')
if logger.level >= logging.VERBOSE:
websocket_logger.setLevel(logging.logging.INFO)
elif logger.level < logging.VERBOSE:

View file

@ -3,8 +3,8 @@ import aiohttp
async def get_status(host: str, port: int = 25565) -> tuple[bool, dict]:
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.mcsrvstat.us/2/{host}:{port}") as response:
response = await response.json() # noqa: PLW2901
if response["online"]:
async with session.get(f'https://api.mcsrvstat.us/2/{host}:{port}') as response:
response = await response.json()
if response['online']:
return (True, response)
return (False, response)

View file

@ -1,6 +1,6 @@
import asyncio
import json
from typing import AsyncIterable, Iterable, Mapping, Optional, Tuple, Union
from typing import Mapping, Optional, Tuple, Union
import discord
import websockets
@ -9,9 +9,8 @@ from pydactyl import PterodactylClient
from redbot.core import app_commands, commands
from redbot.core.app_commands import Choice
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import bold, box, humanize_list
from redbot.core.utils.chat_formatting import box, error, humanize_list
from redbot.core.utils.views import ConfirmView
from typing_extensions import override
from pterodactyl import mcsrvstatus
from pterodactyl.config import config, register_config
@ -21,84 +20,55 @@ from pterodactyl.logger import logger
class Pterodactyl(commands.Cog):
"""Pterodactyl allows you to manage your Pterodactyl Panel from Discord."""
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
__version__ = "2.0.6"
__author__ = ["SeaswimmerTheFsh"]
__version__ = "2.0.0"
__documentation__ = "https://seacogs.coastalcommits.com/pterodactyl/"
def __init__(self, bot: Red):
self.bot = bot
self.client: Optional[PterodactylClient] = None
self.task: Optional[asyncio.Task] = None
self.websocket: Optional[websockets.ClientConnection] = None
self.websocket: Optional[websockets.WebSocketClientProtocol] = None
self.retry_counter: int = 0
register_config(config)
self.task = self._get_task()
self.task = self.get_task()
self.update_topic.start()
@override
def format_help_for_context(self, ctx: commands.Context) -> str:
pre_processed = super().format_help_for_context(ctx) or ""
n = "\n" if "\n\n" not in pre_processed else ""
text = [
f"{pre_processed}{n}",
f"{bold('Cog Version:')} [{self.__version__}]({self.__git__})",
f"{bold('Author:')} {humanize_list(self.__author__)}",
f"{bold('Documentation:')} {self.__documentation__}",
f"Cog Version: **{self.__version__}**",
f"Author: {humanize_list(self.__author__)}",
f"Documentation: {self.__documentation__}",
]
return "\n".join(text)
@override
async def cog_load(self) -> None:
pterodactyl_keys = await self.bot.get_shared_api_tokens("pterodactyl")
api_key = pterodactyl_keys.get("api_key")
if api_key is None:
self.maybe_cancel_task()
logger.error("Pterodactyl API key not set. Please set it using `[p]set api`.")
return
base_url = await config.base_url()
if base_url is None:
self.maybe_cancel_task()
logger.error("Pterodactyl base URL not set. Please set it using `[p]pterodactyl config url`.")
return
server_id = await config.server_id()
if server_id is None:
self.maybe_cancel_task()
logger.error("Pterodactyl server ID not set. Please set it using `[p]pterodactyl config serverid`.")
return
self.client = PterodactylClient(base_url, api_key).client
@override
async def cog_unload(self) -> None:
self.update_topic.cancel()
self.maybe_cancel_task()
self.task.cancel()
self.retry_counter = 0
await self.client._session.close() # pylint: disable=protected-access
def maybe_cancel_task(self, reset_retry_counter: bool = True) -> None:
if self.task:
self.task.cancel()
if reset_retry_counter:
self.retry_counter = 0
def _get_task(self) -> asyncio.Task:
def get_task(self) -> asyncio.Task:
from pterodactyl.websocket import establish_websocket_connection
task = self.bot.loop.create_task(establish_websocket_connection(self), name="Pterodactyl Websocket Connection")
task.add_done_callback(self._error_callback)
task.add_done_callback(self.error_callback)
return task
def _error_callback(self, fut) -> None: # NOTE Thanks flame442 and zephyrkul for helping me figure this out
def error_callback(self, fut) -> None: #NOTE - Thanks flame442 and zephyrkul for helping me figure this out
try:
fut.result()
except asyncio.CancelledError:
logger.info("WebSocket task has been cancelled.")
except Exception as e: # pylint: disable=broad-exception-caught
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("WebSocket task has failed: %s", e, exc_info=e)
self.maybe_cancel_task(reset_retry_counter=False)
self.task.cancel()
if self.retry_counter < 5:
self.retry_counter += 1
logger.info("Retrying in %s seconds...", 5 * self.retry_counter)
self.task = self.bot.loop.call_later(5 * self.retry_counter, self._get_task)
self.task = self.bot.loop.call_later(5 * self.retry_counter, self.get_task)
else:
logger.info("Retry limit reached. Stopping task.")
@ -109,9 +79,9 @@ class Pterodactyl(commands.Cog):
console = self.bot.get_channel(await config.console_channel())
chat = self.bot.get_channel(await config.chat_channel())
if console:
await console.edit(topic=topic) # type: ignore
await console.edit(topic=topic)
if chat:
await chat.edit(topic=topic) # type: ignore
await chat.edit(topic=topic)
@commands.Cog.listener()
async def on_message_without_command(self, message: discord.Message) -> None:
@ -122,7 +92,13 @@ class Pterodactyl(commands.Cog):
return
logger.debug("Received console command from %s: %s", message.author.id, message.content)
await message.channel.send(f"Received console command from {message.author.id}: {message.content[:1900]}", allowed_mentions=discord.AllowedMentions.none())
await self._send(json.dumps({"event": "send command", "args": [message.content]}))
try:
await self.websocket.send(json.dumps({"event": "send command", "args": [message.content]}))
except websockets.exceptions.ConnectionClosed as e:
logger.error("WebSocket connection closed: %s", e)
self.task.cancel()
self.retry_counter = 0
self.task = self.get_task()
if message.channel.id == await config.chat_channel() and message.author.bot is False:
logger.debug("Received chat message from %s: %s", message.author.id, message.content)
channel = self.bot.get_channel(await config.console_channel())
@ -130,22 +106,13 @@ class Pterodactyl(commands.Cog):
await channel.send(f"Received chat message from {message.author.id}: {message.content[:1900]}", allowed_mentions=discord.AllowedMentions.none())
msg = json.dumps({"event": "send command", "args": [await self.get_chat_command(message)]})
logger.debug("Sending chat message to server:\n%s", msg)
await self._send(message=msg)
async def _send(self, message: Union[websockets.Data, Iterable[websockets.Data], AsyncIterable[websockets.Data]], text: bool = False):
"""Send a message through the websocket connection. Restarts the websocket connection task if it is closed, and reinvokes itself."""
try:
await self.websocket.send(message=message, text=text) # type: ignore - we want this to error if `self.websocket` is none
except websockets.exceptions.ConnectionClosed as e:
logger.error("WebSocket connection closed: %s", e)
self.maybe_cancel_task()
self.task = self._get_task()
try:
await asyncio.wait_for(fut=self.task, timeout=60)
await self._send(message=message, text=text)
except asyncio.TimeoutError:
logger.error("Timeout while waiting for websocket connection")
raise
await self.websocket.send(msg)
except websockets.exceptions.ConnectionClosed as e:
logger.error("WebSocket connection closed: %s", e)
self.task.cancel()
self.retry_counter = 0
self.task = self.get_task()
async def get_topic(self) -> str:
topic: str = await config.topic()
@ -156,27 +123,23 @@ class Pterodactyl(commands.Cog):
if await config.api_endpoint() == "minecraft":
status, response = await mcsrvstatus.get_status(await config.topic_hostname(), await config.topic_port())
if status:
placeholders.update(
{
"I": response["ip"],
"M": str(response["players"]["max"]),
"P": str(response["players"]["online"]),
"V": response["version"],
"D": response["motd"]["clean"][0] if response["motd"]["clean"] else "unset",
},
)
placeholders.update({
"I": response['ip'],
"M": str(response['players']['max']),
"P": str(response['players']['online']),
"V": response['version'],
"D": response['motd']['clean'][0] if response['motd']['clean'] else "unset",
})
else:
placeholders.update(
{
"I": response["ip"],
"M": "0",
"P": "0",
"V": "Server Offline",
"D": "Server Offline",
},
)
placeholders.update({
"I": response['ip'],
"M": "0",
"P": "0",
"V": "Server Offline",
"D": "Server Offline",
})
for key, value in placeholders.items():
topic = topic.replace(".$" + key, value)
topic = topic.replace('.$' + key, value)
return topic
async def get_chat_command(self, message: discord.Message) -> str:
@ -185,82 +148,115 @@ class Pterodactyl(commands.Cog):
"C": str(message.author.color),
"D": message.author.discriminator,
"I": str(message.author.id),
"M": message.content.replace('"', "").replace("\n", " "),
"M": message.content.replace('"','').replace("\n", " "),
"N": message.author.display_name,
"U": message.author.name,
"V": await config.invite() or "use [p]pterodactyl config invite to change me",
}
for key, value in placeholders.items():
command = command.replace(".$" + key, value)
command = command.replace('.$' + key, value)
return command
async def get_player_list(self) -> Optional[Tuple[str, list]]:
if await config.api_endpoint() == "minecraft":
status, response = await mcsrvstatus.get_status(await config.topic_hostname(), await config.topic_port())
if status and "list" in response["players"]:
output_str = "\n".join(response["players"]["list"])
return output_str, response["players"]["list"]
if status and 'list' in response['players']:
output_str = '\n'.join(response['players']['list'])
return output_str, response['players']['list']
return None
return None
async def get_player_list_embed(self, ctx: Union[commands.Context, discord.Interaction]) -> Optional[discord.Embed]:
player_list = await self.get_player_list()
if player_list and isinstance(ctx.channel, discord.abc.Messageable):
if player_list:
embed = discord.Embed(color=await self.bot.get_embed_color(ctx.channel), title="Players Online")
embed.description = player_list[0]
return embed
return None
async def power(self, ctx: Union[discord.Interaction, commands.Context], action: str, action_ing: str, warning: str = "") -> None:
async def power(self, ctx: Union[discord.Interaction, commands.Context], action: str, action_ing: str, warning: str = '') -> None:
if isinstance(ctx, discord.Interaction):
ctx = await self.bot.get_context(ctx)
author = ctx.user
else:
author = ctx.author
current_status = await config.current_status()
if current_status == action_ing:
await ctx.send(f"Server is already {action_ing}.", ephemeral=True)
return
if isinstance(ctx, discord.Interaction):
return await ctx.response.send_message(f"Server is already {action_ing}.", ephemeral=True)
return await ctx.send(f"Server is already {action_ing}.")
if current_status in ["starting", "stopping"] and action != "kill":
await ctx.send("Another power action is already in progress.", ephemeral=True)
return
if isinstance(ctx, discord.Interaction):
return await ctx.response.send_message("Another power action is already in progress.", ephemeral=True)
return await ctx.send("Another power action is already in progress.")
view = ConfirmView(ctx.author, disable_buttons=True)
view = ConfirmView(author, disable_buttons=True)
message = await ctx.send(f"{warning}Are you sure you want to {action} the server?", view=view)
if isinstance(ctx, discord.Interaction):
await ctx.response.send_message(f"{warning}Are you sure you want to {action} the server?", view=view)
else:
message = await ctx.send(f"{warning}Are you sure you want to {action} the server?", view=view)
await view.wait()
if view.result is True:
await message.edit(content=f"Sending websocket command to {action} server...", view=None)
if isinstance(ctx, discord.Interaction):
await ctx.edit_original_response(content=f"Sending websocket command to {action} server...", view=None)
else:
await message.edit(content=f"Sending websocket command to {action} server...", view=None)
await self._websocket_send(json.dumps({"event": "set state", "args": [action]}))
await self.websocket.send(json.dumps({"event": "set state", "args": [action]}))
await message.edit(content=f"Server {action_ing}", view=None)
return
if isinstance(ctx, discord.Interaction):
await ctx.edit_original_response(content=f"Server {action_ing}", view=None)
else:
await message.edit(content=f"Server {action_ing}", view=None)
await message.edit(content="Cancelled.", view=None)
return
else:
if isinstance(ctx, discord.Interaction):
await ctx.edit_original_response(content="Cancelled.", view=None)
else:
await message.edit(content="Cancelled.", view=None)
async def send_command(self, ctx: Union[discord.Interaction, commands.Context], command: str):
channel = self.bot.get_channel(await config.console_channel())
if isinstance(ctx, discord.Interaction):
ctx = await self.bot.get_context(ctx)
if channel:
await channel.send(f"Received console command from {ctx.author.id}: {command[:1900]}", allowed_mentions=discord.AllowedMentions.none())
await self._websocket_send(json.dumps({"event": "send command", "args": [command]}))
await ctx.send(f"Command sent to server. {box(command, 'json')}")
if channel:
await channel.send(f"Received console command from {ctx.user.id}: {command[:1900]}", allowed_mentions=discord.AllowedMentions.none())
try:
await self.websocket.send(json.dumps({"event": "send command", "args": [command]}))
await ctx.response.send_message(f"Command sent to server. {box(command, 'json')}", ephemeral=True)
except websockets.exceptions.ConnectionClosed as e:
logger.error("WebSocket connection closed: %s", e)
await ctx.response.send_message(error("WebSocket connection closed."))
self.task.cancel()
self.retry_counter = 0
self.task = self.get_task()
else:
if channel:
await channel.send(f"Received console command from {ctx.author.id}: {command[:1900]}", allowed_mentions=discord.AllowedMentions.none())
try:
await self.websocket.send(json.dumps({"event": "send command", "args": [command]}))
await ctx.send(f"Command sent to server. {box(command, 'json')}")
except websockets.exceptions.ConnectionClosed as e:
logger.error("WebSocket connection closed: %s", e)
await ctx.send(error("WebSocket connection closed."))
self.task.cancel()
self.retry_counter = 0
self.task = self.get_task()
@commands.Cog.listener()
async def on_red_api_tokens_update(self, service_name: str, api_tokens: Mapping[str, str]): # pylint: disable=unused-argument
async def on_red_api_tokens_update(self, service_name: str, api_tokens: Mapping[str,str]): # pylint: disable=unused-argument
if service_name == "pterodactyl":
logger.info("Configuration value set: api_key\nRestarting task...")
self.maybe_cancel_task(reset_retry_counter=True)
self.task = self._get_task()
self.task.cancel()
self.retry_counter = 0
self.task = self.get_task()
slash_pterodactyl = app_commands.Group(name="pterodactyl", description="Pterodactyl allows you to manage your Pterodactyl Panel from Discord.")
@slash_pterodactyl.command(name="command", description="Send a command to the server console.")
@slash_pterodactyl.command(name = "command", description = "Send a command to the server console.")
async def slash_pterodactyl_command(self, interaction: discord.Interaction, command: str) -> None:
"""Send a command to the server console.
@ -270,7 +266,7 @@ class Pterodactyl(commands.Cog):
The command to send to the server."""
return await self.send_command(interaction, command)
@slash_pterodactyl.command(name="players", description="Retrieve a list of players on the server.")
@slash_pterodactyl.command(name = "players", description = "Retrieve a list of players on the server.")
async def slash_pterodactyl_players(self, interaction: discord.Interaction) -> None:
"""Retrieve a list of players on the server."""
e = await self.get_player_list_embed(interaction)
@ -279,8 +275,13 @@ class Pterodactyl(commands.Cog):
else:
await interaction.response.send_message("No players online.", ephemeral=True)
@slash_pterodactyl.command(name="power", description="Send power actions to the server.")
@app_commands.choices(action=[Choice(name="Start", value="start"), Choice(name="Stop", value="stop"), Choice(name="Restart", value="restart"), Choice(name="⚠️ Kill ⚠️", value="kill")])
@slash_pterodactyl.command(name = "power", description = "Send power actions to the server.")
@app_commands.choices(action=[
Choice(name="Start", value="start"),
Choice(name="Stop", value="stop"),
Choice(name="Restart", value="restart"),
Choice(name="⚠️ Kill ⚠️", value="kill")
])
async def slash_pterodactyl_power(self, interaction: discord.Interaction, action: app_commands.Choice[str]) -> None:
"""Send power actions to the server.
@ -294,11 +295,11 @@ class Pterodactyl(commands.Cog):
return await self.power(interaction, action.value, "stopping...")
return await self.power(interaction, action.value, f"{action.value}ing...")
@commands.group(autohelp=True, name="pterodactyl", aliases=["ptero"])
@commands.group(autohelp = True, name = "pterodactyl", aliases = ["ptero"])
async def pterodactyl(self, ctx: commands.Context) -> None:
"""Pterodactyl allows you to manage your Pterodactyl Panel from Discord."""
@pterodactyl.command(name="players", aliases=["list", "online", "playerlist", "who"])
@pterodactyl.command(name = "players", aliases=["list", "online", "playerlist", "who"])
async def pterodactyl_players(self, ctx: commands.Context) -> None:
"""Retrieve a list of players on the server."""
e = await self.get_player_list_embed(ctx)
@ -307,43 +308,43 @@ class Pterodactyl(commands.Cog):
else:
await ctx.send("No players online.")
@pterodactyl.command(name="command", aliases=["cmd", "execute", "exec"])
@pterodactyl.command(name = "command", aliases = ["cmd", "execute", "exec"])
@commands.admin()
async def pterodactyl_command(self, ctx: commands.Context, *, command: str) -> None:
"""Send a command to the server console."""
return await self.send_command(ctx, command)
@pterodactyl.group(autohelp=True, name="power")
@pterodactyl.group(autohelp = True, name = "power")
@commands.admin()
async def pterodactyl_power(self, ctx: commands.Context) -> None:
"""Send power actions to the server."""
@pterodactyl_power.command(name="start")
@pterodactyl_power.command(name = "start")
async def pterodactyl_power_start(self, ctx: commands.Context) -> Optional[discord.Message]:
"""Start the server."""
return await self.power(ctx, "start", "starting...")
@pterodactyl_power.command(name="stop")
@pterodactyl_power.command(name = "stop")
async def pterodactyl_power_stop(self, ctx: commands.Context) -> Optional[discord.Message]:
"""Stop the server."""
return await self.power(ctx, "stop", "stopping...")
@pterodactyl_power.command(name="restart")
@pterodactyl_power.command(name = "restart")
async def pterodactyl_power_restart(self, ctx: commands.Context) -> Optional[discord.Message]:
"""Restart the server."""
return await self.power(ctx, "restart", "restarting...")
@pterodactyl_power.command(name="kill")
@pterodactyl_power.command(name = "kill")
async def pterodactyl_power_kill(self, ctx: commands.Context) -> Optional[discord.Message]:
"""Kill the server."""
return await self.power(ctx, "kill", "stopping... (forcefully killed)", warning="**⚠️ Forcefully killing the server process can corrupt data in some cases. ⚠️**\n")
@pterodactyl.group(autohelp=True, name="config", aliases=["settings", "set"])
@pterodactyl.group(autohelp = True, name = "config", aliases = ["settings", "set"])
@commands.is_owner()
async def pterodactyl_config(self, ctx: commands.Context) -> None:
"""Configure Pterodactyl settings."""
@pterodactyl_config.command(name="url")
@pterodactyl_config.command(name = "url")
async def pterodactyl_config_base_url(self, ctx: commands.Context, *, base_url: str) -> None:
"""Set the base URL of your Pterodactyl Panel.
@ -352,57 +353,59 @@ class Pterodactyl(commands.Cog):
await config.base_url.set(base_url)
await ctx.send(f"Base URL set to {base_url}")
logger.info("Configuration value set: base_url = %s\nRestarting task...", base_url)
self.maybe_cancel_task(reset_retry_counter=True)
self.task = self._get_task()
self.task.cancel()
self.retry_counter = 0
self.task = self.get_task()
@pterodactyl_config.command(name="serverid")
@pterodactyl_config.command(name = "serverid")
async def pterodactyl_config_server_id(self, ctx: commands.Context, *, server_id: str) -> None:
"""Set the ID of your server."""
await config.server_id.set(server_id)
await ctx.send(f"Server ID set to {server_id}")
logger.info("Configuration value set: server_id = %s\nRestarting task...", server_id)
self.maybe_cancel_task(reset_retry_counter=True)
self.task = self._get_task()
self.task.cancel()
self.retry_counter = 0
self.task = self.get_task()
@pterodactyl_config.group(name="console")
@pterodactyl_config.group(name = "console")
async def pterodactyl_config_console(self, ctx: commands.Context):
"""Configure console settings."""
@pterodactyl_config_console.command(name="channel")
@pterodactyl_config_console.command(name = "channel")
async def pterodactyl_config_console_channel(self, ctx: commands.Context, channel: discord.TextChannel) -> None:
"""Set the channel to send console output to."""
await config.console_channel.set(channel.id)
await ctx.send(f"Console channel set to {channel.mention}")
@pterodactyl_config_console.command(name="commands")
@pterodactyl_config_console.command(name = "commands")
async def pterodactyl_config_console_commands(self, ctx: commands.Context, enabled: bool) -> None:
"""Enable or disable console commands."""
await config.console_commands_enabled.set(enabled)
await ctx.send(f"Console commands set to {enabled}")
@pterodactyl_config.command(name="invite")
@pterodactyl_config.command(name = "invite")
async def pterodactyl_config_invite(self, ctx: commands.Context, invite: str) -> None:
"""Set the invite link for your server."""
await config.invite.set(invite)
await ctx.send(f"Invite link set to {invite}")
@pterodactyl_config.group(name="topic")
@pterodactyl_config.group(name = "topic")
async def pterodactyl_config_topic(self, ctx: commands.Context):
"""Set the topic for the console and chat channels."""
@pterodactyl_config_topic.command(name="host", aliases=["hostname", "ip"])
@pterodactyl_config_topic.command(name = "host", aliases = ["hostname", "ip"])
async def pterodactyl_config_topic_host(self, ctx: commands.Context, host: str) -> None:
"""Set the hostname or IP address of your server."""
await config.topic_hostname.set(host)
await ctx.send(f"Hostname/IP set to `{host}`")
@pterodactyl_config_topic.command(name="port")
@pterodactyl_config_topic.command(name = "port")
async def pterodactyl_config_topic_port(self, ctx: commands.Context, port: int) -> None:
"""Set the port of your server."""
await config.topic_port.set(port)
await ctx.send(f"Port set to `{port}`")
@pterodactyl_config_topic.command(name="text")
@pterodactyl_config_topic.command(name = "text")
async def pterodactyl_config_topic_text(self, ctx: commands.Context, *, text: str) -> None:
"""Set the text for the console and chat channels.
@ -418,17 +421,17 @@ class Pterodactyl(commands.Cog):
await config.topic.set(text)
await ctx.send(f"Topic set to:\n{box(text, 'yaml')}")
@pterodactyl_config.group(name="chat")
@pterodactyl_config.group(name = "chat")
async def pterodactyl_config_chat(self, ctx: commands.Context):
"""Configure chat settings."""
@pterodactyl_config_chat.command(name="channel")
@pterodactyl_config_chat.command(name = "channel")
async def pterodactyl_config_chat_channel(self, ctx: commands.Context, channel: discord.TextChannel) -> None:
"""Set the channel to send chat output to."""
await config.chat_channel.set(channel.id)
await ctx.send(f"Chat channel set to {channel.mention}")
@pterodactyl_config_chat.command(name="command")
@pterodactyl_config_chat.command(name = "command")
async def pterodactyl_config_chat_command(self, ctx: commands.Context, *, command: str) -> None:
"""Set the command that will be used to send messages from Discord.
@ -437,11 +440,11 @@ class Pterodactyl(commands.Cog):
await config.chat_command.set(command)
await ctx.send(f"Chat command set to:\n{box(command, 'json')}")
@pterodactyl_config.group(name="regex")
@pterodactyl_config.group(name = "regex")
async def pterodactyl_config_regex(self, ctx: commands.Context) -> None:
"""Set regex patterns."""
@pterodactyl_config_regex.command(name="chat")
@pterodactyl_config_regex.command(name = "chat")
async def pterodactyl_config_regex_chat(self, ctx: commands.Context, *, regex: str) -> None:
"""Set the regex pattern to match chat messages on the server.
@ -449,7 +452,7 @@ class Pterodactyl(commands.Cog):
await config.chat_regex.set(regex)
await ctx.send(f"Chat regex set to:\n{box(regex, 'regex')}")
@pterodactyl_config_regex.command(name="server")
@pterodactyl_config_regex.command(name = "server")
async def pterodactyl_config_regex_server(self, ctx: commands.Context, *, regex: str) -> None:
"""Set the regex pattern to match server messages on the server.
@ -457,7 +460,7 @@ class Pterodactyl(commands.Cog):
await config.server_regex.set(regex)
await ctx.send(f"Server regex set to:\n{box(regex, 'regex')}")
@pterodactyl_config_regex.command(name="join")
@pterodactyl_config_regex.command(name = "join")
async def pterodactyl_config_regex_join(self, ctx: commands.Context, *, regex: str) -> None:
"""Set the regex pattern to match join messages on the server.
@ -465,7 +468,7 @@ class Pterodactyl(commands.Cog):
await config.join_regex.set(regex)
await ctx.send(f"Join regex set to:\n{box(regex, 'regex')}")
@pterodactyl_config_regex.command(name="leave")
@pterodactyl_config_regex.command(name = "leave")
async def pterodactyl_config_regex_leave(self, ctx: commands.Context, *, regex: str) -> None:
"""Set the regex pattern to match leave messages on the server.
@ -473,7 +476,7 @@ class Pterodactyl(commands.Cog):
await config.leave_regex.set(regex)
await ctx.send(f"Leave regex set to:\n{box(regex, 'regex')}")
@pterodactyl_config_regex.command(name="achievement")
@pterodactyl_config_regex.command(name = "achievement")
async def pterodactyl_config_regex_achievement(self, ctx: commands.Context, *, regex: str) -> None:
"""Set the regex pattern to match achievement messages on the server.
@ -481,41 +484,41 @@ class Pterodactyl(commands.Cog):
await config.achievement_regex.set(regex)
await ctx.send(f"Achievement regex set to:\n{box(regex, 'regex')}")
@pterodactyl_config.group(name="messages", aliases=["msg", "msgs", "message"])
@pterodactyl_config.group(name = "messages", aliases = ['msg', 'msgs', 'message'])
async def pterodactyl_config_messages(self, ctx: commands.Context):
"""Configure message settings."""
@pterodactyl_config_messages.command(name="startup")
@pterodactyl_config_messages.command(name = "startup")
async def pterodactyl_config_messages_startup(self, ctx: commands.Context, *, message: str) -> None:
"""Set the message that will be sent when the server starts."""
await config.startup_msg.set(message)
await ctx.send(f"Startup message set to: {message}")
@pterodactyl_config_messages.command(name="shutdown")
@pterodactyl_config_messages.command(name = "shutdown")
async def pterodactyl_config_messages_shutdown(self, ctx: commands.Context, *, message: str) -> None:
"""Set the message that will be sent when the server stops."""
await config.shutdown_msg.set(message)
await ctx.send(f"Shutdown message set to: {message}")
@pterodactyl_config_messages.command(name="join")
@pterodactyl_config_messages.command(name = "join")
async def pterodactyl_config_messages_join(self, ctx: commands.Context, *, message: str) -> None:
"""Set the message that will be sent when a user joins the server. This is only shown in embeds."""
await config.join_msg.set(message)
await ctx.send(f"Join message set to: {message}")
@pterodactyl_config_messages.command(name="leave")
@pterodactyl_config_messages.command(name = "leave")
async def pterodactyl_config_messages_leave(self, ctx: commands.Context, *, message: str) -> None:
"""Set the message that will be sent when a user leaves the server. This is only shown in embeds."""
await config.leave_msg.set(message)
await ctx.send(f"Leave message set to: {message}")
@pterodactyl_config.command(name="ip")
@pterodactyl_config.command(name = "ip")
async def pterodactyl_config_mask_ip(self, ctx: commands.Context, mask: bool) -> None:
"""Mask the IP addresses of users in console messages."""
await config.mask_ip.set(mask)
await ctx.send(f"IP masking set to {mask}")
@pterodactyl_config.command(name="api")
@pterodactyl_config.command(name = "api")
async def pterodactyl_config_api(self, ctx: commands.Context, endpoint: str) -> None:
"""Set the API endpoint for retrieving user avatars.
@ -524,14 +527,11 @@ class Pterodactyl(commands.Cog):
await config.api_endpoint.set(endpoint)
await ctx.send(f"API endpoint set to {endpoint}")
@pterodactyl_config_regex.group(
name="blacklist",
aliases=["block", "blocklist"],
)
@pterodactyl_config_regex.group(name = "blacklist", aliases = ['block', 'blocklist'],)
async def pterodactyl_config_regex_blacklist(self, ctx: commands.Context):
"""Blacklist regex patterns."""
@pterodactyl_config_regex_blacklist.command(name="add")
@pterodactyl_config_regex_blacklist.command(name = "add")
async def pterodactyl_config_regex_blacklist_add(self, ctx: commands.Context, name: str, *, regex: str) -> None:
"""Add a regex pattern to the blacklist."""
async with config.regex_blacklist() as blacklist:
@ -549,7 +549,7 @@ class Pterodactyl(commands.Cog):
else:
await msg.edit(content="Cancelled.")
@pterodactyl_config_regex_blacklist.command(name="remove")
@pterodactyl_config_regex_blacklist.command(name = "remove")
async def pterodactyl_config_regex_blacklist_remove(self, ctx: commands.Context, name: str) -> None:
"""Remove a regex pattern from the blacklist."""
async with config.regex_blacklist() as blacklist:
@ -566,7 +566,7 @@ class Pterodactyl(commands.Cog):
else:
await ctx.send(f"Name `{name}` does not exist in the blacklist.")
@pterodactyl_config.command(name="view", aliases=["show"])
@pterodactyl_config.command(name = 'view', aliases = ['show'])
async def pterodactyl_config_view(self, ctx: commands.Context) -> None:
"""View the current configuration."""
base_url = await config.base_url()
@ -591,7 +591,7 @@ class Pterodactyl(commands.Cog):
topic_text = await config.topic()
topic_hostname = await config.topic_hostname()
topic_port = await config.topic_port()
embed = discord.Embed(color=await ctx.embed_color(), title="Pterodactyl Configuration")
embed = discord.Embed(color = await ctx.embed_color(), title="Pterodactyl Configuration")
embed.description = f"""**Base URL:** {base_url}
**Server ID:** `{server_id}`
**Console Channel:** <#{console_channel}>
@ -607,19 +607,19 @@ class Pterodactyl(commands.Cog):
**Topic Hostname:** `{topic_hostname}`
**Topic Port:** `{topic_port}`
**Topic Text:** {box(topic_text, "yaml")}
**Topic Text:** {box(topic_text, 'yaml')}
**Chat Command:** {box(chat_command, "json")}
**Chat Regex:** {box(chat_regex, "re")}
**Server Regex:** {box(server_regex, "re")}
**Join Regex:** {box(join_regex, "re")}
**Leave Regex:** {box(leave_regex, "re")}
**Achievement Regex:** {box(achievement_regex, "re")}"""
**Chat Command:** {box(chat_command, 'json')}
**Chat Regex:** {box(chat_regex, 're')}
**Server Regex:** {box(server_regex, 're')}
**Join Regex:** {box(join_regex, 're')}
**Leave Regex:** {box(leave_regex, 're')}
**Achievement Regex:** {box(achievement_regex, 're')}"""
await ctx.send(embed=embed)
if not len(regex_blacklist) == 0:
regex_blacklist_embed = discord.Embed(color=await ctx.embed_color(), title="Regex Blacklist")
regex_blacklist_embed = discord.Embed(color = await ctx.embed_color(), title="Regex Blacklist")
for name, regex in regex_blacklist.items():
regex_blacklist_embed.add_field(name=name, value=box(regex, "re"), inline=False)
regex_blacklist_embed.add_field(name=name, value=box(regex, 're'), inline=False)
await ctx.send(embed=regex_blacklist_embed)
def get_bool_str(self, inp: bool) -> str:

View file

@ -1,15 +1,13 @@
# pylint: disable=cyclic-import
import json
import re
from pathlib import Path
from typing import Any, Optional, Tuple, Union
from typing import Optional, Union
import aiohttp
import discord
import websockets
from pydactyl import PterodactylClient
from redbot.core.data_manager import bundled_data_path
from redbot.core.utils.chat_formatting import bold, pagify
from websockets.asyncio.client import connect
from pterodactyl.config import config
from pterodactyl.logger import logger, websocket_logger
@ -19,106 +17,93 @@ from pterodactyl.pterodactyl import Pterodactyl
async def establish_websocket_connection(coginstance: Pterodactyl) -> None:
await coginstance.bot.wait_until_red_ready()
base_url = await config.base_url()
base_url = base_url[:-1] if base_url.endswith("/") else base_url
base_url = base_url[:-1] if base_url.endswith('/') else base_url
logger.info("Establishing WebSocket connection")
websocket_credentials = await retrieve_websocket_credentials(coginstance)
async with connect(websocket_credentials["data"]["socket"], origin=base_url, ping_timeout=60, logger=websocket_logger) as websocket:
async with websockets.connect(websocket_credentials['data']['socket'], origin=base_url, ping_timeout=60, logger=websocket_logger) as websocket:
logger.info("WebSocket connection established")
auth_message = json.dumps({"event": "auth", "args": [websocket_credentials["data"]["token"]]})
auth_message = json.dumps({"event": "auth", "args": [websocket_credentials['data']['token']]})
await websocket.send(auth_message)
logger.info("Authentication message sent")
coginstance.websocket = websocket
while True: # pylint: disable=too-many-nested-blocks
while True: # pylint: disable=too-many-nested-blocks
message = json.loads(await websocket.recv())
if message["event"] in ("token expiring", "token expired"):
logger.verbose("Received message: %s", message)
if message['event'] in ('token expiring', 'token expired'):
logger.info("Received token expiring/expired event. Refreshing token.")
websocket_credentials = await retrieve_websocket_credentials(coginstance)
auth_message = json.dumps({"event": "auth", "args": [websocket_credentials["data"]["token"]]})
auth_message = json.dumps({"event": "auth", "args": [websocket_credentials['data']['token']]})
await websocket.send(auth_message)
logger.info("Authentication message sent")
if message["event"] == "auth success":
if message['event'] == 'auth success':
logger.info("WebSocket authentication successful")
if message["event"] == "console output" and await config.console_channel() is not None:
regex_blacklist: dict = await config.regex_blacklist()
matches = [re.search(regex, message["args"][0]) for regex in regex_blacklist.values()]
if message['event'] == 'console output':
content = remove_ansi_escape_codes(message['args'][0])
if await config.mask_ip() is True:
content = mask_ip(content)
logger.verbose("Received console output: %s", content)
if await config.current_status() in ("running", "") and not any(matches):
content = remove_ansi_escape_codes(message["args"][0])
if await config.mask_ip() is True:
content = mask_ip(content)
if await config.console_channel() is not None:
regex_blacklist: dict = await config.regex_blacklist()
matches = [re.search(regex, message['args'][0]) for regex in regex_blacklist.values()]
console_channel = coginstance.bot.get_channel(await config.console_channel())
assert isinstance(console_channel, discord.abc.Messageable)
chat_channel = coginstance.bot.get_channel(await config.chat_channel())
assert isinstance(chat_channel, discord.abc.Messageable)
if console_channel is not None:
if content.startswith("["):
pagified_content = pagify(content, delims=[" ", "\n"])
for page in pagified_content:
await console_channel.send(content=page, allowed_mentions=discord.AllowedMentions.none())
if await config.current_status() in ('running', '') and not any(matches):
console_channel = coginstance.bot.get_channel(await config.console_channel())
chat_channel = coginstance.bot.get_channel(await config.chat_channel())
if console_channel is not None:
if content.startswith('['):
pagified_content = pagify(content, delims=[" ", "\n"])
for page in pagified_content:
await console_channel.send(content=page, allowed_mentions=discord.AllowedMentions.none())
server_message = await check_if_server_message(content)
if server_message:
if chat_channel is not None:
await chat_channel.send(server_message if len(server_message) < 2000 else server_message[:1997] + "...", allowed_mentions=discord.AllowedMentions.none())
server_message = await check_if_server_message(content)
if server_message:
if chat_channel is not None:
await chat_channel.send(server_message if len(server_message) < 2000 else server_message[:1997] + '...', allowed_mentions=discord.AllowedMentions.none())
chat_message = await check_if_chat_message(content)
if chat_message:
info = await get_info(chat_message["username"])
if info is not None:
await send_chat_discord(coginstance, chat_message["username"], chat_message["message"], info["data"]["player"]["avatar"])
else:
await send_chat_discord(coginstance, chat_message["username"], chat_message["message"], "https://seafsh.cc/u/j3AzqQ.png")
join_message = await check_if_join_message(content)
if join_message:
if chat_channel is not None:
if coginstance.bot.embed_requested(chat_channel):
embed, img = await generate_join_leave_embed(coginstance=coginstance, username=join_message, join=True)
if img:
with open(img, "rb") as file:
await chat_channel.send(embed=embed, file=discord.File(fp=file))
else:
await chat_channel.send(embed=embed)
chat_message = await check_if_chat_message(content)
if chat_message:
info = await get_info(chat_message['username'])
if info is not None:
await send_chat_discord(coginstance, chat_message['username'], chat_message['message'], info['data']['player']['avatar'])
else:
await chat_channel.send(f"{join_message} joined the game", allowed_mentions=discord.AllowedMentions.none())
await send_chat_discord(coginstance, chat_message['username'], chat_message['message'], 'https://seafsh.cc/u/j3AzqQ.png')
leave_message = await check_if_leave_message(content)
if leave_message:
if chat_channel is not None:
if coginstance.bot.embed_requested(chat_channel):
embed, img = await generate_join_leave_embed(coginstance=coginstance, username=leave_message, join=False)
if img:
with open(img, "rb") as file:
await chat_channel.send(embed=embed, file=discord.File(fp=file))
join_message = await check_if_join_message(content)
if join_message:
if chat_channel is not None:
if coginstance.bot.embed_requested(chat_channel):
await chat_channel.send(embed=await generate_join_leave_embed(join_message, True))
else:
await chat_channel.send(embed=embed)
else:
await chat_channel.send(f"{leave_message} left the game", allowed_mentions=discord.AllowedMentions.none())
await chat_channel.send(f"{join_message} joined the game", allowed_mentions=discord.AllowedMentions.none())
achievement_message = await check_if_achievement_message(content)
if achievement_message:
if chat_channel is not None:
if coginstance.bot.embed_requested(chat_channel):
embed, img = await generate_achievement_embed(coginstance, achievement_message["username"], achievement_message["achievement"], achievement_message["challenge"])
if img:
await chat_channel.send(embed=embed, file=discord.File(fp=img))
leave_message = await check_if_leave_message(content)
if leave_message:
if chat_channel is not None:
if coginstance.bot.embed_requested(chat_channel):
await chat_channel.send(embed=await generate_join_leave_embed(leave_message, False))
else:
await chat_channel.send(embed=embed)
else:
await chat_channel.send(f"{achievement_message['username']} has {'completed the challenge' if achievement_message['challenge'] else 'made the advancement'} {achievement_message['achievement']}")
await chat_channel.send(f"{leave_message} left the game", allowed_mentions=discord.AllowedMentions.none())
if message["event"] == "status":
achievement_message = await check_if_achievement_message(content)
if achievement_message:
if chat_channel is not None:
if coginstance.bot.embed_requested(chat_channel):
await chat_channel.send(embed=await generate_achievement_embed(achievement_message['username'], achievement_message['achievement'], achievement_message['challenge']))
else:
await chat_channel.send(f"{achievement_message['username']} has {'completed the challenge' if achievement_message['challenge'] else 'made the advancement'} {achievement_message['achievement']}")
if message['event'] == 'status':
old_status = await config.current_status()
current_status = message["args"][0]
current_status = message['args'][0]
if old_status != current_status:
await config.current_status.set(current_status)
if await config.console_channel() is not None:
@ -126,92 +111,81 @@ async def establish_websocket_connection(coginstance: Pterodactyl) -> None:
if console is not None:
await console.send(f"Server status changed! `{current_status}`")
if await config.chat_channel() is not None:
if current_status == "running" and await config.startup_msg() is not None:
if current_status == 'running' and await config.startup_msg() is not None:
chat = coginstance.bot.get_channel(await config.chat_channel())
if chat is not None:
await chat.send(await config.startup_msg())
if current_status == "stopping" and await config.shutdown_msg() is not None:
if current_status == 'stopping' and await config.shutdown_msg() is not None:
chat = coginstance.bot.get_channel(await config.chat_channel())
if chat is not None:
await chat.send(await config.shutdown_msg())
async def retrieve_websocket_credentials(coginstance: Pterodactyl) -> dict:
async def retrieve_websocket_credentials(coginstance: Pterodactyl) -> Optional[dict]:
pterodactyl_keys = await coginstance.bot.get_shared_api_tokens("pterodactyl")
api_key = pterodactyl_keys.get("api_key")
if api_key is None:
coginstance.maybe_cancel_task()
coginstance.task.cancel()
raise ValueError("Pterodactyl API key not set. Please set it using `[p]set api`.")
base_url = await config.base_url()
if base_url is None:
coginstance.maybe_cancel_task()
coginstance.task.cancel()
raise ValueError("Pterodactyl base URL not set. Please set it using `[p]pterodactyl config url`.")
server_id = await config.server_id()
if server_id is None:
coginstance.maybe_cancel_task()
coginstance.task.cancel()
raise ValueError("Pterodactyl server ID not set. Please set it using `[p]pterodactyl config serverid`.")
client = PterodactylClient(base_url, api_key).client
coginstance.client = client
websocket_credentials: dict[str, Any] = client.servers.get_websocket(server_id).json()
if not websocket_credentials:
coginstance.maybe_cancel_task()
raise ValueError("Failed to retrieve websocket credentials. Please ensure the API details are correctly configured.")
logger.debug(
"""Websocket connection details retrieved:
websocket_credentials = client.servers.get_websocket(server_id)
logger.debug("""Websocket connection details retrieved:
Socket: %s
Token: %s...""",
websocket_credentials["data"]["socket"],
websocket_credentials["data"]["token"][:20],
)
websocket_credentials['data']['socket'],
websocket_credentials['data']['token'][:20]
)
return websocket_credentials
# NOTE - The token is truncated to prevent it from being logged in its entirety, for security reasons
# The token is truncated to prevent it from being logged in its entirety, for security reasons
def remove_ansi_escape_codes(text: str) -> str:
ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
# NOTE - https://chat.openai.com/share/d92f9acf-d776-4fd6-a53f-b14ac15dd540
return ansi_escape.sub("", text)
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
# https://chatgpt.com/share/d92f9acf-d776-4fd6-a53f-b14ac15dd540
return ansi_escape.sub('', text)
async def check_if_server_message(text: str) -> Optional[str]:
async def check_if_server_message(text: str) -> Union[bool, str]:
regex = await config.server_regex()
match: Optional[re.Match[str]] = re.match(regex, text)
if match:
logger.trace("Message is a server message")
logger.debug("Message is a server message")
return match.group(1)
return None
return False
async def check_if_chat_message(text: str) -> Optional[dict]:
async def check_if_chat_message(text: str) -> Union[bool, dict]:
regex = await config.chat_regex()
match: Optional[re.Match[str]] = re.match(regex, text)
if match:
groups = {"username": match.group(1), "message": match.group(2)}
logger.trace("Message is a chat message\n%s", json.dumps(groups))
logger.debug("Message is a chat message\n%s", json.dumps(groups))
return groups
return None
return False
async def check_if_join_message(text: str) -> Optional[str]:
async def check_if_join_message(text: str) -> Union[bool, str]:
regex = await config.join_regex()
match: Optional[re.Match[str]] = re.match(regex, text)
if match:
logger.trace("Message is a join message")
logger.debug("Message is a join message")
return match.group(1)
return None
return False
async def check_if_leave_message(text: str) -> Optional[str]:
async def check_if_leave_message(text: str) -> Union[bool, str]:
regex = await config.leave_regex()
match: Optional[re.Match[str]] = re.match(regex, text)
if match:
logger.trace("Message is a leave message")
logger.debug("Message is a leave message")
return match.group(1)
return None
return False
async def check_if_achievement_message(text: str) -> Optional[dict]:
async def check_if_achievement_message(text: str) -> Union[bool, dict]:
regex = await config.achievement_regex()
match: Optional[re.Match[str]] = re.match(regex, text)
if match:
@ -220,25 +194,23 @@ async def check_if_achievement_message(text: str) -> Optional[dict]:
groups["challenge"] = True
else:
groups["challenge"] = False
logger.trace("Message is an achievement message")
logger.debug("Message is an achievement message")
return groups
return None
return False
async def get_info(username: str) -> Optional[dict]:
logger.verbose("Retrieving player info for %s", username)
logger.debug("Retrieving player info for %s", username)
endpoint = await config.api_endpoint()
async with aiohttp.ClientSession() as session:
async with session.get(f"https://playerdb.co/api/player/{endpoint}/{username}") as response:
if response.status == 200:
logger.verbose("Player info retrieved for %s", username)
logger.debug("Player info retrieved for %s", username)
return await response.json()
logger.warning("Failed to retrieve player info for %s: %s", username, response.status)
logger.error("Failed to retrieve player info for %s: %s", username, response.status)
return None
async def send_chat_discord(coginstance: Pterodactyl, username: str, message: str, avatar_url: str) -> None:
logger.trace("Sending chat message to Discord")
logger.debug("Sending chat message to Discord")
channel = coginstance.bot.get_channel(await config.chat_channel())
if channel is not None:
webhooks = await channel.webhooks()
@ -246,44 +218,37 @@ async def send_chat_discord(coginstance: Pterodactyl, username: str, message: st
if webhook is None:
webhook = await channel.create_webhook(name="Pterodactyl Chat")
await webhook.send(content=message, username=username, avatar_url=avatar_url, allowed_mentions=discord.AllowedMentions(everyone=False, roles=False, users=True))
logger.trace("Chat message sent to Discord")
logger.debug("Chat message sent to Discord")
else:
logger.warning("Chat channel not set. Skipping sending chat message to Discord")
async def generate_join_leave_embed(coginstance: Pterodactyl, username: str, join: bool) -> Tuple[discord.Embed, Optional[Union[str, Path]]]:
async def generate_join_leave_embed(username: str, join: bool) -> discord.Embed:
embed = discord.Embed()
embed.color = discord.Color.green() if join else discord.Color.red()
embed.description = await config.join_msg() if join else await config.leave_msg()
info = await get_info(username)
if info:
img = None
embed.set_author(name=username, icon_url=info["data"]["player"]["avatar"])
embed.set_author(name=username, icon_url=info['data']['player']['avatar'])
else:
img = bundled_data_path(coginstance) / "unknown.png"
embed.set_author(name=username, icon_url="attachment://unknown.png")
embed.set_author(name=username, icon_url='https://seafsh.cc/u/j3AzqQ.png')
embed.timestamp = discord.utils.utcnow()
return embed, img
return embed
async def generate_achievement_embed(coginstance: Pterodactyl, username: str, achievement: str, challenge: bool) -> Tuple[discord.Embed, Optional[Union[str, Path]]]:
async def generate_achievement_embed(username: str, achievement: str, challenge: bool) -> discord.Embed:
embed = discord.Embed()
embed.color = discord.Color.from_str("#a800a7") if challenge else discord.Color.from_str("#54fb54")
embed.color = discord.Color.from_str('#a800a7') if challenge else discord.Color.from_str('#54fb54')
embed.description = f"{bold(username)} has {'completed the challenge' if challenge else 'made the advancement'} {bold(achievement)}"
info = await get_info(username)
if info:
img = None
embed.set_author(name=username, icon_url=info["data"]["player"]["avatar"])
embed.set_author(name=username, icon_url=info['data']['player']['avatar'])
else:
img = bundled_data_path(coginstance) / "unknown.png"
embed.set_author(name=username, icon_url="attachment://unknown.png")
embed.set_author(name=username, icon_url='https://seafsh.cc/u/j3AzqQ.png')
embed.timestamp = discord.utils.utcnow()
return embed, img
return embed
def mask_ip(string: str) -> str:
def check(match: re.Match[str]):
ip = match.group(0)
return ".".join(r"\*" * len(octet) for octet in ip.split("."))
return re.sub(r"\b(?:\d{1,3}\.){3}\d{1,3}\b", check, string)
masked_ip = '.'.join(r'\*' * len(octet) for octet in ip.split('.'))
return masked_ip
return re.sub(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', check, string)

View file

@ -1,42 +1,44 @@
[project]
[tool.poetry]
name = "seacogs"
version = "0.1.0"
description = "My assorted cogs for Red-DiscordBot."
authors = [{ name = "cswimr", email = "seaswimmerthefsh@gmail.com" }]
license = { file = "LICENSE" }
authors = ["SeaswimmerTheFsh"]
license = "MPL 2"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"aiosqlite>=0.20.0",
"beautifulsoup4>=4.12.3",
"colorthief>=0.2.1",
"markdownify>=0.14.1",
"numpy>=2.2.2",
"phx-class-registry>=5.1.1",
"pillow>=10.4.0",
"pip>=25.0",
"py-dactyl",
"pydantic>=2.10.6",
"red-discordbot>=3.5.14",
"watchdog>=6.0.0",
"websockets>=14.2",
]
package-mode = false
[dependency-groups]
documentation = [
"mkdocs>=1.6.1",
"mkdocs-git-authors-plugin>=0.9.2",
"mkdocs-git-revision-date-localized-plugin>=1.3.0",
"mkdocs-material[imaging]>=9.5.50",
"mkdocs-redirects>=1.2.2",
"mkdocstrings[python]>=0.27.0",
]
[tool.poetry.dependencies]
python = ">=3.11,<3.12"
Red-DiscordBot = "^3.5.5"
py-dactyl = "^2.0.4"
websockets = "^12.0"
pillow = "^10.3.0"
numpy = "^1.26.4"
colorthief = "^0.2.1"
beautifulsoup4 = "^4.12.3"
markdownify = "^0.12.1"
[tool.uv]
dev-dependencies = ["pylint>=3.3.3", "ruff>=0.9.3", "sqlite-web>=0.6.4"]
[tool.poetry.group.dev]
optional = true
[tool.uv.sources]
py-dactyl = { git = "https://github.com/cswimr/pydactyl" }
[tool.poetry.group.dev.dependencies]
ruff = "^0.3.1"
pylint = "^3.1.0"
[tool.poetry.group.docs]
optional = true
[tool.poetry.group.docs.dependencies]
mkdocs = "1.5.3"
mkdocstrings = {extras = ["python"], version = "0.24.0"}
mkdocs-git-authors-plugin = "0.7.2"
mkdocs-git-revision-date-localized-plugin = "1.2.2"
mkdocs-material = {extras = ["imaging"], version = "^9.5.2"}
mkdocs-redirects = "^1.2.1"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.ruff]
# Exclude a variety of commonly ignored directories.
@ -80,32 +82,8 @@ target-version = "py311"
# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default.
# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or
# McCabe complexity (`C901`) by default.
select = [
"I",
"N",
"F",
"W",
"E",
"G",
"A",
"COM",
"INP",
"T20",
"PLC",
"PLE",
"PLW",
"PLR",
"LOG",
"SLF",
"ERA",
"FIX",
"PERF",
"C4",
"EM",
"RET",
"RSE",
]
ignore = ["PLR0911", "PLR0912", "PLR0915", "PLR2004", "PLR0913", "EM101"]
select = ["F", "W", "E", "C901"]
ignore = ["C901"]
# Allow fix for all enabled rules (when `--fix`) is provided.
fixable = ["ALL"]

View file

@ -1,5 +0,0 @@
{
"extends": [
"local>cc/renovate-config"
]
}

View file

@ -1,6 +1,6 @@
{
"author" : ["cswimr"],
"install_msg" : "Thank you for installing SeaUtils!\nYou can find the source code of this cog [here](https://coastalcommits.com/cswimr/SeaCogs).",
"author" : ["SeaswimmerTheFsh (seasw.)"],
"install_msg" : "Thank you for installing SeaUtils!\nYou can find the source code of this cog [here](https://coastalcommits.com/SeaswimmerTheFsh/SeaCogs).",
"name" : "SeaUtils",
"short" : "A collection of useful utilities.",
"description" : "A collection of useful utilities.",

View file

@ -29,33 +29,28 @@ from redbot.core.utils.views import SimpleMenu
def md(soup: BeautifulSoup, **options) -> Any | str:
return MarkdownConverter(**options).convert_soup(soup=soup)
def format_rfc_text(text: str, number: int) -> str:
one: str = re.sub(r"\(\.\/rfc(\d+)", r"(https://www.rfc-editor.org/rfc/rfc\1.html", text)
two: str = re.sub(r"\((#(?:section|page)-\d+(?:.\d+)?)\)", f"(https://www.rfc-editor.org/rfc/rfc{number}.html\1)", one)
three: str = re.sub(r"\n{3,}", "\n\n", two)
return three
class SeaUtils(commands.Cog):
"""A collection of random utilities."""
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
__version__ = "1.0.2"
__documentation__ = "https://seacogs.coastalcommits.com/seautils/"
__author__ = ["SeaswimmerTheFsh"]
__version__ = "1.0.0"
def __init__(self, bot: Red) -> None:
self.bot = bot
def format_help_for_context(self, ctx: commands.Context) -> str:
pre_processed = super().format_help_for_context(ctx) or ""
pre_processed = super().format_help_for_context(ctx=ctx) or ""
n = "\n" if "\n\n" not in pre_processed else ""
text = [
f"{pre_processed}{n}",
f"{cf.bold('Cog Version:')} [{self.__version__}]({self.__git__})",
f"{cf.bold('Author:')} {cf.humanize_list(self.__author__)}",
f"{cf.bold('Documentation:')} {self.__documentation__}",
f"Cog Version: **{self.__version__}**",
f"Author: {cf.humanize_list(items=self.__author__)}"
]
return "\n".join(text)
@ -74,9 +69,9 @@ class SeaUtils(commands.Cog):
src = obj.function
return inspect.getsource(object=src)
@commands.command(aliases=["source", "src", "code", "showsource"]) # type: ignore
@commands.command(aliases=["source", "src", "code", "showsource"])
@commands.is_owner()
async def showcode(self, ctx: commands.Context, *, object: str) -> None: # pylint: disable=redefined-builtin # noqa: A002
async def showcode(self, ctx: commands.Context, *, object: str) -> None: # pylint: disable=redefined-builtin
"""Show the code for a particular object."""
try:
if object.startswith("/") and (obj := ctx.bot.tree.get_command(object[1:])):
@ -87,7 +82,11 @@ class SeaUtils(commands.Cog):
text = self.format_src(obj)
else:
raise AttributeError
temp_content = cf.pagify(text=cleanup_code(text), escape_mass_mentions=True, page_length=1977)
temp_content = cf.pagify(
text=cleanup_code(text),
escape_mass_mentions=True,
page_length = 1977
)
content = []
max_i = operator.length_hint(temp_content)
i = 1
@ -102,7 +101,7 @@ class SeaUtils(commands.Cog):
else:
await ctx.send(content="Object not found!", reference=ctx.message.to_reference(fail_if_not_exists=False))
@commands.command(name="dig", aliases=["dnslookup", "nslookup"]) # type: ignore
@commands.command(name='dig', aliases=['dnslookup', 'nslookup'])
@commands.is_owner()
async def dig(self, ctx: commands.Context, name: str, record_type: str | None = None, server: str | None = None, port: int = 53) -> None:
"""Retrieve DNS information for a domain.
@ -110,13 +109,13 @@ class SeaUtils(commands.Cog):
Uses `dig` to perform a DNS query. Will fall back to `nslookup` if `dig` is not installed on the system.
`nslookup` does not provide as much information as `dig`, so only the `name` parameter will be used if `nslookup` is used.
Will return the A, AAAA, and CNAME records for a domain by default. You can specify a different record type with the `type` parameter."""
command_opts: list[str] = ["dig"]
query_types: list[str] = [record_type] if record_type else ["A", "AAAA", "CNAME"]
command_opts: list[str | int] = ['dig']
query_types: list[str] = [record_type] if record_type else ['A', 'AAAA', 'CNAME']
if server:
command_opts.extend(["@", server])
command_opts.extend(['@', server])
for query_type in query_types:
command_opts.extend([name, query_type])
command_opts.extend(["-p", str(port), "+yaml"])
command_opts.extend(['-p', str(port), '+yaml'])
try:
process: Process = await asyncio.create_subprocess_exec(*command_opts, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
@ -125,18 +124,22 @@ class SeaUtils(commands.Cog):
await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=stderr.decode()))
else:
data = yaml.safe_load(stdout.decode())
message_data: dict = data[0]["message"]
response_data: dict = message_data["response_message_data"]
message_data: dict = data[0]['message']
response_data: dict = message_data['response_message_data']
if ctx.embed_requested():
embed = Embed(title="DNS Query Result", color=await ctx.embed_color(), timestamp=message_data["response_time"])
embed.add_field(name="Response Address", value=message_data["response_address"], inline=True)
embed.add_field(name="Response Port", value=message_data["response_port"], inline=True)
embed.add_field(name="Query Address", value=message_data["query_address"], inline=True)
embed.add_field(name="Query Port", value=message_data["query_port"], inline=True)
embed.add_field(name="Status", value=response_data["status"], inline=True)
embed.add_field(name="Flags", value=response_data["flags"], inline=True)
embed = Embed(
title="DNS Query Result",
color=await ctx.embed_color(),
timestamp=message_data['response_time']
)
embed.add_field(name="Response Address", value=message_data['response_address'], inline=True)
embed.add_field(name="Response Port", value=message_data['response_port'], inline=True)
embed.add_field(name="Query Address", value=message_data['query_address'], inline=True)
embed.add_field(name="Query Port", value=message_data['query_port'], inline=True)
embed.add_field(name="Status", value=response_data['status'], inline=True)
embed.add_field(name="Flags", value=response_data['flags'], inline=True)
if response_data.get("status") != "NOERROR":
if response_data.get('status') != 'NOERROR':
embed.colour = Color.red()
embed.description = cf.error("Dig query did not return `NOERROR` status.")
@ -144,19 +147,19 @@ class SeaUtils(commands.Cog):
answers = []
authorities = []
for m in data:
response = m["message"]["response_message_data"]
if "QUESTION_SECTION" in response:
for question in response["QUESTION_SECTION"]:
response = m['message']['response_message_data']
if 'QUESTION_SECTION' in response:
for question in response['QUESTION_SECTION']:
if question not in questions:
questions.append(question)
if "ANSWER_SECTION" in response:
for answer in response["ANSWER_SECTION"]:
if 'ANSWER_SECTION' in response:
for answer in response['ANSWER_SECTION']:
if answer not in answers:
answers.append(answer)
if "AUTHORITY_SECTION" in response:
for authority in response["AUTHORITY_SECTION"]:
if 'AUTHORITY_SECTION' in response:
for authority in response['AUTHORITY_SECTION']:
if authority not in authorities:
authorities.append(authority)
@ -176,22 +179,26 @@ class SeaUtils(commands.Cog):
embed.add_field(name="Authority Section", value=f"{cf.box(text=authority_section, lang='prolog')}", inline=False)
await ctx.send(embed=embed)
else:
await ctx.send(content=cf.box(text=str(stdout), lang="yaml"))
except FileNotFoundError:
await ctx.send(content=cf.box(text=stdout, lang='yaml'))
except (FileNotFoundError):
try:
ns_process = await asyncio.create_subprocess_exec("nslookup", name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
ns_process = await asyncio.create_subprocess_exec('nslookup', name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
ns_stdout, ns_stderr = await ns_process.communicate()
if ns_stderr:
await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=ns_stderr.decode()))
else:
warning = cf.warning("`dig` is not installed! Defaulting to `nslookup`.\nThis command provides more information when `dig` is installed on the system.\n")
if await ctx.embed_requested():
embed = Embed(title="DNS Query Result", color=await ctx.embed_color(), timestamp=ctx.message.created_at)
embed = Embed(
title="DNS Query Result",
color=await ctx.embed_color(),
timestamp=ctx.message.created_at
)
embed.description = warning + cf.box(text=ns_stdout.decode())
await ctx.send(embed=embed)
else:
await ctx.send(content=warning + cf.box(text=ns_stdout.decode()))
except FileNotFoundError:
await ctx.send(content = warning + cf.box(text=ns_stdout.decode()))
except (FileNotFoundError):
await ctx.maybe_send_embed(message=cf.error("Neither `dig` nor `nslookup` are installed on the system. Unable to resolve DNS query."))
@commands.command()
@ -199,34 +206,45 @@ class SeaUtils(commands.Cog):
"""Retrieve the text of an RFC document.
This command uses the [RFC Editor website](https://www.rfc-editor.org/) to fetch the text of an RFC document.
A [Request for Comments (RFC)](https://en.wikipedia.org/wiki/Request_for_Comments) is a publication in a series from the principal technical development and standards-setting bodies for the [Internet](https://en.wikipedia.org/wiki/Internet), most prominently the [Internet Engineering Task Force](https://en.wikipedia.org/wiki/Internet_Engineering_Task_Force). An RFC is authored by individuals or groups of engineers and [computer scientists](https://en.wikipedia.org/wiki/Computer_scientist) in the form of a [memorandum](https://en.wikipedia.org/wiki/Memorandum) describing methods, behaviors, research, or innovations applicable to the working of the Internet and Internet-connected systems. It is submitted either for [peer review](https://en.wikipedia.org/wiki/Peer_review) or to convey new concepts, information, or, occasionally, engineering humor.""" # noqa: E501
A [Request for Comments (RFC)](https://en.wikipedia.org/wiki/Request_for_Comments) is a publication in a series from the principal technical development and standards-setting bodies for the [Internet](https://en.wikipedia.org/wiki/Internet), most prominently the [Internet Engineering Task Force](https://en.wikipedia.org/wiki/Internet_Engineering_Task_Force). An RFC is authored by individuals or groups of engineers and [computer scientists](https://en.wikipedia.org/wiki/Computer_scientist) in the form of a [memorandum](https://en.wikipedia.org/wiki/Memorandum) describing methods, behaviors, research, or innovations applicable to the working of the Internet and Internet-connected systems. It is submitted either for [peer review](https://en.wikipedia.org/wiki/Peer_review) or to convey new concepts, information, or, occasionally, engineering humor.""" # noqa: E501
url = f"https://www.rfc-editor.org/rfc/rfc{number}.html"
datatracker_url = f"https://datatracker.ietf.org/doc/rfc{number}"
async with aiohttp.ClientSession() as session:
async with session.get(url=url) as response:
if response.status == 200:
html = await response.text()
soup = BeautifulSoup(html, "html.parser")
pre_tags = soup.find_all("pre")
content: list[str | Embed] = []
soup = BeautifulSoup(html, 'html.parser')
pre_tags = soup.find_all('pre')
content: list[Embed | str] = []
for pre_tag in pre_tags:
text = format_rfc_text(md(pre_tag), number)
if len(text) > 4096:
pagified_text = cf.pagify(text, delims=["\n\n"], page_length=4096)
for page in pagified_text:
if await ctx.embed_requested():
embed = Embed(title=f"RFC Document {number}", url=datatracker_url, description=page, color=await ctx.embed_color())
embed = Embed(
title=f"RFC Document {number}",
url=datatracker_url,
description=page,
color=await ctx.embed_color()
)
content.append(embed)
else:
content.append(page)
elif await ctx.embed_requested():
embed = Embed(title=f"RFC Document {number}", url=datatracker_url, description=text, color=await ctx.embed_color())
content.append(embed)
else:
content.append(text)
if await ctx.embed_requested():
embed = Embed(
title=f"RFC Document {number}",
url=datatracker_url,
description=text,
color=await ctx.embed_color()
)
content.append(embed)
else:
content.append(text)
if await ctx.embed_requested():
for embed in content:
embed.set_footer(text=f"Page {content.index(embed) + 1}/{len(content)}")
await SimpleMenu(pages=content, disable_after_timeout=True, timeout=300).start(ctx) # type: ignore
await SimpleMenu(pages=content, disable_after_timeout=True, timeout=300).start(ctx)
else:
await ctx.maybe_send_embed(message=cf.error(f"An error occurred while fetching RFC {number}. Status code: {response.status}."))
await ctx.maybe_send_embed(content=cf.error(f"An error occurred while fetching RFC {number}. Status code: {response.status}."))

1984
uv.lock generated

File diff suppressed because it is too large Load diff