aboutsummaryrefslogtreecommitdiff
path: root/currency.liteonly.py
blob: 7288676f1e038fd15487ba1cb65b880bd290991e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
"""
Based on currency.py - Sopel Currency Conversion Plugin
Copyright 2013, Elsie Powell, embolalia.com
Copyright 2019, Mikkel Jeppesen
Licensed under the Eiffel Forum License 2.

Original currency.py: https://sopel.chat
"""

import logging
import re
import time
import urllib.parse

import requests


# Which data provider to use (some of which require no API key)
FIAT_PROVIDER = 'exchangerate.host'

FIXER_IO_KEY = None
FIXER_USE_SSL = False


FIAT_PROVIDERS = {
    'exchangerate.host': 'https://api.exchangerate.host/latest?base=EUR',
    'fixer.io': '//data.fixer.io/api/latest?base=EUR&access_key={}',
}
CRYPTO_URL = 'https://api.coingecko.com/api/v3/exchange_rates'
EXCHANGE_REGEX = re.compile(r'''
    ^(\d+(?:\.\d+)?)                       # Decimal number
    \s*([a-zA-Z]{3})                       # 3-letter currency code
    \s+(?:in|as|of|to)\s+                  # preposition
    (([a-zA-Z]{3}$)|([a-zA-Z]{3})\s)+$     # one or more 3-letter currency code
''', re.VERBOSE)
LOGGER = logging.getLogger(__name__)
UNSUPPORTED_CURRENCY = "Sorry, {} isn't currently supported."
USAGE = "Usage: .cur 100 usd in btc cad eur"
UNRECOGNIZED_INPUT = f"Sorry, I didn't understand the input. {USAGE}"

rates = {}
rates_updated = 0.0


class FixerError(Exception):
    """A Fixer.io API Error Exception"""
    def __init__(self, status):
        super().__init__("FixerError: {}".format(status))


class UnsupportedCurrencyError(Exception):
    """A currency is currently not supported by the API"""
    def __init__(self, currency):
        super().__init__(currency)


def build_reply(amount, base, target, out_string):
    rate_raw = get_rate(base, target)
    rate = float(rate_raw)
    result = float(rate * amount)

    digits = 0
    # up to 10 (8+2) digits precision when result is less than 1
    # as smaller results need more precision
    while digits < 8 and 1 / 10**digits > result:
        digits += 1

    digits += 2

    out_string += ' {value:,.{precision}f} {currency},'.format(
        value=result, precision=digits, currency=target
    )

    return out_string


def exchange(match):
    """Show the exchange rate between two currencies"""
    if not match:
        return UNRECOGNIZED_INPUT

    # Try and update rates. Rate-limiting is done in update_rates()
    try:
        update_rates()
    except requests.exceptions.RequestException as err:
        traceback.print_exc()
        return "Something went wrong while I was getting the exchange rate."
    except (KeyError, ValueError) as err:
        traceback.print_exc()
        return "Error: Could not update exchange rates. Try again later."
    except FixerError as err:
        traceback.print_exc()
        return 'Sorry, something went wrong with Fixer'

    query = match.string
    amount_in, base, _, *targets = query.split()

    try:
        amount = float(amount_in)
    except ValueError:
        return UNRECOGNIZED_INPUT
    except OverflowError:
        return "Sorry, input amount was out of range."

    if not amount:
        return "Zero is zero, no matter what country you're in."

    out_string = '{} {} is'.format(amount_in, base.upper())

    unsupported_currencies = []
    for target in targets:
        try:
            out_string = build_reply(amount, base.upper(), target.upper(),
                                     out_string)
        except ValueError:
            return "Error: Raw rate wasn't a float"
        except KeyError as err:
            return "Error: Invalid rates"
        except UnsupportedCurrencyError as cur:
            unsupported_currencies.append(cur)

    if unsupported_currencies:
        out_string = out_string + ' (unsupported:'
        for target in unsupported_currencies:
            out_string = out_string + ' {},'.format(target)
        out_string = out_string[0:-1] + ')'
    else:
        out_string = out_string[0:-1]

    return out_string


def get_rate(base, target):
    base = base.upper()
    target = target.upper()

    if base not in rates:
        raise UnsupportedCurrencyError(base)

    if target not in rates:
        raise UnsupportedCurrencyError(target)

    return (1 / rates[base]) * rates[target]


def update_rates():
    global rates, rates_updated

    # If we have data that is less than 24h old, return
    if time.time() - rates_updated < 24 * 60 * 60:
        return

    # Update crypto rates
    response = requests.get(CRYPTO_URL)
    response.raise_for_status()
    rates_crypto = response.json()

    # Update fiat rates
    if FIXER_IO_KEY is not None:
        assert FIAT_PROVIDER == 'fixer.io'

        proto = 'https:' if FIXER_USE_SSL else 'http:'
        response = requests.get(
            proto +
            FIAT_PROVIDERS['fixer.io'].format(
                urllib.parse.quote(FIXER_IO_KEY)
            )
        )

        if not response.json()['success']:
            raise FixerError('Fixer.io request failed with error: {}'.format(
                response.json()['error']
            ))
    else:
        LOGGER.debug('Updating fiat rates from %s', FIAT_PROVIDER)
        response = requests.get(FIAT_PROVIDERS[FIAT_PROVIDER])

    response.raise_for_status()
    rates_fiat = response.json()

    rates = rates_fiat['rates']
    rates['EUR'] = 1.0  # Put this here to make logic easier

    eur_btc_rate = 1 / rates_crypto['rates']['eur']['value']

    for rate in rates_crypto['rates']:
        if rate.upper() not in rates:
            rates[rate.upper()] = (rates_crypto['rates'][rate]['value'] *
                                   eur_btc_rate)

    # if an error aborted the operation prematurely, we want the next call to
    # retry updating rates therefore we'll update the stored timestamp at the
    # last possible moment
    rates_updated = time.time()


@register_command('cur', 'currency', 'exchange')
def exchange_cmd(irc, hostmask, is_admin, args):
    """Show the exchange rate between two currencies."""
    if param := args[1].strip():
        msg = exchange(EXCHANGE_REGEX.match(param))
    else:
        msg = f"No search term. {USAGE}"

    irc.msg(args[0], f'{hostmask[0]}: {msg}')