Compare commits
17 commits
Author | SHA1 | Date | |
---|---|---|---|
9103d64fbc | |||
94b692a705 | |||
cba25d1f94 | |||
d557928fee | |||
418eeb983c | |||
45d0b32826 | |||
e88cba6cf4 | |||
eb41c1a72b | |||
c2333f40ef | |||
30003cd7c8 | |||
b210fa8bb0 | |||
7806aa29c5 | |||
8f55c6083a | |||
4795df7dcc | |||
14dfafea2a | |||
c3ab7593c7 | |||
ef591224cd |
12 changed files with 178 additions and 230 deletions
|
@ -18,5 +18,4 @@
|
|||
import-self,
|
||||
relative-beyond-top-level,
|
||||
too-many-instance-attributes,
|
||||
duplicate-code,
|
||||
too-many-nested-blocks
|
||||
duplicate-code
|
||||
|
|
|
@ -58,7 +58,7 @@ jobs:
|
|||
npx -p "@getmeli/cli" meli upload ./site \
|
||||
--url "https://pages.coastalcommits.com" \
|
||||
--site "${{ vars.MELI_SITE_ID }}" \
|
||||
--token "${{ secrets.MELI_SECRET }}" \
|
||||
--token "${{ secrets.MELI_SITE_SECRET }}" \
|
||||
--release "$CI_ACTION_REF_NAME_SLUG/${{ env.GITHUB_SHA }}" \
|
||||
--branch "$CI_ACTION_REF_NAME_SLUG"
|
||||
|
||||
|
|
|
@ -100,7 +100,7 @@ class Backup(commands.Cog):
|
|||
except (json.JSONDecodeError, IndexError):
|
||||
try:
|
||||
export = json.loads(await ctx.message.reference.resolved.attachments[0].read())
|
||||
except (json.JSONDecodeError, IndexError, AttributeError):
|
||||
except (json.JSONDecodeError, IndexError):
|
||||
await ctx.send(error("Please provide a valid JSON export file."))
|
||||
return
|
||||
|
||||
|
|
|
@ -81,7 +81,6 @@ class PartialEmoji(discord.PartialEmoji):
|
|||
with open(path, "r", encoding="UTF-8") as file:
|
||||
emojis: dict = json.load(file)
|
||||
emoji_aliases = []
|
||||
emoji_group = None
|
||||
for dict_name, group in emojis.items():
|
||||
for k, v in group.items():
|
||||
if v == value:
|
||||
|
|
49
poetry.lock
generated
49
poetry.lock
generated
|
@ -228,27 +228,6 @@ files = [
|
|||
[package.extras]
|
||||
dev = ["freezegun (>=1.0,<2.0)", "pytest (>=6.0)", "pytest-cov"]
|
||||
|
||||
[[package]]
|
||||
name = "beautifulsoup4"
|
||||
version = "4.12.3"
|
||||
description = "Screen-scraping library"
|
||||
optional = false
|
||||
python-versions = ">=3.6.0"
|
||||
files = [
|
||||
{file = "beautifulsoup4-4.12.3-py3-none-any.whl", hash = "sha256:b80878c9f40111313e55da8ba20bdba06d8fa3969fc68304167741bbf9e082ed"},
|
||||
{file = "beautifulsoup4-4.12.3.tar.gz", hash = "sha256:74e3d1928edc070d21748185c46e3fb33490f22f52a3addee9aee0f4f7781051"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
soupsieve = ">1.2"
|
||||
|
||||
[package.extras]
|
||||
cchardet = ["cchardet"]
|
||||
chardet = ["chardet"]
|
||||
charset-normalizer = ["charset-normalizer"]
|
||||
html5lib = ["html5lib"]
|
||||
lxml = ["lxml"]
|
||||
|
||||
[[package]]
|
||||
name = "brotli"
|
||||
version = "1.1.0"
|
||||
|
@ -911,21 +890,6 @@ profiling = ["gprof2dot"]
|
|||
rtd = ["jupyter_sphinx", "mdit-py-plugins", "myst-parser", "pyyaml", "sphinx", "sphinx-copybutton", "sphinx-design", "sphinx_book_theme"]
|
||||
testing = ["coverage", "pytest", "pytest-cov", "pytest-regressions"]
|
||||
|
||||
[[package]]
|
||||
name = "markdownify"
|
||||
version = "0.12.1"
|
||||
description = "Convert HTML to markdown."
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
files = [
|
||||
{file = "markdownify-0.12.1-py3-none-any.whl", hash = "sha256:a3805abd8166dbb7b27783c5599d91f54f10d79894b2621404d85b333c7ce561"},
|
||||
{file = "markdownify-0.12.1.tar.gz", hash = "sha256:1fb08c618b30e0ee7a31a39b998f44a18fb28ab254f55f4af06b6d35a2179e27"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
beautifulsoup4 = ">=4.9,<5"
|
||||
six = ">=1.15,<2"
|
||||
|
||||
[[package]]
|
||||
name = "markupsafe"
|
||||
version = "2.1.5"
|
||||
|
@ -2147,17 +2111,6 @@ files = [
|
|||
{file = "smmap-5.0.1.tar.gz", hash = "sha256:dceeb6c0028fdb6734471eb07c0cd2aae706ccaecab45965ee83f11c8d3b1f62"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "soupsieve"
|
||||
version = "2.5"
|
||||
description = "A modern CSS selector implementation for Beautiful Soup."
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "soupsieve-2.5-py3-none-any.whl", hash = "sha256:eaa337ff55a1579b6549dc679565eac1e3d000563bcb1c8ab0d0fefbc0c2cdc7"},
|
||||
{file = "soupsieve-2.5.tar.gz", hash = "sha256:5663d5a7b3bfaeee0bc4372e7fc48f9cff4940b3eec54a6451cc5299f1097690"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tinycss2"
|
||||
version = "1.2.1"
|
||||
|
@ -2498,4 +2451,4 @@ multidict = ">=4.0"
|
|||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = ">=3.11,<3.12"
|
||||
content-hash = "229d7fd39618cf708f3cd5409dde2e6e25b822e4f936e14b3ade9800bf00daab"
|
||||
content-hash = "0ac382e0399d9c23c5f89a0ffeb3aae056dc8b28e864b22f815c0e3eb34175bd"
|
||||
|
|
|
@ -15,8 +15,6 @@ websockets = "^12.0"
|
|||
pillow = "^10.3.0"
|
||||
numpy = "^1.26.4"
|
||||
colorthief = "^0.2.1"
|
||||
beautifulsoup4 = "^4.12.3"
|
||||
markdownify = "^0.12.1"
|
||||
|
||||
[tool.poetry.group.dev]
|
||||
optional = true
|
||||
|
|
|
@ -8,6 +8,5 @@
|
|||
"hidden": true,
|
||||
"disabled": false,
|
||||
"min_bot_version": "3.5.0",
|
||||
"min_python_version": [3, 8, 0],
|
||||
"requirements": ["beautifulsoup4", "markdownify"]
|
||||
"min_python_version": [3, 8, 0]
|
||||
}
|
||||
|
|
|
@ -5,20 +5,13 @@
|
|||
# ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ |
|
||||
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
|
||||
|
||||
import asyncio
|
||||
import inspect
|
||||
import operator
|
||||
import re
|
||||
from asyncio.subprocess import Process
|
||||
from functools import partial, partialmethod
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
import yaml
|
||||
from bs4 import BeautifulSoup
|
||||
from discord import Color, Embed, app_commands
|
||||
from discord import Embed, app_commands
|
||||
from discord.utils import CachedSlotProperty, cached_property
|
||||
from markdownify import MarkdownConverter
|
||||
from redbot.core import commands
|
||||
from redbot.core.bot import Red
|
||||
from redbot.core.dev_commands import cleanup_code
|
||||
|
@ -26,38 +19,29 @@ from redbot.core.utils import chat_formatting as cf
|
|||
from redbot.core.utils.views import SimpleMenu
|
||||
|
||||
|
||||
def md(soup: BeautifulSoup, **options) -> Any | str:
|
||||
return MarkdownConverter(**options).convert_soup(soup=soup)
|
||||
|
||||
def format_rfc_text(text: str, number: int) -> str:
|
||||
one: str = re.sub(r"\(\.\/rfc(\d+)", r"(https://www.rfc-editor.org/rfc/rfc\1.html", text)
|
||||
two: str = re.sub(r"\((#(?:section|page)-\d+(?:.\d+)?)\)", f"(https://www.rfc-editor.org/rfc/rfc{number}.html\1)", one)
|
||||
three: str = re.sub(r"\n{3,}", "\n\n", two)
|
||||
return three
|
||||
|
||||
class SeaUtils(commands.Cog):
|
||||
"""A collection of random utilities."""
|
||||
|
||||
__author__ = ["SeaswimmerTheFsh"]
|
||||
__version__ = "1.0.0"
|
||||
|
||||
def __init__(self, bot: Red) -> None:
|
||||
def __init__(self, bot: Red):
|
||||
self.bot = bot
|
||||
|
||||
def format_help_for_context(self, ctx: commands.Context) -> str:
|
||||
pre_processed = super().format_help_for_context(ctx=ctx) or ""
|
||||
pre_processed = super().format_help_for_context(ctx) or ""
|
||||
n = "\n" if "\n\n" not in pre_processed else ""
|
||||
text = [
|
||||
f"{pre_processed}{n}",
|
||||
f"Cog Version: **{self.__version__}**",
|
||||
f"Author: {cf.humanize_list(items=self.__author__)}"
|
||||
f"Author: {cf.humanize_list(self.__author__)}"
|
||||
]
|
||||
return "\n".join(text)
|
||||
|
||||
def format_src(self, obj: Any) -> str:
|
||||
"""A large portion of this code is repurposed from Zephyrkul's RTFS cog.
|
||||
https://github.com/Zephyrkul/FluffyCogs/blob/master/rtfs/rtfs.py"""
|
||||
obj = inspect.unwrap(func=obj)
|
||||
obj = inspect.unwrap(obj)
|
||||
src: Any = getattr(obj, "__func__", obj)
|
||||
if isinstance(obj, (commands.Command, app_commands.Command)):
|
||||
src = obj.callback
|
||||
|
@ -67,11 +51,11 @@ class SeaUtils(commands.Cog):
|
|||
src = obj.fget
|
||||
elif isinstance(obj, (cached_property, CachedSlotProperty)):
|
||||
src = obj.function
|
||||
return inspect.getsource(object=src)
|
||||
return inspect.getsource(src)
|
||||
|
||||
@commands.command(aliases=["source", "src", "code", "showsource"])
|
||||
@commands.is_owner()
|
||||
async def showcode(self, ctx: commands.Context, *, object: str) -> None: # pylint: disable=redefined-builtin
|
||||
async def showcode(self, ctx: commands.Context, *, object: str): # pylint: disable=redefined-builtin
|
||||
"""Show the code for a particular object."""
|
||||
try:
|
||||
if object.startswith("/") and (obj := ctx.bot.tree.get_command(object[1:])):
|
||||
|
@ -80,8 +64,6 @@ class SeaUtils(commands.Cog):
|
|||
text = self.format_src(type(obj))
|
||||
elif obj := ctx.bot.get_command(object):
|
||||
text = self.format_src(obj)
|
||||
else:
|
||||
raise AttributeError
|
||||
temp_content = cf.pagify(
|
||||
text=cleanup_code(text),
|
||||
escape_mass_mentions=True,
|
||||
|
@ -100,151 +82,3 @@ class SeaUtils(commands.Cog):
|
|||
await ctx.send(embed=embed, reference=ctx.message.to_reference(fail_if_not_exists=False))
|
||||
else:
|
||||
await ctx.send(content="Object not found!", reference=ctx.message.to_reference(fail_if_not_exists=False))
|
||||
|
||||
@commands.command(name='dig', aliases=['dnslookup', 'nslookup'])
|
||||
@commands.is_owner()
|
||||
async def dig(self, ctx: commands.Context, name: str, record_type: str | None = None, server: str | None = None, port: int = 53) -> None:
|
||||
"""Retrieve DNS information for a domain.
|
||||
|
||||
Uses `dig` to perform a DNS query. Will fall back to `nslookup` if `dig` is not installed on the system.
|
||||
`nslookup` does not provide as much information as `dig`, so only the `name` parameter will be used if `nslookup` is used.
|
||||
Will return the A, AAAA, and CNAME records for a domain by default. You can specify a different record type with the `type` parameter."""
|
||||
command_opts: list[str | int] = ['dig']
|
||||
query_types: list[str] = [record_type] if record_type else ['A', 'AAAA', 'CNAME']
|
||||
if server:
|
||||
command_opts.extend(['@', server])
|
||||
for query_type in query_types:
|
||||
command_opts.extend([name, query_type])
|
||||
command_opts.extend(['-p', str(port), '+yaml'])
|
||||
|
||||
try:
|
||||
process: Process = await asyncio.create_subprocess_exec(*command_opts, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
|
||||
stdout, stderr = await process.communicate()
|
||||
if stderr:
|
||||
await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=stderr.decode()))
|
||||
else:
|
||||
data = yaml.safe_load(stdout.decode())
|
||||
message_data: dict = data[0]['message']
|
||||
response_data: dict = message_data['response_message_data']
|
||||
if ctx.embed_requested():
|
||||
embed = Embed(
|
||||
title="DNS Query Result",
|
||||
color=await ctx.embed_color(),
|
||||
timestamp=message_data['response_time']
|
||||
)
|
||||
embed.add_field(name="Response Address", value=message_data['response_address'], inline=True)
|
||||
embed.add_field(name="Response Port", value=message_data['response_port'], inline=True)
|
||||
embed.add_field(name="Query Address", value=message_data['query_address'], inline=True)
|
||||
embed.add_field(name="Query Port", value=message_data['query_port'], inline=True)
|
||||
embed.add_field(name="Status", value=response_data['status'], inline=True)
|
||||
embed.add_field(name="Flags", value=response_data['flags'], inline=True)
|
||||
|
||||
if response_data.get('status') != 'NOERROR':
|
||||
embed.colour = Color.red()
|
||||
embed.description = cf.error("Dig query did not return `NOERROR` status.")
|
||||
|
||||
questions = []
|
||||
answers = []
|
||||
authorities = []
|
||||
for m in data:
|
||||
response = m['message']['response_message_data']
|
||||
if 'QUESTION_SECTION' in response:
|
||||
for question in response['QUESTION_SECTION']:
|
||||
if question not in questions:
|
||||
questions.append(question)
|
||||
|
||||
if 'ANSWER_SECTION' in response:
|
||||
for answer in response['ANSWER_SECTION']:
|
||||
if answer not in answers:
|
||||
answers.append(answer)
|
||||
|
||||
if 'AUTHORITY_SECTION' in response:
|
||||
for authority in response['AUTHORITY_SECTION']:
|
||||
if authority not in authorities:
|
||||
authorities.append(authority)
|
||||
|
||||
if questions:
|
||||
question_section = "\n".join(questions)
|
||||
embed.add_field(name="Question Section", value=f"{cf.box(text=question_section, lang='prolog')}", inline=False)
|
||||
|
||||
if answers:
|
||||
answer_section = "\n".join(answers)
|
||||
if len(answer_section) > 1024:
|
||||
embed.description = cf.warning("Answer section is too long to fit within embed field, falling back to description.") + cf.box(answer_section)
|
||||
else:
|
||||
embed.add_field(name="Answer Section", value=f"{cf.box(text=answer_section, lang='prolog')}", inline=False)
|
||||
|
||||
if authorities:
|
||||
authority_section = "\n".join(authorities)
|
||||
embed.add_field(name="Authority Section", value=f"{cf.box(text=authority_section, lang='prolog')}", inline=False)
|
||||
await ctx.send(embed=embed)
|
||||
else:
|
||||
await ctx.send(content=cf.box(text=stdout, lang='yaml'))
|
||||
except (FileNotFoundError):
|
||||
try:
|
||||
ns_process = await asyncio.create_subprocess_exec('nslookup', name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
|
||||
ns_stdout, ns_stderr = await ns_process.communicate()
|
||||
if ns_stderr:
|
||||
await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=ns_stderr.decode()))
|
||||
else:
|
||||
warning = cf.warning("`dig` is not installed! Defaulting to `nslookup`.\nThis command provides more information when `dig` is installed on the system.\n")
|
||||
if await ctx.embed_requested():
|
||||
embed = Embed(
|
||||
title="DNS Query Result",
|
||||
color=await ctx.embed_color(),
|
||||
timestamp=ctx.message.created_at
|
||||
)
|
||||
embed.description = warning + cf.box(text=ns_stdout.decode())
|
||||
await ctx.send(embed=embed)
|
||||
else:
|
||||
await ctx.send(content = warning + cf.box(text=ns_stdout.decode()))
|
||||
except (FileNotFoundError):
|
||||
await ctx.maybe_send_embed(message=cf.error("Neither `dig` nor `nslookup` are installed on the system. Unable to resolve DNS query."))
|
||||
|
||||
@commands.command()
|
||||
async def rfc(self, ctx: commands.Context, number: int) -> None:
|
||||
"""Retrieve the text of an RFC document.
|
||||
|
||||
This command uses the [RFC Editor website](https://www.rfc-editor.org/) to fetch the text of an RFC document.
|
||||
A [Request for Comments (RFC)](https://en.wikipedia.org/wiki/Request_for_Comments) is a publication in a series from the principal technical development and standards-setting bodies for the [Internet](https://en.wikipedia.org/wiki/Internet), most prominently the [Internet Engineering Task Force](https://en.wikipedia.org/wiki/Internet_Engineering_Task_Force). An RFC is authored by individuals or groups of engineers and [computer scientists](https://en.wikipedia.org/wiki/Computer_scientist) in the form of a [memorandum](https://en.wikipedia.org/wiki/Memorandum) describing methods, behaviors, research, or innovations applicable to the working of the Internet and Internet-connected systems. It is submitted either for [peer review](https://en.wikipedia.org/wiki/Peer_review) or to convey new concepts, information, or, occasionally, engineering humor.""" # noqa: E501
|
||||
url = f"https://www.rfc-editor.org/rfc/rfc{number}.html"
|
||||
datatracker_url = f"https://datatracker.ietf.org/doc/rfc{number}"
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url=url) as response:
|
||||
if response.status == 200:
|
||||
html = await response.text()
|
||||
soup = BeautifulSoup(html, 'html.parser')
|
||||
pre_tags = soup.find_all('pre')
|
||||
content: list[Embed | str] = []
|
||||
for pre_tag in pre_tags:
|
||||
text = format_rfc_text(md(pre_tag), number)
|
||||
if len(text) > 4096:
|
||||
pagified_text = cf.pagify(text, delims=["\n\n"], page_length=4096)
|
||||
for page in pagified_text:
|
||||
if await ctx.embed_requested():
|
||||
embed = Embed(
|
||||
title=f"RFC Document {number}",
|
||||
url=datatracker_url,
|
||||
description=page,
|
||||
color=await ctx.embed_color()
|
||||
)
|
||||
content.append(embed)
|
||||
else:
|
||||
content.append(page)
|
||||
else:
|
||||
if await ctx.embed_requested():
|
||||
embed = Embed(
|
||||
title=f"RFC Document {number}",
|
||||
url=datatracker_url,
|
||||
description=text,
|
||||
color=await ctx.embed_color()
|
||||
)
|
||||
content.append(embed)
|
||||
else:
|
||||
content.append(text)
|
||||
if await ctx.embed_requested():
|
||||
for embed in content:
|
||||
embed.set_footer(text=f"Page {content.index(embed) + 1}/{len(content)}")
|
||||
await SimpleMenu(pages=content, disable_after_timeout=True, timeout=300).start(ctx)
|
||||
else:
|
||||
await ctx.maybe_send_embed(content=cf.error(f"An error occurred while fetching RFC {number}. Status code: {response.status}."))
|
||||
|
|
5
speedtest/__init__.py
Normal file
5
speedtest/__init__.py
Normal file
|
@ -0,0 +1,5 @@
|
|||
from .speedtest import Speedtest
|
||||
|
||||
|
||||
async def setup(bot):
|
||||
await bot.add_cog(Speedtest(bot))
|
14
speedtest/info.json
Normal file
14
speedtest/info.json
Normal file
|
@ -0,0 +1,14 @@
|
|||
{
|
||||
"author" : ["SeaswimmerTheFsh (seasw.)"],
|
||||
"install_msg" : "Thank you for installing Speedtest!\nYou can find the source code of this cog [here](https://coastalcommits.com/SeaswimmerTheFsh/SeaCogs).",
|
||||
"name" : "Speedtest",
|
||||
"short" : "A collection of useful utilities.",
|
||||
"description" : "A collection of useful utilities.",
|
||||
"end_user_data_statement" : "This cog does not store end user data.",
|
||||
"hidden": true,
|
||||
"disabled": false,
|
||||
"min_bot_version": "3.5.0",
|
||||
"min_python_version": [3, 10, 0],
|
||||
"tags" : ["utility", "information"],
|
||||
"requirements": ["pydantic"]
|
||||
}
|
76
speedtest/models.py
Normal file
76
speedtest/models.py
Normal file
|
@ -0,0 +1,76 @@
|
|||
from datetime import datetime
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class Speedtest(BaseModel):
|
||||
type: str
|
||||
timestamp: datetime
|
||||
ping: "Ping"
|
||||
download: "Bandwidth"
|
||||
upload: "Bandwidth"
|
||||
isp: str
|
||||
interface: "Interface"
|
||||
server: "Server"
|
||||
result: "Result"
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, data: dict) -> "Speedtest":
|
||||
return cls(
|
||||
type=data["type"],
|
||||
timestamp=datetime.fromisoformat(data["timestamp"]),
|
||||
ping=Ping(**data["ping"]),
|
||||
download=Bandwidth(**data["download"]),
|
||||
upload=Bandwidth(**data["upload"]),
|
||||
isp=data["isp"],
|
||||
interface=Interface(**data["interface"]),
|
||||
server=Server(**data["server"]),
|
||||
result=Result(**data["result"])
|
||||
)
|
||||
|
||||
class Bandwidth(BaseModel):
|
||||
bandwidth: float
|
||||
bytes: int
|
||||
elapsed: int
|
||||
latency: "Latency"
|
||||
|
||||
@property
|
||||
def mbps(self) -> float:
|
||||
return self.bandwidth / 1_000_000
|
||||
|
||||
class Latency(BaseModel):
|
||||
iqm: float
|
||||
low: float
|
||||
high: float
|
||||
jitter: float
|
||||
|
||||
class Interface(BaseModel):
|
||||
internalIp: str
|
||||
name: str
|
||||
macAddr: str
|
||||
isVpn: bool
|
||||
externalIp: str
|
||||
|
||||
class Ping(BaseModel):
|
||||
jitter: float
|
||||
latency: float
|
||||
low: float
|
||||
high: float
|
||||
|
||||
class Server(BaseModel):
|
||||
id: int
|
||||
name: str
|
||||
location: str
|
||||
country: str
|
||||
host: str
|
||||
port: int
|
||||
ip: str
|
||||
|
||||
class Result(BaseModel):
|
||||
id: str
|
||||
url: str
|
||||
persisted: bool
|
||||
|
||||
@property
|
||||
def image(self) -> str:
|
||||
return self.url + ".png"
|
71
speedtest/speedtest.py
Normal file
71
speedtest/speedtest.py
Normal file
|
@ -0,0 +1,71 @@
|
|||
# _____ _
|
||||
# / ____| (_)
|
||||
# | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __
|
||||
# \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__|
|
||||
# ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ |
|
||||
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import subprocess
|
||||
|
||||
import discord
|
||||
from redbot.core import commands
|
||||
from redbot.core.bot import Red
|
||||
from redbot.core.utils import chat_formatting as cf
|
||||
|
||||
from .models import Speedtest as sp
|
||||
|
||||
|
||||
class Speedtest(commands.Cog):
|
||||
"""A collection of random utilities."""
|
||||
|
||||
__author__ = ["SeaswimmerTheFsh"]
|
||||
__version__ = "1.0.0"
|
||||
|
||||
def __init__(self, bot: Red):
|
||||
self.bot = bot
|
||||
|
||||
def format_help_for_context(self, ctx: commands.Context) -> str:
|
||||
pre_processed = super().format_help_for_context(ctx) or ""
|
||||
n = "\n" if "\n\n" not in pre_processed else ""
|
||||
text = [
|
||||
f"{pre_processed}{n}",
|
||||
f"Cog Version: **{self.__version__}**",
|
||||
f"Author: {cf.humanize_list(self.__author__)}"
|
||||
]
|
||||
return "\n".join(text)
|
||||
|
||||
async def run_speedtest(self) -> str | sp:
|
||||
try:
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
"speedtest", "-f", "json", "--accept-license",
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE
|
||||
)
|
||||
except FileNotFoundError:
|
||||
return "Speedtest CLI is not installed."
|
||||
stdout, stderr = await process.communicate()
|
||||
if process.returncode != 0:
|
||||
return stderr.decode("utf-8")
|
||||
return sp.from_json(json.loads(stdout.decode("utf-8")))
|
||||
|
||||
@commands.command()
|
||||
@commands.is_owner()
|
||||
async def speedtest(self, ctx: commands.Context) -> None:
|
||||
"""Run a speedtest."""
|
||||
msg = await ctx.maybe_send_embed("Running speedtest...")
|
||||
async with ctx.typing():
|
||||
speedtest = await self.run_speedtest()
|
||||
if await ctx.embed_requested():
|
||||
if not isinstance(speedtest, sp):
|
||||
await msg.edit(embed=discord.Embed(description=f"An error occurred! {speedtest}", color=discord.Colour.red()))
|
||||
return
|
||||
embed = discord.Embed(title="Speedtest Results", url=speedtest.result.url, color=await ctx.embed_color())
|
||||
embed.set_image(url=speedtest.result.image)
|
||||
await msg.edit(embed=embed)
|
||||
else:
|
||||
if not isinstance(speedtest, sp):
|
||||
await msg.edit(content=f"An error occurred! \n`{speedtest}`")
|
||||
return
|
||||
await msg.edit(content=f"**[Result]({speedtest.result.url})**")
|
Loading…
Reference in a new issue