# Manage the IRC layer of GLaDOS import logging import re import socket import ssl from functools import wraps from queue import Queue from threading import Thread class IRC(object): def __init__(self, host, port): """ Initialize an IRC wrapper """ # Public attributes self.connected = False # Simple lock self.running = False # Private attributes self._socket = ssl.create_default_context().wrap_socket( socket.create_connection((host, port)), server_hostname=host) self._inbox = Queue() self._handler = Thread(target=self._handle) self._callbacks = [] # Public methods def start(self, nick, password, sasl_nick=None): """ Start the IRC layer. Manage authentication as well Return True if authentication succeed, False if failed""" sasl_nick = sasl_nick or nick self.nick = nick self.running = True logging.debug("Thread start") self._handler.start() self._send(f"USER {nick} * * :{nick}") self._send(f"NICK {nick}") self._waitfor(lambda m: "NOTICE" in m and "/AUTH" in m) for i in range(3): self._send(f"AUTH {sasl_nick}:{password}") msg = self._waitfor(lambda m: "You are now logged in" in m or "Authentication failed" in m) if "You are now logged in" in msg: self.connected = True return True logging.info(f"Authentication for {nick} ({sasl_nick}) failed") self.connected = False self._handler.stop() return False def stop(self): """ Stop the IRC layer """ self._send("QUIT :Bye bye") logging.debug("STOP: sent QUIT message") self.running = False try: self._handler.join() except RuntimeError: logging.error("STOP: Cannot join handler") logging.debug("STOP: thread has terminated") self._socket.close() logging.debug("STOP: socket close") def run(self): """ Handle new messages """ while True: message = self.receive() logging.info(f"received {message}") if message is not None: for event, callback in self._callbacks: if event(message): logging.info(f"matched {event.__name__}") callback(message) def send(self, target, message): """ Send a message to the specified target (channel or user) """ self._send(f"PRIVMSG {target} :{message}") def receive(self): """ Receive a private message """ while True: message = self._recv() if " PRIVMSG " in message: msg = IRCMessage(message) if msg: return msg logging.debug(f"skipped message {msg}") def join(self, channel): """ Join a channel """ self._send(f"JOIN {channel}") logging.info(f"joined {channel}") def on(self, event): """ Adds a callback to the IRC handler Event is a function taking in parameter a IRCMessage and returning True if the callback should be executed on the message """ def callback(func): @wraps(func) def wrapper(message): func(message) self._callbacks.append((event, wrapper)) logging.info(f"added callback {func.__name__}") return wrapper return callback # Private methods def _handle(self, store=True): """ Handle raw messages from irc and manage ping """ while self.running: # Get incoming messages data = self._socket.recv(4096).decode() if data.__len__() == 0: self.stop() # Split multiple lines for m in data.split('\r\n'): # Manage ping if m.startswith("PING"): self._send(m.replace("PING", "PONG")) # Or add a new message to inbox elif len(m): if store: self._inbox.put(m) logging.debug(f"_handle has quit: running={self.running}") def _send(self, raw): """ Wrap and encode raw message to send """ try: self._socket.send(f"{raw}\r\n".encode()) except OSError as e: logging.warning(e) # Do not display password in logs if raw.startswith("AUTH"): raw = raw.split(":")[0] + ":*REDACTED*" logging.debug(raw) def _recv(self): try: m = self._inbox.get() except OSError as e: logging.warning(e) logging.debug(m) return m def _waitfor(self, condition): """ Wait for a raw message that matches the condition """ msg = self._recv() while not condition(msg): msg = self._recv() return msg class IRCMessage(object): r = re.compile("^:(?P[\w.~|\[\]]+)(?:!(?P\S+))? PRIVMSG (?P\S+) :(?P.+)") def __init__(self, raw): match = re.search(IRCMessage.r, raw) if match: self.author = match.group("author") self.to = match.group("to") self.text = match.group("text") logging.debug(f"sucessfully parsed {raw} into {self}") else: self.author = "" self.to = "" self.text = "" logging.warning(f"failed to parse {raw} into valid message") def __str__(self): return f"{self.author} to {self.to}: {self.text}"