#!/usr/bin/env python3
#
# lurklite quote of the day
#
# Copyright © 2019-2021 by luk3yx
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
from difflib import SequenceMatcher
import base64, json, math, os, random, tempfile, time, urllib.request
# This isn't the best place to store data but lurklite doesn't have any
# database API.
# Every data file sits in the same directory as this module.
data_path = os.path.dirname(__file__)
qotd_file = os.path.join(data_path, 'qotd.json')
archive_file = os.path.join(data_path, 'old_quotes.txt')
qotd_static = os.path.join(data_path, 'qotd_current.txt')

# The lock file is created in the system temporary directory. Its name embeds
# the URL-safe base64 form of the database path (with the '=' padding
# stripped), so each database path maps to its own lock file name.
lockfile = os.path.join(
    tempfile.gettempdir(),
    '.qotd-'
    + base64.urlsafe_b64encode(qotd_file.encode('utf-8')).decode('ascii')
    .rstrip('=')
    + '.lock',
)

# The webhook URL is optional: when qotd_webhook_url.txt is absent the
# webhook is disabled (webhook_url is None).
webhook_url = None
try:
    with open(os.path.join(data_path, 'qotd_webhook_url.txt')) as f:
        webhook_url = f.read().strip()
except FileNotFoundError:
    pass
else:
    # Do not leave the temporary file handle name in the module namespace.
    del f

# Template used when announcing a quote; '{}' receives the quote text.
qotd_format = ('Quote of the day: \x1d{}\x1d\n\nDisclaimer: Quotes are mostly '
               'user-generated content and may not be endorsed by lurklite '
               'staff.')
# A quick lockfile implementation
# There may be multiple bot instances (over separate processes) trying to
# access this file.
def lock_quotes():
    """
    Block until the quotes lock file has been created by this call.

    The lock file is opened in exclusive-creation mode ('x'), which raises
    FileExistsError when the file already exists. In that case the attempt
    is repeated after a 50 ms sleep. The current timestamp is written into
    the freshly created file.
    """
    acquired = False
    while not acquired:
        try:
            with open(lockfile, 'x') as f:
                f.write(str(time.time()))
        except FileExistsError:
            # The lock file is already there; wait briefly and retry.
            time.sleep(0.05)
        else:
            acquired = True
def unlock_quotes():
    """ Release the quotes lock by deleting the lock file. """
    os.unlink(lockfile)
# Read quotes
def read_quotes():
    """
    Acquire the quotes lock and return the parsed contents of the database.

    The lock is still held when this function returns normally; the caller
    is responsible for calling unlock_quotes() afterwards.

    Returns:
        The decoded JSON document stored in ``qotd_file``, or an empty dict
        when the file cannot be opened or parsed.
    """
    lock_quotes()
    try:
        with open(qotd_file, 'r') as f:
            return json.load(f)
    except Exception:
        # Best effort: a missing or unreadable database reads as empty.
        return {}
    except BaseException:
        # KeyboardInterrupt / SystemExit must not be mistaken for an empty
        # database (which a later save could write over the real data).
        # Callers only release the lock after a normal return, so release
        # it here before propagating.
        unlock_quotes()
        raise
def save_quotes(quotes):
    """
    Write ``quotes`` to the database file.

    The data is serialised first, written to ``qotd_file + '~'`` and then
    moved over ``qotd_file``, so the database file is never opened for
    writing directly.

    If ``qotd_static`` does not exist yet it is created containing
    ``quotes['current']`` (or an empty string when there is no current
    quote).
    NOTE(review): this seeds the file with the bare quote text, whereas
    get_quote() writes it through qotd_format - confirm that is intended.

    Args:
        quotes: The JSON-serialisable quotes dictionary.
    """
    # Serialise before touching the disk so a failure leaves no partial file.
    raw = json.dumps(quotes)
    tmp_path = qotd_file + '~'
    with open(tmp_path, 'w') as f:
        f.write(raw)

    # os.replace() overwrites an existing destination on every platform;
    # os.rename() raises on Windows when the destination already exists.
    os.replace(tmp_path, qotd_file)

    if not os.path.exists(qotd_static):
        with open(qotd_static, 'w') as f:
            f.write(quotes.get('current', ''))
def archive_quote(quote, blame, expiry_time):
    """
    Append one record to the archive file as a single line of JSON.

    Args:
        quote: The quote text being archived.
        blame: Whatever is recorded as the quote's submitter (may be None).
        expiry_time: The quote's expiry timestamp. When it is truthy and
            greater than 1,000,000,000 the stored 'time' is the expiry
            minus 86400 seconds; otherwise 'time' is stored as null.
    """
    looks_sane = expiry_time and expiry_time > 1_000_000_000
    created = expiry_time - 86400 if looks_sane else None
    record = {'quote': quote, 'blame': blame, 'time': created}
    with open(archive_file, 'a') as f:
        f.write(json.dumps(record) + '\n')
def is_discord(irc):
    """ Return True when ``irc`` has a ``_client`` attribute, else False. """
    has_client = hasattr(irc, '_client')
    return has_client
def send_webhook(**req):
    """
    POST the keyword arguments as a JSON object to the configured webhook.

    Does nothing when no webhook URL is configured. Errors raised by
    urllib (including timeouts) propagate to the caller.

    Args:
        **req: The fields of the JSON payload.
    """
    if not webhook_url:
        return

    # Use a separate name for the Request so the payload dict is not
    # shadowed by it.
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(req).encode('utf-8'),
        headers={
            'Content-Type': 'application/json',
            'User-Agent': 'lurklite-qotd-webhook',
        },
        method='POST',
    )
    print(request)

    # Without a timeout an unresponsive endpoint blocks the calling command
    # handler indefinitely.
    with urllib.request.urlopen(request, timeout=10):
        pass
def is_qotd_admin(irc, hostmask, is_admin, args, quotes):
    """
    Decide whether the caller may use the quote administration commands.

    Bot admins (``is_admin`` truthy) are always allowed. Otherwise the
    caller's identifier must be listed in ``quotes['admins']``; the
    identifier is ``hostmask[0]`` with any of the characters ``<@!>``
    stripped from both ends on Discord, and ``hostmask[-1]`` elsewhere.

    On refusal a "Permission denied!" message is sent to ``args[0]``.

    Returns:
        True when the caller is allowed, False otherwise.
    """
    if is_admin:
        return True

    if is_discord(irc):
        identity = hostmask[0].strip('<@!>')
    else:
        identity = hostmask[-1]

    if identity not in quotes.get('admins', ()):
        irc.msg(args[0], hostmask[0] + ': Permission denied!')
        return False
    return True
def prune_quote_blames(quotes):
    """
    Drop blame entries whose quote is not in ``quotes['quotes']``.

    ``quotes['blame']`` is replaced by a new dict holding only the surviving
    entries. Nothing happens when there is no 'blame' key.
    """
    if 'blame' in quotes:
        pending = quotes.get('quotes', ())
        kept = {}
        for text, who in quotes['blame'].items():
            if text in pending:
                kept[text] = who
        quotes['blame'] = kept
def _aux_quote(quotes, source, quote):
    """
    Make ``quote`` the current quote and record its origin in the blame map.

    The blame entry reads 'Auxiliary quote system (<source>)'. The 'blame'
    dict is created when it does not exist yet.
    """
    quotes['current'] = quote
    quotes.setdefault('blame', {})[quote] = (
        f'Auxiliary quote system ({source})'
    )
def _fetch_json(url):
    """ Fetch ``url`` with a 3 second timeout and return the decoded JSON. """
    with urllib.request.urlopen(url, timeout=3) as response:
        body = response.read()
    return json.loads(body)
def get_quotable_quote():
    """
    Fetch a random quote from api.quotable.io.

    Returns:
        The quote formatted as '"<content>" -- <author>'.
    """
    data = _fetch_json('https://api.quotable.io/random')
    content = data["content"]
    author = data["author"]
    return f'"{content}" -- {author}'
# def get_pandorabot_quote():
# pandorabox_quotes = _fetch_json('https://pandorabox.io/api/quotes')
# it = filter(lambda s : s.startswith('<'), pandorabox_quotes)
# return random.choice(tuple(it))
# Get the next quote
def get_quote(channel=None):
    """
    Return the current quote of the day, rotating it first if it expired.

    The quotes lock is held for the whole call and released on exit, also
    when an exception is raised.

    When ``next_update`` has passed, the outgoing quote (if any) is archived,
    stale blame entries are pruned, the next expiry is set to the following
    midnight UTC and a new quote is chosen: a random saved quote when there
    is one (it is then removed from the saved list), otherwise a quote from
    the Quotable service. The database and the static quote file are then
    rewritten.

    Args:
        channel: Optional channel name; when its lower-cased form is in the
            database's 'blocklist' no quote is returned.

    Returns:
        None for a blocklisted channel, otherwise the current quote text
        ('' when the database has no current quote).
    """
    quotes = read_quotes()
    try:
        if channel and channel.lower() in quotes.get('blocklist', ()):
            return None

        if quotes.get('next_update', 0) <= time.time():
            # Archive the outgoing quote. The blame map is looked up with
            # .get() so a database that has 'current' but no 'blame' key
            # does not raise KeyError here.
            if 'current' in quotes:
                archive_quote(quotes['current'],
                              quotes.get('blame', {}).get(quotes['current']),
                              quotes.get('next_update'))

            prune_quote_blames(quotes)

            # Always expire quotes at midnight UTC
            time_s = math.floor(time.time())
            quotes['next_update'] = (time_s // 86400 + 1) * 86400

            if quotes.get('quotes', None):
                quotes['current'] = random.choice(quotes['quotes'])
                quotes['quotes'].remove(quotes['current'])
            else:
                # No saved quotes left: fall back to the auxiliary source.
                _aux_quote(quotes, 'Quotable', get_quotable_quote())

            save_quotes(quotes)
            with open(qotd_static, 'w') as f:
                f.write(qotd_format.format(quotes['current']))

        return quotes.get('current', '')
    finally:
        unlock_quotes()
# The quote of the day command
@register_command('qotd', 'quotd', 'quote')
def qotd_cmd(irc, hostmask, is_admin, args):
    """ Get the current quote of the day. """
    # NOTE(review): the docstring above is kept verbatim in case the command
    # framework surfaces it as help text - confirm before rewording.
    quote = get_quote(args[0])
    if quote is None:
        # get_quote() returns None for channels on the blocklist.
        reply = 'Unfortunately, .qotd has been disabled in this channel.'
    else:
        reply = qotd_format.format(quote)
    irc.msg(args[0], reply)
# @register_command('qotd-skip', requires_admin=True)
# def qotd_skip_cmd(irc, hostmask, is_admin, args):
# quotes = read_quotes()
# try:
# del quotes['next_update']
# save_quotes(quotes)
# except KeyError:
# irc.msg(args[0], 'There is no quote to skip!')
# finally:
# unlock_quotes()
# irc.msg(args[0], 'Quote skipped!')
@register_command('qotd-blame', 'qotd_blame')
def qotd_blame_cmd(irc, hostmask, is_admin, args):
    # Tell the channel who is recorded as having added the current quote.
    # Nothing is modified, so the lock taken by read_quotes() is released
    # straight away.
    quotes = read_quotes()
    unlock_quotes()

    adder = quotes.get('blame', {}).get(quotes.get('current'))
    if not adder:
        irc.msg(args[0], "I don't know who added the current quote.")
        return
    irc.msg(args[0], f'The current quote was added by {adder!r}.')
def compare_quotes(quotes, quote):
    """
    Find a known quote that is nearly identical to ``quote``.

    Every key of ``quotes['blame']`` is compared with ``quote`` using
    difflib's SequenceMatcher, in iteration order.

    Returns:
        The first key whose similarity ratio is at least 0.85, or None when
        there is no such key.
    """
    near_matches = (
        known for known in quotes['blame']
        if SequenceMatcher(None, quote, known).ratio() >= 0.85
    )
    return next(near_matches, None)
# Add a quote
@register_command('add-quote', 'add_quote')
def add_quote_cmd(irc, hostmask, is_admin, args):
    """ Add a quote of the day (requires admin). """
    # NOTE(review): the docstring above is kept verbatim in case the command
    # framework surfaces it as help text - confirm before rewording.
    #
    # The quote (args[1], stripped, tabs replaced by spaces) must start with
    # '*', '<' or '"', must not already be saved or have a blame entry, and
    # must not be too similar to a quote that has a blame entry. Any
    # rejection is reported to the channel and nothing is saved.
    quotes = read_quotes()
    try:
        if not is_qotd_admin(irc, hostmask, is_admin, args, quotes):
            return

        quote = args[1].strip().replace('\t', ' ')
        if quote[:1] not in ('*', '<', '"'):
            return irc.msg(args[0], hostmask[0] + ': Invalid quote!')

        pending = quotes.setdefault('quotes', [])
        blame = quotes.setdefault('blame', {})

        # Work out the first reason (if any) to refuse the quote.
        error = None
        if quote in pending:
            error = (hostmask[0] + ': That quote has already '
                     'been added!')
        elif quote in blame:
            error = (hostmask[0] + ': That quote was recently '
                     'removed, you may not re-add it.')
        elif similar_quote := compare_quotes(quotes, quote):
            state = ('an existing' if similar_quote in pending
                     else 'a recently removed')
            error = (f'{hostmask[0]}: That quote is too similar'
                     f' to {state} quote:\n> {similar_quote}')

        if error is not None:
            return irc.msg(args[0], error)

        pending.append(quote)
        # presumably hostmask[1] / hostmask[2] identify the submitter on
        # Discord / IRC respectively - verify against the framework.
        blame[quote] = hostmask[1 if is_discord(irc) else 2]
        save_quotes(quotes)
    finally:
        unlock_quotes()

    # Only reached when the quote was actually stored.
    irc.msg(args[0], hostmask[0] + ': Done!')
    embed = {
        'title': f'{hostmask[1]} added a quote',
        'color': 0x008000,
        'description': quote,
        'footer': {'text': hostmask[2]},
    }
    send_webhook(embeds=[embed])
# Remove a quote
@register_command('remove-quote', 'remove_quote')
def remove_quote_cmd(irc, hostmask, is_admin, args):
    """ Remove a quote of the day (requires admin). """
    # NOTE(review): the docstring above is kept verbatim in case the command
    # framework surfaces it as help text - confirm before rewording.
    quotes = read_quotes()
    try:
        if not is_qotd_admin(irc, hostmask, is_admin, args, quotes):
            return

        quote = args[1].strip().replace('\t', ' ')
        if quote in quotes.get('quotes', ()):
            quotes['quotes'].remove(quote)

            # The blame entry is only dropped when the remover is the one
            # recorded as having added the quote; otherwise it stays, which
            # makes add-quote refuse the same text later.
            blame = quotes.get('blame')
            if blame and (blame.get(quote)
                          == hostmask[1 if is_discord(irc) else 2]):
                del blame[quote]

            save_quotes(quotes)
        else:
            msg = 'That quote does not exist!'
            irc.msg(args[0], hostmask[0] + ': ' + msg)
            return
    finally:
        unlock_quotes()

    # Only reached when a quote was actually removed.
    irc.msg(args[0], hostmask[0] + ': Done!')
    embed = {
        'title': f'{hostmask[1]} removed a quote',
        'color': 0xFFA500,
        'description': quote,
        'footer': {'text': hostmask[2]},
    }
    send_webhook(embeds=[embed])
def _plural(n):
    """ Return the plural suffix for a count: '' for 1, otherwise 's'. """
    if n == 1:
        return ''
    return 's'
@register_command('qotd-status', 'qotd_status')
def qotd_status_cmd(irc, hostmask, is_admin, args):
    # Report how long the current quote has left and how many quotes are
    # saved. Restricted to quote admins. Read-only, so the lock taken by
    # read_quotes() is released immediately.
    quotes = read_quotes()
    unlock_quotes()
    if not is_qotd_admin(irc, hostmask, is_admin, args, quotes):
        return

    saved_quotes = len(quotes.get('quotes', ()))

    # Whole seconds until expiry, clamped at zero once it has passed.
    remaining = int(quotes.get('next_update', 0) - time.time())
    expiry = remaining if remaining > 0 else 0
    total_minutes, expiry_s = divmod(expiry, 60)
    expiry_h, expiry_m = divmod(total_minutes, 60)

    irc.msg(args[0], f'{hostmask[0]}: '
            f'The current quote expires in \x02{expiry_h} hour'
            f'{_plural(expiry_h)}, {expiry_m} minute{_plural(expiry_m)}, and '
            f'{expiry_s} second{_plural(expiry_s)}\x02. '
            f'Amount of saved quotes: \x02{saved_quotes}\x02.')
@register_command('qotd-admins', 'qotd_admins', requires_admin=True)
def qotd_admins_cmd(irc, hostmask, is_admin, args):
    # List the identifiers stored under 'admins' in the quotes database.
    # Read-only, so the lock taken by read_quotes() is released immediately.
    quotes = read_quotes()
    unlock_quotes()

    admins_it = quotes.get('admins', ())
    if is_discord(irc):
        # On Discord each identifier is wrapped as '<@id>'.
        admins_it = ['<@{}>'.format(admin_id) for admin_id in admins_it]

    admins = ', '.join(admins_it)
    irc.msg(args[0], f'{hostmask[0]}: Current .qotd admins: {admins}')
def _get_admin_id(irc, args):
    """
    Extract the admin identifier from the command argument ``args[1]``.

    Surrounding whitespace is removed; on Discord any of the characters
    ``<@!>`` are additionally stripped from both ends.
    """
    raw = args[1].strip()
    return raw.strip('<@!>') if is_discord(irc) else raw
@register_command('qotd-grant', 'qotd_grant', requires_admin=True)
def qotd_grant_cmd(irc, hostmask, is_admin, args):
    # Add the identifier given in args[1] to the 'admins' list of the quotes
    # database. The list is created when missing and kept sorted. Granting
    # to someone who is already listed only reports that fact; nothing is
    # saved in that case.
    quotes = read_quotes()
    try:
        admin = _get_admin_id(irc, args)
        admins = quotes.get('admins')
        if admins is None:
            admins = quotes['admins'] = []
        elif admin in admins:
            # Wording fixed: the message used to read "is already has".
            irc.msg(args[0], f'{hostmask[0]}: That user already has '
                             '.add-quote privs!')
            return

        admins.append(admin)
        admins.sort()
        save_quotes(quotes)
    finally:
        unlock_quotes()

    # Only reached when the identifier was actually added.
    irc.msg(args[0], f'{hostmask[0]}: Done!')
@register_command('qotd-revoke', 'qotd_revoke', requires_admin=True)
def qotd_revoke_cmd(irc, hostmask, is_admin, args):
    # Remove the identifier given in args[1] from the 'admins' list of the
    # quotes database. An identifier that is not listed is ignored; the
    # database is saved and "Done!" is reported either way.
    quotes = read_quotes()
    try:
        target = _get_admin_id(irc, args)
        current_admins = quotes.get('admins', [])
        try:
            current_admins.remove(target)
        except ValueError:
            # Not an admin to begin with; nothing to revoke.
            pass
        save_quotes(quotes)
    finally:
        unlock_quotes()

    irc.msg(args[0], f'{hostmask[0]}: Done!')