# _____ _ # / ____| (_) # | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __ # \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__| # ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ | # |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_| import asyncio import inspect import operator from asyncio.subprocess import Process from functools import partial, partialmethod from typing import Any import yaml from discord import Color, Embed, app_commands from discord.utils import CachedSlotProperty, cached_property from redbot.core import commands from redbot.core.bot import Red from redbot.core.dev_commands import cleanup_code from redbot.core.utils import chat_formatting as cf from redbot.core.utils.views import SimpleMenu class SeaUtils(commands.Cog): """A collection of random utilities.""" __author__ = ["SeaswimmerTheFsh"] __version__ = "1.0.0" def __init__(self, bot: Red): self.bot = bot def format_help_for_context(self, ctx: commands.Context) -> str: pre_processed = super().format_help_for_context(ctx) or "" n = "\n" if "\n\n" not in pre_processed else "" text = [ f"{pre_processed}{n}", f"Cog Version: **{self.__version__}**", f"Author: {cf.humanize_list(self.__author__)}" ] return "\n".join(text) def format_src(self, obj: Any) -> str: """A large portion of this code is repurposed from Zephyrkul's RTFS cog. https://github.com/Zephyrkul/FluffyCogs/blob/master/rtfs/rtfs.py""" obj = inspect.unwrap(obj) src: Any = getattr(obj, "__func__", obj) if isinstance(obj, (commands.Command, app_commands.Command)): src = obj.callback elif isinstance(obj, (partial, partialmethod)): src = obj.func elif isinstance(obj, property): src = obj.fget elif isinstance(obj, (cached_property, CachedSlotProperty)): src = obj.function return inspect.getsource(src) @commands.command(aliases=["source", "src", "code", "showsource"]) @commands.is_owner() async def showcode(self, ctx: commands.Context, *, object: str): # pylint: disable=redefined-builtin """Show the code for a particular object.""" try: if object.startswith("/") and (obj := ctx.bot.tree.get_command(object[1:])): text = self.format_src(obj) elif obj := ctx.bot.get_cog(object): text = self.format_src(type(obj)) elif obj := ctx.bot.get_command(object): text = self.format_src(obj) temp_content = cf.pagify( text=cleanup_code(text), escape_mass_mentions=True, page_length = 1977 ) content = [] max_i = operator.length_hint(temp_content) i = 1 for page in temp_content: content.append(f"**Page {i}/{max_i}**\n{cf.box(page, lang='py')}") i += 1 await SimpleMenu(pages=content, disable_after_timeout=True, timeout=180).start(ctx) except (OSError, AttributeError, UnboundLocalError): if ctx.embed_requested(): embed = Embed(title="Object not found!", color=await ctx.embed_color()) await ctx.send(embed=embed, reference=ctx.message.to_reference(fail_if_not_exists=False)) else: await ctx.send(content="Object not found!", reference=ctx.message.to_reference(fail_if_not_exists=False)) @commands.command(name='dig', aliases=['dnslookup', 'nslookup']) @commands.is_owner() async def dig(self, ctx: commands.Context, name: str, type: str | None = None, server: str | None = None, port: int = 53) -> None: """Retrieve DNS information for a domain. Uses `dig` to perform a DNS query. Will fall back to `nslookup` if `dig` is not installed on the system. `nslookup` does not provide as much information as `dig`, so only the `name` parameter will be used if `nslookup` is used. Will return the A, AAAA, and CNAME records for a domain by default. You can specify a different record type with the `type` parameter.""" command_opts: list[str | int] = ['dig'] query_types = [type] if type else ['A', 'AAAA', 'CNAME'] if server: command_opts.extend(['@', server]) for query_type in query_types: command_opts.extend([name, query_type]) command_opts.extend(['-p', str(port), '+yaml']) try: process: Process = await asyncio.create_subprocess_exec(*command_opts, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) stdout, stderr = await process.communicate() if stderr: await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=stderr.decode())) else: data = yaml.safe_load(stdout.decode()) message_data: dict = data[0]['message'] response_data: dict = message_data['response_message_data'] if ctx.embed_requested(): embed = Embed( title="DNS Query Result", color=await ctx.embed_color(), timestamp=message_data['response_time'] ) embed.add_field(name="Response Address", value=message_data['response_address'], inline=True) embed.add_field(name="Response Port", value=message_data['response_port'], inline=True) embed.add_field(name="Query Address", value=message_data['query_address'], inline=True) embed.add_field(name="Query Port", value=message_data['query_port'], inline=True) embed.add_field(name="Status", value=response_data['status'], inline=True) embed.add_field(name="Flags", value=response_data['flags'], inline=True) if response_data.get('status') != 'NOERROR': embed.colour = Color.red() embed.description = cf.error("Dig query did not return `NOERROR` status.") questions = [] answers = [] authorities = [] for m in data: response = m['message']['response_message_data'] if 'QUESTION_SECTION' in response: for question in response['QUESTION_SECTION']: if question not in questions: questions.append(question) if 'ANSWER_SECTION' in response: for answer in response['ANSWER_SECTION']: if answer not in answers: answers.append(answer) if 'AUTHORITY_SECTION' in response: for authority in response['AUTHORITY_SECTION']: if authority not in authorities: authorities.append(authority) if questions: question_section = "\n".join(questions) embed.add_field(name="Question Section", value=f"{cf.box(question_section)}", inline=False) if answers: answer_section = "\n".join(answers) if len(answer_section) > 1024: embed.description = cf.warning("Answer section is too long to fit within embed field, falling back to description.") + cf.box(answer_section) else: embed.add_field(name="Answer Section", value=f"{cf.box(answer_section)}", inline=False) if authorities: authority_section = "\n".join(authorities) embed.add_field(name="Authority Section", value=f"{cf.box(authority_section)}", inline=False) await ctx.send(embed=embed) else: await ctx.send(content=cf.box(text=stdout, lang='yaml')) except (FileNotFoundError): try: ns_process = await asyncio.create_subprocess_exec('nslookup', name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) ns_stdout, ns_stderr = await ns_process.communicate() if ns_stderr: await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=ns_stderr.decode())) else: warning = cf.warning("`dig` is not installed! Defaulting to `nslookup`.\nThis command provides more information when `dig` is installed on the system.\n") if await ctx.embed_requested(): embed = Embed( title="DNS Query Result", color=await ctx.embed_color(), timestamp=ctx.message.created_at ) embed.description = warning + cf.box(text=ns_stdout.decode()) await ctx.send(embed=embed) else: await ctx.send(content= warning + cf.box(text=ns_stdout.decode())) except (FileNotFoundError): await ctx.maybe_send_embed(message=cf.error("Neither `dig` nor `nslookup` are installed on the system. Unable to resolve DNS query."))