1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
#!/usr/bin/env python3
#
# This is very horrible and quickly written
# But it works
#
from __future__ import annotations
from collections.abc import Iterator, Sequence
from typing import Optional
import datetime, miniirc, re # type: ignore
# Refuse to load against miniirc releases older than 1.8.1.
assert miniirc.ver >= (1, 8, 1)

# Prefix added to the final argument of every parsed message.
# IDC.quote() in this file strips a leading ':' from parser output only when
# miniirc.ver[0] < 2, i.e. it treats 1.x as the only major version whose
# parsed arguments carry the colon. This constant must use the same cut-off;
# the previous `> 2` test still added the colon on miniirc 2.x.
_LEADING_COLON = '' if miniirc.ver[0] >= 2 else ':'
# Matches one backslash escape sequence; group 1 is the character that
# follows the backslash.
_esc_re = re.compile(r'\\(.)')

# Maps the character that follows a backslash in an escape sequence to the
# literal character it stands for. Used directly by the decoder in
# IDC.idc_message_parser and, inverted, by the encoder below.
# The backslash entry must map to a *single* backslash: with two, the decoder
# doubled every escaped backslash and the encoder never escaped one at all.
_idc_escapes = {'\\': '\\', 'r': '\r', 'n': '\n', 't': '\t'}

# Inverse of _idc_escapes as a str.translate() table (literal character ->
# backslash escape). Translating in a single pass means the output of one
# substitution is never re-escaped by another, so entry order is irrelevant.
_idc_escape_table = str.maketrans(
    {char: '\\' + escape_char for escape_char, char in _idc_escapes.items()}
)


def _get_idc_args(command: str, kwargs: dict[str, Optional[str]]
                  ) -> Iterator[str]:
    """Yield the fields of an outgoing IDC message.

    The first item yielded is ``command`` unchanged. Each following item is
    ``KEY:value`` for one entry of ``kwargs``, with the key upper-cased and
    backslash, carriage return, newline and tab in the value replaced by
    their backslash escapes. Entries whose value is ``None`` are omitted.
    """
    yield command
    for key, value in kwargs.items():
        if value is not None:
            yield f'{key.upper()}:{value.translate(_idc_escape_table)}'
class IDC(miniirc.IRC):
    """miniirc.IRC subclass that speaks the IDC protocol on the wire.

    Outgoing IRC-style commands given to send()/quote() are rewritten as
    tab-separated IDC messages made of a command word and ``KEY:value``
    fields. Incoming IDC messages are converted by idc_message_parser() into
    ``(command, hostmask, tags, args)`` tuples. Only the commands handled
    explicitly in this class are translated; all others are dropped.
    """

    def idc_message_parser(
        self, msg: str
    ) -> Optional[tuple[str, tuple[str, str, str], dict[str, str], list[str]]]:
        """Convert one raw IDC message into an IRC-style message tuple.

        ``msg`` is split on tabs. A field containing ``:`` is a ``KEY:value``
        argument (backslash escapes in the value are decoded); a field
        without ``:`` is taken as the command word.

        Returns ``(command, hostmask, tags, args)`` for PRIVMSG, CHANMSG,
        RPL_LOGIN_GOOD and PONG, or ``None`` for any other command.

        Raises KeyError if a PRIVMSG/CHANMSG lacks MESSAGE or a CHANMSG lacks
        CHAN, and ValueError if TS is not a number.
        """
        idc_cmd = None
        idc_args = {}
        for arg in msg.split('\t'):
            if ':' in arg:
                key, value = arg.split(':', 1)
                # Unrecognised escape sequences decode to U+FFFD.
                idc_args[key] = _esc_re.sub(
                    lambda m: _idc_escapes.get(m.group(1), '\ufffd'),
                    value
                )
            else:
                idc_cmd = arg

        # Translate IDC keyword arguments into IRC positional ones
        if idc_cmd in ('PRIVMSG', 'CHANMSG'):
            text = idc_args['MESSAGE']
            command = 'PRIVMSG'
            # send() attaches TYPE to both PRIVMSG and CHANMSG, so it is
            # honoured for both here (it used to be ignored for CHANMSG).
            msg_type = idc_args.get('TYPE')
            if msg_type == 'NOTICE':
                command = 'NOTICE'
            elif msg_type == 'ACTION':
                text = f'\x01ACTION {text}\x01'
            # Private messages are addressed to us; channel messages to CHAN.
            if idc_cmd == 'CHANMSG':
                target = idc_args['CHAN']
            else:
                target = self.current_nick
            args = [target, text]
        elif idc_cmd == 'RPL_LOGIN_GOOD':
            # Reported as numeric 001 with a synthesised welcome line.
            command = '001'
            args = [self.current_nick, f'Welcome to IDC {self.current_nick}']
        elif idc_cmd == 'PONG':
            command = 'PONG'
            args = [idc_args.get('COOKIE', '')]
        else:
            return None

        # Add generic parameters
        tags: dict[str, str] = {}
        if 'SOURCE' in idc_args:
            # The same SOURCE value fills all three hostmask slots.
            user = idc_args['SOURCE']
            hostmask = (user, user, user)
            tags['account'] = user
        else:
            hostmask = ('', '', '')

        if 'TS' in idc_args:
            # TS is a Unix timestamp. Convert through an aware UTC datetime
            # (utcfromtimestamp() is deprecated since Python 3.12), then drop
            # the tzinfo so isoformat() emits no offset and 'Z' can be added.
            dt = datetime.datetime.fromtimestamp(
                float(idc_args['TS']), datetime.timezone.utc
            )
            tags['time'] = dt.replace(tzinfo=None).isoformat() + 'Z'

        if 'ID' in idc_args:
            tags['label'] = idc_args['ID']

        if args and _LEADING_COLON:
            args[-1] = _LEADING_COLON + args[-1]
        return command, hostmask, tags, args

    # Send raw messages
    def idc_send(self, command: str, **kwargs: Optional[str]) -> None:
        """Send one IDC message built from ``command`` and keyword fields.

        Fields whose value is ``None`` are left out. The encoded line is
        handed to miniirc.IRC.quote() with ``force=True``.
        """
        super().quote('\t'.join(_get_idc_args(command, kwargs)), force=True)

    def quote(self, *msg: str, force: Optional[bool] = None,
              tags: Optional[dict[str, str]] = None) -> None:
        """Parse a raw IRC line and route it through send().

        The pieces of ``msg`` are joined with spaces and parsed with
        miniirc.ircv3_message_parser. Tags found in the line are used only
        when ``tags`` is empty or ``None``.
        """
        cmd, _, tags2, args = miniirc.ircv3_message_parser(' '.join(msg))
        # On miniirc 1.x, drop the leading ':' from the final argument
        # before passing it on.
        if miniirc.ver[0] < 2 and args and args[-1].startswith(':'):
            args[-1] = args[-1][1:]
        self.send(cmd, *args, force=force, tags=tags or tags2)

    def _get_idc_account(self) -> Sequence[str]:
        """Return ``ns_identity`` as a sequence of account fields.

        A tuple is returned unchanged; anything else is split on its first
        space. NOTE(review): assumes ``ns_identity`` is set; ``None`` would
        raise AttributeError here.
        """
        if isinstance(self.ns_identity, tuple):
            return self.ns_identity
        return self.ns_identity.split(' ', 1)

    @property
    def current_nick(self) -> str:
        """The first field of ``ns_identity``."""
        return self._get_idc_account()[0]

    def send(self, cmd: str, *args: str, force: Optional[bool] = None,
             tags: Optional[dict[str, str]] = None) -> None:
        """Translate an IRC command into IDC and send it.

        Handles PRIVMSG, NOTICE, PING and USER; every other command is
        silently ignored. ``force`` is accepted for signature compatibility
        but is not consulted here. Only the ``label`` tag is forwarded, as
        the IDC ``ID`` field.

        Raises IndexError if a handled command has too few arguments, and
        ValueError if USER is sent while ``ns_identity`` does not split into
        exactly two fields.
        """
        cmd = cmd.upper()
        label = tags.get('label') if tags else None
        if cmd in ('PRIVMSG', 'NOTICE'):
            target = args[0]
            # TODO: Make miniirc think that SASL worked so that PMs to
            # NickServ don't have to be blocked.
            if target == 'NickServ':
                return

            msg = args[1]
            msg_type: Optional[str]
            if cmd == 'NOTICE':
                msg_type = 'NOTICE'
            elif msg.startswith('\x01ACTION'):
                # Skip the 8-character '\x01ACTION ' prefix and strip the
                # closing '\x01'.
                msg = msg[8:].rstrip('\x01')
                msg_type = 'ACTION'
            else:
                msg_type = None

            # Targets beginning with '#' are channels.
            self.idc_send('CHANMSG' if target.startswith('#') else 'PRIVMSG',
                          target=target, type=msg_type, message=msg, id=label)
        elif cmd == 'PING':
            self.idc_send('PING', cookie=args[0], id=label)
        elif cmd == 'USER':
            # The USER message carries the credentials from ns_identity.
            user, password = self._get_idc_account()
            self.idc_send('USER', user=user, password=password, id=label)

    # Override the message parser to change the default parser.
    def change_parser(self, parser=None):
        """Install ``parser``, or idc_message_parser if ``parser`` is falsy."""
        super().change_parser(parser or self.idc_message_parser)
|