375 lines
12 KiB
Python
375 lines
12 KiB
Python
# fouras.py
|
|
import random
|
|
import re
|
|
import json
|
|
import discord
|
|
from unidecode import unidecode
|
|
from pathlib import Path
|
|
from typing import Dict, Any, Tuple
|
|
|
|
# OAuth2 invite link for this bot; interpolated into the ABOUT reply.
API_URL = (
    "https://discord.com/api/oauth2/authorize?"
    "client_id=1110208055171367014&permissions=274877975552&scope=bot"
)

# Discord user id of the maintainer: gates the admin commands and is the
# recipient of bug reports.
MAINTAINER_ID = 151626081458192384
|
|
|
|
# Template of the direct message sent to the maintainer by handle_bug_report.
# Placeholders: user, user_id, channel, channel_id, message, state, history.
BUG_REPORT = """
BUG REPORT from {user} (`{user_id}`) in channel {channel} (`{channel_id}`) :

Message :
> {message}

State :
```json\n{state}```
History :
```json\n{history}```
"""

# Reply to the "about fouras" command. Placeholders: user (maintainer mention)
# and url (invite link).
ABOUT = """
Ce bot a été développé par {user}
Code Source : https://git.epicsparrow.com/Anselme/perefouras
Ajouter ce bot à votre serveur : {url}
"""

# Sent in the channel when a message contains the answer.
# Placeholders: user (solver mention) and answer.
SUCCESS = """
Bravo {user} ! La réponse était bien `{answer}`.
"""

# Sent when "fouras <n>" is given a number outside 1..len.
# Placeholder: len (number of loaded riddles).
INVALID_ID = """
Numéro d'énigme invalide, merci de saisir un numéro entre 1 et {len}
"""
|
|
|
|
# Input files read by load_riddles: riddles are split on blank lines, answers
# are read one per line. SAVE_FILE holds the ongoing-riddle state as JSON.
RIDDLES_FILE = "resources/riddles.txt"
ANSWERS_FILE = "resources/answers.txt"
SAVE_FILE = "data/fouras_riddles.json"

# Module-level state: riddle texts, their answers (looked up with the same
# index), and the riddle state dicts keyed by channel.
riddles = []
answers = []
ongoing_riddles = {}
|
|
|
|
|
|
def _ensure_data_dir() -> None:
    """Create the directory containing SAVE_FILE, including missing parents."""
    save_dir = Path(SAVE_FILE).parent
    save_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def load_riddles() -> list:
    """Reload riddles and answers from disk into the module-level lists.

    Riddles are read from RIDDLES_FILE as blocks separated by a blank line;
    answers are read from ANSWERS_FILE, one stripped line per answer. Both
    files are read before either global is rebound, so a failure while
    opening one of them leaves the previously loaded data untouched.

    Returns:
        The freshly loaded list of riddle texts (also bound to ``riddles``).

    Raises:
        OSError: if either file cannot be opened.
    """
    global riddles, answers

    with open(RIDDLES_FILE, "r", encoding="utf-8") as f:
        # Skip whitespace-only chunks (e.g. extra blank lines at the end of
        # the file): they would become riddles with no matching answer.
        loaded_riddles = [
            chunk for chunk in f.read().split("\n\n") if chunk.strip()
        ]

    with open(ANSWERS_FILE, "r", encoding="utf-8") as f:
        loaded_answers = [line.strip() for line in f]

    riddles = loaded_riddles
    answers = loaded_answers
    print(f"Loaded {len(riddles)} riddles")
    return riddles
|
|
|
|
|
|
def load_ongoing_riddles(client) -> Dict[str, Dict[str, Any]]:
    """Load ongoing riddles from the save file.

    The save file maps channel ids to riddle state dicts whose "message"
    entry is a message id. Each channel id is resolved to a channel object
    (used as key of the returned dict) and each message id to a partial
    message, which is enough for the later ``.edit()`` / ``.id`` accesses.

    This function is synchronous, so it uses the client's synchronous
    lookups; the ``fetch_*`` methods are coroutines and cannot be called
    here without ``await``.

    NOTE(review): ``get_channel`` only knows cached channels, so this should
    be called once the client is ready; entries whose channel cannot be
    resolved are skipped. Confirm that skipping is acceptable for DM channels.

    Args:
        client: the Discord client used to resolve channel ids.

    Returns:
        The restored state keyed by channel object, or an empty dict when
        the save file is missing or is not valid JSON.
    """
    try:
        with open(SAVE_FILE, "r", encoding="utf-8") as f:
            config = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}

    restored = {}
    for channel_id, channel_info in config.items():
        channel = client.get_channel(int(channel_id))
        if channel is None:
            # Unknown or no longer accessible channel: drop its riddle.
            continue
        if "message" in channel_info:
            channel_info["message"] = channel.get_partial_message(
                channel_info["message"]
            )
        restored[channel] = channel_info

    return restored
|
|
|
|
|
|
def save_ongoing_riddles(ongoing_riddles: Dict) -> str:
    """Save ongoing riddles state to SAVE_FILE as JSON.

    Channel objects (the dict keys) and tracked message objects are replaced
    by their ids so the state can be serialized.

    Args:
        ongoing_riddles: riddle state dicts keyed by channel object.

    Returns:
        A human-readable status line naming the file that was written.
    """
    _ensure_data_dir()

    dump = {}
    for channel, state in ongoing_riddles.items():
        entry = dict(state)  # shallow copy: the live state must keep its objects
        tracked_message = entry.get("message")
        if tracked_message is not None:
            # A state may have no "message" (it is dropped by the "repeat"
            # command), so only convert it when present.
            entry["message"] = tracked_message.id
        # "solver" holds a user object and only exists while a solved riddle
        # is being removed; it is not JSON-serializable and not worth saving.
        entry.pop("solver", None)
        dump[channel.id] = entry

    with open(SAVE_FILE, "w", encoding="utf-8") as f:
        json.dump(dump, f, ensure_ascii=False, indent=2)

    return f'Saved fouras riddles state in file "{SAVE_FILE}"'
|
|
|
|
|
|
def new_riddle(riddles: list, answers: list, index: int) -> Dict[str, Any]:
    """Build the initial state dict for the riddle at position ``index``.

    The state holds the riddle index, a clue counter starting at -1 (no clue
    requested yet), the stripped riddle text and the matching answer.
    """
    riddle_text = riddles[index].strip()
    solution = answers[index]

    state: Dict[str, Any] = {"index": index, "nbClues": -1}
    state["riddle"] = riddle_text
    state["answer"] = solution
    return state
|
|
|
|
|
|
def finish_riddle(ongoing_riddles: Dict, channel) -> None:
    """Stop tracking the riddle of ``channel``; do nothing if there is none."""
    ongoing_riddles.pop(channel, None)
|
|
|
|
|
|
def clue_string(answer: str, nb_clues: int) -> str:
    """Generate the clue string for ``answer`` with ``nb_clues`` letters shown.

    Every character of the answer is rendered as "_" unless revealed, and the
    characters are separated by single spaces. The reveal order is a shuffle
    driven by a private generator seeded with the answer text, so that:

    * the same answer always reveals the same positions, including after a
      restart (string seeds do not depend on the per-process ``hash()``);
    * the clue for ``n + 1`` always extends the clue for ``n``;
    * the module-level ``random`` state is left untouched.

    Args:
        answer: the solution to mask.
        nb_clues: number of characters to reveal; values below 0 reveal
            nothing and values above ``len(answer)`` reveal everything.

    Returns:
        The masked answer, or an empty string for an empty answer.
    """
    slots = ["_"] * len(answer)

    rng = random.Random(answer)
    reveal_order = list(range(len(answer)))
    rng.shuffle(reveal_order)

    for idx in reveal_order[:max(nb_clues, 0)]:
        slots[idx] = answer[idx]

    return " ".join(slots)
|
|
|
|
|
|
def format_riddle_message(current_riddle: Dict[str, Any]) -> str:
    """Render a riddle state as the text of the riddle message.

    The riddle is shown as a block quote under an "Énigme N:" header (N is
    the 1-based index). A footer is appended when at least one clue was
    requested (the masked answer, or the solution once every clue is used)
    and/or when the state has a "solver".
    """
    hints_used = current_riddle["nbClues"]
    solution = current_riddle["answer"]

    # Prefix every line of the riddle with "> " and drop carriage returns.
    riddle_lines = current_riddle["riddle"].replace("\r", "").split("\n")
    quoted_riddle = "\n".join("> " + line for line in riddle_lines)

    footer = ""
    if hints_used > -1:
        if hints_used < len(solution):
            footer = "\nIndice : `{0}`".format(clue_string(solution, hints_used))
        else:
            footer = "\nNon trouvée, la solution était : `{0}`".format(solution)

    if "solver" in current_riddle:
        footer += "\n{0} a trouvé la solution, qui était : `{1}`".format(
            current_riddle["solver"].mention, solution
        )

    text = "Énigme {0}:\n{1}\n> Qui suis-je ?".format(
        current_riddle["index"] + 1, quoted_riddle
    )
    if footer:
        text += "\n" + footer
    return text
|
|
|
|
async def get_channel_name(channel, client) -> str:
    """Return a printable label for ``channel``.

    Server channels are labelled with their guild and channel names. DM
    channels are fetched again through ``client`` and labelled with the
    recipient's name (presumably because the recipient may not be populated
    on the given object -- TODO confirm).
    """
    if not isinstance(channel, discord.DMChannel):
        guild_name = channel.guild.name
        return "[Server={0}] => [Channel={1}]".format(guild_name, channel.name)

    fetched_dm = await client.fetch_channel(channel.id)
    return "[DM={0}]".format(fetched_dm.recipient.name)
|
|
|
|
def _debug_state_snapshot(ongoing_riddles: Dict) -> Dict[str, Any]:
    """Return a JSON-friendly copy of the state: ids instead of channel/message objects."""
    snapshot = {}
    for channel, state in ongoing_riddles.items():
        entry = dict(state)
        if entry.get("message") is not None:
            entry["message"] = entry["message"].id
        snapshot[str(channel.id)] = entry
    return snapshot


async def handle_debug_commands(message, client, ongoing_riddles: Dict) -> bool:
    """Handle debug commands (debug, save, load, broadcast).

    The commands only act when sent by the maintainer. "save fouras",
    "load fouras" and "debug fouras" sent by anyone else are swallowed
    silently (still reported as handled); a broadcast from anyone else is
    not treated as a command.

    Args:
        message: the incoming Discord message.
        client: the Discord client, used to resolve channels.
        ongoing_riddles: riddle state dicts keyed by channel object.

    Returns:
        True if the message was one of the debug commands, False otherwise.
    """
    message_content = message.content.lower()
    is_maintainer = message.author.id == MAINTAINER_ID

    if message_content == "save fouras":
        if is_maintainer:
            status = save_ongoing_riddles(ongoing_riddles)
            # The live state is keyed by channel objects and holds message
            # objects, which json cannot encode: dump an id-based snapshot.
            # default=str is a deliberate guard for any other non-JSON value
            # (e.g. a transient "solver") so the echo never fails.
            json_str = "```json\n{0}```".format(
                json.dumps(
                    _debug_state_snapshot(ongoing_riddles),
                    ensure_ascii=False,
                    indent=2,
                    default=str,
                )
            )
            await message.author.send(status)
            await message.author.send(json_str)
        return True

    if message_content == "load fouras":
        if is_maintainer:
            # load_riddles rebinds the module-level riddles/answers, which
            # handle_message reads again on every call.
            try:
                load_riddles()
            except OSError as err:
                await message.author.send(f"Failed to reload riddles : {err}")
            else:
                await message.author.send("Reloaded riddles from files")
        return True

    if message_content == "debug fouras":
        if is_maintainer:
            dump = {}
            for key, value in ongoing_riddles.items():
                dump_channel = dict(value)
                # Not serializable / not to be disclosed in the dump.
                dump_channel.pop("message", None)
                dump_channel.pop("answer", None)
                channel_name = await get_channel_name(key, client)
                dump[channel_name] = dump_channel
            await message.author.send(
                "```json\n{0}```".format(
                    json.dumps(dump, ensure_ascii=False, indent=4, default=str)
                )
            )
        return True

    # Matched against the raw content so the broadcast text keeps its case.
    broadcast_match = re.match(r"^broadcast\s+(\d+) (.*)", message.content)
    if broadcast_match and is_maintainer:
        index = int(broadcast_match.group(1))
        broadcast_message = broadcast_match.group(2)
        try:
            channel = await client.fetch_channel(index)
        except (discord.HTTPException, discord.InvalidData):
            # fetch_channel raises rather than returning None for an unknown
            # or inaccessible channel.
            channel = None
        if channel:
            await channel.send(broadcast_message)
        else:
            await message.channel.send(f"Invalid channel id : {index}")
        return True

    return False
|
|
|
|
|
|
async def _post_riddle(message, riddles: list, answers: list, index: int) -> None:
    """Send riddle ``index`` to the channel and register it as that channel's ongoing riddle."""
    riddle_state = new_riddle(riddles, answers, index)
    # Keep the message that was sent: it is the one later edited when a clue
    # is requested or the riddle is solved.
    riddle_state["message"] = await message.channel.send(
        format_riddle_message(riddle_state)
    )
    # Register in the module-level state read by handle_riddle_solving;
    # without this the riddle could never be answered.
    ongoing_riddles[message.channel] = riddle_state


async def handle_riddle_commands(message, riddles: list, answers: list, client) -> bool:
    """Handle the "fouras", "fouras <n>" and "about fouras" commands.

    "fouras <n>" starts riddle number n (1-based) and "fouras" starts a
    random one; both refuse with "Non" about 3% of the time. A started
    riddle replaces any riddle already ongoing in the channel.

    Args:
        message: the incoming Discord message.
        riddles: the loaded riddle texts.
        answers: the answers, indexed like ``riddles``.
        client: the Discord client, used to fetch the maintainer user.

    Returns:
        True if the message was one of these commands, False otherwise.
    """
    message_content = message.content.lower()

    fouras_match = re.match(r"^fouras\s+(\d+)$", message_content)
    if fouras_match:
        index = int(fouras_match.group(1)) - 1
        if not 0 <= index < len(riddles):
            await message.channel.send(INVALID_ID.format(len=len(riddles)))
        elif random.random() <= 0.03:
            await message.channel.send("Non")
        else:
            await _post_riddle(message, riddles, answers, index)
        return True

    if message_content == "fouras":
        if random.random() <= 0.03:
            await message.channel.send("Non")
        elif riddles:
            index = random.randint(0, len(riddles) - 1)
            await _post_riddle(message, riddles, answers, index)
        else:
            print(f'riddles : "{len(riddles)}"')
        return True

    if message_content == "about fouras":
        author_user = await client.fetch_user(MAINTAINER_ID)
        await message.channel.send(ABOUT.format(user=author_user.mention, url=API_URL))
        return True

    return False
|
|
|
|
|
|
async def _refresh_riddle_message(current_riddle: Dict) -> None:
    """Re-render the tracked riddle message, if the state has one to edit."""
    tracked_message = current_riddle.get("message")
    if tracked_message is not None:
        await tracked_message.edit(content=format_riddle_message(current_riddle))


async def handle_riddle_solving(message, ongoing_riddles: Dict, client) -> bool:
    """Handle answers, "repeat" and clue requests for the channel's riddle.

    Args:
        message: the incoming Discord message.
        ongoing_riddles: riddle state dicts keyed by channel object.
        client: the Discord client; ``client.user`` identifies the bot's own
            messages.

    Returns:
        True if the message solved the riddle, repeated it or requested a
        clue; False otherwise (including when the channel has no riddle).
    """
    if message.channel not in ongoing_riddles:
        return False

    current_riddle = ongoing_riddles[message.channel]

    # A state without a tracked message adopts the next message written by
    # the bot itself. User messages are never adopted: the bot could not edit
    # them, and adopting one would swallow a possible answer.
    if "message" not in current_riddle and message.author == client.user:
        current_riddle["message"] = message
        return False

    answer = current_riddle["answer"]
    content = message.content.lower()

    # Accent-insensitive check that the message contains the answer.
    if unidecode(answer.lower()) in unidecode(content):
        current_riddle["solver"] = message.author
        await message.channel.send(
            SUCCESS.format(user=message.author.mention, answer=answer)
        )
        await _refresh_riddle_message(current_riddle)
        finish_riddle(ongoing_riddles, message.channel)
        return True

    # Repeat riddle command: the re-sent copy becomes the tracked message so
    # that later clue/solution edits apply to the most recent one.
    if content in ("repete", "répète", "repeat"):
        current_riddle["message"] = await message.channel.send(
            format_riddle_message(current_riddle)
        )
        return True

    # Clue command: one more revealed letter; once the clue counter reaches
    # the answer length the riddle is lost.
    if content in ("indice", "aide", "help", "clue"):
        nb_clues = current_riddle["nbClues"] + 1
        current_riddle["nbClues"] = nb_clues

        if nb_clues >= len(answer):
            await message.channel.send(
                "Perdu ! La réponse était : `{0}`".format(answer)
            )
            finish_riddle(ongoing_riddles, message.channel)
        else:
            await _refresh_riddle_message(current_riddle)
        return True

    return False
|
|
|
|
|
|
async def handle_bug_report(message, client, ongoing_riddles: Dict) -> bool:
    """Forward a bug report to the maintainer.

    Any message whose content starts with "bug" (case-insensitive) is sent
    to the maintainer by direct message, together with the last 10 messages
    of the channel and a dump of the ongoing riddles, and is acknowledged in
    the channel.

    Args:
        message: the incoming Discord message.
        client: the Discord client, used to fetch the maintainer user.
        ongoing_riddles: riddle state dicts keyed by channel object.

    Returns:
        True if a bug report was sent, False if the message is not one.
    """
    if not message.content.lower().startswith("bug"):
        return False

    author_user = await client.fetch_user(MAINTAINER_ID)
    # get_channel_name is this module's helper, not a method of the client.
    channel_name = await get_channel_name(message.channel, client)

    # Load message history
    messages = [
        {
            "id": msg.id,
            "content": msg.content,
            "date": msg.created_at.strftime("%d/%m %H:%M:%S"),
        }
        async for msg in message.channel.history(limit=10)
    ]
    messages_json = json.dumps(messages, ensure_ascii=False)

    # The live state is keyed by channel objects and holds message objects,
    # which json cannot encode: report ids instead.
    state = {}
    for channel, riddle in ongoing_riddles.items():
        entry = dict(riddle)
        if entry.get("message") is not None:
            entry["message"] = entry["message"].id
        state[str(channel.id)] = entry
    # default=str is a deliberate guard so that an unexpected non-JSON value
    # (e.g. a transient "solver") cannot prevent the report from being sent.
    state_json = json.dumps(state, ensure_ascii=False, indent=2, default=str)

    await author_user.send(
        BUG_REPORT.format(
            user=message.author.mention,
            user_id=message.author.id,
            channel=channel_name,
            channel_id=message.channel.id,
            message=message.content,
            history=messages_json,
            state=state_json,
        )
    )
    await message.channel.send(
        f"Rapport de bug envoyé à {author_user.mention}\nMerci de ton feedback !"
    )
    return True
|
|
|
|
|
|
async def handle_message(message, client) -> bool:
    """Main entry point for message handling.

    Messages written by the bot itself are ignored. Any other message is
    offered to the handlers in order -- debug/admin commands, riddle
    commands, bug reports, riddle solving -- stopping at the first one that
    reports it handled the message.

    Args:
        message: the incoming Discord message.
        client: the Discord client.

    Returns:
        True if a handler processed the message, False otherwise.
    """
    # The module-level riddles/answers/ongoing_riddles are only read here
    # (never rebound), so no ``global`` declaration is needed. They are
    # looked up on every call, so a reload done by load_riddles is seen.
    if message.author == client.user:
        return False

    # Handle debug/admin commands first
    if await handle_debug_commands(message, client, ongoing_riddles):
        return True

    # Handle riddle commands
    if await handle_riddle_commands(message, riddles, answers, client):
        return True

    # Handle bug reports
    if await handle_bug_report(message, client, ongoing_riddles):
        return True

    # Handle riddle solving
    if await handle_riddle_solving(message, ongoing_riddles, client):
        return True

    return False