56 Commits

Author SHA1 Message Date
cab27be1f5 fix incorrect type for empty tags 2021-02-15 21:36:55 +01:00
fc6bd4968d add a test for case changes 2021-02-15 21:36:55 +01:00
c2adc33109 remove starttls test 2021-02-15 21:36:55 +01:00
4519927265 add a test for PART messages 2021-02-15 21:36:55 +01:00
42f3a1f2fe add an away test 2021-02-15 21:36:55 +01:00
f3ff2a424b deflake another test 2021-02-15 21:13:12 +01:00
0c03a8e1c7 expand pyflakes list 2021-02-15 21:12:06 +01:00
7a05081960 add makefile 2021-02-15 21:11:56 +01:00
ee2e77b1e6 deflake registration tests by waiting for quit 2021-02-15 21:11:15 +01:00
9ff2239943 test RPL_ENDOFMONLIST responses 2021-02-15 21:06:18 +01:00
5f75f231a4 add list of numerics, start using them 2021-02-15 21:06:05 +01:00
b3a4a18885 kick privileges test 2021-02-15 21:05:19 +01:00
24df4e4496 fix lots of pyflakes3 failures 2021-02-15 21:00:51 +01:00
2b8aeacd4a tests for RPL_NOTOPIC 2021-02-15 20:59:03 +01:00
fc15ff09c0 fix testWhoisUser for charybdis. 2021-02-15 20:58:52 +01:00
7bf7df8ad1 basic whois test 2021-02-15 20:58:30 +01:00
6abc5b4f98 channel quit test case 2021-02-15 20:53:08 +01:00
cefbabf5c3 test incorrect PASS passwords 2021-02-15 20:52:49 +01:00
9b6f65b622 add regression tests 2021-02-15 20:52:00 +01:00
034fe5c51c framework enhancements 2021-02-15 20:47:08 +01:00
911c0ded04 add a test for INVITE 2021-02-15 20:45:08 +01:00
9b99c6ce20 Fix compatibility with return value of SSLSocket.sendall in python >= 3.6.
https://bugs.python.org/issue25951
2021-02-14 23:18:51 +01:00
e6b1ca5521 remove psutil 2021-02-14 22:59:19 +01:00
ac2671acb0 limnoria: add support for STS. 2021-02-14 22:22:01 +01:00
8a81224ba8 Fix ecdsa tests to use the same protocol as Atheme.
Which requires not hashing the challenge.
2019-12-26 12:10:45 +01:00
442c57e6c6 Temporarily disabling sts on Limnoria until it's released. 2019-12-09 22:13:55 +01:00
f8faec77f1 Ignore return value of sendall; it's not None on py < 3.6.
https://bugs.python.org/issue25951
2019-12-08 22:36:56 +01:00
71d9315813 Add STS tests. 2019-12-08 21:26:21 +01:00
857e8d195e Fix exception handling in testUntrustedCertificate on closed connection. 2019-10-22 18:28:39 +02:00
19a0623e91 echo-message: Check server-time more accurately, and handle slight timing differences due to late application 2018-07-12 15:19:07 +02:00
de4c51e744 oragono: Fix ACC command 2018-07-12 15:19:07 +02:00
1198504f74 Add tests for labeled-responses 2018-07-12 15:17:00 +02:00
5b319a46ee Make message tests less fragile 2018-07-12 15:17:00 +02:00
5950d97926 Bump version number to 0.1.2. 2017-12-10 20:21:29 +01:00
b35a7f7a60 Fix merge. 2017-12-10 20:21:02 +01:00
f47584589b Add tests for nonexistent channels
Inspired by oragono issue #165.
2017-12-09 09:44:45 +01:00
0804b78572 Add multi-prefix testcase 2017-12-09 09:43:58 +01:00
aecfc26a63 tls -> starttls, to match feature name better 2017-12-09 09:43:46 +01:00
f9f13961eb controllers: Add hybrid controller 2017-12-09 09:43:34 +01:00
2bc3cafd25 Make openssl binary configurable, for OSX 2017-12-09 09:43:19 +01:00
86d26d6121 charybdis: New releases name the binary 'charybdis' rather than 'ircd' 2017-12-09 09:42:58 +01:00
3d0b493a11 Don't send empty CAP REQ 2017-12-09 09:41:53 +01:00
711de43b22 Fix channel deterministic joining s'more 2017-12-09 09:41:45 +01:00
8b52ceeee3 Make tests around joining channels more deterministic 2017-12-09 09:41:22 +01:00
6d9c06096b channels: Check server casemapping before doing mapping checks 2017-12-09 09:38:26 +01:00
9575987555 For SCRAM, check clients send an empty response at the end.
https://github.com/ircv3/ircv3-specifications/pull/326
2017-11-15 17:11:44 +01:00
15a92ccf0b oragono: Allow TLS tests 2017-10-01 13:15:46 +02:00
0c12e0ed20 oragono: Use new registration command 2017-07-28 20:19:46 +02:00
f71badbbc1 oragono: Fix config so it loads 2017-07-28 20:19:38 +02:00
41f0418df7 Add gIRC controller 2017-01-11 00:35:22 +01:00
2a55c85c5a gitignore: Use gitignore.io 2017-01-11 00:35:07 +01:00
0c7358c0a5 test_sasl: Unify successful auth checking a bit more 2017-01-11 00:29:14 +01:00
6326af34cc Add Oragono IRCd 2017-01-11 00:29:03 +01:00
ba1fe57248 Bump version number. 2017-01-11 00:07:55 +01:00
6baee70852 Add tests for SCRAM. 2017-01-11 00:07:25 +01:00
2bdcba3da5 Update README. 2016-11-20 09:18:25 +01:00
38 changed files with 1587 additions and 165 deletions

104
.gitignore vendored
View File

@ -1,10 +1,73 @@
# Text editors
*~
*.swp
# Created by https://www.gitignore.io/api/windows,osx,linux,python
### Windows ###
# Windows image file caches
Thumbs.db
ehthumbs.db
# Folder config file
Desktop.ini
# Recycle Bin used on file shares
$RECYCLE.BIN/
# Windows Installer files
*.cab
*.msi
*.msm
*.msp
# Windows shortcuts
*.lnk
### OSX ###
*.DS_Store
.AppleDouble
.LSOverride
# Icon must end with two \r
Icon
# Thumbnails
._*
# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent
# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
### Linux ###
*~
# temporary files which can be created if a process still has a handle open of a deleted file
.fuse_hidden*
# KDE directory preferences
.directory
# Linux trash folder which might appear on any partition or disk
.Trash-*
# .nfs files are created when an open file is removed but is still being accessed
.nfs*
### Python ###
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
*.py[cod] *.py[cod]
*$py.class
# C extensions # C extensions
*.so *.so
@ -46,6 +109,7 @@ htmlcov/
nosetests.xml nosetests.xml
coverage.xml coverage.xml
*,cover *,cover
.hypothesis/
# Translations # Translations
*.mo *.mo
@ -53,9 +117,43 @@ coverage.xml
# Django stuff: # Django stuff:
*.log *.log
local_settings.py
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation # Sphinx documentation
docs/_build/ docs/_build/
# PyBuilder # PyBuilder
target/ target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# dotenv
.env
# virtualenv
.venv/
venv/
ENV/
# Spyder project settings
.spyderproject
# Rope project settings
.ropeproject
# vim swapfiles
*.swp

5
Makefile Normal file
View File

@ -0,0 +1,5 @@
.PHONY: oragono
oragono:
pyflakes3 ./irctest/cases.py ./irctest/client_mock.py ./irctest/controllers/oragono.py irctest/server_tests/*.py
./test.py irctest.controllers.oragono

View File

@ -113,3 +113,4 @@ or anything near that.
At best, `irctest` can help you find issues in your software, but it may At best, `irctest` can help you find issues in your software, but it may
still have false positives (because it does not implement itself a still have false positives (because it does not implement itself a
full-featured client/server, so it supports only “usual” behavior). full-featured client/server, so it supports only “usual” behavior).
Bug reports for false positives are welcome.

View File

@ -28,6 +28,7 @@ def main(args):
file=sys.stderr) file=sys.stderr)
exit(1) exit(1)
_IrcTestCase.controllerClass = controller_class _IrcTestCase.controllerClass = controller_class
_IrcTestCase.controllerClass.openssl_bin = args.openssl_bin
_IrcTestCase.show_io = args.show_io _IrcTestCase.show_io = args.show_io
_IrcTestCase.strictTests = not args.loose _IrcTestCase.strictTests = not args.loose
if args.specification: if args.specification:
@ -63,6 +64,8 @@ parser = argparse.ArgumentParser(
description='A script to test interoperability of IRC software.') description='A script to test interoperability of IRC software.')
parser.add_argument('module', type=str, parser.add_argument('module', type=str,
help='The module used to run the tested program.') help='The module used to run the tested program.')
parser.add_argument('--openssl-bin', type=str, default='openssl',
help='The openssl binary to use')
parser.add_argument('--show-io', action='store_true', parser.add_argument('--show-io', action='store_true',
help='Show input/outputs with the tested program.') help='Show input/outputs with the tested program.')
parser.add_argument('-v', '--verbose', action='count', default=1, parser.add_argument('-v', '--verbose', action='count', default=1,

View File

@ -8,9 +8,11 @@ class Mechanisms(enum.Enum):
def as_string(cls, mech): def as_string(cls, mech):
return {cls.plain: 'PLAIN', return {cls.plain: 'PLAIN',
cls.ecdsa_nist256p_challenge: 'ECDSA-NIST256P-CHALLENGE', cls.ecdsa_nist256p_challenge: 'ECDSA-NIST256P-CHALLENGE',
cls.scram_sha_256: 'SCRAM-SHA-256',
}[mech] }[mech]
plain = 1 plain = 1
ecdsa_nist256p_challenge = 2 ecdsa_nist256p_challenge = 2
scram_sha_256 = 3
Authentication = collections.namedtuple('Authentication', Authentication = collections.namedtuple('Authentication',
'mechanisms username password ecdsa_key') 'mechanisms username password ecdsa_key')

View File

@ -4,7 +4,6 @@ import socket
import tempfile import tempfile
import time import time
import subprocess import subprocess
import psutil
from .runner import NotImplementedByController from .runner import NotImplementedByController
@ -39,6 +38,11 @@ class DirectoryBasedController(_BaseController):
self.kill_proc() self.kill_proc()
if self.directory: if self.directory:
shutil.rmtree(self.directory) shutil.rmtree(self.directory)
def terminate(self):
"""Stops the process gracefully, and does not clean its config."""
self.proc.terminate()
self.proc.wait()
self.proc = None
def open_file(self, name, mode='a'): def open_file(self, name, mode='a'):
"""Open a file in the configuration directory.""" """Open a file in the configuration directory."""
assert self.directory assert self.directory
@ -49,22 +53,28 @@ class DirectoryBasedController(_BaseController):
assert os.path.isdir(dir_) assert os.path.isdir(dir_)
return open(os.path.join(self.directory, name), mode) return open(os.path.join(self.directory, name), mode)
def create_config(self): def create_config(self):
self.directory = tempfile.mkdtemp() """If there is no config dir, creates it and returns True.
Else returns False."""
if self.directory:
return False
else:
self.directory = tempfile.mkdtemp()
return True
def gen_ssl(self): def gen_ssl(self):
self.csr_path = os.path.join(self.directory, 'ssl.csr') self.csr_path = os.path.join(self.directory, 'ssl.csr')
self.key_path = os.path.join(self.directory, 'ssl.key') self.key_path = os.path.join(self.directory, 'ssl.key')
self.pem_path = os.path.join(self.directory, 'ssl.pem') self.pem_path = os.path.join(self.directory, 'ssl.pem')
self.dh_path = os.path.join(self.directory, 'dh.pem') self.dh_path = os.path.join(self.directory, 'dh.pem')
subprocess.check_output(['openssl', 'req', '-new', '-newkey', 'rsa', subprocess.check_output([self.openssl_bin, 'req', '-new', '-newkey', 'rsa',
'-nodes', '-out', self.csr_path, '-keyout', self.key_path, '-nodes', '-out', self.csr_path, '-keyout', self.key_path,
'-batch'], '-batch'],
stderr=subprocess.DEVNULL) stderr=subprocess.DEVNULL)
subprocess.check_output(['openssl', 'x509', '-req', subprocess.check_output([self.openssl_bin, 'x509', '-req',
'-in', self.csr_path, '-signkey', self.key_path, '-in', self.csr_path, '-signkey', self.key_path,
'-out', self.pem_path], '-out', self.pem_path],
stderr=subprocess.DEVNULL) stderr=subprocess.DEVNULL)
subprocess.check_output(['openssl', 'dhparam', subprocess.check_output([self.openssl_bin, 'dhparam',
'-out', self.dh_path, '128'], '-out', self.dh_path, '128'],
stderr=subprocess.DEVNULL) stderr=subprocess.DEVNULL)
@ -80,11 +90,13 @@ class BaseServerController(_BaseController):
valid_metadata_keys, invalid_metadata_keys): valid_metadata_keys, invalid_metadata_keys):
raise NotImplementedError() raise NotImplementedError()
def registerUser(self, case, username, password=None): def registerUser(self, case, username, password=None):
raise NotImplementedByController('registration') raise NotImplementedByController('account registration')
def wait_for_port(self): def wait_for_port(self):
while not self.port_open: while not self.port_open:
time.sleep(0.1) time.sleep(0.1)
for conn in psutil.Process(self.proc.pid).connections(): try:
if conn.laddr[1] == self.port: c = socket.create_connection(('localhost', self.port), timeout=1.0)
self.port_open = True c.close()
self.port_open = True
except Exception as e:
continue

View File

@ -4,13 +4,11 @@ import socket
import tempfile import tempfile
import unittest import unittest
import functools import functools
import collections
import supybot.utils import supybot.utils
from . import runner from . import runner
from . import client_mock from . import client_mock
from . import authentication
from .irc_utils import capabilities from .irc_utils import capabilities
from .irc_utils import message_parser from .irc_utils import message_parser
from .exceptions import ConnectionClosed from .exceptions import ConnectionClosed
@ -56,7 +54,7 @@ class _IrcTestCase(unittest.TestCase):
self.assertEqual(msg.prefix.split('!')[0], nick, msg, fail_msg) self.assertEqual(msg.prefix.split('!')[0], nick, msg, fail_msg)
if subcommand is not None or subparams is not None: if subcommand is not None or subparams is not None:
self.assertGreater(len(msg.params), 2, fail_msg) self.assertGreater(len(msg.params), 2, fail_msg)
msg_target = msg.params[0] #msg_target = msg.params[0]
msg_subcommand = msg.params[1] msg_subcommand = msg.params[1]
msg_subparams = msg.params[2:] msg_subparams = msg.params[2:]
if subcommand: if subcommand:
@ -98,7 +96,12 @@ class BaseClientTestCase(_IrcTestCase):
self._setUpServer() self._setUpServer()
def tearDown(self): def tearDown(self):
if self.conn: if self.conn:
self.conn.sendall(b'QUIT :end of test.') try:
self.conn.sendall(b'QUIT :end of test.')
except BrokenPipeError:
pass # client disconnected before we did
except OSError:
pass # the conn was already closed by the test, or something
self.controller.kill() self.controller.kill()
if self.conn: if self.conn:
self.conn_file.close() self.conn_file.close()
@ -110,9 +113,10 @@ class BaseClientTestCase(_IrcTestCase):
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind(('', 0)) # Bind any free port self.server.bind(('', 0)) # Bind any free port
self.server.listen(1) self.server.listen(1)
def acceptClient(self, tls_cert=None, tls_key=None): def acceptClient(self, tls_cert=None, tls_key=None, server=None):
"""Make the server accept a client connection. Blocking.""" """Make the server accept a client connection. Blocking."""
(self.conn, addr) = self.server.accept() server = server or self.server
(self.conn, addr) = server.accept()
if tls_cert is None and tls_key is None: if tls_cert is None and tls_key is None:
pass pass
else: else:
@ -150,11 +154,9 @@ class BaseClientTestCase(_IrcTestCase):
if not filter_pred or filter_pred(msg): if not filter_pred or filter_pred(msg):
return msg return msg
def sendLine(self, line): def sendLine(self, line):
ret = self.conn.sendall(line.encode()) self.conn.sendall(line.encode())
assert ret is None
if not line.endswith('\r\n'): if not line.endswith('\r\n'):
ret = self.conn.sendall(b'\r\n') self.conn.sendall(b'\r\n')
assert ret is None
if self.show_io: if self.show_io:
print('{:.3f} S: {}'.format(time.time(), line.strip())) print('{:.3f} S: {}'.format(time.time(), line.strip()))
@ -324,14 +326,17 @@ class BaseServerTestCase(_IrcTestCase):
"""Skip to the point where we are registered """Skip to the point where we are registered
<https://tools.ietf.org/html/rfc2812#section-3.1> <https://tools.ietf.org/html/rfc2812#section-3.1>
""" """
result = []
while True: while True:
m = self.getMessage(client, synchronize=False) m = self.getMessage(client, synchronize=False)
result.append(m)
if m.command == '001': if m.command == '001':
return m return result
def connectClient(self, nick, name=None, capabilities=None, def connectClient(self, nick, name=None, capabilities=None,
skip_if_cap_nak=False): skip_if_cap_nak=False, show_io=None):
client = self.addClient(name) client = self.addClient(name, show_io=show_io)
if capabilities is not None: if capabilities is not None and 0 < len(capabilities):
self.sendLine(client, 'CAP REQ :{}'.format(' '.join(capabilities))) self.sendLine(client, 'CAP REQ :{}'.format(' '.join(capabilities)))
m = self.getRegistrationMessage(client) m = self.getRegistrationMessage(client)
try: try:
@ -349,7 +354,7 @@ class BaseServerTestCase(_IrcTestCase):
self.sendLine(client, 'NICK {}'.format(nick)) self.sendLine(client, 'NICK {}'.format(nick))
self.sendLine(client, 'USER username * * :Realname') self.sendLine(client, 'USER username * * :Realname')
self.skipToWelcome(client) welcome = self.skipToWelcome(client)
self.sendLine(client, 'PING foo') self.sendLine(client, 'PING foo')
# Skip all that happy welcoming stuff # Skip all that happy welcoming stuff
@ -364,6 +369,9 @@ class BaseServerTestCase(_IrcTestCase):
else: else:
(key, value) = (param, None) (key, value) = (param, None)
self.server_support[key] = value self.server_support[key] = value
welcome.append(m)
return welcome
def joinClient(self, client, channel): def joinClient(self, client, channel):
self.sendLine(client, 'JOIN {}'.format(channel)) self.sendLine(client, 'JOIN {}'.format(channel))
@ -373,6 +381,17 @@ class BaseServerTestCase(_IrcTestCase):
'received responses: {list}', 'received responses: {list}',
extra_format=(channel,)) extra_format=(channel,))
def joinChannel(self, client, channel):
self.sendLine(client, 'JOIN {}'.format(channel))
# wait until we see them join the channel
joined = False
while not joined:
for msg in self.getMessages(client):
# todo: also respond to cannot join channel numeric
if msg.command.upper() == 'JOIN' and 0 < len(msg.params) and msg.params[0].lower() == channel.lower():
joined = True
break
class OptionalityHelper: class OptionalityHelper:
def checkSaslSupport(self): def checkSaslSupport(self):
if self.controller.supported_sasl_mechanisms: if self.controller.supported_sasl_mechanisms:
@ -400,6 +419,20 @@ class OptionalityHelper:
return f(self) return f(self)
return newf return newf
def checkCapabilitySupport(self, cap):
if cap in self.controller.supported_capabilities:
return
raise runner.CapabilityNotSupported(cap)
def skipUnlessSupportsCapability(cap):
def decorator(f):
@functools.wraps(f)
def newf(self):
self.checkCapabilitySupport(cap)
return f(self)
return newf
return decorator
class SpecificationSelector: class SpecificationSelector:
def requiredBySpecification(*specifications, strict=False): def requiredBySpecification(*specifications, strict=False):

View File

@ -1,4 +1,5 @@
import ssl import ssl
import sys
import time import time
import socket import socket
from .irc_utils import message_parser from .irc_utils import message_parser
@ -96,7 +97,7 @@ class ClientMock:
ret = self.conn.sendall(encoded_line) ret = self.conn.sendall(encoded_line)
except BrokenPipeError: except BrokenPipeError:
raise ConnectionClosed() raise ConnectionClosed()
if self.ssl: # https://bugs.python.org/issue25951 if sys.version_info <= (3, 6) and self.ssl: # https://bugs.python.org/issue25951
assert ret == len(encoded_line), (ret, repr(encoded_line)) assert ret == len(encoded_line), (ret, repr(encoded_line))
else: else:
assert ret is None, ret assert ret is None, ret

View File

@ -1,5 +1,10 @@
import hashlib
import ecdsa import ecdsa
from ecdsa.util import sigencode_der, sigdecode_der
import base64 import base64
import pyxmpp2_scram as scram
from irctest import cases from irctest import cases
from irctest import authentication from irctest import authentication
from irctest.irc_utils.message_parser import Message from irctest.irc_utils.message_parser import Message
@ -15,6 +20,16 @@ IRX9cyi2wdYg9mUUYyh9GKdBCYHGUJAiCA==
-----END EC PRIVATE KEY----- -----END EC PRIVATE KEY-----
""" """
CHALLENGE = bytes(range(32))
assert len(CHALLENGE) == 32
class IdentityHash:
def __init__(self, data):
self._data = data
def digest(self):
return self._data
class SaslTestCase(cases.BaseClientTestCase, cases.ClientNegociationHelper, class SaslTestCase(cases.BaseClientTestCase, cases.ClientNegociationHelper,
cases.OptionalityHelper): cases.OptionalityHelper):
@cases.OptionalityHelper.skipUnlessHasMechanism('PLAIN') @cases.OptionalityHelper.skipUnlessHasMechanism('PLAIN')
@ -138,14 +153,14 @@ class SaslTestCase(cases.BaseClientTestCase, cases.ClientNegociationHelper,
m = self.getMessage() m = self.getMessage()
self.assertEqual(m, Message([], None, 'AUTHENTICATE', self.assertEqual(m, Message([], None, 'AUTHENTICATE',
['amlsbGVz'])) # jilles ['amlsbGVz'])) # jilles
self.sendLine('AUTHENTICATE Zm9vYmFy') # foobar self.sendLine('AUTHENTICATE {}'.format(base64.b64encode(CHALLENGE).decode('ascii')))
m = self.getMessage() m = self.getMessage()
self.assertMessageEqual(m, command='AUTHENTICATE') self.assertMessageEqual(m, command='AUTHENTICATE')
sk = ecdsa.SigningKey.from_pem(ECDSA_KEY) sk = ecdsa.SigningKey.from_pem(ECDSA_KEY)
vk = sk.get_verifying_key() vk = sk.get_verifying_key()
signature = base64.b64decode(m.params[0]) signature = base64.b64decode(m.params[0])
try: try:
vk.verify(signature, b'foobar') vk.verify(signature, CHALLENGE, hashfunc=IdentityHash, sigdecode=sigdecode_der)
except ecdsa.BadSignatureError: except ecdsa.BadSignatureError:
raise AssertionError('Bad signature') raise AssertionError('Bad signature')
self.sendLine('900 * * foo :You are now logged in.') self.sendLine('900 * * foo :You are now logged in.')
@ -153,6 +168,78 @@ class SaslTestCase(cases.BaseClientTestCase, cases.ClientNegociationHelper,
m = self.negotiateCapabilities(['sasl'], False) m = self.negotiateCapabilities(['sasl'], False)
self.assertEqual(m, Message([], None, 'CAP', ['END'])) self.assertEqual(m, Message([], None, 'CAP', ['END']))
@cases.OptionalityHelper.skipUnlessHasMechanism('SCRAM-SHA-256')
def testScram(self):
"""Test SCRAM-SHA-256 authentication.
"""
auth = authentication.Authentication(
mechanisms=[authentication.Mechanisms.scram_sha_256],
username='jilles',
password='sesame',
)
class PasswdDb:
def get_password(self, *args):
return ('sesame', 'plain')
authenticator = scram.SCRAMServerAuthenticator('SHA-256',
channel_binding=False, password_database=PasswdDb())
m = self.negotiateCapabilities(['sasl'], auth=auth)
self.assertEqual(m, Message([], None, 'AUTHENTICATE', ['SCRAM-SHA-256']))
self.sendLine('AUTHENTICATE +')
m = self.getMessage()
self.assertEqual(m.command, 'AUTHENTICATE', m)
client_first = base64.b64decode(m.params[0])
response = authenticator.start(properties={}, initial_response=client_first)
assert isinstance(response, bytes), response
self.sendLine('AUTHENTICATE :' + base64.b64encode(response).decode())
m = self.getMessage()
self.assertEqual(m.command, 'AUTHENTICATE', m)
msg = base64.b64decode(m.params[0])
r = authenticator.response(msg)
assert isinstance(r, tuple), r
assert len(r) == 2, r
(properties, response) = r
self.sendLine('AUTHENTICATE :' + base64.b64encode(response).decode())
self.assertEqual(properties, {'authzid': None, 'username': 'jilles'})
m = self.getMessage()
self.assertEqual(m.command, 'AUTHENTICATE', m)
self.assertEqual(m.params, ['+'], m)
@cases.OptionalityHelper.skipUnlessHasMechanism('SCRAM-SHA-256')
def testScramBadPassword(self):
"""Test SCRAM-SHA-256 authentication with a bad password.
"""
auth = authentication.Authentication(
mechanisms=[authentication.Mechanisms.scram_sha_256],
username='jilles',
password='sesame',
)
class PasswdDb:
def get_password(self, *args):
return ('notsesame', 'plain')
authenticator = scram.SCRAMServerAuthenticator('SHA-256',
channel_binding=False, password_database=PasswdDb())
m = self.negotiateCapabilities(['sasl'], auth=auth)
self.assertEqual(m, Message([], None, 'AUTHENTICATE', ['SCRAM-SHA-256']))
self.sendLine('AUTHENTICATE +')
m = self.getMessage()
self.assertEqual(m.command, 'AUTHENTICATE', m)
client_first = base64.b64decode(m.params[0])
response = authenticator.start(properties={}, initial_response=client_first)
assert isinstance(response, bytes), response
self.sendLine('AUTHENTICATE :' + base64.b64encode(response).decode())
m = self.getMessage()
self.assertEqual(m.command, 'AUTHENTICATE', m)
msg = base64.b64decode(m.params[0])
with self.assertRaises(scram.NotAuthorizedException):
authenticator.response(msg)
class Irc302SaslTestCase(cases.BaseClientTestCase, cases.ClientNegociationHelper, class Irc302SaslTestCase(cases.BaseClientTestCase, cases.ClientNegociationHelper,
cases.OptionalityHelper): cases.OptionalityHelper):
@cases.OptionalityHelper.skipUnlessHasMechanism('PLAIN') @cases.OptionalityHelper.skipUnlessHasMechanism('PLAIN')

View File

@ -1,3 +1,6 @@
import socket
import ssl
from irctest import tls from irctest import tls
from irctest import cases from irctest import cases
from irctest.exceptions import ConnectionClosed from irctest.exceptions import ConnectionClosed
@ -139,5 +142,94 @@ class TlsTestCase(cases.BaseClientTestCase):
tls_config=tls_config, tls_config=tls_config,
) )
self.acceptClient(tls_cert=BAD_CERT, tls_key=BAD_KEY) self.acceptClient(tls_cert=BAD_CERT, tls_key=BAD_KEY)
with self.assertRaises(ConnectionClosed): with self.assertRaises((ConnectionClosed, ConnectionResetError)):
m = self.getMessage() m = self.getMessage()
class StsTestCase(cases.BaseClientTestCase, cases.OptionalityHelper):
def setUp(self):
super().setUp()
self.insecure_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.insecure_server.bind(('', 0)) # Bind any free port
self.insecure_server.listen(1)
def tearDown(self):
self.insecure_server.close()
super().tearDown()
@cases.OptionalityHelper.skipUnlessSupportsCapability('sts')
def testSts(self):
tls_config = tls.TlsConfig(
enable=False,
trusted_fingerprints=[GOOD_FINGERPRINT])
# Connect client to insecure server
(hostname, port) = self.insecure_server.getsockname()
self.controller.run(
hostname=hostname,
port=port,
auth=None,
tls_config=tls_config,
)
self.acceptClient(server=self.insecure_server)
# Send STS policy to client
m = self.getMessage()
self.assertEqual(m.command, 'CAP',
'First message is not CAP LS.')
self.assertEqual(m.params[0], 'LS',
'First message is not CAP LS.')
self.sendLine('CAP * LS :sts=port={}'.format(self.server.getsockname()[1]))
# "If the client is not already connected securely to the server
# at the requested hostname, it MUST close the insecure connection
# and reconnect securely on the stated port."
self.acceptClient(tls_cert=GOOD_CERT, tls_key=GOOD_KEY)
# Send the STS policy, over secure connection this time
self.sendLine('CAP * LS :sts=duration=10,port={}'.format(
self.server.getsockname()[1]))
# Make the client reconnect. It should reconnect to the secure server.
self.sendLine('ERROR :closing link')
self.acceptClient()
# Kill the client
self.controller.terminate()
# Run the client, still configured to connect to the insecure server
self.controller.run(
hostname=hostname,
port=port,
auth=None,
tls_config=tls_config,
)
# The client should remember the STS policy and connect to the secure
# server
self.acceptClient()
@cases.OptionalityHelper.skipUnlessSupportsCapability('sts')
def testStsInvalidCertificate(self):
# Connect client to insecure server
(hostname, port) = self.insecure_server.getsockname()
self.controller.run(
hostname=hostname,
port=port,
auth=None,
)
self.acceptClient(server=self.insecure_server)
# Send STS policy to client
m = self.getMessage()
self.assertEqual(m.command, 'CAP',
'First message is not CAP LS.')
self.assertEqual(m.params[0], 'LS',
'First message is not CAP LS.')
self.sendLine('CAP * LS :sts=port={}'.format(self.server.getsockname()[1]))
# The client will reconnect to the TLS port. Unfortunately, it does
# not trust its fingerprint.
with self.assertRaises((ssl.SSLError, socket.error)):
self.acceptClient(tls_cert=GOOD_CERT, tls_key=GOOD_KEY)

View File

@ -45,6 +45,7 @@ TEMPLATE_SSL_CONFIG = """
class CharybdisController(BaseServerController, DirectoryBasedController): class CharybdisController(BaseServerController, DirectoryBasedController):
software_name = 'Charybdis' software_name = 'Charybdis'
supported_sasl_mechanisms = set() supported_sasl_mechanisms = set()
supported_capabilities = set() # Not exhaustive
def create_config(self): def create_config(self):
super().create_config() super().create_config()
with self.open_file('server.conf'): with self.open_file('server.conf'):
@ -75,7 +76,7 @@ class CharybdisController(BaseServerController, DirectoryBasedController):
password_field=password_field, password_field=password_field,
ssl_config=ssl_config, ssl_config=ssl_config,
)) ))
self.proc = subprocess.Popen(['ircd', '-foreground', self.proc = subprocess.Popen(['charybdis', '-foreground',
'-configfile', os.path.join(self.directory, 'server.conf'), '-configfile', os.path.join(self.directory, 'server.conf'),
'-pidfile', os.path.join(self.directory, 'server.pid'), '-pidfile', os.path.join(self.directory, 'server.pid'),
], ],

View File

@ -0,0 +1,45 @@
import subprocess
from irctest.basecontrollers import BaseClientController, NotImplementedByController
class GircController(BaseClientController):
software_name = 'gIRC'
supported_sasl_mechanisms = ['PLAIN']
supported_capabilities = set() # Not exhaustive
def __init__(self):
super().__init__()
self.directory = None
self.proc = None
def kill(self):
if self.proc:
self.proc.terminate()
try:
self.proc.wait(5)
except subprocess.TimeoutExpired:
self.proc.kill()
self.proc = None
def __del__(self):
if self.proc:
self.proc.kill()
if self.directory:
self.directory.cleanup()
def run(self, hostname, port, auth, tls_config):
if tls_config:
print(tls_config)
raise NotImplementedByController('TLS options')
args = ['--host', hostname, '--port', str(port), '--quiet']
if auth and auth.username and auth.password:
args += ['--sasl-name', auth.username]
args += ['--sasl-pass', auth.password]
args += ['--sasl-fail-is-ok']
# Runs a client with the config given as arguments
self.proc = subprocess.Popen(['girc_test', 'connect'] + args)
def get_irctest_controller_class():
return GircController

View File

@ -0,0 +1,88 @@
import os
import time
import shutil
import tempfile
import subprocess
from irctest import client_mock
from irctest import authentication
from irctest.basecontrollers import NotImplementedByController
from irctest.basecontrollers import BaseServerController, DirectoryBasedController
TEMPLATE_CONFIG = """
serverinfo {{
name = "My.Little.Server";
sid = "42X";
description = "test server";
{ssl_config}
}};
listen {{
host = "{hostname}";
port = {port};
}};
general {{
disable_auth = yes;
anti_nick_flood = no;
max_nick_changes = 256;
throttle_count = 512;
}};
auth {{
user = "*";
flags = exceed_limit;
{password_field}
}};
"""
TEMPLATE_SSL_CONFIG = """
rsa_private_key_file = "{key_path}";
ssl_certificate_file = "{pem_path}";
ssl_dh_param_file = "{dh_path}";
"""
class HybridController(BaseServerController, DirectoryBasedController):
software_name = 'Hybrid'
supported_sasl_mechanisms = set()
supported_capabilities = set() # Not exhaustive
def create_config(self):
super().create_config()
with self.open_file('server.conf'):
pass
def run(self, hostname, port, password=None, ssl=False,
valid_metadata_keys=None, invalid_metadata_keys=None):
if valid_metadata_keys or invalid_metadata_keys:
raise NotImplementedByController(
'Defining valid and invalid METADATA keys.')
assert self.proc is None
self.create_config()
self.port = port
password_field = 'password = "{}";'.format(password) if password else ''
if ssl:
self.gen_ssl()
ssl_config = TEMPLATE_SSL_CONFIG.format(
key_path=self.key_path,
pem_path=self.pem_path,
dh_path=self.dh_path,
)
else:
ssl_config = ''
with self.open_file('server.conf') as fd:
fd.write(TEMPLATE_CONFIG.format(
hostname=hostname,
port=port,
password_field=password_field,
ssl_config=ssl_config,
))
self.proc = subprocess.Popen(['ircd', '-foreground',
'-configfile', os.path.join(self.directory, 'server.conf'),
'-pidfile', os.path.join(self.directory, 'server.pid'),
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
def get_irctest_controller_class():
return HybridController

View File

@ -30,6 +30,8 @@ TEMPLATE_SSL_CONFIG = """
class InspircdController(BaseServerController, DirectoryBasedController): class InspircdController(BaseServerController, DirectoryBasedController):
software_name = 'InspIRCd' software_name = 'InspIRCd'
supported_sasl_mechanisms = set() supported_sasl_mechanisms = set()
supported_capabilities = set() # Not exhaustive
def create_config(self): def create_config(self):
super().create_config() super().create_config()
with self.open_file('server.conf'): with self.open_file('server.conf'):

View File

@ -2,6 +2,7 @@ import os
import subprocess import subprocess
from irctest import authentication from irctest import authentication
from irctest import tls
from irctest.basecontrollers import NotImplementedByController from irctest.basecontrollers import NotImplementedByController
from irctest.basecontrollers import BaseClientController, DirectoryBasedController from irctest.basecontrollers import BaseClientController, DirectoryBasedController
@ -9,6 +10,7 @@ TEMPLATE_CONFIG = """
supybot.directories.conf: {directory}/conf supybot.directories.conf: {directory}/conf
supybot.directories.data: {directory}/data supybot.directories.data: {directory}/data
supybot.directories.migrations: {directory}/migrations supybot.directories.migrations: {directory}/migrations
supybot.log.level: DEBUG
supybot.log.stdout.level: {loglevel} supybot.log.stdout.level: {loglevel}
supybot.networks: testnet supybot.networks: testnet
@ -27,16 +29,21 @@ supybot.networks.testnet.sasl.mechanisms: {mechanisms}
class LimnoriaController(BaseClientController, DirectoryBasedController): class LimnoriaController(BaseClientController, DirectoryBasedController):
software_name = 'Limnoria' software_name = 'Limnoria'
supported_sasl_mechanisms = { supported_sasl_mechanisms = {
'PLAIN', 'ECDSA-NIST256P-CHALLENGE', 'EXTERNAL', 'PLAIN', 'ECDSA-NIST256P-CHALLENGE', 'SCRAM-SHA-256', 'EXTERNAL',
} }
def create_config(self): supported_capabilities = set(['sts']) # Not exhaustive
super().create_config()
with self.open_file('bot.conf'):
pass
with self.open_file('conf/users.conf'):
pass
def run(self, hostname, port, auth, tls_config): def create_config(self):
create_config = super().create_config()
if create_config:
with self.open_file('bot.conf'):
pass
with self.open_file('conf/users.conf'):
pass
def run(self, hostname, port, auth, tls_config=None):
if tls_config is None:
tls_config = tls.TlsConfig(enable=False, trusted_fingerprints=[])
# Runs a client with the config given as arguments # Runs a client with the config given as arguments
assert self.proc is None assert self.proc is None
self.create_config() self.create_config()
@ -61,7 +68,8 @@ class LimnoriaController(BaseClientController, DirectoryBasedController):
trusted_fingerprints=' '.join(tls_config.trusted_fingerprints) if tls_config else '', trusted_fingerprints=' '.join(tls_config.trusted_fingerprints) if tls_config else '',
)) ))
self.proc = subprocess.Popen(['supybot', self.proc = subprocess.Popen(['supybot',
os.path.join(self.directory, 'bot.conf')]) os.path.join(self.directory, 'bot.conf')],
stderr=subprocess.STDOUT)
def get_irctest_controller_class(): def get_irctest_controller_class():
return LimnoriaController return LimnoriaController

View File

@ -66,6 +66,8 @@ class MammonController(BaseServerController, DirectoryBasedController):
supported_sasl_mechanisms = { supported_sasl_mechanisms = {
'PLAIN', 'ECDSA-NIST256P-CHALLENGE', 'PLAIN', 'ECDSA-NIST256P-CHALLENGE',
} }
supported_capabilities = set() # Not exhaustive
def create_config(self): def create_config(self):
super().create_config() super().create_config()
with self.open_file('server.conf'): with self.open_file('server.conf'):

View File

@ -0,0 +1,144 @@
import os
import subprocess
from irctest.basecontrollers import NotImplementedByController
from irctest.basecontrollers import BaseServerController, DirectoryBasedController
TEMPLATE_CONFIG = """
network:
name: OragonoTest
server:
name: oragono.test
listen:
- "{hostname}:{port}"
{tls}
check-ident: false
max-sendq: 16k
connection-limits:
cidr-len-ipv4: 24
cidr-len-ipv6: 120
ips-per-subnet: 16
exempted:
- "127.0.0.1/8"
- "::1/128"
connection-throttling:
enabled: true
cidr-len-ipv4: 32
cidr-len-ipv6: 128
duration: 10m
max-connections: 12
ban-duration: 10m
ban-message: You have attempted to connect too many times within a short duration. Wait a while, and you will be able to connect.
exempted:
- "127.0.0.1/8"
- "::1/128"
accounts:
registration:
enabled: true
verify-timeout: "120h"
enabled-callbacks:
- none # no verification needed, will instantly register successfully
allow-multiple-per-connection: true
authentication-enabled: true
channels:
registration:
enabled: true
datastore:
path: {directory}/ircd.db
limits:
nicklen: 32
channellen: 64
awaylen: 200
kicklen: 390
topiclen: 390
monitor-entries: 100
whowas-entries: 100
chan-list-modes: 60
linelen:
tags: 2048
rest: 2048
"""
class OragonoController(BaseServerController, DirectoryBasedController):
software_name = 'Oragono'
supported_sasl_mechanisms = {
'PLAIN',
}
supported_capabilities = set() # Not exhaustive
def create_config(self):
super().create_config()
with self.open_file('ircd.yaml'):
pass
def kill_proc(self):
self.proc.kill()
def run(self, hostname, port, password=None, ssl=False,
restricted_metadata_keys=None,
valid_metadata_keys=None, invalid_metadata_keys=None):
if valid_metadata_keys or invalid_metadata_keys:
raise NotImplementedByController(
'Defining valid and invalid METADATA keys.')
if password is not None:
#TODO(dan): fix dis
raise NotImplementedByController('PASS command')
self.create_config()
tls_config = ""
if ssl:
self.key_path = os.path.join(self.directory, 'ssl.key')
self.pem_path = os.path.join(self.directory, 'ssl.pem')
tls_config = 'tls-listeners:\n ":{port}":\n key: {key}\n cert: {pem}'.format(
port=port,
key=self.key_path,
pem=self.pem_path,
)
assert self.proc is None
self.port = port
with self.open_file('server.yml') as fd:
fd.write(TEMPLATE_CONFIG.format(
directory=self.directory,
hostname=hostname,
port=port,
tls=tls_config,
))
subprocess.call(['oragono', 'initdb',
'--conf', os.path.join(self.directory, 'server.yml'), '--quiet'])
subprocess.call(['oragono', 'mkcerts',
'--conf', os.path.join(self.directory, 'server.yml'), '--quiet'])
self.proc = subprocess.Popen(['oragono', 'run',
'--conf', os.path.join(self.directory, 'server.yml'), '--quiet'])
def registerUser(self, case, username, password=None):
# XXX: Move this somewhere else when
# https://github.com/ircv3/ircv3-specifications/pull/152 becomes
# part of the specification
client = case.addClient(show_io=False)
case.sendLine(client, 'CAP LS 302')
case.sendLine(client, 'NICK registration_user')
case.sendLine(client, 'USER r e g :user')
case.sendLine(client, 'CAP END')
while case.getRegistrationMessage(client).command != '001':
pass
case.getMessages(client)
case.sendLine(client, 'ACC REGISTER {} * {}'.format(
username, password))
msg = case.getMessage(client)
assert msg.command == '920', msg
case.sendLine(client, 'QUIT')
case.assertDisconnected(client)
def get_irctest_controller_class():
return OragonoController

View File

@ -24,6 +24,8 @@ class SopelController(BaseClientController):
supported_sasl_mechanisms = { supported_sasl_mechanisms = {
'PLAIN', 'PLAIN',
} }
supported_capabilities = set() # Not exhaustive
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.filename = next(tempfile._get_candidate_names()) + '.cfg' self.filename = next(tempfile._get_candidate_names()) + '.cfg'

View File

@ -42,7 +42,7 @@ def parse_message(s):
(tags, s) = s.split(' ', 1) (tags, s) = s.split(' ', 1)
tags = parse_tags(tags[1:]) tags = parse_tags(tags[1:])
else: else:
tags = [] tags = {}
if ' :' in s: if ' :' in s:
(other_tokens, trailing_param) = s.split(' :', 1) (other_tokens, trailing_param) = s.split(' :', 1)
tokens = list(filter(bool, other_tokens.split(' '))) + [trailing_param] tokens = list(filter(bool, other_tokens.split(' '))) + [trailing_param]

193
irctest/numerics.py Normal file
View File

@ -0,0 +1,193 @@
# Copyright (c) 2012-2014 Jeremy Latt
# Copyright (c) 2014-2015 Edmund Huber
# Copyright (c) 2016-2017 Daniel Oaks <daniel@danieloaks.net>
# released under the MIT license
# These numerics have been retrieved from:
# http://defs.ircdocs.horse/ and http://modern.ircdocs.horse/
# They're intended to represent a relatively-standard cross-section of the IRC
# server ecosystem out there. Custom numerics will be marked as such.
RPL_WELCOME = "001"
RPL_YOURHOST = "002"
RPL_CREATED = "003"
RPL_MYINFO = "004"
RPL_ISUPPORT = "005"
RPL_SNOMASKIS = "008"
RPL_BOUNCE = "010"
RPL_TRACELINK = "200"
RPL_TRACECONNECTING = "201"
RPL_TRACEHANDSHAKE = "202"
RPL_TRACEUNKNOWN = "203"
RPL_TRACEOPERATOR = "204"
RPL_TRACEUSER = "205"
RPL_TRACESERVER = "206"
RPL_TRACESERVICE = "207"
RPL_TRACENEWTYPE = "208"
RPL_TRACECLASS = "209"
RPL_TRACERECONNECT = "210"
RPL_STATSLINKINFO = "211"
RPL_STATSCOMMANDS = "212"
RPL_ENDOFSTATS = "219"
RPL_UMODEIS = "221"
RPL_SERVLIST = "234"
RPL_SERVLISTEND = "235"
RPL_STATSUPTIME = "242"
RPL_STATSOLINE = "243"
RPL_LUSERCLIENT = "251"
RPL_LUSEROP = "252"
RPL_LUSERUNKNOWN = "253"
RPL_LUSERCHANNELS = "254"
RPL_LUSERME = "255"
RPL_ADMINME = "256"
RPL_ADMINLOC1 = "257"
RPL_ADMINLOC2 = "258"
RPL_ADMINEMAIL = "259"
RPL_TRACELOG = "261"
RPL_TRACEEND = "262"
RPL_TRYAGAIN = "263"
RPL_WHOISCERTFP = "276"
RPL_AWAY = "301"
RPL_USERHOST = "302"
RPL_ISON = "303"
RPL_UNAWAY = "305"
RPL_NOWAWAY = "306"
RPL_WHOISUSER = "311"
RPL_WHOISSERVER = "312"
RPL_WHOISOPERATOR = "313"
RPL_WHOWASUSER = "314"
RPL_ENDOFWHO = "315"
RPL_WHOISIDLE = "317"
RPL_ENDOFWHOIS = "318"
RPL_WHOISCHANNELS = "319"
RPL_LIST = "322"
RPL_LISTEND = "323"
RPL_CHANNELMODEIS = "324"
RPL_UNIQOPIS = "325"
RPL_CHANNELCREATED = "329"
RPL_WHOISACCOUNT = "330"
RPL_NOTOPIC = "331"
RPL_TOPIC = "332"
RPL_TOPICTIME = "333"
RPL_WHOISBOT = "335"
RPL_WHOISACTUALLY = "338"
RPL_INVITING = "341"
RPL_SUMMONING = "342"
RPL_INVITELIST = "346"
RPL_ENDOFINVITELIST = "347"
RPL_EXCEPTLIST = "348"
RPL_ENDOFEXCEPTLIST = "349"
RPL_VERSION = "351"
RPL_WHOREPLY = "352"
RPL_NAMREPLY = "353"
RPL_LINKS = "364"
RPL_ENDOFLINKS = "365"
RPL_ENDOFNAMES = "366"
RPL_BANLIST = "367"
RPL_ENDOFBANLIST = "368"
RPL_ENDOFWHOWAS = "369"
RPL_INFO = "371"
RPL_MOTD = "372"
RPL_ENDOFINFO = "374"
RPL_MOTDSTART = "375"
RPL_ENDOFMOTD = "376"
RPL_YOUREOPER = "381"
RPL_REHASHING = "382"
RPL_YOURESERVICE = "383"
RPL_TIME = "391"
RPL_USERSSTART = "392"
RPL_USERS = "393"
RPL_ENDOFUSERS = "394"
RPL_NOUSERS = "395"
ERR_UNKNOWNERROR = "400"
ERR_NOSUCHNICK = "401"
ERR_NOSUCHSERVER = "402"
ERR_NOSUCHCHANNEL = "403"
ERR_CANNOTSENDTOCHAN = "404"
ERR_TOOMANYCHANNELS = "405"
ERR_WASNOSUCHNICK = "406"
ERR_TOOMANYTARGETS = "407"
ERR_NOSUCHSERVICE = "408"
ERR_NOORIGIN = "409"
ERR_INVALIDCAPCMD = "410"
ERR_NORECIPIENT = "411"
ERR_NOTEXTTOSEND = "412"
ERR_NOTOPLEVEL = "413"
ERR_WILDTOPLEVEL = "414"
ERR_BADMASK = "415"
ERR_UNKNOWNCOMMAND = "421"
ERR_NOMOTD = "422"
ERR_NOADMININFO = "423"
ERR_FILEERROR = "424"
ERR_NONICKNAMEGIVEN = "431"
ERR_ERRONEUSNICKNAME = "432"
ERR_NICKNAMEINUSE = "433"
ERR_NICKCOLLISION = "436"
ERR_UNAVAILRESOURCE = "437"
ERR_REG_UNAVAILABLE = "440"
ERR_USERNOTINCHANNEL = "441"
ERR_NOTONCHANNEL = "442"
ERR_USERONCHANNEL = "443"
ERR_NOLOGIN = "444"
ERR_SUMMONDISABLED = "445"
ERR_USERSDISABLED = "446"
ERR_NOTREGISTERED = "451"
ERR_NEEDMOREPARAMS = "461"
ERR_ALREADYREGISTRED = "462"
ERR_NOPERMFORHOST = "463"
ERR_PASSWDMISMATCH = "464"
ERR_YOUREBANNEDCREEP = "465"
ERR_YOUWILLBEBANNED = "466"
ERR_KEYSET = "467"
ERR_INVALIDUSERNAME = "468"
ERR_CHANNELISFULL = "471"
ERR_UNKNOWNMODE = "472"
ERR_INVITEONLYCHAN = "473"
ERR_BANNEDFROMCHAN = "474"
ERR_BADCHANNELKEY = "475"
ERR_BADCHANMASK = "476"
ERR_NOCHANMODES = "477"
ERR_BANLISTFULL = "478"
ERR_NOPRIVILEGES = "481"
ERR_CHANOPRIVSNEEDED = "482"
ERR_CANTKILLSERVER = "483"
ERR_RESTRICTED = "484"
ERR_UNIQOPPRIVSNEEDED = "485"
ERR_NOOPERHOST = "491"
ERR_UMODEUNKNOWNFLAG = "501"
ERR_USERSDONTMATCH = "502"
ERR_HELPNOTFOUND = "524"
ERR_CANNOTSENDRP = "573"
RPL_WHOISSECURE = "671"
RPL_YOURLANGUAGESARE = "687"
RPL_WHOISLANGUAGE = "690"
RPL_HELPSTART = "704"
RPL_HELPTXT = "705"
RPL_ENDOFHELP = "706"
ERR_NOPRIVS = "723"
RPL_MONONLINE = "730"
RPL_MONOFFLINE = "731"
RPL_MONLIST = "732"
RPL_ENDOFMONLIST = "733"
ERR_MONLISTFULL = "734"
RPL_LOGGEDIN = "900"
RPL_LOGGEDOUT = "901"
ERR_NICKLOCKED = "902"
RPL_SASLSUCCESS = "903"
ERR_SASLFAIL = "904"
ERR_SASLTOOLONG = "905"
ERR_SASLABORTED = "906"
ERR_SASLALREADY = "907"
RPL_SASLMECHS = "908"
RPL_REGISTRATION_SUCCESS = "920"
ERR_ACCOUNT_ALREADY_EXISTS = "921"
ERR_REG_UNSPECIFIED_ERROR = "922"
RPL_VERIFYSUCCESS = "923"
ERR_ACCOUNT_ALREADY_VERIFIED = "924"
ERR_ACCOUNT_INVALID_VERIFY_CODE = "925"
RPL_REG_VERIFICATION_REQUIRED = "927"
ERR_REG_INVALID_CRED_TYPE = "928"
ERR_REG_INVALID_CALLBACK = "929"
ERR_TOOMANYLANGUAGES = "981"
ERR_NOLANGUAGE = "982"

View File

@ -19,6 +19,10 @@ class OptionalSaslMechanismNotSupported(unittest.SkipTest):
def __str__(self): def __str__(self):
return 'Unsupported SASL mechanism: {}'.format(self.args[0]) return 'Unsupported SASL mechanism: {}'.format(self.args[0])
class CapabilityNotSupported(unittest.SkipTest):
def __str__(self):
return 'Unsupported capability: {}'.format(self.args[0])
class NotRequiredBySpecifications(unittest.SkipTest): class NotRequiredBySpecifications(unittest.SkipTest):
def __str__(self): def __str__(self):
return 'Tests not required by the set of tested specification(s).' return 'Tests not required by the set of tested specification(s).'

View File

@ -3,8 +3,6 @@
""" """
from irctest import cases from irctest import cases
from irctest.client_mock import NoMessageException
from irctest.basecontrollers import NotImplementedByController
class AccountTagTestCase(cases.BaseServerTestCase, cases.OptionalityHelper): class AccountTagTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
def connectRegisteredClient(self, nick): def connectRegisteredClient(self, nick):

View File

@ -1,5 +1,4 @@
from irctest import cases from irctest import cases
from irctest.irc_utils.message_parser import Message
class CapTestCase(cases.BaseServerTestCase): class CapTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('IRCv3.1') @cases.SpecificationSelector.requiredBySpecification('IRCv3.1')

View File

@ -7,7 +7,7 @@ from irctest import cases
from irctest import client_mock from irctest import client_mock
from irctest import runner from irctest import runner
from irctest.irc_utils import ambiguities from irctest.irc_utils import ambiguities
from irctest.irc_utils.message_parser import Message from irctest.numerics import RPL_NOTOPIC, RPL_NAMREPLY, RPL_INVITING, ERR_NOSUCHCHANNEL, ERR_NOTONCHANNEL, ERR_CHANOPRIVSNEEDED, ERR_NOSUCHNICK, ERR_INVITEONLYCHAN
class JoinTestCase(cases.BaseServerTestCase): class JoinTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812', @cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812',
@ -139,6 +139,30 @@ class JoinTestCase(cases.BaseServerTestCase):
'"foo" with an optional "+" or "@" prefix, but got: ' '"foo" with an optional "+" or "@" prefix, but got: '
'{msg}') '{msg}')
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testNormalPart(self):
self.connectClient('bar')
self.sendLine(1, 'JOIN #chan')
m = self.getMessage(1)
self.assertMessageEqual(m, command='JOIN', params=['#chan'])
self.connectClient('baz')
self.sendLine(2, 'JOIN #chan')
m = self.getMessage(2)
self.assertMessageEqual(m, command='JOIN', params=['#chan'])
# skip the rest of the JOIN burst:
self.getMessages(1)
self.getMessages(2)
self.sendLine(1, 'PART #chan :bye everyone')
# both the PART'ing client and the other channel member should receive a PART line:
m = self.getMessage(1)
self.assertMessageEqual(m, command='PART', params=['#chan', 'bye everyone'])
m = self.getMessage(2)
self.assertMessageEqual(m, command='PART', params=['#chan', 'bye everyone'])
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812') @cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testTopic(self): def testTopic(self):
"""“Once a user has joined a channel, he receives information about """“Once a user has joined a channel, he receives information about
@ -148,14 +172,17 @@ class JoinTestCase(cases.BaseServerTestCase):
and <https://tools.ietf.org/html/rfc2812#section-3.2.1> and <https://tools.ietf.org/html/rfc2812#section-3.2.1>
""" """
self.connectClient('foo') self.connectClient('foo')
self.joinChannel(1, '#chan')
self.connectClient('bar') self.connectClient('bar')
self.sendLine(1, 'JOIN #chan') self.joinChannel(2, '#chan')
self.sendLine(2, 'JOIN #chan')
# TODO: check foo is opped OR +t is unset
self.getMessages(1) self.getMessages(1)
self.getMessages(2) self.getMessages(2)
self.getMessages(1) self.getMessages(1)
self.getMessages(2)
# TODO: check foo is opped OR +t is unset
self.sendLine(1, 'TOPIC #chan :T0P1C') self.sendLine(1, 'TOPIC #chan :T0P1C')
try: try:
@ -181,14 +208,15 @@ class JoinTestCase(cases.BaseServerTestCase):
and <https://tools.ietf.org/html/rfc2812#section-3.2.1> and <https://tools.ietf.org/html/rfc2812#section-3.2.1>
""" """
self.connectClient('foo') self.connectClient('foo')
self.joinChannel(1, '#chan')
self.connectClient('bar') self.connectClient('bar')
self.sendLine(1, 'JOIN #chan') self.joinChannel(2, '#chan')
self.sendLine(2, 'JOIN #chan')
# TODO: check foo is opped
self.getMessages(1) self.getMessages(1)
self.getMessages(2) self.getMessages(2)
self.getMessages(1)
# TODO: check foo is opped
self.sendLine(1, 'MODE #chan +t') self.sendLine(1, 'MODE #chan +t')
try: try:
@ -220,7 +248,55 @@ class JoinTestCase(cases.BaseServerTestCase):
m = self.getMessage(1) m = self.getMessage(1)
self.assertMessageEqual(m, command='TOPIC', params=['#chan', 'T0P1C']) self.assertMessageEqual(m, command='TOPIC', params=['#chan', 'T0P1C'])
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testTopicNonexistentChannel(self):
"""RFC2812 specifies ERR_NOTONCHANNEL as the correct response to TOPIC
on a nonexistent channel. The modern spec prefers ERR_NOSUCHCHANNEL.
<https://tools.ietf.org/html/rfc2812#section-3.2.4>
<http://modern.ircdocs.horse/#topic-message>
"""
self.connectClient('foo')
self.sendLine(1, 'TOPIC #chan')
m = self.getMessage(1)
# either 403 ERR_NOSUCHCHANNEL or 443 ERR_NOTONCHANNEL
self.assertIn(m.command, ('403', '443'))
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testUnsetTopicResponses(self):
"""Test various cases related to RPL_NOTOPIC with set and unset topics."""
self.connectClient('bar')
self.sendLine(1, 'JOIN #test')
messages = self.getMessages(1)
# shouldn't send RPL_NOTOPIC for a new channel
self.assertNotIn(RPL_NOTOPIC, [m.command for m in messages])
self.connectClient('baz')
self.sendLine(2, 'JOIN #test')
messages = self.getMessages(2)
# topic is still unset, shouldn't send RPL_NOTOPIC on initial join
self.assertNotIn(RPL_NOTOPIC, [m.command for m in messages])
self.sendLine(2, 'TOPIC #test')
messages = self.getMessages(2)
# explicit TOPIC should receive RPL_NOTOPIC
self.assertIn(RPL_NOTOPIC, [m.command for m in messages])
self.sendLine(1, 'TOPIC #test :new topic')
self.getMessages(1)
# client 2 should get the new TOPIC line
messages = [message for message in self.getMessages(2) if message.command == 'TOPIC']
self.assertEqual(len(messages), 1)
self.assertEqual(messages[0].params, ['#test', 'new topic'])
# unset the topic:
self.sendLine(1, 'TOPIC #test :')
self.getMessages(1)
self.connectClient('qux')
self.sendLine(3, 'join #test')
messages = self.getMessages(3)
# topic is once again unset, shouldn't send RPL_NOTOPIC on initial join
self.assertNotIn(RPL_NOTOPIC, [m.command for m in messages])
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812') @cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testListEmpty(self): def testListEmpty(self):
@ -282,12 +358,15 @@ class JoinTestCase(cases.BaseServerTestCase):
and <https://tools.ietf.org/html/rfc2812#section-3.2.1> and <https://tools.ietf.org/html/rfc2812#section-3.2.1>
""" """
self.connectClient('foo') self.connectClient('foo')
self.joinChannel(1, '#chan')
self.connectClient('bar') self.connectClient('bar')
self.joinChannel(2, '#chan')
self.connectClient('baz') self.connectClient('baz')
self.sendLine(1, 'JOIN #chan') self.joinChannel(3, '#chan')
# TODO: check foo is an operator # TODO: check foo is an operator
self.sendLine(2, 'JOIN #chan')
self.sendLine(3, 'JOIN #chan')
import time import time
time.sleep(0.1) time.sleep(0.1)
@ -311,6 +390,61 @@ class JoinTestCase(cases.BaseServerTestCase):
self.assertMessageEqual(m, command='KICK', self.assertMessageEqual(m, command='KICK',
params=['#chan', 'bar', 'bye']) params=['#chan', 'bar', 'bye'])
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testKickPrivileges(self):
"""Test who has the ability to kick / what error codes are sent for invalid kicks."""
self.connectClient('foo')
self.sendLine(1, 'JOIN #chan')
self.getMessages(1)
self.connectClient('bar')
self.sendLine(2, 'JOIN #chan')
messages = self.getMessages(2)
names = set()
for message in messages:
if message.command == RPL_NAMREPLY:
names.update(set(message.params[-1].split()))
# assert foo is opped
self.assertIn('@foo', names, f'unexpected names: {names}')
self.connectClient('baz')
self.sendLine(3, 'KICK #chan bar')
replies = set(m.command for m in self.getMessages(3))
self.assertTrue(
ERR_NOTONCHANNEL in replies or ERR_CHANOPRIVSNEEDED in replies or ERR_NOSUCHCHANNEL in replies,
f'did not receive acceptable error code for kick from outside channel: {replies}')
self.joinChannel(3, '#chan')
self.getMessages(3)
self.sendLine(3, 'KICK #chan bar')
replies = set(m.command for m in self.getMessages(3))
# now we're a channel member so we should receive ERR_CHANOPRIVSNEEDED
self.assertIn(ERR_CHANOPRIVSNEEDED, replies)
self.sendLine(1, 'MODE #chan +o baz')
self.getMessages(1)
# should be able to kick an unprivileged user:
self.sendLine(3, 'KICK #chan bar')
# should be able to kick an operator:
self.sendLine(3, 'KICK #chan foo')
baz_replies = set(m.command for m in self.getMessages(3))
self.assertNotIn(ERR_CHANOPRIVSNEEDED, baz_replies)
kick_targets = [m.params[1] for m in self.getMessages(1) if m.command == 'KICK']
# foo should see bar and foo being kicked
self.assertTrue(any(target.startswith('foo') for target in kick_targets), f'unexpected kick targets: {kick_targets}')
self.assertTrue(any(target.startswith('bar') for target in kick_targets), f'unexpected kick targets: {kick_targets}')
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testKickNonexistentChannel(self):
"""“Kick command [...] Numeric replies: [...] ERR_NOSUCHCHANNEL."""
self.connectClient('foo')
self.sendLine(1, 'KICK #chan nick')
m = self.getMessage(1)
# should return ERR_NOSUCHCHANNEL
self.assertMessageEqual(m, command='403')
@cases.SpecificationSelector.requiredBySpecification('RFC2812') @cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testDoubleKickMessages(self): def testDoubleKickMessages(self):
"""“The server MUST NOT send KICK messages with multiple channels or """“The server MUST NOT send KICK messages with multiple channels or
@ -319,24 +453,24 @@ class JoinTestCase(cases.BaseServerTestCase):
-- https://tools.ietf.org/html/rfc2812#section-3.2.8 -- https://tools.ietf.org/html/rfc2812#section-3.2.8
""" """
self.connectClient('foo') self.connectClient('foo')
self.joinChannel(1, '#chan')
self.connectClient('bar') self.connectClient('bar')
self.joinChannel(2, '#chan')
self.connectClient('baz') self.connectClient('baz')
self.joinChannel(3, '#chan')
self.connectClient('qux') self.connectClient('qux')
self.sendLine(1, 'JOIN #chan') self.joinChannel(4, '#chan')
# TODO: check foo is an operator # TODO: check foo is an operator
self.sendLine(2, 'JOIN #chan')
self.sendLine(3, 'JOIN #chan')
self.sendLine(4, 'JOIN #chan')
# Synchronize # Synchronize
self.getMessages(1) self.getMessages(1)
self.getMessages(2) self.getMessages(2)
self.getMessages(3) self.getMessages(3)
self.getMessages(4) self.getMessages(4)
self.getMessages(1)
self.getMessages(2)
self.getMessages(3)
self.getMessages(4)
self.sendLine(1, 'KICK #chan,#chan bar,baz :bye') self.sendLine(1, 'KICK #chan,#chan bar,baz :bye')
try: try:
@ -413,12 +547,14 @@ class JoinTestCase(cases.BaseServerTestCase):
'got this instead: {msg}') 'got this instead: {msg}')
class testChannelCaseSensitivity(cases.BaseServerTestCase): class testChannelCaseSensitivity(cases.BaseServerTestCase):
def _testChannelsEquivalent(name1, name2): def _testChannelsEquivalent(casemapping, name1, name2):
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812', @cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812',
strict=True) strict=True)
def f(self): def f(self):
self.connectClient('foo') self.connectClient('foo')
self.connectClient('bar') self.connectClient('bar')
if self.server_support['CASEMAPPING'] != casemapping:
raise runner.NotImplementedByController('Casemapping {} not implemented'.format(casemapping))
self.joinClient(1, name1) self.joinClient(1, name1)
self.joinClient(2, name2) self.joinClient(2, name2)
try: try:
@ -431,12 +567,14 @@ class testChannelCaseSensitivity(cases.BaseServerTestCase):
.format(name1, name2)) .format(name1, name2))
f.__name__ = 'testEquivalence__{}__{}'.format(name1, name2) f.__name__ = 'testEquivalence__{}__{}'.format(name1, name2)
return f return f
def _testChannelsNotEquivalent(name1, name2): def _testChannelsNotEquivalent(casemapping, name1, name2):
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812', @cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812',
strict=True) strict=True)
def f(self): def f(self):
self.connectClient('foo') self.connectClient('foo')
self.connectClient('bar') self.connectClient('bar')
if self.server_support['CASEMAPPING'] != casemapping:
raise runner.NotImplementedByController('Casemapping {} not implemented'.format(casemapping))
self.joinClient(1, name1) self.joinClient(1, name1)
self.joinClient(2, name2) self.joinClient(2, name2)
try: try:
@ -452,7 +590,66 @@ class testChannelCaseSensitivity(cases.BaseServerTestCase):
f.__name__ = 'testEquivalence__{}__{}'.format(name1, name2) f.__name__ = 'testEquivalence__{}__{}'.format(name1, name2)
return f return f
testSimpleEquivalent = _testChannelsEquivalent('#Foo', '#foo') testAsciiSimpleEquivalent = _testChannelsEquivalent('ascii', '#Foo', '#foo')
testSimpleNotEquivalent = _testChannelsNotEquivalent('#Foo', '#fooa') testAsciiSimpleNotEquivalent = _testChannelsNotEquivalent('ascii', '#Foo', '#fooa')
testFancyEquivalent = _testChannelsEquivalent('#F]|oo{', '#f}\\oo[')
testFancyNotEquivalent = _testChannelsEquivalent('#F}o\\o[', '#f]o|o{') testRfcSimpleEquivalent = _testChannelsEquivalent('rfc1459', '#Foo', '#foo')
testRfcSimpleNotEquivalent = _testChannelsNotEquivalent('rfc1459', '#Foo', '#fooa')
testRfcFancyEquivalent = _testChannelsEquivalent('rfc1459', '#F]|oo{', '#f}\\oo[')
testRfcFancyNotEquivalent = _testChannelsEquivalent('rfc1459', '#F}o\\o[', '#f]o|o{')
class InviteTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testInvites(self):
"""Test some basic functionality related to INVITE and the +i mode."""
self.connectClient('foo')
self.joinChannel(1, '#chan')
self.sendLine(1, 'MODE #chan +i')
self.getMessages(1)
self.sendLine(1, 'INVITE bar #chan')
m = self.getMessage(1)
self.assertEqual(m.command, ERR_NOSUCHNICK)
self.connectClient('bar')
self.sendLine(2, 'JOIN #chan')
m = self.getMessage(2)
self.assertEqual(m.command, ERR_INVITEONLYCHAN)
self.sendLine(1, 'INVITE bar #chan')
m = self.getMessage(1)
self.assertEqual(m.command, RPL_INVITING)
# modern/ircv3 param order: inviter, invitee, channel
self.assertEqual(m.params, ['foo', 'bar', '#chan'])
m = self.getMessage(2)
self.assertEqual(m.command, 'INVITE')
self.assertTrue(m.prefix.startswith("foo")) # nickmask of inviter
self.assertEqual(m.params, ['bar', '#chan'])
# we were invited, so join should succeed now
self.joinChannel(2, '#chan')
class ChannelQuitTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testQuit(self):
"""“Once a user has joined a channel, he receives information about
all commands his server receives affecting the channel. This
includes [...] QUIT”
<https://tools.ietf.org/html/rfc2812#section-3.2.1>
"""
self.connectClient('bar')
self.joinChannel(1, '#chan')
self.connectClient('qux')
self.sendLine(2, 'JOIN #chan')
self.getMessages(2)
self.getMessages(1)
self.sendLine(2, 'QUIT :qux out')
self.getMessages(2)
m = self.getMessage(1)
self.assertEqual(m.command, 'QUIT')
self.assertTrue(m.prefix.startswith('qux')) # nickmask of quitter
self.assertIn('qux out', m.params[0])

View File

@ -4,9 +4,6 @@ Tests section 4.1 of RFC 1459.
""" """
from irctest import cases from irctest import cases
from irctest import authentication
from irctest.irc_utils.message_parser import Message
from irctest.basecontrollers import NotImplementedByController
from irctest.client_mock import ConnectionClosed from irctest.client_mock import ConnectionClosed
class PasswordedConnectionRegistrationTestCase(cases.BaseServerTestCase): class PasswordedConnectionRegistrationTestCase(cases.BaseServerTestCase):
@ -31,6 +28,16 @@ class PasswordedConnectionRegistrationTestCase(cases.BaseServerTestCase):
self.assertNotEqual(m.command, '001', self.assertNotEqual(m.command, '001',
msg='Got 001 after NICK+USER but missing PASS') msg='Got 001 after NICK+USER but missing PASS')
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testWrongPassword(self):
self.addClient()
self.sendLine(1, 'PASS {}'.format(self.password + "garbage"))
self.sendLine(1, 'NICK foo')
self.sendLine(1, 'USER username * * :Realname')
m = self.getRegistrationMessage(1)
self.assertNotEqual(m.command, '001',
msg='Got 001 after NICK+USER but incorrect PASS')
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812') @cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testPassAfterNickuser(self): def testPassAfterNickuser(self):
"""“The password can and must be set before any attempt to register """“The password can and must be set before any attempt to register

View File

@ -22,7 +22,7 @@ class EchoMessageTestCase(cases.BaseServerTestCase):
# TODO: check also without this # TODO: check also without this
self.sendLine(1, 'CAP REQ :echo-message{}'.format( self.sendLine(1, 'CAP REQ :echo-message{}'.format(
' server-time' if server_time else '')) ' server-time' if server_time else ''))
m = self.getRegistrationMessage(1) self.getRegistrationMessage(1)
# TODO: Remove this one the trailing space issue is fixed in Charybdis # TODO: Remove this one the trailing space issue is fixed in Charybdis
# and Mammon: # and Mammon:
#self.assertMessageEqual(m, command='CAP', #self.assertMessageEqual(m, command='CAP',
@ -38,7 +38,7 @@ class EchoMessageTestCase(cases.BaseServerTestCase):
self.sendLine(1, 'JOIN #chan') self.sendLine(1, 'JOIN #chan')
if not solo: if not solo:
capabilities = ['server-time'] if server_time else [] capabilities = ['server-time'] if server_time else None
self.connectClient('qux', capabilities=capabilities) self.connectClient('qux', capabilities=capabilities)
self.sendLine(2, 'JOIN #chan') self.sendLine(2, 'JOIN #chan')
@ -67,10 +67,8 @@ class EchoMessageTestCase(cases.BaseServerTestCase):
'messages differ: {} {}', 'messages differ: {} {}',
extra_format=(m1, m2)) extra_format=(m1, m2))
if server_time: if server_time:
self.assertEqual(m1.tags, m2.tags, self.assertIn('time', m1.tags, fail_msg='Echoed message is missing server time: {}', extra_format=(m1,))
fail_msg='Tags of forwarded and echoed ' self.assertIn('time', m2.tags, fail_msg='Forwarded message is missing server time: {}', extra_format=(m2,))
'messages differ: {} {}',
extra_format=(m1, m2))
return f return f
testEchoMessagePrivmsgNoServerTime = _testEchoMessage('PRIVMSG', False, False) testEchoMessagePrivmsgNoServerTime = _testEchoMessage('PRIVMSG', False, False)

View File

@ -3,7 +3,6 @@
""" """
from irctest import cases from irctest import cases
from irctest.irc_utils.message_parser import Message
class MetadataTestCase(cases.BaseServerTestCase, cases.OptionalityHelper): class MetadataTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
def connectRegisteredClient(self, nick): def connectRegisteredClient(self, nick):
@ -29,10 +28,9 @@ class MetadataTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
def testNotLoggedIn(self): def testNotLoggedIn(self):
self.connectClient('foo', capabilities=['extended-join'], self.connectClient('foo', capabilities=['extended-join'],
skip_if_cap_nak=True) skip_if_cap_nak=True)
self.sendLine(1, 'JOIN #chan') self.joinChannel(1, '#chan')
self.getMessages(1)
self.connectClient('bar') self.connectClient('bar')
self.sendLine(2, 'JOIN #chan') self.joinChannel(2, '#chan')
m = self.getMessage(1) m = self.getMessage(1)
self.assertMessageEqual(m, command='JOIN', self.assertMessageEqual(m, command='JOIN',
params=['#chan', '*', 'Realname'], params=['#chan', '*', 'Realname'],
@ -41,16 +39,15 @@ class MetadataTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
@cases.SpecificationSelector.requiredBySpecification('IRCv3.1') @cases.SpecificationSelector.requiredBySpecification('IRCv3.1')
@cases.OptionalityHelper.skipUnlessHasMechanism('PLAIN') @cases.OptionalityHelper.skipUnlessHasMechanism('PLAIN')
def testNotLoggedIn(self): def testLoggedIn(self):
self.connectClient('foo', capabilities=['extended-join'], self.connectClient('foo', capabilities=['extended-join'],
skip_if_cap_nak=True) skip_if_cap_nak=True)
self.sendLine(1, 'JOIN #chan') self.joinChannel(1, '#chan')
self.getMessages(1)
self.controller.registerUser(self, 'jilles', 'sesame') self.controller.registerUser(self, 'jilles', 'sesame')
self.connectRegisteredClient('bar') self.connectRegisteredClient('bar')
self.joinChannel(2, '#chan')
self.sendLine(2, 'JOIN #chan')
m = self.getMessage(1) m = self.getMessage(1)
self.assertMessageEqual(m, command='JOIN', self.assertMessageEqual(m, command='JOIN',
params=['#chan', 'jilles', 'Realname'], params=['#chan', 'jilles', 'Realname'],

View File

@ -0,0 +1,234 @@
"""
<https://ircv3.net/specs/extensions/labeled-response.html>
"""
from irctest import cases
from irctest.basecontrollers import NotImplementedByController
class LabeledResponsesTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledPrivmsgResponsesToMultipleClients(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(2)
self.connectClient('carl', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(3)
self.connectClient('alice', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(4)
self.sendLine(1, '@draft/label=12345 PRIVMSG bar,carl,alice :hi')
m = self.getMessage(1)
m2 = self.getMessage(2)
m3 = self.getMessage(3)
m4 = self.getMessage(4)
# ensure the label isn't sent to recipients
self.assertMessageEqual(m2, command='PRIVMSG', fail_msg='No PRIVMSG received by target 1 after sending one out')
self.assertNotIn('draft/label', m2.tags, m2, fail_msg="When sending a PRIVMSG with a label, the target users shouldn't receive the label (only the sending user should): {msg}")
self.assertMessageEqual(m3, command='PRIVMSG', fail_msg='No PRIVMSG received by target 1 after sending one out')
self.assertNotIn('draft/label', m3.tags, m3, fail_msg="When sending a PRIVMSG with a label, the target users shouldn't receive the label (only the sending user should): {msg}")
self.assertMessageEqual(m4, command='PRIVMSG', fail_msg='No PRIVMSG received by target 1 after sending one out')
self.assertNotIn('draft/label', m4.tags, m4, fail_msg="When sending a PRIVMSG with a label, the target users shouldn't receive the label (only the sending user should): {msg}")
self.assertMessageEqual(m, command='BATCH', fail_msg='No BATCH echo received after sending one out')
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledPrivmsgResponsesToClient(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(2)
self.sendLine(1, '@draft/label=12345 PRIVMSG bar :hi')
m = self.getMessage(1)
m2 = self.getMessage(2)
# ensure the label isn't sent to recipient
self.assertMessageEqual(m2, command='PRIVMSG', fail_msg='No PRIVMSG received by the target after sending one out')
self.assertNotIn('draft/label', m2.tags, m2, fail_msg="When sending a PRIVMSG with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
self.assertMessageEqual(m, command='PRIVMSG', fail_msg='No PRIVMSG echo received after sending one out')
self.assertIn('draft/label', m.tags, m, fail_msg="When sending a PRIVMSG with a label, the echo'd message didn't contain the label at all: {msg}")
self.assertEqual(m.tags['draft/label'], '12345', m, fail_msg="Echo'd PRIVMSG to a client did not contain the same label we sent it with(should be '12345'): {msg}")
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledPrivmsgResponsesToChannel(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(2)
# join channels
self.sendLine(1, 'JOIN #test')
self.getMessages(1)
self.sendLine(2, 'JOIN #test')
self.getMessages(2)
self.getMessages(1)
self.sendLine(1, '@draft/label=12345;+draft/reply=123;+draft/react=l😃l PRIVMSG #test :hi')
ms = self.getMessage(1)
mt = self.getMessage(2)
# ensure the label isn't sent to recipient
self.assertMessageEqual(mt, command='PRIVMSG', fail_msg='No PRIVMSG received by the target after sending one out')
self.assertNotIn('draft/label', mt.tags, mt, fail_msg="When sending a PRIVMSG with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
# ensure sender correctly receives msg
self.assertMessageEqual(ms, command='PRIVMSG', fail_msg="Got a message back that wasn't a PRIVMSG")
self.assertIn('draft/label', ms.tags, ms, fail_msg="When sending a PRIVMSG with a label, the source user should receive the label but didn't: {msg}")
self.assertEqual(ms.tags['draft/label'], '12345', ms, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledPrivmsgResponsesToSelf(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.sendLine(1, '@draft/label=12345 PRIVMSG foo :hi')
m1 = self.getMessage(1)
m2 = self.getMessage(1)
number_of_labels = 0
for m in [m1, m2]:
self.assertMessageEqual(m, command='PRIVMSG', fail_msg="Got a message back that wasn't a PRIVMSG")
if 'draft/label' in m.tags:
number_of_labels += 1
self.assertEqual(m.tags['draft/label'], '12345', m, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
self.assertEqual(number_of_labels, 1, m1, fail_msg="When sending a PRIVMSG to self with echo-message, we only expect one message to contain the label. Instead, {} messages had the label".format(number_of_labels))
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledNoticeResponsesToClient(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(2)
self.sendLine(1, '@draft/label=12345 NOTICE bar :hi')
m = self.getMessage(1)
m2 = self.getMessage(2)
# ensure the label isn't sent to recipient
self.assertMessageEqual(m2, command='NOTICE', fail_msg='No NOTICE received by the target after sending one out')
self.assertNotIn('draft/label', m2.tags, m2, fail_msg="When sending a NOTICE with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
self.assertMessageEqual(m, command='NOTICE', fail_msg='No NOTICE echo received after sending one out')
self.assertIn('draft/label', m.tags, m, fail_msg="When sending a NOTICE with a label, the echo'd message didn't contain the label at all: {msg}")
self.assertEqual(m.tags['draft/label'], '12345', m, fail_msg="Echo'd NOTICE to a client did not contain the same label we sent it with(should be '12345'): {msg}")
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledNoticeResponsesToChannel(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(2)
# join channels
self.sendLine(1, 'JOIN #test')
self.getMessages(1)
self.sendLine(2, 'JOIN #test')
self.getMessages(2)
self.getMessages(1)
self.sendLine(1, '@draft/label=12345;+draft/reply=123;+draft/react=l😃l NOTICE #test :hi')
ms = self.getMessage(1)
mt = self.getMessage(2)
# ensure the label isn't sent to recipient
self.assertMessageEqual(mt, command='NOTICE', fail_msg='No NOTICE received by the target after sending one out')
self.assertNotIn('draft/label', mt.tags, mt, fail_msg="When sending a NOTICE with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
# ensure sender correctly receives msg
self.assertMessageEqual(ms, command='NOTICE', fail_msg="Got a message back that wasn't a NOTICE")
self.assertIn('draft/label', ms.tags, ms, fail_msg="When sending a NOTICE with a label, the source user should receive the label but didn't: {msg}")
self.assertEqual(ms.tags['draft/label'], '12345', ms, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledNoticeResponsesToSelf(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.sendLine(1, '@draft/label=12345 NOTICE foo :hi')
m1 = self.getMessage(1)
m2 = self.getMessage(1)
number_of_labels = 0
for m in [m1, m2]:
self.assertMessageEqual(m, command='NOTICE', fail_msg="Got a message back that wasn't a NOTICE")
if 'draft/label' in m.tags:
number_of_labels += 1
self.assertEqual(m.tags['draft/label'], '12345', m, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
self.assertEqual(number_of_labels, 1, m1, fail_msg="When sending a NOTICE to self with echo-message, we only expect one message to contain the label. Instead, {} messages had the label".format(number_of_labels))
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledTagMsgResponsesToClient(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response', 'draft/message-tags-0.2'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response', 'draft/message-tags-0.2'], skip_if_cap_nak=True)
self.getMessages(2)
self.sendLine(1, '@draft/label=12345;+draft/reply=123;+draft/react=l😃l TAGMSG bar')
m = self.getMessage(1)
m2 = self.getMessage(2)
# ensure the label isn't sent to recipient
self.assertMessageEqual(m2, command='TAGMSG', fail_msg='No TAGMSG received by the target after sending one out')
self.assertNotIn('draft/label', m2.tags, m2, fail_msg="When sending a TAGMSG with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
self.assertIn('+draft/reply', m2.tags, m2, fail_msg="Reply tag wasn't present on the target user's TAGMSG: {msg}")
self.assertEqual(m2.tags['+draft/reply'], '123', m2, fail_msg="Reply tag wasn't the same on the target user's TAGMSG: {msg}")
self.assertIn('+draft/react', m2.tags, m2, fail_msg="React tag wasn't present on the target user's TAGMSG: {msg}")
self.assertEqual(m2.tags['+draft/react'], 'l😃l', m2, fail_msg="React tag wasn't the same on the target user's TAGMSG: {msg}")
self.assertMessageEqual(m, command='TAGMSG', fail_msg='No TAGMSG echo received after sending one out')
self.assertIn('draft/label', m.tags, m, fail_msg="When sending a TAGMSG with a label, the echo'd message didn't contain the label at all: {msg}")
self.assertEqual(m.tags['draft/label'], '12345', m, fail_msg="Echo'd TAGMSG to a client did not contain the same label we sent it with(should be '12345'): {msg}")
self.assertIn('+draft/reply', m.tags, m, fail_msg="Reply tag wasn't present on the source user's TAGMSG: {msg}")
self.assertEqual(m2.tags['+draft/reply'], '123', m, fail_msg="Reply tag wasn't the same on the source user's TAGMSG: {msg}")
self.assertIn('+draft/react', m.tags, m, fail_msg="React tag wasn't present on the source user's TAGMSG: {msg}")
self.assertEqual(m2.tags['+draft/react'], 'l😃l', m, fail_msg="React tag wasn't the same on the source user's TAGMSG: {msg}")
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledTagMsgResponsesToChannel(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response', 'draft/message-tags-0.2'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response', 'draft/message-tags-0.2'], skip_if_cap_nak=True)
self.getMessages(2)
# join channels
self.sendLine(1, 'JOIN #test')
self.getMessages(1)
self.sendLine(2, 'JOIN #test')
self.getMessages(2)
self.getMessages(1)
self.sendLine(1, '@draft/label=12345;+draft/reply=123;+draft/react=l😃l TAGMSG #test')
ms = self.getMessage(1)
mt = self.getMessage(2)
# ensure the label isn't sent to recipient
self.assertMessageEqual(mt, command='TAGMSG', fail_msg='No TAGMSG received by the target after sending one out')
self.assertNotIn('draft/label', mt.tags, mt, fail_msg="When sending a TAGMSG with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
# ensure sender correctly receives msg
self.assertMessageEqual(ms, command='TAGMSG', fail_msg="Got a message back that wasn't a TAGMSG")
self.assertIn('draft/label', ms.tags, ms, fail_msg="When sending a TAGMSG with a label, the source user should receive the label but didn't: {msg}")
self.assertEqual(ms.tags['draft/label'], '12345', ms, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledTagMsgResponsesToSelf(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response', 'draft/message-tags-0.2'], skip_if_cap_nak=True)
self.getMessages(1)
self.sendLine(1, '@draft/label=12345;+draft/reply=123;+draft/react=l😃l TAGMSG foo')
m1 = self.getMessage(1)
m2 = self.getMessage(1)
number_of_labels = 0
for m in [m1, m2]:
self.assertMessageEqual(m, command='TAGMSG', fail_msg="Got a message back that wasn't a TAGMSG")
if 'draft/label' in m.tags:
number_of_labels += 1
self.assertEqual(m.tags['draft/label'], '12345', m, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
self.assertEqual(number_of_labels, 1, m1, fail_msg="When sending a TAGMSG to self with echo-message, we only expect one message to contain the label. Instead, {} messages had the label".format(number_of_labels))

View File

@ -0,0 +1,63 @@
"""
Section 3.2 of RFC 2812
<https://tools.ietf.org/html/rfc2812#section-3.3>
"""
from irctest import cases
class PrivmsgTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testPrivmsg(self):
"""<https://tools.ietf.org/html/rfc2812#section-3.3.1>"""
self.connectClient('foo')
self.sendLine(1, 'JOIN #chan')
self.connectClient('bar')
self.sendLine(2, 'JOIN #chan')
self.getMessages(2) # synchronize
self.sendLine(1, 'PRIVMSG #chan :hello there')
self.getMessages(1) # synchronize
pms = [msg for msg in self.getMessages(2) if msg.command == 'PRIVMSG']
self.assertEqual(len(pms), 1)
self.assertMessageEqual(
pms[0],
command='PRIVMSG',
params=['#chan', 'hello there']
)
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testPrivmsgNonexistentChannel(self):
"""<https://tools.ietf.org/html/rfc2812#section-3.3.1>"""
self.connectClient('foo')
self.sendLine(1, 'PRIVMSG #nonexistent :hello there')
msg = self.getMessage(1)
# ERR_NOSUCHNICK, ERR_NOSUCHCHANNEL, or ERR_CANNOTSENDTOCHAN
self.assertIn(msg.command, ('401', '403', '404'))
class NoticeTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testNotice(self):
"""<https://tools.ietf.org/html/rfc2812#section-3.3.2>"""
self.connectClient('foo')
self.sendLine(1, 'JOIN #chan')
self.connectClient('bar')
self.sendLine(2, 'JOIN #chan')
self.getMessages(2) # synchronize
self.sendLine(1, 'NOTICE #chan :hello there')
self.getMessages(1) # synchronize
notices = [msg for msg in self.getMessages(2) if msg.command == 'NOTICE']
self.assertEqual(len(notices), 1)
self.assertMessageEqual(
notices[0],
command='NOTICE',
params=['#chan', 'hello there']
)
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testNoticeNonexistentChannel(self):
"""
'automatic replies MUST NEVER be sent in response to a NOTICE message'
https://tools.ietf.org/html/rfc2812#section-3.3.2>
"""
self.connectClient('foo')
self.sendLine(1, 'NOTICE #nonexistent :hello there')
self.assertEqual(self.getMessages(1), [])

View File

@ -4,7 +4,6 @@ Tests METADATA features.
""" """
from irctest import cases from irctest import cases
from irctest.irc_utils.message_parser import Message
class MetadataTestCase(cases.BaseServerTestCase): class MetadataTestCase(cases.BaseServerTestCase):
valid_metadata_keys = {'valid_key1', 'valid_key2'} valid_metadata_keys = {'valid_key1', 'valid_key2'}

View File

@ -5,6 +5,7 @@
from irctest import cases from irctest import cases
from irctest.client_mock import NoMessageException from irctest.client_mock import NoMessageException
from irctest.basecontrollers import NotImplementedByController from irctest.basecontrollers import NotImplementedByController
from irctest.numerics import RPL_MONLIST, RPL_ENDOFMONLIST
class EchoMessageTestCase(cases.BaseServerTestCase): class EchoMessageTestCase(cases.BaseServerTestCase):
def check_server_support(self): def check_server_support(self):
@ -88,7 +89,7 @@ class EchoMessageTestCase(cases.BaseServerTestCase):
self.assertMonoffline(1, 'bar') self.assertMonoffline(1, 'bar')
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2') @cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testMonitorOneConnection(self): def testMonitorOneConnectionWithQuit(self):
self.connectClient('foo') self.connectClient('foo')
self.check_server_support() self.check_server_support()
self.connectClient('bar') self.connectClient('bar')
@ -220,3 +221,37 @@ class EchoMessageTestCase(cases.BaseServerTestCase):
self.assertEqual(l, [], self.assertEqual(l, [],
fail_msg='Got response to unmonitored client: {}', fail_msg='Got response to unmonitored client: {}',
extra_format=(l,)) extra_format=(l,))
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testMonitorList(self):
def checkMonitorSubjects(messages, client_nick, expected_targets):
# collect all the RPL_MONLIST nicks into a set:
result = set()
for message in messages:
if message.command == RPL_MONLIST:
self.assertEqual(message.params[0], client_nick)
result.update(message.params[1].split(','))
# finally, RPL_ENDOFMONLIST should be sent
self.assertEqual(messages[-1].command, RPL_ENDOFMONLIST)
self.assertEqual(messages[-1].params[0], client_nick)
self.assertEqual(result, expected_targets)
self.connectClient('bar')
self.check_server_support()
self.sendLine(1, 'MONITOR L')
checkMonitorSubjects(self.getMessages(1), 'bar', set())
self.sendLine(1, 'MONITOR + qux')
self.getMessages(1)
self.sendLine(1, 'MONITOR L')
checkMonitorSubjects(self.getMessages(1), 'bar', {'qux',})
self.sendLine(1, 'MONITOR + bazbat')
self.getMessages(1)
self.sendLine(1, 'MONITOR L')
checkMonitorSubjects(self.getMessages(1), 'bar', {'qux', 'bazbat',})
self.sendLine(1, 'MONITOR - qux')
self.getMessages(1)
self.sendLine(1, 'MONITOR L')
checkMonitorSubjects(self.getMessages(1), 'bar', {'bazbat',})

View File

@ -0,0 +1,32 @@
"""
Tests multi-prefix.
<http://ircv3.net/specs/extensions/multi-prefix-3.1.html>
"""
from irctest import cases
class MultiPrefixTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('IRCv3.1')
def testMultiPrefix(self):
"""“When requested, the multi-prefix client capability will cause the
IRC server to send all possible prefixes which apply to a user in NAMES
and WHO output.
These prefixes MUST be in order of rank, from highest to lowest.
"""
self.connectClient('foo', capabilities=['multi-prefix'])
self.joinChannel(1, '#chan')
self.sendLine(1, 'MODE #chan +v foo')
self.getMessages(1)
#TODO(dan): Make sure +v is voice
self.sendLine(1, 'NAMES #chan')
self.assertMessageEqual(self.getMessage(1), command='353', params=['foo', '=', '#chan', '@+foo'], fail_msg='Expected NAMES response (353) with @+foo, got: {msg}')
self.getMessages(1)
self.sendLine(1, 'WHO #chan')
msg = self.getMessage(1)
self.assertEqual(msg.command, '352', msg, fail_msg='Expected WHO response (352), got: {msg}')
self.assertGreaterEqual(len(msg.params), 8, 'Expected WHO response (352) with 8 params, got: {msg}'.format(msg=msg))
self.assertTrue('@+' in msg.params[6], 'Expected WHO response (352) with "@+" in param 7, got: {msg}'.format(msg=msg))

View File

@ -0,0 +1,50 @@
"""
Regression tests for bugs in oragono.
"""
from irctest import cases
class RegressionsTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('RFC1459')
def testFailedNickChange(self):
# see oragono commit d0ded906d4ac8f
self.connectClient('alice')
self.connectClient('bob')
# bob tries to change to an in-use nickname; this MUST fail
self.sendLine(2, 'NICK alice')
ms = self.getMessages(2)
self.assertEqual(len(ms), 1)
self.assertMessageEqual(ms[0], command='433') # ERR_NICKNAMEINUSE
# bob MUST still own the bob nick, and be able to receive PRIVMSG as bob
self.sendLine(1, 'PRIVMSG bob hi')
ms = self.getMessages(1)
self.assertEqual(len(ms), 0)
ms = self.getMessages(2)
self.assertEqual(len(ms), 1)
self.assertMessageEqual(ms[0], command='PRIVMSG', params=['bob', 'hi'])
@cases.SpecificationSelector.requiredBySpecification('RFC1459')
def testCaseChanges(self):
self.connectClient('alice')
self.joinChannel(1, '#test')
self.connectClient('bob')
self.joinChannel(2, '#test')
self.getMessages(1)
self.getMessages(2)
# case change: both alice and bob should get a successful nick line
self.sendLine(1, 'NICK Alice')
ms = self.getMessages(1)
self.assertEqual(len(ms), 1)
self.assertMessageEqual(ms[0], command='NICK', params=['Alice'])
ms = self.getMessages(2)
self.assertEqual(len(ms), 1)
self.assertMessageEqual(ms[0], command='NICK', params=['Alice'])
# bob should not get notified on no-op nick change
self.sendLine(1, 'NICK Alice')
ms = self.getMessages(2)
self.assertEqual(ms, [])

View File

@ -1,7 +1,6 @@
import base64 import base64
from irctest import cases from irctest import cases
from irctest.irc_utils.message_parser import Message
class RegistrationTestCase(cases.BaseServerTestCase): class RegistrationTestCase(cases.BaseServerTestCase):
def testRegistration(self): def testRegistration(self):
@ -129,6 +128,10 @@ class SaslTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
self.sendLine(1, 'AUTHENTICATE {}'.format(authstring[0:400])) self.sendLine(1, 'AUTHENTICATE {}'.format(authstring[0:400]))
self.sendLine(1, 'AUTHENTICATE {}'.format(authstring[400:])) self.sendLine(1, 'AUTHENTICATE {}'.format(authstring[400:]))
self.confirmSuccessfulAuth()
def confirmSuccessfulAuth(self):
# TODO: check username/etc in this as well, so we can apply it to other tests
# TODO: may be in the other order # TODO: may be in the other order
m = self.getRegistrationMessage(1) m = self.getRegistrationMessage(1)
self.assertMessageEqual(m, command='900', self.assertMessageEqual(m, command='900',
@ -136,7 +139,7 @@ class SaslTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
'login, but got: {msg}') 'login, but got: {msg}')
m = self.getRegistrationMessage(1) m = self.getRegistrationMessage(1)
self.assertMessageEqual(m, command='903', self.assertMessageEqual(m, command='903',
fail_msg='Expected 900 (RPL_LOGGEDIN) after successful ' fail_msg='Expected 903 (RPL_SASLSUCCESS) after successful '
'login, but got: {msg}') 'login, but got: {msg}')
# TODO: add a test for when the length of the authstring is greater than 800. # TODO: add a test for when the length of the authstring is greater than 800.
@ -171,15 +174,7 @@ class SaslTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
self.sendLine(1, 'AUTHENTICATE {}'.format(authstring)) self.sendLine(1, 'AUTHENTICATE {}'.format(authstring))
self.sendLine(1, 'AUTHENTICATE +') self.sendLine(1, 'AUTHENTICATE +')
# TODO: may be in the other order self.confirmSuccessfulAuth()
m = self.getRegistrationMessage(1)
self.assertMessageEqual(m, command='900',
fail_msg='Expected 900 (RPL_LOGGEDIN) after successful '
'login, but got: {msg}')
m = self.getRegistrationMessage(1)
self.assertMessageEqual(m, command='903',
fail_msg='Expected 900 (RPL_LOGGEDIN) after successful '
'login, but got: {msg}')
# TODO: add a test for when the length of the authstring is 800. # TODO: add a test for when the length of the authstring is 800.
# I don't know how to do it, because it would make the registration # I don't know how to do it, because it would make the registration

View File

@ -1,62 +0,0 @@
"""
<http://ircv3.net/specs/extensions/tls-3.1.html>
"""
from irctest import cases
from irctest.basecontrollers import NotImplementedByController
class StarttlsFailTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('IRCv3.1')
def testStarttlsRequestTlsFail(self):
"""<http://ircv3.net/specs/extensions/tls-3.1.html>
"""
self.addClient()
# TODO: check also without this
self.sendLine(1, 'CAP LS')
capabilities = self.getCapLs(1)
if 'tls' not in capabilities:
raise NotImplementedByController('tls')
# TODO: check also without this
self.sendLine(1, 'CAP REQ :tls')
m = self.getRegistrationMessage(1)
# TODO: Remove this once the trailing space issue is fixed in Charybdis
# and Mammon:
#self.assertMessageEqual(m, command='CAP', params=['*', 'ACK', 'tls'],
# fail_msg='Did not ACK capability `tls`: {msg}')
self.sendLine(1, 'STARTTLS')
m = self.getRegistrationMessage(1)
self.assertMessageEqual(m, command='691',
fail_msg='Did not respond to STARTTLS with 691 whereas '
'SSL is not configured: {msg}.')
class StarttlsTestCase(cases.BaseServerTestCase):
ssl = True
def testStarttlsRequestTls(self):
"""<http://ircv3.net/specs/extensions/tls-3.1.html>
"""
self.addClient()
# TODO: check also without this
self.sendLine(1, 'CAP LS')
capabilities = self.getCapLs(1)
if 'tls' not in capabilities:
raise NotImplementedByController('tls')
# TODO: check also without this
self.sendLine(1, 'CAP REQ :tls')
m = self.getRegistrationMessage(1)
# TODO: Remove this one the trailing space issue is fixed in Charybdis
# and Mammon:
#self.assertMessageEqual(m, command='CAP', params=['*', 'ACK', 'tls'],
# fail_msg='Did not ACK capability `tls`: {msg}')
self.sendLine(1, 'STARTTLS')
m = self.getRegistrationMessage(1)
self.assertMessageEqual(m, command='670',
fail_msg='Did not respond to STARTTLS with 670: {msg}.')
self.clients[1].starttls()
self.sendLine(1, 'USER f * * :foo')
self.sendLine(1, 'NICK foo')
self.sendLine(1, 'CAP END')
self.getMessages(1)

View File

@ -0,0 +1,57 @@
"""
User commands as specified in Section 3.6 of RFC 2812:
<https://tools.ietf.org/html/rfc2812#section-3.6>
"""
from irctest import cases
from irctest.numerics import RPL_WHOISUSER, RPL_WHOISCHANNELS, RPL_AWAY, RPL_NOWAWAY, RPL_UNAWAY
class WhoisTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testWhoisUser(self):
"""Test basic WHOIS behavior"""
nick = 'myCoolNickname'
username = 'myCoolUsername'
realname = 'My Real Name'
self.addClient()
self.sendLine(1, f'NICK {nick}')
self.sendLine(1, f'USER {username} 0 * :{realname}')
self.skipToWelcome(1)
self.connectClient('otherNickname')
self.getMessages(2)
self.sendLine(2, 'WHOIS mycoolnickname')
messages = self.getMessages(2)
whois_user = messages[0]
self.assertEqual(whois_user.command, RPL_WHOISUSER)
# "<client> <nick> <username> <host> * :<realname>"
self.assertEqual(whois_user.params[1], nick)
self.assertIn(whois_user.params[2], ('~' + username, '~' + username[0:9]))
# dumb regression test for oragono/oragono#355:
self.assertNotIn(whois_user.params[3], [nick, username, '~' + username, realname])
self.assertEqual(whois_user.params[5], realname)
class AwayTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testAway(self):
self.connectClient('bar')
self.sendLine(1, "AWAY :I'm not here right now")
replies = self.getMessages(1)
self.assertIn(RPL_NOWAWAY, [msg.command for msg in replies])
self.connectClient('qux')
self.sendLine(2, "PRIVMSG bar :what's up")
replies = self.getMessages(2)
self.assertEqual(len(replies), 1)
self.assertEqual(replies[0].command, RPL_AWAY)
self.assertEqual(replies[0].params, ['qux', 'bar', "I'm not here right now"])
self.sendLine(1, "AWAY")
replies = self.getMessages(1)
self.assertIn(RPL_UNAWAY, [msg.command for msg in replies])
self.sendLine(2, "PRIVMSG bar :what's up")
replies = self.getMessages(2)
self.assertEqual(len(replies), 0)

View File

@ -1,3 +1,3 @@
limnoria > 2012.08.04 # Needs MultipleReplacer, from 1a64f105 limnoria > 2012.08.04 # Needs MultipleReplacer, from 1a64f105
psutil >= 3.1.0 # Fixes #640
ecdsa ecdsa
pyxmpp2_scram

View File

@ -15,7 +15,7 @@ with open(os.path.join(os.path.dirname(__file__), 'requirements.txt')) as fd:
setup( setup(
name='irctest', name='irctest',
version='0.1', version='0.1.2',
author='Valentin Lorentz', author='Valentin Lorentz',
url='https://github.com/ProgVal/irctest/', url='https://github.com/ProgVal/irctest/',
author_email='progval+irctest@progval.net', author_email='progval+irctest@progval.net',