Split SendIRC, parse_irc_line, and get_ssl_content off into appropriate files
This commit is contained in:
parent
2c278c00fa
commit
fbe8f48d63
3 changed files with 261 additions and 253 deletions
286
irc.py
286
irc.py
|
@ -30,10 +30,10 @@ the license notice included with oyoyo source files is indented here:
|
|||
"""
|
||||
import sys
|
||||
import socket
|
||||
import ssl
|
||||
import random
|
||||
import datetime
|
||||
import logging
|
||||
from ssl import SSLEOFError, SSLCertVerificationError
|
||||
|
||||
try:
|
||||
from PyQt6 import QtCore, QtGui
|
||||
|
@ -45,6 +45,8 @@ from mood import Mood
|
|||
from dataobjs import PesterProfile
|
||||
from generic import PesterList
|
||||
from version import _pcVersion
|
||||
from scripts.irc_protocol import SendIRC, parse_irc_line
|
||||
from scripts.ssl_context import get_ssl_context
|
||||
|
||||
PchumLog = logging.getLogger("pchumLogger")
|
||||
SERVICES = [
|
||||
|
@ -57,25 +59,10 @@ SERVICES = [
|
|||
"botserv",
|
||||
]
|
||||
|
||||
try:
|
||||
import certifi
|
||||
except ImportError:
|
||||
if sys.platform == "darwin":
|
||||
# Certifi is required to validate certificates on MacOS with pyinstaller builds.
|
||||
PchumLog.warning(
|
||||
"Failed to import certifi, which is recommended on MacOS. "
|
||||
"Pesterchum might not be able to validate certificates unless "
|
||||
"Python's root certs are installed."
|
||||
)
|
||||
else:
|
||||
PchumLog.info(
|
||||
"Failed to import certifi, Pesterchum will not be able to validate "
|
||||
"certificates if the system-provided root certificates are invalid."
|
||||
)
|
||||
|
||||
|
||||
class PesterIRC(QtCore.QThread):
|
||||
"""Class for making a thread that manages the connection to server."""
|
||||
|
||||
def __init__(self, config, window, verify_hostname=True):
|
||||
QtCore.QThread.__init__(self)
|
||||
self.mainwindow = window
|
||||
|
@ -137,38 +124,6 @@ class PesterIRC(QtCore.QThread):
|
|||
"cap": self.cap, # IRCv3 Client Capability Negotiation
|
||||
}
|
||||
|
||||
def get_ssl_context(self):
|
||||
"""Returns an SSL context for connecting over SSL/TLS.
|
||||
Loads the certifi root certificate bundle if the certifi module is less
|
||||
than a year old or if the system certificate store is empty.
|
||||
|
||||
The cert store on Windows also seems to have issues, so it's better
|
||||
to use the certifi provided bundle assuming it's a recent version.
|
||||
|
||||
On MacOS the system cert store is usually empty, as Python does not use
|
||||
the system provided ones, instead relying on a bundle installed with the
|
||||
python installer."""
|
||||
default_context = ssl.create_default_context()
|
||||
if "certifi" not in sys.modules:
|
||||
return default_context
|
||||
|
||||
# Get age of certifi module
|
||||
certifi_date = datetime.datetime.strptime(certifi.__version__, "%Y.%m.%d")
|
||||
current_date = datetime.datetime.now()
|
||||
certifi_age = current_date - certifi_date
|
||||
|
||||
empty_cert_store = (
|
||||
list(default_context.cert_store_stats().values()).count(0) == 3
|
||||
)
|
||||
# 31557600 seconds is approximately 1 year
|
||||
if empty_cert_store or certifi_age.total_seconds() <= 31557600:
|
||||
PchumLog.info(
|
||||
"Using SSL/TLS context with certifi-provided root certificates."
|
||||
)
|
||||
return ssl.create_default_context(cafile=certifi.where())
|
||||
PchumLog.info("Using SSL/TLS context with system-provided root certificates.")
|
||||
return default_context
|
||||
|
||||
def connect(self, verify_hostname=True):
|
||||
"""Initiates the connection to the server set in self.host:self.port
|
||||
self.ssl decides whether the connection uses ssl.
|
||||
|
@ -184,7 +139,7 @@ class PesterIRC(QtCore.QThread):
|
|||
|
||||
if self.ssl:
|
||||
# Upgrade connection to use SSL/TLS if enabled
|
||||
context = self.get_ssl_context()
|
||||
context = get_ssl_context()
|
||||
context.check_hostname = verify_hostname
|
||||
self.socket = context.wrap_socket(
|
||||
plaintext_socket, server_hostname=self.host
|
||||
|
@ -225,7 +180,7 @@ class PesterIRC(QtCore.QThread):
|
|||
|
||||
for line in split_buffer:
|
||||
line = line.decode(encoding="utf-8", errors="replace")
|
||||
tags, prefix, command, args = self.parse_irc_line(line)
|
||||
tags, prefix, command, args = parse_irc_line(line)
|
||||
if command:
|
||||
# Only need tags with tagmsg
|
||||
if command.casefold() == "tagmsg":
|
||||
|
@ -237,7 +192,7 @@ class PesterIRC(QtCore.QThread):
|
|||
except socket.timeout as se:
|
||||
PchumLog.debug("passing timeout")
|
||||
raise se
|
||||
except (OSError, ssl.SSLEOFError) as se:
|
||||
except (OSError, SSLEOFError) as se:
|
||||
PchumLog.warning("Problem: %s", se)
|
||||
if self.socket:
|
||||
PchumLog.info("Error: closing socket.")
|
||||
|
@ -253,36 +208,23 @@ class PesterIRC(QtCore.QThread):
|
|||
self.socket.close()
|
||||
yield False
|
||||
|
||||
def parse_irc_line(self, line: str):
|
||||
"""Retrieves tags, prefix, command, and arguments from an unparsed IRC line."""
|
||||
parts = line.split(" ")
|
||||
tags = None
|
||||
prefix = None
|
||||
if parts[0].startswith(":"):
|
||||
prefix = parts[0][1:]
|
||||
command = parts[1]
|
||||
args = parts[2:]
|
||||
elif parts[0].startswith("@"):
|
||||
tags = parts[0] # IRCv3 message tag
|
||||
prefix = parts[1][1:]
|
||||
command = parts[2]
|
||||
args = parts[3:]
|
||||
def run_command(self, command, *args):
|
||||
"""Finds and runs a command if it has a matching function in the self.command dict."""
|
||||
PchumLog.debug("run_command %s(%s)", command, args)
|
||||
if command in self.commands:
|
||||
command_function = self.commands[command]
|
||||
else:
|
||||
command = parts[0]
|
||||
args = parts[1:]
|
||||
command = command.casefold()
|
||||
PchumLog.warning("No matching function for command: %s(%s)", command, args)
|
||||
return
|
||||
|
||||
# If ':' is present the subsequent args are one parameter.
|
||||
fused_args = []
|
||||
for idx, arg in enumerate(args):
|
||||
if arg.startswith(":"):
|
||||
final_param = " ".join(args[idx:])
|
||||
fused_args.append(final_param[1:])
|
||||
break
|
||||
else:
|
||||
fused_args.append(arg)
|
||||
|
||||
return (tags, prefix, command, fused_args)
|
||||
try:
|
||||
command_function(*args)
|
||||
except TypeError:
|
||||
PchumLog.exception(
|
||||
"Failed to pass command, did the server pass an unsupported paramater?"
|
||||
)
|
||||
except Exception:
|
||||
PchumLog.exception("Exception while parsing command.")
|
||||
|
||||
def close(self):
|
||||
"""Kill the socket 'with extreme prejudice'."""
|
||||
|
@ -301,7 +243,7 @@ class PesterIRC(QtCore.QThread):
|
|||
def IRCConnect(self):
|
||||
try:
|
||||
self.connect(self.verify_hostname)
|
||||
except ssl.SSLCertVerificationError as e:
|
||||
except SSLCertVerificationError as e:
|
||||
# Ask if users wants to connect anyway
|
||||
self.askToConnect.emit(e)
|
||||
raise e
|
||||
|
@ -333,15 +275,7 @@ class PesterIRC(QtCore.QThread):
|
|||
if not res:
|
||||
PchumLog.debug("False Yield: %s, returning", res)
|
||||
return
|
||||
|
||||
def setConnected(self):
|
||||
"""Called when connected and registered to server.
|
||||
|
||||
Meaning the server has accepted our nick and user and has replied with 001/welcome.
|
||||
"""
|
||||
self.registeredIRC = True
|
||||
self.connected.emit()
|
||||
|
||||
|
||||
def setConnectionBroken(self):
|
||||
"""Called when the connection is broken."""
|
||||
PchumLog.critical("setConnectionBroken() got called, disconnecting.")
|
||||
|
@ -851,10 +785,9 @@ class PesterIRC(QtCore.QThread):
|
|||
|
||||
def welcome(self, _server, _nick, _msg):
|
||||
"""Numeric reply 001 RPL_WELCOME, send when we've connected to the server."""
|
||||
self.setConnected()
|
||||
# mychumhandle = self.mainwindow.profile().handle
|
||||
mymood = self.mainwindow.profile().mood.value()
|
||||
color = self.mainwindow.profile().color
|
||||
self.registeredIRC = True # Registered as in, the server has accepted our nick & user.
|
||||
self.connected.emit() # Alert main thread that we've connected.
|
||||
profile = self.mainwindow.profile()
|
||||
if not self.mainwindow.config.lowBandwidth():
|
||||
# Negotiate capabilities
|
||||
self.send_irc.cap("REQ", "message-tags")
|
||||
|
@ -867,12 +800,12 @@ class PesterIRC(QtCore.QThread):
|
|||
self.send_irc.join("#pesterchum")
|
||||
# Moods via metadata
|
||||
self.send_irc.metadata("*", "sub", "mood")
|
||||
self.send_irc.metadata("*", "set", "mood", str(mymood))
|
||||
self.send_irc.metadata("*", "set", "mood", str(profile.mood.value))
|
||||
# Color via metadata
|
||||
self.send_irc.metadata("*", "sub", "color")
|
||||
self.send_irc.metadata("*", "set", "color", str(color.name()))
|
||||
self.send_irc.metadata("*", "set", "color", profile.color.name())
|
||||
# Backwards compatible moods
|
||||
self.send_irc.privmsg("#pesterchum", f"MOOD >{mymood}")
|
||||
self.send_irc.privmsg("#pesterchum", f"MOOD >{profile.mymood}")
|
||||
|
||||
def featurelist(self, _target, _handle, *params):
|
||||
"""Numerical reply 005 RPL_ISUPPORT to communicate supported server features.
|
||||
|
@ -980,7 +913,9 @@ class PesterIRC(QtCore.QThread):
|
|||
|
||||
def _reset_nick(self, oldnick):
|
||||
"""Set our nick to a random pesterClient."""
|
||||
random_number = int(random.random() * 9999) # Random int in range 1000 <---> 9999
|
||||
random_number = int(
|
||||
random.random() * 9999
|
||||
) # Random int in range 1000 <---> 9999
|
||||
newnick = f"pesterClient{random_number}"
|
||||
self.send_irc.nick(newnick)
|
||||
self.nickCollision.emit(oldnick, newnick)
|
||||
|
@ -1032,24 +967,6 @@ class PesterIRC(QtCore.QThread):
|
|||
""" "METADATA DRAFT numeric reply 770 RPL_METADATASUBOK, we subbed to a key."""
|
||||
PchumLog.info("metadatasubok: %s", params)
|
||||
|
||||
def run_command(self, command, *args):
|
||||
"""Finds and runs a command."""
|
||||
PchumLog.debug("run_command %s(%s)", command, args)
|
||||
if command in self.commands:
|
||||
command_function = self.commands[command]
|
||||
else:
|
||||
PchumLog.warning("No matching function for command: %s(%s)", command, args)
|
||||
return
|
||||
|
||||
try:
|
||||
command_function(*args)
|
||||
except TypeError:
|
||||
PchumLog.exception(
|
||||
"Failed to pass command, did the server pass an unsupported paramater?"
|
||||
)
|
||||
except Exception:
|
||||
PchumLog.exception("Exception while parsing command.")
|
||||
|
||||
moodUpdated = QtCore.pyqtSignal("QString", Mood)
|
||||
colorUpdated = QtCore.pyqtSignal("QString", QtGui.QColor)
|
||||
messageReceived = QtCore.pyqtSignal("QString", "QString")
|
||||
|
@ -1070,140 +987,3 @@ class PesterIRC(QtCore.QThread):
|
|||
cannotSendToChan = QtCore.pyqtSignal("QString", "QString")
|
||||
quirkDisable = QtCore.pyqtSignal("QString", "QString", "QString")
|
||||
signal_forbiddenchannel = QtCore.pyqtSignal("QString", "QString")
|
||||
|
||||
class SendIRC:
|
||||
"""Provides functions for outgoing IRC commands."""
|
||||
|
||||
def __init__(self):
|
||||
self.socket = None # INET socket connected with server.
|
||||
|
||||
def _send(self, *args: str, text=None):
|
||||
"""Send a command to the IRC server.
|
||||
|
||||
Takes either a string or a list of strings.
|
||||
The 'text' argument is for the final parameter, which can have spaces.
|
||||
|
||||
Since this checks if the socket is alive, it's best to send via this method."""
|
||||
# Return if disconnected
|
||||
if not self.socket or self.socket.fileno() == -1:
|
||||
PchumLog.error(
|
||||
"Send attempted while disconnected, args: %s, text: %s.", args, text
|
||||
)
|
||||
return
|
||||
|
||||
command = ""
|
||||
# Convert command arguments to a single string if passed.
|
||||
if args:
|
||||
command += " ".join(args)
|
||||
# If text is passed, add ':' to imply everything after it is one parameter.
|
||||
if text:
|
||||
command += f" :{text}"
|
||||
# Add characters for end of line in IRC.
|
||||
command += "\r\n"
|
||||
# UTF-8 is the prefered encoding in 2023.
|
||||
outgoing_bytes = command.encode(encoding="utf-8", errors="replace")
|
||||
|
||||
try:
|
||||
PchumLog.debug("Sending: %s", command)
|
||||
self.socket.sendall(outgoing_bytes)
|
||||
except OSError:
|
||||
PchumLog.exception("Error while sending: '%s'", command.strip())
|
||||
self.socket.close()
|
||||
|
||||
def ping(self, token):
|
||||
"""Send PING command to server to check for connectivity."""
|
||||
self._send("PING", text=token)
|
||||
|
||||
def pong(self, token):
|
||||
"""Send PONG command to reply to server PING."""
|
||||
self._send("PONG", token)
|
||||
|
||||
def nick(self, nick):
|
||||
"""Send USER command to communicate nick to server."""
|
||||
self._send("NICK", nick)
|
||||
|
||||
def user(self, username, realname):
|
||||
"""Send USER command to communicate username and realname to server."""
|
||||
self._send("USER", username, "0", "*", text=realname)
|
||||
|
||||
def privmsg(self, target, text):
|
||||
"""Send PRIVMSG command to send a message."""
|
||||
for line in text.split("\n"):
|
||||
self._send("PRIVMSG", target, text=line)
|
||||
|
||||
def names(self, channel):
|
||||
"""Send NAMES command to view channel members."""
|
||||
self._send("NAMES", channel)
|
||||
|
||||
def kick(self, channel, user, reason=""):
|
||||
"""Send KICK command to force user from channel."""
|
||||
if reason:
|
||||
self._send(f"KICK {channel} {user}", text=reason)
|
||||
else:
|
||||
self._send(f"KICK {channel} {user}")
|
||||
|
||||
def mode(self, target, modestring="", mode_arguments=""):
|
||||
"""Set or remove modes from target."""
|
||||
outgoing_mode = " ".join([target, modestring, mode_arguments]).strip()
|
||||
self._send("MODE", outgoing_mode)
|
||||
|
||||
def ctcp(self, target, command, msg=""):
|
||||
"""Send Client-to-Client Protocol message."""
|
||||
outgoing_ctcp = " ".join(
|
||||
[command, msg]
|
||||
).strip() # Extra spaces break protocol, so strip.
|
||||
self.privmsg(target, f"\x01{outgoing_ctcp}\x01")
|
||||
|
||||
def metadata(self, target, subcommand, *params):
|
||||
"""Send Metadata command to get or set metadata.
|
||||
|
||||
See IRC metadata draft specification:
|
||||
https://gist.github.com/k4bek4be/92c2937cefd49990fbebd001faf2b237
|
||||
"""
|
||||
self._send("METADATA", target, subcommand, *params)
|
||||
|
||||
def cap(self, subcommand, *params):
|
||||
"""Send IRCv3 CAP command for capability negotiation.
|
||||
|
||||
See: https://ircv3.net/specs/extensions/capability-negotiation.html"""
|
||||
self._send("CAP", subcommand, *params)
|
||||
|
||||
def join(self, channel, key=""):
|
||||
"""Send JOIN command to join a channel/memo.
|
||||
|
||||
Keys or joining multiple channels is possible in the specification, but unused.
|
||||
"""
|
||||
channel_and_key = " ".join([channel, key]).strip()
|
||||
self._send("JOIN", channel_and_key)
|
||||
|
||||
def part(self, channel):
|
||||
"""Send PART command to leave a channel/memo.
|
||||
|
||||
Providing a reason or leaving multiple channels is possible in the specification.
|
||||
"""
|
||||
self._send("PART", channel)
|
||||
|
||||
def notice(self, target, text):
|
||||
"""Send a NOTICE to a user or channel."""
|
||||
self._send("NOTICE", target, text=text)
|
||||
|
||||
def invite(self, nick, channel):
|
||||
"""Send INVITE command to invite a user to a channel."""
|
||||
self._send("INVITE", nick, channel)
|
||||
|
||||
def away(self, text=None):
|
||||
"""AWAY command to mark client as away or no longer away.
|
||||
|
||||
No 'text' parameter means the client is no longer away."""
|
||||
if text:
|
||||
self._send("AWAY", text=text)
|
||||
else:
|
||||
self._send("AWAY")
|
||||
|
||||
def list(self):
|
||||
"""Send LIST command to get list of channels."""
|
||||
self._send("LIST")
|
||||
|
||||
def quit(self, reason=""):
|
||||
"""Send QUIT to terminate connection."""
|
||||
self._send("QUIT", text=reason)
|
||||
|
|
176
scripts/irc_protocol.py
Normal file
176
scripts/irc_protocol.py
Normal file
|
@ -0,0 +1,176 @@
|
|||
"""IRC-related functions and classes to be imported by irc.py"""
|
||||
import logging
|
||||
|
||||
PchumLog = logging.getLogger("pchumLogger")
|
||||
|
||||
|
||||
class SendIRC:
|
||||
"""Provides functions for outgoing IRC commands.
|
||||
|
||||
Functions are protocol compliant but don't implement all valid uses of certain commands.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.socket = None # INET socket connected with server.
|
||||
|
||||
def _send(self, *args: str, text=None):
|
||||
"""Send a command to the IRC server.
|
||||
|
||||
Takes either a string or a list of strings.
|
||||
The 'text' argument is for the final parameter, which can have spaces.
|
||||
|
||||
Since this checks if the socket is alive, it's best to send via this method."""
|
||||
# Return if disconnected
|
||||
if not self.socket or self.socket.fileno() == -1:
|
||||
PchumLog.error(
|
||||
"Send attempted while disconnected, args: %s, text: %s.", args, text
|
||||
)
|
||||
return
|
||||
|
||||
command = ""
|
||||
# Convert command arguments to a single string if passed.
|
||||
if args:
|
||||
command += " ".join(args)
|
||||
# If text is passed, add ':' to imply everything after it is one parameter.
|
||||
if text:
|
||||
command += f" :{text}"
|
||||
# Add characters for end of line in IRC.
|
||||
command += "\r\n"
|
||||
# UTF-8 is the prefered encoding in 2023.
|
||||
outgoing_bytes = command.encode(encoding="utf-8", errors="replace")
|
||||
|
||||
try:
|
||||
PchumLog.debug("Sending: %s", command)
|
||||
self.socket.sendall(outgoing_bytes)
|
||||
except OSError:
|
||||
PchumLog.exception("Error while sending: '%s'", command.strip())
|
||||
self.socket.close()
|
||||
|
||||
def ping(self, token):
|
||||
"""Send PING command to server to check for connectivity."""
|
||||
self._send("PING", text=token)
|
||||
|
||||
def pong(self, token):
|
||||
"""Send PONG command to reply to server PING."""
|
||||
self._send("PONG", token)
|
||||
|
||||
def nick(self, nick):
|
||||
"""Send USER command to communicate nick to server."""
|
||||
self._send("NICK", nick)
|
||||
|
||||
def user(self, username, realname):
|
||||
"""Send USER command to communicate username and realname to server."""
|
||||
self._send("USER", username, "0", "*", text=realname)
|
||||
|
||||
def privmsg(self, target, text):
|
||||
"""Send PRIVMSG command to send a message."""
|
||||
for line in text.split("\n"):
|
||||
self._send("PRIVMSG", target, text=line)
|
||||
|
||||
def names(self, channel):
|
||||
"""Send NAMES command to view channel members."""
|
||||
self._send("NAMES", channel)
|
||||
|
||||
def kick(self, channel, user, reason=""):
|
||||
"""Send KICK command to force user from channel."""
|
||||
if reason:
|
||||
self._send(f"KICK {channel} {user}", text=reason)
|
||||
else:
|
||||
self._send(f"KICK {channel} {user}")
|
||||
|
||||
def mode(self, target, modestring="", mode_arguments=""):
|
||||
"""Set or remove modes from target."""
|
||||
outgoing_mode = " ".join([target, modestring, mode_arguments]).strip()
|
||||
self._send("MODE", outgoing_mode)
|
||||
|
||||
def ctcp(self, target, command, msg=""):
|
||||
"""Send Client-to-Client Protocol message."""
|
||||
outgoing_ctcp = " ".join(
|
||||
[command, msg]
|
||||
).strip() # Extra spaces break protocol, so strip.
|
||||
self.privmsg(target, f"\x01{outgoing_ctcp}\x01")
|
||||
|
||||
def metadata(self, target, subcommand, *params):
|
||||
"""Send Metadata command to get or set metadata.
|
||||
|
||||
See IRC metadata draft specification:
|
||||
https://gist.github.com/k4bek4be/92c2937cefd49990fbebd001faf2b237
|
||||
"""
|
||||
self._send("METADATA", target, subcommand, *params)
|
||||
|
||||
def cap(self, subcommand, *params):
|
||||
"""Send IRCv3 CAP command for capability negotiation.
|
||||
|
||||
See: https://ircv3.net/specs/extensions/capability-negotiation.html"""
|
||||
self._send("CAP", subcommand, *params)
|
||||
|
||||
def join(self, channel, key=""):
|
||||
"""Send JOIN command to join a channel/memo.
|
||||
|
||||
Keys or joining multiple channels is possible in the specification, but unused.
|
||||
"""
|
||||
channel_and_key = " ".join([channel, key]).strip()
|
||||
self._send("JOIN", channel_and_key)
|
||||
|
||||
def part(self, channel):
|
||||
"""Send PART command to leave a channel/memo.
|
||||
|
||||
Providing a reason or leaving multiple channels is possible in the specification.
|
||||
"""
|
||||
self._send("PART", channel)
|
||||
|
||||
def notice(self, target, text):
|
||||
"""Send a NOTICE to a user or channel."""
|
||||
self._send("NOTICE", target, text=text)
|
||||
|
||||
def invite(self, nick, channel):
|
||||
"""Send INVITE command to invite a user to a channel."""
|
||||
self._send("INVITE", nick, channel)
|
||||
|
||||
def away(self, text=None):
|
||||
"""AWAY command to mark client as away or no longer away.
|
||||
|
||||
No 'text' parameter means the client is no longer away."""
|
||||
if text:
|
||||
self._send("AWAY", text=text)
|
||||
else:
|
||||
self._send("AWAY")
|
||||
|
||||
def list(self):
|
||||
"""Send LIST command to get list of channels."""
|
||||
self._send("LIST")
|
||||
|
||||
def quit(self, reason=""):
|
||||
"""Send QUIT to terminate connection."""
|
||||
self._send("QUIT", text=reason)
|
||||
|
||||
|
||||
def parse_irc_line(line: str):
|
||||
"""Retrieves tags, prefix, command, and arguments from an unparsed IRC line."""
|
||||
parts = line.split(" ")
|
||||
tags = None
|
||||
prefix = None
|
||||
if parts[0].startswith(":"):
|
||||
prefix = parts[0][1:]
|
||||
command = parts[1]
|
||||
args = parts[2:]
|
||||
elif parts[0].startswith("@"):
|
||||
tags = parts[0] # IRCv3 message tag
|
||||
prefix = parts[1][1:]
|
||||
command = parts[2]
|
||||
args = parts[3:]
|
||||
else:
|
||||
command = parts[0]
|
||||
args = parts[1:]
|
||||
command = command.casefold()
|
||||
|
||||
# If ':' is present the subsequent args are one parameter.
|
||||
fused_args = []
|
||||
for idx, arg in enumerate(args):
|
||||
if arg.startswith(":"):
|
||||
final_param = " ".join(args[idx:])
|
||||
fused_args.append(final_param[1:])
|
||||
break
|
||||
fused_args.append(arg)
|
||||
|
||||
return (tags, prefix, command, fused_args)
|
52
scripts/ssl_context.py
Normal file
52
scripts/ssl_context.py
Normal file
|
@ -0,0 +1,52 @@
|
|||
"""Provides a function for creating an appropriate SSL context."""
|
||||
import ssl
|
||||
import sys
|
||||
import datetime
|
||||
import logging
|
||||
|
||||
PchumLog = logging.getLogger("pchumLogger")
|
||||
|
||||
try:
|
||||
import certifi
|
||||
except ImportError:
|
||||
if sys.platform == "darwin":
|
||||
# Certifi is required to validate certificates on MacOS with pyinstaller builds.
|
||||
PchumLog.warning(
|
||||
"Failed to import certifi, which is recommended on MacOS. "
|
||||
"Pesterchum might not be able to validate certificates unless "
|
||||
"Python's root certs are installed."
|
||||
)
|
||||
else:
|
||||
PchumLog.info(
|
||||
"Failed to import certifi, Pesterchum will not be able to validate "
|
||||
"certificates if the system-provided root certificates are invalid."
|
||||
)
|
||||
|
||||
|
||||
def get_ssl_context():
|
||||
"""Returns an SSL context for connecting over SSL/TLS.
|
||||
Loads the certifi root certificate bundle if the certifi module is less
|
||||
than a year old or if the system certificate store is empty.
|
||||
|
||||
The cert store on Windows also seems to have issues, so it's better
|
||||
to use the certifi provided bundle assuming it's a recent version.
|
||||
|
||||
On MacOS the system cert store is usually empty, as Python does not use
|
||||
the system provided ones, instead relying on a bundle installed with the
|
||||
python installer."""
|
||||
default_context = ssl.create_default_context()
|
||||
if "certifi" not in sys.modules:
|
||||
return default_context
|
||||
|
||||
# Get age of certifi module
|
||||
certifi_date = datetime.datetime.strptime(certifi.__version__, "%Y.%m.%d")
|
||||
current_date = datetime.datetime.now()
|
||||
certifi_age = current_date - certifi_date
|
||||
|
||||
empty_cert_store = list(default_context.cert_store_stats().values()).count(0) == 3
|
||||
# 31557600 seconds is approximately 1 year
|
||||
if empty_cert_store or certifi_age.total_seconds() <= 31557600:
|
||||
PchumLog.info("Using SSL/TLS context with certifi-provided root certificates.")
|
||||
return ssl.create_default_context(cafile=certifi.where())
|
||||
PchumLog.info("Using SSL/TLS context with system-provided root certificates.")
|
||||
return default_context
|
Loading…
Reference in a new issue