# SeaCogs/seautils/seautils.py
#
# 185 lines
# 9.4 KiB
# Python

# _____ _
# / ____| (_)
# | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __
# \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__|
# ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ |
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
import asyncio
import inspect
import operator
from asyncio.subprocess import Process
from functools import partial, partialmethod
from typing import Any
import yaml
from discord import Color, Embed, app_commands
from discord.utils import CachedSlotProperty, cached_property
from redbot.core import commands
from redbot.core.bot import Red
from redbot.core.dev_commands import cleanup_code
from redbot.core.utils import chat_formatting as cf
from redbot.core.utils.views import SimpleMenu
class SeaUtils(commands.Cog):
    """A collection of random utilities."""

    __author__ = ["SeaswimmerTheFsh"]
    __version__ = "1.0.0"

    def __init__(self, bot: Red):
        self.bot = bot

    def format_help_for_context(self, ctx: commands.Context) -> str:
        """Return the parent help text followed by the cog version and author list."""
        pre_processed = super().format_help_for_context(ctx) or ""
        # Only add a separating newline when the parent text does not already
        # contain a blank line.
        n = "\n" if "\n\n" not in pre_processed else ""
        text = [
            f"{pre_processed}{n}",
            f"Cog Version: **{self.__version__}**",
            f"Author: {cf.humanize_list(self.__author__)}"
        ]
        return "\n".join(text)

    def format_src(self, obj: Any) -> str:
        """Return the source code of ``obj`` as a string.

        A large portion of this code is repurposed from Zephyrkul's RTFS cog.
        https://github.com/Zephyrkul/FluffyCogs/blob/master/rtfs/rtfs.py

        Wrapper objects (commands, partials, properties, cached properties and
        bound methods) are resolved to the function they wrap before the source
        is looked up with ``inspect.getsource``.

        Raises:
            OSError: if the source cannot be retrieved.
            TypeError: if ``inspect.getsource`` does not support the object.
        """
        obj = inspect.unwrap(obj)
        # Bound methods expose the underlying function as ``__func__``.
        src: Any = getattr(obj, "__func__", obj)
        if isinstance(obj, (commands.Command, app_commands.Command)):
            src = obj.callback
        elif isinstance(obj, (partial, partialmethod)):
            src = obj.func
        elif isinstance(obj, property):
            src = obj.fget
        elif isinstance(obj, (cached_property, CachedSlotProperty)):
            src = obj.function
        return inspect.getsource(src)

    @commands.command(aliases=["source", "src", "code", "showsource"])
    @commands.is_owner()
    async def showcode(self, ctx: commands.Context, *, object: str):  # pylint: disable=redefined-builtin
        """Show the code for a particular object."""
        # Lookup order: slash command (when prefixed with "/"), then cog, then
        # text command. The first hit wins.
        target: Any = None
        if object.startswith("/"):
            target = ctx.bot.tree.get_command(object[1:])
        if target is None and (cog := ctx.bot.get_cog(object)):
            # For a cog, show the source of its class rather than the instance.
            target = type(cog)
        if target is None:
            target = ctx.bot.get_command(object)

        text: str | None = None
        if target is not None:
            try:
                text = self.format_src(target)
            except (OSError, TypeError, AttributeError):
                # Source unavailable (e.g. defined in a C extension or an
                # object kind inspect cannot handle): report it as not found.
                text = None

        if text is None:
            reference = ctx.message.to_reference(fail_if_not_exists=False)
            # embed_requested() is a coroutine; without ``await`` the coroutine
            # object is always truthy and the plain-text branch never runs.
            if await ctx.embed_requested():
                embed = Embed(title="Object not found!", color=await ctx.embed_color())
                await ctx.send(embed=embed, reference=reference)
            else:
                await ctx.send(content="Object not found!", reference=reference)
            return

        # page_length leaves headroom for the "**Page i/N**" header and the
        # code-block fences added around every page below.
        pages = list(cf.pagify(
            text=cleanup_code(text),
            escape_mass_mentions=True,
            page_length=1977
        ))
        # Count the real pages: a length hint is only an estimate and could
        # produce a wrong "Page i/N" total.
        total = len(pages)
        content = [
            f"**Page {index}/{total}**\n{cf.box(page, lang='py')}"
            for index, page in enumerate(pages, start=1)
        ]
        await SimpleMenu(pages=content, disable_after_timeout=True, timeout=180).start(ctx)

    @commands.command(name='dig', aliases=['dnslookup', 'nslookup'])
    @commands.is_owner()
    async def dig(self, ctx: commands.Context, name: str, type: str | None = None, server: str | None = None, port: int = 53) -> None:
        """Retrieve DNS information for a domain.

        Uses `dig` to perform a DNS query. Will fall back to `nslookup` if `dig` is not installed on the system.
        `nslookup` does not provide as much information as `dig`, so only the `name` parameter will be used if `nslookup` is used.

        Will return the A, AAAA, and CNAME records for a domain by default. You can specify a different record type with the `type` parameter."""
        # NOTE(review): arguments are passed as an argv list (no shell), but a
        # value starting with "-" or "+" would still be read by dig as an
        # option. The command is owner-only, so this is left unvalidated.
        command_opts: list[str] = ['dig']
        query_types = [type] if type else ['A', 'AAAA', 'CNAME']
        if server:
            # dig expects the server as a single "@server" token; passing "@"
            # and the server separately makes dig treat the server as a name.
            command_opts.append(f'@{server}')
        for query_type in query_types:
            command_opts.extend([name, query_type])
        command_opts.extend(['-p', str(port), '+yaml'])

        try:
            process: Process = await asyncio.create_subprocess_exec(
                *command_opts,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
        except FileNotFoundError:
            # The dig executable is missing: use the reduced nslookup path.
            await self._nslookup(ctx, name)
            return

        stdout, stderr = await process.communicate()
        if stderr:
            await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=stderr.decode()))
            return

        output = stdout.decode()
        if not await ctx.embed_requested():
            await self._send_boxed(ctx, output, lang='yaml')
            return

        data = yaml.safe_load(output)
        if not isinstance(data, list) or not data:
            # Nothing that can be rendered as an embed; show the raw output.
            await self._send_boxed(ctx, output, lang='yaml')
            return

        # Header fields are taken from the first response only; the record
        # sections further down are merged across every response.
        message_data: dict = data[0]['message']
        response_data: dict = message_data['response_message_data']
        embed = Embed(
            title="DNS Query Result",
            color=await ctx.embed_color(),
            timestamp=message_data['response_time']
        )
        embed.add_field(name="Response Address", value=message_data['response_address'], inline=True)
        embed.add_field(name="Response Port", value=message_data['response_port'], inline=True)
        embed.add_field(name="Query Address", value=message_data['query_address'], inline=True)
        embed.add_field(name="Query Port", value=message_data['query_port'], inline=True)
        embed.add_field(name="Status", value=response_data['status'], inline=True)
        embed.add_field(name="Flags", value=response_data['flags'], inline=True)

        if response_data.get('status') != 'NOERROR':
            embed.colour = Color.red()
            embed.description = cf.error("Dig query did not return `NOERROR` status.")

        # Merge each section across all responses, keeping first-seen order
        # and dropping duplicates.
        sections: dict[str, list] = {
            'QUESTION_SECTION': [],
            'ANSWER_SECTION': [],
            'AUTHORITY_SECTION': [],
        }
        for entry in data:
            response = entry['message']['response_message_data']
            for key, collected in sections.items():
                for record in response.get(key) or []:
                    if record not in collected:
                        collected.append(record)

        question_section = "\n".join(sections['QUESTION_SECTION'])
        embed.add_field(name="Question Section", value=f"{cf.box(question_section)}", inline=False)

        # Measure the boxed text: the code fences count towards the
        # 1024-character embed field limit as well.
        answer_box = cf.box("\n".join(sections['ANSWER_SECTION']))
        if len(answer_box) > 1024:
            notice = cf.warning("Answer section is too long to fit within embed field, falling back to description.")
            # Keep any status error that was already placed in the description.
            existing = f"{embed.description}\n" if embed.description else ""
            embed.description = existing + notice + answer_box
        else:
            embed.add_field(name="Answer Section", value=answer_box, inline=False)

        if sections['AUTHORITY_SECTION']:
            authority_section = "\n".join(sections['AUTHORITY_SECTION'])
            embed.add_field(name="Authority Section", value=f"{cf.box(authority_section)}", inline=False)
        await ctx.send(embed=embed)

    async def _send_boxed(self, ctx: commands.Context, text: str, lang: str = '') -> None:
        """Send ``text`` as one or more code-block messages, split to fit the message limit."""
        # ``or ['']`` ensures a (blank) reply is still sent for empty text.
        for page in list(cf.pagify(text=text, page_length=1980)) or ['']:
            await ctx.send(content=cf.box(text=page, lang=lang))

    async def _nslookup(self, ctx: commands.Context, name: str) -> None:
        """Resolve ``name`` with ``nslookup``; used when ``dig`` is not installed."""
        try:
            ns_process = await asyncio.create_subprocess_exec(
                'nslookup',
                name,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
        except FileNotFoundError:
            await ctx.maybe_send_embed(message=cf.error("Neither `dig` nor `nslookup` are installed on the system. Unable to resolve DNS query."))
            return

        ns_stdout, ns_stderr = await ns_process.communicate()
        if ns_stderr:
            await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=ns_stderr.decode()))
            return

        warning = cf.warning("`dig` is not installed! Defaulting to `nslookup`.\nThis command provides more information when `dig` is installed on the system.\n")
        result = warning + cf.box(text=ns_stdout.decode())
        if await ctx.embed_requested():
            embed = Embed(
                title="DNS Query Result",
                color=await ctx.embed_color(),
                timestamp=ctx.message.created_at
            )
            embed.description = result
            await ctx.send(embed=embed)
        else:
            await ctx.send(content=result)