SeaCogs/speedtest/speedtest.py

75 lines
3.4 KiB
Python

# _____ _
# / ____| (_)
# | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __
# \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__|
# ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ |
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
import asyncio
import json
import subprocess
from typing import Any
import discord
from redbot.core import commands
from redbot.core.bot import Red
from redbot.core.utils import chat_formatting as cf
class Speedtest(commands.Cog):
    """Run an Ookla speedtest from Discord."""

    __author__ = ["SeaswimmerTheFsh"]
    __version__ = "1.0.0"

    def __init__(self, bot: Red):
        self.bot = bot

    def format_help_for_context(self, ctx: commands.Context) -> str:
        """Return the inherited help text followed by the cog version and author list."""
        pre_processed = super().format_help_for_context(ctx) or ""
        # Only add a separating newline when the inherited text does not
        # already contain a blank line.
        n = "\n" if "\n\n" not in pre_processed else ""
        text = [
            f"{pre_processed}{n}",
            f"Cog Version: **{self.__version__}**",
            f"Author: {cf.humanize_list(self.__author__)}",
        ]
        return "\n".join(text)

    async def run_speedtest(self) -> str | Any:
        """Run the ``speedtest`` CLI and return its parsed JSON output.

        Returns:
            The decoded JSON document on success, or a ``str`` holding an
            error message when the executable is missing, exits with a
            non-zero status, or prints output that is not valid JSON.
            Callers distinguish the two cases with ``isinstance(result, str)``.
        """
        try:
            process = await asyncio.create_subprocess_exec(
                # Each argv element must be passed separately: "-f json" as a
                # single element would hand the CLI the value " json".
                "speedtest", "-f", "json", "--accept-license",
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except FileNotFoundError:
            return "Speedtest CLI is not installed."

        stdout, stderr = await process.communicate()
        if process.returncode != 0:
            message = stderr.decode("utf-8", errors="replace").strip()
            # Never hand the caller an empty error string.
            return message or f"Speedtest CLI exited with code {process.returncode}."

        try:
            return json.loads(stdout.decode("utf-8"))
        except json.JSONDecodeError:
            return "Speedtest CLI returned output that could not be parsed as JSON."

    @commands.command()
    @commands.is_owner()
    async def speedtest(self, ctx: commands.Context) -> None:
        """Run a speedtest."""
        msg = await ctx.maybe_send_embed("Running speedtest...")
        async with ctx.typing():
            r = await self.run_speedtest()
        use_embed = await ctx.embed_requested()

        # run_speedtest() signals failure by returning a plain string.
        if isinstance(r, str):
            error = f"An error occurred! {r}"
            if use_embed:
                # Colour.red() is a regular classmethod, not a coroutine.
                await msg.edit(embed=discord.Embed(description=error, color=discord.Colour.red()))
            else:
                await msg.edit(content=error)
            return

        download, upload = r["download"], r["upload"]
        result_url = r["result"]["url"]

        # The Ookla CLI reports bandwidth in bytes per second;
        # 1 Mbps == 125_000 bytes/s.
        bandwidth = (
            f"{download['bandwidth'] / 125_000:.2f} Mbps down, "
            f"{upload['bandwidth'] / 125_000:.2f} Mbps up"
        )
        # "iqm" is the interquartile-mean latency key in the CLI's JSON output.
        base_ping = round(r["ping"]["latency"])
        download_ping = round(download["latency"]["iqm"])
        upload_ping = round(upload["latency"]["iqm"])

        if use_embed:
            # discord.Embed takes the title hyperlink as ``url``.
            embed = discord.Embed(title="Speedtest Results", url=result_url, color=await ctx.embed_color())
            embed.add_field(name="Bandwidth", value=bandwidth)
            embed.add_field(
                name="Ping",
                value=f"Base: {base_ping} ms \nDownload: {download_ping} ms \nUpload: {upload_ping} ms",
            )
            # The share image is served at the result URL with ".png" appended.
            embed.set_image(url=result_url + ".png")
            await msg.edit(embed=embed)
        else:
            await msg.edit(
                content=f"**Speedtest Results**\n"
                f"**Bandwidth**: {bandwidth}\n"
                f"**Ping**: Base: {base_ping} ms, Download: {download_ping} ms, Upload: {upload_ping} ms\n"
                f"**Results**: {result_url}"
            )