2024-05-25 17:09:07 -04:00
# _____ _
# / ____| (_)
# | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __
# \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__|
# ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ |
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
2024-05-25 17:19:48 -04:00
import asyncio
2024-05-25 17:09:07 -04:00
import json
import subprocess
2024-05-25 17:20:33 -04:00
from typing import Any
2024-05-25 17:09:07 -04:00
import discord
from redbot . core import commands
from redbot . core . bot import Red
from redbot . core . utils import chat_formatting as cf
class Speedtest ( commands . Cog ) :
""" A collection of random utilities. """
__author__ = [ " SeaswimmerTheFsh " ]
__version__ = " 1.0.0 "
def __init__ ( self , bot : Red ) :
self . bot = bot
def format_help_for_context ( self , ctx : commands . Context ) - > str :
pre_processed = super ( ) . format_help_for_context ( ctx ) or " "
n = " \n " if " \n \n " not in pre_processed else " "
text = [
f " { pre_processed } { n } " ,
f " Cog Version: ** { self . __version__ } ** " ,
f " Author: { cf . humanize_list ( self . __author__ ) } "
]
return " \n " . join ( text )
2024-05-25 17:20:33 -04:00
async def run_speedtest ( self ) - > str | Any :
2024-05-25 17:21:33 -04:00
try :
process = await asyncio . create_subprocess_exec (
" speedtest " , " -f json " , " --accept-license " ,
stdout = subprocess . PIPE ,
stderr = subprocess . PIPE
)
except FileNotFoundError :
return " Speedtest CLI is not installed. "
2024-05-25 17:19:48 -04:00
stdout , stderr = await process . communicate ( )
if process . returncode != 0 :
return stderr . decode ( " utf-8 " )
return json . loads ( stdout . decode ( " utf-8 " ) )
2024-05-25 17:09:07 -04:00
@commands.command ( )
@commands.is_owner ( )
2024-05-25 17:19:48 -04:00
async def speedtest ( self , ctx : commands . Context ) - > None :
2024-05-25 17:09:07 -04:00
""" Run a speedtest. """
msg = await ctx . maybe_send_embed ( " Running speedtest... " )
2024-05-25 17:19:48 -04:00
async with ctx . typing ( ) :
r = await self . run_speedtest ( )
if await ctx . embed_requested ( ) :
2024-05-25 17:24:04 -04:00
if isinstance ( r , str ) :
2024-05-25 17:24:40 -04:00
await msg . edit ( embed = discord . Embed ( description = f " An error occurred! { r } " , color = discord . Colour . red ( ) ) )
2024-05-25 17:24:04 -04:00
return
2024-05-25 17:19:48 -04:00
embed = discord . Embed ( title = " Speedtest Results " , link = r [ " result " ] [ " url " ] , color = await ctx . embed_color ( ) )
embed . add_field ( name = " Bandwidth " , value = f " { r [ ' download ' ] [ ' bandwidth ' ] / 1_000_000 : .2f } Mbps down, { r [ ' upload ' ] [ ' bandwidth ' ] / 1_000_000 : .2f } Mbps up " )
embed . add_field ( name = " Ping " , value = f " Base: { round ( r [ ' ping ' ] [ ' latency ' ] ) } ms \n Download: { round ( r [ ' download ' ] [ ' latency ' ] [ ' igm ' ] ) } ms \n Upload: { round ( r [ ' upload ' ] [ ' latency ' ] [ ' igm ' ] ) } ms " )
embed . set_image ( url = r [ " result " ] [ " image " ] + " .png " )
await msg . edit ( embed = embed )
else :
2024-05-25 17:24:04 -04:00
if isinstance ( r , str ) :
await msg . edit ( content = f " An error occurred! { r } " )
return
2024-05-25 17:19:48 -04:00
await msg . edit ( content = f " **Speedtest Results** \n "
f " **Bandwidth**: { r [ ' download ' ] [ ' bandwidth ' ] / 1_000_000 : .2f } Mbps down, { r [ ' upload ' ] [ ' bandwidth ' ] / 1_000_000 : .2f } Mbps up \n "
f " **Ping**: Base: { round ( r [ ' ping ' ] [ ' latency ' ] ) } ms, Download: { round ( r [ ' download ' ] [ ' latency ' ] [ ' igm ' ] ) } ms, Upload: { round ( r [ ' upload ' ] [ ' latency ' ] [ ' igm ' ] ) } ms \n "
f " **Results**: { r [ ' result ' ] [ ' url ' ] } " )