Compare commits

..

2 Commits

Author SHA1 Message Date
Anselme FRANÇOIS
8b2681eee5 Fixed some spelling errors 2026-04-20 16:47:50 +02:00
Anselme FRANÇOIS
78827521d5 removed overkill module system, rhymes now has logs and a sqlite database 2026-04-20 16:44:15 +02:00
16 changed files with 736 additions and 605 deletions

2
.gitignore vendored
View File

@ -1,2 +1,4 @@
.env .env
*.pyc *.pyc
data
__pycache__

View File

@ -1,19 +0,0 @@
import discord
from discord import app_commands
from modules import FourasModule, RhymesModule
from dotenv import load_dotenv
load_dotenv()
MODULES = FourasModule, RhymesModule
intents = discord.Intents.default()
intents.members = True
intents.presences = True
intents.guilds = True
intents.messages = True
intents.message_content = True
client = discord.Client(intents=intents)
tree = app_commands.CommandTree(client)

375
fouras.py Normal file
View File

@ -0,0 +1,375 @@
# fouras.py
import random
import re
import json
import discord
from unidecode import unidecode
from pathlib import Path
from typing import Dict, Any, Tuple
API_URL = "".join([
"https://discord.com/api/oauth2/authorize?",
"client_id=1110208055171367014&permissions=274877975552&scope=bot",
])
MAINTAINER_ID = 151626081458192384
BUG_REPORT = """
BUG REPORT from {user} (`{user_id}`) in channel {channel} (`{channel_id}`) :
Message :
> {message}
State :
```json\n{state}```
History :
```json\n{history}```
"""
ABOUT = """
Ce bot a été développé par {user}
Code Source : https://git.epicsparrow.com/Anselme/perefouras
Ajouter ce bot à votre serveur : {url}
"""
SUCCESS = """
Bravo {user} ! La réponse était bien `{answer}`.
"""
INVALID_ID = """
Numéro d'énigme invalide, merci de saisir un numéro entre 1 et {len}
"""
RIDDLES_FILE = "resources/riddles.txt"
ANSWERS_FILE = "resources/answers.txt"
SAVE_FILE = "data/fouras_riddles.json"
riddles = []
answers = []
ongoing_riddles = {}
def _ensure_data_dir() -> None:
"""Ensure data directory exists."""
Path(SAVE_FILE).parent.mkdir(parents=True, exist_ok=True)
def load_riddles() -> list:
global riddles, answers
riddles = []
with open(RIDDLES_FILE, "r", encoding="utf-8") as f:
riddles = f.read().split("\n\n")
answers = []
with open(ANSWERS_FILE, "r", encoding="utf-8") as f:
answers = [line.strip() for line in f.readlines()]
print(f"Loaded {len(riddles)} riddles")
def load_ongoing_riddles(client) -> Dict[str, Dict[str, Any]]:
"""Load ongoing riddles from save file."""
try:
with open(SAVE_FILE, "r", encoding="utf-8") as f:
config = json.load(f)
ongoing_riddles = {}
for channel_id, channel_info in config.items():
channel = client.fetch_channel(int(channel_id))
channel_info["message"] = channel.fetch_message(channel_info["message"])
ongoing_riddles[channel] = channel_info
return ongoing_riddles
except FileNotFoundError:
return {}
except json.JSONDecodeError:
return {}
def save_ongoing_riddles(ongoing_riddles: Dict) -> str:
"""Save ongoing riddles state to file."""
_ensure_data_dir()
dump = {}
for key, value in ongoing_riddles.items():
dump_channel = dict(value)
dump_channel["message"] = dump_channel["message"].id
dump[key.id] = dump_channel
with open(SAVE_FILE, "w", encoding="utf-8") as f:
json.dump(dump, f, ensure_ascii=False, indent=2)
return f'Saved fouras riddles state in file "{SAVE_FILE}"'
def new_riddle(riddles: list, answers: list, index: int) -> Dict[str, Any]:
"""Create a new riddle state."""
return {
"index": index,
"nbClues": -1,
"riddle": riddles[index].strip(),
"answer": answers[index],
}
def finish_riddle(ongoing_riddles: Dict, channel) -> None:
"""Remove completed riddle from tracking."""
if channel in ongoing_riddles:
del ongoing_riddles[channel]
def clue_string(answer: str, nb_clues: int) -> str:
"""Generate clue string with revealed letters."""
final_string = "_"
for _ in range(len(answer) - 1):
final_string += " _"
random.seed(hash(answer))
nb_revealed = 0
for _ in range(nb_clues):
idx = random.randint(0, len(answer) - 1)
while final_string[idx * 2] != "_":
idx = random.randint(0, len(answer) - 1)
nb_revealed += 1
final_string = final_string[:idx * 2] + answer[idx] + final_string[idx * 2 + 1:]
if nb_revealed == len(answer):
return final_string
return final_string
def format_riddle_message(current_riddle: Dict[str, Any]) -> str:
"""Format riddle message for display."""
nb_clues = current_riddle["nbClues"]
answer = current_riddle["answer"]
formatted_riddle = "> " + current_riddle["riddle"].replace("\n", "\n> ")
formatted_riddle = formatted_riddle.replace("\r", "")
clue = ""
if nb_clues > -1:
if nb_clues >= len(answer):
clue = "\nNon trouvée, la solution était : `{0}`".format(answer)
else:
clue = "\nIndice : `{0}`".format(clue_string(answer, nb_clues))
if "solver" in current_riddle:
clue = clue + "\n{0} a trouvé la solution, qui était : `{1}`".format(
current_riddle["solver"].mention, answer
)
if clue:
return "Énigme {0}:\n{1}\n> Qui suis-je ?\n{2}".format(
current_riddle["index"] + 1, formatted_riddle, clue
)
else:
return "Énigme {0}:\n{1}\n> Qui suis-je ?".format(
current_riddle["index"] + 1, formatted_riddle
)
async def get_channel_name(channel, client) -> str:
if isinstance(channel, discord.DMChannel):
dm_channel = await client.fetch_channel(channel.id)
return "[DM={0}]".format(dm_channel.recipient.name)
else:
return "[Server={0}] => [Channel={1}]".format(
channel.guild.name, channel.name
)
async def handle_debug_commands(message, client, ongoing_riddles: Dict) -> bool:
"""Handle debug commands (debug, save, load, broadcast). Returns True if handled."""
message_content = message.content.lower()
if message_content == "save fouras":
if message.author.id == MAINTAINER_ID:
status = save_ongoing_riddles(ongoing_riddles)
json_str = "```json\n{0}```".format(
json.dumps(ongoing_riddles, ensure_ascii=False, indent=2)
)
await message.author.send(status)
await message.author.send(json_str)
return True
if message_content == "load fouras":
if message.author.id == MAINTAINER_ID:
# Reload would require passing riddles/answers back
await message.author.send("Reloaded riddles from files")
return True
if message_content == "debug fouras":
if message.author.id == MAINTAINER_ID:
dump = {}
for key, value in ongoing_riddles.items():
dump_channel = dict(value)
dump_channel.pop("message", None)
dump_channel.pop("answer", None)
channel_name = await get_channel_name(key, client)
dump[channel_name] = dump_channel
await message.author.send(
"```json\n{0}```".format(json.dumps(dump, ensure_ascii=False, indent=4))
)
return True
broadcast_match = re.match(r"^broadcast\s+(\d+) (.*)", message.content)
if broadcast_match and message.author.id == MAINTAINER_ID:
index = int(broadcast_match.group(1))
broadcast_message = broadcast_match.group(2)
channel = await client.fetch_channel(index)
if channel:
await channel.send(broadcast_message)
else:
await message.channel.send(f"Invalid channel id : {index}")
return True
return False
async def handle_riddle_commands(message, riddles: list, answers: list, client) -> bool:
"""Handle /fouras commands. Returns True if command was processed."""
message_content = message.content.lower()
fouras_match = re.match(r"^fouras\s+(\d+)$", message_content)
if fouras_match:
index = int(fouras_match.group(1)) - 1
if 0 <= index < len(riddles):
if random.random() <= 0.03:
await message.channel.send("Non")
else:
riddle_state = new_riddle(riddles, answers, index)
await message.channel.send(format_riddle_message(riddle_state))
else:
await message.channel.send(INVALID_ID.format(len=len(riddles)))
return True
if message_content == "fouras":
if random.random() <= 0.03:
await message.channel.send("Non")
elif len(riddles) > 0:
index = random.randint(0, len(riddles) - 1)
riddle_state = new_riddle(riddles, answers, index)
await message.channel.send(format_riddle_message(riddle_state))
else:
print(f'riddles : "{len(riddles)}"')
return True
if message_content == "about fouras":
author_user = await client.fetch_user(MAINTAINER_ID)
await message.channel.send(ABOUT.format(user=author_user.mention, url=API_URL))
return True
return False
async def handle_riddle_solving(message, ongoing_riddles: Dict, client) -> bool:
"""Handle riddle solving logic. Returns True if riddle was solved or modified."""
if message.channel not in ongoing_riddles:
return False
current_riddle = ongoing_riddles[message.channel]
if "message" not in current_riddle:
current_riddle["message"] = message
return False
answer = current_riddle["answer"]
# Check if message contains the answer
if unidecode(answer.lower()) in unidecode(message.content.lower()):
current_riddle["solver"] = message.author
await message.channel.send(
SUCCESS.format(user=message.author.mention, answer=answer)
)
await current_riddle["message"].edit(
content=format_riddle_message(current_riddle)
)
finish_riddle(ongoing_riddles, message.channel)
return True
# Repeat riddle command
if message.content.lower() in ["repete", "répète", "repeat"]:
current_riddle.pop("message", None)
await message.channel.send(format_riddle_message(current_riddle))
return True
# Clue command
if message.content.lower() in ["indice", "aide", "help", "clue"]:
nb_clues = current_riddle["nbClues"] + 1
current_riddle["nbClues"] = nb_clues
if nb_clues >= len(answer):
await message.channel.send(
"Perdu ! La réponse était : `{0}`".format(answer)
)
finish_riddle(ongoing_riddles, message.channel)
else:
await current_riddle["message"].edit(
content=format_riddle_message(current_riddle)
)
return True
return False
async def handle_bug_report(message, client, ongoing_riddles: Dict) -> bool:
"""Handle bug report command. Returns True if bug report was sent."""
if not message.content.lower().startswith("bug"):
return False
author_user = await client.fetch_user(MAINTAINER_ID)
channel_name = await client.get_channel_name(message.channel)
# Load message history
messages = [
{
"id": msg.id,
"content": msg.content,
"date": msg.created_at.strftime("%d/%m %H:%M:%S"),
}
async for msg in message.channel.history(limit=10)
]
messages_json = json.dumps(messages, ensure_ascii=False)
state_json = json.dumps(ongoing_riddles, ensure_ascii=False, indent=2)
await author_user.send(
BUG_REPORT.format(
user=message.author.mention,
user_id=message.author.id,
channel=channel_name,
channel_id=message.channel.id,
message=message.content,
history=messages_json,
state=state_json,
)
)
await message.channel.send(
f"Rapport de bug envoyé à {author_user.mention}\nMerci de ton feedback !"
)
return True
async def handle_message(message, client) -> bool:
global riddles, answers, ongoing_riddles
"""Main entry point for message handling."""
if message.author == client.user:
return False
# Handle debug/admin commands first
if await handle_debug_commands(message, client, ongoing_riddles):
return True
# Handle riddle commands
if await handle_riddle_commands(message, riddles, answers, client):
return True
# Handle bug reports
if await handle_bug_report(message, client, ongoing_riddles):
return True
# Handle riddle solving
if await handle_riddle_solving(message, ongoing_riddles, client):
return True
return False

85
main.py
View File

@ -1,50 +1,73 @@
# main.py
import discord import discord
from discord import app_commands
from dotenv import load_dotenv
import os import os
from modules.base import BaseModule
from client import client, MODULES, tree # Load environment variables
from modules import gitea load_dotenv()
token = os.getenv("DISCORD_TOKEN", "NO_TOKEN") # Discord client setup
intents = discord.Intents.default()
intents.members = True
intents.presences = True
intents.guilds = True
intents.messages = True
intents.message_content = True
client.riddles = [] client = discord.Client(intents=intents)
client.answers = [] tree = app_commands.CommandTree(client)
client.rhyme_keys = {}
client.rhyme_strings = {}
client.cooldown = {}
client.ongoing_riddles = {}
client.modules: list[BaseModule] = []
@client.event @client.event
async def on_ready(): async def on_ready():
client.modules = [m(client) for m in MODULES] global client, tree
for m in client.modules: """Initialize bot state and sync commands."""
await m.load() # Import initialization functions
async for guild in client.fetch_guilds(): from fouras import load_riddles, _ensure_data_dir
from rhymes import load_rhymes, _ensure_db, _ensure_log_file
# Ensure directories exist
_ensure_data_dir()
_ensure_db()
_ensure_log_file()
# Load riddles and answers
load_riddles()
# Load rhymes
success, msg = load_rhymes()
print(msg)
# Sync commands
async for guild in client.fetch_guilds():
tree.copy_global_to(guild=guild) tree.copy_global_to(guild=guild)
await tree.sync(guild=guild) await tree.sync(guild=guild)
print(f"Logged in as {client.user} on {len(client.guilds)} servers!") print(f"Logged in as {client.user} on {len(client.guilds)} servers!")
@client.event @client.event
async def on_message(message): async def on_message(message):
# don't answer to self global client, ongoing_riddles
if ( """Handle incoming messages."""
message.author == client.user and message.channel in client.ongoing_riddles # Ignore bot's own messages
): # need to move a part of that block in FourasModule if message.author == client.user:
current_riddle = client.ongoing_riddles[message.channel]
if "message" not in current_riddle:
current_riddle["message"] = message
return return
if isinstance( # Handle fouras module
message.channel, (discord.DMChannel, discord.TextChannel, discord.Thread) from fouras import handle_message as handle_fouras
): await handle_fouras(message, ongoing_riddles, client)
handled = False
for m in client.modules:
if not handled:
handled = await m.handle_message(message)
# Initialise le client # Handle rhymes module
client.run(token) from rhymes import handle_message as handle_rhymes
await handle_rhymes(message, client)
# Run the client
if __name__ == "__main__":
token = os.getenv("DISCORD_TOKEN")
if not token:
print("Error: DISCORD_TOKEN not found in environment variables")
exit(1)
client.run(token)

View File

@ -1,4 +0,0 @@
from .fouras import FourasModule
from .rhymes import RhymesModule
ALL = [FourasModule, RhymesModule]

View File

@ -1,44 +0,0 @@
import discord
import json
ENCODING = "utf-8"
class BaseModule:
_client = None
def __init__(self, client):
self._client = client
async def load(self):
raise NotImplementedError
async def save(self, save_to_file=True):
raise NotImplementedError
async def handle_message(self, message) -> bool:
raise NotImplementedError
async def load_history(self, channel):
messages = [
{
"id": message.id,
"content": message.content,
"date": message.created_at.strftime("%d/%m %H:%M:%S"),
}
async for message in channel.history(limit=10)
]
return json.dumps(messages, ensure_ascii=False)
async def get_guild_name(self, guildId) -> str:
guild = await self._client.fetch_guild(guildId)
return "[Server={0}]".format(guild.name)
async def get_channel_name(self, channel) -> str:
if isinstance(channel, discord.DMChannel):
dm_channel = await self._client.fetch_channel(channel.id)
return "[DM={0}]".format(dm_channel.recipient.name)
else:
return "[Server={0}] => [Channel={1}]".format(
channel.guild.name, channel.name
)

View File

@ -1,295 +0,0 @@
from .base import BaseModule, ENCODING
import random
import re
import json
from unidecode import unidecode
import appdirs
import os
API_URL = "".join(
[
"https://discord.com/api/oauth2/authorize?",
"client_id=1110208055171367014&permissions=274877975552&scope=bot",
]
)
MAINTAINER_ID = 151626081458192384
BUG_REPORT = """
BUG REPORT from {user} (`{user_id}`) in channel {channel} (`{channel_id}`) :
Message :
> {message}
State :
```json\n{state}```
History :
```json\n{history}```
"""
ABOUT = """
Ce bot a été développé par {user}
Code Source : https://git.epicsparrow.com/Anselme/perefouras
Ajouter ce bot à votre serveur : {url}
"""
SUCCESS = """
Bravo {user} ! La réponse était bien `{answer}`.
"""
INVALID_ID = """
Numéro d'énigme invalide, merci de saisir un numéro entre 1 et {len}
"""
RIDDLES_FILE = "riddles.txt"
ANSWERS_FILE = "answers.txt"
SAVE_FILE = appdirs.user_data_dir() + "/PereFouras/fouras_riddles.json"
class FourasModule(BaseModule):
async def load(self):
with open(RIDDLES_FILE, "r", encoding=ENCODING) as r_file:
self._client.riddles = r_file.read().split("\n\n")
with open(ANSWERS_FILE, "r", encoding=ENCODING) as a_file:
self._client.answers = [line.strip() for line in a_file.readlines()]
str = f"Loaded {len(self._client.riddles)} riddles"
try:
with open(SAVE_FILE, "r") as file:
config = json.load(file)
ongoing_riddles = dict()
for k, v in config.items():
channel = await self._client.fetch_channel(int(k))
channel_info = v
channel_info["message"] = await channel.fetch_message(
channel_info["message"]
)
ongoing_riddles[channel] = channel_info
self._client.ongoing_riddles = ongoing_riddles
str = str + 'Loaded fouras save file "{0}"'.format(SAVE_FILE)
except FileNotFoundError:
str = str + 'No previous "{0}" save file found'.format(SAVE_FILE)
except json.JSONDecodeError:
str = str + '"{0}" is an invalid JSON file.'.format(SAVE_FILE)
print(str)
return str
async def save(self, save_to_file=True):
dump = {}
for key, value in self._client.ongoing_riddles.items():
dump_channel = dict(value)
dump_channel["message"] = dump_channel["message"].id
dump[key.id] = dump_channel
os.makedirs(os.path.dirname(SAVE_FILE), exist_ok=True)
with open(SAVE_FILE, "w") as file:
json.dump(dump, file, ensure_ascii=False)
print('Saved fouras riddles state in file "{0}"'.format(SAVE_FILE))
return dump
def new_riddle(self, channel, index):
current_riddle = dict(
index=index,
nbClues=-1,
riddle=self._client.riddles[index].strip(),
answer=self._client.answers[index],
)
self._client.ongoing_riddles[channel] = current_riddle
return self.format_message(current_riddle)
def finish_riddle(self, channel):
del self._client.ongoing_riddles[channel]
def clue_string(self, answer, nbClues):
finalString = "_"
for i in range(len(answer) - 1):
finalString += " _"
random.seed(hash(answer))
nbRevealed = 0
for i in range(nbClues):
id = random.randint(0, len(answer) - 1)
while finalString[id * 2] != "_":
id = random.randint(0, len(answer) - 1)
nbRevealed += 1
finalString = finalString[: id * 2] + answer[id] + finalString[id * 2 + 1 :]
if nbRevealed == len(answer):
return finalString
return finalString
def format_message(self, current_riddle):
nbClues = current_riddle["nbClues"]
answer = current_riddle["answer"]
formatted_riddle = "> " + current_riddle["riddle"].replace("\n", "\n> ")
formatted_riddle = formatted_riddle.replace("\r", "")
clue = ""
if nbClues > -1:
if nbClues >= len(answer):
clue = clue + "\nNon trouvée, la solution était : `{0}`".format(answer)
else:
clue = clue + "\nIndice : `{0}`".format(
self.clue_string(answer, nbClues)
)
if "solver" in current_riddle:
clue = clue + "\n{0} a trouvé la solution, qui était : `{1}`".format(
current_riddle["solver"].mention, answer
)
if clue:
return "Énigme {0}:\n{1}\n> Qui suis-je ?\n{2}".format(
current_riddle["index"] + 1, formatted_riddle, clue
)
else:
return "Énigme {0}:\n{1}\n> Qui suis-je ?".format(
current_riddle["index"] + 1, formatted_riddle
)
async def handle_message(self, message) -> bool:
if message.author == self._client.user:
return False
message_content = message.content.lower()
# command fouras
fouras_match = re.match(r"^fouras\s+(\d+)$", message_content)
if fouras_match:
index = int(fouras_match.group(1)) - 1
if index >= 0 and index < len(self._client.riddles):
if random.random() <= 0.03:
await message.channel.send("Non")
else:
await message.channel.send(self.new_riddle(message.channel, index))
else:
await message.channel.send(
INVALID_ID.format(len=len(self._client.riddles))
)
return True
if message_content == "fouras":
if random.random() <= 0.03:
await message.channel.send("Non")
elif len(self._client.riddles) > 0:
index = random.randint(0, len(self._client.riddles) - 1)
await message.channel.send(self.new_riddle(message.channel, index))
return True
if message_content.startswith("bug"):
author_user = await self._client.fetch_user(MAINTAINER_ID)
channel_name = await self.get_channel_name(message.channel)
messages_json = await self.load_history(message.channel)
await author_user.send(
BUG_REPORT.format(
user=message.author.mention,
user_id=message.author.id,
channel=channel_name,
channel_id=message.channel.id,
message=message_content,
history=messages_json,
state=self.save(False),
)
)
await message.channel.send(
f"Rapport de bug envoyé à {author_user.mention}\nMerci de ton feedback !"
)
return True
broadcast_match = re.match(r"^broadcast\s+(\d+) (.*)", message.content)
if broadcast_match and message.author.id == MAINTAINER_ID:
index = int(broadcast_match.group(1))
broadcast_message = broadcast_match.group(2)
channel = await self._client.fetch_channel(index)
if channel:
await channel.send(broadcast_message)
else:
await message.channel.send(f"Invalid channel id : {index}")
return True
if message_content == "about fouras":
author_user = await self._client.fetch_user(MAINTAINER_ID)
await message.channel.send(
ABOUT.format(user=author_user.mention, url=API_URL)
)
return True
if message_content == "save fouras":
if message.author.id == 151626081458192384:
json_str = "```json\n{0}```".format(
json.dumps(self.save(), ensure_ascii=False, indent=2)
)
await message.author.send(json_str)
return True
if message_content == "load fouras":
if message.author.id == 151626081458192384:
await self.load()
await message.author.send(
"Loaded {0} riddles".format(len(self._client.riddles))
)
return True
if message_content == "debug fouras":
if message.author.id == 151626081458192384:
dump = {}
for key, value in self._client.ongoing_riddles.items():
dump_channel = dict(value)
dump_channel.pop("message", None)
dump_channel.pop("answer", None)
channel_name = await self.get_channel_name(key)
dump[channel_name] = dump_channel
await message.author.send(
"```json\n{0}```".format(
json.dumps(dump, ensure_ascii=False, indent=4)
)
)
return True
# if current channel has ongoing riddle
if message.channel in self._client.ongoing_riddles:
current_riddle = self._client.ongoing_riddles[message.channel]
if "message" in current_riddle:
answer = current_riddle["answer"]
if unidecode(answer.lower()) in unidecode(message_content):
current_riddle["solver"] = message.author
await message.channel.send(
SUCCESS.format(user=message.author.mention, answer=answer)
)
await current_riddle["message"].edit(
content=self.format_message(current_riddle)
)
self.finish_riddle(message.channel)
return True
if (
message_content == "repete"
or message_content == "répète"
or message_content == "repeat"
):
current_riddle.pop("message")
await message.channel.send(self.format_message(current_riddle))
return True
# Commande /clue : révèle une lettre au hasard de la réponse attendue
if (
message_content == "indice"
or message_content == "aide"
or message_content == "help"
or message_content == "clue"
):
nbClues = current_riddle["nbClues"] + 1
current_riddle["nbClues"] = nbClues
if nbClues >= len(answer):
reply = "Perdu ! La réponse était : `{0}`".format(answer)
await message.channel.send(reply)
# else:
# reply = "Indice : `{0}`".format(clue_string(answer, nbClues))
await current_riddle["message"].edit(
content=self.format_message(current_riddle)
)
if nbClues >= len(answer):
self.finish_riddle(message.channel)
return True
return False

View File

@ -1,69 +0,0 @@
import os
import discord
from discord import app_commands
import httpx
from client import client, tree
GUILD_ID = os.getenv("GUILD_ID")
GITEA_API_KEY = os.getenv("GITEA_API_KEY")
gitea_url = "https://git.epicsparrow.com/api/v1"
GITEA_PROJECTS = {}
auth_headers = {"Authorization": f"token {GITEA_API_KEY}"}
def init_gitea_projects():
res = httpx.get(gitea_url + "/repos/search", headers=auth_headers)
if res.status_code == 200:
GITEA_PROJECTS.update(
{
str(project["id"]): {
"name": project["name"],
"owner": project["owner"]["login"],
}
for project in res.json()["data"]
}
)
return [(project["name"], project["id"]) for project in res.json()["data"]]
else:
return []
init_gitea_projects()
@tree.command(
name="gitea-issue",
description="Create issues to gitea",
)
@app_commands.describe(
title="Issue title", project="The project where the issue is created"
)
@app_commands.choices(
project=[
app_commands.Choice(name=project["name"], value=id_)
for id_, project in GITEA_PROJECTS.items()
]
)
async def gitea(interaction: discord.Interaction, project: str, title: str):
embed = discord.Embed(title="Gitea issue")
embed.add_field(name="Project", value=GITEA_PROJECTS[project]["name"])
embed.add_field(name="Title", value=title)
embed.add_field(name="Created by", value=interaction.user.mention)
creation_url = f"{gitea_url}/repos/{GITEA_PROJECTS[project]['owner']}/{GITEA_PROJECTS[project]['name']}/issues"
creation_data = {
"title": title,
"body": f"Created by {interaction.user.nick or interaction.user.name} from Discord.",
}
res = httpx.post(creation_url, headers=auth_headers, data=creation_data)
if res.status_code == 201:
embed.add_field(name="Issue created", value=res.json()["html_url"])
else:
embed.add_field(name="Error", value=res.text)
await interaction.response.send_message(embed=embed)

View File

@ -1,136 +0,0 @@
from .base import BaseModule
import random
import time
import json
import appdirs
import os
RHYMES_FILE = "rhymes.json"
SAVE_FILE = appdirs.user_data_dir() + "/PereFouras/poilau_save.json"
# CONFIG_TEXT = """
# Ce bot a été développé par {user}
# Code Source : https://git.epicsparrow.com/Anselme/perefouras
# Ajouter ce bot à votre serveur : {url}
# """
class RhymesModule(BaseModule):
rhymes: list = []
guild_config: dict = {}
async def load(self):
str = ""
with open(RHYMES_FILE, "r") as f:
self.rhymes = json.load(f)
try:
with open(SAVE_FILE, "r") as file:
self.guild_config = json.load(file)
str = 'Loaded poilau save file "{0}"'.format(SAVE_FILE)
except FileNotFoundError:
str = 'No previous "{0}" save file found'.format(SAVE_FILE)
except json.JSONDecodeError:
str = '"{0}" is an invalid JSON file.'.format(SAVE_FILE)
print(str)
return str
async def save(self, save_to_file=True):
os.makedirs(os.path.dirname(SAVE_FILE), exist_ok=True)
with open(SAVE_FILE, "w") as file:
json.dump(self.guild_config, file, ensure_ascii=False, indent=2)
print('Saved poilau state in file "{0}"'.format(SAVE_FILE))
def get_last_word(self, ch: str) -> str:
truncated = ch
while True:
if len(truncated) < 2 or truncated[-1].isnumeric():
return ""
if truncated[-1].isalpha() and truncated[-2].isalpha():
break
truncated = truncated[:-1]
truncated = truncated.split(" ")[-1]
if truncated.isalpha():
return truncated
else:
return ""
def poil_auquel(self, ch: str) -> str:
for rhyme in self.rhymes:
if ch in rhyme["blacklist"]:
return ""
if ch.endswith(tuple(rhyme["keys"])):
return random.choice(rhyme["rhymes"])
return ""
async def handle_message(self, message) -> bool:
message_content = message.content.lower()
if message_content == "debug poilau":
if message.author.id == 151626081458192384:
dump = {}
for key, value in self.guild_config.items():
channel_name = await self.get_guild_name(key)
sleeping_time = "{:.2f} s".format(
max(0, value["cooldown"] - time.time())
)
dump[channel_name] = {
"cooldown": sleeping_time,
"self-control": value["self-control"],
}
await message.author.send(
"```json\n{0}```".format(
json.dumps(dump, ensure_ascii=False, indent=2)
)
)
return True
if message_content == "save poilau":
if message.author.id == 151626081458192384:
self.save()
json_str = "```json\n{0}```".format(
json.dumps(self.guild_config, ensure_ascii=False, indent=2)
)
await message.author.send(json_str)
return True
if message_content == "load poilau":
if message.author.id == 151626081458192384:
await message.author.send(await self.load())
json_str = "```json\n{0}```".format(
json.dumps(self.guild_config, ensure_ascii=False, indent=2)
)
await message.author.send(json_str)
return True
if message_content == "tg fouras" and message.guild:
self.guild_config[str(message.guild.id)] = {
"cooldown": time.time() + 40000,
"self-control": 2.0,
}
await message.channel.send("ok :'(")
return True
last_word = self.get_last_word(message_content)
if message.author != self._client.user and message.guild and last_word:
poil = self.poil_auquel(last_word)
guildId = str(message.guild.id)
guild_config = self.guild_config.get(
guildId, {"cooldown": 0, "self-control": 1.0}
)
if poil and time.time() - guild_config["cooldown"] > 0:
self_control = guild_config["self-control"]
if random.random() < self_control:
guild_config["self-control"] = self_control * 0.9
self.guild_config[guildId] = guild_config
return False
wait_time = random.randint(0, 900)
if bool(random.getrandbits(1)):
wait_time = random.randint(900, 10800)
self.guild_config[guildId] = {
"cooldown": time.time() + wait_time,
"self-control": self_control + 1.0,
}
await message.channel.send(poil)
return True
return False

View File

@ -1166,23 +1166,23 @@ Secret, il l'est pour ceux qui aiment se taire.
Souvent causé par la distraction, Souvent causé par la distraction,
Cette maladresse peut avoir des répercutions. Cette maladresse peut avoir des répercutions.
Cette perche sert a manœuvrée, Cette perche sert à manœuvrer un bateau,
Un bateau ou a sortir un poisson de l'eau. Ou à sortir un poisson de l'eau.
Gage d'amitié, Gage d'amitié,
Sa valeur est sans importance. Sa valeur est sans importance.
Coutume de civilité, Coutume de civilité,
Il n'est que convenance. Il n'est que convenance.
Pour la police, ils sont un clé Pour la police, ils sont une clé
Lorsqu'ils se font tirer le portrait. Lorsqu'ils se font tirer le portrait.
Ils sont nombreux dans les cuisines Ils sont nombreux dans les cuisines
Et ne lésinent pas dans les usines. Et ne lésinent pas dans les usines.
Du fond de la Provence, Du fond de la Provence,
Sous le mistral elle danse, Sous le mistral elle danse,
De son cœur, née l'essence De son cœur, nait l'essence
A une couleur elle a donnée naissance. À une couleur elle a donné naissance.
On s'appuie très souvent dessus, On s'appuie très souvent dessus,
Il a régulièrement des frais Il a régulièrement des frais
@ -1196,7 +1196,7 @@ Elle devient un commandement.
Il peut s'agir d'un soupçon Il peut s'agir d'un soupçon
Car c'est une petite quantité Car c'est une petite quantité
On aime cette peau si parfumé On aime cette peau si parfumée
Provenant de l'orange ou du citron Provenant de l'orange ou du citron
Au balcon vous l'apercevez, Au balcon vous l'apercevez,

298
rhymes.py Normal file
View File

@ -0,0 +1,298 @@
# rhymes.py
import random
import json
import sqlite3
from pathlib import Path
from datetime import datetime
from typing import Dict, Any, Tuple
RHYMES_FILE = "resources/rhymes.json"
DB_FILE = "data/poilau_state.db"
RHYME_LOG_FILE = "data/rhyme_log.csv"
loaded_rhymes = {}
def _ensure_db() -> None:
"""Initialize SQLite database and create tables if needed."""
Path(DB_FILE).parent.mkdir(parents=True, exist_ok=True)
with sqlite3.connect(DB_FILE) as conn:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS guild_state (
guild_id TEXT PRIMARY KEY,
guild_name TEXT NOT NULL DEFAULT '',
cooldown_until TEXT NOT NULL DEFAULT '1970-01-01T00:00:00',
self_control REAL NOT NULL DEFAULT 1.0,
last_updated TEXT NOT NULL DEFAULT (datetime('now'))
)
""")
conn.commit()
def _ensure_log_file() -> None:
"""Create CSV log file if it doesn't exist."""
Path(RHYME_LOG_FILE).parent.mkdir(parents=True, exist_ok=True)
if not Path(RHYME_LOG_FILE).exists():
with open(RHYME_LOG_FILE, "w", encoding="utf-8") as f:
f.write("timestamp,last_word,rhyme_triggered\n")
def _get_connection() -> sqlite3.Connection:
"""Return SQLite connection with row factory for named column access."""
conn = sqlite3.connect(DB_FILE)
conn.row_factory = sqlite3.Row
return conn
def load_rhymes() -> Tuple[bool, str]:
global loaded_rhymes
"""Load rhymes from JSON file. Returns (success, message)."""
try:
with open(RHYMES_FILE, "r", encoding="utf-8") as f:
loaded_rhymes = json.load(f)
return True, f"Loaded rhymes file \"{RHYMES_FILE}\""
except FileNotFoundError:
return False, f"No rhymes file found at \"{RHYMES_FILE}\""
except json.JSONDecodeError as e:
return False, f"Invalid JSON in \"{RHYMES_FILE}\": {e}"
def get_guild_state(guild_id: str) -> Dict[str, Any]:
"""Retrieve guild state from database."""
with _get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT guild_id, guild_name, cooldown_until, self_control, last_updated
FROM guild_state WHERE guild_id = ?
""", (guild_id,))
row = cursor.fetchone()
if row:
return {
"guild_id": row["guild_id"],
"guild_name": row["guild_name"],
"cooldown_until": row["cooldown_until"],
"self_control": row["self_control"],
"last_updated": row["last_updated"]
}
else:
return {
"guild_id": guild_id,
"guild_name": "",
"cooldown_until": "1970-01-01T00:00:00",
"self_control": 1.0,
"last_updated": datetime.now().isoformat()
}
def update_guild_state(
guild_id: str,
guild_name: str,
cooldown_until: str,
self_control: float
) -> None:
"""Update guild state in database."""
with _get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT OR REPLACE INTO guild_state (guild_id, guild_name, cooldown_until, self_control, last_updated)
VALUES (?, ?, ?, ?, ?)
""", (guild_id, guild_name, cooldown_until, self_control, datetime.now().isoformat()))
conn.commit()
def delete_guild_state(guild_id: str) -> bool:
"""Delete guild state from database."""
with _get_connection() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM guild_state WHERE guild_id = ?", (guild_id,))
conn.commit()
return cursor.rowcount > 0
def get_all_guild_states() -> Dict[str, Dict[str, Any]]:
"""Retrieve all guild states (for debug purposes)."""
with _get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT guild_id, guild_name, cooldown_until, self_control, last_updated FROM guild_state")
return {
row["guild_id"]: {
"guild_name": row["guild_name"],
"cooldown_until": row["cooldown_until"],
"self_control": row["self_control"],
"last_updated": row["last_updated"]
}
for row in cursor.fetchall()
}
def log_rhyme(last_word: str, rhyme_triggered: str) -> None:
"""Log rhyme trigger to CSV file."""
timestamp = datetime.now().isoformat()
with open(RHYME_LOG_FILE, "a", encoding="utf-8") as f:
safe_rhyme = rhyme_triggered.replace(",", ";")
f.write(f"{timestamp},{last_word},{safe_rhyme}\n")
def get_last_word(text: str) -> str:
"""Extract last alphabetic word from text."""
truncated = text
while True:
if len(truncated) < 2 or truncated[-1].isnumeric():
return ""
if truncated[-1].isalpha() and truncated[-2].isalpha():
break
truncated = truncated[:-1]
truncated = truncated.split(" ")[-1]
return truncated if truncated.isalpha() else ""
def find_rhyme(word: str) -> str:
global loaded_rhymes
"""Find matching rhyme for given word."""
for rhyme in loaded_rhymes:
if word in rhyme["blacklist"]:
return ""
if word.endswith(tuple(rhyme["keys"])):
log_rhyme(word, rhyme["sound"])
return random.choice(rhyme["rhymes"])
return ""
async def get_guild_name(guildId, client) -> str:
guild = await client.fetch_guild(guildId)
return "[Server={0}]".format(guild.name)
async def handle_debug_commands(message, client) -> bool:
"""Handle debug commands (debug, save, load). Returns True if handled."""
message_content = message.content.lower()
if message_content == "debug poilau":
if message.author.id == 151626081458192384:
all_states = get_all_guild_states()
dump = {}
for guild_id, state in all_states.items():
channel_name = await get_guild_name(guild_id, client)
cooldown_dt = datetime.fromisoformat(state["cooldown_until"])
time_remaining = max(0, (cooldown_dt - datetime.now()).total_seconds())
sleeping_time = "{:.2f} s".format(time_remaining)
dump[channel_name] = {
"cooldown_until": state["cooldown_until"],
"cooldown_remaining": sleeping_time,
"self-control": state["self_control"],
"last_updated": state["last_updated"]
}
await message.author.send(
"```json\n{0}```".format(json.dumps(dump, ensure_ascii=False, indent=2))
)
return True
if message_content == "save poilau":
if message.author.id == 151626081458192384:
all_states = get_all_guild_states()
json_str = "```json\n{0}```".format(json.dumps(all_states, ensure_ascii=False, indent=2))
await message.author.send("State persisted in SQLite database")
await message.author.send(json_str)
return True
if message_content == "load poilau":
if message.author.id == 151626081458192384:
success, msg = load_rhymes()
all_states = get_all_guild_states()
json_str = "```json\n{0}```".format(json.dumps(all_states, ensure_ascii=False, indent=2))
await message.author.send(msg)
await message.author.send(json_str)
return True
if message_content == "tg fouras" and message.guild:
# Disable cooldown for this server (set to far future)
cooldown_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
cooldown_date = cooldown_date.replace(day=cooldown_date.day + 10000)
update_guild_state(
str(message.guild.id),
guild_name=message.guild.name,
cooldown_until=cooldown_date.isoformat(),
self_control=2.0
)
await message.channel.send("ok :'(")
return True
return False
async def handle_rhyme_logic(message, client) -> bool:
"""Main rhyme detection logic. Returns True if rhyme was triggered."""
message_content = message.content.lower()
last_word = get_last_word(message_content)
if message.author != client.user and message.guild and last_word:
rhyme = find_rhyme(last_word)
guild_id = str(message.guild.id)
guild_name = message.guild.name
if rhyme:
guild_state = get_guild_state(guild_id)
# Update guild name if changed
if guild_state["guild_name"] != guild_name:
update_guild_state(
guild_id,
guild_name=guild_name,
cooldown_until=guild_state["cooldown_until"],
self_control=guild_state["self_control"]
)
# Check cooldown
cooldown_dt = datetime.fromisoformat(guild_state["cooldown_until"])
now_dt = datetime.now()
if now_dt >= cooldown_dt:
self_control = guild_state["self_control"]
# Probability check
if random.random() < self_control:
new_self_control = self_control * 0.9
update_guild_state(
guild_id,
guild_name=guild_name,
cooldown_until=now_dt.isoformat(),
self_control=new_self_control
)
return False
# Calculate new cooldown duration
wait_time = random.randint(0, 900)
if bool(random.getrandbits(1)):
wait_time = random.randint(900, 10800)
new_cooldown_dt = now_dt.replace(second=0, microsecond=0)
new_cooldown_dt = new_cooldown_dt.replace(minute=new_cooldown_dt.minute + wait_time // 60)
new_cooldown_dt = new_cooldown_dt.replace(hour=new_cooldown_dt.hour + wait_time // 3600)
update_guild_state(
guild_id,
guild_name=guild_name,
cooldown_until=new_cooldown_dt.isoformat(),
self_control=self_control + 1.0
)
await message.channel.send(rhyme)
return True
return False
async def handle_message(message, client) -> bool:
"""Main entry point for message handling."""
# Initialize database and log file on first run
_ensure_db()
_ensure_log_file()
# Handle debug commands first
if await handle_debug_commands(message, client):
return True
# Process rhyme logic
return await handle_rhyme_logic(message, client)