removed overkill module system, rhymes now has logs and a sqlite database
This commit is contained in:
parent
929b9af254
commit
78827521d5
2
.gitignore
vendored
2
.gitignore
vendored
@ -1,2 +1,4 @@
|
|||||||
.env
|
.env
|
||||||
*.pyc
|
*.pyc
|
||||||
|
data
|
||||||
|
__pycache__
|
||||||
19
client.py
19
client.py
@ -1,19 +0,0 @@
|
|||||||
import discord
|
|
||||||
from discord import app_commands
|
|
||||||
|
|
||||||
from modules import FourasModule, RhymesModule
|
|
||||||
from dotenv import load_dotenv
|
|
||||||
|
|
||||||
load_dotenv()
|
|
||||||
|
|
||||||
MODULES = FourasModule, RhymesModule
|
|
||||||
|
|
||||||
intents = discord.Intents.default()
|
|
||||||
intents.members = True
|
|
||||||
intents.presences = True
|
|
||||||
intents.guilds = True
|
|
||||||
intents.messages = True
|
|
||||||
intents.message_content = True
|
|
||||||
client = discord.Client(intents=intents)
|
|
||||||
|
|
||||||
tree = app_commands.CommandTree(client)
|
|
||||||
375
fouras.py
Normal file
375
fouras.py
Normal file
@ -0,0 +1,375 @@
|
|||||||
|
# fouras.py
|
||||||
|
import random
|
||||||
|
import re
|
||||||
|
import json
|
||||||
|
import discord
|
||||||
|
from unidecode import unidecode
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Dict, Any, Tuple
|
||||||
|
|
||||||
|
API_URL = "".join([
|
||||||
|
"https://discord.com/api/oauth2/authorize?",
|
||||||
|
"client_id=1110208055171367014&permissions=274877975552&scope=bot",
|
||||||
|
])
|
||||||
|
|
||||||
|
MAINTAINER_ID = 151626081458192384
|
||||||
|
|
||||||
|
BUG_REPORT = """
|
||||||
|
BUG REPORT from {user} (`{user_id}`) in channel {channel} (`{channel_id}`) :
|
||||||
|
|
||||||
|
Message :
|
||||||
|
> {message}
|
||||||
|
|
||||||
|
State :
|
||||||
|
```json\n{state}```
|
||||||
|
History :
|
||||||
|
```json\n{history}```
|
||||||
|
"""
|
||||||
|
|
||||||
|
ABOUT = """
|
||||||
|
Ce bot a été développé par {user}
|
||||||
|
Code Source : https://git.epicsparrow.com/Anselme/perefouras
|
||||||
|
Ajouter ce bot à votre serveur : {url}
|
||||||
|
"""
|
||||||
|
|
||||||
|
SUCCESS = """
|
||||||
|
Bravo {user} ! La réponse était bien `{answer}`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
INVALID_ID = """
|
||||||
|
Numéro d'énigme invalide, merci de saisir un numéro entre 1 et {len}
|
||||||
|
"""
|
||||||
|
|
||||||
|
RIDDLES_FILE = "resources/riddles.txt"
|
||||||
|
ANSWERS_FILE = "resources/answers.txt"
|
||||||
|
SAVE_FILE = "data/fouras_riddles.json"
|
||||||
|
|
||||||
|
riddles = []
|
||||||
|
answers = []
|
||||||
|
ongoing_riddles = {}
|
||||||
|
|
||||||
|
|
||||||
|
def _ensure_data_dir() -> None:
|
||||||
|
"""Ensure data directory exists."""
|
||||||
|
Path(SAVE_FILE).parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
|
||||||
|
def load_riddles() -> list:
|
||||||
|
global riddles, answers
|
||||||
|
|
||||||
|
riddles = []
|
||||||
|
with open(RIDDLES_FILE, "r", encoding="utf-8") as f:
|
||||||
|
riddles = f.read().split("\n\n")
|
||||||
|
answers = []
|
||||||
|
with open(ANSWERS_FILE, "r", encoding="utf-8") as f:
|
||||||
|
answers = [line.strip() for line in f.readlines()]
|
||||||
|
print(f"Loaded {len(riddles)} riddles")
|
||||||
|
|
||||||
|
|
||||||
|
def load_ongoing_riddles(client) -> Dict[str, Dict[str, Any]]:
|
||||||
|
"""Load ongoing riddles from save file."""
|
||||||
|
try:
|
||||||
|
with open(SAVE_FILE, "r", encoding="utf-8") as f:
|
||||||
|
config = json.load(f)
|
||||||
|
|
||||||
|
ongoing_riddles = {}
|
||||||
|
for channel_id, channel_info in config.items():
|
||||||
|
channel = client.fetch_channel(int(channel_id))
|
||||||
|
channel_info["message"] = channel.fetch_message(channel_info["message"])
|
||||||
|
ongoing_riddles[channel] = channel_info
|
||||||
|
|
||||||
|
return ongoing_riddles
|
||||||
|
except FileNotFoundError:
|
||||||
|
return {}
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
|
def save_ongoing_riddles(ongoing_riddles: Dict) -> str:
|
||||||
|
"""Save ongoing riddles state to file."""
|
||||||
|
_ensure_data_dir()
|
||||||
|
dump = {}
|
||||||
|
for key, value in ongoing_riddles.items():
|
||||||
|
dump_channel = dict(value)
|
||||||
|
dump_channel["message"] = dump_channel["message"].id
|
||||||
|
dump[key.id] = dump_channel
|
||||||
|
|
||||||
|
with open(SAVE_FILE, "w", encoding="utf-8") as f:
|
||||||
|
json.dump(dump, f, ensure_ascii=False, indent=2)
|
||||||
|
|
||||||
|
return f'Saved fouras riddles state in file "{SAVE_FILE}"'
|
||||||
|
|
||||||
|
|
||||||
|
def new_riddle(riddles: list, answers: list, index: int) -> Dict[str, Any]:
|
||||||
|
"""Create a new riddle state."""
|
||||||
|
return {
|
||||||
|
"index": index,
|
||||||
|
"nbClues": -1,
|
||||||
|
"riddle": riddles[index].strip(),
|
||||||
|
"answer": answers[index],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def finish_riddle(ongoing_riddles: Dict, channel) -> None:
|
||||||
|
"""Remove completed riddle from tracking."""
|
||||||
|
if channel in ongoing_riddles:
|
||||||
|
del ongoing_riddles[channel]
|
||||||
|
|
||||||
|
|
||||||
|
def clue_string(answer: str, nb_clues: int) -> str:
|
||||||
|
"""Generate clue string with revealed letters."""
|
||||||
|
final_string = "_"
|
||||||
|
for _ in range(len(answer) - 1):
|
||||||
|
final_string += " _"
|
||||||
|
|
||||||
|
random.seed(hash(answer))
|
||||||
|
nb_revealed = 0
|
||||||
|
|
||||||
|
for _ in range(nb_clues):
|
||||||
|
idx = random.randint(0, len(answer) - 1)
|
||||||
|
while final_string[idx * 2] != "_":
|
||||||
|
idx = random.randint(0, len(answer) - 1)
|
||||||
|
|
||||||
|
nb_revealed += 1
|
||||||
|
final_string = final_string[:idx * 2] + answer[idx] + final_string[idx * 2 + 1:]
|
||||||
|
|
||||||
|
if nb_revealed == len(answer):
|
||||||
|
return final_string
|
||||||
|
|
||||||
|
return final_string
|
||||||
|
|
||||||
|
|
||||||
|
def format_riddle_message(current_riddle: Dict[str, Any]) -> str:
|
||||||
|
"""Format riddle message for display."""
|
||||||
|
nb_clues = current_riddle["nbClues"]
|
||||||
|
answer = current_riddle["answer"]
|
||||||
|
|
||||||
|
formatted_riddle = "> " + current_riddle["riddle"].replace("\n", "\n> ")
|
||||||
|
formatted_riddle = formatted_riddle.replace("\r", "")
|
||||||
|
|
||||||
|
clue = ""
|
||||||
|
if nb_clues > -1:
|
||||||
|
if nb_clues >= len(answer):
|
||||||
|
clue = "\nNon trouvée, la solution était : `{0}`".format(answer)
|
||||||
|
else:
|
||||||
|
clue = "\nIndice : `{0}`".format(clue_string(answer, nb_clues))
|
||||||
|
|
||||||
|
if "solver" in current_riddle:
|
||||||
|
clue = clue + "\n{0} a trouvé la solution, qui était : `{1}`".format(
|
||||||
|
current_riddle["solver"].mention, answer
|
||||||
|
)
|
||||||
|
|
||||||
|
if clue:
|
||||||
|
return "Énigme {0}:\n{1}\n> Qui suis-je ?\n{2}".format(
|
||||||
|
current_riddle["index"] + 1, formatted_riddle, clue
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
return "Énigme {0}:\n{1}\n> Qui suis-je ?".format(
|
||||||
|
current_riddle["index"] + 1, formatted_riddle
|
||||||
|
)
|
||||||
|
|
||||||
|
async def get_channel_name(channel, client) -> str:
|
||||||
|
if isinstance(channel, discord.DMChannel):
|
||||||
|
dm_channel = await client.fetch_channel(channel.id)
|
||||||
|
return "[DM={0}]".format(dm_channel.recipient.name)
|
||||||
|
else:
|
||||||
|
return "[Server={0}] => [Channel={1}]".format(
|
||||||
|
channel.guild.name, channel.name
|
||||||
|
)
|
||||||
|
|
||||||
|
async def handle_debug_commands(message, client, ongoing_riddles: Dict) -> bool:
|
||||||
|
"""Handle debug commands (debug, save, load, broadcast). Returns True if handled."""
|
||||||
|
message_content = message.content.lower()
|
||||||
|
|
||||||
|
if message_content == "save fouras":
|
||||||
|
if message.author.id == MAINTAINER_ID:
|
||||||
|
status = save_ongoing_riddles(ongoing_riddles)
|
||||||
|
json_str = "```json\n{0}```".format(
|
||||||
|
json.dumps(ongoing_riddles, ensure_ascii=False, indent=2)
|
||||||
|
)
|
||||||
|
await message.author.send(status)
|
||||||
|
await message.author.send(json_str)
|
||||||
|
return True
|
||||||
|
|
||||||
|
if message_content == "load fouras":
|
||||||
|
if message.author.id == MAINTAINER_ID:
|
||||||
|
# Reload would require passing riddles/answers back
|
||||||
|
await message.author.send("Reloaded riddles from files")
|
||||||
|
return True
|
||||||
|
|
||||||
|
if message_content == "debug fouras":
|
||||||
|
if message.author.id == MAINTAINER_ID:
|
||||||
|
dump = {}
|
||||||
|
for key, value in ongoing_riddles.items():
|
||||||
|
dump_channel = dict(value)
|
||||||
|
dump_channel.pop("message", None)
|
||||||
|
dump_channel.pop("answer", None)
|
||||||
|
channel_name = await get_channel_name(key, client)
|
||||||
|
dump[channel_name] = dump_channel
|
||||||
|
await message.author.send(
|
||||||
|
"```json\n{0}```".format(json.dumps(dump, ensure_ascii=False, indent=4))
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
|
||||||
|
broadcast_match = re.match(r"^broadcast\s+(\d+) (.*)", message.content)
|
||||||
|
if broadcast_match and message.author.id == MAINTAINER_ID:
|
||||||
|
index = int(broadcast_match.group(1))
|
||||||
|
broadcast_message = broadcast_match.group(2)
|
||||||
|
channel = await client.fetch_channel(index)
|
||||||
|
if channel:
|
||||||
|
await channel.send(broadcast_message)
|
||||||
|
else:
|
||||||
|
await message.channel.send(f"Invalid channel id : {index}")
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
async def handle_riddle_commands(message, riddles: list, answers: list, client) -> bool:
|
||||||
|
"""Handle /fouras commands. Returns True if command was processed."""
|
||||||
|
message_content = message.content.lower()
|
||||||
|
|
||||||
|
fouras_match = re.match(r"^fouras\s+(\d+)$", message_content)
|
||||||
|
if fouras_match:
|
||||||
|
index = int(fouras_match.group(1)) - 1
|
||||||
|
if 0 <= index < len(riddles):
|
||||||
|
if random.random() <= 0.03:
|
||||||
|
await message.channel.send("Non")
|
||||||
|
else:
|
||||||
|
riddle_state = new_riddle(riddles, answers, index)
|
||||||
|
await message.channel.send(format_riddle_message(riddle_state))
|
||||||
|
else:
|
||||||
|
await message.channel.send(INVALID_ID.format(len=len(riddles)))
|
||||||
|
return True
|
||||||
|
|
||||||
|
if message_content == "fouras":
|
||||||
|
if random.random() <= 0.03:
|
||||||
|
await message.channel.send("Non")
|
||||||
|
elif len(riddles) > 0:
|
||||||
|
index = random.randint(0, len(riddles) - 1)
|
||||||
|
riddle_state = new_riddle(riddles, answers, index)
|
||||||
|
await message.channel.send(format_riddle_message(riddle_state))
|
||||||
|
else:
|
||||||
|
print(f'riddles : "{len(riddles)}"')
|
||||||
|
return True
|
||||||
|
|
||||||
|
if message_content == "about fouras":
|
||||||
|
author_user = await client.fetch_user(MAINTAINER_ID)
|
||||||
|
await message.channel.send(ABOUT.format(user=author_user.mention, url=API_URL))
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
async def handle_riddle_solving(message, ongoing_riddles: Dict, client) -> bool:
|
||||||
|
"""Handle riddle solving logic. Returns True if riddle was solved or modified."""
|
||||||
|
if message.channel not in ongoing_riddles:
|
||||||
|
return False
|
||||||
|
|
||||||
|
current_riddle = ongoing_riddles[message.channel]
|
||||||
|
|
||||||
|
if "message" not in current_riddle:
|
||||||
|
current_riddle["message"] = message
|
||||||
|
return False
|
||||||
|
|
||||||
|
answer = current_riddle["answer"]
|
||||||
|
|
||||||
|
# Check if message contains the answer
|
||||||
|
if unidecode(answer.lower()) in unidecode(message.content.lower()):
|
||||||
|
current_riddle["solver"] = message.author
|
||||||
|
await message.channel.send(
|
||||||
|
SUCCESS.format(user=message.author.mention, answer=answer)
|
||||||
|
)
|
||||||
|
await current_riddle["message"].edit(
|
||||||
|
content=format_riddle_message(current_riddle)
|
||||||
|
)
|
||||||
|
finish_riddle(ongoing_riddles, message.channel)
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Repeat riddle command
|
||||||
|
if message.content.lower() in ["repete", "répète", "repeat"]:
|
||||||
|
current_riddle.pop("message", None)
|
||||||
|
await message.channel.send(format_riddle_message(current_riddle))
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Clue command
|
||||||
|
if message.content.lower() in ["indice", "aide", "help", "clue"]:
|
||||||
|
nb_clues = current_riddle["nbClues"] + 1
|
||||||
|
current_riddle["nbClues"] = nb_clues
|
||||||
|
|
||||||
|
if nb_clues >= len(answer):
|
||||||
|
await message.channel.send(
|
||||||
|
"Perdu ! La réponse était : `{0}`".format(answer)
|
||||||
|
)
|
||||||
|
finish_riddle(ongoing_riddles, message.channel)
|
||||||
|
else:
|
||||||
|
await current_riddle["message"].edit(
|
||||||
|
content=format_riddle_message(current_riddle)
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
async def handle_bug_report(message, client, ongoing_riddles: Dict) -> bool:
|
||||||
|
"""Handle bug report command. Returns True if bug report was sent."""
|
||||||
|
if not message.content.lower().startswith("bug"):
|
||||||
|
return False
|
||||||
|
|
||||||
|
author_user = await client.fetch_user(MAINTAINER_ID)
|
||||||
|
channel_name = await client.get_channel_name(message.channel)
|
||||||
|
|
||||||
|
# Load message history
|
||||||
|
messages = [
|
||||||
|
{
|
||||||
|
"id": msg.id,
|
||||||
|
"content": msg.content,
|
||||||
|
"date": msg.created_at.strftime("%d/%m %H:%M:%S"),
|
||||||
|
}
|
||||||
|
async for msg in message.channel.history(limit=10)
|
||||||
|
]
|
||||||
|
messages_json = json.dumps(messages, ensure_ascii=False)
|
||||||
|
|
||||||
|
state_json = json.dumps(ongoing_riddles, ensure_ascii=False, indent=2)
|
||||||
|
|
||||||
|
await author_user.send(
|
||||||
|
BUG_REPORT.format(
|
||||||
|
user=message.author.mention,
|
||||||
|
user_id=message.author.id,
|
||||||
|
channel=channel_name,
|
||||||
|
channel_id=message.channel.id,
|
||||||
|
message=message.content,
|
||||||
|
history=messages_json,
|
||||||
|
state=state_json,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
await message.channel.send(
|
||||||
|
f"Rapport de bug envoyé à {author_user.mention}\nMerci de ton feedback !"
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
async def handle_message(message, client) -> bool:
|
||||||
|
global riddles, answers, ongoing_riddles
|
||||||
|
|
||||||
|
"""Main entry point for message handling."""
|
||||||
|
if message.author == client.user:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Handle debug/admin commands first
|
||||||
|
if await handle_debug_commands(message, client, ongoing_riddles):
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Handle riddle commands
|
||||||
|
if await handle_riddle_commands(message, riddles, answers, client):
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Handle bug reports
|
||||||
|
if await handle_bug_report(message, client, ongoing_riddles):
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Handle riddle solving
|
||||||
|
if await handle_riddle_solving(message, ongoing_riddles, client):
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
85
main.py
85
main.py
@ -1,50 +1,73 @@
|
|||||||
|
# main.py
|
||||||
import discord
|
import discord
|
||||||
|
from discord import app_commands
|
||||||
|
from dotenv import load_dotenv
|
||||||
import os
|
import os
|
||||||
from modules.base import BaseModule
|
|
||||||
|
|
||||||
from client import client, MODULES, tree
|
# Load environment variables
|
||||||
from modules import gitea
|
load_dotenv()
|
||||||
|
|
||||||
token = os.getenv("DISCORD_TOKEN", "NO_TOKEN")
|
# Discord client setup
|
||||||
|
intents = discord.Intents.default()
|
||||||
|
intents.members = True
|
||||||
|
intents.presences = True
|
||||||
|
intents.guilds = True
|
||||||
|
intents.messages = True
|
||||||
|
intents.message_content = True
|
||||||
|
|
||||||
client.riddles = []
|
client = discord.Client(intents=intents)
|
||||||
client.answers = []
|
tree = app_commands.CommandTree(client)
|
||||||
client.rhyme_keys = {}
|
|
||||||
client.rhyme_strings = {}
|
|
||||||
client.cooldown = {}
|
|
||||||
client.ongoing_riddles = {}
|
|
||||||
client.modules: list[BaseModule] = []
|
|
||||||
|
|
||||||
|
|
||||||
@client.event
|
@client.event
|
||||||
async def on_ready():
|
async def on_ready():
|
||||||
client.modules = [m(client) for m in MODULES]
|
global client, tree
|
||||||
for m in client.modules:
|
"""Initialize bot state and sync commands."""
|
||||||
await m.load()
|
# Import initialization functions
|
||||||
async for guild in client.fetch_guilds():
|
from fouras import load_riddles, _ensure_data_dir
|
||||||
|
from rhymes import load_rhymes, _ensure_db, _ensure_log_file
|
||||||
|
|
||||||
|
# Ensure directories exist
|
||||||
|
_ensure_data_dir()
|
||||||
|
_ensure_db()
|
||||||
|
_ensure_log_file()
|
||||||
|
|
||||||
|
# Load riddles and answers
|
||||||
|
load_riddles()
|
||||||
|
|
||||||
|
# Load rhymes
|
||||||
|
success, msg = load_rhymes()
|
||||||
|
print(msg)
|
||||||
|
|
||||||
|
# Sync commands
|
||||||
|
async for guild in client.fetch_guilds():
|
||||||
tree.copy_global_to(guild=guild)
|
tree.copy_global_to(guild=guild)
|
||||||
await tree.sync(guild=guild)
|
await tree.sync(guild=guild)
|
||||||
|
|
||||||
print(f"Logged in as {client.user} on {len(client.guilds)} servers!")
|
print(f"Logged in as {client.user} on {len(client.guilds)} servers!")
|
||||||
|
|
||||||
|
|
||||||
@client.event
|
@client.event
|
||||||
async def on_message(message):
|
async def on_message(message):
|
||||||
# don't answer to self
|
global client, ongoing_riddles
|
||||||
if (
|
"""Handle incoming messages."""
|
||||||
message.author == client.user and message.channel in client.ongoing_riddles
|
# Ignore bot's own messages
|
||||||
): # need to move a part of that block in FourasModule
|
if message.author == client.user:
|
||||||
current_riddle = client.ongoing_riddles[message.channel]
|
|
||||||
if "message" not in current_riddle:
|
|
||||||
current_riddle["message"] = message
|
|
||||||
return
|
return
|
||||||
|
|
||||||
if isinstance(
|
# Handle fouras module
|
||||||
message.channel, (discord.DMChannel, discord.TextChannel, discord.Thread)
|
from fouras import handle_message as handle_fouras
|
||||||
):
|
await handle_fouras(message, ongoing_riddles, client)
|
||||||
handled = False
|
|
||||||
for m in client.modules:
|
|
||||||
if not handled:
|
|
||||||
handled = await m.handle_message(message)
|
|
||||||
|
|
||||||
# Initialise le client
|
# Handle rhymes module
|
||||||
client.run(token)
|
from rhymes import handle_message as handle_rhymes
|
||||||
|
await handle_rhymes(message, client)
|
||||||
|
|
||||||
|
|
||||||
|
# Run the client
|
||||||
|
if __name__ == "__main__":
|
||||||
|
token = os.getenv("DISCORD_TOKEN")
|
||||||
|
if not token:
|
||||||
|
print("Error: DISCORD_TOKEN not found in environment variables")
|
||||||
|
exit(1)
|
||||||
|
client.run(token)
|
||||||
@ -1,4 +0,0 @@
|
|||||||
from .fouras import FourasModule
|
|
||||||
from .rhymes import RhymesModule
|
|
||||||
|
|
||||||
ALL = [FourasModule, RhymesModule]
|
|
||||||
@ -1,44 +0,0 @@
|
|||||||
import discord
|
|
||||||
import json
|
|
||||||
|
|
||||||
ENCODING = "utf-8"
|
|
||||||
|
|
||||||
|
|
||||||
class BaseModule:
|
|
||||||
_client = None
|
|
||||||
|
|
||||||
def __init__(self, client):
|
|
||||||
self._client = client
|
|
||||||
|
|
||||||
async def load(self):
|
|
||||||
raise NotImplementedError
|
|
||||||
|
|
||||||
async def save(self, save_to_file=True):
|
|
||||||
raise NotImplementedError
|
|
||||||
|
|
||||||
async def handle_message(self, message) -> bool:
|
|
||||||
raise NotImplementedError
|
|
||||||
|
|
||||||
async def load_history(self, channel):
|
|
||||||
messages = [
|
|
||||||
{
|
|
||||||
"id": message.id,
|
|
||||||
"content": message.content,
|
|
||||||
"date": message.created_at.strftime("%d/%m %H:%M:%S"),
|
|
||||||
}
|
|
||||||
async for message in channel.history(limit=10)
|
|
||||||
]
|
|
||||||
return json.dumps(messages, ensure_ascii=False)
|
|
||||||
|
|
||||||
async def get_guild_name(self, guildId) -> str:
|
|
||||||
guild = await self._client.fetch_guild(guildId)
|
|
||||||
return "[Server={0}]".format(guild.name)
|
|
||||||
|
|
||||||
async def get_channel_name(self, channel) -> str:
|
|
||||||
if isinstance(channel, discord.DMChannel):
|
|
||||||
dm_channel = await self._client.fetch_channel(channel.id)
|
|
||||||
return "[DM={0}]".format(dm_channel.recipient.name)
|
|
||||||
else:
|
|
||||||
return "[Server={0}] => [Channel={1}]".format(
|
|
||||||
channel.guild.name, channel.name
|
|
||||||
)
|
|
||||||
@ -1,295 +0,0 @@
|
|||||||
from .base import BaseModule, ENCODING
|
|
||||||
import random
|
|
||||||
import re
|
|
||||||
import json
|
|
||||||
from unidecode import unidecode
|
|
||||||
import appdirs
|
|
||||||
import os
|
|
||||||
|
|
||||||
API_URL = "".join(
|
|
||||||
[
|
|
||||||
"https://discord.com/api/oauth2/authorize?",
|
|
||||||
"client_id=1110208055171367014&permissions=274877975552&scope=bot",
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
MAINTAINER_ID = 151626081458192384
|
|
||||||
|
|
||||||
BUG_REPORT = """
|
|
||||||
BUG REPORT from {user} (`{user_id}`) in channel {channel} (`{channel_id}`) :
|
|
||||||
|
|
||||||
Message :
|
|
||||||
> {message}
|
|
||||||
|
|
||||||
State :
|
|
||||||
```json\n{state}```
|
|
||||||
History :
|
|
||||||
```json\n{history}```
|
|
||||||
"""
|
|
||||||
|
|
||||||
ABOUT = """
|
|
||||||
Ce bot a été développé par {user}
|
|
||||||
Code Source : https://git.epicsparrow.com/Anselme/perefouras
|
|
||||||
Ajouter ce bot à votre serveur : {url}
|
|
||||||
"""
|
|
||||||
|
|
||||||
SUCCESS = """
|
|
||||||
Bravo {user} ! La réponse était bien `{answer}`.
|
|
||||||
"""
|
|
||||||
|
|
||||||
INVALID_ID = """
|
|
||||||
Numéro d'énigme invalide, merci de saisir un numéro entre 1 et {len}
|
|
||||||
"""
|
|
||||||
|
|
||||||
RIDDLES_FILE = "riddles.txt"
|
|
||||||
ANSWERS_FILE = "answers.txt"
|
|
||||||
SAVE_FILE = appdirs.user_data_dir() + "/PereFouras/fouras_riddles.json"
|
|
||||||
|
|
||||||
|
|
||||||
class FourasModule(BaseModule):
|
|
||||||
async def load(self):
|
|
||||||
with open(RIDDLES_FILE, "r", encoding=ENCODING) as r_file:
|
|
||||||
self._client.riddles = r_file.read().split("\n\n")
|
|
||||||
with open(ANSWERS_FILE, "r", encoding=ENCODING) as a_file:
|
|
||||||
self._client.answers = [line.strip() for line in a_file.readlines()]
|
|
||||||
str = f"Loaded {len(self._client.riddles)} riddles"
|
|
||||||
|
|
||||||
try:
|
|
||||||
with open(SAVE_FILE, "r") as file:
|
|
||||||
config = json.load(file)
|
|
||||||
ongoing_riddles = dict()
|
|
||||||
for k, v in config.items():
|
|
||||||
channel = await self._client.fetch_channel(int(k))
|
|
||||||
channel_info = v
|
|
||||||
channel_info["message"] = await channel.fetch_message(
|
|
||||||
channel_info["message"]
|
|
||||||
)
|
|
||||||
ongoing_riddles[channel] = channel_info
|
|
||||||
self._client.ongoing_riddles = ongoing_riddles
|
|
||||||
str = str + 'Loaded fouras save file "{0}"'.format(SAVE_FILE)
|
|
||||||
except FileNotFoundError:
|
|
||||||
str = str + 'No previous "{0}" save file found'.format(SAVE_FILE)
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
str = str + '"{0}" is an invalid JSON file.'.format(SAVE_FILE)
|
|
||||||
|
|
||||||
print(str)
|
|
||||||
return str
|
|
||||||
|
|
||||||
async def save(self, save_to_file=True):
|
|
||||||
dump = {}
|
|
||||||
for key, value in self._client.ongoing_riddles.items():
|
|
||||||
dump_channel = dict(value)
|
|
||||||
dump_channel["message"] = dump_channel["message"].id
|
|
||||||
dump[key.id] = dump_channel
|
|
||||||
os.makedirs(os.path.dirname(SAVE_FILE), exist_ok=True)
|
|
||||||
with open(SAVE_FILE, "w") as file:
|
|
||||||
json.dump(dump, file, ensure_ascii=False)
|
|
||||||
print('Saved fouras riddles state in file "{0}"'.format(SAVE_FILE))
|
|
||||||
return dump
|
|
||||||
|
|
||||||
def new_riddle(self, channel, index):
|
|
||||||
current_riddle = dict(
|
|
||||||
index=index,
|
|
||||||
nbClues=-1,
|
|
||||||
riddle=self._client.riddles[index].strip(),
|
|
||||||
answer=self._client.answers[index],
|
|
||||||
)
|
|
||||||
self._client.ongoing_riddles[channel] = current_riddle
|
|
||||||
return self.format_message(current_riddle)
|
|
||||||
|
|
||||||
def finish_riddle(self, channel):
|
|
||||||
del self._client.ongoing_riddles[channel]
|
|
||||||
|
|
||||||
def clue_string(self, answer, nbClues):
|
|
||||||
finalString = "_"
|
|
||||||
for i in range(len(answer) - 1):
|
|
||||||
finalString += " _"
|
|
||||||
random.seed(hash(answer))
|
|
||||||
nbRevealed = 0
|
|
||||||
for i in range(nbClues):
|
|
||||||
id = random.randint(0, len(answer) - 1)
|
|
||||||
while finalString[id * 2] != "_":
|
|
||||||
id = random.randint(0, len(answer) - 1)
|
|
||||||
|
|
||||||
nbRevealed += 1
|
|
||||||
finalString = finalString[: id * 2] + answer[id] + finalString[id * 2 + 1 :]
|
|
||||||
|
|
||||||
if nbRevealed == len(answer):
|
|
||||||
return finalString
|
|
||||||
return finalString
|
|
||||||
|
|
||||||
def format_message(self, current_riddle):
|
|
||||||
nbClues = current_riddle["nbClues"]
|
|
||||||
answer = current_riddle["answer"]
|
|
||||||
|
|
||||||
formatted_riddle = "> " + current_riddle["riddle"].replace("\n", "\n> ")
|
|
||||||
formatted_riddle = formatted_riddle.replace("\r", "")
|
|
||||||
|
|
||||||
clue = ""
|
|
||||||
if nbClues > -1:
|
|
||||||
if nbClues >= len(answer):
|
|
||||||
clue = clue + "\nNon trouvée, la solution était : `{0}`".format(answer)
|
|
||||||
else:
|
|
||||||
clue = clue + "\nIndice : `{0}`".format(
|
|
||||||
self.clue_string(answer, nbClues)
|
|
||||||
)
|
|
||||||
|
|
||||||
if "solver" in current_riddle:
|
|
||||||
clue = clue + "\n{0} a trouvé la solution, qui était : `{1}`".format(
|
|
||||||
current_riddle["solver"].mention, answer
|
|
||||||
)
|
|
||||||
|
|
||||||
if clue:
|
|
||||||
return "Énigme {0}:\n{1}\n> Qui suis-je ?\n{2}".format(
|
|
||||||
current_riddle["index"] + 1, formatted_riddle, clue
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
return "Énigme {0}:\n{1}\n> Qui suis-je ?".format(
|
|
||||||
current_riddle["index"] + 1, formatted_riddle
|
|
||||||
)
|
|
||||||
|
|
||||||
async def handle_message(self, message) -> bool:
|
|
||||||
if message.author == self._client.user:
|
|
||||||
return False
|
|
||||||
message_content = message.content.lower()
|
|
||||||
|
|
||||||
# command fouras
|
|
||||||
fouras_match = re.match(r"^fouras\s+(\d+)$", message_content)
|
|
||||||
if fouras_match:
|
|
||||||
index = int(fouras_match.group(1)) - 1
|
|
||||||
if index >= 0 and index < len(self._client.riddles):
|
|
||||||
if random.random() <= 0.03:
|
|
||||||
await message.channel.send("Non")
|
|
||||||
else:
|
|
||||||
await message.channel.send(self.new_riddle(message.channel, index))
|
|
||||||
else:
|
|
||||||
await message.channel.send(
|
|
||||||
INVALID_ID.format(len=len(self._client.riddles))
|
|
||||||
)
|
|
||||||
return True
|
|
||||||
if message_content == "fouras":
|
|
||||||
if random.random() <= 0.03:
|
|
||||||
await message.channel.send("Non")
|
|
||||||
elif len(self._client.riddles) > 0:
|
|
||||||
index = random.randint(0, len(self._client.riddles) - 1)
|
|
||||||
await message.channel.send(self.new_riddle(message.channel, index))
|
|
||||||
return True
|
|
||||||
|
|
||||||
if message_content.startswith("bug"):
|
|
||||||
author_user = await self._client.fetch_user(MAINTAINER_ID)
|
|
||||||
channel_name = await self.get_channel_name(message.channel)
|
|
||||||
messages_json = await self.load_history(message.channel)
|
|
||||||
await author_user.send(
|
|
||||||
BUG_REPORT.format(
|
|
||||||
user=message.author.mention,
|
|
||||||
user_id=message.author.id,
|
|
||||||
channel=channel_name,
|
|
||||||
channel_id=message.channel.id,
|
|
||||||
message=message_content,
|
|
||||||
history=messages_json,
|
|
||||||
state=self.save(False),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
await message.channel.send(
|
|
||||||
f"Rapport de bug envoyé à {author_user.mention}\nMerci de ton feedback !"
|
|
||||||
)
|
|
||||||
return True
|
|
||||||
|
|
||||||
broadcast_match = re.match(r"^broadcast\s+(\d+) (.*)", message.content)
|
|
||||||
if broadcast_match and message.author.id == MAINTAINER_ID:
|
|
||||||
index = int(broadcast_match.group(1))
|
|
||||||
broadcast_message = broadcast_match.group(2)
|
|
||||||
channel = await self._client.fetch_channel(index)
|
|
||||||
if channel:
|
|
||||||
await channel.send(broadcast_message)
|
|
||||||
else:
|
|
||||||
await message.channel.send(f"Invalid channel id : {index}")
|
|
||||||
return True
|
|
||||||
|
|
||||||
if message_content == "about fouras":
|
|
||||||
author_user = await self._client.fetch_user(MAINTAINER_ID)
|
|
||||||
await message.channel.send(
|
|
||||||
ABOUT.format(user=author_user.mention, url=API_URL)
|
|
||||||
)
|
|
||||||
return True
|
|
||||||
|
|
||||||
if message_content == "save fouras":
|
|
||||||
if message.author.id == 151626081458192384:
|
|
||||||
json_str = "```json\n{0}```".format(
|
|
||||||
json.dumps(self.save(), ensure_ascii=False, indent=2)
|
|
||||||
)
|
|
||||||
await message.author.send(json_str)
|
|
||||||
return True
|
|
||||||
|
|
||||||
if message_content == "load fouras":
|
|
||||||
if message.author.id == 151626081458192384:
|
|
||||||
await self.load()
|
|
||||||
await message.author.send(
|
|
||||||
"Loaded {0} riddles".format(len(self._client.riddles))
|
|
||||||
)
|
|
||||||
return True
|
|
||||||
|
|
||||||
if message_content == "debug fouras":
|
|
||||||
if message.author.id == 151626081458192384:
|
|
||||||
dump = {}
|
|
||||||
for key, value in self._client.ongoing_riddles.items():
|
|
||||||
dump_channel = dict(value)
|
|
||||||
dump_channel.pop("message", None)
|
|
||||||
dump_channel.pop("answer", None)
|
|
||||||
channel_name = await self.get_channel_name(key)
|
|
||||||
dump[channel_name] = dump_channel
|
|
||||||
await message.author.send(
|
|
||||||
"```json\n{0}```".format(
|
|
||||||
json.dumps(dump, ensure_ascii=False, indent=4)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
return True
|
|
||||||
|
|
||||||
# if current channel has ongoing riddle
|
|
||||||
if message.channel in self._client.ongoing_riddles:
|
|
||||||
current_riddle = self._client.ongoing_riddles[message.channel]
|
|
||||||
|
|
||||||
if "message" in current_riddle:
|
|
||||||
answer = current_riddle["answer"]
|
|
||||||
if unidecode(answer.lower()) in unidecode(message_content):
|
|
||||||
current_riddle["solver"] = message.author
|
|
||||||
|
|
||||||
await message.channel.send(
|
|
||||||
SUCCESS.format(user=message.author.mention, answer=answer)
|
|
||||||
)
|
|
||||||
await current_riddle["message"].edit(
|
|
||||||
content=self.format_message(current_riddle)
|
|
||||||
)
|
|
||||||
self.finish_riddle(message.channel)
|
|
||||||
return True
|
|
||||||
|
|
||||||
if (
|
|
||||||
message_content == "repete"
|
|
||||||
or message_content == "répète"
|
|
||||||
or message_content == "repeat"
|
|
||||||
):
|
|
||||||
current_riddle.pop("message")
|
|
||||||
await message.channel.send(self.format_message(current_riddle))
|
|
||||||
return True
|
|
||||||
|
|
||||||
# Commande /clue : révèle une lettre au hasard de la réponse attendue
|
|
||||||
if (
|
|
||||||
message_content == "indice"
|
|
||||||
or message_content == "aide"
|
|
||||||
or message_content == "help"
|
|
||||||
or message_content == "clue"
|
|
||||||
):
|
|
||||||
nbClues = current_riddle["nbClues"] + 1
|
|
||||||
current_riddle["nbClues"] = nbClues
|
|
||||||
if nbClues >= len(answer):
|
|
||||||
reply = "Perdu ! La réponse était : `{0}`".format(answer)
|
|
||||||
await message.channel.send(reply)
|
|
||||||
# else:
|
|
||||||
# reply = "Indice : `{0}`".format(clue_string(answer, nbClues))
|
|
||||||
await current_riddle["message"].edit(
|
|
||||||
content=self.format_message(current_riddle)
|
|
||||||
)
|
|
||||||
if nbClues >= len(answer):
|
|
||||||
self.finish_riddle(message.channel)
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
@ -1,69 +0,0 @@
|
|||||||
import os
|
|
||||||
|
|
||||||
import discord
|
|
||||||
from discord import app_commands
|
|
||||||
|
|
||||||
import httpx
|
|
||||||
from client import client, tree
|
|
||||||
|
|
||||||
|
|
||||||
GUILD_ID = os.getenv("GUILD_ID")
|
|
||||||
GITEA_API_KEY = os.getenv("GITEA_API_KEY")
|
|
||||||
|
|
||||||
gitea_url = "https://git.epicsparrow.com/api/v1"
|
|
||||||
|
|
||||||
GITEA_PROJECTS = {}
|
|
||||||
|
|
||||||
auth_headers = {"Authorization": f"token {GITEA_API_KEY}"}
|
|
||||||
|
|
||||||
|
|
||||||
def init_gitea_projects():
|
|
||||||
res = httpx.get(gitea_url + "/repos/search", headers=auth_headers)
|
|
||||||
if res.status_code == 200:
|
|
||||||
GITEA_PROJECTS.update(
|
|
||||||
{
|
|
||||||
str(project["id"]): {
|
|
||||||
"name": project["name"],
|
|
||||||
"owner": project["owner"]["login"],
|
|
||||||
}
|
|
||||||
for project in res.json()["data"]
|
|
||||||
}
|
|
||||||
)
|
|
||||||
return [(project["name"], project["id"]) for project in res.json()["data"]]
|
|
||||||
else:
|
|
||||||
return []
|
|
||||||
|
|
||||||
|
|
||||||
init_gitea_projects()
|
|
||||||
|
|
||||||
|
|
||||||
@tree.command(
|
|
||||||
name="gitea-issue",
|
|
||||||
description="Create issues to gitea",
|
|
||||||
)
|
|
||||||
@app_commands.describe(
|
|
||||||
title="Issue title", project="The project where the issue is created"
|
|
||||||
)
|
|
||||||
@app_commands.choices(
|
|
||||||
project=[
|
|
||||||
app_commands.Choice(name=project["name"], value=id_)
|
|
||||||
for id_, project in GITEA_PROJECTS.items()
|
|
||||||
]
|
|
||||||
)
|
|
||||||
async def gitea(interaction: discord.Interaction, project: str, title: str):
|
|
||||||
embed = discord.Embed(title="Gitea issue")
|
|
||||||
embed.add_field(name="Project", value=GITEA_PROJECTS[project]["name"])
|
|
||||||
embed.add_field(name="Title", value=title)
|
|
||||||
embed.add_field(name="Created by", value=interaction.user.mention)
|
|
||||||
|
|
||||||
creation_url = f"{gitea_url}/repos/{GITEA_PROJECTS[project]['owner']}/{GITEA_PROJECTS[project]['name']}/issues"
|
|
||||||
creation_data = {
|
|
||||||
"title": title,
|
|
||||||
"body": f"Created by {interaction.user.nick or interaction.user.name} from Discord.",
|
|
||||||
}
|
|
||||||
res = httpx.post(creation_url, headers=auth_headers, data=creation_data)
|
|
||||||
if res.status_code == 201:
|
|
||||||
embed.add_field(name="Issue created", value=res.json()["html_url"])
|
|
||||||
else:
|
|
||||||
embed.add_field(name="Error", value=res.text)
|
|
||||||
await interaction.response.send_message(embed=embed)
|
|
||||||
@ -1,136 +0,0 @@
|
|||||||
from .base import BaseModule
|
|
||||||
import random
|
|
||||||
import time
|
|
||||||
import json
|
|
||||||
import appdirs
|
|
||||||
import os
|
|
||||||
|
|
||||||
RHYMES_FILE = "rhymes.json"
|
|
||||||
SAVE_FILE = appdirs.user_data_dir() + "/PereFouras/poilau_save.json"
|
|
||||||
|
|
||||||
# CONFIG_TEXT = """
|
|
||||||
# Ce bot a été développé par {user}
|
|
||||||
# Code Source : https://git.epicsparrow.com/Anselme/perefouras
|
|
||||||
# Ajouter ce bot à votre serveur : {url}
|
|
||||||
# """
|
|
||||||
|
|
||||||
|
|
||||||
class RhymesModule(BaseModule):
|
|
||||||
rhymes: list = []
|
|
||||||
guild_config: dict = {}
|
|
||||||
|
|
||||||
async def load(self):
|
|
||||||
str = ""
|
|
||||||
with open(RHYMES_FILE, "r") as f:
|
|
||||||
self.rhymes = json.load(f)
|
|
||||||
|
|
||||||
try:
|
|
||||||
with open(SAVE_FILE, "r") as file:
|
|
||||||
self.guild_config = json.load(file)
|
|
||||||
str = 'Loaded poilau save file "{0}"'.format(SAVE_FILE)
|
|
||||||
except FileNotFoundError:
|
|
||||||
str = 'No previous "{0}" save file found'.format(SAVE_FILE)
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
str = '"{0}" is an invalid JSON file.'.format(SAVE_FILE)
|
|
||||||
print(str)
|
|
||||||
return str
|
|
||||||
|
|
||||||
async def save(self, save_to_file=True):
|
|
||||||
os.makedirs(os.path.dirname(SAVE_FILE), exist_ok=True)
|
|
||||||
with open(SAVE_FILE, "w") as file:
|
|
||||||
json.dump(self.guild_config, file, ensure_ascii=False, indent=2)
|
|
||||||
print('Saved poilau state in file "{0}"'.format(SAVE_FILE))
|
|
||||||
|
|
||||||
def get_last_word(self, ch: str) -> str:
|
|
||||||
truncated = ch
|
|
||||||
while True:
|
|
||||||
if len(truncated) < 2 or truncated[-1].isnumeric():
|
|
||||||
return ""
|
|
||||||
if truncated[-1].isalpha() and truncated[-2].isalpha():
|
|
||||||
break
|
|
||||||
truncated = truncated[:-1]
|
|
||||||
truncated = truncated.split(" ")[-1]
|
|
||||||
if truncated.isalpha():
|
|
||||||
return truncated
|
|
||||||
else:
|
|
||||||
return ""
|
|
||||||
|
|
||||||
def poil_auquel(self, ch: str) -> str:
|
|
||||||
for rhyme in self.rhymes:
|
|
||||||
if ch in rhyme["blacklist"]:
|
|
||||||
return ""
|
|
||||||
if ch.endswith(tuple(rhyme["keys"])):
|
|
||||||
return random.choice(rhyme["rhymes"])
|
|
||||||
return ""
|
|
||||||
|
|
||||||
async def handle_message(self, message) -> bool:
|
|
||||||
message_content = message.content.lower()
|
|
||||||
|
|
||||||
if message_content == "debug poilau":
|
|
||||||
if message.author.id == 151626081458192384:
|
|
||||||
dump = {}
|
|
||||||
for key, value in self.guild_config.items():
|
|
||||||
channel_name = await self.get_guild_name(key)
|
|
||||||
sleeping_time = "{:.2f} s".format(
|
|
||||||
max(0, value["cooldown"] - time.time())
|
|
||||||
)
|
|
||||||
dump[channel_name] = {
|
|
||||||
"cooldown": sleeping_time,
|
|
||||||
"self-control": value["self-control"],
|
|
||||||
}
|
|
||||||
await message.author.send(
|
|
||||||
"```json\n{0}```".format(
|
|
||||||
json.dumps(dump, ensure_ascii=False, indent=2)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
return True
|
|
||||||
|
|
||||||
if message_content == "save poilau":
|
|
||||||
if message.author.id == 151626081458192384:
|
|
||||||
self.save()
|
|
||||||
json_str = "```json\n{0}```".format(
|
|
||||||
json.dumps(self.guild_config, ensure_ascii=False, indent=2)
|
|
||||||
)
|
|
||||||
await message.author.send(json_str)
|
|
||||||
return True
|
|
||||||
|
|
||||||
if message_content == "load poilau":
|
|
||||||
if message.author.id == 151626081458192384:
|
|
||||||
await message.author.send(await self.load())
|
|
||||||
json_str = "```json\n{0}```".format(
|
|
||||||
json.dumps(self.guild_config, ensure_ascii=False, indent=2)
|
|
||||||
)
|
|
||||||
await message.author.send(json_str)
|
|
||||||
return True
|
|
||||||
|
|
||||||
if message_content == "tg fouras" and message.guild:
|
|
||||||
self.guild_config[str(message.guild.id)] = {
|
|
||||||
"cooldown": time.time() + 40000,
|
|
||||||
"self-control": 2.0,
|
|
||||||
}
|
|
||||||
await message.channel.send("ok :'(")
|
|
||||||
return True
|
|
||||||
|
|
||||||
last_word = self.get_last_word(message_content)
|
|
||||||
if message.author != self._client.user and message.guild and last_word:
|
|
||||||
poil = self.poil_auquel(last_word)
|
|
||||||
guildId = str(message.guild.id)
|
|
||||||
guild_config = self.guild_config.get(
|
|
||||||
guildId, {"cooldown": 0, "self-control": 1.0}
|
|
||||||
)
|
|
||||||
if poil and time.time() - guild_config["cooldown"] > 0:
|
|
||||||
self_control = guild_config["self-control"]
|
|
||||||
if random.random() < self_control:
|
|
||||||
guild_config["self-control"] = self_control * 0.9
|
|
||||||
self.guild_config[guildId] = guild_config
|
|
||||||
return False
|
|
||||||
wait_time = random.randint(0, 900)
|
|
||||||
if bool(random.getrandbits(1)):
|
|
||||||
wait_time = random.randint(900, 10800)
|
|
||||||
self.guild_config[guildId] = {
|
|
||||||
"cooldown": time.time() + wait_time,
|
|
||||||
"self-control": self_control + 1.0,
|
|
||||||
}
|
|
||||||
await message.channel.send(poil)
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
298
rhymes.py
Normal file
298
rhymes.py
Normal file
@ -0,0 +1,298 @@
|
|||||||
|
# rhymes.py
|
||||||
|
import random
|
||||||
|
import json
|
||||||
|
import sqlite3
|
||||||
|
from pathlib import Path
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Dict, Any, Tuple
|
||||||
|
|
||||||
|
RHYMES_FILE = "resources/rhymes.json"
|
||||||
|
DB_FILE = "data/poilau_state.db"
|
||||||
|
RHYME_LOG_FILE = "data/rhyme_log.csv"
|
||||||
|
|
||||||
|
loaded_rhymes = {}
|
||||||
|
|
||||||
|
def _ensure_db() -> None:
|
||||||
|
"""Initialize SQLite database and create tables if needed."""
|
||||||
|
Path(DB_FILE).parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
with sqlite3.connect(DB_FILE) as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS guild_state (
|
||||||
|
guild_id TEXT PRIMARY KEY,
|
||||||
|
guild_name TEXT NOT NULL DEFAULT '',
|
||||||
|
cooldown_until TEXT NOT NULL DEFAULT '1970-01-01T00:00:00',
|
||||||
|
self_control REAL NOT NULL DEFAULT 1.0,
|
||||||
|
last_updated TEXT NOT NULL DEFAULT (datetime('now'))
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
|
||||||
|
def _ensure_log_file() -> None:
|
||||||
|
"""Create CSV log file if it doesn't exist."""
|
||||||
|
Path(RHYME_LOG_FILE).parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
if not Path(RHYME_LOG_FILE).exists():
|
||||||
|
with open(RHYME_LOG_FILE, "w", encoding="utf-8") as f:
|
||||||
|
f.write("timestamp,last_word,rhyme_triggered\n")
|
||||||
|
|
||||||
|
|
||||||
|
def _get_connection() -> sqlite3.Connection:
|
||||||
|
"""Return SQLite connection with row factory for named column access."""
|
||||||
|
conn = sqlite3.connect(DB_FILE)
|
||||||
|
conn.row_factory = sqlite3.Row
|
||||||
|
return conn
|
||||||
|
|
||||||
|
|
||||||
|
def load_rhymes() -> Tuple[bool, str]:
|
||||||
|
global loaded_rhymes
|
||||||
|
"""Load rhymes from JSON file. Returns (success, message)."""
|
||||||
|
try:
|
||||||
|
with open(RHYMES_FILE, "r", encoding="utf-8") as f:
|
||||||
|
loaded_rhymes = json.load(f)
|
||||||
|
return True, f"Loaded rhymes file \"{RHYMES_FILE}\""
|
||||||
|
except FileNotFoundError:
|
||||||
|
return False, f"No rhymes file found at \"{RHYMES_FILE}\""
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
return False, f"Invalid JSON in \"{RHYMES_FILE}\": {e}"
|
||||||
|
|
||||||
|
|
||||||
|
def get_guild_state(guild_id: str) -> Dict[str, Any]:
|
||||||
|
"""Retrieve guild state from database."""
|
||||||
|
with _get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute("""
|
||||||
|
SELECT guild_id, guild_name, cooldown_until, self_control, last_updated
|
||||||
|
FROM guild_state WHERE guild_id = ?
|
||||||
|
""", (guild_id,))
|
||||||
|
row = cursor.fetchone()
|
||||||
|
|
||||||
|
if row:
|
||||||
|
return {
|
||||||
|
"guild_id": row["guild_id"],
|
||||||
|
"guild_name": row["guild_name"],
|
||||||
|
"cooldown_until": row["cooldown_until"],
|
||||||
|
"self_control": row["self_control"],
|
||||||
|
"last_updated": row["last_updated"]
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
return {
|
||||||
|
"guild_id": guild_id,
|
||||||
|
"guild_name": "",
|
||||||
|
"cooldown_until": "1970-01-01T00:00:00",
|
||||||
|
"self_control": 1.0,
|
||||||
|
"last_updated": datetime.now().isoformat()
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def update_guild_state(
|
||||||
|
guild_id: str,
|
||||||
|
guild_name: str,
|
||||||
|
cooldown_until: str,
|
||||||
|
self_control: float
|
||||||
|
) -> None:
|
||||||
|
"""Update guild state in database."""
|
||||||
|
with _get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute("""
|
||||||
|
INSERT OR REPLACE INTO guild_state (guild_id, guild_name, cooldown_until, self_control, last_updated)
|
||||||
|
VALUES (?, ?, ?, ?, ?)
|
||||||
|
""", (guild_id, guild_name, cooldown_until, self_control, datetime.now().isoformat()))
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
|
||||||
|
def delete_guild_state(guild_id: str) -> bool:
|
||||||
|
"""Delete guild state from database."""
|
||||||
|
with _get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute("DELETE FROM guild_state WHERE guild_id = ?", (guild_id,))
|
||||||
|
conn.commit()
|
||||||
|
return cursor.rowcount > 0
|
||||||
|
|
||||||
|
|
||||||
|
def get_all_guild_states() -> Dict[str, Dict[str, Any]]:
|
||||||
|
"""Retrieve all guild states (for debug purposes)."""
|
||||||
|
with _get_connection() as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute("SELECT guild_id, guild_name, cooldown_until, self_control, last_updated FROM guild_state")
|
||||||
|
return {
|
||||||
|
row["guild_id"]: {
|
||||||
|
"guild_name": row["guild_name"],
|
||||||
|
"cooldown_until": row["cooldown_until"],
|
||||||
|
"self_control": row["self_control"],
|
||||||
|
"last_updated": row["last_updated"]
|
||||||
|
}
|
||||||
|
for row in cursor.fetchall()
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def log_rhyme(last_word: str, rhyme_triggered: str) -> None:
|
||||||
|
"""Log rhyme trigger to CSV file."""
|
||||||
|
timestamp = datetime.now().isoformat()
|
||||||
|
|
||||||
|
with open(RHYME_LOG_FILE, "a", encoding="utf-8") as f:
|
||||||
|
safe_rhyme = rhyme_triggered.replace(",", ";")
|
||||||
|
f.write(f"{timestamp},{last_word},{safe_rhyme}\n")
|
||||||
|
|
||||||
|
|
||||||
|
def get_last_word(text: str) -> str:
|
||||||
|
"""Extract last alphabetic word from text."""
|
||||||
|
truncated = text
|
||||||
|
while True:
|
||||||
|
if len(truncated) < 2 or truncated[-1].isnumeric():
|
||||||
|
return ""
|
||||||
|
if truncated[-1].isalpha() and truncated[-2].isalpha():
|
||||||
|
break
|
||||||
|
truncated = truncated[:-1]
|
||||||
|
truncated = truncated.split(" ")[-1]
|
||||||
|
return truncated if truncated.isalpha() else ""
|
||||||
|
|
||||||
|
|
||||||
|
def find_rhyme(word: str) -> str:
|
||||||
|
global loaded_rhymes
|
||||||
|
"""Find matching rhyme for given word."""
|
||||||
|
for rhyme in loaded_rhymes:
|
||||||
|
if word in rhyme["blacklist"]:
|
||||||
|
return ""
|
||||||
|
if word.endswith(tuple(rhyme["keys"])):
|
||||||
|
log_rhyme(word, rhyme["sound"])
|
||||||
|
return random.choice(rhyme["rhymes"])
|
||||||
|
return ""
|
||||||
|
|
||||||
|
async def get_guild_name(guildId, client) -> str:
|
||||||
|
guild = await client.fetch_guild(guildId)
|
||||||
|
return "[Server={0}]".format(guild.name)
|
||||||
|
|
||||||
|
async def handle_debug_commands(message, client) -> bool:
|
||||||
|
"""Handle debug commands (debug, save, load). Returns True if handled."""
|
||||||
|
message_content = message.content.lower()
|
||||||
|
|
||||||
|
if message_content == "debug poilau":
|
||||||
|
if message.author.id == 151626081458192384:
|
||||||
|
all_states = get_all_guild_states()
|
||||||
|
dump = {}
|
||||||
|
for guild_id, state in all_states.items():
|
||||||
|
channel_name = await get_guild_name(guild_id, client)
|
||||||
|
cooldown_dt = datetime.fromisoformat(state["cooldown_until"])
|
||||||
|
time_remaining = max(0, (cooldown_dt - datetime.now()).total_seconds())
|
||||||
|
sleeping_time = "{:.2f} s".format(time_remaining)
|
||||||
|
dump[channel_name] = {
|
||||||
|
"cooldown_until": state["cooldown_until"],
|
||||||
|
"cooldown_remaining": sleeping_time,
|
||||||
|
"self-control": state["self_control"],
|
||||||
|
"last_updated": state["last_updated"]
|
||||||
|
}
|
||||||
|
await message.author.send(
|
||||||
|
"```json\n{0}```".format(json.dumps(dump, ensure_ascii=False, indent=2))
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
|
||||||
|
if message_content == "save poilau":
|
||||||
|
if message.author.id == 151626081458192384:
|
||||||
|
all_states = get_all_guild_states()
|
||||||
|
json_str = "```json\n{0}```".format(json.dumps(all_states, ensure_ascii=False, indent=2))
|
||||||
|
await message.author.send("State persisted in SQLite database")
|
||||||
|
await message.author.send(json_str)
|
||||||
|
return True
|
||||||
|
|
||||||
|
if message_content == "load poilau":
|
||||||
|
if message.author.id == 151626081458192384:
|
||||||
|
success, msg = load_rhymes()
|
||||||
|
all_states = get_all_guild_states()
|
||||||
|
json_str = "```json\n{0}```".format(json.dumps(all_states, ensure_ascii=False, indent=2))
|
||||||
|
await message.author.send(msg)
|
||||||
|
await message.author.send(json_str)
|
||||||
|
return True
|
||||||
|
|
||||||
|
if message_content == "tg fouras" and message.guild:
|
||||||
|
# Disable cooldown for this server (set to far future)
|
||||||
|
cooldown_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
|
||||||
|
cooldown_date = cooldown_date.replace(day=cooldown_date.day + 10000)
|
||||||
|
update_guild_state(
|
||||||
|
str(message.guild.id),
|
||||||
|
guild_name=message.guild.name,
|
||||||
|
cooldown_until=cooldown_date.isoformat(),
|
||||||
|
self_control=2.0
|
||||||
|
)
|
||||||
|
await message.channel.send("ok :'(")
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
async def handle_rhyme_logic(message, client) -> bool:
|
||||||
|
"""Main rhyme detection logic. Returns True if rhyme was triggered."""
|
||||||
|
message_content = message.content.lower()
|
||||||
|
last_word = get_last_word(message_content)
|
||||||
|
|
||||||
|
if message.author != client.user and message.guild and last_word:
|
||||||
|
rhyme = find_rhyme(last_word)
|
||||||
|
guild_id = str(message.guild.id)
|
||||||
|
guild_name = message.guild.name
|
||||||
|
|
||||||
|
if rhyme:
|
||||||
|
guild_state = get_guild_state(guild_id)
|
||||||
|
|
||||||
|
# Update guild name if changed
|
||||||
|
if guild_state["guild_name"] != guild_name:
|
||||||
|
update_guild_state(
|
||||||
|
guild_id,
|
||||||
|
guild_name=guild_name,
|
||||||
|
cooldown_until=guild_state["cooldown_until"],
|
||||||
|
self_control=guild_state["self_control"]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check cooldown
|
||||||
|
cooldown_dt = datetime.fromisoformat(guild_state["cooldown_until"])
|
||||||
|
now_dt = datetime.now()
|
||||||
|
|
||||||
|
if now_dt >= cooldown_dt:
|
||||||
|
self_control = guild_state["self_control"]
|
||||||
|
|
||||||
|
# Probability check
|
||||||
|
if random.random() < self_control:
|
||||||
|
new_self_control = self_control * 0.9
|
||||||
|
update_guild_state(
|
||||||
|
guild_id,
|
||||||
|
guild_name=guild_name,
|
||||||
|
cooldown_until=now_dt.isoformat(),
|
||||||
|
self_control=new_self_control
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Calculate new cooldown duration
|
||||||
|
wait_time = random.randint(0, 900)
|
||||||
|
if bool(random.getrandbits(1)):
|
||||||
|
wait_time = random.randint(900, 10800)
|
||||||
|
|
||||||
|
new_cooldown_dt = now_dt.replace(second=0, microsecond=0)
|
||||||
|
new_cooldown_dt = new_cooldown_dt.replace(minute=new_cooldown_dt.minute + wait_time // 60)
|
||||||
|
new_cooldown_dt = new_cooldown_dt.replace(hour=new_cooldown_dt.hour + wait_time // 3600)
|
||||||
|
|
||||||
|
update_guild_state(
|
||||||
|
guild_id,
|
||||||
|
guild_name=guild_name,
|
||||||
|
cooldown_until=new_cooldown_dt.isoformat(),
|
||||||
|
self_control=self_control + 1.0
|
||||||
|
)
|
||||||
|
|
||||||
|
await message.channel.send(rhyme)
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
async def handle_message(message, client) -> bool:
|
||||||
|
"""Main entry point for message handling."""
|
||||||
|
# Initialize database and log file on first run
|
||||||
|
_ensure_db()
|
||||||
|
_ensure_log_file()
|
||||||
|
|
||||||
|
# Handle debug commands first
|
||||||
|
if await handle_debug_commands(message, client):
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Process rhyme logic
|
||||||
|
return await handle_rhyme_logic(message, client)
|
||||||
Loading…
x
Reference in New Issue
Block a user