diff options
Diffstat (limited to 'qotd.py')
-rw-r--r-- | qotd.py | 376 |
1 files changed, 376 insertions, 0 deletions
@@ -0,0 +1,376 @@ +#!/usr/bin/env python3 +# +# lurklite quote of the day +# +# Copyright © 2019-2021 by luk3yx +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# + +from difflib import SequenceMatcher +import base64, json, math, os, random, tempfile, time, urllib.request + +# This isn't the best place to store data but lurklite doesn't have any +# database API. +data_path = os.path.dirname(__file__) +qotd_file = os.path.join(data_path, 'qotd.json') +archive_file = os.path.join(data_path, 'old_quotes.txt') +lockfile = os.path.join(tempfile.gettempdir(), '.qotd-' + + base64.urlsafe_b64encode(qotd_file.encode('utf-8')).decode('ascii' + ).rstrip('=') + '.lock') +qotd_static = os.path.join(data_path, 'qotd_current.txt') +try: + with open(os.path.join(data_path, 'qotd_webhook_url.txt')) as f: + webhook_url = f.read().strip() + del f +except FileNotFoundError: + webhook_url = None + +qotd_format = ('Quote of the day: \x1d{}\x1d\n\nDisclaimer: Quotes are mostly ' + 'user-generated content and may not be endorsed by lurklite ' + 'staff.') + +# A quick lockfile implementation +# There may be multiple bot instances (over separate processes) trying to +# access this file. +def lock_quotes(): + while True: + try: + with open(lockfile, 'x') as f: + f.write(str(time.time())) + except FileExistsError: + time.sleep(0.05) + else: + break + +def unlock_quotes(): + os.remove(lockfile) + +# Read quotes +def read_quotes(): + lock_quotes() + try: + with open(qotd_file, 'r') as f: + return json.load(f) + except: + return {} + +def save_quotes(quotes): + raw = json.dumps(quotes) + with open(qotd_file + '~', 'w') as f: + f.write(raw) + os.rename(qotd_file + '~', qotd_file) + + if not os.path.exists(qotd_static): + with open(qotd_static, 'w') as f: + f.write(quotes.get('current', '<No quotes>')) + +def archive_quote(quote, blame, expiry_time): + # Convert the expiry time into a creation time (provided the expiry time + # is a sane value). + if expiry_time and expiry_time > 1_000_000_000: + t = expiry_time - 86400 + else: + t = None + with open(archive_file, 'a') as f: + f.write(json.dumps({'quote': quote, 'blame': blame, 'time': t}) + '\n') + +def is_discord(irc): + return hasattr(irc, '_client') + +def send_webhook(**req): + if not webhook_url: + return + req = urllib.request.Request( + webhook_url, + data=json.dumps(req).encode('utf-8'), + headers={ + 'Content-Type': 'application/json', + 'User-Agent': 'lurklite-qotd-webhook', + }, + method='POST', + ) + print(req) + with urllib.request.urlopen(req): + pass + +def is_qotd_admin(irc, hostmask, is_admin, args, quotes): + if is_admin: + return True + # if quotes is None: + # quotes = read_quotes() + # unlock_quotes() + n = (hostmask[0].strip('<@!>') if is_discord(irc) else hostmask[-1]) + if n in quotes.get('admins', ()): + return True + else: + irc.msg(args[0], hostmask[0] + ': Permission denied!') + return False + +def prune_quote_blames(quotes): + if 'blame' not in quotes: + return + q = quotes.get('quotes', ()) + quotes['blame'] = {k: v for k, v in quotes['blame'].items() if k in q} + +def _aux_quote(quotes, source, quote): + quotes['current'] = quote + if 'blame' not in quotes: + quotes['blame'] = {} + quotes['blame'][quote] = f'Auxiliary quote system ({source})' + +def _fetch_json(url): + with urllib.request.urlopen(url, timeout=3) as res: + return json.loads(res.read()) + +def get_quotable_quote(): + data = _fetch_json('https://api.quotable.io/random') + return f'"{data["content"]}" -- {data["author"]}' + +# def get_pandorabot_quote(): +# pandorabox_quotes = _fetch_json('https://pandorabox.io/api/quotes') +# it = filter(lambda s : s.startswith('<'), pandorabox_quotes) +# return random.choice(tuple(it)) + +# Get the next quote +def get_quote(channel=None): + quotes = read_quotes() + try: + if channel and channel.lower() in quotes.get('blocklist', ()): + return + + if quotes.get('next_update', 0) <= time.time(): + # Archive the current quote + if 'current' in quotes: + archive_quote(quotes['current'], + quotes['blame'].get(quotes['current']), + quotes.get('next_update')) + prune_quote_blames(quotes) + + # quotes['next_update'] = time.time() + 86400 + + # Always expire quotes at midnight UTC + time_s = math.floor(time.time()) + quotes['next_update'] = (time_s // 86400 + 1) * 86400 + + if quotes.get('quotes', None): + quotes['current'] = random.choice(quotes['quotes']) + quotes['quotes'].remove(quotes['current']) + # elif (random.randint(1, 5) == 1 and + # hasattr(random, 'get_rule_of_acquisition')): + # _aux_quote(quotes, 'Rule of acquisition', + # random.get_rule_of_acquisition()) + # elif random.randint(1, 5) == 1: + # _aux_quote(quotes, 'Pandorabot', get_pandorabot_quote()) + else: + _aux_quote(quotes, 'Quotable', get_quotable_quote()) + + save_quotes(quotes) + + with open(qotd_static, 'w') as f: + f.write(qotd_format.format(quotes['current'])) + + return quotes.get('current', '<No quotes>') + finally: + unlock_quotes() + +# The quote of the day command +@register_command('qotd', 'quotd', 'quote') +def qotd_cmd(irc, hostmask, is_admin, args): + """ Get the current quote of the day. """ + quote = get_quote(args[0]) + irc.msg(args[0], qotd_format.format(quote) if quote is not None else + 'Unfortunately, .qotd has been disabled in this channel.') + +# @register_command('qotd-skip', requires_admin=True) +# def qotd_skip_cmd(irc, hostmask, is_admin, args): +# quotes = read_quotes() +# try: +# del quotes['next_update'] +# save_quotes(quotes) +# except KeyError: +# irc.msg(args[0], 'There is no quote to skip!') +# finally: +# unlock_quotes() +# irc.msg(args[0], 'Quote skipped!') + +@register_command('qotd-blame', 'qotd_blame') +def qotd_blame_cmd(irc, hostmask, is_admin, args): + quotes = read_quotes() + unlock_quotes() + adder = 'blame' in quotes and quotes['blame'].get(quotes.get('current')) + if adder: + irc.msg(args[0], f'The current quote was added by {adder!r}.') + else: + irc.msg(args[0], "I don't know who added the current quote.") + +def compare_quotes(quotes, quote): + for existing_quote in quotes['blame']: + if SequenceMatcher(None, quote, existing_quote).ratio() >= 0.85: + return existing_quote + +# Add a quote +@register_command('add-quote', 'add_quote') +def add_quote_cmd(irc, hostmask, is_admin, args): + """ Add a quote of the day (requires admin). """ + quotes = read_quotes() + try: + if not is_qotd_admin(irc, hostmask, is_admin, args, quotes): + return + + quote = args[1].strip().replace('\t', ' ') + if quote[:1] not in ('*', '<', '"'): + return irc.msg(args[0], hostmask[0] + ': Invalid quote!') + + if 'quotes' not in quotes: + quotes['quotes'] = [] + + if 'blame' not in quotes: + quotes['blame'] = {} + + if quote in quotes['quotes']: + return irc.msg(args[0], hostmask[0] + ': That quote has already ' + 'been added!') + if quote in quotes['blame']: + return irc.msg(args[0], hostmask[0] + ': That quote was recently ' + 'removed, you may not re-add it.') + + if similar_quote := compare_quotes(quotes, quote): + state = ('an existing' if similar_quote in quotes['quotes'] + else 'a recently removed') + return irc.msg(args[0], f'{hostmask[0]}: That quote is too similar' + f' to {state} quote:\n> {similar_quote}') + + quotes['quotes'].append(quote) + quotes['blame'][quote] = hostmask[1 if is_discord(irc) else 2] + save_quotes(quotes) + finally: + unlock_quotes() + + irc.msg(args[0], hostmask[0] + ': Done!') + + send_webhook(embeds=[{ + 'title': f'{hostmask[1]} added a quote', + 'color': 0x008000, + 'description': quote, + 'footer': {'text': hostmask[2]}, + }]) + +# Remove a quote +@register_command('remove-quote', 'remove_quote') +def remove_quote_cmd(irc, hostmask, is_admin, args): + """ Remove a quote of the day (requires admin). """ + quotes = read_quotes() + try: + if not is_qotd_admin(irc, hostmask, is_admin, args, quotes): + return + quote = args[1].strip().replace('\t', ' ') + if quote not in quotes.get('quotes', ()): + msg = 'That quote does not exist!' + #if hostmask[1] == 'Ranger#3292': + # msg = 'Done!' + irc.msg(args[0], hostmask[0] + ': ' + msg) + return + + quotes['quotes'].remove(quote) + + # Remove from the blame list if this is the same person that added it + blame = quotes.get('blame') + if blame and blame.get(quote) == hostmask[1 if is_discord(irc) else 2]: + del blame[quote] + + save_quotes(quotes) + finally: + unlock_quotes() + + irc.msg(args[0], hostmask[0] + ': Done!') + + send_webhook(embeds=[{ + 'title': f'{hostmask[1]} removed a quote', + 'color': 0xFFA500, + 'description': quote, + 'footer': {'text': hostmask[2]}, + }]) + +def _plural(n): + return '' if n == 1 else 's' + +@register_command('qotd-status', 'qotd_status') +def qotd_status_cmd(irc, hostmask, is_admin, args): + quotes = read_quotes() + unlock_quotes() + if not is_qotd_admin(irc, hostmask, is_admin, args, quotes): + return + + saved_quotes = len(quotes.get('quotes', ())) + expiry = max(int(quotes.get('next_update', 0) - time.time()), 0) + + expiry_s = expiry % 60 + expiry_m = (expiry // 60) % 60 + expiry_h = expiry // 3600 + + irc.msg(args[0], f'{hostmask[0]}: ' + f'The current quote expires in \x02{expiry_h} hour' + f'{_plural(expiry_h)}, {expiry_m} minute{_plural(expiry_m)}, and ' + f'{expiry_s} second{_plural(expiry_s)}\x02. ' + f'Amount of saved quotes: \x02{saved_quotes}\x02.') + +@register_command('qotd-admins', 'qotd_admins', requires_admin=True) +def qotd_admins_cmd(irc, hostmask, is_admin, args): + quotes = read_quotes() + unlock_quotes() + admins_it = quotes.get('admins', ()) + if is_discord(irc): + admins_it = map('<@{}>'.format, admins_it) + admins = ', '.join(admins_it) + irc.msg(args[0], f'{hostmask[0]}: Current .qotd admins: {admins}') + +def _get_admin_id(irc, args): + admin = args[1].strip() + if is_discord(irc): + admin = admin.strip('<@!>') + return admin + +@register_command('qotd-grant', 'qotd_grant', requires_admin=True) +def qotd_grant_cmd(irc, hostmask, is_admin, args): + quotes = read_quotes() + try: + admin = _get_admin_id(irc, args) + admins = quotes.get('admins') + if admins is None: + admins = quotes['admins'] = [] + elif admin in admins: + irc.msg(args[0], f'{hostmask[0]}: That user is already has ' + '.add-quote privs!') + return + admins.append(admin) + admins.sort() + save_quotes(quotes) + finally: + unlock_quotes() + irc.msg(args[0], f'{hostmask[0]}: Done!') + +@register_command('qotd-revoke', 'qotd_revoke', requires_admin=True) +def qotd_revoke_cmd(irc, hostmask, is_admin, args): + quotes = read_quotes() + try: + admin = _get_admin_id(irc, args) + try: + quotes.get('admins', []).remove(admin) + except ValueError: + pass + save_quotes(quotes) + finally: + unlock_quotes() + irc.msg(args[0], f'{hostmask[0]}: Done!') |