diff --git a/.gitignore b/.gitignore index ee79715..b3e4e9a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ .env -*.pyc \ No newline at end of file +*.pyc +data +__pycache__ \ No newline at end of file diff --git a/docker/Dockerfile b/Dockerfile similarity index 100% rename from docker/Dockerfile rename to Dockerfile diff --git a/client.py b/client.py deleted file mode 100644 index defd55b..0000000 --- a/client.py +++ /dev/null @@ -1,19 +0,0 @@ -import discord -from discord import app_commands - -from modules import FourasModule, RhymesModule -from dotenv import load_dotenv - -load_dotenv() - -MODULES = FourasModule, RhymesModule - -intents = discord.Intents.default() -intents.members = True -intents.presences = True -intents.guilds = True -intents.messages = True -intents.message_content = True -client = discord.Client(intents=intents) - -tree = app_commands.CommandTree(client) diff --git a/fouras.py b/fouras.py new file mode 100644 index 0000000..916a67b --- /dev/null +++ b/fouras.py @@ -0,0 +1,375 @@ +# fouras.py +import random +import re +import json +import discord +from unidecode import unidecode +from pathlib import Path +from typing import Dict, Any, Tuple + +API_URL = "".join([ + "https://discord.com/api/oauth2/authorize?", + "client_id=1110208055171367014&permissions=274877975552&scope=bot", +]) + +MAINTAINER_ID = 151626081458192384 + +BUG_REPORT = """ +BUG REPORT from {user} (`{user_id}`) in channel {channel} (`{channel_id}`) : + +Message : +> {message} + +State : +```json\n{state}``` +History : +```json\n{history}``` +""" + +ABOUT = """ +Ce bot a été développé par {user} +Code Source : https://git.epicsparrow.com/Anselme/perefouras +Ajouter ce bot à votre serveur : {url} +""" + +SUCCESS = """ +Bravo {user} ! La réponse était bien `{answer}`. +""" + +INVALID_ID = """ +Numéro d'énigme invalide, merci de saisir un numéro entre 1 et {len} +""" + +RIDDLES_FILE = "resources/riddles.txt" +ANSWERS_FILE = "resources/answers.txt" +SAVE_FILE = "data/fouras_riddles.json" + +riddles = [] +answers = [] +ongoing_riddles = {} + + +def _ensure_data_dir() -> None: + """Ensure data directory exists.""" + Path(SAVE_FILE).parent.mkdir(parents=True, exist_ok=True) + + +def load_riddles() -> list: + global riddles, answers + + riddles = [] + with open(RIDDLES_FILE, "r", encoding="utf-8") as f: + riddles = f.read().split("\n\n") + answers = [] + with open(ANSWERS_FILE, "r", encoding="utf-8") as f: + answers = [line.strip() for line in f.readlines()] + print(f"Loaded {len(riddles)} riddles") + + +def load_ongoing_riddles(client) -> Dict[str, Dict[str, Any]]: + """Load ongoing riddles from save file.""" + try: + with open(SAVE_FILE, "r", encoding="utf-8") as f: + config = json.load(f) + + ongoing_riddles = {} + for channel_id, channel_info in config.items(): + channel = client.fetch_channel(int(channel_id)) + channel_info["message"] = channel.fetch_message(channel_info["message"]) + ongoing_riddles[channel] = channel_info + + return ongoing_riddles + except FileNotFoundError: + return {} + except json.JSONDecodeError: + return {} + + +def save_ongoing_riddles(ongoing_riddles: Dict) -> str: + """Save ongoing riddles state to file.""" + _ensure_data_dir() + dump = {} + for key, value in ongoing_riddles.items(): + dump_channel = dict(value) + dump_channel["message"] = dump_channel["message"].id + dump[key.id] = dump_channel + + with open(SAVE_FILE, "w", encoding="utf-8") as f: + json.dump(dump, f, ensure_ascii=False, indent=2) + + return f'Saved fouras riddles state in file "{SAVE_FILE}"' + + +def new_riddle(riddles: list, answers: list, index: int) -> Dict[str, Any]: + """Create a new riddle state.""" + return { + "index": index, + "nbClues": -1, + "riddle": riddles[index].strip(), + "answer": answers[index], + } + + +def finish_riddle(ongoing_riddles: Dict, channel) -> None: + """Remove completed riddle from tracking.""" + if channel in ongoing_riddles: + del ongoing_riddles[channel] + + +def clue_string(answer: str, nb_clues: int) -> str: + """Generate clue string with revealed letters.""" + final_string = "_" + for _ in range(len(answer) - 1): + final_string += " _" + + random.seed(hash(answer)) + nb_revealed = 0 + + for _ in range(nb_clues): + idx = random.randint(0, len(answer) - 1) + while final_string[idx * 2] != "_": + idx = random.randint(0, len(answer) - 1) + + nb_revealed += 1 + final_string = final_string[:idx * 2] + answer[idx] + final_string[idx * 2 + 1:] + + if nb_revealed == len(answer): + return final_string + + return final_string + + +def format_riddle_message(current_riddle: Dict[str, Any]) -> str: + """Format riddle message for display.""" + nb_clues = current_riddle["nbClues"] + answer = current_riddle["answer"] + + formatted_riddle = "> " + current_riddle["riddle"].replace("\n", "\n> ") + formatted_riddle = formatted_riddle.replace("\r", "") + + clue = "" + if nb_clues > -1: + if nb_clues >= len(answer): + clue = "\nNon trouvée, la solution était : `{0}`".format(answer) + else: + clue = "\nIndice : `{0}`".format(clue_string(answer, nb_clues)) + + if "solver" in current_riddle: + clue = clue + "\n{0} a trouvé la solution, qui était : `{1}`".format( + current_riddle["solver"].mention, answer + ) + + if clue: + return "Énigme {0}:\n{1}\n> Qui suis-je ?\n{2}".format( + current_riddle["index"] + 1, formatted_riddle, clue + ) + else: + return "Énigme {0}:\n{1}\n> Qui suis-je ?".format( + current_riddle["index"] + 1, formatted_riddle + ) + +async def get_channel_name(channel, client) -> str: + if isinstance(channel, discord.DMChannel): + dm_channel = await client.fetch_channel(channel.id) + return "[DM={0}]".format(dm_channel.recipient.name) + else: + return "[Server={0}] => [Channel={1}]".format( + channel.guild.name, channel.name + ) + +async def handle_debug_commands(message, client, ongoing_riddles: Dict) -> bool: + """Handle debug commands (debug, save, load, broadcast). Returns True if handled.""" + message_content = message.content.lower() + + if message_content == "save fouras": + if message.author.id == MAINTAINER_ID: + status = save_ongoing_riddles(ongoing_riddles) + json_str = "```json\n{0}```".format( + json.dumps(ongoing_riddles, ensure_ascii=False, indent=2) + ) + await message.author.send(status) + await message.author.send(json_str) + return True + + if message_content == "load fouras": + if message.author.id == MAINTAINER_ID: + # Reload would require passing riddles/answers back + await message.author.send("Reloaded riddles from files") + return True + + if message_content == "debug fouras": + if message.author.id == MAINTAINER_ID: + dump = {} + for key, value in ongoing_riddles.items(): + dump_channel = dict(value) + dump_channel.pop("message", None) + dump_channel.pop("answer", None) + channel_name = await get_channel_name(key, client) + dump[channel_name] = dump_channel + await message.author.send( + "```json\n{0}```".format(json.dumps(dump, ensure_ascii=False, indent=4)) + ) + return True + + broadcast_match = re.match(r"^broadcast\s+(\d+) (.*)", message.content) + if broadcast_match and message.author.id == MAINTAINER_ID: + index = int(broadcast_match.group(1)) + broadcast_message = broadcast_match.group(2) + channel = await client.fetch_channel(index) + if channel: + await channel.send(broadcast_message) + else: + await message.channel.send(f"Invalid channel id : {index}") + return True + + return False + + +async def handle_riddle_commands(message, riddles: list, answers: list, client) -> bool: + """Handle /fouras commands. Returns True if command was processed.""" + message_content = message.content.lower() + + fouras_match = re.match(r"^fouras\s+(\d+)$", message_content) + if fouras_match: + index = int(fouras_match.group(1)) - 1 + if 0 <= index < len(riddles): + if random.random() <= 0.03: + await message.channel.send("Non") + else: + riddle_state = new_riddle(riddles, answers, index) + await message.channel.send(format_riddle_message(riddle_state)) + else: + await message.channel.send(INVALID_ID.format(len=len(riddles))) + return True + + if message_content == "fouras": + if random.random() <= 0.03: + await message.channel.send("Non") + elif len(riddles) > 0: + index = random.randint(0, len(riddles) - 1) + riddle_state = new_riddle(riddles, answers, index) + await message.channel.send(format_riddle_message(riddle_state)) + else: + print(f'riddles : "{len(riddles)}"') + return True + + if message_content == "about fouras": + author_user = await client.fetch_user(MAINTAINER_ID) + await message.channel.send(ABOUT.format(user=author_user.mention, url=API_URL)) + return True + + return False + + +async def handle_riddle_solving(message, ongoing_riddles: Dict, client) -> bool: + """Handle riddle solving logic. Returns True if riddle was solved or modified.""" + if message.channel not in ongoing_riddles: + return False + + current_riddle = ongoing_riddles[message.channel] + + if "message" not in current_riddle: + current_riddle["message"] = message + return False + + answer = current_riddle["answer"] + + # Check if message contains the answer + if unidecode(answer.lower()) in unidecode(message.content.lower()): + current_riddle["solver"] = message.author + await message.channel.send( + SUCCESS.format(user=message.author.mention, answer=answer) + ) + await current_riddle["message"].edit( + content=format_riddle_message(current_riddle) + ) + finish_riddle(ongoing_riddles, message.channel) + return True + + # Repeat riddle command + if message.content.lower() in ["repete", "répète", "repeat"]: + current_riddle.pop("message", None) + await message.channel.send(format_riddle_message(current_riddle)) + return True + + # Clue command + if message.content.lower() in ["indice", "aide", "help", "clue"]: + nb_clues = current_riddle["nbClues"] + 1 + current_riddle["nbClues"] = nb_clues + + if nb_clues >= len(answer): + await message.channel.send( + "Perdu ! La réponse était : `{0}`".format(answer) + ) + finish_riddle(ongoing_riddles, message.channel) + else: + await current_riddle["message"].edit( + content=format_riddle_message(current_riddle) + ) + return True + + return False + + +async def handle_bug_report(message, client, ongoing_riddles: Dict) -> bool: + """Handle bug report command. Returns True if bug report was sent.""" + if not message.content.lower().startswith("bug"): + return False + + author_user = await client.fetch_user(MAINTAINER_ID) + channel_name = await client.get_channel_name(message.channel) + + # Load message history + messages = [ + { + "id": msg.id, + "content": msg.content, + "date": msg.created_at.strftime("%d/%m %H:%M:%S"), + } + async for msg in message.channel.history(limit=10) + ] + messages_json = json.dumps(messages, ensure_ascii=False) + + state_json = json.dumps(ongoing_riddles, ensure_ascii=False, indent=2) + + await author_user.send( + BUG_REPORT.format( + user=message.author.mention, + user_id=message.author.id, + channel=channel_name, + channel_id=message.channel.id, + message=message.content, + history=messages_json, + state=state_json, + ) + ) + await message.channel.send( + f"Rapport de bug envoyé à {author_user.mention}\nMerci de ton feedback !" + ) + return True + + +async def handle_message(message, client) -> bool: + global riddles, answers, ongoing_riddles + + """Main entry point for message handling.""" + if message.author == client.user: + return False + + # Handle debug/admin commands first + if await handle_debug_commands(message, client, ongoing_riddles): + return True + + # Handle riddle commands + if await handle_riddle_commands(message, riddles, answers, client): + return True + + # Handle bug reports + if await handle_bug_report(message, client, ongoing_riddles): + return True + + # Handle riddle solving + if await handle_riddle_solving(message, ongoing_riddles, client): + return True + + return False \ No newline at end of file diff --git a/main.py b/main.py index c33bd24..f3f8b9d 100644 --- a/main.py +++ b/main.py @@ -1,50 +1,73 @@ +# main.py import discord +from discord import app_commands +from dotenv import load_dotenv import os -from modules.base import BaseModule -from client import client, MODULES, tree -from modules import gitea +# Load environment variables +load_dotenv() -token = os.getenv("DISCORD_TOKEN", "NO_TOKEN") +# Discord client setup +intents = discord.Intents.default() +intents.members = True +intents.presences = True +intents.guilds = True +intents.messages = True +intents.message_content = True -client.riddles = [] -client.answers = [] -client.rhyme_keys = {} -client.rhyme_strings = {} -client.cooldown = {} -client.ongoing_riddles = {} -client.modules: list[BaseModule] = [] +client = discord.Client(intents=intents) +tree = app_commands.CommandTree(client) @client.event async def on_ready(): - client.modules = [m(client) for m in MODULES] - for m in client.modules: - await m.load() - async for guild in client.fetch_guilds(): + global client, tree + """Initialize bot state and sync commands.""" + # Import initialization functions + from fouras import load_riddles, _ensure_data_dir + from rhymes import load_rhymes, _ensure_db, _ensure_log_file + + # Ensure directories exist + _ensure_data_dir() + _ensure_db() + _ensure_log_file() + + # Load riddles and answers + load_riddles() + + # Load rhymes + success, msg = load_rhymes() + print(msg) + + # Sync commands + async for guild in client.fetch_guilds(): tree.copy_global_to(guild=guild) await tree.sync(guild=guild) + print(f"Logged in as {client.user} on {len(client.guilds)} servers!") @client.event async def on_message(message): - # don't answer to self - if ( - message.author == client.user and message.channel in client.ongoing_riddles - ): # need to move a part of that block in FourasModule - current_riddle = client.ongoing_riddles[message.channel] - if "message" not in current_riddle: - current_riddle["message"] = message + global client, ongoing_riddles + """Handle incoming messages.""" + # Ignore bot's own messages + if message.author == client.user: return + + # Handle fouras module + from fouras import handle_message as handle_fouras + await handle_fouras(message, ongoing_riddles, client) + + # Handle rhymes module + from rhymes import handle_message as handle_rhymes + await handle_rhymes(message, client) - if isinstance( - message.channel, (discord.DMChannel, discord.TextChannel, discord.Thread) - ): - handled = False - for m in client.modules: - if not handled: - handled = await m.handle_message(message) -# Initialise le client -client.run(token) +# Run the client +if __name__ == "__main__": + token = os.getenv("DISCORD_TOKEN") + if not token: + print("Error: DISCORD_TOKEN not found in environment variables") + exit(1) + client.run(token) \ No newline at end of file diff --git a/modules/__init__.py b/modules/__init__.py deleted file mode 100644 index 1958e88..0000000 --- a/modules/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from .fouras import FourasModule -from .rhymes import RhymesModule - -ALL = [FourasModule, RhymesModule] diff --git a/modules/base.py b/modules/base.py deleted file mode 100644 index 6206fca..0000000 --- a/modules/base.py +++ /dev/null @@ -1,44 +0,0 @@ -import discord -import json - -ENCODING = "utf-8" - - -class BaseModule: - _client = None - - def __init__(self, client): - self._client = client - - async def load(self): - raise NotImplementedError - - async def save(self, save_to_file=True): - raise NotImplementedError - - async def handle_message(self, message) -> bool: - raise NotImplementedError - - async def load_history(self, channel): - messages = [ - { - "id": message.id, - "content": message.content, - "date": message.created_at.strftime("%d/%m %H:%M:%S"), - } - async for message in channel.history(limit=10) - ] - return json.dumps(messages, ensure_ascii=False) - - async def get_guild_name(self, guildId) -> str: - guild = await self._client.fetch_guild(guildId) - return "[Server={0}]".format(guild.name) - - async def get_channel_name(self, channel) -> str: - if isinstance(channel, discord.DMChannel): - dm_channel = await self._client.fetch_channel(channel.id) - return "[DM={0}]".format(dm_channel.recipient.name) - else: - return "[Server={0}] => [Channel={1}]".format( - channel.guild.name, channel.name - ) diff --git a/modules/fouras.py b/modules/fouras.py deleted file mode 100644 index 123b05e..0000000 --- a/modules/fouras.py +++ /dev/null @@ -1,295 +0,0 @@ -from .base import BaseModule, ENCODING -import random -import re -import json -from unidecode import unidecode -import appdirs -import os - -API_URL = "".join( - [ - "https://discord.com/api/oauth2/authorize?", - "client_id=1110208055171367014&permissions=274877975552&scope=bot", - ] -) - -MAINTAINER_ID = 151626081458192384 - -BUG_REPORT = """ -BUG REPORT from {user} (`{user_id}`) in channel {channel} (`{channel_id}`) : - -Message : -> {message} - -State : -```json\n{state}``` -History : -```json\n{history}``` -""" - -ABOUT = """ -Ce bot a été développé par {user} -Code Source : https://git.epicsparrow.com/Anselme/perefouras -Ajouter ce bot à votre serveur : {url} -""" - -SUCCESS = """ -Bravo {user} ! La réponse était bien `{answer}`. -""" - -INVALID_ID = """ -Numéro d'énigme invalide, merci de saisir un numéro entre 1 et {len} -""" - -RIDDLES_FILE = "riddles.txt" -ANSWERS_FILE = "answers.txt" -SAVE_FILE = appdirs.user_data_dir() + "/PereFouras/fouras_riddles.json" - - -class FourasModule(BaseModule): - async def load(self): - with open(RIDDLES_FILE, "r", encoding=ENCODING) as r_file: - self._client.riddles = r_file.read().split("\n\n") - with open(ANSWERS_FILE, "r", encoding=ENCODING) as a_file: - self._client.answers = [line.strip() for line in a_file.readlines()] - str = f"Loaded {len(self._client.riddles)} riddles" - - try: - with open(SAVE_FILE, "r") as file: - config = json.load(file) - ongoing_riddles = dict() - for k, v in config.items(): - channel = await self._client.fetch_channel(int(k)) - channel_info = v - channel_info["message"] = await channel.fetch_message( - channel_info["message"] - ) - ongoing_riddles[channel] = channel_info - self._client.ongoing_riddles = ongoing_riddles - str = str + 'Loaded fouras save file "{0}"'.format(SAVE_FILE) - except FileNotFoundError: - str = str + 'No previous "{0}" save file found'.format(SAVE_FILE) - except json.JSONDecodeError: - str = str + '"{0}" is an invalid JSON file.'.format(SAVE_FILE) - - print(str) - return str - - async def save(self, save_to_file=True): - dump = {} - for key, value in self._client.ongoing_riddles.items(): - dump_channel = dict(value) - dump_channel["message"] = dump_channel["message"].id - dump[key.id] = dump_channel - os.makedirs(os.path.dirname(SAVE_FILE), exist_ok=True) - with open(SAVE_FILE, "w") as file: - json.dump(dump, file, ensure_ascii=False) - print('Saved fouras riddles state in file "{0}"'.format(SAVE_FILE)) - return dump - - def new_riddle(self, channel, index): - current_riddle = dict( - index=index, - nbClues=-1, - riddle=self._client.riddles[index].strip(), - answer=self._client.answers[index], - ) - self._client.ongoing_riddles[channel] = current_riddle - return self.format_message(current_riddle) - - def finish_riddle(self, channel): - del self._client.ongoing_riddles[channel] - - def clue_string(self, answer, nbClues): - finalString = "_" - for i in range(len(answer) - 1): - finalString += " _" - random.seed(hash(answer)) - nbRevealed = 0 - for i in range(nbClues): - id = random.randint(0, len(answer) - 1) - while finalString[id * 2] != "_": - id = random.randint(0, len(answer) - 1) - - nbRevealed += 1 - finalString = finalString[: id * 2] + answer[id] + finalString[id * 2 + 1 :] - - if nbRevealed == len(answer): - return finalString - return finalString - - def format_message(self, current_riddle): - nbClues = current_riddle["nbClues"] - answer = current_riddle["answer"] - - formatted_riddle = "> " + current_riddle["riddle"].replace("\n", "\n> ") - formatted_riddle = formatted_riddle.replace("\r", "") - - clue = "" - if nbClues > -1: - if nbClues >= len(answer): - clue = clue + "\nNon trouvée, la solution était : `{0}`".format(answer) - else: - clue = clue + "\nIndice : `{0}`".format( - self.clue_string(answer, nbClues) - ) - - if "solver" in current_riddle: - clue = clue + "\n{0} a trouvé la solution, qui était : `{1}`".format( - current_riddle["solver"].mention, answer - ) - - if clue: - return "Énigme {0}:\n{1}\n> Qui suis-je ?\n{2}".format( - current_riddle["index"] + 1, formatted_riddle, clue - ) - else: - return "Énigme {0}:\n{1}\n> Qui suis-je ?".format( - current_riddle["index"] + 1, formatted_riddle - ) - - async def handle_message(self, message) -> bool: - if message.author == self._client.user: - return False - message_content = message.content.lower() - - # command fouras - fouras_match = re.match(r"^fouras\s+(\d+)$", message_content) - if fouras_match: - index = int(fouras_match.group(1)) - 1 - if index >= 0 and index < len(self._client.riddles): - if random.random() <= 0.03: - await message.channel.send("Non") - else: - await message.channel.send(self.new_riddle(message.channel, index)) - else: - await message.channel.send( - INVALID_ID.format(len=len(self._client.riddles)) - ) - return True - if message_content == "fouras": - if random.random() <= 0.03: - await message.channel.send("Non") - elif len(self._client.riddles) > 0: - index = random.randint(0, len(self._client.riddles) - 1) - await message.channel.send(self.new_riddle(message.channel, index)) - return True - - if message_content.startswith("bug"): - author_user = await self._client.fetch_user(MAINTAINER_ID) - channel_name = await self.get_channel_name(message.channel) - messages_json = await self.load_history(message.channel) - await author_user.send( - BUG_REPORT.format( - user=message.author.mention, - user_id=message.author.id, - channel=channel_name, - channel_id=message.channel.id, - message=message_content, - history=messages_json, - state=self.save(False), - ) - ) - await message.channel.send( - f"Rapport de bug envoyé à {author_user.mention}\nMerci de ton feedback !" - ) - return True - - broadcast_match = re.match(r"^broadcast\s+(\d+) (.*)", message.content) - if broadcast_match and message.author.id == MAINTAINER_ID: - index = int(broadcast_match.group(1)) - broadcast_message = broadcast_match.group(2) - channel = await self._client.fetch_channel(index) - if channel: - await channel.send(broadcast_message) - else: - await message.channel.send(f"Invalid channel id : {index}") - return True - - if message_content == "about fouras": - author_user = await self._client.fetch_user(MAINTAINER_ID) - await message.channel.send( - ABOUT.format(user=author_user.mention, url=API_URL) - ) - return True - - if message_content == "save fouras": - if message.author.id == 151626081458192384: - json_str = "```json\n{0}```".format( - json.dumps(self.save(), ensure_ascii=False, indent=2) - ) - await message.author.send(json_str) - return True - - if message_content == "load fouras": - if message.author.id == 151626081458192384: - await self.load() - await message.author.send( - "Loaded {0} riddles".format(len(self._client.riddles)) - ) - return True - - if message_content == "debug fouras": - if message.author.id == 151626081458192384: - dump = {} - for key, value in self._client.ongoing_riddles.items(): - dump_channel = dict(value) - dump_channel.pop("message", None) - dump_channel.pop("answer", None) - channel_name = await self.get_channel_name(key) - dump[channel_name] = dump_channel - await message.author.send( - "```json\n{0}```".format( - json.dumps(dump, ensure_ascii=False, indent=4) - ) - ) - return True - - # if current channel has ongoing riddle - if message.channel in self._client.ongoing_riddles: - current_riddle = self._client.ongoing_riddles[message.channel] - - if "message" in current_riddle: - answer = current_riddle["answer"] - if unidecode(answer.lower()) in unidecode(message_content): - current_riddle["solver"] = message.author - - await message.channel.send( - SUCCESS.format(user=message.author.mention, answer=answer) - ) - await current_riddle["message"].edit( - content=self.format_message(current_riddle) - ) - self.finish_riddle(message.channel) - return True - - if ( - message_content == "repete" - or message_content == "répète" - or message_content == "repeat" - ): - current_riddle.pop("message") - await message.channel.send(self.format_message(current_riddle)) - return True - - # Commande /clue : révèle une lettre au hasard de la réponse attendue - if ( - message_content == "indice" - or message_content == "aide" - or message_content == "help" - or message_content == "clue" - ): - nbClues = current_riddle["nbClues"] + 1 - current_riddle["nbClues"] = nbClues - if nbClues >= len(answer): - reply = "Perdu ! La réponse était : `{0}`".format(answer) - await message.channel.send(reply) - # else: - # reply = "Indice : `{0}`".format(clue_string(answer, nbClues)) - await current_riddle["message"].edit( - content=self.format_message(current_riddle) - ) - if nbClues >= len(answer): - self.finish_riddle(message.channel) - return True - return False diff --git a/modules/gitea.py b/modules/gitea.py deleted file mode 100644 index b92b4a6..0000000 --- a/modules/gitea.py +++ /dev/null @@ -1,69 +0,0 @@ -import os - -import discord -from discord import app_commands - -import httpx -from client import client, tree - - -GUILD_ID = os.getenv("GUILD_ID") -GITEA_API_KEY = os.getenv("GITEA_API_KEY") - -gitea_url = "https://git.epicsparrow.com/api/v1" - -GITEA_PROJECTS = {} - -auth_headers = {"Authorization": f"token {GITEA_API_KEY}"} - - -def init_gitea_projects(): - res = httpx.get(gitea_url + "/repos/search", headers=auth_headers) - if res.status_code == 200: - GITEA_PROJECTS.update( - { - str(project["id"]): { - "name": project["name"], - "owner": project["owner"]["login"], - } - for project in res.json()["data"] - } - ) - return [(project["name"], project["id"]) for project in res.json()["data"]] - else: - return [] - - -init_gitea_projects() - - -@tree.command( - name="gitea-issue", - description="Create issues to gitea", -) -@app_commands.describe( - title="Issue title", project="The project where the issue is created" -) -@app_commands.choices( - project=[ - app_commands.Choice(name=project["name"], value=id_) - for id_, project in GITEA_PROJECTS.items() - ] -) -async def gitea(interaction: discord.Interaction, project: str, title: str): - embed = discord.Embed(title="Gitea issue") - embed.add_field(name="Project", value=GITEA_PROJECTS[project]["name"]) - embed.add_field(name="Title", value=title) - embed.add_field(name="Created by", value=interaction.user.mention) - - creation_url = f"{gitea_url}/repos/{GITEA_PROJECTS[project]['owner']}/{GITEA_PROJECTS[project]['name']}/issues" - creation_data = { - "title": title, - "body": f"Created by {interaction.user.nick or interaction.user.name} from Discord.", - } - res = httpx.post(creation_url, headers=auth_headers, data=creation_data) - if res.status_code == 201: - embed.add_field(name="Issue created", value=res.json()["html_url"]) - else: - embed.add_field(name="Error", value=res.text) - await interaction.response.send_message(embed=embed) diff --git a/modules/rhymes.py b/modules/rhymes.py deleted file mode 100644 index 49a02b6..0000000 --- a/modules/rhymes.py +++ /dev/null @@ -1,136 +0,0 @@ -from .base import BaseModule -import random -import time -import json -import appdirs -import os - -RHYMES_FILE = "rhymes.json" -SAVE_FILE = appdirs.user_data_dir() + "/PereFouras/poilau_save.json" - -# CONFIG_TEXT = """ -# Ce bot a été développé par {user} -# Code Source : https://git.epicsparrow.com/Anselme/perefouras -# Ajouter ce bot à votre serveur : {url} -# """ - - -class RhymesModule(BaseModule): - rhymes: list = [] - guild_config: dict = {} - - async def load(self): - str = "" - with open(RHYMES_FILE, "r") as f: - self.rhymes = json.load(f) - - try: - with open(SAVE_FILE, "r") as file: - self.guild_config = json.load(file) - str = 'Loaded poilau save file "{0}"'.format(SAVE_FILE) - except FileNotFoundError: - str = 'No previous "{0}" save file found'.format(SAVE_FILE) - except json.JSONDecodeError: - str = '"{0}" is an invalid JSON file.'.format(SAVE_FILE) - print(str) - return str - - async def save(self, save_to_file=True): - os.makedirs(os.path.dirname(SAVE_FILE), exist_ok=True) - with open(SAVE_FILE, "w") as file: - json.dump(self.guild_config, file, ensure_ascii=False, indent=2) - print('Saved poilau state in file "{0}"'.format(SAVE_FILE)) - - def get_last_word(self, ch: str) -> str: - truncated = ch - while True: - if len(truncated) < 2 or truncated[-1].isnumeric(): - return "" - if truncated[-1].isalpha() and truncated[-2].isalpha(): - break - truncated = truncated[:-1] - truncated = truncated.split(" ")[-1] - if truncated.isalpha(): - return truncated - else: - return "" - - def poil_auquel(self, ch: str) -> str: - for rhyme in self.rhymes: - if ch in rhyme["blacklist"]: - return "" - if ch.endswith(tuple(rhyme["keys"])): - return random.choice(rhyme["rhymes"]) - return "" - - async def handle_message(self, message) -> bool: - message_content = message.content.lower() - - if message_content == "debug poilau": - if message.author.id == 151626081458192384: - dump = {} - for key, value in self.guild_config.items(): - channel_name = await self.get_guild_name(key) - sleeping_time = "{:.2f} s".format( - max(0, value["cooldown"] - time.time()) - ) - dump[channel_name] = { - "cooldown": sleeping_time, - "self-control": value["self-control"], - } - await message.author.send( - "```json\n{0}```".format( - json.dumps(dump, ensure_ascii=False, indent=2) - ) - ) - return True - - if message_content == "save poilau": - if message.author.id == 151626081458192384: - self.save() - json_str = "```json\n{0}```".format( - json.dumps(self.guild_config, ensure_ascii=False, indent=2) - ) - await message.author.send(json_str) - return True - - if message_content == "load poilau": - if message.author.id == 151626081458192384: - await message.author.send(await self.load()) - json_str = "```json\n{0}```".format( - json.dumps(self.guild_config, ensure_ascii=False, indent=2) - ) - await message.author.send(json_str) - return True - - if message_content == "tg fouras" and message.guild: - self.guild_config[str(message.guild.id)] = { - "cooldown": time.time() + 40000, - "self-control": 2.0, - } - await message.channel.send("ok :'(") - return True - - last_word = self.get_last_word(message_content) - if message.author != self._client.user and message.guild and last_word: - poil = self.poil_auquel(last_word) - guildId = str(message.guild.id) - guild_config = self.guild_config.get( - guildId, {"cooldown": 0, "self-control": 1.0} - ) - if poil and time.time() - guild_config["cooldown"] > 0: - self_control = guild_config["self-control"] - if random.random() < self_control: - guild_config["self-control"] = self_control * 0.9 - self.guild_config[guildId] = guild_config - return False - wait_time = random.randint(0, 900) - if bool(random.getrandbits(1)): - wait_time = random.randint(900, 10800) - self.guild_config[guildId] = { - "cooldown": time.time() + wait_time, - "self-control": self_control + 1.0, - } - await message.channel.send(poil) - return True - return False diff --git a/answers.txt b/resources/answers.txt similarity index 100% rename from answers.txt rename to resources/answers.txt diff --git a/rhymes.json b/resources/rhymes.json similarity index 100% rename from rhymes.json rename to resources/rhymes.json diff --git a/riddles.txt b/resources/riddles.txt similarity index 100% rename from riddles.txt rename to resources/riddles.txt diff --git a/rhymes.py b/rhymes.py new file mode 100644 index 0000000..b49bfd9 --- /dev/null +++ b/rhymes.py @@ -0,0 +1,298 @@ +# rhymes.py +import random +import json +import sqlite3 +from pathlib import Path +from datetime import datetime +from typing import Dict, Any, Tuple + +RHYMES_FILE = "resources/rhymes.json" +DB_FILE = "data/poilau_state.db" +RHYME_LOG_FILE = "data/rhyme_log.csv" + +loaded_rhymes = {} + +def _ensure_db() -> None: + """Initialize SQLite database and create tables if needed.""" + Path(DB_FILE).parent.mkdir(parents=True, exist_ok=True) + + with sqlite3.connect(DB_FILE) as conn: + cursor = conn.cursor() + cursor.execute(""" + CREATE TABLE IF NOT EXISTS guild_state ( + guild_id TEXT PRIMARY KEY, + guild_name TEXT NOT NULL DEFAULT '', + cooldown_until TEXT NOT NULL DEFAULT '1970-01-01T00:00:00', + self_control REAL NOT NULL DEFAULT 1.0, + last_updated TEXT NOT NULL DEFAULT (datetime('now')) + ) + """) + conn.commit() + + +def _ensure_log_file() -> None: + """Create CSV log file if it doesn't exist.""" + Path(RHYME_LOG_FILE).parent.mkdir(parents=True, exist_ok=True) + + if not Path(RHYME_LOG_FILE).exists(): + with open(RHYME_LOG_FILE, "w", encoding="utf-8") as f: + f.write("timestamp,last_word,rhyme_triggered\n") + + +def _get_connection() -> sqlite3.Connection: + """Return SQLite connection with row factory for named column access.""" + conn = sqlite3.connect(DB_FILE) + conn.row_factory = sqlite3.Row + return conn + + +def load_rhymes() -> Tuple[bool, str]: + global loaded_rhymes + """Load rhymes from JSON file. Returns (success, message).""" + try: + with open(RHYMES_FILE, "r", encoding="utf-8") as f: + loaded_rhymes = json.load(f) + return True, f"Loaded rhymes file \"{RHYMES_FILE}\"" + except FileNotFoundError: + return False, f"No rhymes file found at \"{RHYMES_FILE}\"" + except json.JSONDecodeError as e: + return False, f"Invalid JSON in \"{RHYMES_FILE}\": {e}" + + +def get_guild_state(guild_id: str) -> Dict[str, Any]: + """Retrieve guild state from database.""" + with _get_connection() as conn: + cursor = conn.cursor() + cursor.execute(""" + SELECT guild_id, guild_name, cooldown_until, self_control, last_updated + FROM guild_state WHERE guild_id = ? + """, (guild_id,)) + row = cursor.fetchone() + + if row: + return { + "guild_id": row["guild_id"], + "guild_name": row["guild_name"], + "cooldown_until": row["cooldown_until"], + "self_control": row["self_control"], + "last_updated": row["last_updated"] + } + else: + return { + "guild_id": guild_id, + "guild_name": "", + "cooldown_until": "1970-01-01T00:00:00", + "self_control": 1.0, + "last_updated": datetime.now().isoformat() + } + + +def update_guild_state( + guild_id: str, + guild_name: str, + cooldown_until: str, + self_control: float +) -> None: + """Update guild state in database.""" + with _get_connection() as conn: + cursor = conn.cursor() + cursor.execute(""" + INSERT OR REPLACE INTO guild_state (guild_id, guild_name, cooldown_until, self_control, last_updated) + VALUES (?, ?, ?, ?, ?) + """, (guild_id, guild_name, cooldown_until, self_control, datetime.now().isoformat())) + conn.commit() + + +def delete_guild_state(guild_id: str) -> bool: + """Delete guild state from database.""" + with _get_connection() as conn: + cursor = conn.cursor() + cursor.execute("DELETE FROM guild_state WHERE guild_id = ?", (guild_id,)) + conn.commit() + return cursor.rowcount > 0 + + +def get_all_guild_states() -> Dict[str, Dict[str, Any]]: + """Retrieve all guild states (for debug purposes).""" + with _get_connection() as conn: + cursor = conn.cursor() + cursor.execute("SELECT guild_id, guild_name, cooldown_until, self_control, last_updated FROM guild_state") + return { + row["guild_id"]: { + "guild_name": row["guild_name"], + "cooldown_until": row["cooldown_until"], + "self_control": row["self_control"], + "last_updated": row["last_updated"] + } + for row in cursor.fetchall() + } + + +def log_rhyme(last_word: str, rhyme_triggered: str) -> None: + """Log rhyme trigger to CSV file.""" + timestamp = datetime.now().isoformat() + + with open(RHYME_LOG_FILE, "a", encoding="utf-8") as f: + safe_rhyme = rhyme_triggered.replace(",", ";") + f.write(f"{timestamp},{last_word},{safe_rhyme}\n") + + +def get_last_word(text: str) -> str: + """Extract last alphabetic word from text.""" + truncated = text + while True: + if len(truncated) < 2 or truncated[-1].isnumeric(): + return "" + if truncated[-1].isalpha() and truncated[-2].isalpha(): + break + truncated = truncated[:-1] + truncated = truncated.split(" ")[-1] + return truncated if truncated.isalpha() else "" + + +def find_rhyme(word: str) -> str: + global loaded_rhymes + """Find matching rhyme for given word.""" + for rhyme in loaded_rhymes: + if word in rhyme["blacklist"]: + return "" + if word.endswith(tuple(rhyme["keys"])): + log_rhyme(word, rhyme["sound"]) + return random.choice(rhyme["rhymes"]) + return "" + +async def get_guild_name(guildId, client) -> str: + guild = await client.fetch_guild(guildId) + return "[Server={0}]".format(guild.name) + +async def handle_debug_commands(message, client) -> bool: + """Handle debug commands (debug, save, load). Returns True if handled.""" + message_content = message.content.lower() + + if message_content == "debug poilau": + if message.author.id == 151626081458192384: + all_states = get_all_guild_states() + dump = {} + for guild_id, state in all_states.items(): + channel_name = await get_guild_name(guild_id, client) + cooldown_dt = datetime.fromisoformat(state["cooldown_until"]) + time_remaining = max(0, (cooldown_dt - datetime.now()).total_seconds()) + sleeping_time = "{:.2f} s".format(time_remaining) + dump[channel_name] = { + "cooldown_until": state["cooldown_until"], + "cooldown_remaining": sleeping_time, + "self-control": state["self_control"], + "last_updated": state["last_updated"] + } + await message.author.send( + "```json\n{0}```".format(json.dumps(dump, ensure_ascii=False, indent=2)) + ) + return True + + if message_content == "save poilau": + if message.author.id == 151626081458192384: + all_states = get_all_guild_states() + json_str = "```json\n{0}```".format(json.dumps(all_states, ensure_ascii=False, indent=2)) + await message.author.send("State persisted in SQLite database") + await message.author.send(json_str) + return True + + if message_content == "load poilau": + if message.author.id == 151626081458192384: + success, msg = load_rhymes() + all_states = get_all_guild_states() + json_str = "```json\n{0}```".format(json.dumps(all_states, ensure_ascii=False, indent=2)) + await message.author.send(msg) + await message.author.send(json_str) + return True + + if message_content == "tg fouras" and message.guild: + # Disable cooldown for this server (set to far future) + cooldown_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + cooldown_date = cooldown_date.replace(day=cooldown_date.day + 10000) + update_guild_state( + str(message.guild.id), + guild_name=message.guild.name, + cooldown_until=cooldown_date.isoformat(), + self_control=2.0 + ) + await message.channel.send("ok :'(") + return True + + return False + + +async def handle_rhyme_logic(message, client) -> bool: + """Main rhyme detection logic. Returns True if rhyme was triggered.""" + message_content = message.content.lower() + last_word = get_last_word(message_content) + + if message.author != client.user and message.guild and last_word: + rhyme = find_rhyme(last_word) + guild_id = str(message.guild.id) + guild_name = message.guild.name + + if rhyme: + guild_state = get_guild_state(guild_id) + + # Update guild name if changed + if guild_state["guild_name"] != guild_name: + update_guild_state( + guild_id, + guild_name=guild_name, + cooldown_until=guild_state["cooldown_until"], + self_control=guild_state["self_control"] + ) + + # Check cooldown + cooldown_dt = datetime.fromisoformat(guild_state["cooldown_until"]) + now_dt = datetime.now() + + if now_dt >= cooldown_dt: + self_control = guild_state["self_control"] + + # Probability check + if random.random() < self_control: + new_self_control = self_control * 0.9 + update_guild_state( + guild_id, + guild_name=guild_name, + cooldown_until=now_dt.isoformat(), + self_control=new_self_control + ) + return False + + # Calculate new cooldown duration + wait_time = random.randint(0, 900) + if bool(random.getrandbits(1)): + wait_time = random.randint(900, 10800) + + new_cooldown_dt = now_dt.replace(second=0, microsecond=0) + new_cooldown_dt = new_cooldown_dt.replace(minute=new_cooldown_dt.minute + wait_time // 60) + new_cooldown_dt = new_cooldown_dt.replace(hour=new_cooldown_dt.hour + wait_time // 3600) + + update_guild_state( + guild_id, + guild_name=guild_name, + cooldown_until=new_cooldown_dt.isoformat(), + self_control=self_control + 1.0 + ) + + await message.channel.send(rhyme) + return True + + return False + + +async def handle_message(message, client) -> bool: + """Main entry point for message handling.""" + # Initialize database and log file on first run + _ensure_db() + _ensure_log_file() + + # Handle debug commands first + if await handle_debug_commands(message, client): + return True + + # Process rhyme logic + return await handle_rhyme_logic(message, client) \ No newline at end of file diff --git a/parser_txt_to_json.py b/scripts/parser_txt_to_json.py similarity index 100% rename from parser_txt_to_json.py rename to scripts/parser_txt_to_json.py diff --git a/riddle_scrapper.py b/scripts/riddle_scrapper.py similarity index 100% rename from riddle_scrapper.py rename to scripts/riddle_scrapper.py