# _____ _
# / ____| (_)
# | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __
# \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__|
# ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ |
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|

import asyncio
import inspect
import operator
import re
from asyncio.subprocess import Process
from functools import partial, partialmethod
from typing import Any

import aiohttp
import yaml
from bs4 import BeautifulSoup
from discord import Color, Embed, app_commands
from discord.utils import CachedSlotProperty, cached_property
from markdownify import MarkdownConverter
from redbot.core import commands
from redbot.core.bot import Red
from redbot.core.dev_commands import cleanup_code
from redbot.core.utils import chat_formatting as cf
from redbot.core.utils.views import SimpleMenu


def md(soup: BeautifulSoup, **options) -> Any | str:
    return MarkdownConverter(**options).convert_soup(soup=soup)


def format_rfc_text(text: str, number: int) -> str:
    one: str = re.sub(r"\(\.\/rfc(\d+)", r"(https://www.rfc-editor.org/rfc/rfc\1.html", text)
    two: str = re.sub(r"\((#(?:section|page)-\d+(?:\.\d+)?)\)", rf"(https://www.rfc-editor.org/rfc/rfc{number}.html\1)", one)
    three: str = re.sub(r"\n{3,}", "\n\n", two)
    return three
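# Worked example (illustrative numbers, not fetched data): with number=9110, format_rfc_text
# rewrites the relative links markdownify produces from the RFC Editor HTML into absolute ones:
#   "(./rfc2616"     -> "(https://www.rfc-editor.org/rfc/rfc2616.html"
#   "(#section-4.1)" -> "(https://www.rfc-editor.org/rfc/rfc9110.html#section-4.1)"
# It then collapses runs of three or more newlines down to two.

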
class SeaUtils(commands.Cog):
    """A collection of random utilities."""

    __author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
    __git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
    __version__ = "1.0.1"
    __documentation__ = "https://seacogs.coastalcommits.com/seautils/"

    def __init__(self, bot: Red) -> None:
        self.bot = bot

    def format_help_for_context(self, ctx: commands.Context) -> str:
        pre_processed = super().format_help_for_context(ctx) or ""
        n = "\n" if "\n\n" not in pre_processed else ""
        text = [
            f"{pre_processed}{n}",
            f"{cf.bold('Cog Version:')} [{self.__version__}]({self.__git__})",
            f"{cf.bold('Author:')} {cf.humanize_list(self.__author__)}",
            f"{cf.bold('Documentation:')} {self.__documentation__}",
        ]
        return "\n".join(text)

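    # The footer assembled by format_help_for_context above renders roughly as follows with the
    # metadata defined at the top of this class (illustrative, actual output depends on the bot):
    #   **Cog Version:** [1.0.1](https://www.coastalcommits.com/cswimr/SeaCogs)
    #   **Author:** [cswimr](https://www.coastalcommits.com/cswimr)
    #   **Documentation:** https://seacogs.coastalcommits.com/seautils/
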
    def format_src(self, obj: Any) -> str:
        """A large portion of this code is repurposed from Zephyrkul's RTFS cog.
        https://github.com/Zephyrkul/FluffyCogs/blob/master/rtfs/rtfs.py"""
        obj = inspect.unwrap(func=obj)
        src: Any = getattr(obj, "__func__", obj)
        if isinstance(obj, (commands.Command, app_commands.Command)):
            src = obj.callback
        elif isinstance(obj, (partial, partialmethod)):
            src = obj.func
        elif isinstance(obj, property):
            src = obj.fget
        elif isinstance(obj, (cached_property, CachedSlotProperty)):
            src = obj.function
        return inspect.getsource(object=src)

    @commands.command(aliases=["source", "src", "code", "showsource"])
    @commands.is_owner()
    async def showcode(self, ctx: commands.Context, *, object: str) -> None:  # pylint: disable=redefined-builtin
        """Show the code for a particular object."""
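        # Illustrative invocations ("[p]" stands in for the bot's prefix; the "/ping" command is
        # hypothetical). The argument may be a slash command, a cog name, or a text command name:
        #   [p]showcode showcode   -> source of this text command
        #   [p]showcode SeaUtils   -> source of this cog's class
        #   [p]showcode /ping      -> source of the "/ping" app command, if one is registered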
        try:
            if object.startswith("/") and (obj := ctx.bot.tree.get_command(object[1:])):
                text = self.format_src(obj)
            elif obj := ctx.bot.get_cog(object):
                text = self.format_src(type(obj))
            elif obj := ctx.bot.get_command(object):
                text = self.format_src(obj)
            else:
                raise AttributeError
            temp_content = cf.pagify(
                text=cleanup_code(text),
                escape_mass_mentions=True,
                page_length=1977,
            )
            content = []
            max_i = operator.length_hint(temp_content)
            for i, page in enumerate(temp_content, start=1):
                content.append(f"**Page {i}/{max_i}**\n{cf.box(page, lang='py')}")
            await SimpleMenu(pages=content, disable_after_timeout=True, timeout=180).start(ctx)
        except (OSError, AttributeError, UnboundLocalError):
            if await ctx.embed_requested():
                embed = Embed(title="Object not found!", color=await ctx.embed_color())
                await ctx.send(embed=embed, reference=ctx.message.to_reference(fail_if_not_exists=False))
            else:
                await ctx.send(content="Object not found!", reference=ctx.message.to_reference(fail_if_not_exists=False))

    @commands.command(name='dig', aliases=['dnslookup', 'nslookup'])
    @commands.is_owner()
    async def dig(self, ctx: commands.Context, name: str, record_type: str | None = None, server: str | None = None, port: int = 53) -> None:
        """Retrieve DNS information for a domain.

        Uses `dig` to perform a DNS query. Will fall back to `nslookup` if `dig` is not installed on the system.
        `nslookup` does not provide as much information as `dig`, so only the `name` parameter will be used if `nslookup` is used.
        Will return the A, AAAA, and CNAME records for a domain by default. You can specify a different record type with the `record_type` parameter."""
        command_opts: list[str | int] = ['dig']
        query_types: list[str] = [record_type] if record_type else ['A', 'AAAA', 'CNAME']
        if server:
            command_opts.append(f'@{server}')
        for query_type in query_types:
            command_opts.extend([name, query_type])
        command_opts.extend(['-p', str(port), '+yaml'])

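        # Sketch of what is built and parsed below (illustrative values, not captured output):
        # for name="example.com" and server="8.8.8.8" the argv becomes roughly
        #   dig @8.8.8.8 example.com A example.com AAAA example.com CNAME -p 53 +yaml
        # and the `+yaml` output is assumed to be a list of entries shaped like
        #   - message:
        #       response_time: ...
        #       response_address: ...
        #       response_port: ...
        #       query_address: ...
        #       query_port: ...
        #       response_message_data:
        #         status: NOERROR
        #         flags: ...
        #         QUESTION_SECTION: [...]
        #         ANSWER_SECTION: [...]
        #         AUTHORITY_SECTION: [...]
        # which is the structure the parsing below relies on.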
        try:
            process: Process = await asyncio.create_subprocess_exec(*command_opts, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
            stdout, stderr = await process.communicate()
            if stderr:
                await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=stderr.decode()))
            else:
                data = yaml.safe_load(stdout.decode())
                message_data: dict = data[0]['message']
                response_data: dict = message_data['response_message_data']
                if await ctx.embed_requested():
                    embed = Embed(
                        title="DNS Query Result",
                        color=await ctx.embed_color(),
                        timestamp=message_data['response_time']
                    )
                    embed.add_field(name="Response Address", value=message_data['response_address'], inline=True)
                    embed.add_field(name="Response Port", value=message_data['response_port'], inline=True)
                    embed.add_field(name="Query Address", value=message_data['query_address'], inline=True)
                    embed.add_field(name="Query Port", value=message_data['query_port'], inline=True)
                    embed.add_field(name="Status", value=response_data['status'], inline=True)
                    embed.add_field(name="Flags", value=response_data['flags'], inline=True)

                    if response_data.get('status') != 'NOERROR':
                        embed.colour = Color.red()
                        embed.description = cf.error("Dig query did not return `NOERROR` status.")

                    questions = []
                    answers = []
                    authorities = []
                    for m in data:
                        response = m['message']['response_message_data']
                        if 'QUESTION_SECTION' in response:
                            for question in response['QUESTION_SECTION']:
                                if question not in questions:
                                    questions.append(question)

                        if 'ANSWER_SECTION' in response:
                            for answer in response['ANSWER_SECTION']:
                                if answer not in answers:
                                    answers.append(answer)

                        if 'AUTHORITY_SECTION' in response:
                            for authority in response['AUTHORITY_SECTION']:
                                if authority not in authorities:
                                    authorities.append(authority)

                    if questions:
                        question_section = "\n".join(questions)
                        embed.add_field(name="Question Section", value=f"{cf.box(text=question_section, lang='prolog')}", inline=False)

                    if answers:
                        answer_section = "\n".join(answers)
                        if len(answer_section) > 1024:
                            embed.description = cf.warning("Answer section is too long to fit within embed field, falling back to description.") + cf.box(answer_section)
                        else:
                            embed.add_field(name="Answer Section", value=f"{cf.box(text=answer_section, lang='prolog')}", inline=False)

                    if authorities:
                        authority_section = "\n".join(authorities)
                        embed.add_field(name="Authority Section", value=f"{cf.box(text=authority_section, lang='prolog')}", inline=False)
                    await ctx.send(embed=embed)
                else:
                    await ctx.send(content=cf.box(text=stdout.decode(), lang='yaml'))
        except FileNotFoundError:
            try:
                ns_process = await asyncio.create_subprocess_exec('nslookup', name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
                ns_stdout, ns_stderr = await ns_process.communicate()
                if ns_stderr:
                    await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=ns_stderr.decode()))
                else:
                    warning = cf.warning("`dig` is not installed! Defaulting to `nslookup`.\nThis command provides more information when `dig` is installed on the system.\n")
                    if await ctx.embed_requested():
                        embed = Embed(
                            title="DNS Query Result",
                            color=await ctx.embed_color(),
                            timestamp=ctx.message.created_at
                        )
                        embed.description = warning + cf.box(text=ns_stdout.decode())
                        await ctx.send(embed=embed)
                    else:
                        await ctx.send(content=warning + cf.box(text=ns_stdout.decode()))
            except FileNotFoundError:
                await ctx.maybe_send_embed(message=cf.error("Neither `dig` nor `nslookup` are installed on the system. Unable to resolve DNS query."))

    @commands.command()
    async def rfc(self, ctx: commands.Context, number: int) -> None:
        """Retrieve the text of an RFC document.

        This command uses the [RFC Editor website](https://www.rfc-editor.org/) to fetch the text of an RFC document.
        A [Request for Comments (RFC)](https://en.wikipedia.org/wiki/Request_for_Comments) is a publication in a series from the principal technical development and standards-setting bodies for the [Internet](https://en.wikipedia.org/wiki/Internet), most prominently the [Internet Engineering Task Force](https://en.wikipedia.org/wiki/Internet_Engineering_Task_Force). An RFC is authored by individuals or groups of engineers and [computer scientists](https://en.wikipedia.org/wiki/Computer_scientist) in the form of a [memorandum](https://en.wikipedia.org/wiki/Memorandum) describing methods, behaviors, research, or innovations applicable to the working of the Internet and Internet-connected systems. It is submitted either for [peer review](https://en.wikipedia.org/wiki/Peer_review) or to convey new concepts, information, or, occasionally, engineering humor."""  # noqa: E501
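        # Illustrative usage ("[p]" stands in for the bot's prefix):
        #   [p]rfc 2616  -> fetches https://www.rfc-editor.org/rfc/rfc2616.html and paginates it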
url = f"https://www.rfc-editor.org/rfc/rfc{number}.html"
|
|
datatracker_url = f"https://datatracker.ietf.org/doc/rfc{number}"
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url=url) as response:
|
|
if response.status == 200:
|
|
html = await response.text()
|
|
soup = BeautifulSoup(html, 'html.parser')
|
|
pre_tags = soup.find_all('pre')
|
|
content: list[Embed | str] = []
|
|
for pre_tag in pre_tags:
|
|
text = format_rfc_text(md(pre_tag), number)
|
|
if len(text) > 4096:
|
|
pagified_text = cf.pagify(text, delims=["\n\n"], page_length=4096)
|
|
for page in pagified_text:
|
|
if await ctx.embed_requested():
|
|
embed = Embed(
|
|
title=f"RFC Document {number}",
|
|
url=datatracker_url,
|
|
description=page,
|
|
color=await ctx.embed_color()
|
|
)
|
|
content.append(embed)
|
|
else:
|
|
content.append(page)
|
|
else:
|
|
if await ctx.embed_requested():
|
|
embed = Embed(
|
|
title=f"RFC Document {number}",
|
|
url=datatracker_url,
|
|
description=text,
|
|
color=await ctx.embed_color()
|
|
)
|
|
content.append(embed)
|
|
else:
|
|
content.append(text)
|
|
if await ctx.embed_requested():
|
|
for embed in content:
|
|
embed.set_footer(text=f"Page {content.index(embed) + 1}/{len(content)}")
|
|
await SimpleMenu(pages=content, disable_after_timeout=True, timeout=300).start(ctx)
|
|
else:
|
|
await ctx.maybe_send_embed(message=cf.error(f"An error occurred while fetching RFC {number}. Status code: {response.status}."))
|