#!/usr/bin/env python3 # # lurklite quote of the day # # Copyright © 2019-2021 by luk3yx # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # from difflib import SequenceMatcher import base64, json, math, os, random, tempfile, time, urllib.request # This isn't the best place to store data but lurklite doesn't have any # database API. data_path = os.path.dirname(__file__) qotd_file = os.path.join(data_path, 'qotd.json') archive_file = os.path.join(data_path, 'old_quotes.txt') lockfile = os.path.join(tempfile.gettempdir(), '.qotd-' + base64.urlsafe_b64encode(qotd_file.encode('utf-8')).decode('ascii' ).rstrip('=') + '.lock') qotd_static = os.path.join(data_path, 'qotd_current.txt') try: with open(os.path.join(data_path, 'qotd_webhook_url.txt')) as f: webhook_url = f.read().strip() del f except FileNotFoundError: webhook_url = None qotd_format = ('Quote of the day: \x1d{}\x1d\n\nDisclaimer: Quotes are mostly ' 'user-generated content and may not be endorsed by lurklite ' 'staff.') # A quick lockfile implementation # There may be multiple bot instances (over separate processes) trying to # access this file. def lock_quotes(): while True: try: with open(lockfile, 'x') as f: f.write(str(time.time())) except FileExistsError: time.sleep(0.05) else: break def unlock_quotes(): os.remove(lockfile) # Read quotes def read_quotes(): lock_quotes() try: with open(qotd_file, 'r') as f: return json.load(f) except: return {} def save_quotes(quotes): raw = json.dumps(quotes) with open(qotd_file + '~', 'w') as f: f.write(raw) os.rename(qotd_file + '~', qotd_file) if not os.path.exists(qotd_static): with open(qotd_static, 'w') as f: f.write(quotes.get('current', '')) def archive_quote(quote, blame, expiry_time): # Convert the expiry time into a creation time (provided the expiry time # is a sane value). if expiry_time and expiry_time > 1_000_000_000: t = expiry_time - 86400 else: t = None with open(archive_file, 'a') as f: f.write(json.dumps({'quote': quote, 'blame': blame, 'time': t}) + '\n') def is_discord(irc): return hasattr(irc, '_client') def send_webhook(**req): if not webhook_url: return req = urllib.request.Request( webhook_url, data=json.dumps(req).encode('utf-8'), headers={ 'Content-Type': 'application/json', 'User-Agent': 'lurklite-qotd-webhook', }, method='POST', ) print(req) with urllib.request.urlopen(req): pass def is_qotd_admin(irc, hostmask, is_admin, args, quotes): if is_admin: return True # if quotes is None: # quotes = read_quotes() # unlock_quotes() n = (hostmask[0].strip('<@!>') if is_discord(irc) else hostmask[-1]) if n in quotes.get('admins', ()): return True else: irc.msg(args[0], hostmask[0] + ': Permission denied!') return False def prune_quote_blames(quotes): if 'blame' not in quotes: return q = quotes.get('quotes', ()) quotes['blame'] = {k: v for k, v in quotes['blame'].items() if k in q} def _aux_quote(quotes, source, quote): quotes['current'] = quote if 'blame' not in quotes: quotes['blame'] = {} quotes['blame'][quote] = f'Auxiliary quote system ({source})' def _fetch_json(url): with urllib.request.urlopen(url, timeout=3) as res: return json.loads(res.read()) def get_quotable_quote(): data = _fetch_json('https://api.quotable.io/random') return f'"{data["content"]}" -- {data["author"]}' # def get_pandorabot_quote(): # pandorabox_quotes = _fetch_json('https://pandorabox.io/api/quotes') # it = filter(lambda s : s.startswith('<'), pandorabox_quotes) # return random.choice(tuple(it)) # Get the next quote def get_quote(channel=None): quotes = read_quotes() try: if channel and channel.lower() in quotes.get('blocklist', ()): return if quotes.get('next_update', 0) <= time.time(): # Archive the current quote if 'current' in quotes: archive_quote(quotes['current'], quotes['blame'].get(quotes['current']), quotes.get('next_update')) prune_quote_blames(quotes) # quotes['next_update'] = time.time() + 86400 # Always expire quotes at midnight UTC time_s = math.floor(time.time()) quotes['next_update'] = (time_s // 86400 + 1) * 86400 if quotes.get('quotes', None): quotes['current'] = random.choice(quotes['quotes']) quotes['quotes'].remove(quotes['current']) # elif (random.randint(1, 5) == 1 and # hasattr(random, 'get_rule_of_acquisition')): # _aux_quote(quotes, 'Rule of acquisition', # random.get_rule_of_acquisition()) # elif random.randint(1, 5) == 1: # _aux_quote(quotes, 'Pandorabot', get_pandorabot_quote()) else: _aux_quote(quotes, 'Quotable', get_quotable_quote()) save_quotes(quotes) with open(qotd_static, 'w') as f: f.write(qotd_format.format(quotes['current'])) return quotes.get('current', '') finally: unlock_quotes() # The quote of the day command @register_command('qotd', 'quotd', 'quote') def qotd_cmd(irc, hostmask, is_admin, args): """ Get the current quote of the day. """ quote = get_quote(args[0]) irc.msg(args[0], qotd_format.format(quote) if quote is not None else 'Unfortunately, .qotd has been disabled in this channel.') # @register_command('qotd-skip', requires_admin=True) # def qotd_skip_cmd(irc, hostmask, is_admin, args): # quotes = read_quotes() # try: # del quotes['next_update'] # save_quotes(quotes) # except KeyError: # irc.msg(args[0], 'There is no quote to skip!') # finally: # unlock_quotes() # irc.msg(args[0], 'Quote skipped!') @register_command('qotd-blame', 'qotd_blame') def qotd_blame_cmd(irc, hostmask, is_admin, args): quotes = read_quotes() unlock_quotes() adder = 'blame' in quotes and quotes['blame'].get(quotes.get('current')) if adder: irc.msg(args[0], f'The current quote was added by {adder!r}.') else: irc.msg(args[0], "I don't know who added the current quote.") def compare_quotes(quotes, quote): for existing_quote in quotes['blame']: if SequenceMatcher(None, quote, existing_quote).ratio() >= 0.85: return existing_quote # Add a quote @register_command('add-quote', 'add_quote') def add_quote_cmd(irc, hostmask, is_admin, args): """ Add a quote of the day (requires admin). """ quotes = read_quotes() try: if not is_qotd_admin(irc, hostmask, is_admin, args, quotes): return quote = args[1].strip().replace('\t', ' ') if quote[:1] not in ('*', '<', '"'): return irc.msg(args[0], hostmask[0] + ': Invalid quote!') if 'quotes' not in quotes: quotes['quotes'] = [] if 'blame' not in quotes: quotes['blame'] = {} if quote in quotes['quotes']: return irc.msg(args[0], hostmask[0] + ': That quote has already ' 'been added!') if quote in quotes['blame']: return irc.msg(args[0], hostmask[0] + ': That quote was recently ' 'removed, you may not re-add it.') if similar_quote := compare_quotes(quotes, quote): state = ('an existing' if similar_quote in quotes['quotes'] else 'a recently removed') return irc.msg(args[0], f'{hostmask[0]}: That quote is too similar' f' to {state} quote:\n> {similar_quote}') quotes['quotes'].append(quote) quotes['blame'][quote] = hostmask[1 if is_discord(irc) else 2] save_quotes(quotes) finally: unlock_quotes() irc.msg(args[0], hostmask[0] + ': Done!') send_webhook(embeds=[{ 'title': f'{hostmask[1]} added a quote', 'color': 0x008000, 'description': quote, 'footer': {'text': hostmask[2]}, }]) # Remove a quote @register_command('remove-quote', 'remove_quote') def remove_quote_cmd(irc, hostmask, is_admin, args): """ Remove a quote of the day (requires admin). """ quotes = read_quotes() try: if not is_qotd_admin(irc, hostmask, is_admin, args, quotes): return quote = args[1].strip().replace('\t', ' ') if quote not in quotes.get('quotes', ()): msg = 'That quote does not exist!' #if hostmask[1] == 'Ranger#3292': # msg = 'Done!' irc.msg(args[0], hostmask[0] + ': ' + msg) return quotes['quotes'].remove(quote) # Remove from the blame list if this is the same person that added it blame = quotes.get('blame') if blame and blame.get(quote) == hostmask[1 if is_discord(irc) else 2]: del blame[quote] save_quotes(quotes) finally: unlock_quotes() irc.msg(args[0], hostmask[0] + ': Done!') send_webhook(embeds=[{ 'title': f'{hostmask[1]} removed a quote', 'color': 0xFFA500, 'description': quote, 'footer': {'text': hostmask[2]}, }]) def _plural(n): return '' if n == 1 else 's' @register_command('qotd-status', 'qotd_status') def qotd_status_cmd(irc, hostmask, is_admin, args): quotes = read_quotes() unlock_quotes() if not is_qotd_admin(irc, hostmask, is_admin, args, quotes): return saved_quotes = len(quotes.get('quotes', ())) expiry = max(int(quotes.get('next_update', 0) - time.time()), 0) expiry_s = expiry % 60 expiry_m = (expiry // 60) % 60 expiry_h = expiry // 3600 irc.msg(args[0], f'{hostmask[0]}: ' f'The current quote expires in \x02{expiry_h} hour' f'{_plural(expiry_h)}, {expiry_m} minute{_plural(expiry_m)}, and ' f'{expiry_s} second{_plural(expiry_s)}\x02. ' f'Amount of saved quotes: \x02{saved_quotes}\x02.') @register_command('qotd-admins', 'qotd_admins', requires_admin=True) def qotd_admins_cmd(irc, hostmask, is_admin, args): quotes = read_quotes() unlock_quotes() admins_it = quotes.get('admins', ()) if is_discord(irc): admins_it = map('<@{}>'.format, admins_it) admins = ', '.join(admins_it) irc.msg(args[0], f'{hostmask[0]}: Current .qotd admins: {admins}') def _get_admin_id(irc, args): admin = args[1].strip() if is_discord(irc): admin = admin.strip('<@!>') return admin @register_command('qotd-grant', 'qotd_grant', requires_admin=True) def qotd_grant_cmd(irc, hostmask, is_admin, args): quotes = read_quotes() try: admin = _get_admin_id(irc, args) admins = quotes.get('admins') if admins is None: admins = quotes['admins'] = [] elif admin in admins: irc.msg(args[0], f'{hostmask[0]}: That user is already has ' '.add-quote privs!') return admins.append(admin) admins.sort() save_quotes(quotes) finally: unlock_quotes() irc.msg(args[0], f'{hostmask[0]}: Done!') @register_command('qotd-revoke', 'qotd_revoke', requires_admin=True) def qotd_revoke_cmd(irc, hostmask, is_admin, args): quotes = read_quotes() try: admin = _get_admin_id(irc, args) try: quotes.get('admins', []).remove(admin) except ValueError: pass save_quotes(quotes) finally: unlock_quotes() irc.msg(args[0], f'{hostmask[0]}: Done!')