Add SASL PLAIN test.

This commit is contained in:
Valentin Lorentz
2015-12-19 17:52:38 +01:00
parent 2a4b1f1540
commit 9868f6a1d4
5 changed files with 75 additions and 27 deletions

View File

@ -2,7 +2,7 @@ class _BaseController:
pass pass
class BaseClientController(_BaseController): class BaseClientController(_BaseController):
def run(self, hostname, port, authentication): def run(self, hostname, port, auth):
raise NotImplementedError() raise NotImplementedError()
class BaseServerController(_BaseController): class BaseServerController(_BaseController):

View File

@ -1,6 +1,7 @@
import socket import socket
import unittest import unittest
from . import authentication
from .irc_utils import message_parser from .irc_utils import message_parser
class _IrcTestCase(unittest.TestCase): class _IrcTestCase(unittest.TestCase):
@ -20,11 +21,14 @@ class _IrcTestCase(unittest.TestCase):
class BaseClientTestCase(_IrcTestCase): class BaseClientTestCase(_IrcTestCase):
"""Basic class for client tests. Handles spawning a client and getting """Basic class for client tests. Handles spawning a client and getting
messages from it.""" messages from it."""
nick = None
user = None
def setUp(self): def setUp(self):
self.controller = self.controllerClass() self.controller = self.controllerClass()
self._setUpServer() self._setUpServer()
def tearDown(self): def tearDown(self):
del self.controller self.controller.kill()
self.conn.sendall(b'QUIT :end of test.')
self.conn_file.close() self.conn_file.close()
self.conn.close() self.conn.close()
self.server.close() self.server.close()
@ -49,16 +53,17 @@ class BaseClientTestCase(_IrcTestCase):
assert self.conn.sendall(line.encode()) is None assert self.conn.sendall(line.encode()) is None
if not line.endswith('\r\n'): if not line.endswith('\r\n'):
assert self.conn.sendall(b'\r\n') is None assert self.conn.sendall(b'\r\n') is None
if self.show_io:
print('S: {}'.format(line.strip())) print('S: {}'.format(line.strip()))
class ClientNegociationHelper: class ClientNegociationHelper:
"""Helper class for tests handling capabilities negociation.""" """Helper class for tests handling capabilities negociation."""
def readCapLs(self): def readCapLs(self, auth=None):
(hostname, port) = self.server.getsockname() (hostname, port) = self.server.getsockname()
self.controller.run( self.controller.run(
hostname=hostname, hostname=hostname,
port=port, port=port,
authentication=None, auth=auth,
) )
self.acceptClient() self.acceptClient()
m = self.getMessage() m = self.getMessage()
@ -83,26 +88,34 @@ class ClientNegociationHelper:
return False return False
elif msg.command == 'USER': elif msg.command == 'USER':
self.assertEqual(len(msg.params), 4, msg) self.assertEqual(len(msg.params), 4, msg)
self.nick = msg.params self.user = msg.params
return False return False
else: else:
return True return True
def negotiateCapabilities(self, cap_ls): def negotiateCapabilities(self, capabilities, cap_ls=True, auth=None):
self.readCapLs() if cap_ls:
self.readCapLs(auth)
if not self.protocol_version: if not self.protocol_version:
# No negotiation. # No negotiation.
return return
self.sendLine('CAP * LS :') self.sendLine('CAP * LS :{}'.format(' '.join(capabilities)))
while True: while True:
m = self.getMessage(filter_pred=self.userNickPredicate) m = self.getMessage(filter_pred=self.userNickPredicate)
self.assertEqual(m.command, 'CAP') if m.command != 'CAP':
return m
self.assertGreater(len(m.params), 0, m) self.assertGreater(len(m.params), 0, m)
if m.params[0] == 'REQ': if m.params[0] == 'REQ':
self.assertEqual(len(m.params), 2, m) self.assertEqual(len(m.params), 2, m)
requested = frozenset(m.params[1].split()) requested = frozenset(m.params[1].split())
if not requested.issubset(cap_ls): if not requested.issubset(capabilities):
self.sendLine('CAP * NAK :{}'.format(m.params[1])[0:100]) self.sendLine('CAP {} NAK :{}'.format(
self.nick or '*',
m.params[1][0:100]))
else:
self.sendLine('CAP {} ACK :{}'.format(
self.nick or '*',
m.params[1]))
else: else:
return m return m

View File

@ -1,12 +1,18 @@
import os import os
import shutil
import tempfile import tempfile
import subprocess import subprocess
from irctest import authentication
from irctest.basecontrollers import BaseClientController from irctest.basecontrollers import BaseClientController
TEMPLATE_CONFIG = """ TEMPLATE_CONFIG = """
supybot.log.stdout.level: {loglevel}
supybot.networks: testnet supybot.networks: testnet
supybot.networks.testnet.servers: {hostname}:{port} supybot.networks.testnet.servers: {hostname}:{port}
supybot.networks.testnet.sasl.username: {username}
supybot.networks.testnet.sasl.password: {password}
supybot.networks.testnet.sasl.mechanisms: {mechanisms}
""" """
class LimnoriaController(BaseClientController): class LimnoriaController(BaseClientController):
@ -14,30 +20,52 @@ class LimnoriaController(BaseClientController):
super().__init__() super().__init__()
self.directory = None self.directory = None
self.proc = None self.proc = None
def __del__(self): def kill(self):
if self.proc: if self.proc:
self.proc.terminate()
try:
self.proc.wait(5)
except subprocess.TimeoutExpired:
self.proc.kill() self.proc.kill()
self.proc = None
if self.directory: if self.directory:
self.directory.cleanup() shutil.rmtree(self.directory)
def open_file(self, name): def open_file(self, name, mode='a'):
assert self.directory assert self.directory
return open(os.path.join(self.directory.name, name), 'a') if os.sep in name:
dir_ = os.path.join(self.directory, os.path.dirname(name))
if not os.path.isdir(dir_):
os.makedirs(dir_)
assert os.path.isdir(dir_)
return open(os.path.join(self.directory, name), mode)
def create_config(self): def create_config(self):
self.directory = tempfile.TemporaryDirectory() self.directory = tempfile.mkdtemp()
with self.open_file('bot.conf'): with self.open_file('bot.conf'):
pass pass
with self.open_file('conf/users.conf'):
pass
def run(self, hostname, port, authentication): def run(self, hostname, port, auth):
# Runs a client with the config given as arguments # Runs a client with the config given as arguments
assert self.proc is None
self.create_config() self.create_config()
if auth:
mechanisms = ' '.join(map(authentication.Mechanisms.as_string,
auth.mechanisms))
else:
mechanisms = ''
with self.open_file('bot.conf') as fd: with self.open_file('bot.conf') as fd:
fd.write(TEMPLATE_CONFIG.format( fd.write(TEMPLATE_CONFIG.format(
loglevel='CRITICAL',
hostname=hostname, hostname=hostname,
port=port, port=port,
username=auth.username if auth else '',
password=auth.password if auth else '',
mechanisms=mechanisms.lower(),
)) ))
self.proc = subprocess.Popen(['supybot', self.proc = subprocess.Popen(['supybot',
os.path.join(self.directory.name, 'bot.conf')]) os.path.join(self.directory, 'bot.conf')])
def get_irctest_controller_class(): def get_irctest_controller_class():
return LimnoriaController return LimnoriaController

View File

@ -12,14 +12,17 @@ use_ssl = false
port = {port} port = {port}
owner = me owner = me
channels = channels =
auth_username = {username}
auth_password = {password}
{auth_method}
""" """
class SopelController(BaseClientController): class SopelController(BaseClientController):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.filename = next(tempfile._get_candidate_names()) self.filename = next(tempfile._get_candidate_names()) + '.cfg'
self.proc = None self.proc = None
def __del__(self): def kill(self):
if self.proc: if self.proc:
self.proc.kill() self.proc.kill()
if self.filename: if self.filename:
@ -38,13 +41,17 @@ class SopelController(BaseClientController):
with self.open_file(self.filename) as fd: with self.open_file(self.filename) as fd:
pass pass
def run(self, hostname, port, authentication): def run(self, hostname, port, auth):
# Runs a client with the config given as arguments # Runs a client with the config given as arguments
assert self.proc is None
self.create_config() self.create_config()
with self.open_file(self.filename) as fd: with self.open_file(self.filename) as fd:
fd.write(TEMPLATE_CONFIG.format( fd.write(TEMPLATE_CONFIG.format(
hostname=hostname, hostname=hostname,
port=port, port=port,
username=auth.username if auth else '',
password=auth.password if auth else '',
auth_method='auth_method = sasl' if auth else '',
)) ))
self.proc = subprocess.Popen(['sopel', '-c', self.filename]) self.proc = subprocess.Popen(['sopel', '-c', self.filename])

View File

@ -34,7 +34,7 @@ import os
import sys import sys
from distutils.core import setup from distutils.core import setup
if sys.version_info < (3, 2, 0): if sys.version_info < (3, 4, 0):
sys.stderr.write("This script requires Python 3.2 or newer.") sys.stderr.write("This script requires Python 3.2 or newer.")
sys.stderr.write(os.linesep) sys.stderr.write(os.linesep)
sys.exit(-1) sys.exit(-1)