From 10e2ee5f5083d544dbc159e02474d156d82d208b Mon Sep 17 00:00:00 2001 From: luk3yx Date: Sun, 24 Apr 2022 14:59:23 +1200 Subject: New message format (again) --- miniirc_idc.py | 125 +++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 87 insertions(+), 38 deletions(-) diff --git a/miniirc_idc.py b/miniirc_idc.py index 77dca2c..3e32454 100644 --- a/miniirc_idc.py +++ b/miniirc_idc.py @@ -4,90 +4,139 @@ # But it works # -import datetime, miniirc, re +from __future__ import annotations +from collections.abc import Iterator, Sequence +from typing import Optional +import datetime, miniirc, re # type: ignore assert miniirc.ver >= (1,8,1) _LEADING_COLON = '' if miniirc.ver[0] > 2 else ':' -_esc_re = re.compile(r'\\(.)|\t') +_esc_re = re.compile(r'\\(.)') +# Backslash must be first +_idc_escapes = {'\\': '\\\\', 'r': '\r', 'n': '\n', 't': '\t'} -def _unix_str_to_iso(unix_str): - dt = datetime.datetime.utcfromtimestamp(float(unix_str)) - return dt.isoformat() + 'Z' + +def _get_idc_args(command: str, kwargs: dict[str, Optional[str]] + ) -> Iterator[str]: + yield command + for key, value in kwargs.items(): + if value is not None: + for escape_char, char in _idc_escapes.items(): + value = value.replace(char, '\\' + escape_char) + yield f'{key.upper()}:{value}' class IDC(miniirc.IRC): - def idc_message_parser(self, msg): - args = _esc_re.sub(lambda m: m.group(1) or '\udeff', msg - ).split('\udeff') + def idc_message_parser( + self, msg: str + ) -> Optional[tuple[str, tuple[str, str, str], dict[str, str], list[str]]]: + idc_cmd = None + idc_args = {} + for arg in msg.split('\t'): + if ':' in arg: + key, value = arg.split(':', 1) + idc_args[key] = _esc_re.sub( + lambda m: _idc_escapes.get(m.group(1), '\ufffd'), + value + ) + else: + idc_cmd = arg + + # Translate IDC keyword arguments into IRC positional ones + if idc_cmd == 'PRIVMSG': + msg = idc_args['MESSAGE'] + msg_type = idc_args.get('TYPE') + command = 'PRIVMSG' + if msg_type == 'NOTICE': + command = 'NOTICE' + elif msg_type: + msg = f'\x01{idc_args["TYPE"]} {msg}\x01' + args = [self.current_nick, msg] + elif idc_cmd == 'CHANMSG': + command = 'PRIVMSG' + args = [idc_args['CHAN'], idc_args['MESSAGE']] + elif idc_cmd == 'RPL_LOGIN_GOOD': + command = '001' + args = [self.current_nick, f'Welcome to IDC {self.current_nick}'] + elif idc_cmd == 'PONG': + command = 'PONG' + args = [idc_args.get('COOKIE', '')] + else: + return None + # Add generic parameters tags = {} - if args[0].startswith(':'): - user = args.pop(0)[1:] + if 'SOURCE' in idc_args: + user = idc_args['SOURCE'] hostmask = (user, user, user) tags['account'] = user else: hostmask = ('', '', '') - command = args.pop(0).upper() - if command == 'PRIVMSG': - tags['time'] = _unix_str_to_iso(args[0]) - args = [self.current_nick, args[1]] - elif command == 'CHANMSG': - command = 'PRIVMSG' - tags['time'] = _unix_str_to_iso(args[0]) - args = [args[1], args[2]] - elif command == 'RPL_LOGIN_GOOD': - command = '001' - args = [self.current_nick, f'Welcome to IDC {self.current_nick}'] + if 'TS' in idc_args: + dt = datetime.datetime.utcfromtimestamp(float(idc_args['TS'])) + tags['time'] = dt.isoformat() + 'Z' + + if 'ID' in idc_args: + tags['label'] = idc_args['ID'] if args and _LEADING_COLON: args[-1] = _LEADING_COLON + args[-1] return command, hostmask, tags, args # Send raw messages - def idc_send(self, *args): - line = '\t'.join(arg.replace('\\', '\\\\').replace('\t', '\\\t') - for arg in args) - super().quote(line, force=True) - - if miniirc.ver < (2, 0, 0): - @property - def _sock(self): - return self.sock + def idc_send(self, command: str, **kwargs: Optional[str]): + super().quote('\t'.join(_get_idc_args(command, kwargs)), force=True) - def quote(self, *msg, force=None, tags=None) -> None: + def quote(self, *msg: str, force: Optional[bool] = None, + tags: Optional[dict[str, str]] = None) -> None: cmd, _, tags2, args = miniirc.ircv3_message_parser(' '.join(msg)) if miniirc.ver[0] < 2 and args and args[-1].startswith(':'): args[-1] = args[-1][1:] self.send(cmd, *args, force=force, tags=tags or tags2) - def _get_idc_account(self): + def _get_idc_account(self) -> Sequence[str]: if isinstance(self.ns_identity, tuple): return self.ns_identity else: return self.ns_identity.split(' ', 1) @property - def current_nick(self): + def current_nick(self) -> str: return self._get_idc_account()[0] - def send(self, cmd, *args, force=None, tags=None) -> None: + def send(self, cmd: str, *args: str, force: Optional[bool] = None, + tags: Optional[dict[str, str]] = None) -> None: cmd = cmd.upper() + label = tags.get('label') if tags else None if cmd in ('PRIVMSG', 'NOTICE'): target = args[0] # TODO: Make miniirc think that SASL worked PMs to NickServ don't # have to be blocked. if target == 'NickServ': return + msg_type = None + msg = args[1] + if msg.startswith('\x01'): + msg = msg.rstrip('\x01') + try: + msg_type, msg = msg.split(' ', 1) + except ValueError: + msg_type, msg = msg, '' + if cmd == 'NOTICE': + msg_type += '-REPLY' + elif cmd == 'NOTICE': + msg_type = 'NOTICE' + self.idc_send('CHANMSG' if target.startswith('#') else 'PRIVMSG', - *args) - elif cmd in ('PING', 'QUIT'): - self.idc_send(cmd, *args) + target=target, type=msg_type, message=msg, id=label) + elif cmd == 'PING': + self.idc_send('PING', cookie=args[0], id=label) elif cmd == 'USER': user, password = self._get_idc_account() - self.idc_send('USER', user, password) + self.idc_send('USER', user=user, password=password, id=label) # Override the message parser to change the default parser. def change_parser(self, parser=None): -- cgit v1.2.3