# _____ _ # / ____| (_) # | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __ # \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__| # ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ | # |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_| import asyncio import inspect import operator import re from asyncio.subprocess import Process from functools import partial, partialmethod from typing import Any import aiohttp import yaml from bs4 import BeautifulSoup from discord import Color, Embed, app_commands from discord.utils import CachedSlotProperty, cached_property from markdownify import MarkdownConverter from redbot.core import commands from redbot.core.bot import Red from redbot.core.dev_commands import cleanup_code from redbot.core.utils import chat_formatting as cf from redbot.core.utils.views import SimpleMenu def md(soup: BeautifulSoup, **options) -> Any | str: return MarkdownConverter(**options).convert_soup(soup) def format_rfc_text(text: str, number: int) -> str: one = re.sub(r"\(\.\/rfc(\d+)", r"(https://www.rfc-editor.org/rfc/rfc\1.html", text) two = re.sub(r"\((#(?:section|page)-\d+(?:.\d+)?)\)", f"(https://www.rfc-editor.org/rfc/rfc{number}.html\1)", one) three: str = re.sub(r"\n{3,}", "\n\n", two) return three class SeaUtils(commands.Cog): """A collection of random utilities.""" __author__ = ["SeaswimmerTheFsh"] __version__ = "1.0.0" def __init__(self, bot: Red): self.bot = bot def format_help_for_context(self, ctx: commands.Context) -> str: pre_processed = super().format_help_for_context(ctx) or "" n = "\n" if "\n\n" not in pre_processed else "" text = [ f"{pre_processed}{n}", f"Cog Version: **{self.__version__}**", f"Author: {cf.humanize_list(self.__author__)}" ] return "\n".join(text) def format_src(self, obj: Any) -> str: """A large portion of this code is repurposed from Zephyrkul's RTFS cog. https://github.com/Zephyrkul/FluffyCogs/blob/master/rtfs/rtfs.py""" obj = inspect.unwrap(obj) src: Any = getattr(obj, "__func__", obj) if isinstance(obj, (commands.Command, app_commands.Command)): src = obj.callback elif isinstance(obj, (partial, partialmethod)): src = obj.func elif isinstance(obj, property): src = obj.fget elif isinstance(obj, (cached_property, CachedSlotProperty)): src = obj.function return inspect.getsource(src) @commands.command(aliases=["source", "src", "code", "showsource"]) @commands.is_owner() async def showcode(self, ctx: commands.Context, *, object: str): # pylint: disable=redefined-builtin """Show the code for a particular object.""" try: if object.startswith("/") and (obj := ctx.bot.tree.get_command(object[1:])): text = self.format_src(obj) elif obj := ctx.bot.get_cog(object): text = self.format_src(type(obj)) elif obj := ctx.bot.get_command(object): text = self.format_src(obj) temp_content = cf.pagify( text=cleanup_code(text), escape_mass_mentions=True, page_length = 1977 ) content = [] max_i = operator.length_hint(temp_content) i = 1 for page in temp_content: content.append(f"**Page {i}/{max_i}**\n{cf.box(page, lang='py')}") i += 1 await SimpleMenu(pages=content, disable_after_timeout=True, timeout=180).start(ctx) except (OSError, AttributeError, UnboundLocalError): if ctx.embed_requested(): embed = Embed(title="Object not found!", color=await ctx.embed_color()) await ctx.send(embed=embed, reference=ctx.message.to_reference(fail_if_not_exists=False)) else: await ctx.send(content="Object not found!", reference=ctx.message.to_reference(fail_if_not_exists=False)) @commands.command(name='dig', aliases=['dnslookup', 'nslookup']) @commands.is_owner() async def dig(self, ctx: commands.Context, name: str, type: str | None = None, server: str | None = None, port: int = 53) -> None: """Retrieve DNS information for a domain. Uses `dig` to perform a DNS query. Will fall back to `nslookup` if `dig` is not installed on the system. `nslookup` does not provide as much information as `dig`, so only the `name` parameter will be used if `nslookup` is used. Will return the A, AAAA, and CNAME records for a domain by default. You can specify a different record type with the `type` parameter.""" command_opts: list[str | int] = ['dig'] query_types = [type] if type else ['A', 'AAAA', 'CNAME'] if server: command_opts.extend(['@', server]) for query_type in query_types: command_opts.extend([name, query_type]) command_opts.extend(['-p', str(port), '+yaml']) try: process: Process = await asyncio.create_subprocess_exec(*command_opts, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) stdout, stderr = await process.communicate() if stderr: await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=stderr.decode())) else: data = yaml.safe_load(stdout.decode()) message_data: dict = data[0]['message'] response_data: dict = message_data['response_message_data'] if ctx.embed_requested(): embed = Embed( title="DNS Query Result", color=await ctx.embed_color(), timestamp=message_data['response_time'] ) embed.add_field(name="Response Address", value=message_data['response_address'], inline=True) embed.add_field(name="Response Port", value=message_data['response_port'], inline=True) embed.add_field(name="Query Address", value=message_data['query_address'], inline=True) embed.add_field(name="Query Port", value=message_data['query_port'], inline=True) embed.add_field(name="Status", value=response_data['status'], inline=True) embed.add_field(name="Flags", value=response_data['flags'], inline=True) if response_data.get('status') != 'NOERROR': embed.colour = Color.red() embed.description = cf.error("Dig query did not return `NOERROR` status.") questions = [] answers = [] authorities = [] for m in data: response = m['message']['response_message_data'] if 'QUESTION_SECTION' in response: for question in response['QUESTION_SECTION']: if question not in questions: questions.append(question) if 'ANSWER_SECTION' in response: for answer in response['ANSWER_SECTION']: if answer not in answers: answers.append(answer) if 'AUTHORITY_SECTION' in response: for authority in response['AUTHORITY_SECTION']: if authority not in authorities: authorities.append(authority) if questions: question_section = "\n".join(questions) embed.add_field(name="Question Section", value=f"{cf.box(text=question_section, lang='prolog')}", inline=False) if answers: answer_section = "\n".join(answers) if len(answer_section) > 1024: embed.description = cf.warning("Answer section is too long to fit within embed field, falling back to description.") + cf.box(answer_section) else: embed.add_field(name="Answer Section", value=f"{cf.box(text=answer_section, lang='prolog')}", inline=False) if authorities: authority_section = "\n".join(authorities) embed.add_field(name="Authority Section", value=f"{cf.box(text=authority_section, lang='prolog')}", inline=False) await ctx.send(embed=embed) else: await ctx.send(content=cf.box(text=stdout, lang='yaml')) except (FileNotFoundError): try: ns_process = await asyncio.create_subprocess_exec('nslookup', name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) ns_stdout, ns_stderr = await ns_process.communicate() if ns_stderr: await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=ns_stderr.decode())) else: warning = cf.warning("`dig` is not installed! Defaulting to `nslookup`.\nThis command provides more information when `dig` is installed on the system.\n") if await ctx.embed_requested(): embed = Embed( title="DNS Query Result", color=await ctx.embed_color(), timestamp=ctx.message.created_at ) embed.description = warning + cf.box(text=ns_stdout.decode()) await ctx.send(embed=embed) else: await ctx.send(content= warning + cf.box(text=ns_stdout.decode())) except (FileNotFoundError): await ctx.maybe_send_embed(message=cf.error("Neither `dig` nor `nslookup` are installed on the system. Unable to resolve DNS query.")) @commands.command() async def rfc(self, ctx: commands.Context, number: int) -> None: """Retrieve the text of an RFC document. This command uses the [RFC Editor website](https://www.rfc-editor.org/) to fetch the text of an RFC document. A [Request for Comments (RFC)](https://en.wikipedia.org/wiki/Request_for_Comments) is a publication in a series from the principal technical development and standards-setting bodies for the [Internet](https://en.wikipedia.org/wiki/Internet), most prominently the [Internet Engineering Task Force](https://en.wikipedia.org/wiki/Internet_Engineering_Task_Force). An RFC is authored by individuals or groups of engineers and [computer scientists](https://en.wikipedia.org/wiki/Computer_scientist) in the form of a [memorandum](https://en.wikipedia.org/wiki/Memorandum) describing methods, behaviors, research, or innovations applicable to the working of the Internet and Internet-connected systems. It is submitted either for [peer review](https://en.wikipedia.org/wiki/Peer_review) or to convey new concepts, information, or, occasionally, engineering humor.""" # noqa: E501 url = f"https://www.rfc-editor.org/rfc/rfc{number}.html" datatracker_url = f"https://datatracker.ietf.org/doc/rfc{number}" async with aiohttp.ClientSession() as session: async with session.get(url=url) as response: if response.status == 200: html = await response.text() soup = BeautifulSoup(html, 'html.parser') pre_tags = soup.find_all('pre') content: list[Embed | str] = [] for pre_tag in pre_tags: text = format_rfc_text(md(pre_tag), number) if len(text) > 4096: pagified_text = cf.pagify(text, delims=["\n\n"], page_length=4096) for page in pagified_text: if await ctx.embed_requested(): embed = Embed( title=f"RFC Document {number}", url=datatracker_url, description=page, color=await ctx.embed_color() ) content.append(embed) else: content.append(page) else: if await ctx.embed_requested(): embed = Embed( title=f"RFC Document {number}", url=datatracker_url, description=text, color=await ctx.embed_color() ) content.append(embed) else: content.append(text) if await ctx.embed_requested(): for embed in content: embed.set_footer(text=f"Page {content.index(embed) + 1}/{len(content)}") await SimpleMenu(pages=content, disable_after_timeout=True, timeout=300).start(ctx) else: await ctx.maybe_send_embed(content=cf.error(f"An error occurred while fetching RFC {number}. Status code: {response.status}."))