summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Yu <andrew@andrewyu.org>2022-08-11 22:39:50 +0800
committerAndrew Yu <andrew@andrewyu.org>2022-08-11 22:39:50 +0800
commit10d757ec85a5913c4f1e2e16d332c5c7a2309960 (patch)
treeaf2244ee7f12c2b3658dcaf7cf883baf04aaaff3
parentf00c76aa3e47fc0f586794b8b5436f10ad544c1b (diff)
downloadirc-mod-bot-10d757ec85a5913c4f1e2e16d332c5c7a2309960.tar.gz
irc-mod-bot-10d757ec85a5913c4f1e2e16d332c5c7a2309960.zip
Basic kickbanning
-rw-r--r--TODO3
-rw-r--r--bot.py175
2 files changed, 157 insertions, 21 deletions
diff --git a/TODO b/TODO
new file mode 100644
index 0000000..3f9352f
--- /dev/null
+++ b/TODO
@@ -0,0 +1,3 @@
+- Keep track of channel modes, keep track of QUIT.
+- Summon the GNU.
+- Reload configuration on-the-fly.
diff --git a/bot.py b/bot.py
index 6e5a79b..1f7c626 100644
--- a/bot.py
+++ b/bot.py
@@ -37,7 +37,19 @@ import base64
import re
import time
-from config import SERVER, PORT, NICK, IDENT, GECOS, CHANNELS, PREFIX, LOGIN, PASSWORD, ADMINS
+
+from config import (
+ SERVER,
+ PORT,
+ NICK,
+ IDENT,
+ GECOS,
+ CHANNELS,
+ PREFIX,
+ LOGIN,
+ PASSWORD,
+ ADMINS,
+)
car = lambda l: l[0]
cdr = lambda l: l[1:]
@@ -50,6 +62,8 @@ class User:
host: Optional[bytes] = None
on_server: Optional[bytes] = None
in_channels: list[Channel] = field(default_factory=list)
+ last_activity_time: Optional[float] = None
+ last_activity: Optional[bytes] = None
@dataclass
@@ -83,27 +97,29 @@ class Message:
class ProtocolViolation(Exception):
pass
+
def match_hostmask(hostmask: bytes, pattern: bytes) -> bool:
regexp = b""
for i in list(pattern):
c = i.to_bytes(1, "big")
- if c == b'*':
- regexp += b'.*'
- elif c == b'?':
- regexp += b'.'
- elif c in b'[{':
- regexp += b'[\\[{]'
- elif c in b'}]':
- regexp += b'[}\\]]'
- elif c in b'|\\':
- regexp += b'[|\\\\]'
- elif c in b'^~':
- regexp += b'[~^]'
+ if c == b"*":
+ regexp += b".*"
+ elif c == b"?":
+ regexp += b"."
+ elif c in b"[{":
+ regexp += b"[\\[{]"
+ elif c in b"}]":
+ regexp += b"[}\\]]"
+ elif c in b"|\\":
+ regexp += b"[|\\\\]"
+ elif c in b"^~":
+ regexp += b"[~^]"
else:
regexp += re.escape(c)
- regexp += b'$'
+ regexp += b"$"
return bool(re.compile(regexp, re.I).match(hostmask))
+
def is_admin(user: User) -> Optional[bool]:
if not (user.nick and user.ident and user.host):
return None
@@ -113,6 +129,14 @@ def is_admin(user: User) -> Optional[bool]:
return False
+def human_list_sep(l: list[bytes]) -> Optional[bytes]:
+ if len(l) == 0:
+ return None
+ if len(l) == 1:
+ return l[0]
+ return b", ".join(l[:-1]) + b" and " + l[-1]
+
+
def parse_nih(nih: bytes) -> tuple[Optional[bytes], Optional[bytes], Optional[bytes]]:
"Parse a nick!username@host into tuples"
n_ih = nih.split(b"!", 1)
@@ -189,11 +213,14 @@ def send(s: socket.socket, *args: bytes) -> bytes:
print("<", line.decode("utf-8", "surrogateescape"))
return line
+
def reply(s: socket.socket, msg: Message, text: bytes) -> bytes:
if msg.source is None:
raise TypeError("Cannot reply to a message that doesn't have a source")
if msg.cmd != b"PRIVMSG":
- raise TypeError("reply() only accepts incoming PRIVMSG Message's as the msg argument")
+ raise TypeError(
+ "reply() only accepts incoming PRIVMSG Message's as the msg argument"
+ )
if msg.args[0].startswith(b"#"):
text = (b"%s: " % msg.source.nick) + text
reply_to = msg.args[0]
@@ -307,8 +334,11 @@ def handle_part(
except IndexError:
raise ProtocolViolation("PART without channel name") from None
else:
- msg.source.in_channels.remove(channels[msg.args[0]])
- channels[msg.args[0]].users.remove(msg.source)
+ try:
+ msg.source.in_channels.remove(channels[msg.args[0]])
+ channels[msg.args[0]].users.remove(msg.source)
+ except ValueError:
+ raise ProtocolViolation("Received PART for a user that hasn't joined the channel")
@register_irc_callback(b"352") # WHO
@@ -461,7 +491,7 @@ def handle_privmsg(
elif target == me.nick:
channel = None
if text.startswith(PREFIX):
- text = text[len(PREFIX):]
+ text = text[len(PREFIX) :]
else:
# A message addressed to a non-channel that isn't the bot itself? Weird!
return
@@ -474,29 +504,130 @@ def handle_privmsg(
print(repr(users))
reply(s, msg, b"I dumped my channel and user database to standard output.")
return
+ elif chat_command[0] == b"kb":
+ if not channel:
+ reply(s, msg, b"You may only do this in a channel.")
+ return
+ elif msg.source not in channel.users:
+ reply(
+ s,
+ msg,
+ b"You are not on the channel, and you're haxsending messages to it. This isn't nice.",
+ )
+ return
+ priv = is_admin(msg.source)
+ if not priv:
+ reply(s, msg, b"Access denied.")
+ return
+ elif not me in channel.ops:
+ reply(
+ s,
+ msg,
+ b"I am not a channel operator, so I kickban people.",
+ )
+ return
+
+ kickban_string = chat_command[1]
+ if b"@" in kickban_string:
+ if b"!" not in kickban_string:
+ kickban_string = b"*!" + kickban_string
+ send(s, b"MODE", channel.name, b"+b", kickban_string)
+ i: int = 0
+ for potential_target in channel.users:
+ assert potential_target.nick and potential_target.ident and potential_target.host
+ if match_hostmask(potential_target.nick + b"!" + potential_target.ident + b"@" + potential_target.host, kickban_string):
+ send(s, b"KICK", channel.name, potential_target.nick)
+ i += 1
+ reply(s, msg, b"Done kickbanning mask %s, kicked %d clients." % (kickban_string, i))
+ return
+ else:
+ try:
+ kickban_target = users[kickban_string]
+ assert kickban_target.nick and kickban_target.ident and kickban_target.host
+ send(s, b"MODE", channel.name, b"+b", b"*!%s@%s" % (kickban_target.ident if (not kickban_target.ident.startswith(b"~")) else b"*", kickban_target.host))
+ send(s, b"KICK", channel.name, kickban_target.nick)
+ return
+ except KeyError:
+ reply(s, msg, b"You didn't provide a hostmask for me to kickban, so I assumed it's a nickname, but I don't know %s." % kickban_string)
+ return
elif chat_command[0] == b"OP":
if not channel:
reply(s, msg, b"You may only do this in a channel.")
return
elif msg.source not in channel.users:
- reply(s, msg, b"You are not on the channel, and you're haxsending messages to it. This isn't nice.")
+ reply(
+ s,
+ msg,
+ b"You are not on the channel, and you're haxsending messages to it. This isn't nice.",
+ )
return
priv = is_admin(msg.source)
if priv is False:
reply(s, msg, b"Access denied.")
return
elif priv is None:
- reply(s, msg, b"I don't know your ident or host, so I can't check your permissions. Something is wrong.")
+ reply(
+ s,
+ msg,
+ b"I don't know your ident or host, so I can't check your permissions. Something is wrong.",
+ )
return
elif msg.source in channel.ops:
reply(s, msg, b"You are already a channel operator.")
return
+ elif not me in channel.ops:
+ reply(
+ s,
+ msg,
+ b"I am not a channel operator, so I can't make you a channel operator.",
+ )
+ return
else:
send(s, b"MODE", channel.name, b"+o", msg.source.nick)
reply(s, msg, b"I tried to make you a channel operator.")
+ elif chat_command[0] == b"WHOIS":
+ if not is_admin(msg.source):
+ reply(s, msg, b"Access denied.")
+ return
+ whois_target_nick = chat_command[1]
+ if whois_target_nick == me.nick:
+ reply(s, msg, b"That's me, right?")
+ return
+ elif whois_target_nick == msg.source.nick:
+ reply(s, msg, b"You should know enough about yourself.")
+ try:
+ whois_target = users[whois_target_nick]
+ except KeyError:
+ reply(s, msg, b"I know nothing about %s." % whois_target_nick)
+ return
+ if whois_target.last_activity_time is None:
+ reply(
+ s,
+ msg,
+ b"I didn't see %s do anything while I was here." % whois_target.nick,
+ )
+ else:
+ reply(
+ s,
+ msg,
+ b"%s's last activity was %s seconds ago: %s"
+ % (
+ whois_target.nick,
+ str(int(time.time() - whois_target.last_activity_time)).encode("ascii"),
+ whois_target.last_activity
+ ),
+ )
+ if whois_target.in_channels:
+ reply(s, msg, b"I am actively keeping track of %s through %s." % (whois_target.nick, human_list_sep([c.name for c in whois_target.in_channels])))
+ else:
+ reply(s, msg, b"I am not actively keeping track of %s as we don't have common channels." % whois_target.nick)
elif chat_command[0] == b"HELP":
# TODO for forkers: If you run this bot for public use, and you've changed the code (well, except for config.py), you must update the information below, in accordance with the AGPL.
- reply(s, msg, b"Hi! I'm a simple IRC bot. Consult git://git.andrewyu.org/irc-mod-bot.git/ for documentation.")
+ reply(
+ s,
+ msg,
+ b"Hi! I'm a simple IRC bot. Consult git://git.andrewyu.org/irc-mod-bot.git/ for documentation.",
+ )
else:
reply(s, msg, b"Unknown command: %s" % chat_command[0])
@@ -539,6 +670,8 @@ def handle_incoming_line(
# These assignments are to update the ident and the host from incoming messages to users in our database, though this may be a bit redundent to do every time.
source.ident = parsed[1][1]
source.host = parsed[1][2]
+ source.last_activity_time = time.time()
+ source.last_activity = m
msg = Message(parsed[0].upper(), source, parsed[2])
handle_incoming_message(s, state, me, users, channels, msg)