2024-05-25 17:09:07 -04:00
# _____ _
# / ____| (_)
# | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __
# \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__|
# ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ |
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
2024-05-25 17:19:48 -04:00
import asyncio
2024-05-25 17:09:07 -04:00
import json
import subprocess
import discord
from redbot . core import commands
from redbot . core . bot import Red
from redbot . core . utils import chat_formatting as cf
2024-05-25 17:55:05 -04:00
from . models import Speedtest as sp
2024-05-25 17:09:07 -04:00
class Speedtest(commands.Cog):
    """Run a network speedtest on the bot's host using the `speedtest` CLI."""

    __author__ = ["SeaswimmerTheFsh"]
    __version__ = "1.0.0"

    def __init__(self, bot: Red) -> None:
        self.bot = bot

    def format_help_for_context(self, ctx: commands.Context) -> str:
        """Return the default help text followed by the cog version and authors."""
        pre_processed = super().format_help_for_context(ctx) or ""
        # Only add a separating newline when the base help text does not
        # already contain a blank line.
        n = "\n" if "\n\n" not in pre_processed else ""
        text = [
            f"{pre_processed}{n}",
            f"Cog Version: **{self.__version__}**",
            f"Author: {cf.humanize_list(self.__author__)}",
        ]
        return "\n".join(text)

    async def run_speedtest(self) -> str | sp:
        """Run the `speedtest` executable and parse its JSON output.

        Returns:
            The parsed result model on success. On failure, a plain string
            describing the problem: either a fixed message when the
            executable cannot be found, or the process's decoded stderr when
            it exits with a non-zero return code.
        """
        try:
            process = await asyncio.create_subprocess_exec(
                "speedtest", "-f", "json", "--accept-license",
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except FileNotFoundError:
            return "Speedtest CLI is not installed."

        stdout, stderr = await process.communicate()
        if process.returncode != 0:
            return stderr.decode("utf-8")
        return sp.from_json(json.loads(stdout.decode("utf-8")))

    @staticmethod
    def _bandwidth_summary(result: sp) -> str:
        """Return the one-line download/upload bandwidth summary for *result*."""
        # NOTE(review): if the model passes through the CLI's raw bandwidth
        # value in bytes per second, dividing by 1_000_000 yields MB/s rather
        # than Mbps (that would need / 125_000) - confirm against the model.
        down = result.download.bandwidth / 1_000_000
        # The upload figure must come from the upload measurement; it was
        # previously computed from the download bandwidth by mistake.
        up = result.upload.bandwidth / 1_000_000
        return f"{down:.2f} Mbps down, {up:.2f} Mbps up"

    @staticmethod
    def _ping_summary(result: sp) -> str:
        """Return the three-line base/download/upload latency summary for *result*."""
        return (
            f"Base: {round(result.ping.latency)} ms\n"
            f"Download: {round(result.download.latency.iqm)} ms\n"
            f"Upload: {round(result.upload.latency.iqm)} ms"
        )

    @commands.command()
    @commands.is_owner()
    async def speedtest(self, ctx: commands.Context) -> None:
        """Run a speedtest."""
        msg = await ctx.maybe_send_embed("Running speedtest...")
        async with ctx.typing():
            result = await self.run_speedtest()

        use_embed = await ctx.embed_requested()

        # run_speedtest() returns a plain string to signal failure.
        if not isinstance(result, sp):
            error = f"An error occurred! {result}"
            if use_embed:
                await msg.edit(embed=discord.Embed(description=error, color=discord.Colour.red()))
            else:
                await msg.edit(content=error)
            return

        bandwidth = self._bandwidth_summary(result)
        ping = self._ping_summary(result)

        if use_embed:
            embed = discord.Embed(
                title="Speedtest Results",
                url=result.result.url,
                color=await ctx.embed_color(),
            )
            embed.add_field(name="Bandwidth", value=bandwidth)
            embed.add_field(name="Ping", value=ping)
            embed.set_image(url=result.result.image)
            await msg.edit(embed=embed)
        else:
            await msg.edit(
                content=(
                    "**Speedtest Results**\n"
                    f"**Bandwidth**: {bandwidth}\n"
                    f"**Ping**: {ping}\n"
                    f"**Image**: {result.result.image}"
                )
            )