SeaCogs/updatechecker/updatechecker.py

460 lines
20 KiB
Python

"""
MIT License
Copyright (c) 2018-Present NeuroAssassin
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
# Huge thanks to Sinbad for allowing me to copy parts of his RSS cog
# (https://github.com/mikeshardmind/SinbadCogs/tree/v3/rss), which I
# used to grab the latest commits from repositories.
# Also, the code I use for updating repos I took directly from Red,
# and just took out the message interactions
import asyncio
import traceback
from datetime import datetime
from urllib.parse import urlparse
import aiohttp
import discord
from discord.ext import tasks
from redbot.cogs.downloader.repo_manager import Repo
from redbot.core import Config, commands
from redbot.core.utils.chat_formatting import box, humanize_list, inline
import feedparser
class UpdateChecker(commands.Cog):
"""Get notices or auto-update cogs when an update is available for its repo"""
def __init__(self, bot):
self.bot = bot
self.session = aiohttp.ClientSession()
self.conf = Config.get_conf(self, identifier=473541068378341376)
default_global = {
"repos": {},
"auto": False,
"gochannel": 0,
"embed": True,
"whitelist": [],
"blacklist": [],
}
self.conf.register_global(**default_global)
self.task = self.bot.loop.create_task(self.bg_task())
async def cog_unload(self):
self.__unload()
def __unload(self):
self.task.cancel()
self.session.detach()
async def red_delete_data_for_user(self, **kwargs): # pylint: disable=unused-argument
"""This cog does not store user data"""
return
@tasks.loop(minutes=1)
async def bg_task(self):
cog = self.bot.get_cog("Downloader")
if cog is not None:
data = await self.conf.all()
repos = data["repos"]
channel = data["gochannel"]
auto = data["auto"]
use_embed = data["embed"]
whitelist = data["whitelist"]
blacklist = data["blacklist"]
if channel:
channel = self.bot.get_channel(channel)
if channel is None:
await self.bot.send_to_owners(
"[Update Checker] It appears that I am no longer allowed to send messages to the designated update channel. "
"From now on, it will DM you."
)
await self.conf.gochannel.set(0)
send = self.bot.send_to_owners
else:
use_embed = (
use_embed and channel.permissions_for(channel.guild.me).embed_links
)
send = channel.send
else:
send = self.bot.send_to_owners
all_repos = cog._repo_manager.get_all_repo_names() # pylint: disable=protected-access
for repo in all_repos:
if not (repo in list(repos.keys())):
repos[repo] = "--default--"
await self.conf.repos.set(repos)
saving_dict = {k: v for k, v in repos.items() if k in all_repos}
for repo_name, commit_saved in saving_dict.items():
repo = cog._repo_manager.get_repo(repo_name) # pylint: disable=protected-access
if not repo:
continue
parsed_url = urlparse(repo.url)
if parsed_url.netloc == "github.com":
url = repo.url + r"/commits/" + repo.branch + ".atom"
response = await self.fetch_feed(url)
try:
commit = response.entries[0]["id"][33:]
chash = "[" + commit + "](" + response.entries[0]["link"] + ")"
cn = response.entries[0]["title"] + " - " + response.entries[0]["author"]
image = response.entries[0]["media_thumbnail"][0]["url"].split("?")[0]
except AttributeError:
continue
else:
url = repo.url + r"/rss/branch/" + repo.branch
response = await self.fetch_feed(url)
try:
commit = response.entries[0]["id"][33:]
chash = "[" + commit + "](" + response.entries[0]["link"] + ")"
cn = response.entries[0]["title"] + " - " + response.entries[0]["author"]
image = await self.fetch_gitea_thumbnail(parsed_url.scheme + "://" + parsed_url.netloc + "/api/v1/repos" + parsed_url.path)
except AttributeError:
continue
saving_dict[repo_name] = commit
if whitelist:
if repo_name not in whitelist:
continue
if repo_name in blacklist:
continue
# CN is used here for backwards compatability, don't want people to get an
# update for each and every one of their cogs when updating this cog
if commit_saved not in (commit, cn, '--default--'):
if use_embed:
e = discord.Embed(
title="Update Checker",
description=f"Update available for repo: {repo.name}",
timestamp=datetime.utcnow(),
color=0x00FF00,
)
e.add_field(name="URL", value=repo.url)
e.add_field(name="Branch", value=repo.branch)
e.add_field(name="Commit", value=cn)
e.add_field(name="Hash", value=chash)
if image is not None:
e.set_thumbnail(url=image)
else:
e = box(
"[Update Checker]"
f" Repo: {repo.name}\n"
f" URL: {repo.url}\n"
f" Commit: {cn}\n"
f" Hash: {commit}\n"
f" Time: {datetime.utcnow()}",
'css'
)
try:
if use_embed:
await send(embed=e)
else:
await send(e)
except discord.Forbidden:
# send_to_owners suppresses Forbidden, logging it to console.
# As a result, this will only happen if a channel was set.
await self.bot.send_to_owners(
"[Update Checker] It appears that I am no longer allowed to send messages to the designated update channel. "
"From now on, it will DM you."
)
if use_embed:
await self.bot.send_to_owners(embed=e)
else:
await self.bot.send_to_owners(e)
await self.conf.gochannel.set(0)
# Was already inaccessible before I got here, so I'm just gonna leave it and look at it later -- Sea
# try:
# await channel.send(
# f"[Update Checker] Update found for repo: {repo.name}. Updating repos..."
# )
# except AttributeError:
# owner = (await self.bot.application_info()).owner
# await owner.send(
# "[Update Checker] It appears that the channel for this cog has been deleted. From now on, it will DM you."
# )
# channel = owner
# await self.conf.gochannel.set(0)
# except discord.errors.Forbidden:
# owner = (await self.bot.application_info()).owner
# await owner.send(
# "[Update Checker] It appears that I am no longer allowed to send messages to the designated update channel. From now on, it will DM you."
# )
# channel = owner
# await self.conf.gochannel.set(0)
# # Just a copy of `[p]cog update`, but without using ctx things
# try:
# installed_cogs = set(await cog.installed_cogs())
# updated = await cog._repo_manager.update_all_repos()
# updated_cogs = set(
# cog for repo in updated for cog in repo.available_cogs
# )
# installed_and_updated = updated_cogs & installed_cogs
# if installed_and_updated:
# await cog._reinstall_requirements(installed_and_updated)
# await cog._reinstall_cogs(installed_and_updated)
# await cog._reinstall_libraries(installed_and_updated)
# cognames = {c.name for c in installed_and_updated}
# message = humanize_list(tuple(map(inline, cognames)))
# except Exception as error:
# exception_log = (
# "Exception while updating repos in Update Checker \n"
# )
# exception_log += "".join(
# traceback.format_exception(
# type(error), error, error.__traceback__
# )
# )
# try:
# await channel.send(
# f"[Update Checker]: Error while updating repos.\n\n{exception_log}"
# )
# except discord.errors.Forbidden:
# pass
# else:
# try:
# await channel.send(
# f"[Update Checker]: Ran cog update. Updated cogs: {message}"
# )
# except discord.errors.Forbidden:
# pass
await asyncio.sleep(1)
await self.conf.repos.set(saving_dict)
async def fetch_feed(self, url: str):
# Thank's to Sinbad's rss cog after which I copied this
timeout = aiohttp.client.ClientTimeout(total=15)
try:
async with self.session.get(url, timeout=timeout) as response:
data = await response.read()
except (aiohttp.ClientError, asyncio.TimeoutError):
return None
ret = feedparser.parse(data)
if ret.bozo:
return None
return ret
async def fetch_gitea_thumbnail(self, url: str) -> str:
timeout = aiohttp.client.ClientTimeout(total=15)
try:
async with self.session.get(url, timeout=timeout) as response:
data = await response.json()
except (aiohttp.ClientError, asyncio.TimeoutError):
return None
return data['avatar_url']
@commands.is_owner()
@commands.group(name="cogupdater", aliases=["cu"])
async def update(self, ctx: commands.Context):
"""Group command for controlling the update checker cog."""
@commands.is_owner()
@update.command()
async def auto(self, ctx: commands.Context):
"""Changes automatic cog updates to the opposite setting."""
# Was already inaccessible before I got here, so I'm just gonna leave it and look at it later -- Sea
# auto = await self.conf.auto()
# await self.conf.auto.set(not auto)
# status = "disabled" if auto else "enabled"
# await ctx.send(f"Auto cog updates are now {status}")
await ctx.send(
"This command is disabled for the time being. Cog updates will not run automatically, however notifications will still send."
)
@commands.is_owner()
@update.command()
async def channel(self, ctx: commands.Context, channel: discord.TextChannel = None):
"""
Sets a channel for update messages to go to.
If argument is not supplied, it will be sent to the default notifications channel(s) specified in `[p]set ownernotifications`.
By default, this goes to owner DMs.
"""
if channel:
await self.conf.gochannel.set(channel.id)
await ctx.send(f"Update messages will now be sent to {channel.mention}")
else:
await self.conf.gochannel.set(0)
await ctx.send("Update messages will now be DMed to you.")
@commands.is_owner()
@update.command()
async def settings(self, ctx: commands.Context):
"""See settings for the Update Checker cog.
Right now, this shows whether the bot updates cogs automatically and what channel logs are sent to.
"""
auto = await self.conf.auto()
channel = await self.conf.gochannel()
embed = await self.conf.embed()
if embed:
e = discord.Embed(title="Update Checker Settings", color=0x00FF00)
e.add_field(name="Automatic Cog Updates", value=str(auto))
if channel == 0:
channel = "Direct Messages"
else:
channel = self.bot.get_channel(channel).name
if channel is None:
channel = "Unknown"
e.add_field(name="Update Channel", value=channel)
await ctx.send(embed=e)
else:
if channel == 0:
channel = "Direct Messages"
else:
channel = self.bot.get_channel(channel).name
if channel is None:
channel = "Unknown"
message = (
"```css\n"
"[Update Checker settings]"
"``````css\n"
f"[Automatic Cog Updates]: {str(auto)}\n"
f" [Update Channel]: {channel}"
"```"
)
await ctx.send(message)
@commands.is_owner()
@update.command()
async def embed(self, ctx: commands.Context):
"""Toggles whether to use embeds or colorful codeblock messages when sending an update."""
c = await self.conf.embed()
await self.conf.embed.set(not c)
word = "disabled" if c else "enabled"
await ctx.send(f"Embeds are now {word}")
@commands.is_owner()
@update.group(name="list")
async def whiteblacklist(self, ctx: commands.Context):
"""Whitelist/blacklist certain repositories from which to receive updates."""
if ctx.invoked_subcommand is None:
data = await self.conf.all()
whitelist = data["whitelist"]
blacklist = data["blacklist"]
await ctx.send(
f"Whitelisted: {humanize_list(tuple(map(inline, whitelist or ['None'])))}\nBlacklisted: {humanize_list(tuple(map(inline, blacklist or ['None'])))}"
)
@whiteblacklist.group()
async def whitelist(self, ctx: commands.Context):
"""Whitelist certain repos from which to receive updates."""
@whitelist.command(name="add")
async def whitelistadd(self, ctx: commands.Context, *repos: Repo):
"""Add repos to the whitelist"""
data = await self.conf.whitelist()
ds = set(data)
ns = {r.name for r in repos}
ss = ds | ns
await self.conf.whitelist.set(list(ss))
await ctx.send(f"Whitelist update successful: {humanize_list(tuple(map(inline, ss)))}")
@whitelist.command(name="remove")
async def whitelistremove(self, ctx: commands.Context, *repos: Repo):
"""Remove repos from the whitelist"""
data = await self.conf.whitelist()
ds = set(data)
ns = {r.name for r in repos}
ss = ds - ns
await self.conf.whitelist.set(list(ss))
await ctx.send(
f"Whitelist update successful: {humanize_list(tuple(map(inline, ss or ['None'])))}"
)
@whitelist.command(name="clear")
async def whitelistclear(self, ctx: commands.Context):
"""Removes all repos from the whitelist"""
await self.conf.whitelist.set([])
await ctx.send("Whitelist update successful")
@whiteblacklist.group()
async def blacklist(self, ctx: commands.Context):
"""Blacklist certain repos from which to receive updates."""
@blacklist.command(name="add")
async def blacklistadd(self, ctx: commands.Context, *repos: Repo):
"""Add repos to the blacklist"""
data = await self.conf.blacklist()
ds = set(data)
ns = {r.name for r in repos}
ss = ds | ns
await self.conf.blacklist.set(list(ss))
await ctx.send(f"Backlist update successful: {humanize_list(tuple(map(inline, ss)))}")
@blacklist.command(name="remove")
async def blacklistremove(self, ctx: commands.Context, *repos: Repo):
"""Remove repos from the blacklist"""
data = await self.conf.blacklist()
ds = set(data)
ns = {r.name for r in repos}
ss = ds - ns
await self.conf.blacklist.set(list(ss))
await ctx.send(
f"Blacklist update successful: {humanize_list(tuple(map(inline, ss or ['None'])))}"
)
@blacklist.command(name="clear")
async def blacklistclear(self, ctx: commands.Context):
"""Removes all repos from the blacklist"""
await self.conf.blacklist.set([])
await ctx.send("Blacklist update successful")
@commands.is_owner()
@update.group(name="task")
async def _group_update_task(self, ctx: commands.Context):
"""View the status of the task (the one checking for updates)."""
@_group_update_task.command()
async def status(self, ctx: commands.Context):
"""Get the current status of the update task."""
message = "Task is currently "
cancelled = self.task.cancelled()
if cancelled:
message += "canceled."
else:
done = self.task.done()
if done:
message += "done."
else:
message += "running."
try:
self.task.exception()
except asyncio.exceptions.InvalidStateError:
message += " No error has been encountered."
else:
message += f" An error has been encountered. Please run `{ctx.prefix}cogupdater task error` and report it to SeaswimmerTheFsh (seasw.) on the help server."
await ctx.send(message)
@_group_update_task.command()
async def error(self, ctx: commands.Context):
"""Gets the latest error of the update task."""
try:
e = self.task.exception()
except asyncio.exceptions.InvalidStateError:
message = "No error has been encountered."
else:
ex = traceback.format_exception(type(e), e, e.__traceback__)
message = "An error has been encountered:" + box("".join(ex), "py")
await ctx.send(message)