summaryrefslogtreecommitdiff
path: root/miniirc_idc.py
blob: 273dc62549551ff08e2a804f4bcb4293c51ca58f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python3
#
# This is very horrible and quickly written
# But it works
#

from __future__ import annotations
from collections.abc import Iterator, Sequence
from typing import Optional
import datetime, miniirc, re  # type: ignore
assert miniirc.ver >= (1,8,1)


_LEADING_COLON = '' if miniirc.ver[0] > 2 else ':'
_esc_re = re.compile(r'\\(.)')

# Backslash must be first
_idc_escapes = {'\\': '\\\\', 'r': '\r', 'n': '\n', 't': '\t'}


def _get_idc_args(command: str, kwargs: dict[str, Optional[str]]
                  ) -> Iterator[str]:
    yield command
    for key, value in kwargs.items():
        if value is not None:
            for escape_char, char in _idc_escapes.items():
                value = value.replace(char, '\\' + escape_char)
            yield f'{key.upper()}:{value}'


class IDC(miniirc.IRC):
    def idc_message_parser(
        self, msg: str
    ) -> Optional[tuple[str, tuple[str, str, str], dict[str, str], list[str]]]:
        idc_cmd = None
        idc_args = {}
        for arg in msg.split('\t'):
            if ':' in arg:
                key, value = arg.split(':', 1)
                idc_args[key] = _esc_re.sub(
                    lambda m: _idc_escapes.get(m.group(1), '\ufffd'),
                    value
                )
            else:
                idc_cmd = arg

        # Translate IDC keyword arguments into IRC positional ones
        if idc_cmd == 'PRIVMSG':
            msg = idc_args['MESSAGE']
            command = 'PRIVMSG'
            msg_type = idc_args.get('TYPE')
            if msg_type == 'NOTICE':
                command = 'NOTICE'
            elif msg_type == 'ACTION':
                msg = f'\x01ACTION {msg}\x01'
            args = [self.current_nick, msg]
        elif idc_cmd == 'CHANMSG':
            command = 'PRIVMSG'
            args = [idc_args['CHAN'], idc_args['MESSAGE']]
        elif idc_cmd == 'RPL_LOGIN_GOOD':
            command = '001'
            args = [self.current_nick, f'Welcome to IDC {self.current_nick}']
        elif idc_cmd == 'PONG':
            command = 'PONG'
            args = [idc_args.get('COOKIE', '')]
        else:
            return None

        # Add generic parameters
        tags = {}
        if 'SOURCE' in idc_args:
            user = idc_args['SOURCE']
            hostmask = (user, user, user)
            tags['account'] = user
        else:
            hostmask = ('', '', '')

        if 'TS' in idc_args:
            dt = datetime.datetime.utcfromtimestamp(float(idc_args['TS']))
            tags['time'] = dt.isoformat() + 'Z'

        if 'ID' in idc_args:
            tags['label'] = idc_args['ID']

        if args and _LEADING_COLON:
            args[-1] = _LEADING_COLON + args[-1]
        return command, hostmask, tags, args

    # Send raw messages
    def idc_send(self, command: str, **kwargs: Optional[str]):
        super().quote('\t'.join(_get_idc_args(command, kwargs)), force=True)

    def quote(self, *msg: str, force: Optional[bool] = None,
              tags: Optional[dict[str, str]] = None) -> None:
        cmd, _, tags2, args = miniirc.ircv3_message_parser(' '.join(msg))
        if miniirc.ver[0] < 2 and args and args[-1].startswith(':'):
            args[-1] = args[-1][1:]
        self.send(cmd, *args, force=force, tags=tags or tags2)

    def _get_idc_account(self) -> Sequence[str]:
        if isinstance(self.ns_identity, tuple):
            return self.ns_identity
        else:
            return self.ns_identity.split(' ', 1)

    @property
    def current_nick(self) -> str:
        return self._get_idc_account()[0]

    def send(self, cmd: str, *args: str, force: Optional[bool] = None,
             tags: Optional[dict[str, str]] = None) -> None:
        cmd = cmd.upper()
        label = tags.get('label') if tags else None
        if cmd in ('PRIVMSG', 'NOTICE'):
            target = args[0]
            # TODO: Make miniirc think that SASL worked PMs to NickServ don't
            # have to be blocked.
            if target == 'NickServ':
                return

            msg = args[1]
            msg_type: Optional[str]
            if cmd == 'NOTICE':
                msg_type = 'NOTICE'
            elif msg.startswith('\x01ACTION'):
                msg = msg[8:].rstrip('\x01')
                msg_type = 'ACTION'
            else:
                msg_type = None

            self.idc_send('CHANMSG' if target.startswith('#') else 'PRIVMSG',
                          target=target, type=msg_type, message=msg, id=label)
        elif cmd == 'PING':
            self.idc_send('PING', cookie=args[0], id=label)
        elif cmd == 'USER':
            user, password = self._get_idc_account()
            self.idc_send('USER', user=user, password=password, id=label)

    # Override the message parser to change the default parser.
    def change_parser(self, parser=None):
        super().change_parser(parser or self.idc_message_parser)