Compare commits
17 commits
Author | SHA1 | Date | |
---|---|---|---|
9103d64fbc | |||
94b692a705 | |||
cba25d1f94 | |||
d557928fee | |||
418eeb983c | |||
45d0b32826 | |||
e88cba6cf4 | |||
eb41c1a72b | |||
c2333f40ef | |||
30003cd7c8 | |||
b210fa8bb0 | |||
7806aa29c5 | |||
8f55c6083a | |||
4795df7dcc | |||
14dfafea2a | |||
c3ab7593c7 | |||
ef591224cd |
12 changed files with 178 additions and 230 deletions
|
@ -18,5 +18,4 @@
|
||||||
import-self,
|
import-self,
|
||||||
relative-beyond-top-level,
|
relative-beyond-top-level,
|
||||||
too-many-instance-attributes,
|
too-many-instance-attributes,
|
||||||
duplicate-code,
|
duplicate-code
|
||||||
too-many-nested-blocks
|
|
||||||
|
|
|
@ -58,7 +58,7 @@ jobs:
|
||||||
npx -p "@getmeli/cli" meli upload ./site \
|
npx -p "@getmeli/cli" meli upload ./site \
|
||||||
--url "https://pages.coastalcommits.com" \
|
--url "https://pages.coastalcommits.com" \
|
||||||
--site "${{ vars.MELI_SITE_ID }}" \
|
--site "${{ vars.MELI_SITE_ID }}" \
|
||||||
--token "${{ secrets.MELI_SECRET }}" \
|
--token "${{ secrets.MELI_SITE_SECRET }}" \
|
||||||
--release "$CI_ACTION_REF_NAME_SLUG/${{ env.GITHUB_SHA }}" \
|
--release "$CI_ACTION_REF_NAME_SLUG/${{ env.GITHUB_SHA }}" \
|
||||||
--branch "$CI_ACTION_REF_NAME_SLUG"
|
--branch "$CI_ACTION_REF_NAME_SLUG"
|
||||||
|
|
||||||
|
|
|
@ -100,7 +100,7 @@ class Backup(commands.Cog):
|
||||||
except (json.JSONDecodeError, IndexError):
|
except (json.JSONDecodeError, IndexError):
|
||||||
try:
|
try:
|
||||||
export = json.loads(await ctx.message.reference.resolved.attachments[0].read())
|
export = json.loads(await ctx.message.reference.resolved.attachments[0].read())
|
||||||
except (json.JSONDecodeError, IndexError, AttributeError):
|
except (json.JSONDecodeError, IndexError):
|
||||||
await ctx.send(error("Please provide a valid JSON export file."))
|
await ctx.send(error("Please provide a valid JSON export file."))
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
|
@ -81,7 +81,6 @@ class PartialEmoji(discord.PartialEmoji):
|
||||||
with open(path, "r", encoding="UTF-8") as file:
|
with open(path, "r", encoding="UTF-8") as file:
|
||||||
emojis: dict = json.load(file)
|
emojis: dict = json.load(file)
|
||||||
emoji_aliases = []
|
emoji_aliases = []
|
||||||
emoji_group = None
|
|
||||||
for dict_name, group in emojis.items():
|
for dict_name, group in emojis.items():
|
||||||
for k, v in group.items():
|
for k, v in group.items():
|
||||||
if v == value:
|
if v == value:
|
||||||
|
|
49
poetry.lock
generated
49
poetry.lock
generated
|
@ -228,27 +228,6 @@ files = [
|
||||||
[package.extras]
|
[package.extras]
|
||||||
dev = ["freezegun (>=1.0,<2.0)", "pytest (>=6.0)", "pytest-cov"]
|
dev = ["freezegun (>=1.0,<2.0)", "pytest (>=6.0)", "pytest-cov"]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "beautifulsoup4"
|
|
||||||
version = "4.12.3"
|
|
||||||
description = "Screen-scraping library"
|
|
||||||
optional = false
|
|
||||||
python-versions = ">=3.6.0"
|
|
||||||
files = [
|
|
||||||
{file = "beautifulsoup4-4.12.3-py3-none-any.whl", hash = "sha256:b80878c9f40111313e55da8ba20bdba06d8fa3969fc68304167741bbf9e082ed"},
|
|
||||||
{file = "beautifulsoup4-4.12.3.tar.gz", hash = "sha256:74e3d1928edc070d21748185c46e3fb33490f22f52a3addee9aee0f4f7781051"},
|
|
||||||
]
|
|
||||||
|
|
||||||
[package.dependencies]
|
|
||||||
soupsieve = ">1.2"
|
|
||||||
|
|
||||||
[package.extras]
|
|
||||||
cchardet = ["cchardet"]
|
|
||||||
chardet = ["chardet"]
|
|
||||||
charset-normalizer = ["charset-normalizer"]
|
|
||||||
html5lib = ["html5lib"]
|
|
||||||
lxml = ["lxml"]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "brotli"
|
name = "brotli"
|
||||||
version = "1.1.0"
|
version = "1.1.0"
|
||||||
|
@ -911,21 +890,6 @@ profiling = ["gprof2dot"]
|
||||||
rtd = ["jupyter_sphinx", "mdit-py-plugins", "myst-parser", "pyyaml", "sphinx", "sphinx-copybutton", "sphinx-design", "sphinx_book_theme"]
|
rtd = ["jupyter_sphinx", "mdit-py-plugins", "myst-parser", "pyyaml", "sphinx", "sphinx-copybutton", "sphinx-design", "sphinx_book_theme"]
|
||||||
testing = ["coverage", "pytest", "pytest-cov", "pytest-regressions"]
|
testing = ["coverage", "pytest", "pytest-cov", "pytest-regressions"]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "markdownify"
|
|
||||||
version = "0.12.1"
|
|
||||||
description = "Convert HTML to markdown."
|
|
||||||
optional = false
|
|
||||||
python-versions = "*"
|
|
||||||
files = [
|
|
||||||
{file = "markdownify-0.12.1-py3-none-any.whl", hash = "sha256:a3805abd8166dbb7b27783c5599d91f54f10d79894b2621404d85b333c7ce561"},
|
|
||||||
{file = "markdownify-0.12.1.tar.gz", hash = "sha256:1fb08c618b30e0ee7a31a39b998f44a18fb28ab254f55f4af06b6d35a2179e27"},
|
|
||||||
]
|
|
||||||
|
|
||||||
[package.dependencies]
|
|
||||||
beautifulsoup4 = ">=4.9,<5"
|
|
||||||
six = ">=1.15,<2"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "markupsafe"
|
name = "markupsafe"
|
||||||
version = "2.1.5"
|
version = "2.1.5"
|
||||||
|
@ -2147,17 +2111,6 @@ files = [
|
||||||
{file = "smmap-5.0.1.tar.gz", hash = "sha256:dceeb6c0028fdb6734471eb07c0cd2aae706ccaecab45965ee83f11c8d3b1f62"},
|
{file = "smmap-5.0.1.tar.gz", hash = "sha256:dceeb6c0028fdb6734471eb07c0cd2aae706ccaecab45965ee83f11c8d3b1f62"},
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "soupsieve"
|
|
||||||
version = "2.5"
|
|
||||||
description = "A modern CSS selector implementation for Beautiful Soup."
|
|
||||||
optional = false
|
|
||||||
python-versions = ">=3.8"
|
|
||||||
files = [
|
|
||||||
{file = "soupsieve-2.5-py3-none-any.whl", hash = "sha256:eaa337ff55a1579b6549dc679565eac1e3d000563bcb1c8ab0d0fefbc0c2cdc7"},
|
|
||||||
{file = "soupsieve-2.5.tar.gz", hash = "sha256:5663d5a7b3bfaeee0bc4372e7fc48f9cff4940b3eec54a6451cc5299f1097690"},
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tinycss2"
|
name = "tinycss2"
|
||||||
version = "1.2.1"
|
version = "1.2.1"
|
||||||
|
@ -2498,4 +2451,4 @@ multidict = ">=4.0"
|
||||||
[metadata]
|
[metadata]
|
||||||
lock-version = "2.0"
|
lock-version = "2.0"
|
||||||
python-versions = ">=3.11,<3.12"
|
python-versions = ">=3.11,<3.12"
|
||||||
content-hash = "229d7fd39618cf708f3cd5409dde2e6e25b822e4f936e14b3ade9800bf00daab"
|
content-hash = "0ac382e0399d9c23c5f89a0ffeb3aae056dc8b28e864b22f815c0e3eb34175bd"
|
||||||
|
|
|
@ -15,8 +15,6 @@ websockets = "^12.0"
|
||||||
pillow = "^10.3.0"
|
pillow = "^10.3.0"
|
||||||
numpy = "^1.26.4"
|
numpy = "^1.26.4"
|
||||||
colorthief = "^0.2.1"
|
colorthief = "^0.2.1"
|
||||||
beautifulsoup4 = "^4.12.3"
|
|
||||||
markdownify = "^0.12.1"
|
|
||||||
|
|
||||||
[tool.poetry.group.dev]
|
[tool.poetry.group.dev]
|
||||||
optional = true
|
optional = true
|
||||||
|
|
|
@ -8,6 +8,5 @@
|
||||||
"hidden": true,
|
"hidden": true,
|
||||||
"disabled": false,
|
"disabled": false,
|
||||||
"min_bot_version": "3.5.0",
|
"min_bot_version": "3.5.0",
|
||||||
"min_python_version": [3, 8, 0],
|
"min_python_version": [3, 8, 0]
|
||||||
"requirements": ["beautifulsoup4", "markdownify"]
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,20 +5,13 @@
|
||||||
# ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ |
|
# ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ |
|
||||||
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
|
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import inspect
|
import inspect
|
||||||
import operator
|
import operator
|
||||||
import re
|
|
||||||
from asyncio.subprocess import Process
|
|
||||||
from functools import partial, partialmethod
|
from functools import partial, partialmethod
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
import aiohttp
|
from discord import Embed, app_commands
|
||||||
import yaml
|
|
||||||
from bs4 import BeautifulSoup
|
|
||||||
from discord import Color, Embed, app_commands
|
|
||||||
from discord.utils import CachedSlotProperty, cached_property
|
from discord.utils import CachedSlotProperty, cached_property
|
||||||
from markdownify import MarkdownConverter
|
|
||||||
from redbot.core import commands
|
from redbot.core import commands
|
||||||
from redbot.core.bot import Red
|
from redbot.core.bot import Red
|
||||||
from redbot.core.dev_commands import cleanup_code
|
from redbot.core.dev_commands import cleanup_code
|
||||||
|
@ -26,38 +19,29 @@ from redbot.core.utils import chat_formatting as cf
|
||||||
from redbot.core.utils.views import SimpleMenu
|
from redbot.core.utils.views import SimpleMenu
|
||||||
|
|
||||||
|
|
||||||
def md(soup: BeautifulSoup, **options) -> Any | str:
|
|
||||||
return MarkdownConverter(**options).convert_soup(soup=soup)
|
|
||||||
|
|
||||||
def format_rfc_text(text: str, number: int) -> str:
|
|
||||||
one: str = re.sub(r"\(\.\/rfc(\d+)", r"(https://www.rfc-editor.org/rfc/rfc\1.html", text)
|
|
||||||
two: str = re.sub(r"\((#(?:section|page)-\d+(?:.\d+)?)\)", f"(https://www.rfc-editor.org/rfc/rfc{number}.html\1)", one)
|
|
||||||
three: str = re.sub(r"\n{3,}", "\n\n", two)
|
|
||||||
return three
|
|
||||||
|
|
||||||
class SeaUtils(commands.Cog):
|
class SeaUtils(commands.Cog):
|
||||||
"""A collection of random utilities."""
|
"""A collection of random utilities."""
|
||||||
|
|
||||||
__author__ = ["SeaswimmerTheFsh"]
|
__author__ = ["SeaswimmerTheFsh"]
|
||||||
__version__ = "1.0.0"
|
__version__ = "1.0.0"
|
||||||
|
|
||||||
def __init__(self, bot: Red) -> None:
|
def __init__(self, bot: Red):
|
||||||
self.bot = bot
|
self.bot = bot
|
||||||
|
|
||||||
def format_help_for_context(self, ctx: commands.Context) -> str:
|
def format_help_for_context(self, ctx: commands.Context) -> str:
|
||||||
pre_processed = super().format_help_for_context(ctx=ctx) or ""
|
pre_processed = super().format_help_for_context(ctx) or ""
|
||||||
n = "\n" if "\n\n" not in pre_processed else ""
|
n = "\n" if "\n\n" not in pre_processed else ""
|
||||||
text = [
|
text = [
|
||||||
f"{pre_processed}{n}",
|
f"{pre_processed}{n}",
|
||||||
f"Cog Version: **{self.__version__}**",
|
f"Cog Version: **{self.__version__}**",
|
||||||
f"Author: {cf.humanize_list(items=self.__author__)}"
|
f"Author: {cf.humanize_list(self.__author__)}"
|
||||||
]
|
]
|
||||||
return "\n".join(text)
|
return "\n".join(text)
|
||||||
|
|
||||||
def format_src(self, obj: Any) -> str:
|
def format_src(self, obj: Any) -> str:
|
||||||
"""A large portion of this code is repurposed from Zephyrkul's RTFS cog.
|
"""A large portion of this code is repurposed from Zephyrkul's RTFS cog.
|
||||||
https://github.com/Zephyrkul/FluffyCogs/blob/master/rtfs/rtfs.py"""
|
https://github.com/Zephyrkul/FluffyCogs/blob/master/rtfs/rtfs.py"""
|
||||||
obj = inspect.unwrap(func=obj)
|
obj = inspect.unwrap(obj)
|
||||||
src: Any = getattr(obj, "__func__", obj)
|
src: Any = getattr(obj, "__func__", obj)
|
||||||
if isinstance(obj, (commands.Command, app_commands.Command)):
|
if isinstance(obj, (commands.Command, app_commands.Command)):
|
||||||
src = obj.callback
|
src = obj.callback
|
||||||
|
@ -67,11 +51,11 @@ class SeaUtils(commands.Cog):
|
||||||
src = obj.fget
|
src = obj.fget
|
||||||
elif isinstance(obj, (cached_property, CachedSlotProperty)):
|
elif isinstance(obj, (cached_property, CachedSlotProperty)):
|
||||||
src = obj.function
|
src = obj.function
|
||||||
return inspect.getsource(object=src)
|
return inspect.getsource(src)
|
||||||
|
|
||||||
@commands.command(aliases=["source", "src", "code", "showsource"])
|
@commands.command(aliases=["source", "src", "code", "showsource"])
|
||||||
@commands.is_owner()
|
@commands.is_owner()
|
||||||
async def showcode(self, ctx: commands.Context, *, object: str) -> None: # pylint: disable=redefined-builtin
|
async def showcode(self, ctx: commands.Context, *, object: str): # pylint: disable=redefined-builtin
|
||||||
"""Show the code for a particular object."""
|
"""Show the code for a particular object."""
|
||||||
try:
|
try:
|
||||||
if object.startswith("/") and (obj := ctx.bot.tree.get_command(object[1:])):
|
if object.startswith("/") and (obj := ctx.bot.tree.get_command(object[1:])):
|
||||||
|
@ -80,8 +64,6 @@ class SeaUtils(commands.Cog):
|
||||||
text = self.format_src(type(obj))
|
text = self.format_src(type(obj))
|
||||||
elif obj := ctx.bot.get_command(object):
|
elif obj := ctx.bot.get_command(object):
|
||||||
text = self.format_src(obj)
|
text = self.format_src(obj)
|
||||||
else:
|
|
||||||
raise AttributeError
|
|
||||||
temp_content = cf.pagify(
|
temp_content = cf.pagify(
|
||||||
text=cleanup_code(text),
|
text=cleanup_code(text),
|
||||||
escape_mass_mentions=True,
|
escape_mass_mentions=True,
|
||||||
|
@ -100,151 +82,3 @@ class SeaUtils(commands.Cog):
|
||||||
await ctx.send(embed=embed, reference=ctx.message.to_reference(fail_if_not_exists=False))
|
await ctx.send(embed=embed, reference=ctx.message.to_reference(fail_if_not_exists=False))
|
||||||
else:
|
else:
|
||||||
await ctx.send(content="Object not found!", reference=ctx.message.to_reference(fail_if_not_exists=False))
|
await ctx.send(content="Object not found!", reference=ctx.message.to_reference(fail_if_not_exists=False))
|
||||||
|
|
||||||
@commands.command(name='dig', aliases=['dnslookup', 'nslookup'])
|
|
||||||
@commands.is_owner()
|
|
||||||
async def dig(self, ctx: commands.Context, name: str, record_type: str | None = None, server: str | None = None, port: int = 53) -> None:
|
|
||||||
"""Retrieve DNS information for a domain.
|
|
||||||
|
|
||||||
Uses `dig` to perform a DNS query. Will fall back to `nslookup` if `dig` is not installed on the system.
|
|
||||||
`nslookup` does not provide as much information as `dig`, so only the `name` parameter will be used if `nslookup` is used.
|
|
||||||
Will return the A, AAAA, and CNAME records for a domain by default. You can specify a different record type with the `type` parameter."""
|
|
||||||
command_opts: list[str | int] = ['dig']
|
|
||||||
query_types: list[str] = [record_type] if record_type else ['A', 'AAAA', 'CNAME']
|
|
||||||
if server:
|
|
||||||
command_opts.extend(['@', server])
|
|
||||||
for query_type in query_types:
|
|
||||||
command_opts.extend([name, query_type])
|
|
||||||
command_opts.extend(['-p', str(port), '+yaml'])
|
|
||||||
|
|
||||||
try:
|
|
||||||
process: Process = await asyncio.create_subprocess_exec(*command_opts, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
|
|
||||||
stdout, stderr = await process.communicate()
|
|
||||||
if stderr:
|
|
||||||
await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=stderr.decode()))
|
|
||||||
else:
|
|
||||||
data = yaml.safe_load(stdout.decode())
|
|
||||||
message_data: dict = data[0]['message']
|
|
||||||
response_data: dict = message_data['response_message_data']
|
|
||||||
if ctx.embed_requested():
|
|
||||||
embed = Embed(
|
|
||||||
title="DNS Query Result",
|
|
||||||
color=await ctx.embed_color(),
|
|
||||||
timestamp=message_data['response_time']
|
|
||||||
)
|
|
||||||
embed.add_field(name="Response Address", value=message_data['response_address'], inline=True)
|
|
||||||
embed.add_field(name="Response Port", value=message_data['response_port'], inline=True)
|
|
||||||
embed.add_field(name="Query Address", value=message_data['query_address'], inline=True)
|
|
||||||
embed.add_field(name="Query Port", value=message_data['query_port'], inline=True)
|
|
||||||
embed.add_field(name="Status", value=response_data['status'], inline=True)
|
|
||||||
embed.add_field(name="Flags", value=response_data['flags'], inline=True)
|
|
||||||
|
|
||||||
if response_data.get('status') != 'NOERROR':
|
|
||||||
embed.colour = Color.red()
|
|
||||||
embed.description = cf.error("Dig query did not return `NOERROR` status.")
|
|
||||||
|
|
||||||
questions = []
|
|
||||||
answers = []
|
|
||||||
authorities = []
|
|
||||||
for m in data:
|
|
||||||
response = m['message']['response_message_data']
|
|
||||||
if 'QUESTION_SECTION' in response:
|
|
||||||
for question in response['QUESTION_SECTION']:
|
|
||||||
if question not in questions:
|
|
||||||
questions.append(question)
|
|
||||||
|
|
||||||
if 'ANSWER_SECTION' in response:
|
|
||||||
for answer in response['ANSWER_SECTION']:
|
|
||||||
if answer not in answers:
|
|
||||||
answers.append(answer)
|
|
||||||
|
|
||||||
if 'AUTHORITY_SECTION' in response:
|
|
||||||
for authority in response['AUTHORITY_SECTION']:
|
|
||||||
if authority not in authorities:
|
|
||||||
authorities.append(authority)
|
|
||||||
|
|
||||||
if questions:
|
|
||||||
question_section = "\n".join(questions)
|
|
||||||
embed.add_field(name="Question Section", value=f"{cf.box(text=question_section, lang='prolog')}", inline=False)
|
|
||||||
|
|
||||||
if answers:
|
|
||||||
answer_section = "\n".join(answers)
|
|
||||||
if len(answer_section) > 1024:
|
|
||||||
embed.description = cf.warning("Answer section is too long to fit within embed field, falling back to description.") + cf.box(answer_section)
|
|
||||||
else:
|
|
||||||
embed.add_field(name="Answer Section", value=f"{cf.box(text=answer_section, lang='prolog')}", inline=False)
|
|
||||||
|
|
||||||
if authorities:
|
|
||||||
authority_section = "\n".join(authorities)
|
|
||||||
embed.add_field(name="Authority Section", value=f"{cf.box(text=authority_section, lang='prolog')}", inline=False)
|
|
||||||
await ctx.send(embed=embed)
|
|
||||||
else:
|
|
||||||
await ctx.send(content=cf.box(text=stdout, lang='yaml'))
|
|
||||||
except (FileNotFoundError):
|
|
||||||
try:
|
|
||||||
ns_process = await asyncio.create_subprocess_exec('nslookup', name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
|
|
||||||
ns_stdout, ns_stderr = await ns_process.communicate()
|
|
||||||
if ns_stderr:
|
|
||||||
await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=ns_stderr.decode()))
|
|
||||||
else:
|
|
||||||
warning = cf.warning("`dig` is not installed! Defaulting to `nslookup`.\nThis command provides more information when `dig` is installed on the system.\n")
|
|
||||||
if await ctx.embed_requested():
|
|
||||||
embed = Embed(
|
|
||||||
title="DNS Query Result",
|
|
||||||
color=await ctx.embed_color(),
|
|
||||||
timestamp=ctx.message.created_at
|
|
||||||
)
|
|
||||||
embed.description = warning + cf.box(text=ns_stdout.decode())
|
|
||||||
await ctx.send(embed=embed)
|
|
||||||
else:
|
|
||||||
await ctx.send(content = warning + cf.box(text=ns_stdout.decode()))
|
|
||||||
except (FileNotFoundError):
|
|
||||||
await ctx.maybe_send_embed(message=cf.error("Neither `dig` nor `nslookup` are installed on the system. Unable to resolve DNS query."))
|
|
||||||
|
|
||||||
@commands.command()
|
|
||||||
async def rfc(self, ctx: commands.Context, number: int) -> None:
|
|
||||||
"""Retrieve the text of an RFC document.
|
|
||||||
|
|
||||||
This command uses the [RFC Editor website](https://www.rfc-editor.org/) to fetch the text of an RFC document.
|
|
||||||
A [Request for Comments (RFC)](https://en.wikipedia.org/wiki/Request_for_Comments) is a publication in a series from the principal technical development and standards-setting bodies for the [Internet](https://en.wikipedia.org/wiki/Internet), most prominently the [Internet Engineering Task Force](https://en.wikipedia.org/wiki/Internet_Engineering_Task_Force). An RFC is authored by individuals or groups of engineers and [computer scientists](https://en.wikipedia.org/wiki/Computer_scientist) in the form of a [memorandum](https://en.wikipedia.org/wiki/Memorandum) describing methods, behaviors, research, or innovations applicable to the working of the Internet and Internet-connected systems. It is submitted either for [peer review](https://en.wikipedia.org/wiki/Peer_review) or to convey new concepts, information, or, occasionally, engineering humor.""" # noqa: E501
|
|
||||||
url = f"https://www.rfc-editor.org/rfc/rfc{number}.html"
|
|
||||||
datatracker_url = f"https://datatracker.ietf.org/doc/rfc{number}"
|
|
||||||
async with aiohttp.ClientSession() as session:
|
|
||||||
async with session.get(url=url) as response:
|
|
||||||
if response.status == 200:
|
|
||||||
html = await response.text()
|
|
||||||
soup = BeautifulSoup(html, 'html.parser')
|
|
||||||
pre_tags = soup.find_all('pre')
|
|
||||||
content: list[Embed | str] = []
|
|
||||||
for pre_tag in pre_tags:
|
|
||||||
text = format_rfc_text(md(pre_tag), number)
|
|
||||||
if len(text) > 4096:
|
|
||||||
pagified_text = cf.pagify(text, delims=["\n\n"], page_length=4096)
|
|
||||||
for page in pagified_text:
|
|
||||||
if await ctx.embed_requested():
|
|
||||||
embed = Embed(
|
|
||||||
title=f"RFC Document {number}",
|
|
||||||
url=datatracker_url,
|
|
||||||
description=page,
|
|
||||||
color=await ctx.embed_color()
|
|
||||||
)
|
|
||||||
content.append(embed)
|
|
||||||
else:
|
|
||||||
content.append(page)
|
|
||||||
else:
|
|
||||||
if await ctx.embed_requested():
|
|
||||||
embed = Embed(
|
|
||||||
title=f"RFC Document {number}",
|
|
||||||
url=datatracker_url,
|
|
||||||
description=text,
|
|
||||||
color=await ctx.embed_color()
|
|
||||||
)
|
|
||||||
content.append(embed)
|
|
||||||
else:
|
|
||||||
content.append(text)
|
|
||||||
if await ctx.embed_requested():
|
|
||||||
for embed in content:
|
|
||||||
embed.set_footer(text=f"Page {content.index(embed) + 1}/{len(content)}")
|
|
||||||
await SimpleMenu(pages=content, disable_after_timeout=True, timeout=300).start(ctx)
|
|
||||||
else:
|
|
||||||
await ctx.maybe_send_embed(content=cf.error(f"An error occurred while fetching RFC {number}. Status code: {response.status}."))
|
|
||||||
|
|
5
speedtest/__init__.py
Normal file
5
speedtest/__init__.py
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
from .speedtest import Speedtest
|
||||||
|
|
||||||
|
|
||||||
|
async def setup(bot):
|
||||||
|
await bot.add_cog(Speedtest(bot))
|
14
speedtest/info.json
Normal file
14
speedtest/info.json
Normal file
|
@ -0,0 +1,14 @@
|
||||||
|
{
|
||||||
|
"author" : ["SeaswimmerTheFsh (seasw.)"],
|
||||||
|
"install_msg" : "Thank you for installing Speedtest!\nYou can find the source code of this cog [here](https://coastalcommits.com/SeaswimmerTheFsh/SeaCogs).",
|
||||||
|
"name" : "Speedtest",
|
||||||
|
"short" : "A collection of useful utilities.",
|
||||||
|
"description" : "A collection of useful utilities.",
|
||||||
|
"end_user_data_statement" : "This cog does not store end user data.",
|
||||||
|
"hidden": true,
|
||||||
|
"disabled": false,
|
||||||
|
"min_bot_version": "3.5.0",
|
||||||
|
"min_python_version": [3, 10, 0],
|
||||||
|
"tags" : ["utility", "information"],
|
||||||
|
"requirements": ["pydantic"]
|
||||||
|
}
|
76
speedtest/models.py
Normal file
76
speedtest/models.py
Normal file
|
@ -0,0 +1,76 @@
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
|
||||||
|
class Speedtest(BaseModel):
|
||||||
|
type: str
|
||||||
|
timestamp: datetime
|
||||||
|
ping: "Ping"
|
||||||
|
download: "Bandwidth"
|
||||||
|
upload: "Bandwidth"
|
||||||
|
isp: str
|
||||||
|
interface: "Interface"
|
||||||
|
server: "Server"
|
||||||
|
result: "Result"
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_json(cls, data: dict) -> "Speedtest":
|
||||||
|
return cls(
|
||||||
|
type=data["type"],
|
||||||
|
timestamp=datetime.fromisoformat(data["timestamp"]),
|
||||||
|
ping=Ping(**data["ping"]),
|
||||||
|
download=Bandwidth(**data["download"]),
|
||||||
|
upload=Bandwidth(**data["upload"]),
|
||||||
|
isp=data["isp"],
|
||||||
|
interface=Interface(**data["interface"]),
|
||||||
|
server=Server(**data["server"]),
|
||||||
|
result=Result(**data["result"])
|
||||||
|
)
|
||||||
|
|
||||||
|
class Bandwidth(BaseModel):
|
||||||
|
bandwidth: float
|
||||||
|
bytes: int
|
||||||
|
elapsed: int
|
||||||
|
latency: "Latency"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def mbps(self) -> float:
|
||||||
|
return self.bandwidth / 1_000_000
|
||||||
|
|
||||||
|
class Latency(BaseModel):
|
||||||
|
iqm: float
|
||||||
|
low: float
|
||||||
|
high: float
|
||||||
|
jitter: float
|
||||||
|
|
||||||
|
class Interface(BaseModel):
|
||||||
|
internalIp: str
|
||||||
|
name: str
|
||||||
|
macAddr: str
|
||||||
|
isVpn: bool
|
||||||
|
externalIp: str
|
||||||
|
|
||||||
|
class Ping(BaseModel):
|
||||||
|
jitter: float
|
||||||
|
latency: float
|
||||||
|
low: float
|
||||||
|
high: float
|
||||||
|
|
||||||
|
class Server(BaseModel):
|
||||||
|
id: int
|
||||||
|
name: str
|
||||||
|
location: str
|
||||||
|
country: str
|
||||||
|
host: str
|
||||||
|
port: int
|
||||||
|
ip: str
|
||||||
|
|
||||||
|
class Result(BaseModel):
|
||||||
|
id: str
|
||||||
|
url: str
|
||||||
|
persisted: bool
|
||||||
|
|
||||||
|
@property
|
||||||
|
def image(self) -> str:
|
||||||
|
return self.url + ".png"
|
71
speedtest/speedtest.py
Normal file
71
speedtest/speedtest.py
Normal file
|
@ -0,0 +1,71 @@
|
||||||
|
# _____ _
|
||||||
|
# / ____| (_)
|
||||||
|
# | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __
|
||||||
|
# \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__|
|
||||||
|
# ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ |
|
||||||
|
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
import discord
|
||||||
|
from redbot.core import commands
|
||||||
|
from redbot.core.bot import Red
|
||||||
|
from redbot.core.utils import chat_formatting as cf
|
||||||
|
|
||||||
|
from .models import Speedtest as sp
|
||||||
|
|
||||||
|
|
||||||
|
class Speedtest(commands.Cog):
|
||||||
|
"""A collection of random utilities."""
|
||||||
|
|
||||||
|
__author__ = ["SeaswimmerTheFsh"]
|
||||||
|
__version__ = "1.0.0"
|
||||||
|
|
||||||
|
def __init__(self, bot: Red):
|
||||||
|
self.bot = bot
|
||||||
|
|
||||||
|
def format_help_for_context(self, ctx: commands.Context) -> str:
|
||||||
|
pre_processed = super().format_help_for_context(ctx) or ""
|
||||||
|
n = "\n" if "\n\n" not in pre_processed else ""
|
||||||
|
text = [
|
||||||
|
f"{pre_processed}{n}",
|
||||||
|
f"Cog Version: **{self.__version__}**",
|
||||||
|
f"Author: {cf.humanize_list(self.__author__)}"
|
||||||
|
]
|
||||||
|
return "\n".join(text)
|
||||||
|
|
||||||
|
async def run_speedtest(self) -> str | sp:
|
||||||
|
try:
|
||||||
|
process = await asyncio.create_subprocess_exec(
|
||||||
|
"speedtest", "-f", "json", "--accept-license",
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE
|
||||||
|
)
|
||||||
|
except FileNotFoundError:
|
||||||
|
return "Speedtest CLI is not installed."
|
||||||
|
stdout, stderr = await process.communicate()
|
||||||
|
if process.returncode != 0:
|
||||||
|
return stderr.decode("utf-8")
|
||||||
|
return sp.from_json(json.loads(stdout.decode("utf-8")))
|
||||||
|
|
||||||
|
@commands.command()
|
||||||
|
@commands.is_owner()
|
||||||
|
async def speedtest(self, ctx: commands.Context) -> None:
|
||||||
|
"""Run a speedtest."""
|
||||||
|
msg = await ctx.maybe_send_embed("Running speedtest...")
|
||||||
|
async with ctx.typing():
|
||||||
|
speedtest = await self.run_speedtest()
|
||||||
|
if await ctx.embed_requested():
|
||||||
|
if not isinstance(speedtest, sp):
|
||||||
|
await msg.edit(embed=discord.Embed(description=f"An error occurred! {speedtest}", color=discord.Colour.red()))
|
||||||
|
return
|
||||||
|
embed = discord.Embed(title="Speedtest Results", url=speedtest.result.url, color=await ctx.embed_color())
|
||||||
|
embed.set_image(url=speedtest.result.image)
|
||||||
|
await msg.edit(embed=embed)
|
||||||
|
else:
|
||||||
|
if not isinstance(speedtest, sp):
|
||||||
|
await msg.edit(content=f"An error occurred! \n`{speedtest}`")
|
||||||
|
return
|
||||||
|
await msg.edit(content=f"**[Result]({speedtest.result.url})**")
|
Loading…
Reference in a new issue