Use certifi certificates if default cert store is empty.

Previous behavior was to always load both the certifi certificates and system-provided certificates when availible, now it's strictly a fallback.
This commit is contained in:
Dpeta 2022-12-07 02:01:43 +01:00
parent 08812b0448
commit 25e8deb7f7

View file

@ -39,6 +39,11 @@ except ImportError:
"Pesterchum might not be able to validate certificates unless " "Pesterchum might not be able to validate certificates unless "
"Python's root certs are installed." "Python's root certs are installed."
) )
else:
PchumLog.info(
"Failed to import certifi, Pesterchum will not be able to validate "
"certificates if the system-provided root certificates are invalid."
)
class IRCClientError(Exception): class IRCClientError(Exception):
@ -200,45 +205,49 @@ class IRCClient:
# raise se # raise se
self._end = True # This ok? self._end = True # This ok?
def get_ssl_context(self):
"""Returns an SSL context for connecting over SSL/TLS.
Loads the certifi certs instead of the system-provided certificates if
the system certificate store is empty."""
context = ssl.create_default_context()
# Check if store is empty
empty_cert_store = list(context.cert_store_stats().values()).count(0) == 3
if empty_cert_store and "certifi" in sys.modules:
# Can't validate certificates if store is empty,
# load root certificates from certifi instead.
context = ssl.create_default_context(cafile=certifi.where())
PchumLog.info(
"Using SSL/TLS context with certifi-provided root certificates."
)
else:
PchumLog.info(
"Using SSL/TLS context with system-provided root certificates."
)
return context
def connect(self, verify_hostname=True): def connect(self, verify_hostname=True):
"""initiates the connection to the server set in self.host:self.port""" """initiates the connection to the server set in self.host:self.port
self.ssl decides whether the connection uses ssl.
Certificate validation when using SSL/TLS may be disabled by
passing the 'verify_hostname' parameter. The user is asked if they
want to disable it if this functions raises a certificate validation error,
in which case the function may be called again with 'verify_hostname'."""
PchumLog.info("connecting to %s:%s" % (self.host, self.port)) PchumLog.info("connecting to %s:%s" % (self.host, self.port))
if self.ssl == True: # Open connection
context = ssl.create_default_context() plaintext_socket = socket.create_connection((self.host, self.port))
if verify_hostname == False:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
else:
# Also load certifi provided root certs if present. (Mainly useful for MacOS)
if "certifi" in sys.modules:
try:
context.load_verify_locations(cafile=certifi.where())
except:
PchumLog.exception("")
bare_sock = socket.create_connection((self.host, self.port)) if self.ssl:
# Upgrade connection to use SSL/TLS if enabled
context = self.get_ssl_context()
context.check_hostname = verify_hostname
self.socket = context.wrap_socket( self.socket = context.wrap_socket(
bare_sock, server_hostname=self.host, do_handshake_on_connect=False plaintext_socket, server_hostname=self.host
) )
while True:
try:
self.socket.do_handshake()
break
except ssl.SSLWantReadError:
select.select([self.socket], [], [])
except ssl.SSLWantWriteError:
select.select([], [self.socket], [])
except ssl.SSLCertVerificationError as e:
# Disconnect for now
self.socket.close()
bare_sock.close()
raise e
PchumLog.info("secure sockets version is %s" % self.socket.version())
else: else:
self.socket = socket.create_connection((self.host, self.port)) # SSL/TLS is disabled, connection is plaintext
self.socket = plaintext_socket
# setblocking is a shorthand for timeout, # setblocking is a shorthand for timeout,
# we shouldn't use both. # we shouldn't use both.
@ -249,17 +258,6 @@ class IRCClient:
elif self.blocking: elif self.blocking:
self.socket.setblocking(True) self.socket.setblocking(True)
# try:
# self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# if hasattr(socket, "TCP_KEEPIDLE"):
# self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 1)
# if hasattr(socket, "TCP_KEEPINTVL"):
# self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 1)
# if hasattr(socket, "TCP_KEEPCNT"):
# self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 1)
# except Exception as e:
# print(e)
helpers.nick(self, self.nick) helpers.nick(self, self.nick)
helpers.user(self, self.username, self.realname) helpers.user(self, self.username, self.realname)
if self.connect_cb: if self.connect_cb: