summaryrefslogtreecommitdiff
path: root/miniirc_idc.py
diff options
context:
space:
mode:
authorluk3yx <luk3yx@users.noreply.github.com>2022-04-24 14:59:23 +1200
committerluk3yx <luk3yx@users.noreply.github.com>2022-04-24 14:59:23 +1200
commit10e2ee5f5083d544dbc159e02474d156d82d208b (patch)
tree8a3f695969bdbb66e96ec5c1b5fe997660e7ec4c /miniirc_idc.py
parentf71a85aeb6366a264929d526b4ea708114a1884b (diff)
downloadminiirc_idc-10e2ee5f5083d544dbc159e02474d156d82d208b.tar.gz
miniirc_idc-10e2ee5f5083d544dbc159e02474d156d82d208b.zip
New message format (again)
Diffstat (limited to 'miniirc_idc.py')
-rw-r--r--miniirc_idc.py125
1 files changed, 87 insertions, 38 deletions
diff --git a/miniirc_idc.py b/miniirc_idc.py
index 77dca2c..3e32454 100644
--- a/miniirc_idc.py
+++ b/miniirc_idc.py
@@ -4,90 +4,139 @@
# But it works
#
-import datetime, miniirc, re
+from __future__ import annotations
+from collections.abc import Iterator, Sequence
+from typing import Optional
+import datetime, miniirc, re # type: ignore
assert miniirc.ver >= (1,8,1)
_LEADING_COLON = '' if miniirc.ver[0] > 2 else ':'
-_esc_re = re.compile(r'\\(.)|\t')
+_esc_re = re.compile(r'\\(.)')
+# Backslash must be first
+_idc_escapes = {'\\': '\\\\', 'r': '\r', 'n': '\n', 't': '\t'}
-def _unix_str_to_iso(unix_str):
- dt = datetime.datetime.utcfromtimestamp(float(unix_str))
- return dt.isoformat() + 'Z'
+
+def _get_idc_args(command: str, kwargs: dict[str, Optional[str]]
+ ) -> Iterator[str]:
+ yield command
+ for key, value in kwargs.items():
+ if value is not None:
+ for escape_char, char in _idc_escapes.items():
+ value = value.replace(char, '\\' + escape_char)
+ yield f'{key.upper()}:{value}'
class IDC(miniirc.IRC):
- def idc_message_parser(self, msg):
- args = _esc_re.sub(lambda m: m.group(1) or '\udeff', msg
- ).split('\udeff')
+ def idc_message_parser(
+ self, msg: str
+ ) -> Optional[tuple[str, tuple[str, str, str], dict[str, str], list[str]]]:
+ idc_cmd = None
+ idc_args = {}
+ for arg in msg.split('\t'):
+ if ':' in arg:
+ key, value = arg.split(':', 1)
+ idc_args[key] = _esc_re.sub(
+ lambda m: _idc_escapes.get(m.group(1), '\ufffd'),
+ value
+ )
+ else:
+ idc_cmd = arg
+
+ # Translate IDC keyword arguments into IRC positional ones
+ if idc_cmd == 'PRIVMSG':
+ msg = idc_args['MESSAGE']
+ msg_type = idc_args.get('TYPE')
+ command = 'PRIVMSG'
+ if msg_type == 'NOTICE':
+ command = 'NOTICE'
+ elif msg_type:
+ msg = f'\x01{idc_args["TYPE"]} {msg}\x01'
+ args = [self.current_nick, msg]
+ elif idc_cmd == 'CHANMSG':
+ command = 'PRIVMSG'
+ args = [idc_args['CHAN'], idc_args['MESSAGE']]
+ elif idc_cmd == 'RPL_LOGIN_GOOD':
+ command = '001'
+ args = [self.current_nick, f'Welcome to IDC {self.current_nick}']
+ elif idc_cmd == 'PONG':
+ command = 'PONG'
+ args = [idc_args.get('COOKIE', '')]
+ else:
+ return None
+ # Add generic parameters
tags = {}
- if args[0].startswith(':'):
- user = args.pop(0)[1:]
+ if 'SOURCE' in idc_args:
+ user = idc_args['SOURCE']
hostmask = (user, user, user)
tags['account'] = user
else:
hostmask = ('', '', '')
- command = args.pop(0).upper()
- if command == 'PRIVMSG':
- tags['time'] = _unix_str_to_iso(args[0])
- args = [self.current_nick, args[1]]
- elif command == 'CHANMSG':
- command = 'PRIVMSG'
- tags['time'] = _unix_str_to_iso(args[0])
- args = [args[1], args[2]]
- elif command == 'RPL_LOGIN_GOOD':
- command = '001'
- args = [self.current_nick, f'Welcome to IDC {self.current_nick}']
+ if 'TS' in idc_args:
+ dt = datetime.datetime.utcfromtimestamp(float(idc_args['TS']))
+ tags['time'] = dt.isoformat() + 'Z'
+
+ if 'ID' in idc_args:
+ tags['label'] = idc_args['ID']
if args and _LEADING_COLON:
args[-1] = _LEADING_COLON + args[-1]
return command, hostmask, tags, args
# Send raw messages
- def idc_send(self, *args):
- line = '\t'.join(arg.replace('\\', '\\\\').replace('\t', '\\\t')
- for arg in args)
- super().quote(line, force=True)
-
- if miniirc.ver < (2, 0, 0):
- @property
- def _sock(self):
- return self.sock
+ def idc_send(self, command: str, **kwargs: Optional[str]):
+ super().quote('\t'.join(_get_idc_args(command, kwargs)), force=True)
- def quote(self, *msg, force=None, tags=None) -> None:
+ def quote(self, *msg: str, force: Optional[bool] = None,
+ tags: Optional[dict[str, str]] = None) -> None:
cmd, _, tags2, args = miniirc.ircv3_message_parser(' '.join(msg))
if miniirc.ver[0] < 2 and args and args[-1].startswith(':'):
args[-1] = args[-1][1:]
self.send(cmd, *args, force=force, tags=tags or tags2)
- def _get_idc_account(self):
+ def _get_idc_account(self) -> Sequence[str]:
if isinstance(self.ns_identity, tuple):
return self.ns_identity
else:
return self.ns_identity.split(' ', 1)
@property
- def current_nick(self):
+ def current_nick(self) -> str:
return self._get_idc_account()[0]
- def send(self, cmd, *args, force=None, tags=None) -> None:
+ def send(self, cmd: str, *args: str, force: Optional[bool] = None,
+ tags: Optional[dict[str, str]] = None) -> None:
cmd = cmd.upper()
+ label = tags.get('label') if tags else None
if cmd in ('PRIVMSG', 'NOTICE'):
target = args[0]
# TODO: Make miniirc think that SASL worked PMs to NickServ don't
# have to be blocked.
if target == 'NickServ':
return
+ msg_type = None
+ msg = args[1]
+ if msg.startswith('\x01'):
+ msg = msg.rstrip('\x01')
+ try:
+ msg_type, msg = msg.split(' ', 1)
+ except ValueError:
+ msg_type, msg = msg, ''
+ if cmd == 'NOTICE':
+ msg_type += '-REPLY'
+ elif cmd == 'NOTICE':
+ msg_type = 'NOTICE'
+
self.idc_send('CHANMSG' if target.startswith('#') else 'PRIVMSG',
- *args)
- elif cmd in ('PING', 'QUIT'):
- self.idc_send(cmd, *args)
+ target=target, type=msg_type, message=msg, id=label)
+ elif cmd == 'PING':
+ self.idc_send('PING', cookie=args[0], id=label)
elif cmd == 'USER':
user, password = self._get_idc_account()
- self.idc_send('USER', user, password)
+ self.idc_send('USER', user=user, password=password, id=label)
# Override the message parser to change the default parser.
def change_parser(self, parser=None):