aboutsummaryrefslogtreecommitdiff
path: root/qotd.py
diff options
context:
space:
mode:
Diffstat (limited to 'qotd.py')
-rw-r--r--qotd.py376
1 files changed, 376 insertions, 0 deletions
diff --git a/qotd.py b/qotd.py
new file mode 100644
index 0000000..0874aef
--- /dev/null
+++ b/qotd.py
@@ -0,0 +1,376 @@
+#!/usr/bin/env python3
+#
+# lurklite quote of the day
+#
+# Copyright © 2019-2021 by luk3yx
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+
+from difflib import SequenceMatcher
+import base64, json, math, os, random, tempfile, time, urllib.request
+
+# This isn't the best place to store data but lurklite doesn't have any
+# database API.
+data_path = os.path.dirname(__file__)
+qotd_file = os.path.join(data_path, 'qotd.json')
+archive_file = os.path.join(data_path, 'old_quotes.txt')
+lockfile = os.path.join(tempfile.gettempdir(), '.qotd-' +
+ base64.urlsafe_b64encode(qotd_file.encode('utf-8')).decode('ascii'
+ ).rstrip('=') + '.lock')
+qotd_static = os.path.join(data_path, 'qotd_current.txt')
+try:
+ with open(os.path.join(data_path, 'qotd_webhook_url.txt')) as f:
+ webhook_url = f.read().strip()
+ del f
+except FileNotFoundError:
+ webhook_url = None
+
+qotd_format = ('Quote of the day: \x1d{}\x1d\n\nDisclaimer: Quotes are mostly '
+ 'user-generated content and may not be endorsed by lurklite '
+ 'staff.')
+
+# A quick lockfile implementation
+# There may be multiple bot instances (over separate processes) trying to
+# access this file.
+def lock_quotes():
+ while True:
+ try:
+ with open(lockfile, 'x') as f:
+ f.write(str(time.time()))
+ except FileExistsError:
+ time.sleep(0.05)
+ else:
+ break
+
+def unlock_quotes():
+ os.remove(lockfile)
+
+# Read quotes
+def read_quotes():
+ lock_quotes()
+ try:
+ with open(qotd_file, 'r') as f:
+ return json.load(f)
+ except:
+ return {}
+
+def save_quotes(quotes):
+ raw = json.dumps(quotes)
+ with open(qotd_file + '~', 'w') as f:
+ f.write(raw)
+ os.rename(qotd_file + '~', qotd_file)
+
+ if not os.path.exists(qotd_static):
+ with open(qotd_static, 'w') as f:
+ f.write(quotes.get('current', '<No quotes>'))
+
+def archive_quote(quote, blame, expiry_time):
+ # Convert the expiry time into a creation time (provided the expiry time
+ # is a sane value).
+ if expiry_time and expiry_time > 1_000_000_000:
+ t = expiry_time - 86400
+ else:
+ t = None
+ with open(archive_file, 'a') as f:
+ f.write(json.dumps({'quote': quote, 'blame': blame, 'time': t}) + '\n')
+
+def is_discord(irc):
+ return hasattr(irc, '_client')
+
+def send_webhook(**req):
+ if not webhook_url:
+ return
+ req = urllib.request.Request(
+ webhook_url,
+ data=json.dumps(req).encode('utf-8'),
+ headers={
+ 'Content-Type': 'application/json',
+ 'User-Agent': 'lurklite-qotd-webhook',
+ },
+ method='POST',
+ )
+ print(req)
+ with urllib.request.urlopen(req):
+ pass
+
+def is_qotd_admin(irc, hostmask, is_admin, args, quotes):
+ if is_admin:
+ return True
+ # if quotes is None:
+ # quotes = read_quotes()
+ # unlock_quotes()
+ n = (hostmask[0].strip('<@!>') if is_discord(irc) else hostmask[-1])
+ if n in quotes.get('admins', ()):
+ return True
+ else:
+ irc.msg(args[0], hostmask[0] + ': Permission denied!')
+ return False
+
+def prune_quote_blames(quotes):
+ if 'blame' not in quotes:
+ return
+ q = quotes.get('quotes', ())
+ quotes['blame'] = {k: v for k, v in quotes['blame'].items() if k in q}
+
+def _aux_quote(quotes, source, quote):
+ quotes['current'] = quote
+ if 'blame' not in quotes:
+ quotes['blame'] = {}
+ quotes['blame'][quote] = f'Auxiliary quote system ({source})'
+
+def _fetch_json(url):
+ with urllib.request.urlopen(url, timeout=3) as res:
+ return json.loads(res.read())
+
+def get_quotable_quote():
+ data = _fetch_json('https://api.quotable.io/random')
+ return f'"{data["content"]}" -- {data["author"]}'
+
+# def get_pandorabot_quote():
+# pandorabox_quotes = _fetch_json('https://pandorabox.io/api/quotes')
+# it = filter(lambda s : s.startswith('<'), pandorabox_quotes)
+# return random.choice(tuple(it))
+
+# Get the next quote
+def get_quote(channel=None):
+ quotes = read_quotes()
+ try:
+ if channel and channel.lower() in quotes.get('blocklist', ()):
+ return
+
+ if quotes.get('next_update', 0) <= time.time():
+ # Archive the current quote
+ if 'current' in quotes:
+ archive_quote(quotes['current'],
+ quotes['blame'].get(quotes['current']),
+ quotes.get('next_update'))
+ prune_quote_blames(quotes)
+
+ # quotes['next_update'] = time.time() + 86400
+
+ # Always expire quotes at midnight UTC
+ time_s = math.floor(time.time())
+ quotes['next_update'] = (time_s // 86400 + 1) * 86400
+
+ if quotes.get('quotes', None):
+ quotes['current'] = random.choice(quotes['quotes'])
+ quotes['quotes'].remove(quotes['current'])
+ # elif (random.randint(1, 5) == 1 and
+ # hasattr(random, 'get_rule_of_acquisition')):
+ # _aux_quote(quotes, 'Rule of acquisition',
+ # random.get_rule_of_acquisition())
+ # elif random.randint(1, 5) == 1:
+ # _aux_quote(quotes, 'Pandorabot', get_pandorabot_quote())
+ else:
+ _aux_quote(quotes, 'Quotable', get_quotable_quote())
+
+ save_quotes(quotes)
+
+ with open(qotd_static, 'w') as f:
+ f.write(qotd_format.format(quotes['current']))
+
+ return quotes.get('current', '<No quotes>')
+ finally:
+ unlock_quotes()
+
+# The quote of the day command
+@register_command('qotd', 'quotd', 'quote')
+def qotd_cmd(irc, hostmask, is_admin, args):
+ """ Get the current quote of the day. """
+ quote = get_quote(args[0])
+ irc.msg(args[0], qotd_format.format(quote) if quote is not None else
+ 'Unfortunately, .qotd has been disabled in this channel.')
+
+# @register_command('qotd-skip', requires_admin=True)
+# def qotd_skip_cmd(irc, hostmask, is_admin, args):
+# quotes = read_quotes()
+# try:
+# del quotes['next_update']
+# save_quotes(quotes)
+# except KeyError:
+# irc.msg(args[0], 'There is no quote to skip!')
+# finally:
+# unlock_quotes()
+# irc.msg(args[0], 'Quote skipped!')
+
+@register_command('qotd-blame', 'qotd_blame')
+def qotd_blame_cmd(irc, hostmask, is_admin, args):
+ quotes = read_quotes()
+ unlock_quotes()
+ adder = 'blame' in quotes and quotes['blame'].get(quotes.get('current'))
+ if adder:
+ irc.msg(args[0], f'The current quote was added by {adder!r}.')
+ else:
+ irc.msg(args[0], "I don't know who added the current quote.")
+
+def compare_quotes(quotes, quote):
+ for existing_quote in quotes['blame']:
+ if SequenceMatcher(None, quote, existing_quote).ratio() >= 0.85:
+ return existing_quote
+
+# Add a quote
+@register_command('add-quote', 'add_quote')
+def add_quote_cmd(irc, hostmask, is_admin, args):
+ """ Add a quote of the day (requires admin). """
+ quotes = read_quotes()
+ try:
+ if not is_qotd_admin(irc, hostmask, is_admin, args, quotes):
+ return
+
+ quote = args[1].strip().replace('\t', ' ')
+ if quote[:1] not in ('*', '<', '"'):
+ return irc.msg(args[0], hostmask[0] + ': Invalid quote!')
+
+ if 'quotes' not in quotes:
+ quotes['quotes'] = []
+
+ if 'blame' not in quotes:
+ quotes['blame'] = {}
+
+ if quote in quotes['quotes']:
+ return irc.msg(args[0], hostmask[0] + ': That quote has already '
+ 'been added!')
+ if quote in quotes['blame']:
+ return irc.msg(args[0], hostmask[0] + ': That quote was recently '
+ 'removed, you may not re-add it.')
+
+ if similar_quote := compare_quotes(quotes, quote):
+ state = ('an existing' if similar_quote in quotes['quotes']
+ else 'a recently removed')
+ return irc.msg(args[0], f'{hostmask[0]}: That quote is too similar'
+ f' to {state} quote:\n> {similar_quote}')
+
+ quotes['quotes'].append(quote)
+ quotes['blame'][quote] = hostmask[1 if is_discord(irc) else 2]
+ save_quotes(quotes)
+ finally:
+ unlock_quotes()
+
+ irc.msg(args[0], hostmask[0] + ': Done!')
+
+ send_webhook(embeds=[{
+ 'title': f'{hostmask[1]} added a quote',
+ 'color': 0x008000,
+ 'description': quote,
+ 'footer': {'text': hostmask[2]},
+ }])
+
+# Remove a quote
+@register_command('remove-quote', 'remove_quote')
+def remove_quote_cmd(irc, hostmask, is_admin, args):
+ """ Remove a quote of the day (requires admin). """
+ quotes = read_quotes()
+ try:
+ if not is_qotd_admin(irc, hostmask, is_admin, args, quotes):
+ return
+ quote = args[1].strip().replace('\t', ' ')
+ if quote not in quotes.get('quotes', ()):
+ msg = 'That quote does not exist!'
+ #if hostmask[1] == 'Ranger#3292':
+ # msg = 'Done!'
+ irc.msg(args[0], hostmask[0] + ': ' + msg)
+ return
+
+ quotes['quotes'].remove(quote)
+
+ # Remove from the blame list if this is the same person that added it
+ blame = quotes.get('blame')
+ if blame and blame.get(quote) == hostmask[1 if is_discord(irc) else 2]:
+ del blame[quote]
+
+ save_quotes(quotes)
+ finally:
+ unlock_quotes()
+
+ irc.msg(args[0], hostmask[0] + ': Done!')
+
+ send_webhook(embeds=[{
+ 'title': f'{hostmask[1]} removed a quote',
+ 'color': 0xFFA500,
+ 'description': quote,
+ 'footer': {'text': hostmask[2]},
+ }])
+
+def _plural(n):
+ return '' if n == 1 else 's'
+
+@register_command('qotd-status', 'qotd_status')
+def qotd_status_cmd(irc, hostmask, is_admin, args):
+ quotes = read_quotes()
+ unlock_quotes()
+ if not is_qotd_admin(irc, hostmask, is_admin, args, quotes):
+ return
+
+ saved_quotes = len(quotes.get('quotes', ()))
+ expiry = max(int(quotes.get('next_update', 0) - time.time()), 0)
+
+ expiry_s = expiry % 60
+ expiry_m = (expiry // 60) % 60
+ expiry_h = expiry // 3600
+
+ irc.msg(args[0], f'{hostmask[0]}: '
+ f'The current quote expires in \x02{expiry_h} hour'
+ f'{_plural(expiry_h)}, {expiry_m} minute{_plural(expiry_m)}, and '
+ f'{expiry_s} second{_plural(expiry_s)}\x02. '
+ f'Amount of saved quotes: \x02{saved_quotes}\x02.')
+
+@register_command('qotd-admins', 'qotd_admins', requires_admin=True)
+def qotd_admins_cmd(irc, hostmask, is_admin, args):
+ quotes = read_quotes()
+ unlock_quotes()
+ admins_it = quotes.get('admins', ())
+ if is_discord(irc):
+ admins_it = map('<@{}>'.format, admins_it)
+ admins = ', '.join(admins_it)
+ irc.msg(args[0], f'{hostmask[0]}: Current .qotd admins: {admins}')
+
+def _get_admin_id(irc, args):
+ admin = args[1].strip()
+ if is_discord(irc):
+ admin = admin.strip('<@!>')
+ return admin
+
+@register_command('qotd-grant', 'qotd_grant', requires_admin=True)
+def qotd_grant_cmd(irc, hostmask, is_admin, args):
+ quotes = read_quotes()
+ try:
+ admin = _get_admin_id(irc, args)
+ admins = quotes.get('admins')
+ if admins is None:
+ admins = quotes['admins'] = []
+ elif admin in admins:
+ irc.msg(args[0], f'{hostmask[0]}: That user is already has '
+ '.add-quote privs!')
+ return
+ admins.append(admin)
+ admins.sort()
+ save_quotes(quotes)
+ finally:
+ unlock_quotes()
+ irc.msg(args[0], f'{hostmask[0]}: Done!')
+
+@register_command('qotd-revoke', 'qotd_revoke', requires_admin=True)
+def qotd_revoke_cmd(irc, hostmask, is_admin, args):
+ quotes = read_quotes()
+ try:
+ admin = _get_admin_id(irc, args)
+ try:
+ quotes.get('admins', []).remove(admin)
+ except ValueError:
+ pass
+ save_quotes(quotes)
+ finally:
+ unlock_quotes()
+ irc.msg(args[0], f'{hostmask[0]}: Done!')