# _____ _ # / ____| (_) # | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __ # \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__| # ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ | # |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_| import asyncio import inspect import operator import re from asyncio.subprocess import Process from functools import partial, partialmethod from typing import Any import aiohttp import yaml from bs4 import BeautifulSoup from discord import Color, Embed, app_commands from discord.utils import CachedSlotProperty, cached_property from markdownify import MarkdownConverter from redbot.core import commands from redbot.core.bot import Red from redbot.core.dev_commands import cleanup_code from redbot.core.utils import chat_formatting as cf from redbot.core.utils.views import SimpleMenu def md(soup: BeautifulSoup, **options) -> Any | str: return MarkdownConverter(**options).convert_soup(soup) def convert_rfc_references(text: str): return re.sub(r"\(\.rfc\s(\d+)", r"(https://www.rfc-editor.org/rfc/rfc\1.html", text) class SeaUtils(commands.Cog): """A collection of random utilities.""" __author__ = ["SeaswimmerTheFsh"] __version__ = "1.0.0" def __init__(self, bot: Red): self.bot = bot def format_help_for_context(self, ctx: commands.Context) -> str: pre_processed = super().format_help_for_context(ctx) or "" n = "\n" if "\n\n" not in pre_processed else "" text = [ f"{pre_processed}{n}", f"Cog Version: **{self.__version__}**", f"Author: {cf.humanize_list(self.__author__)}" ] return "\n".join(text) def format_src(self, obj: Any) -> str: """A large portion of this code is repurposed from Zephyrkul's RTFS cog. https://github.com/Zephyrkul/FluffyCogs/blob/master/rtfs/rtfs.py""" obj = inspect.unwrap(obj) src: Any = getattr(obj, "__func__", obj) if isinstance(obj, (commands.Command, app_commands.Command)): src = obj.callback elif isinstance(obj, (partial, partialmethod)): src = obj.func elif isinstance(obj, property): src = obj.fget elif isinstance(obj, (cached_property, CachedSlotProperty)): src = obj.function return inspect.getsource(src) @commands.command(aliases=["source", "src", "code", "showsource"]) @commands.is_owner() async def showcode(self, ctx: commands.Context, *, object: str): # pylint: disable=redefined-builtin """Show the code for a particular object.""" try: if object.startswith("/") and (obj := ctx.bot.tree.get_command(object[1:])): text = self.format_src(obj) elif obj := ctx.bot.get_cog(object): text = self.format_src(type(obj)) elif obj := ctx.bot.get_command(object): text = self.format_src(obj) temp_content = cf.pagify( text=cleanup_code(text), escape_mass_mentions=True, page_length = 1977 ) content = [] max_i = operator.length_hint(temp_content) i = 1 for page in temp_content: content.append(f"**Page {i}/{max_i}**\n{cf.box(page, lang='py')}") i += 1 await SimpleMenu(pages=content, disable_after_timeout=True, timeout=180).start(ctx) except (OSError, AttributeError, UnboundLocalError): if ctx.embed_requested(): embed = Embed(title="Object not found!", color=await ctx.embed_color()) await ctx.send(embed=embed, reference=ctx.message.to_reference(fail_if_not_exists=False)) else: await ctx.send(content="Object not found!", reference=ctx.message.to_reference(fail_if_not_exists=False)) @commands.command(name='dig', aliases=['dnslookup', 'nslookup']) @commands.is_owner() async def dig(self, ctx: commands.Context, name: str, type: str | None = None, server: str | None = None, port: int = 53) -> None: """Retrieve DNS information for a domain. Uses `dig` to perform a DNS query. Will fall back to `nslookup` if `dig` is not installed on the system. `nslookup` does not provide as much information as `dig`, so only the `name` parameter will be used if `nslookup` is used. Will return the A, AAAA, and CNAME records for a domain by default. You can specify a different record type with the `type` parameter.""" command_opts: list[str | int] = ['dig'] query_types = [type] if type else ['A', 'AAAA', 'CNAME'] if server: command_opts.extend(['@', server]) for query_type in query_types: command_opts.extend([name, query_type]) command_opts.extend(['-p', str(port), '+yaml']) try: process: Process = await asyncio.create_subprocess_exec(*command_opts, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) stdout, stderr = await process.communicate() if stderr: await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=stderr.decode())) else: data = yaml.safe_load(stdout.decode()) message_data: dict = data[0]['message'] response_data: dict = message_data['response_message_data'] if ctx.embed_requested(): embed = Embed( title="DNS Query Result", color=await ctx.embed_color(), timestamp=message_data['response_time'] ) embed.add_field(name="Response Address", value=message_data['response_address'], inline=True) embed.add_field(name="Response Port", value=message_data['response_port'], inline=True) embed.add_field(name="Query Address", value=message_data['query_address'], inline=True) embed.add_field(name="Query Port", value=message_data['query_port'], inline=True) embed.add_field(name="Status", value=response_data['status'], inline=True) embed.add_field(name="Flags", value=response_data['flags'], inline=True) if response_data.get('status') != 'NOERROR': embed.colour = Color.red() embed.description = cf.error("Dig query did not return `NOERROR` status.") questions = [] answers = [] authorities = [] for m in data: response = m['message']['response_message_data'] if 'QUESTION_SECTION' in response: for question in response['QUESTION_SECTION']: if question not in questions: questions.append(question) if 'ANSWER_SECTION' in response: for answer in response['ANSWER_SECTION']: if answer not in answers: answers.append(answer) if 'AUTHORITY_SECTION' in response: for authority in response['AUTHORITY_SECTION']: if authority not in authorities: authorities.append(authority) if questions: question_section = "\n".join(questions) embed.add_field(name="Question Section", value=f"{cf.box(text=question_section, lang='prolog')}", inline=False) if answers: answer_section = "\n".join(answers) if len(answer_section) > 1024: embed.description = cf.warning("Answer section is too long to fit within embed field, falling back to description.") + cf.box(answer_section) else: embed.add_field(name="Answer Section", value=f"{cf.box(text=answer_section, lang='prolog')}", inline=False) if authorities: authority_section = "\n".join(authorities) embed.add_field(name="Authority Section", value=f"{cf.box(text=authority_section, lang='prolog')}", inline=False) await ctx.send(embed=embed) else: await ctx.send(content=cf.box(text=stdout, lang='yaml')) except (FileNotFoundError): try: ns_process = await asyncio.create_subprocess_exec('nslookup', name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) ns_stdout, ns_stderr = await ns_process.communicate() if ns_stderr: await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=ns_stderr.decode())) else: warning = cf.warning("`dig` is not installed! Defaulting to `nslookup`.\nThis command provides more information when `dig` is installed on the system.\n") if await ctx.embed_requested(): embed = Embed( title="DNS Query Result", color=await ctx.embed_color(), timestamp=ctx.message.created_at ) embed.description = warning + cf.box(text=ns_stdout.decode()) await ctx.send(embed=embed) else: await ctx.send(content= warning + cf.box(text=ns_stdout.decode())) except (FileNotFoundError): await ctx.maybe_send_embed(message=cf.error("Neither `dig` nor `nslookup` are installed on the system. Unable to resolve DNS query.")) @commands.command() async def rfc(self, ctx: commands.Context, number: int) -> None: """Retrieve the text of an RFC document.""" url = f"https://www.rfc-editor.org/rfc/rfc{number}.html" async with aiohttp.ClientSession() as session: async with session.get(url=url) as response: if response.status == 200: html = await response.text() soup = BeautifulSoup(html, 'html.parser') pre_tags = soup.find_all('pre') content = [] for pre_tag in pre_tags: text = convert_rfc_references(md(pre_tag)) if await ctx.embed_requested(): embed = Embed( title="RFC Document", description=text, color=await ctx.embed_color() ) content.append(embed) else: content.append(text) await SimpleMenu(pages=content, disable_after_timeout=True, timeout=300).start(ctx) else: await ctx.maybe_send_embed(content=cf.error(f"An error occurred while fetching RFC {number}. Status code: {response.status}."))