import socket
import ssl
from queue import Queue

# Endpoint of the IRC server used by create_socket("irc", ...).
_IRC_HOST = 'irc.planet-casio.com'
_IRC_PORT = 6697


def create_socket(type, target):
    """Create a socket of the requested kind.

    Args:
        type: ``"irc"`` for a TLS-wrapped TCP connection to the IRC server,
            ``"v5"`` for a UDP socket bound to ``target``. (The name shadows
            the builtin but is kept so keyword callers keep working.)
        target: ``(host, port)`` address to bind to; only used for ``"v5"``.

    Returns:
        The connected/bound socket, or ``None`` when ``type`` is not
        recognised.
    """
    if type == "irc":
        raw = socket.create_connection((_IRC_HOST, _IRC_PORT))
        try:
            context = ssl.create_default_context()
            return context.wrap_socket(raw, server_hostname=_IRC_HOST)
        except Exception:
            # Do not leak the TCP connection when the TLS setup fails.
            raw.close()
            raise
    if type == "v5":
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.bind(target)
        except OSError:
            # Do not leak the descriptor when the address cannot be bound.
            s.close()
            raise
        return s
    return None


class IRC:
    """Minimal IRC client: a socket plus a queue of received lines."""

    def __init__(self, target):
        self.socket = create_socket("irc", target)
        self.inbox = Queue()
        # Bytes received so far that do not yet form a complete line.
        self._buffer = b""

    # Public API
    def raw_handle(self):
        """Read once from the socket and dispatch every complete line.

        PING lines are answered with the matching PONG; every other line is
        put on the inbox. Meant to be called repeatedly from a dedicated
        reader thread.

        Raises:
            ConnectionError: if the server closed the connection.
        """
        data = self.socket.recv(4096)
        if not data:
            # recv() returning b"" means the peer closed the connection;
            # fail loudly instead of queueing empty messages forever.
            raise ConnectionError("IRC connection closed by remote host")

        # TCP is a byte stream: a read may end in the middle of a line (or
        # of a multibyte character), so only complete lines are decoded and
        # the unfinished tail is kept for the next call.
        self._buffer += data
        *lines, self._buffer = self._buffer.split(b"\r\n")

        for raw_line in lines:
            if not raw_line:
                continue
            m = raw_line.decode(errors="replace")
            if m.startswith("PING"):
                # Swap the command only; the token must be echoed untouched.
                self._send("PONG" + m[len("PING"):])
            else:
                self.inbox.put(m)

    def connect(self, nick, password):
        """Register ``nick`` on the server and authenticate with ``password``.

        Blocks until the server has asked for authentication and then until
        it has confirmed the user modes.
        """
        self._send(f"NICK {nick}")
        self._send(f"USER {nick} * * :{nick}")
        self._waitfor(lambda m: "NOTICE" in m and "/AUTH" in m)
        self._send(f"auth {nick}:{password}")
        self._waitfor(lambda m: f":{nick} MODE {nick} :+iwxz" in m)

    def join(self, channel):
        """Join ``channel``."""
        self._send(f"JOIN :{channel}")

    def msg(self, msg, channel):
        """Send the text ``msg`` to ``channel``."""
        self._send(f"PRIVMSG {channel} :{msg}")

    def get_msg(self):
        """Block until a PRIVMSG line arrives and return it (raw line)."""
        return self._waitfor(lambda m: "PRIVMSG" in m)

    # Private API
    def _send(self, raw):
        """Send one raw IRC line, appending the CRLF terminator."""
        # sendall() retries until everything is written; send() may stop
        # after a partial write and silently truncate the command.
        self.socket.sendall(f"{raw}\r\n".encode())

    def _recv(self):
        """Pop the next line from the inbox, blocking until one exists."""
        print("Waiting for new item")
        return self.inbox.get()

    def _waitfor(self, condition):
        """Return the first inbox line for which ``condition(line)`` is true.

        Lines that do not match are consumed and discarded.
        """
        msg = self._recv()
        while not condition(msg):
            print(f"Wait for: {msg}")
            msg = self._recv()
        return msg