From fbe8f48d631ba77967bed1a30452d9ec89365d4e Mon Sep 17 00:00:00 2001 From: Dpeta <69427753+Dpeta@users.noreply.github.com> Date: Fri, 10 Feb 2023 02:33:01 +0100 Subject: [PATCH] Split SendIRC, parse_irc_line, and get_ssl_content off into appropriate files --- irc.py | 286 +++++----------------------------------- scripts/irc_protocol.py | 176 +++++++++++++++++++++++++ scripts/ssl_context.py | 52 ++++++++ 3 files changed, 261 insertions(+), 253 deletions(-) create mode 100644 scripts/irc_protocol.py create mode 100644 scripts/ssl_context.py diff --git a/irc.py b/irc.py index 92312a8..58fe063 100644 --- a/irc.py +++ b/irc.py @@ -30,10 +30,10 @@ the license notice included with oyoyo source files is indented here: """ import sys import socket -import ssl import random import datetime import logging +from ssl import SSLEOFError, SSLCertVerificationError try: from PyQt6 import QtCore, QtGui @@ -45,6 +45,8 @@ from mood import Mood from dataobjs import PesterProfile from generic import PesterList from version import _pcVersion +from scripts.irc_protocol import SendIRC, parse_irc_line +from scripts.ssl_context import get_ssl_context PchumLog = logging.getLogger("pchumLogger") SERVICES = [ @@ -57,25 +59,10 @@ SERVICES = [ "botserv", ] -try: - import certifi -except ImportError: - if sys.platform == "darwin": - # Certifi is required to validate certificates on MacOS with pyinstaller builds. - PchumLog.warning( - "Failed to import certifi, which is recommended on MacOS. " - "Pesterchum might not be able to validate certificates unless " - "Python's root certs are installed." - ) - else: - PchumLog.info( - "Failed to import certifi, Pesterchum will not be able to validate " - "certificates if the system-provided root certificates are invalid." - ) - class PesterIRC(QtCore.QThread): """Class for making a thread that manages the connection to server.""" + def __init__(self, config, window, verify_hostname=True): QtCore.QThread.__init__(self) self.mainwindow = window @@ -137,38 +124,6 @@ class PesterIRC(QtCore.QThread): "cap": self.cap, # IRCv3 Client Capability Negotiation } - def get_ssl_context(self): - """Returns an SSL context for connecting over SSL/TLS. - Loads the certifi root certificate bundle if the certifi module is less - than a year old or if the system certificate store is empty. - - The cert store on Windows also seems to have issues, so it's better - to use the certifi provided bundle assuming it's a recent version. - - On MacOS the system cert store is usually empty, as Python does not use - the system provided ones, instead relying on a bundle installed with the - python installer.""" - default_context = ssl.create_default_context() - if "certifi" not in sys.modules: - return default_context - - # Get age of certifi module - certifi_date = datetime.datetime.strptime(certifi.__version__, "%Y.%m.%d") - current_date = datetime.datetime.now() - certifi_age = current_date - certifi_date - - empty_cert_store = ( - list(default_context.cert_store_stats().values()).count(0) == 3 - ) - # 31557600 seconds is approximately 1 year - if empty_cert_store or certifi_age.total_seconds() <= 31557600: - PchumLog.info( - "Using SSL/TLS context with certifi-provided root certificates." - ) - return ssl.create_default_context(cafile=certifi.where()) - PchumLog.info("Using SSL/TLS context with system-provided root certificates.") - return default_context - def connect(self, verify_hostname=True): """Initiates the connection to the server set in self.host:self.port self.ssl decides whether the connection uses ssl. @@ -184,7 +139,7 @@ class PesterIRC(QtCore.QThread): if self.ssl: # Upgrade connection to use SSL/TLS if enabled - context = self.get_ssl_context() + context = get_ssl_context() context.check_hostname = verify_hostname self.socket = context.wrap_socket( plaintext_socket, server_hostname=self.host @@ -225,7 +180,7 @@ class PesterIRC(QtCore.QThread): for line in split_buffer: line = line.decode(encoding="utf-8", errors="replace") - tags, prefix, command, args = self.parse_irc_line(line) + tags, prefix, command, args = parse_irc_line(line) if command: # Only need tags with tagmsg if command.casefold() == "tagmsg": @@ -237,7 +192,7 @@ class PesterIRC(QtCore.QThread): except socket.timeout as se: PchumLog.debug("passing timeout") raise se - except (OSError, ssl.SSLEOFError) as se: + except (OSError, SSLEOFError) as se: PchumLog.warning("Problem: %s", se) if self.socket: PchumLog.info("Error: closing socket.") @@ -253,36 +208,23 @@ class PesterIRC(QtCore.QThread): self.socket.close() yield False - def parse_irc_line(self, line: str): - """Retrieves tags, prefix, command, and arguments from an unparsed IRC line.""" - parts = line.split(" ") - tags = None - prefix = None - if parts[0].startswith(":"): - prefix = parts[0][1:] - command = parts[1] - args = parts[2:] - elif parts[0].startswith("@"): - tags = parts[0] # IRCv3 message tag - prefix = parts[1][1:] - command = parts[2] - args = parts[3:] + def run_command(self, command, *args): + """Finds and runs a command if it has a matching function in the self.command dict.""" + PchumLog.debug("run_command %s(%s)", command, args) + if command in self.commands: + command_function = self.commands[command] else: - command = parts[0] - args = parts[1:] - command = command.casefold() + PchumLog.warning("No matching function for command: %s(%s)", command, args) + return - # If ':' is present the subsequent args are one parameter. - fused_args = [] - for idx, arg in enumerate(args): - if arg.startswith(":"): - final_param = " ".join(args[idx:]) - fused_args.append(final_param[1:]) - break - else: - fused_args.append(arg) - - return (tags, prefix, command, fused_args) + try: + command_function(*args) + except TypeError: + PchumLog.exception( + "Failed to pass command, did the server pass an unsupported paramater?" + ) + except Exception: + PchumLog.exception("Exception while parsing command.") def close(self): """Kill the socket 'with extreme prejudice'.""" @@ -301,7 +243,7 @@ class PesterIRC(QtCore.QThread): def IRCConnect(self): try: self.connect(self.verify_hostname) - except ssl.SSLCertVerificationError as e: + except SSLCertVerificationError as e: # Ask if users wants to connect anyway self.askToConnect.emit(e) raise e @@ -333,15 +275,7 @@ class PesterIRC(QtCore.QThread): if not res: PchumLog.debug("False Yield: %s, returning", res) return - - def setConnected(self): - """Called when connected and registered to server. - - Meaning the server has accepted our nick and user and has replied with 001/welcome. - """ - self.registeredIRC = True - self.connected.emit() - + def setConnectionBroken(self): """Called when the connection is broken.""" PchumLog.critical("setConnectionBroken() got called, disconnecting.") @@ -851,10 +785,9 @@ class PesterIRC(QtCore.QThread): def welcome(self, _server, _nick, _msg): """Numeric reply 001 RPL_WELCOME, send when we've connected to the server.""" - self.setConnected() - # mychumhandle = self.mainwindow.profile().handle - mymood = self.mainwindow.profile().mood.value() - color = self.mainwindow.profile().color + self.registeredIRC = True # Registered as in, the server has accepted our nick & user. + self.connected.emit() # Alert main thread that we've connected. + profile = self.mainwindow.profile() if not self.mainwindow.config.lowBandwidth(): # Negotiate capabilities self.send_irc.cap("REQ", "message-tags") @@ -867,12 +800,12 @@ class PesterIRC(QtCore.QThread): self.send_irc.join("#pesterchum") # Moods via metadata self.send_irc.metadata("*", "sub", "mood") - self.send_irc.metadata("*", "set", "mood", str(mymood)) + self.send_irc.metadata("*", "set", "mood", str(profile.mood.value)) # Color via metadata self.send_irc.metadata("*", "sub", "color") - self.send_irc.metadata("*", "set", "color", str(color.name())) + self.send_irc.metadata("*", "set", "color", profile.color.name()) # Backwards compatible moods - self.send_irc.privmsg("#pesterchum", f"MOOD >{mymood}") + self.send_irc.privmsg("#pesterchum", f"MOOD >{profile.mymood}") def featurelist(self, _target, _handle, *params): """Numerical reply 005 RPL_ISUPPORT to communicate supported server features. @@ -980,7 +913,9 @@ class PesterIRC(QtCore.QThread): def _reset_nick(self, oldnick): """Set our nick to a random pesterClient.""" - random_number = int(random.random() * 9999) # Random int in range 1000 <---> 9999 + random_number = int( + random.random() * 9999 + ) # Random int in range 1000 <---> 9999 newnick = f"pesterClient{random_number}" self.send_irc.nick(newnick) self.nickCollision.emit(oldnick, newnick) @@ -1032,24 +967,6 @@ class PesterIRC(QtCore.QThread): """ "METADATA DRAFT numeric reply 770 RPL_METADATASUBOK, we subbed to a key.""" PchumLog.info("metadatasubok: %s", params) - def run_command(self, command, *args): - """Finds and runs a command.""" - PchumLog.debug("run_command %s(%s)", command, args) - if command in self.commands: - command_function = self.commands[command] - else: - PchumLog.warning("No matching function for command: %s(%s)", command, args) - return - - try: - command_function(*args) - except TypeError: - PchumLog.exception( - "Failed to pass command, did the server pass an unsupported paramater?" - ) - except Exception: - PchumLog.exception("Exception while parsing command.") - moodUpdated = QtCore.pyqtSignal("QString", Mood) colorUpdated = QtCore.pyqtSignal("QString", QtGui.QColor) messageReceived = QtCore.pyqtSignal("QString", "QString") @@ -1070,140 +987,3 @@ class PesterIRC(QtCore.QThread): cannotSendToChan = QtCore.pyqtSignal("QString", "QString") quirkDisable = QtCore.pyqtSignal("QString", "QString", "QString") signal_forbiddenchannel = QtCore.pyqtSignal("QString", "QString") - -class SendIRC: - """Provides functions for outgoing IRC commands.""" - - def __init__(self): - self.socket = None # INET socket connected with server. - - def _send(self, *args: str, text=None): - """Send a command to the IRC server. - - Takes either a string or a list of strings. - The 'text' argument is for the final parameter, which can have spaces. - - Since this checks if the socket is alive, it's best to send via this method.""" - # Return if disconnected - if not self.socket or self.socket.fileno() == -1: - PchumLog.error( - "Send attempted while disconnected, args: %s, text: %s.", args, text - ) - return - - command = "" - # Convert command arguments to a single string if passed. - if args: - command += " ".join(args) - # If text is passed, add ':' to imply everything after it is one parameter. - if text: - command += f" :{text}" - # Add characters for end of line in IRC. - command += "\r\n" - # UTF-8 is the prefered encoding in 2023. - outgoing_bytes = command.encode(encoding="utf-8", errors="replace") - - try: - PchumLog.debug("Sending: %s", command) - self.socket.sendall(outgoing_bytes) - except OSError: - PchumLog.exception("Error while sending: '%s'", command.strip()) - self.socket.close() - - def ping(self, token): - """Send PING command to server to check for connectivity.""" - self._send("PING", text=token) - - def pong(self, token): - """Send PONG command to reply to server PING.""" - self._send("PONG", token) - - def nick(self, nick): - """Send USER command to communicate nick to server.""" - self._send("NICK", nick) - - def user(self, username, realname): - """Send USER command to communicate username and realname to server.""" - self._send("USER", username, "0", "*", text=realname) - - def privmsg(self, target, text): - """Send PRIVMSG command to send a message.""" - for line in text.split("\n"): - self._send("PRIVMSG", target, text=line) - - def names(self, channel): - """Send NAMES command to view channel members.""" - self._send("NAMES", channel) - - def kick(self, channel, user, reason=""): - """Send KICK command to force user from channel.""" - if reason: - self._send(f"KICK {channel} {user}", text=reason) - else: - self._send(f"KICK {channel} {user}") - - def mode(self, target, modestring="", mode_arguments=""): - """Set or remove modes from target.""" - outgoing_mode = " ".join([target, modestring, mode_arguments]).strip() - self._send("MODE", outgoing_mode) - - def ctcp(self, target, command, msg=""): - """Send Client-to-Client Protocol message.""" - outgoing_ctcp = " ".join( - [command, msg] - ).strip() # Extra spaces break protocol, so strip. - self.privmsg(target, f"\x01{outgoing_ctcp}\x01") - - def metadata(self, target, subcommand, *params): - """Send Metadata command to get or set metadata. - - See IRC metadata draft specification: - https://gist.github.com/k4bek4be/92c2937cefd49990fbebd001faf2b237 - """ - self._send("METADATA", target, subcommand, *params) - - def cap(self, subcommand, *params): - """Send IRCv3 CAP command for capability negotiation. - - See: https://ircv3.net/specs/extensions/capability-negotiation.html""" - self._send("CAP", subcommand, *params) - - def join(self, channel, key=""): - """Send JOIN command to join a channel/memo. - - Keys or joining multiple channels is possible in the specification, but unused. - """ - channel_and_key = " ".join([channel, key]).strip() - self._send("JOIN", channel_and_key) - - def part(self, channel): - """Send PART command to leave a channel/memo. - - Providing a reason or leaving multiple channels is possible in the specification. - """ - self._send("PART", channel) - - def notice(self, target, text): - """Send a NOTICE to a user or channel.""" - self._send("NOTICE", target, text=text) - - def invite(self, nick, channel): - """Send INVITE command to invite a user to a channel.""" - self._send("INVITE", nick, channel) - - def away(self, text=None): - """AWAY command to mark client as away or no longer away. - - No 'text' parameter means the client is no longer away.""" - if text: - self._send("AWAY", text=text) - else: - self._send("AWAY") - - def list(self): - """Send LIST command to get list of channels.""" - self._send("LIST") - - def quit(self, reason=""): - """Send QUIT to terminate connection.""" - self._send("QUIT", text=reason) diff --git a/scripts/irc_protocol.py b/scripts/irc_protocol.py new file mode 100644 index 0000000..b257d20 --- /dev/null +++ b/scripts/irc_protocol.py @@ -0,0 +1,176 @@ +"""IRC-related functions and classes to be imported by irc.py""" +import logging + +PchumLog = logging.getLogger("pchumLogger") + + +class SendIRC: + """Provides functions for outgoing IRC commands. + + Functions are protocol compliant but don't implement all valid uses of certain commands. + """ + + def __init__(self): + self.socket = None # INET socket connected with server. + + def _send(self, *args: str, text=None): + """Send a command to the IRC server. + + Takes either a string or a list of strings. + The 'text' argument is for the final parameter, which can have spaces. + + Since this checks if the socket is alive, it's best to send via this method.""" + # Return if disconnected + if not self.socket or self.socket.fileno() == -1: + PchumLog.error( + "Send attempted while disconnected, args: %s, text: %s.", args, text + ) + return + + command = "" + # Convert command arguments to a single string if passed. + if args: + command += " ".join(args) + # If text is passed, add ':' to imply everything after it is one parameter. + if text: + command += f" :{text}" + # Add characters for end of line in IRC. + command += "\r\n" + # UTF-8 is the prefered encoding in 2023. + outgoing_bytes = command.encode(encoding="utf-8", errors="replace") + + try: + PchumLog.debug("Sending: %s", command) + self.socket.sendall(outgoing_bytes) + except OSError: + PchumLog.exception("Error while sending: '%s'", command.strip()) + self.socket.close() + + def ping(self, token): + """Send PING command to server to check for connectivity.""" + self._send("PING", text=token) + + def pong(self, token): + """Send PONG command to reply to server PING.""" + self._send("PONG", token) + + def nick(self, nick): + """Send USER command to communicate nick to server.""" + self._send("NICK", nick) + + def user(self, username, realname): + """Send USER command to communicate username and realname to server.""" + self._send("USER", username, "0", "*", text=realname) + + def privmsg(self, target, text): + """Send PRIVMSG command to send a message.""" + for line in text.split("\n"): + self._send("PRIVMSG", target, text=line) + + def names(self, channel): + """Send NAMES command to view channel members.""" + self._send("NAMES", channel) + + def kick(self, channel, user, reason=""): + """Send KICK command to force user from channel.""" + if reason: + self._send(f"KICK {channel} {user}", text=reason) + else: + self._send(f"KICK {channel} {user}") + + def mode(self, target, modestring="", mode_arguments=""): + """Set or remove modes from target.""" + outgoing_mode = " ".join([target, modestring, mode_arguments]).strip() + self._send("MODE", outgoing_mode) + + def ctcp(self, target, command, msg=""): + """Send Client-to-Client Protocol message.""" + outgoing_ctcp = " ".join( + [command, msg] + ).strip() # Extra spaces break protocol, so strip. + self.privmsg(target, f"\x01{outgoing_ctcp}\x01") + + def metadata(self, target, subcommand, *params): + """Send Metadata command to get or set metadata. + + See IRC metadata draft specification: + https://gist.github.com/k4bek4be/92c2937cefd49990fbebd001faf2b237 + """ + self._send("METADATA", target, subcommand, *params) + + def cap(self, subcommand, *params): + """Send IRCv3 CAP command for capability negotiation. + + See: https://ircv3.net/specs/extensions/capability-negotiation.html""" + self._send("CAP", subcommand, *params) + + def join(self, channel, key=""): + """Send JOIN command to join a channel/memo. + + Keys or joining multiple channels is possible in the specification, but unused. + """ + channel_and_key = " ".join([channel, key]).strip() + self._send("JOIN", channel_and_key) + + def part(self, channel): + """Send PART command to leave a channel/memo. + + Providing a reason or leaving multiple channels is possible in the specification. + """ + self._send("PART", channel) + + def notice(self, target, text): + """Send a NOTICE to a user or channel.""" + self._send("NOTICE", target, text=text) + + def invite(self, nick, channel): + """Send INVITE command to invite a user to a channel.""" + self._send("INVITE", nick, channel) + + def away(self, text=None): + """AWAY command to mark client as away or no longer away. + + No 'text' parameter means the client is no longer away.""" + if text: + self._send("AWAY", text=text) + else: + self._send("AWAY") + + def list(self): + """Send LIST command to get list of channels.""" + self._send("LIST") + + def quit(self, reason=""): + """Send QUIT to terminate connection.""" + self._send("QUIT", text=reason) + + +def parse_irc_line(line: str): + """Retrieves tags, prefix, command, and arguments from an unparsed IRC line.""" + parts = line.split(" ") + tags = None + prefix = None + if parts[0].startswith(":"): + prefix = parts[0][1:] + command = parts[1] + args = parts[2:] + elif parts[0].startswith("@"): + tags = parts[0] # IRCv3 message tag + prefix = parts[1][1:] + command = parts[2] + args = parts[3:] + else: + command = parts[0] + args = parts[1:] + command = command.casefold() + + # If ':' is present the subsequent args are one parameter. + fused_args = [] + for idx, arg in enumerate(args): + if arg.startswith(":"): + final_param = " ".join(args[idx:]) + fused_args.append(final_param[1:]) + break + fused_args.append(arg) + + return (tags, prefix, command, fused_args) diff --git a/scripts/ssl_context.py b/scripts/ssl_context.py new file mode 100644 index 0000000..30938fb --- /dev/null +++ b/scripts/ssl_context.py @@ -0,0 +1,52 @@ +"""Provides a function for creating an appropriate SSL context.""" +import ssl +import sys +import datetime +import logging + +PchumLog = logging.getLogger("pchumLogger") + +try: + import certifi +except ImportError: + if sys.platform == "darwin": + # Certifi is required to validate certificates on MacOS with pyinstaller builds. + PchumLog.warning( + "Failed to import certifi, which is recommended on MacOS. " + "Pesterchum might not be able to validate certificates unless " + "Python's root certs are installed." + ) + else: + PchumLog.info( + "Failed to import certifi, Pesterchum will not be able to validate " + "certificates if the system-provided root certificates are invalid." + ) + + +def get_ssl_context(): + """Returns an SSL context for connecting over SSL/TLS. + Loads the certifi root certificate bundle if the certifi module is less + than a year old or if the system certificate store is empty. + + The cert store on Windows also seems to have issues, so it's better + to use the certifi provided bundle assuming it's a recent version. + + On MacOS the system cert store is usually empty, as Python does not use + the system provided ones, instead relying on a bundle installed with the + python installer.""" + default_context = ssl.create_default_context() + if "certifi" not in sys.modules: + return default_context + + # Get age of certifi module + certifi_date = datetime.datetime.strptime(certifi.__version__, "%Y.%m.%d") + current_date = datetime.datetime.now() + certifi_age = current_date - certifi_date + + empty_cert_store = list(default_context.cert_store_stats().values()).count(0) == 3 + # 31557600 seconds is approximately 1 year + if empty_cert_store or certifi_age.total_seconds() <= 31557600: + PchumLog.info("Using SSL/TLS context with certifi-provided root certificates.") + return ssl.create_default_context(cafile=certifi.where()) + PchumLog.info("Using SSL/TLS context with system-provided root certificates.") + return default_context