#!/usr/bin/env python3 # # IDC to IRC proxy (based off an old miniirc proxy) # # © 2022 by luk3yx # import datetime, miniirc, miniirc_idc, os, socket, threading, traceback from concurrent.futures import ThreadPoolExecutor from miniirc_extras.utils import (ircv2_message_unparser, ircv3_message_parser, ircv3_message_unparser) # A single network class Proxy: _buffer = b'' IRC = miniirc_idc.IDC encoding = 'utf-8' _001 = False block_incoming = frozenset(('PING', 'CAP', 'AUTHENTICATE')) block_outgoing = frozenset(('CAP',)) _advertised_caps = frozenset(( 'server-time', 'message-tags', 'account-tag', 'echo-message' )) # Create the IRC object def __init__(self, conn, *args, **kwargs): self.sock = conn self.irc = self.IRC(*args, auto_connect=False, executor=ThreadPoolExecutor(1), **kwargs) self._irc_msg_unparser = ircv2_message_unparser # Add the IRC handler self.irc.CmdHandler(ircv3=True, colon=False)(self._miniirc_handler) # Start the main loop self.thread = threading.Thread(target=self._init_thread) self.thread.start() # Send messages def send(self, cmd, hostmask, tags, args): if hostmask == ('', '', ''): hostmask = None raw = self._irc_msg_unparser( cmd, hostmask or (cmd, cmd, cmd), tags, args, colon=False, encoding=self.encoding ) self.sock.sendall(raw + b'\r\n') # Receive messages def recv(self): while True: while b'\n' not in self._buffer: buf = self.sock.recv(4096) if not buf: raise BrokenPipeError self._buffer += buf.replace(b'\r', b'\n') msg, self._buffer = self._buffer.split(b'\n', 1) msg = msg.decode(self.encoding, 'replace') if msg: cmd, _, tags, args = ircv3_message_parser(msg, colon=False) return tags, cmd.upper(), args # Handle everything def _miniirc_handler(self, irc, cmd, hostmask, tags, args): if cmd.startswith('IRCV3 ') or cmd in self.block_incoming: return elif cmd == '001': if self._001: return self._001 = True # Clear the SendQ if self._sendq: while len(self._sendq) > 0: self._sendcmd(*self._sendq.pop(0)) # Start the main loop threading.Thread(target=self._main).start() elif cmd == 'ERROR': self.send('PING', None, {}, [':ERROR']) elif cmd == 'PONG': # Translate proxied pings back if not args or not args[-1].startswith('proxy:'): return args = [args[0], args[-1][6:]] elif cmd == 'PRIVMSG' and ('\n' in args[-1] or '\r' in args[-1]): message = args[-1].replace('\r\n', '\n').replace('\r', '\n') if 'time' not in tags: tags = { 'time': datetime.datetime.utcnow().isoformat( timespec='milliseconds' ) + 'Z', **tags } for i, line in enumerate(message.split('\n', 9)): if i > 0: line = '\u200b' + line self.send(cmd, hostmask, tags, [args[0], line]) if i == 0: tags = {'time': tags['time']} return # Send the command to the client try: self.send(cmd, hostmask, tags, args) except Exception: traceback.print_exc() self.irc.disconnect('Connection closed.', auto_reconnect=False) # The initial main loop def _init_thread(self): self._sendq = [] nick = None user = None cap_negotiation = False # Wait for NICK and USER to be sent while cap_negotiation or not nick or not user: try: tags, cmd, args = self.recv() except BrokenPipeError: return if cmd == 'NICK' and len(args) == 1: nick = args[0] elif cmd == 'USER' and len(args) > 1: user = args elif cmd == 'CAP' and args: subcmd = args[0].upper() if subcmd == 'LS': cap_negotiation = True self.send('CAP', None, {}, ('*', 'LS', ' '.join(self._advertised_caps))) elif subcmd == 'REQ': cap_negotiation = True requested_caps = frozenset(args[-1].split(' ')) if requested_caps - self._advertised_caps: self.send('CAP', None, {}, ('*', 'NAK', args[-1])) continue if 'message-tags' in requested_caps: self._irc_msg_unparser = ircv3_message_unparser if 'echo-message' in requested_caps: self.irc.ircv3_caps.add('echo-message') self.send('CAP', None, {}, ('*', 'ACK', args[-1])) elif subcmd == 'END': cap_negotiation = False else: self.send('410', None, {}, ('*', subcmd, 'Invalid CAP subcommand')) elif cmd == 'QUIT': self.sock.shutdown(socket.SHUT_RDWR) self.sock.close() return elif cmd == 'PING': self.send('PONG', None, {}, args) else: self._sendq.append((tags, cmd, args)) # Set values self.irc.nick = nick self.irc.ident = user[0] self.irc.realname = user[-1] # Connect self.irc.connect() # Send a command def _sendcmd(self, tags, cmd, args): if cmd in self.block_outgoing: return if cmd == 'PING': args = [f'proxy:{args[0] if args else ""}'] raw = ircv2_message_unparser(cmd, (cmd, cmd, cmd), {}, args, colon=False, encoding=None) self.irc.quote(raw, tags=tags) # The more permanent main loop def _main(self, single_thread=False): # Send everything to IRC while True: try: tags, cmd, args = self.recv() except Exception: traceback.print_exc() return self.irc.disconnect() self._sendcmd(tags, cmd, args) # The proxy class class Server: Proxy = Proxy def __init__(self, *args, local_addr, **kwargs): self.addr = local_addr self.args, self.kwargs = args, kwargs def main(self): # Create a socket object with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(self.addr) sock.listen(1) conn, _ = sock.accept() with conn: net = self.Proxy(conn, *self.args, **self.kwargs) net.thread.join() net.irc.wait_until_disconnected() def main(): import argparse parser = argparse.ArgumentParser() parser.add_argument('bind_port', type=int) parser.add_argument('username') parser.add_argument('password') parser.add_argument('--debug', '-v', action='store_true') parser.add_argument('--idc-ip', default='andrewyu.org') parser.add_argument('--idc-port', type=int, default=6835) args = parser.parse_args() Server(args.idc_ip, args.idc_port, '', ssl=True, persist=False, local_addr=('127.0.0.1', args.bind_port), debug=args.debug, ns_identity=(args.username, args.password)).main() if __name__ == '__main__': main()