Add STS tests.

This commit is contained in:
2019-12-08 21:26:21 +01:00
parent 857e8d195e
commit 71d9315813
12 changed files with 153 additions and 11 deletions

View File

@ -39,6 +39,11 @@ class DirectoryBasedController(_BaseController):
self.kill_proc()
if self.directory:
shutil.rmtree(self.directory)
def terminate(self):
"""Stops the process gracefully, and does not clean its config."""
self.proc.terminate()
self.proc.wait()
self.proc = None
def open_file(self, name, mode='a'):
"""Open a file in the configuration directory."""
assert self.directory
@ -49,7 +54,13 @@ class DirectoryBasedController(_BaseController):
assert os.path.isdir(dir_)
return open(os.path.join(self.directory, name), mode)
def create_config(self):
"""If there is no config dir, creates it and returns True.
Else returns False."""
if self.directory:
return False
else:
self.directory = tempfile.mkdtemp()
return True
def gen_ssl(self):
self.csr_path = os.path.join(self.directory, 'ssl.csr')

View File

@ -102,6 +102,8 @@ class BaseClientTestCase(_IrcTestCase):
self.conn.sendall(b'QUIT :end of test.')
except BrokenPipeError:
pass # client disconnected before we did
except OSError:
pass # the conn was already closed by the test, or something
self.controller.kill()
if self.conn:
self.conn_file.close()
@ -113,9 +115,10 @@ class BaseClientTestCase(_IrcTestCase):
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind(('', 0)) # Bind any free port
self.server.listen(1)
def acceptClient(self, tls_cert=None, tls_key=None):
def acceptClient(self, tls_cert=None, tls_key=None, server=None):
"""Make the server accept a client connection. Blocking."""
(self.conn, addr) = self.server.accept()
server = server or self.server
(self.conn, addr) = server.accept()
if tls_cert is None and tls_key is None:
pass
else:
@ -414,6 +417,20 @@ class OptionalityHelper:
return f(self)
return newf
def checkCapabilitySupport(self, cap):
if cap in self.controller.supported_capabilities:
return
raise runner.CapabilityNotSupported(cap)
def skipUnlessSupportsCapability(cap):
def decorator(f):
@functools.wraps(f)
def newf(self):
self.checkCapabilitySupport(cap)
return f(self)
return newf
return decorator
class SpecificationSelector:
def requiredBySpecification(*specifications, strict=False):

View File

@ -1,3 +1,6 @@
import socket
import ssl
from irctest import tls
from irctest import cases
from irctest.exceptions import ConnectionClosed
@ -141,3 +144,92 @@ class TlsTestCase(cases.BaseClientTestCase):
self.acceptClient(tls_cert=BAD_CERT, tls_key=BAD_KEY)
with self.assertRaises((ConnectionClosed, ConnectionResetError)):
m = self.getMessage()
class StsTestCase(cases.BaseClientTestCase, cases.OptionalityHelper):
def setUp(self):
super().setUp()
self.insecure_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.insecure_server.bind(('', 0)) # Bind any free port
self.insecure_server.listen(1)
def tearDown(self):
self.insecure_server.close()
super().tearDown()
@cases.OptionalityHelper.skipUnlessSupportsCapability('sts')
def testSts(self):
tls_config = tls.TlsConfig(
enable=False,
trusted_fingerprints=[GOOD_FINGERPRINT])
# Connect client to insecure server
(hostname, port) = self.insecure_server.getsockname()
self.controller.run(
hostname=hostname,
port=port,
auth=None,
tls_config=tls_config,
)
self.acceptClient(server=self.insecure_server)
# Send STS policy to client
m = self.getMessage()
self.assertEqual(m.command, 'CAP',
'First message is not CAP LS.')
self.assertEqual(m.params[0], 'LS',
'First message is not CAP LS.')
self.sendLine('CAP * LS :sts=port={}'.format(self.server.getsockname()[1]))
# "If the client is not already connected securely to the server
# at the requested hostname, it MUST close the insecure connection
# and reconnect securely on the stated port."
self.acceptClient(tls_cert=GOOD_CERT, tls_key=GOOD_KEY)
# Send the STS policy, over secure connection this time
self.sendLine('CAP * LS :sts=duration=10,port={}'.format(
self.server.getsockname()[1]))
# Make the client reconnect. It should reconnect to the secure server.
self.sendLine('ERROR :closing link')
self.acceptClient()
# Kill the client
self.controller.terminate()
# Run the client, still configured to connect to the insecure server
self.controller.run(
hostname=hostname,
port=port,
auth=None,
tls_config=tls_config,
)
# The client should remember the STS policy and connect to the secure
# server
self.acceptClient()
@cases.OptionalityHelper.skipUnlessSupportsCapability('sts')
def testStsInvalidCertificate(self):
# Connect client to insecure server
(hostname, port) = self.insecure_server.getsockname()
self.controller.run(
hostname=hostname,
port=port,
auth=None,
)
self.acceptClient(server=self.insecure_server)
# Send STS policy to client
m = self.getMessage()
self.assertEqual(m.command, 'CAP',
'First message is not CAP LS.')
self.assertEqual(m.params[0], 'LS',
'First message is not CAP LS.')
self.sendLine('CAP * LS :sts=port={}'.format(self.server.getsockname()[1]))
# The client will reconnect to the TLS port. Unfortunately, it does
# not trust its fingerprint.
with self.assertRaises((ssl.SSLError, socket.error)):
self.acceptClient(tls_cert=GOOD_CERT, tls_key=GOOD_KEY)

View File

@ -45,6 +45,7 @@ TEMPLATE_SSL_CONFIG = """
class CharybdisController(BaseServerController, DirectoryBasedController):
software_name = 'Charybdis'
supported_sasl_mechanisms = set()
supported_capabilities = set() # Not exhaustive
def create_config(self):
super().create_config()
with self.open_file('server.conf'):

View File

@ -4,12 +4,13 @@ from irctest.basecontrollers import BaseClientController, NotImplementedByContro
class GircController(BaseClientController):
software_name = 'gIRC'
supported_sasl_mechanisms = ['PLAIN']
supported_capabilities = set() # Not exhaustive
def __init__(self):
super().__init__()
self.directory = None
self.proc = None
self.supported_sasl_mechanisms = ['PLAIN']
def kill(self):
if self.proc:

View File

@ -43,6 +43,8 @@ TEMPLATE_SSL_CONFIG = """
class HybridController(BaseServerController, DirectoryBasedController):
software_name = 'Hybrid'
supported_sasl_mechanisms = set()
supported_capabilities = set() # Not exhaustive
def create_config(self):
super().create_config()
with self.open_file('server.conf'):

View File

@ -30,6 +30,8 @@ TEMPLATE_SSL_CONFIG = """
class InspircdController(BaseServerController, DirectoryBasedController):
software_name = 'InspIRCd'
supported_sasl_mechanisms = set()
supported_capabilities = set() # Not exhaustive
def create_config(self):
super().create_config()
with self.open_file('server.conf'):

View File

@ -2,6 +2,7 @@ import os
import subprocess
from irctest import authentication
from irctest import tls
from irctest.basecontrollers import NotImplementedByController
from irctest.basecontrollers import BaseClientController, DirectoryBasedController
@ -30,14 +31,19 @@ class LimnoriaController(BaseClientController, DirectoryBasedController):
supported_sasl_mechanisms = {
'PLAIN', 'ECDSA-NIST256P-CHALLENGE', 'SCRAM-SHA-256', 'EXTERNAL',
}
supported_capabilities = {'sts'} # Not exhaustive
def create_config(self):
super().create_config()
create_config = super().create_config()
if create_config:
with self.open_file('bot.conf'):
pass
with self.open_file('conf/users.conf'):
pass
def run(self, hostname, port, auth, tls_config):
def run(self, hostname, port, auth, tls_config=None):
if tls_config is None:
tls_config = tls.TlsConfig(enable=False, trusted_fingerprints=[])
# Runs a client with the config given as arguments
assert self.proc is None
self.create_config()

View File

@ -66,6 +66,8 @@ class MammonController(BaseServerController, DirectoryBasedController):
supported_sasl_mechanisms = {
'PLAIN', 'ECDSA-NIST256P-CHALLENGE',
}
supported_capabilities = set() # Not exhaustive
def create_config(self):
super().create_config()
with self.open_file('server.conf'):

View File

@ -77,6 +77,8 @@ class OragonoController(BaseServerController, DirectoryBasedController):
supported_sasl_mechanisms = {
'PLAIN',
}
supported_capabilities = set() # Not exhaustive
def create_config(self):
super().create_config()
with self.open_file('ircd.yaml'):

View File

@ -24,6 +24,8 @@ class SopelController(BaseClientController):
supported_sasl_mechanisms = {
'PLAIN',
}
supported_caps = set() # Not exhaustive
def __init__(self):
super().__init__()
self.filename = next(tempfile._get_candidate_names()) + '.cfg'

View File

@ -19,6 +19,10 @@ class OptionalSaslMechanismNotSupported(unittest.SkipTest):
def __str__(self):
return 'Unsupported SASL mechanism: {}'.format(self.args[0])
class CapabilityNotSupported(unittest.SkipTest):
def __str__(self):
return 'Unsupported capability: {}'.format(self.args[0])
class NotRequiredBySpecifications(unittest.SkipTest):
def __str__(self):
return 'Tests not required by the set of tested specification(s).'