Source code for jishaku.shim.paginator_170

# -*- coding: utf-8 -*-

"""
jishaku.paginators (shim for discord.py 1.7.x)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Paginator-related tools and interfaces for Jishaku.

:copyright: (c) 2021 Devon (Gorialis) R
:license: MIT, see LICENSE for more details.

"""

import asyncio

import discord
from discord.ext import commands

from jishaku.shim.paginator_base import EMOJI_DEFAULT


[docs]class PaginatorInterface: # pylint: disable=too-many-instance-attributes """ A message and reaction based interface for paginators. This allows users to interactively navigate the pages of a Paginator, and supports live output. An example of how to use this with a standard Paginator: .. code:: python3 from discord.ext import commands from jishaku.paginators import PaginatorInterface # In a command somewhere... # Paginators need to have a reduced max_size to accommodate the extra text added by the interface. paginator = commands.Paginator(max_size=1900) # Populate the paginator with some information for line in range(100): paginator.add_line(f"Line {line + 1}") # Create and send the interface. # The 'owner' field determines who can interact with this interface. If it's None, anyone can use it. interface = PaginatorInterface(ctx.bot, paginator, owner=ctx.author) await interface.send_to(ctx) # send_to creates a task and returns control flow. # It will raise if the interface can't be created, e.g., if there's no reaction permission in the channel. # Once the interface has been sent, line additions have to be done asynchronously, so the interface can be updated. await interface.add_line("My, the Earth sure is full of things!") # You can also check if it's closed using the 'closed' property. if not interface.closed: await interface.add_line("I'm still here!") """ def __init__(self, bot: commands.Bot, paginator: commands.Paginator, **kwargs): if not isinstance(paginator, commands.Paginator): raise TypeError("paginator must be a commands.Paginator instance") self._display_page = 0 self.bot = bot self.message = None self.paginator = paginator self.owner = kwargs.pop("owner", None) self.emojis = kwargs.pop("emoji", EMOJI_DEFAULT) self.timeout = kwargs.pop("timeout", 7200) self.delete_message = kwargs.pop("delete_message", False) self.sent_page_reactions = False self.task: asyncio.Task = None self.send_lock: asyncio.Event = asyncio.Event() self.close_exception: Exception = None if self.page_size > self.max_page_size: raise ValueError(f"Paginator passed has too large of a page size for this interface. " f"({self.page_size} > {self.max_page_size})") @property def pages(self): """ Returns the paginator's pages without prematurely closing the active page. """ # protected access has to be permitted here to not close the paginator's pages # pylint: disable=protected-access paginator_pages = list(self.paginator._pages) if len(self.paginator._current_page) > 1: paginator_pages.append("\n".join(self.paginator._current_page) + "\n" + (self.paginator.suffix or "")) # pylint: enable=protected-access return paginator_pages @property def page_count(self): """ Returns the page count of the internal paginator. """ return len(self.pages) @property def display_page(self): """ Returns the current page the paginator interface is on. """ self._display_page = max(0, min(self.page_count - 1, self._display_page)) return self._display_page @display_page.setter def display_page(self, value): """ Sets the current page the paginator is on. Automatically pushes values inbounds. """ self._display_page = max(0, min(self.page_count - 1, value)) max_page_size = 2000 @property def page_size(self) -> int: """ A property that returns how large a page is, calculated from the paginator properties. If this exceeds `max_page_size`, an exception is raised upon instantiation. """ page_count = self.page_count return self.paginator.max_size + len(f"\nPage {page_count}/{page_count}") @property def send_kwargs(self) -> dict: """ A property that returns the kwargs forwarded to send/edit when updating the page. As this must be compatible with both `discord.TextChannel.send` and `discord.Message.edit`, it should be a dict containing 'content', 'embed' or both. """ display_page = self.display_page page_num = f"\nPage {display_page + 1}/{self.page_count}" content = self.pages[display_page] + page_num return {"content": content}
[docs] async def add_line(self, *args, **kwargs): """ A proxy function that allows this PaginatorInterface to remain locked to the last page if it is already on it. """ display_page = self.display_page page_count = self.page_count self.paginator.add_line(*args, **kwargs) new_page_count = self.page_count if display_page + 1 == page_count: # To keep position fixed on the end, update position to new last page and update message. self._display_page = new_page_count # Unconditionally set send lock to try and guarantee page updates on unfocused pages self.send_lock.set()
[docs] async def send_to(self, destination: discord.abc.Messageable): """ Sends a message to the given destination with this interface. This automatically creates the response task for you. """ self.message = await destination.send(**self.send_kwargs) # add the close reaction await self.message.add_reaction(self.emojis.close) self.send_lock.set() if self.task: self.task.cancel() self.task = self.bot.loop.create_task(self.wait_loop()) # if there is more than one page, and the reactions haven't been sent yet, send navigation emotes if not self.sent_page_reactions and self.page_count > 1: await self.send_all_reactions() return self
[docs] async def send_all_reactions(self): """ Sends all reactions for this paginator, if any are missing. This method is generally for internal use only. """ for emoji in filter(None, self.emojis): try: await self.message.add_reaction(emoji) except discord.NotFound: # the paginator has probably already been closed break self.sent_page_reactions = True
@property def closed(self): """ Is this interface closed? """ if not self.task: return False return self.task.done()
[docs] async def send_lock_delayed(self): """ A coroutine that returns 1 second after the send lock has been released This helps reduce release spam that hits rate limits quickly """ gathered = await self.send_lock.wait() self.send_lock.clear() await asyncio.sleep(1) return gathered
[docs] async def wait_loop(self): # pylint: disable=too-many-branches, too-many-statements """ Waits on a loop for reactions to the message. This should not be called manually - it is handled by `send_to`. """ start, back, forward, end, close = self.emojis def check(payload: discord.RawReactionActionEvent): """ Checks if this reaction is related to the paginator interface. """ owner_check = not self.owner or payload.user_id == self.owner.id emoji = payload.emoji if isinstance(emoji, discord.PartialEmoji) and emoji.is_unicode_emoji(): emoji = emoji.name tests = ( owner_check, payload.message_id == self.message.id, emoji, emoji in self.emojis, payload.user_id != self.bot.user.id, ) return all(tests) task_list = [ self.bot.loop.create_task(coro) for coro in { self.bot.wait_for("raw_reaction_add", check=check), self.bot.wait_for("raw_reaction_remove", check=check), self.send_lock_delayed(), } ] try: # pylint: disable=too-many-nested-blocks last_kwargs = None while not self.bot.is_closed(): done, _ = await asyncio.wait(task_list, timeout=self.timeout, return_when=asyncio.FIRST_COMPLETED) if not done: raise asyncio.TimeoutError for task in done: task_list.remove(task) payload = task.result() if isinstance(payload, discord.RawReactionActionEvent): emoji = payload.emoji if isinstance(emoji, discord.PartialEmoji) and emoji.is_unicode_emoji(): emoji = emoji.name if emoji == close: await self.message.delete() return if emoji == start: self._display_page = 0 elif emoji == end: self._display_page = self.page_count - 1 elif emoji == back: self._display_page -= 1 elif emoji == forward: self._display_page += 1 if payload.event_type == "REACTION_ADD": task_list.append(self.bot.loop.create_task(self.bot.wait_for("raw_reaction_add", check=check))) elif payload.event_type == "REACTION_REMOVE": task_list.append(self.bot.loop.create_task(self.bot.wait_for("raw_reaction_remove", check=check))) else: # Send lock was released task_list.append(self.bot.loop.create_task(self.send_lock_delayed())) if not self.sent_page_reactions and self.page_count > 1: self.bot.loop.create_task(self.send_all_reactions()) self.sent_page_reactions = True # don't spawn any more tasks if self.send_kwargs != last_kwargs: try: await self.message.edit(**self.send_kwargs) except discord.NotFound: # something terrible has happened return last_kwargs = self.send_kwargs except (asyncio.CancelledError, asyncio.TimeoutError) as exception: self.close_exception = exception if self.bot.is_closed(): # Can't do anything about the messages, so just close out to avoid noisy error return if self.delete_message: return await self.message.delete() for emoji in filter(None, self.emojis): try: await self.message.remove_reaction(emoji, self.bot.user) except (discord.Forbidden, discord.NotFound): pass finally: for task in task_list: task.cancel()
[docs]class PaginatorEmbedInterface(PaginatorInterface): """ A subclass of :class:`PaginatorInterface` that encloses content in an Embed. """ def __init__(self, *args, **kwargs): self._embed = kwargs.pop("embed", None) or discord.Embed() super().__init__(*args, **kwargs) @property def send_kwargs(self) -> dict: display_page = self.display_page self._embed.description = self.pages[display_page] self._embed.set_footer(text=f"Page {display_page + 1}/{self.page_count}") return {"embed": self._embed} max_page_size = 2048 @property def page_size(self) -> int: return self.paginator.max_size