# _____ _ # / ____| (_) # | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __ # \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__| # ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ | # |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_| import asyncio import json import subprocess import discord from redbot.core import commands from redbot.core.bot import Red from redbot.core.utils import chat_formatting as cf from .models import Speedtest as sp class Speedtest(commands.Cog): """A collection of random utilities.""" __author__ = ["SeaswimmerTheFsh"] __version__ = "1.0.0" def __init__(self, bot: Red): self.bot = bot def format_help_for_context(self, ctx: commands.Context) -> str: pre_processed = super().format_help_for_context(ctx) or "" n = "\n" if "\n\n" not in pre_processed else "" text = [ f"{pre_processed}{n}", f"Cog Version: **{self.__version__}**", f"Author: {cf.humanize_list(self.__author__)}" ] return "\n".join(text) async def run_speedtest(self) -> str | sp: try: process = await asyncio.create_subprocess_exec( "speedtest", "-f", "json", "--accept-license", "--accept-gdpr", stdout=subprocess.PIPE, stderr=subprocess.PIPE ) except FileNotFoundError: return "Speedtest CLI is not installed." stdout, stderr = await process.communicate() if process.returncode != 0: return stderr.decode("utf-8") return sp.from_json(json.loads(stdout.decode("utf-8"))) @commands.command() @commands.is_owner() async def speedtest(self, ctx: commands.Context) -> None: """Run a speedtest.""" msg = await ctx.maybe_send_embed("Running speedtest...") async with ctx.typing(): speedtest = await self.run_speedtest() if await ctx.embed_requested(): if not isinstance(speedtest, sp): await msg.edit(embed=discord.Embed(description=f"An error occurred! {speedtest}", color=discord.Colour.red())) return embed = discord.Embed(title="Speedtest Results", url=speedtest.result.url, color=await ctx.embed_color()) embed.set_image(url=speedtest.result.image) await msg.edit(embed=embed) else: if not isinstance(speedtest, sp): await msg.edit(content=f"An error occurred! \n`{speedtest}`") return await msg.edit(content=f"**[Result]({speedtest.result.url})**")