Add STS tests.

This commit is contained in:
2019-12-08 21:26:21 +01:00
parent 68c2dad8d9
commit 373c705247
12 changed files with 158 additions and 13 deletions

View File

@ -39,6 +39,11 @@ class DirectoryBasedController(_BaseController):
self.kill_proc() self.kill_proc()
if self.directory: if self.directory:
shutil.rmtree(self.directory) shutil.rmtree(self.directory)
def terminate(self):
"""Stops the process gracefully, and does not clean its config."""
self.proc.terminate()
self.proc.wait()
self.proc = None
def open_file(self, name, mode='a'): def open_file(self, name, mode='a'):
"""Open a file in the configuration directory.""" """Open a file in the configuration directory."""
assert self.directory assert self.directory
@ -49,7 +54,13 @@ class DirectoryBasedController(_BaseController):
assert os.path.isdir(dir_) assert os.path.isdir(dir_)
return open(os.path.join(self.directory, name), mode) return open(os.path.join(self.directory, name), mode)
def create_config(self): def create_config(self):
self.directory = tempfile.mkdtemp() """If there is no config dir, creates it and returns True.
Else returns False."""
if self.directory:
return False
else:
self.directory = tempfile.mkdtemp()
return True
def gen_ssl(self): def gen_ssl(self):
self.csr_path = os.path.join(self.directory, 'ssl.csr') self.csr_path = os.path.join(self.directory, 'ssl.csr')

View File

@ -118,8 +118,9 @@ class BaseClientTestCase(_IrcTestCase):
try: try:
self.conn.sendall(b'QUIT :end of test.') self.conn.sendall(b'QUIT :end of test.')
except BrokenPipeError: except BrokenPipeError:
# client already disconnected pass # client already disconnected
pass except OSError:
pass # the conn was already closed by the test, or something
self.controller.kill() self.controller.kill()
if self.conn: if self.conn:
self.conn_file.close() self.conn_file.close()
@ -131,9 +132,10 @@ class BaseClientTestCase(_IrcTestCase):
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind(('', 0)) # Bind any free port self.server.bind(('', 0)) # Bind any free port
self.server.listen(1) self.server.listen(1)
def acceptClient(self, tls_cert=None, tls_key=None): def acceptClient(self, tls_cert=None, tls_key=None, server=None):
"""Make the server accept a client connection. Blocking.""" """Make the server accept a client connection. Blocking."""
(self.conn, addr) = self.server.accept() server = server or self.server
(self.conn, addr) = server.accept()
if tls_cert is None and tls_key is None: if tls_cert is None and tls_key is None:
pass pass
else: else:
@ -455,6 +457,20 @@ class OptionalityHelper:
return f(self) return f(self)
return newf return newf
def checkCapabilitySupport(self, cap):
if cap in self.controller.supported_capabilities:
return
raise runner.CapabilityNotSupported(cap)
def skipUnlessSupportsCapability(cap):
def decorator(f):
@functools.wraps(f)
def newf(self):
self.checkCapabilitySupport(cap)
return f(self)
return newf
return decorator
class SpecificationSelector: class SpecificationSelector:
def requiredBySpecification(*specifications, strict=False): def requiredBySpecification(*specifications, strict=False):

View File

@ -1,3 +1,6 @@
import socket
import ssl
from irctest import tls from irctest import tls
from irctest import cases from irctest import cases
from irctest.exceptions import ConnectionClosed from irctest.exceptions import ConnectionClosed
@ -141,3 +144,92 @@ class TlsTestCase(cases.BaseClientTestCase):
self.acceptClient(tls_cert=BAD_CERT, tls_key=BAD_KEY) self.acceptClient(tls_cert=BAD_CERT, tls_key=BAD_KEY)
with self.assertRaises((ConnectionClosed, ConnectionResetError)): with self.assertRaises((ConnectionClosed, ConnectionResetError)):
m = self.getMessage() m = self.getMessage()
class StsTestCase(cases.BaseClientTestCase, cases.OptionalityHelper):
def setUp(self):
super().setUp()
self.insecure_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.insecure_server.bind(('', 0)) # Bind any free port
self.insecure_server.listen(1)
def tearDown(self):
self.insecure_server.close()
super().tearDown()
@cases.OptionalityHelper.skipUnlessSupportsCapability('sts')
def testSts(self):
tls_config = tls.TlsConfig(
enable=False,
trusted_fingerprints=[GOOD_FINGERPRINT])
# Connect client to insecure server
(hostname, port) = self.insecure_server.getsockname()
self.controller.run(
hostname=hostname,
port=port,
auth=None,
tls_config=tls_config,
)
self.acceptClient(server=self.insecure_server)
# Send STS policy to client
m = self.getMessage()
self.assertEqual(m.command, 'CAP',
'First message is not CAP LS.')
self.assertEqual(m.params[0], 'LS',
'First message is not CAP LS.')
self.sendLine('CAP * LS :sts=port={}'.format(self.server.getsockname()[1]))
# "If the client is not already connected securely to the server
# at the requested hostname, it MUST close the insecure connection
# and reconnect securely on the stated port."
self.acceptClient(tls_cert=GOOD_CERT, tls_key=GOOD_KEY)
# Send the STS policy, over secure connection this time
self.sendLine('CAP * LS :sts=duration=10,port={}'.format(
self.server.getsockname()[1]))
# Make the client reconnect. It should reconnect to the secure server.
self.sendLine('ERROR :closing link')
self.acceptClient()
# Kill the client
self.controller.terminate()
# Run the client, still configured to connect to the insecure server
self.controller.run(
hostname=hostname,
port=port,
auth=None,
tls_config=tls_config,
)
# The client should remember the STS policy and connect to the secure
# server
self.acceptClient()
@cases.OptionalityHelper.skipUnlessSupportsCapability('sts')
def testStsInvalidCertificate(self):
# Connect client to insecure server
(hostname, port) = self.insecure_server.getsockname()
self.controller.run(
hostname=hostname,
port=port,
auth=None,
)
self.acceptClient(server=self.insecure_server)
# Send STS policy to client
m = self.getMessage()
self.assertEqual(m.command, 'CAP',
'First message is not CAP LS.')
self.assertEqual(m.params[0], 'LS',
'First message is not CAP LS.')
self.sendLine('CAP * LS :sts=port={}'.format(self.server.getsockname()[1]))
# The client will reconnect to the TLS port. Unfortunately, it does
# not trust its fingerprint.
with self.assertRaises((ssl.SSLError, socket.error)):
self.acceptClient(tls_cert=GOOD_CERT, tls_key=GOOD_KEY)

View File

@ -45,6 +45,7 @@ TEMPLATE_SSL_CONFIG = """
class CharybdisController(BaseServerController, DirectoryBasedController): class CharybdisController(BaseServerController, DirectoryBasedController):
software_name = 'Charybdis' software_name = 'Charybdis'
supported_sasl_mechanisms = set() supported_sasl_mechanisms = set()
supported_capabilities = set() # Not exhaustive
def create_config(self): def create_config(self):
super().create_config() super().create_config()
with self.open_file('server.conf'): with self.open_file('server.conf'):

View File

@ -4,12 +4,13 @@ from irctest.basecontrollers import BaseClientController, NotImplementedByContro
class GircController(BaseClientController): class GircController(BaseClientController):
software_name = 'gIRC' software_name = 'gIRC'
supported_sasl_mechanisms = ['PLAIN']
supported_capabilities = set() # Not exhaustive
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.directory = None self.directory = None
self.proc = None self.proc = None
self.supported_sasl_mechanisms = ['PLAIN']
def kill(self): def kill(self):
if self.proc: if self.proc:

View File

@ -43,6 +43,8 @@ TEMPLATE_SSL_CONFIG = """
class HybridController(BaseServerController, DirectoryBasedController): class HybridController(BaseServerController, DirectoryBasedController):
software_name = 'Hybrid' software_name = 'Hybrid'
supported_sasl_mechanisms = set() supported_sasl_mechanisms = set()
supported_capabilities = set() # Not exhaustive
def create_config(self): def create_config(self):
super().create_config() super().create_config()
with self.open_file('server.conf'): with self.open_file('server.conf'):

View File

@ -30,6 +30,8 @@ TEMPLATE_SSL_CONFIG = """
class InspircdController(BaseServerController, DirectoryBasedController): class InspircdController(BaseServerController, DirectoryBasedController):
software_name = 'InspIRCd' software_name = 'InspIRCd'
supported_sasl_mechanisms = set() supported_sasl_mechanisms = set()
supported_capabilities = set() # Not exhaustive
def create_config(self): def create_config(self):
super().create_config() super().create_config()
with self.open_file('server.conf'): with self.open_file('server.conf'):

View File

@ -2,6 +2,7 @@ import os
import subprocess import subprocess
from irctest import authentication from irctest import authentication
from irctest import tls
from irctest.basecontrollers import NotImplementedByController from irctest.basecontrollers import NotImplementedByController
from irctest.basecontrollers import BaseClientController, DirectoryBasedController from irctest.basecontrollers import BaseClientController, DirectoryBasedController
@ -30,14 +31,19 @@ class LimnoriaController(BaseClientController, DirectoryBasedController):
supported_sasl_mechanisms = { supported_sasl_mechanisms = {
'PLAIN', 'ECDSA-NIST256P-CHALLENGE', 'SCRAM-SHA-256', 'EXTERNAL', 'PLAIN', 'ECDSA-NIST256P-CHALLENGE', 'SCRAM-SHA-256', 'EXTERNAL',
} }
def create_config(self): supported_capabilities = {'sts'} # Not exhaustive
super().create_config()
with self.open_file('bot.conf'):
pass
with self.open_file('conf/users.conf'):
pass
def run(self, hostname, port, auth, tls_config): def create_config(self):
create_config = super().create_config()
if create_config:
with self.open_file('bot.conf'):
pass
with self.open_file('conf/users.conf'):
pass
def run(self, hostname, port, auth, tls_config=None):
if tls_config is None:
tls_config = tls.TlsConfig(enable=False, trusted_fingerprints=[])
# Runs a client with the config given as arguments # Runs a client with the config given as arguments
assert self.proc is None assert self.proc is None
self.create_config() self.create_config()

View File

@ -66,6 +66,8 @@ class MammonController(BaseServerController, DirectoryBasedController):
supported_sasl_mechanisms = { supported_sasl_mechanisms = {
'PLAIN', 'ECDSA-NIST256P-CHALLENGE', 'PLAIN', 'ECDSA-NIST256P-CHALLENGE',
} }
supported_capabilities = set() # Not exhaustive
def create_config(self): def create_config(self):
super().create_config() super().create_config()
with self.open_file('server.conf'): with self.open_file('server.conf'):

View File

@ -153,6 +153,12 @@ class OragonoController(BaseServerController, DirectoryBasedController):
'PLAIN', 'PLAIN',
} }
_port_wait_interval = .01 _port_wait_interval = .01
supported_capabilities = set() # Not exhaustive
def create_config(self):
super().create_config()
with self.open_file('ircd.yaml'):
pass
def kill_proc(self): def kill_proc(self):
self.proc.kill() self.proc.kill()

View File

@ -24,6 +24,8 @@ class SopelController(BaseClientController):
supported_sasl_mechanisms = { supported_sasl_mechanisms = {
'PLAIN', 'PLAIN',
} }
supported_caps = set() # Not exhaustive
def __init__(self, test_config): def __init__(self, test_config):
super().__init__(test_config) super().__init__(test_config)
self.filename = next(tempfile._get_candidate_names()) + '.cfg' self.filename = next(tempfile._get_candidate_names()) + '.cfg'

View File

@ -19,6 +19,10 @@ class OptionalSaslMechanismNotSupported(unittest.SkipTest):
def __str__(self): def __str__(self):
return 'Unsupported SASL mechanism: {}'.format(self.args[0]) return 'Unsupported SASL mechanism: {}'.format(self.args[0])
class CapabilityNotSupported(unittest.SkipTest):
def __str__(self):
return 'Unsupported capability: {}'.format(self.args[0])
class NotRequiredBySpecifications(unittest.SkipTest): class NotRequiredBySpecifications(unittest.SkipTest):
def __str__(self): def __str__(self):
return 'Tests not required by the set of tested specification(s).' return 'Tests not required by the set of tested specification(s).'