import asyncio import pathlib import re import typing import aiosqlite import discord from discord.ext import commands class McDataBase: def __init__(self): self.db = pathlib.Path.home() / ".cache" / "lcc-bot" / "McDataBase.db" self._conn: typing.Optional[aiosqlite.Connection] = None async def init_db(self): if self._conn: conn = self._conn await conn.execute( """ CREATE TABLE IF NOT EXISTS breaks ( user_id INTEGER PRIMARY KEY, since FLOAT NOT NULL ); """ ) await conn.execute( """ CREATE TABLE IF NOT EXISTS cooldowns ( user_id INTEGER PRIMARY KEY, expires FLOAT NOT NULL ); """ ) async def get_break(self, user_id: int) -> typing.Optional[tuple[float]]: async with self._conn.execute( """ SELECT since FROM breaks WHERE user_id = ?; """, (user_id,) ) as cursor: return await cursor.fetchone() async def set_break(self, user_id: int, since: float) -> None: await self._conn.execute( """ INSERT INTO breaks (user_id, since) VALUES (?, ?) ON CONFLICT(user_id) DO UPDATE SET since = excluded.since """, (user_id, since) ) async def remove_break(self, user_id: int) -> None: now = discord.utils.utcnow().timestamp() await self._conn.execute( """ DELETE FROM breaks WHERE user_id = ?; """, (user_id,) ) await self.set_cooldown(user_id, now) async def get_cooldown(self, user_id: int) -> typing.Optional[tuple[float]]: async with self._conn.execute( """ SELECT expires FROM cooldowns WHERE user_id = ?; """, (user_id,) ) as cursor: return await cursor.fetchone() async def set_cooldown(self, user_id: int, expires: float) -> None: await self._conn.execute( """ INSERT INTO cooldowns (user_id, expires) VALUES (?, ?) ON CONFLICT(user_id) DO UPDATE SET expires = excluded.expires; """, (user_id, expires) ) async def remove_cooldown(self, user_id: int) -> None: await self._conn.execute( """ DELETE FROM cooldowns WHERE user_id = ?; """, (user_id,) ) async def __aenter__(self) -> "McDataBase": self._conn = await aiosqlite.connect(self.db) await self.init_db() return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self._conn.commit() await self._conn.close() self._conn = None class McDonaldsCog(commands.Cog): def __init__(self, bot): = bot self.lock = asyncio.Lock() @commands.Cog.listener() async def on_message(self, message: discord.Message): author = me = if message.guild else if not return async with self.lock: NIGHTMARE_REGEX = re.compile(r"[|\s]*(?P[^|]+)(#0)?[|\s]*:( bypassed.*)?") if m := NIGHTMARE_REGEX.match(message.content): username = member = discord.utils.get(message.guild.members, name=username) if member: author = member async with McDataBase() as db: if (last_info := await db.get_break( is not None: if message.content.upper() != "MCDONALDS!": await message.delete() if (message.created_at.timestamp() - last_info[0]) > 10: await f"{} Please say `MCDONALDS!` to end commercial.", delete_after=30 ) await db.set_break(, message.created_at.timestamp()) else: await db.remove_break( await message.reply( "Thank you. You may now resume your activity.", delete_after=120 ) @commands.user_command(name="Commercial Break") async def commercial_break(self, ctx: discord.ApplicationContext, member: discord.Member): await ctx.defer(ephemeral=True) if not return await ctx.respond("I don't have permission to manage messages in this channel.", ephemeral=True) if or member == ctx.user: return await ctx.respond("No.", ephemeral=True) async with McDataBase() as db: if await db.get_break( is not None: await ctx.respond(f"{member.mention} is already in a commercial break.") return elif (cooldown := await db.get_cooldown( is not None: expires = cooldown[0] + 300 if expires > discord.utils.utcnow().timestamp(): await ctx.respond( f"{member.mention} is not due another ad break yet. Their next commercial break will start " f" at the earliest." ) return else: await db.remove_cooldown( await db.set_break(, discord.utils.utcnow().timestamp()) await ctx.send( f"{member.mention} Commercial break! Please say `MCDONALDS!` to end commercial.\n" f"*This commercial break is sponsored by {ctx.user.mention}.*", delete_after=300, allowed_mentions=discord.AllowedMentions(users=True, roles=False, everyone=False) ) await ctx.respond("Commercial break started.", ephemeral=True) await ctx.delete(delay=120) def setup(bot): bot.add_cog(McDonaldsCog(bot))