2025-01-25 18:51:27 -05:00
|
|
|
from asyncio import run_coroutine_threadsafe
|
|
|
|
from pathlib import Path
|
2025-01-25 19:25:29 -05:00
|
|
|
from typing import Sequence
|
2025-01-25 18:51:27 -05:00
|
|
|
|
|
|
|
from red_commons.logging import RedTraceLogger, getLogger
|
|
|
|
from redbot.core import commands
|
|
|
|
from redbot.core.bot import Red
|
|
|
|
from redbot.core.core_commands import CoreLogic
|
|
|
|
from redbot.core.utils.chat_formatting import bold, humanize_list
|
2025-01-25 19:25:29 -05:00
|
|
|
from watchdog.events import FileSystemEvent, FileSystemMovedEvent, RegexMatchingEventHandler
|
2025-01-25 18:51:27 -05:00
|
|
|
from watchdog.observers import Observer
|
|
|
|
|
|
|
|
|
|
|
|
class HotReload(commands.Cog):
    """Automatically reload cogs in local cog paths on file change."""

    __author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
    __git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
    __version__ = "1.1.2"
    __documentation__ = "https://seacogs.coastalcommits.com/hotreload/"

    def __init__(self, bot: Red) -> None:
        """Store the bot, set up logging, and initialise the observer state.

        Args:
            bot: The Red instance this cog is attached to.
        """
        super().__init__()
        self.bot: Red = bot
        self.logger: RedTraceLogger = getLogger(name="red.SeaCogs.HotReload")
        # Set by start_observer(); remains None until that coroutine has run.
        self.observer = None
        # Strong reference to the start-up task created in cog_load().
        self._observer_task = None
        # Cap these loggers at INFO so their lower-level output is dropped.
        watchdog_loggers = [getLogger(name="watchdog.observers.inotify_buffer")]
        for watchdog_logger in watchdog_loggers:
            watchdog_logger.setLevel("INFO")

    async def cog_load(self) -> None:
        """Start the observer when the cog is loaded."""
        # The event loop only keeps weak references to tasks, so keep our own
        # reference to make sure the task is not garbage-collected mid-flight.
        self._observer_task = self.bot.loop.create_task(self.start_observer())

    async def cog_unload(self) -> None:
        """Stop the observer when the cog is unloaded."""
        # If start-up has not finished yet, cancel it so the observer is not
        # started after the cog is already gone.
        task = self._observer_task
        if task is not None and not task.done():
            task.cancel()

        # Joining a thread that was never started raises RuntimeError, so only
        # stop and join an observer that is actually running.
        if self.observer is not None and self.observer.is_alive():
            self.observer.stop()
            self.observer.join()
            self.logger.info("Stopped observer. No longer watching for file changes.")

    def format_help_for_context(self, ctx: commands.Context) -> str:
        """Return the base help text followed by version, author and docs lines.

        Args:
            ctx: The command context the help text is rendered for.

        Returns:
            The help text produced by the parent class with the cog's
            metadata appended, one entry per line.
        """
        pre_processed = super().format_help_for_context(ctx) or ""
        # Add a separating newline unless the base text already has a blank line.
        n = "\n" if "\n\n" not in pre_processed else ""
        text = [
            f"{pre_processed}{n}",
            f"{bold('Cog Version:')} [{self.__version__}]({self.__git__})",
            f"{bold('Author:')} {humanize_list(self.__author__)}",
            f"{bold('Documentation:')} {self.__documentation__}",
        ]
        return "\n".join(text)

    async def get_paths(self) -> tuple[Path, ...]:
        """Retrieve user defined paths.

        Returns:
            A tuple containing one ``Path`` for every entry returned by the
            cog manager's ``user_defined_paths()``.
        """
        cog_manager = self.bot._cog_mgr
        cog_paths = await cog_manager.user_defined_paths()
        # Materialise a real tuple so the result matches the annotation and
        # can be iterated more than once.
        return tuple(Path(path) for path in cog_paths)

    async def start_observer(self) -> None:
        """Start the observer to watch for file changes.

        Creates a new ``Observer``, schedules one recursive
        ``HotReloadHandler`` per user-defined cog path, and starts it.
        """
        self.observer = Observer()
        paths = await self.get_paths()
        for path in paths:
            self.observer.schedule(event_handler=HotReloadHandler(bot=self.bot, path=path), path=path, recursive=True)
        self.observer.start()
        self.logger.info("Started observer. Watching for file changes.")
|
|
|
|
|
|
|
|
|
|
|
|
class HotReloadHandler(RegexMatchingEventHandler):
    """Handler for file changes."""

    def __init__(self, bot: Red, path: Path) -> None:
        """Create a handler for one watched cog path.

        Args:
            bot: The Red instance whose cogs get reloaded.
            path: The watched root directory; event paths are resolved
                relative to it to find the affected cog.
        """
        # Register a regex for paths ending in ".py" with the base handler.
        super().__init__(regexes=[r".*\.py$"])
        self.bot: Red = bot
        self.path: Path = path
        self.logger: RedTraceLogger = getLogger(name="red.SeaCogs.HotReload.Observer")

    @staticmethod
    def _package_name(relative_path: Path) -> str:
        """Return the top-level cog name for a path relative to the watched root."""
        parts = relative_path.parts
        top_level = parts[0]
        if len(parts) == 1 and top_level.endswith(".py"):
            # A .py file directly inside the watched root is a top-level
            # module; its importable name is the stem, not the file name.
            return relative_path.stem
        return top_level

    def _log_reload_failure(self, future) -> None:
        """Log the exception, if any, raised by a scheduled reload.

        Args:
            future: The future returned by ``run_coroutine_threadsafe`` for
                the reload coroutine.
        """
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            self.logger.error("Failed to reload cogs.", exc_info=error)

    def on_any_event(self, event: FileSystemEvent) -> None:
        """Handle filesystem events.

        Ignores directory events and event types other than moved, deleted,
        created and modified. For everything else, works out which top-level
        cog(s) the event touches and schedules a reload on the bot's loop.

        Args:
            event: The filesystem event reported for the watched path.
        """
        if event.is_directory:
            return

        allowed_events = ("moved", "deleted", "created", "modified")
        if event.event_type not in allowed_events:
            return

        src_package_name = self._package_name(Path(event.src_path).relative_to(self.path))
        cogs_to_reload = [src_package_name]

        if isinstance(event, FileSystemMovedEvent):
            dest = f" to {event.dest_path}"
            dest_package_name = self._package_name(Path(event.dest_path).relative_to(self.path))
            # A move between two cogs affects both of them.
            if dest_package_name != src_package_name:
                cogs_to_reload.append(dest_package_name)
        else:
            dest = ""

        self.logger.info(f"File {event.src_path} has been {event.event_type}{dest}.")

        # Hand the coroutine to the bot's event loop in a thread-safe way
        # (presumably this handler runs on watchdog's observer thread).
        # Nothing waits on the returned future, so attach a callback;
        # otherwise an exception raised during the reload is never reported.
        future = run_coroutine_threadsafe(self.reload_cogs(cogs_to_reload), loop=self.bot.loop)
        future.add_done_callback(self._log_reload_failure)

    async def reload_cogs(self, cog_names: Sequence[str]) -> None:
        """Reload modified cog.

        Args:
            cog_names: Names of the cog packages to pass to Red's core
                reload logic.
        """
        core_logic = CoreLogic(bot=self.bot)
        self.logger.info(f"Reloading cogs: {humanize_list(cog_names, style='unit')}")
        await core_logic._reload(pkg_names=cog_names)
        self.logger.info(f"Reloaded cogs: {humanize_list(cog_names, style='unit')}")
|