56 Commits

Author SHA1 Message Date
cab27be1f5 fix incorrect type for empty tags 2021-02-15 21:36:55 +01:00
fc6bd4968d add a test for case changes 2021-02-15 21:36:55 +01:00
c2adc33109 remove starttls test 2021-02-15 21:36:55 +01:00
4519927265 add a test for PART messages 2021-02-15 21:36:55 +01:00
42f3a1f2fe add an away test 2021-02-15 21:36:55 +01:00
f3ff2a424b deflake another test 2021-02-15 21:13:12 +01:00
0c03a8e1c7 expand pyflakes list 2021-02-15 21:12:06 +01:00
7a05081960 add makefile 2021-02-15 21:11:56 +01:00
ee2e77b1e6 deflake registration tests by waiting for quit 2021-02-15 21:11:15 +01:00
9ff2239943 test RPL_ENDOFMONLIST responses 2021-02-15 21:06:18 +01:00
5f75f231a4 add list of numerics, start using them 2021-02-15 21:06:05 +01:00
b3a4a18885 kick privileges test 2021-02-15 21:05:19 +01:00
24df4e4496 fix lots of pyflakes3 failures 2021-02-15 21:00:51 +01:00
2b8aeacd4a tests for RPL_NOTOPIC 2021-02-15 20:59:03 +01:00
fc15ff09c0 fix testWhoisUser for charybdis. 2021-02-15 20:58:52 +01:00
7bf7df8ad1 basic whois test 2021-02-15 20:58:30 +01:00
6abc5b4f98 channel quit test case 2021-02-15 20:53:08 +01:00
cefbabf5c3 test incorrect PASS passwords 2021-02-15 20:52:49 +01:00
9b6f65b622 add regression tests 2021-02-15 20:52:00 +01:00
034fe5c51c framework enhancements 2021-02-15 20:47:08 +01:00
911c0ded04 add a test for INVITE 2021-02-15 20:45:08 +01:00
9b99c6ce20 Fix compatibility with return value of SSLSocket.sendall in python >= 3.6.
https://bugs.python.org/issue25951
2021-02-14 23:18:51 +01:00
e6b1ca5521 remove psutil 2021-02-14 22:59:19 +01:00
ac2671acb0 limnoria: add support for STS. 2021-02-14 22:22:01 +01:00
8a81224ba8 Fix ecdsa tests to use the same protocol as Atheme.
Which requires not hashing the challenge.
2019-12-26 12:10:45 +01:00
442c57e6c6 Temporarily disabling sts on Limnoria until it's released. 2019-12-09 22:13:55 +01:00
f8faec77f1 Ignore return value of sendall; it's not None on py < 3.6.
https://bugs.python.org/issue25951
2019-12-08 22:36:56 +01:00
71d9315813 Add STS tests. 2019-12-08 21:26:21 +01:00
857e8d195e Fix exception handling in testUntrustedCertificate on closed connection. 2019-10-22 18:28:39 +02:00
19a0623e91 echo-message: Check server-time more accurately, and handle slight timing differences due to late application 2018-07-12 15:19:07 +02:00
de4c51e744 oragono: Fix ACC command 2018-07-12 15:19:07 +02:00
1198504f74 Add tests for labeled-responses 2018-07-12 15:17:00 +02:00
5b319a46ee Make message tests less fragile 2018-07-12 15:17:00 +02:00
5950d97926 Bump version number to 0.1.2. 2017-12-10 20:21:29 +01:00
b35a7f7a60 Fix merge. 2017-12-10 20:21:02 +01:00
f47584589b Add tests for nonexistent channels
Inspired by oragono issue #165.
2017-12-09 09:44:45 +01:00
0804b78572 Add multi-prefix testcase 2017-12-09 09:43:58 +01:00
aecfc26a63 tls -> starttls, to match feature name better 2017-12-09 09:43:46 +01:00
f9f13961eb controllers: Add hybrid controller 2017-12-09 09:43:34 +01:00
2bc3cafd25 Make openssl binary configurable, for OSX 2017-12-09 09:43:19 +01:00
86d26d6121 charybdis: New releases name the binary 'charybdis' rather than 'ircd' 2017-12-09 09:42:58 +01:00
3d0b493a11 Don't send empty CAP REQ 2017-12-09 09:41:53 +01:00
711de43b22 Fix channel deterministic joining s'more 2017-12-09 09:41:45 +01:00
8b52ceeee3 Make tests around joining channels more deterministic 2017-12-09 09:41:22 +01:00
6d9c06096b channels: Check server casemapping before doing mapping checks 2017-12-09 09:38:26 +01:00
9575987555 For SCRAM, check clients send an empty response at the end.
https://github.com/ircv3/ircv3-specifications/pull/326
2017-11-15 17:11:44 +01:00
15a92ccf0b oragono: Allow TLS tests 2017-10-01 13:15:46 +02:00
0c12e0ed20 oragono: Use new registration command 2017-07-28 20:19:46 +02:00
f71badbbc1 oragono: Fix config so it loads 2017-07-28 20:19:38 +02:00
41f0418df7 Add gIRC controller 2017-01-11 00:35:22 +01:00
2a55c85c5a gitignore: Use gitignore.io 2017-01-11 00:35:07 +01:00
0c7358c0a5 test_sasl: Unify successful auth checking a bit more 2017-01-11 00:29:14 +01:00
6326af34cc Add Oragono IRCd 2017-01-11 00:29:03 +01:00
ba1fe57248 Bump version number. 2017-01-11 00:07:55 +01:00
6baee70852 Add tests for SCRAM. 2017-01-11 00:07:25 +01:00
2bdcba3da5 Update README. 2016-11-20 09:18:25 +01:00
38 changed files with 1587 additions and 165 deletions

104
.gitignore vendored
View File

@ -1,10 +1,73 @@
# Text editors
*~
*.swp
# Created by https://www.gitignore.io/api/windows,osx,linux,python
### Windows ###
# Windows image file caches
Thumbs.db
ehthumbs.db
# Folder config file
Desktop.ini
# Recycle Bin used on file shares
$RECYCLE.BIN/
# Windows Installer files
*.cab
*.msi
*.msm
*.msp
# Windows shortcuts
*.lnk
### OSX ###
*.DS_Store
.AppleDouble
.LSOverride
# Icon must end with two \r
Icon
# Thumbnails
._*
# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent
# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
### Linux ###
*~
# temporary files which can be created if a process still has a handle open of a deleted file
.fuse_hidden*
# KDE directory preferences
.directory
# Linux trash folder which might appear on any partition or disk
.Trash-*
# .nfs files are created when an open file is removed but is still being accessed
.nfs*
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
@ -46,6 +109,7 @@ htmlcov/
nosetests.xml
coverage.xml
*,cover
.hypothesis/
# Translations
*.mo
@ -53,9 +117,43 @@ coverage.xml
# Django stuff:
*.log
local_settings.py
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# dotenv
.env
# virtualenv
.venv/
venv/
ENV/
# Spyder project settings
.spyderproject
# Rope project settings
.ropeproject
# vim swapfiles
*.swp

5
Makefile Normal file
View File

@ -0,0 +1,5 @@
.PHONY: oragono
oragono:
pyflakes3 ./irctest/cases.py ./irctest/client_mock.py ./irctest/controllers/oragono.py irctest/server_tests/*.py
./test.py irctest.controllers.oragono

View File

@ -113,3 +113,4 @@ or anything near that.
At best, `irctest` can help you find issues in your software, but it may
still have false positives (because it does not implement itself a
full-featured client/server, so it supports only “usual” behavior).
Bug reports for false positives are welcome.

View File

@ -28,6 +28,7 @@ def main(args):
file=sys.stderr)
exit(1)
_IrcTestCase.controllerClass = controller_class
_IrcTestCase.controllerClass.openssl_bin = args.openssl_bin
_IrcTestCase.show_io = args.show_io
_IrcTestCase.strictTests = not args.loose
if args.specification:
@ -63,6 +64,8 @@ parser = argparse.ArgumentParser(
description='A script to test interoperability of IRC software.')
parser.add_argument('module', type=str,
help='The module used to run the tested program.')
parser.add_argument('--openssl-bin', type=str, default='openssl',
help='The openssl binary to use')
parser.add_argument('--show-io', action='store_true',
help='Show input/outputs with the tested program.')
parser.add_argument('-v', '--verbose', action='count', default=1,

View File

@ -8,9 +8,11 @@ class Mechanisms(enum.Enum):
def as_string(cls, mech):
return {cls.plain: 'PLAIN',
cls.ecdsa_nist256p_challenge: 'ECDSA-NIST256P-CHALLENGE',
cls.scram_sha_256: 'SCRAM-SHA-256',
}[mech]
plain = 1
ecdsa_nist256p_challenge = 2
scram_sha_256 = 3
Authentication = collections.namedtuple('Authentication',
'mechanisms username password ecdsa_key')

View File

@ -4,7 +4,6 @@ import socket
import tempfile
import time
import subprocess
import psutil
from .runner import NotImplementedByController
@ -39,6 +38,11 @@ class DirectoryBasedController(_BaseController):
self.kill_proc()
if self.directory:
shutil.rmtree(self.directory)
def terminate(self):
"""Stops the process gracefully, and does not clean its config."""
self.proc.terminate()
self.proc.wait()
self.proc = None
def open_file(self, name, mode='a'):
"""Open a file in the configuration directory."""
assert self.directory
@ -49,22 +53,28 @@ class DirectoryBasedController(_BaseController):
assert os.path.isdir(dir_)
return open(os.path.join(self.directory, name), mode)
def create_config(self):
"""If there is no config dir, creates it and returns True.
Else returns False."""
if self.directory:
return False
else:
self.directory = tempfile.mkdtemp()
return True
def gen_ssl(self):
self.csr_path = os.path.join(self.directory, 'ssl.csr')
self.key_path = os.path.join(self.directory, 'ssl.key')
self.pem_path = os.path.join(self.directory, 'ssl.pem')
self.dh_path = os.path.join(self.directory, 'dh.pem')
subprocess.check_output(['openssl', 'req', '-new', '-newkey', 'rsa',
subprocess.check_output([self.openssl_bin, 'req', '-new', '-newkey', 'rsa',
'-nodes', '-out', self.csr_path, '-keyout', self.key_path,
'-batch'],
stderr=subprocess.DEVNULL)
subprocess.check_output(['openssl', 'x509', '-req',
subprocess.check_output([self.openssl_bin, 'x509', '-req',
'-in', self.csr_path, '-signkey', self.key_path,
'-out', self.pem_path],
stderr=subprocess.DEVNULL)
subprocess.check_output(['openssl', 'dhparam',
subprocess.check_output([self.openssl_bin, 'dhparam',
'-out', self.dh_path, '128'],
stderr=subprocess.DEVNULL)
@ -80,11 +90,13 @@ class BaseServerController(_BaseController):
valid_metadata_keys, invalid_metadata_keys):
raise NotImplementedError()
def registerUser(self, case, username, password=None):
raise NotImplementedByController('registration')
raise NotImplementedByController('account registration')
def wait_for_port(self):
while not self.port_open:
time.sleep(0.1)
for conn in psutil.Process(self.proc.pid).connections():
if conn.laddr[1] == self.port:
try:
c = socket.create_connection(('localhost', self.port), timeout=1.0)
c.close()
self.port_open = True
except Exception as e:
continue

View File

@ -4,13 +4,11 @@ import socket
import tempfile
import unittest
import functools
import collections
import supybot.utils
from . import runner
from . import client_mock
from . import authentication
from .irc_utils import capabilities
from .irc_utils import message_parser
from .exceptions import ConnectionClosed
@ -56,7 +54,7 @@ class _IrcTestCase(unittest.TestCase):
self.assertEqual(msg.prefix.split('!')[0], nick, msg, fail_msg)
if subcommand is not None or subparams is not None:
self.assertGreater(len(msg.params), 2, fail_msg)
msg_target = msg.params[0]
#msg_target = msg.params[0]
msg_subcommand = msg.params[1]
msg_subparams = msg.params[2:]
if subcommand:
@ -98,7 +96,12 @@ class BaseClientTestCase(_IrcTestCase):
self._setUpServer()
def tearDown(self):
if self.conn:
try:
self.conn.sendall(b'QUIT :end of test.')
except BrokenPipeError:
pass # client disconnected before we did
except OSError:
pass # the conn was already closed by the test, or something
self.controller.kill()
if self.conn:
self.conn_file.close()
@ -110,9 +113,10 @@ class BaseClientTestCase(_IrcTestCase):
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind(('', 0)) # Bind any free port
self.server.listen(1)
def acceptClient(self, tls_cert=None, tls_key=None):
def acceptClient(self, tls_cert=None, tls_key=None, server=None):
"""Make the server accept a client connection. Blocking."""
(self.conn, addr) = self.server.accept()
server = server or self.server
(self.conn, addr) = server.accept()
if tls_cert is None and tls_key is None:
pass
else:
@ -150,11 +154,9 @@ class BaseClientTestCase(_IrcTestCase):
if not filter_pred or filter_pred(msg):
return msg
def sendLine(self, line):
ret = self.conn.sendall(line.encode())
assert ret is None
self.conn.sendall(line.encode())
if not line.endswith('\r\n'):
ret = self.conn.sendall(b'\r\n')
assert ret is None
self.conn.sendall(b'\r\n')
if self.show_io:
print('{:.3f} S: {}'.format(time.time(), line.strip()))
@ -324,14 +326,17 @@ class BaseServerTestCase(_IrcTestCase):
"""Skip to the point where we are registered
<https://tools.ietf.org/html/rfc2812#section-3.1>
"""
result = []
while True:
m = self.getMessage(client, synchronize=False)
result.append(m)
if m.command == '001':
return m
return result
def connectClient(self, nick, name=None, capabilities=None,
skip_if_cap_nak=False):
client = self.addClient(name)
if capabilities is not None:
skip_if_cap_nak=False, show_io=None):
client = self.addClient(name, show_io=show_io)
if capabilities is not None and 0 < len(capabilities):
self.sendLine(client, 'CAP REQ :{}'.format(' '.join(capabilities)))
m = self.getRegistrationMessage(client)
try:
@ -349,7 +354,7 @@ class BaseServerTestCase(_IrcTestCase):
self.sendLine(client, 'NICK {}'.format(nick))
self.sendLine(client, 'USER username * * :Realname')
self.skipToWelcome(client)
welcome = self.skipToWelcome(client)
self.sendLine(client, 'PING foo')
# Skip all that happy welcoming stuff
@ -364,6 +369,9 @@ class BaseServerTestCase(_IrcTestCase):
else:
(key, value) = (param, None)
self.server_support[key] = value
welcome.append(m)
return welcome
def joinClient(self, client, channel):
self.sendLine(client, 'JOIN {}'.format(channel))
@ -373,6 +381,17 @@ class BaseServerTestCase(_IrcTestCase):
'received responses: {list}',
extra_format=(channel,))
def joinChannel(self, client, channel):
self.sendLine(client, 'JOIN {}'.format(channel))
# wait until we see them join the channel
joined = False
while not joined:
for msg in self.getMessages(client):
# todo: also respond to cannot join channel numeric
if msg.command.upper() == 'JOIN' and 0 < len(msg.params) and msg.params[0].lower() == channel.lower():
joined = True
break
class OptionalityHelper:
def checkSaslSupport(self):
if self.controller.supported_sasl_mechanisms:
@ -400,6 +419,20 @@ class OptionalityHelper:
return f(self)
return newf
def checkCapabilitySupport(self, cap):
if cap in self.controller.supported_capabilities:
return
raise runner.CapabilityNotSupported(cap)
def skipUnlessSupportsCapability(cap):
def decorator(f):
@functools.wraps(f)
def newf(self):
self.checkCapabilitySupport(cap)
return f(self)
return newf
return decorator
class SpecificationSelector:
def requiredBySpecification(*specifications, strict=False):

View File

@ -1,4 +1,5 @@
import ssl
import sys
import time
import socket
from .irc_utils import message_parser
@ -96,7 +97,7 @@ class ClientMock:
ret = self.conn.sendall(encoded_line)
except BrokenPipeError:
raise ConnectionClosed()
if self.ssl: # https://bugs.python.org/issue25951
if sys.version_info <= (3, 6) and self.ssl: # https://bugs.python.org/issue25951
assert ret == len(encoded_line), (ret, repr(encoded_line))
else:
assert ret is None, ret

View File

@ -1,5 +1,10 @@
import hashlib
import ecdsa
from ecdsa.util import sigencode_der, sigdecode_der
import base64
import pyxmpp2_scram as scram
from irctest import cases
from irctest import authentication
from irctest.irc_utils.message_parser import Message
@ -15,6 +20,16 @@ IRX9cyi2wdYg9mUUYyh9GKdBCYHGUJAiCA==
-----END EC PRIVATE KEY-----
"""
CHALLENGE = bytes(range(32))
assert len(CHALLENGE) == 32
class IdentityHash:
def __init__(self, data):
self._data = data
def digest(self):
return self._data
class SaslTestCase(cases.BaseClientTestCase, cases.ClientNegociationHelper,
cases.OptionalityHelper):
@cases.OptionalityHelper.skipUnlessHasMechanism('PLAIN')
@ -138,14 +153,14 @@ class SaslTestCase(cases.BaseClientTestCase, cases.ClientNegociationHelper,
m = self.getMessage()
self.assertEqual(m, Message([], None, 'AUTHENTICATE',
['amlsbGVz'])) # jilles
self.sendLine('AUTHENTICATE Zm9vYmFy') # foobar
self.sendLine('AUTHENTICATE {}'.format(base64.b64encode(CHALLENGE).decode('ascii')))
m = self.getMessage()
self.assertMessageEqual(m, command='AUTHENTICATE')
sk = ecdsa.SigningKey.from_pem(ECDSA_KEY)
vk = sk.get_verifying_key()
signature = base64.b64decode(m.params[0])
try:
vk.verify(signature, b'foobar')
vk.verify(signature, CHALLENGE, hashfunc=IdentityHash, sigdecode=sigdecode_der)
except ecdsa.BadSignatureError:
raise AssertionError('Bad signature')
self.sendLine('900 * * foo :You are now logged in.')
@ -153,6 +168,78 @@ class SaslTestCase(cases.BaseClientTestCase, cases.ClientNegociationHelper,
m = self.negotiateCapabilities(['sasl'], False)
self.assertEqual(m, Message([], None, 'CAP', ['END']))
@cases.OptionalityHelper.skipUnlessHasMechanism('SCRAM-SHA-256')
def testScram(self):
"""Test SCRAM-SHA-256 authentication.
"""
auth = authentication.Authentication(
mechanisms=[authentication.Mechanisms.scram_sha_256],
username='jilles',
password='sesame',
)
class PasswdDb:
def get_password(self, *args):
return ('sesame', 'plain')
authenticator = scram.SCRAMServerAuthenticator('SHA-256',
channel_binding=False, password_database=PasswdDb())
m = self.negotiateCapabilities(['sasl'], auth=auth)
self.assertEqual(m, Message([], None, 'AUTHENTICATE', ['SCRAM-SHA-256']))
self.sendLine('AUTHENTICATE +')
m = self.getMessage()
self.assertEqual(m.command, 'AUTHENTICATE', m)
client_first = base64.b64decode(m.params[0])
response = authenticator.start(properties={}, initial_response=client_first)
assert isinstance(response, bytes), response
self.sendLine('AUTHENTICATE :' + base64.b64encode(response).decode())
m = self.getMessage()
self.assertEqual(m.command, 'AUTHENTICATE', m)
msg = base64.b64decode(m.params[0])
r = authenticator.response(msg)
assert isinstance(r, tuple), r
assert len(r) == 2, r
(properties, response) = r
self.sendLine('AUTHENTICATE :' + base64.b64encode(response).decode())
self.assertEqual(properties, {'authzid': None, 'username': 'jilles'})
m = self.getMessage()
self.assertEqual(m.command, 'AUTHENTICATE', m)
self.assertEqual(m.params, ['+'], m)
@cases.OptionalityHelper.skipUnlessHasMechanism('SCRAM-SHA-256')
def testScramBadPassword(self):
"""Test SCRAM-SHA-256 authentication with a bad password.
"""
auth = authentication.Authentication(
mechanisms=[authentication.Mechanisms.scram_sha_256],
username='jilles',
password='sesame',
)
class PasswdDb:
def get_password(self, *args):
return ('notsesame', 'plain')
authenticator = scram.SCRAMServerAuthenticator('SHA-256',
channel_binding=False, password_database=PasswdDb())
m = self.negotiateCapabilities(['sasl'], auth=auth)
self.assertEqual(m, Message([], None, 'AUTHENTICATE', ['SCRAM-SHA-256']))
self.sendLine('AUTHENTICATE +')
m = self.getMessage()
self.assertEqual(m.command, 'AUTHENTICATE', m)
client_first = base64.b64decode(m.params[0])
response = authenticator.start(properties={}, initial_response=client_first)
assert isinstance(response, bytes), response
self.sendLine('AUTHENTICATE :' + base64.b64encode(response).decode())
m = self.getMessage()
self.assertEqual(m.command, 'AUTHENTICATE', m)
msg = base64.b64decode(m.params[0])
with self.assertRaises(scram.NotAuthorizedException):
authenticator.response(msg)
class Irc302SaslTestCase(cases.BaseClientTestCase, cases.ClientNegociationHelper,
cases.OptionalityHelper):
@cases.OptionalityHelper.skipUnlessHasMechanism('PLAIN')

View File

@ -1,3 +1,6 @@
import socket
import ssl
from irctest import tls
from irctest import cases
from irctest.exceptions import ConnectionClosed
@ -139,5 +142,94 @@ class TlsTestCase(cases.BaseClientTestCase):
tls_config=tls_config,
)
self.acceptClient(tls_cert=BAD_CERT, tls_key=BAD_KEY)
with self.assertRaises(ConnectionClosed):
with self.assertRaises((ConnectionClosed, ConnectionResetError)):
m = self.getMessage()
class StsTestCase(cases.BaseClientTestCase, cases.OptionalityHelper):
def setUp(self):
super().setUp()
self.insecure_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.insecure_server.bind(('', 0)) # Bind any free port
self.insecure_server.listen(1)
def tearDown(self):
self.insecure_server.close()
super().tearDown()
@cases.OptionalityHelper.skipUnlessSupportsCapability('sts')
def testSts(self):
tls_config = tls.TlsConfig(
enable=False,
trusted_fingerprints=[GOOD_FINGERPRINT])
# Connect client to insecure server
(hostname, port) = self.insecure_server.getsockname()
self.controller.run(
hostname=hostname,
port=port,
auth=None,
tls_config=tls_config,
)
self.acceptClient(server=self.insecure_server)
# Send STS policy to client
m = self.getMessage()
self.assertEqual(m.command, 'CAP',
'First message is not CAP LS.')
self.assertEqual(m.params[0], 'LS',
'First message is not CAP LS.')
self.sendLine('CAP * LS :sts=port={}'.format(self.server.getsockname()[1]))
# "If the client is not already connected securely to the server
# at the requested hostname, it MUST close the insecure connection
# and reconnect securely on the stated port."
self.acceptClient(tls_cert=GOOD_CERT, tls_key=GOOD_KEY)
# Send the STS policy, over secure connection this time
self.sendLine('CAP * LS :sts=duration=10,port={}'.format(
self.server.getsockname()[1]))
# Make the client reconnect. It should reconnect to the secure server.
self.sendLine('ERROR :closing link')
self.acceptClient()
# Kill the client
self.controller.terminate()
# Run the client, still configured to connect to the insecure server
self.controller.run(
hostname=hostname,
port=port,
auth=None,
tls_config=tls_config,
)
# The client should remember the STS policy and connect to the secure
# server
self.acceptClient()
@cases.OptionalityHelper.skipUnlessSupportsCapability('sts')
def testStsInvalidCertificate(self):
# Connect client to insecure server
(hostname, port) = self.insecure_server.getsockname()
self.controller.run(
hostname=hostname,
port=port,
auth=None,
)
self.acceptClient(server=self.insecure_server)
# Send STS policy to client
m = self.getMessage()
self.assertEqual(m.command, 'CAP',
'First message is not CAP LS.')
self.assertEqual(m.params[0], 'LS',
'First message is not CAP LS.')
self.sendLine('CAP * LS :sts=port={}'.format(self.server.getsockname()[1]))
# The client will reconnect to the TLS port. Unfortunately, it does
# not trust its fingerprint.
with self.assertRaises((ssl.SSLError, socket.error)):
self.acceptClient(tls_cert=GOOD_CERT, tls_key=GOOD_KEY)

View File

@ -45,6 +45,7 @@ TEMPLATE_SSL_CONFIG = """
class CharybdisController(BaseServerController, DirectoryBasedController):
software_name = 'Charybdis'
supported_sasl_mechanisms = set()
supported_capabilities = set() # Not exhaustive
def create_config(self):
super().create_config()
with self.open_file('server.conf'):
@ -75,7 +76,7 @@ class CharybdisController(BaseServerController, DirectoryBasedController):
password_field=password_field,
ssl_config=ssl_config,
))
self.proc = subprocess.Popen(['ircd', '-foreground',
self.proc = subprocess.Popen(['charybdis', '-foreground',
'-configfile', os.path.join(self.directory, 'server.conf'),
'-pidfile', os.path.join(self.directory, 'server.pid'),
],

View File

@ -0,0 +1,45 @@
import subprocess
from irctest.basecontrollers import BaseClientController, NotImplementedByController
class GircController(BaseClientController):
software_name = 'gIRC'
supported_sasl_mechanisms = ['PLAIN']
supported_capabilities = set() # Not exhaustive
def __init__(self):
super().__init__()
self.directory = None
self.proc = None
def kill(self):
if self.proc:
self.proc.terminate()
try:
self.proc.wait(5)
except subprocess.TimeoutExpired:
self.proc.kill()
self.proc = None
def __del__(self):
if self.proc:
self.proc.kill()
if self.directory:
self.directory.cleanup()
def run(self, hostname, port, auth, tls_config):
if tls_config:
print(tls_config)
raise NotImplementedByController('TLS options')
args = ['--host', hostname, '--port', str(port), '--quiet']
if auth and auth.username and auth.password:
args += ['--sasl-name', auth.username]
args += ['--sasl-pass', auth.password]
args += ['--sasl-fail-is-ok']
# Runs a client with the config given as arguments
self.proc = subprocess.Popen(['girc_test', 'connect'] + args)
def get_irctest_controller_class():
return GircController

View File

@ -0,0 +1,88 @@
import os
import time
import shutil
import tempfile
import subprocess
from irctest import client_mock
from irctest import authentication
from irctest.basecontrollers import NotImplementedByController
from irctest.basecontrollers import BaseServerController, DirectoryBasedController
TEMPLATE_CONFIG = """
serverinfo {{
name = "My.Little.Server";
sid = "42X";
description = "test server";
{ssl_config}
}};
listen {{
host = "{hostname}";
port = {port};
}};
general {{
disable_auth = yes;
anti_nick_flood = no;
max_nick_changes = 256;
throttle_count = 512;
}};
auth {{
user = "*";
flags = exceed_limit;
{password_field}
}};
"""
TEMPLATE_SSL_CONFIG = """
rsa_private_key_file = "{key_path}";
ssl_certificate_file = "{pem_path}";
ssl_dh_param_file = "{dh_path}";
"""
class HybridController(BaseServerController, DirectoryBasedController):
software_name = 'Hybrid'
supported_sasl_mechanisms = set()
supported_capabilities = set() # Not exhaustive
def create_config(self):
super().create_config()
with self.open_file('server.conf'):
pass
def run(self, hostname, port, password=None, ssl=False,
valid_metadata_keys=None, invalid_metadata_keys=None):
if valid_metadata_keys or invalid_metadata_keys:
raise NotImplementedByController(
'Defining valid and invalid METADATA keys.')
assert self.proc is None
self.create_config()
self.port = port
password_field = 'password = "{}";'.format(password) if password else ''
if ssl:
self.gen_ssl()
ssl_config = TEMPLATE_SSL_CONFIG.format(
key_path=self.key_path,
pem_path=self.pem_path,
dh_path=self.dh_path,
)
else:
ssl_config = ''
with self.open_file('server.conf') as fd:
fd.write(TEMPLATE_CONFIG.format(
hostname=hostname,
port=port,
password_field=password_field,
ssl_config=ssl_config,
))
self.proc = subprocess.Popen(['ircd', '-foreground',
'-configfile', os.path.join(self.directory, 'server.conf'),
'-pidfile', os.path.join(self.directory, 'server.pid'),
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
def get_irctest_controller_class():
return HybridController

View File

@ -30,6 +30,8 @@ TEMPLATE_SSL_CONFIG = """
class InspircdController(BaseServerController, DirectoryBasedController):
software_name = 'InspIRCd'
supported_sasl_mechanisms = set()
supported_capabilities = set() # Not exhaustive
def create_config(self):
super().create_config()
with self.open_file('server.conf'):

View File

@ -2,6 +2,7 @@ import os
import subprocess
from irctest import authentication
from irctest import tls
from irctest.basecontrollers import NotImplementedByController
from irctest.basecontrollers import BaseClientController, DirectoryBasedController
@ -9,6 +10,7 @@ TEMPLATE_CONFIG = """
supybot.directories.conf: {directory}/conf
supybot.directories.data: {directory}/data
supybot.directories.migrations: {directory}/migrations
supybot.log.level: DEBUG
supybot.log.stdout.level: {loglevel}
supybot.networks: testnet
@ -27,16 +29,21 @@ supybot.networks.testnet.sasl.mechanisms: {mechanisms}
class LimnoriaController(BaseClientController, DirectoryBasedController):
software_name = 'Limnoria'
supported_sasl_mechanisms = {
'PLAIN', 'ECDSA-NIST256P-CHALLENGE', 'EXTERNAL',
'PLAIN', 'ECDSA-NIST256P-CHALLENGE', 'SCRAM-SHA-256', 'EXTERNAL',
}
supported_capabilities = set(['sts']) # Not exhaustive
def create_config(self):
super().create_config()
create_config = super().create_config()
if create_config:
with self.open_file('bot.conf'):
pass
with self.open_file('conf/users.conf'):
pass
def run(self, hostname, port, auth, tls_config):
def run(self, hostname, port, auth, tls_config=None):
if tls_config is None:
tls_config = tls.TlsConfig(enable=False, trusted_fingerprints=[])
# Runs a client with the config given as arguments
assert self.proc is None
self.create_config()
@ -61,7 +68,8 @@ class LimnoriaController(BaseClientController, DirectoryBasedController):
trusted_fingerprints=' '.join(tls_config.trusted_fingerprints) if tls_config else '',
))
self.proc = subprocess.Popen(['supybot',
os.path.join(self.directory, 'bot.conf')])
os.path.join(self.directory, 'bot.conf')],
stderr=subprocess.STDOUT)
def get_irctest_controller_class():
return LimnoriaController

View File

@ -66,6 +66,8 @@ class MammonController(BaseServerController, DirectoryBasedController):
supported_sasl_mechanisms = {
'PLAIN', 'ECDSA-NIST256P-CHALLENGE',
}
supported_capabilities = set() # Not exhaustive
def create_config(self):
super().create_config()
with self.open_file('server.conf'):

View File

@ -0,0 +1,144 @@
import os
import subprocess
from irctest.basecontrollers import NotImplementedByController
from irctest.basecontrollers import BaseServerController, DirectoryBasedController
TEMPLATE_CONFIG = """
network:
name: OragonoTest
server:
name: oragono.test
listen:
- "{hostname}:{port}"
{tls}
check-ident: false
max-sendq: 16k
connection-limits:
cidr-len-ipv4: 24
cidr-len-ipv6: 120
ips-per-subnet: 16
exempted:
- "127.0.0.1/8"
- "::1/128"
connection-throttling:
enabled: true
cidr-len-ipv4: 32
cidr-len-ipv6: 128
duration: 10m
max-connections: 12
ban-duration: 10m
ban-message: You have attempted to connect too many times within a short duration. Wait a while, and you will be able to connect.
exempted:
- "127.0.0.1/8"
- "::1/128"
accounts:
registration:
enabled: true
verify-timeout: "120h"
enabled-callbacks:
- none # no verification needed, will instantly register successfully
allow-multiple-per-connection: true
authentication-enabled: true
channels:
registration:
enabled: true
datastore:
path: {directory}/ircd.db
limits:
nicklen: 32
channellen: 64
awaylen: 200
kicklen: 390
topiclen: 390
monitor-entries: 100
whowas-entries: 100
chan-list-modes: 60
linelen:
tags: 2048
rest: 2048
"""
class OragonoController(BaseServerController, DirectoryBasedController):
software_name = 'Oragono'
supported_sasl_mechanisms = {
'PLAIN',
}
supported_capabilities = set() # Not exhaustive
def create_config(self):
super().create_config()
with self.open_file('ircd.yaml'):
pass
def kill_proc(self):
self.proc.kill()
def run(self, hostname, port, password=None, ssl=False,
restricted_metadata_keys=None,
valid_metadata_keys=None, invalid_metadata_keys=None):
if valid_metadata_keys or invalid_metadata_keys:
raise NotImplementedByController(
'Defining valid and invalid METADATA keys.')
if password is not None:
#TODO(dan): fix dis
raise NotImplementedByController('PASS command')
self.create_config()
tls_config = ""
if ssl:
self.key_path = os.path.join(self.directory, 'ssl.key')
self.pem_path = os.path.join(self.directory, 'ssl.pem')
tls_config = 'tls-listeners:\n ":{port}":\n key: {key}\n cert: {pem}'.format(
port=port,
key=self.key_path,
pem=self.pem_path,
)
assert self.proc is None
self.port = port
with self.open_file('server.yml') as fd:
fd.write(TEMPLATE_CONFIG.format(
directory=self.directory,
hostname=hostname,
port=port,
tls=tls_config,
))
subprocess.call(['oragono', 'initdb',
'--conf', os.path.join(self.directory, 'server.yml'), '--quiet'])
subprocess.call(['oragono', 'mkcerts',
'--conf', os.path.join(self.directory, 'server.yml'), '--quiet'])
self.proc = subprocess.Popen(['oragono', 'run',
'--conf', os.path.join(self.directory, 'server.yml'), '--quiet'])
def registerUser(self, case, username, password=None):
# XXX: Move this somewhere else when
# https://github.com/ircv3/ircv3-specifications/pull/152 becomes
# part of the specification
client = case.addClient(show_io=False)
case.sendLine(client, 'CAP LS 302')
case.sendLine(client, 'NICK registration_user')
case.sendLine(client, 'USER r e g :user')
case.sendLine(client, 'CAP END')
while case.getRegistrationMessage(client).command != '001':
pass
case.getMessages(client)
case.sendLine(client, 'ACC REGISTER {} * {}'.format(
username, password))
msg = case.getMessage(client)
assert msg.command == '920', msg
case.sendLine(client, 'QUIT')
case.assertDisconnected(client)
def get_irctest_controller_class():
return OragonoController

View File

@ -24,6 +24,8 @@ class SopelController(BaseClientController):
supported_sasl_mechanisms = {
'PLAIN',
}
supported_capabilities = set() # Not exhaustive
def __init__(self):
super().__init__()
self.filename = next(tempfile._get_candidate_names()) + '.cfg'

View File

@ -42,7 +42,7 @@ def parse_message(s):
(tags, s) = s.split(' ', 1)
tags = parse_tags(tags[1:])
else:
tags = []
tags = {}
if ' :' in s:
(other_tokens, trailing_param) = s.split(' :', 1)
tokens = list(filter(bool, other_tokens.split(' '))) + [trailing_param]

193
irctest/numerics.py Normal file
View File

@ -0,0 +1,193 @@
# Copyright (c) 2012-2014 Jeremy Latt
# Copyright (c) 2014-2015 Edmund Huber
# Copyright (c) 2016-2017 Daniel Oaks <daniel@danieloaks.net>
# released under the MIT license
# These numerics have been retrieved from:
# http://defs.ircdocs.horse/ and http://modern.ircdocs.horse/
# They're intended to represent a relatively-standard cross-section of the IRC
# server ecosystem out there. Custom numerics will be marked as such.
RPL_WELCOME = "001"
RPL_YOURHOST = "002"
RPL_CREATED = "003"
RPL_MYINFO = "004"
RPL_ISUPPORT = "005"
RPL_SNOMASKIS = "008"
RPL_BOUNCE = "010"
RPL_TRACELINK = "200"
RPL_TRACECONNECTING = "201"
RPL_TRACEHANDSHAKE = "202"
RPL_TRACEUNKNOWN = "203"
RPL_TRACEOPERATOR = "204"
RPL_TRACEUSER = "205"
RPL_TRACESERVER = "206"
RPL_TRACESERVICE = "207"
RPL_TRACENEWTYPE = "208"
RPL_TRACECLASS = "209"
RPL_TRACERECONNECT = "210"
RPL_STATSLINKINFO = "211"
RPL_STATSCOMMANDS = "212"
RPL_ENDOFSTATS = "219"
RPL_UMODEIS = "221"
RPL_SERVLIST = "234"
RPL_SERVLISTEND = "235"
RPL_STATSUPTIME = "242"
RPL_STATSOLINE = "243"
RPL_LUSERCLIENT = "251"
RPL_LUSEROP = "252"
RPL_LUSERUNKNOWN = "253"
RPL_LUSERCHANNELS = "254"
RPL_LUSERME = "255"
RPL_ADMINME = "256"
RPL_ADMINLOC1 = "257"
RPL_ADMINLOC2 = "258"
RPL_ADMINEMAIL = "259"
RPL_TRACELOG = "261"
RPL_TRACEEND = "262"
RPL_TRYAGAIN = "263"
RPL_WHOISCERTFP = "276"
RPL_AWAY = "301"
RPL_USERHOST = "302"
RPL_ISON = "303"
RPL_UNAWAY = "305"
RPL_NOWAWAY = "306"
RPL_WHOISUSER = "311"
RPL_WHOISSERVER = "312"
RPL_WHOISOPERATOR = "313"
RPL_WHOWASUSER = "314"
RPL_ENDOFWHO = "315"
RPL_WHOISIDLE = "317"
RPL_ENDOFWHOIS = "318"
RPL_WHOISCHANNELS = "319"
RPL_LIST = "322"
RPL_LISTEND = "323"
RPL_CHANNELMODEIS = "324"
RPL_UNIQOPIS = "325"
RPL_CHANNELCREATED = "329"
RPL_WHOISACCOUNT = "330"
RPL_NOTOPIC = "331"
RPL_TOPIC = "332"
RPL_TOPICTIME = "333"
RPL_WHOISBOT = "335"
RPL_WHOISACTUALLY = "338"
RPL_INVITING = "341"
RPL_SUMMONING = "342"
RPL_INVITELIST = "346"
RPL_ENDOFINVITELIST = "347"
RPL_EXCEPTLIST = "348"
RPL_ENDOFEXCEPTLIST = "349"
RPL_VERSION = "351"
RPL_WHOREPLY = "352"
RPL_NAMREPLY = "353"
RPL_LINKS = "364"
RPL_ENDOFLINKS = "365"
RPL_ENDOFNAMES = "366"
RPL_BANLIST = "367"
RPL_ENDOFBANLIST = "368"
RPL_ENDOFWHOWAS = "369"
RPL_INFO = "371"
RPL_MOTD = "372"
RPL_ENDOFINFO = "374"
RPL_MOTDSTART = "375"
RPL_ENDOFMOTD = "376"
RPL_YOUREOPER = "381"
RPL_REHASHING = "382"
RPL_YOURESERVICE = "383"
RPL_TIME = "391"
RPL_USERSSTART = "392"
RPL_USERS = "393"
RPL_ENDOFUSERS = "394"
RPL_NOUSERS = "395"
ERR_UNKNOWNERROR = "400"
ERR_NOSUCHNICK = "401"
ERR_NOSUCHSERVER = "402"
ERR_NOSUCHCHANNEL = "403"
ERR_CANNOTSENDTOCHAN = "404"
ERR_TOOMANYCHANNELS = "405"
ERR_WASNOSUCHNICK = "406"
ERR_TOOMANYTARGETS = "407"
ERR_NOSUCHSERVICE = "408"
ERR_NOORIGIN = "409"
ERR_INVALIDCAPCMD = "410"
ERR_NORECIPIENT = "411"
ERR_NOTEXTTOSEND = "412"
ERR_NOTOPLEVEL = "413"
ERR_WILDTOPLEVEL = "414"
ERR_BADMASK = "415"
ERR_UNKNOWNCOMMAND = "421"
ERR_NOMOTD = "422"
ERR_NOADMININFO = "423"
ERR_FILEERROR = "424"
ERR_NONICKNAMEGIVEN = "431"
ERR_ERRONEUSNICKNAME = "432"
ERR_NICKNAMEINUSE = "433"
ERR_NICKCOLLISION = "436"
ERR_UNAVAILRESOURCE = "437"
ERR_REG_UNAVAILABLE = "440"
ERR_USERNOTINCHANNEL = "441"
ERR_NOTONCHANNEL = "442"
ERR_USERONCHANNEL = "443"
ERR_NOLOGIN = "444"
ERR_SUMMONDISABLED = "445"
ERR_USERSDISABLED = "446"
ERR_NOTREGISTERED = "451"
ERR_NEEDMOREPARAMS = "461"
ERR_ALREADYREGISTRED = "462"
ERR_NOPERMFORHOST = "463"
ERR_PASSWDMISMATCH = "464"
ERR_YOUREBANNEDCREEP = "465"
ERR_YOUWILLBEBANNED = "466"
ERR_KEYSET = "467"
ERR_INVALIDUSERNAME = "468"
ERR_CHANNELISFULL = "471"
ERR_UNKNOWNMODE = "472"
ERR_INVITEONLYCHAN = "473"
ERR_BANNEDFROMCHAN = "474"
ERR_BADCHANNELKEY = "475"
ERR_BADCHANMASK = "476"
ERR_NOCHANMODES = "477"
ERR_BANLISTFULL = "478"
ERR_NOPRIVILEGES = "481"
ERR_CHANOPRIVSNEEDED = "482"
ERR_CANTKILLSERVER = "483"
ERR_RESTRICTED = "484"
ERR_UNIQOPPRIVSNEEDED = "485"
ERR_NOOPERHOST = "491"
ERR_UMODEUNKNOWNFLAG = "501"
ERR_USERSDONTMATCH = "502"
ERR_HELPNOTFOUND = "524"
ERR_CANNOTSENDRP = "573"
RPL_WHOISSECURE = "671"
RPL_YOURLANGUAGESARE = "687"
RPL_WHOISLANGUAGE = "690"
RPL_HELPSTART = "704"
RPL_HELPTXT = "705"
RPL_ENDOFHELP = "706"
ERR_NOPRIVS = "723"
RPL_MONONLINE = "730"
RPL_MONOFFLINE = "731"
RPL_MONLIST = "732"
RPL_ENDOFMONLIST = "733"
ERR_MONLISTFULL = "734"
RPL_LOGGEDIN = "900"
RPL_LOGGEDOUT = "901"
ERR_NICKLOCKED = "902"
RPL_SASLSUCCESS = "903"
ERR_SASLFAIL = "904"
ERR_SASLTOOLONG = "905"
ERR_SASLABORTED = "906"
ERR_SASLALREADY = "907"
RPL_SASLMECHS = "908"
RPL_REGISTRATION_SUCCESS = "920"
ERR_ACCOUNT_ALREADY_EXISTS = "921"
ERR_REG_UNSPECIFIED_ERROR = "922"
RPL_VERIFYSUCCESS = "923"
ERR_ACCOUNT_ALREADY_VERIFIED = "924"
ERR_ACCOUNT_INVALID_VERIFY_CODE = "925"
RPL_REG_VERIFICATION_REQUIRED = "927"
ERR_REG_INVALID_CRED_TYPE = "928"
ERR_REG_INVALID_CALLBACK = "929"
ERR_TOOMANYLANGUAGES = "981"
ERR_NOLANGUAGE = "982"

View File

@ -19,6 +19,10 @@ class OptionalSaslMechanismNotSupported(unittest.SkipTest):
def __str__(self):
return 'Unsupported SASL mechanism: {}'.format(self.args[0])
class CapabilityNotSupported(unittest.SkipTest):
def __str__(self):
return 'Unsupported capability: {}'.format(self.args[0])
class NotRequiredBySpecifications(unittest.SkipTest):
def __str__(self):
return 'Tests not required by the set of tested specification(s).'

View File

@ -3,8 +3,6 @@
"""
from irctest import cases
from irctest.client_mock import NoMessageException
from irctest.basecontrollers import NotImplementedByController
class AccountTagTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
def connectRegisteredClient(self, nick):

View File

@ -1,5 +1,4 @@
from irctest import cases
from irctest.irc_utils.message_parser import Message
class CapTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('IRCv3.1')

View File

@ -7,7 +7,7 @@ from irctest import cases
from irctest import client_mock
from irctest import runner
from irctest.irc_utils import ambiguities
from irctest.irc_utils.message_parser import Message
from irctest.numerics import RPL_NOTOPIC, RPL_NAMREPLY, RPL_INVITING, ERR_NOSUCHCHANNEL, ERR_NOTONCHANNEL, ERR_CHANOPRIVSNEEDED, ERR_NOSUCHNICK, ERR_INVITEONLYCHAN
class JoinTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812',
@ -139,6 +139,30 @@ class JoinTestCase(cases.BaseServerTestCase):
'"foo" with an optional "+" or "@" prefix, but got: '
'{msg}')
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testNormalPart(self):
self.connectClient('bar')
self.sendLine(1, 'JOIN #chan')
m = self.getMessage(1)
self.assertMessageEqual(m, command='JOIN', params=['#chan'])
self.connectClient('baz')
self.sendLine(2, 'JOIN #chan')
m = self.getMessage(2)
self.assertMessageEqual(m, command='JOIN', params=['#chan'])
# skip the rest of the JOIN burst:
self.getMessages(1)
self.getMessages(2)
self.sendLine(1, 'PART #chan :bye everyone')
# both the PART'ing client and the other channel member should receive a PART line:
m = self.getMessage(1)
self.assertMessageEqual(m, command='PART', params=['#chan', 'bye everyone'])
m = self.getMessage(2)
self.assertMessageEqual(m, command='PART', params=['#chan', 'bye everyone'])
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testTopic(self):
"""“Once a user has joined a channel, he receives information about
@ -148,14 +172,17 @@ class JoinTestCase(cases.BaseServerTestCase):
and <https://tools.ietf.org/html/rfc2812#section-3.2.1>
"""
self.connectClient('foo')
self.joinChannel(1, '#chan')
self.connectClient('bar')
self.sendLine(1, 'JOIN #chan')
self.sendLine(2, 'JOIN #chan')
# TODO: check foo is opped OR +t is unset
self.joinChannel(2, '#chan')
self.getMessages(1)
self.getMessages(2)
self.getMessages(1)
self.getMessages(2)
# TODO: check foo is opped OR +t is unset
self.sendLine(1, 'TOPIC #chan :T0P1C')
try:
@ -181,14 +208,15 @@ class JoinTestCase(cases.BaseServerTestCase):
and <https://tools.ietf.org/html/rfc2812#section-3.2.1>
"""
self.connectClient('foo')
self.joinChannel(1, '#chan')
self.connectClient('bar')
self.sendLine(1, 'JOIN #chan')
self.sendLine(2, 'JOIN #chan')
# TODO: check foo is opped
self.joinChannel(2, '#chan')
self.getMessages(1)
self.getMessages(2)
self.getMessages(1)
# TODO: check foo is opped
self.sendLine(1, 'MODE #chan +t')
try:
@ -220,7 +248,55 @@ class JoinTestCase(cases.BaseServerTestCase):
m = self.getMessage(1)
self.assertMessageEqual(m, command='TOPIC', params=['#chan', 'T0P1C'])
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testTopicNonexistentChannel(self):
"""RFC2812 specifies ERR_NOTONCHANNEL as the correct response to TOPIC
on a nonexistent channel. The modern spec prefers ERR_NOSUCHCHANNEL.
<https://tools.ietf.org/html/rfc2812#section-3.2.4>
<http://modern.ircdocs.horse/#topic-message>
"""
self.connectClient('foo')
self.sendLine(1, 'TOPIC #chan')
m = self.getMessage(1)
# either 403 ERR_NOSUCHCHANNEL or 443 ERR_NOTONCHANNEL
self.assertIn(m.command, ('403', '443'))
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testUnsetTopicResponses(self):
"""Test various cases related to RPL_NOTOPIC with set and unset topics."""
self.connectClient('bar')
self.sendLine(1, 'JOIN #test')
messages = self.getMessages(1)
# shouldn't send RPL_NOTOPIC for a new channel
self.assertNotIn(RPL_NOTOPIC, [m.command for m in messages])
self.connectClient('baz')
self.sendLine(2, 'JOIN #test')
messages = self.getMessages(2)
# topic is still unset, shouldn't send RPL_NOTOPIC on initial join
self.assertNotIn(RPL_NOTOPIC, [m.command for m in messages])
self.sendLine(2, 'TOPIC #test')
messages = self.getMessages(2)
# explicit TOPIC should receive RPL_NOTOPIC
self.assertIn(RPL_NOTOPIC, [m.command for m in messages])
self.sendLine(1, 'TOPIC #test :new topic')
self.getMessages(1)
# client 2 should get the new TOPIC line
messages = [message for message in self.getMessages(2) if message.command == 'TOPIC']
self.assertEqual(len(messages), 1)
self.assertEqual(messages[0].params, ['#test', 'new topic'])
# unset the topic:
self.sendLine(1, 'TOPIC #test :')
self.getMessages(1)
self.connectClient('qux')
self.sendLine(3, 'join #test')
messages = self.getMessages(3)
# topic is once again unset, shouldn't send RPL_NOTOPIC on initial join
self.assertNotIn(RPL_NOTOPIC, [m.command for m in messages])
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testListEmpty(self):
@ -282,12 +358,15 @@ class JoinTestCase(cases.BaseServerTestCase):
and <https://tools.ietf.org/html/rfc2812#section-3.2.1>
"""
self.connectClient('foo')
self.joinChannel(1, '#chan')
self.connectClient('bar')
self.joinChannel(2, '#chan')
self.connectClient('baz')
self.sendLine(1, 'JOIN #chan')
self.joinChannel(3, '#chan')
# TODO: check foo is an operator
self.sendLine(2, 'JOIN #chan')
self.sendLine(3, 'JOIN #chan')
import time
time.sleep(0.1)
@ -311,6 +390,61 @@ class JoinTestCase(cases.BaseServerTestCase):
self.assertMessageEqual(m, command='KICK',
params=['#chan', 'bar', 'bye'])
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testKickPrivileges(self):
"""Test who has the ability to kick / what error codes are sent for invalid kicks."""
self.connectClient('foo')
self.sendLine(1, 'JOIN #chan')
self.getMessages(1)
self.connectClient('bar')
self.sendLine(2, 'JOIN #chan')
messages = self.getMessages(2)
names = set()
for message in messages:
if message.command == RPL_NAMREPLY:
names.update(set(message.params[-1].split()))
# assert foo is opped
self.assertIn('@foo', names, f'unexpected names: {names}')
self.connectClient('baz')
self.sendLine(3, 'KICK #chan bar')
replies = set(m.command for m in self.getMessages(3))
self.assertTrue(
ERR_NOTONCHANNEL in replies or ERR_CHANOPRIVSNEEDED in replies or ERR_NOSUCHCHANNEL in replies,
f'did not receive acceptable error code for kick from outside channel: {replies}')
self.joinChannel(3, '#chan')
self.getMessages(3)
self.sendLine(3, 'KICK #chan bar')
replies = set(m.command for m in self.getMessages(3))
# now we're a channel member so we should receive ERR_CHANOPRIVSNEEDED
self.assertIn(ERR_CHANOPRIVSNEEDED, replies)
self.sendLine(1, 'MODE #chan +o baz')
self.getMessages(1)
# should be able to kick an unprivileged user:
self.sendLine(3, 'KICK #chan bar')
# should be able to kick an operator:
self.sendLine(3, 'KICK #chan foo')
baz_replies = set(m.command for m in self.getMessages(3))
self.assertNotIn(ERR_CHANOPRIVSNEEDED, baz_replies)
kick_targets = [m.params[1] for m in self.getMessages(1) if m.command == 'KICK']
# foo should see bar and foo being kicked
self.assertTrue(any(target.startswith('foo') for target in kick_targets), f'unexpected kick targets: {kick_targets}')
self.assertTrue(any(target.startswith('bar') for target in kick_targets), f'unexpected kick targets: {kick_targets}')
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testKickNonexistentChannel(self):
"""“Kick command [...] Numeric replies: [...] ERR_NOSUCHCHANNEL."""
self.connectClient('foo')
self.sendLine(1, 'KICK #chan nick')
m = self.getMessage(1)
# should return ERR_NOSUCHCHANNEL
self.assertMessageEqual(m, command='403')
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testDoubleKickMessages(self):
"""“The server MUST NOT send KICK messages with multiple channels or
@ -319,24 +453,24 @@ class JoinTestCase(cases.BaseServerTestCase):
-- https://tools.ietf.org/html/rfc2812#section-3.2.8
"""
self.connectClient('foo')
self.joinChannel(1, '#chan')
self.connectClient('bar')
self.joinChannel(2, '#chan')
self.connectClient('baz')
self.joinChannel(3, '#chan')
self.connectClient('qux')
self.sendLine(1, 'JOIN #chan')
self.joinChannel(4, '#chan')
# TODO: check foo is an operator
self.sendLine(2, 'JOIN #chan')
self.sendLine(3, 'JOIN #chan')
self.sendLine(4, 'JOIN #chan')
# Synchronize
self.getMessages(1)
self.getMessages(2)
self.getMessages(3)
self.getMessages(4)
self.getMessages(1)
self.getMessages(2)
self.getMessages(3)
self.getMessages(4)
self.sendLine(1, 'KICK #chan,#chan bar,baz :bye')
try:
@ -413,12 +547,14 @@ class JoinTestCase(cases.BaseServerTestCase):
'got this instead: {msg}')
class testChannelCaseSensitivity(cases.BaseServerTestCase):
def _testChannelsEquivalent(name1, name2):
def _testChannelsEquivalent(casemapping, name1, name2):
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812',
strict=True)
def f(self):
self.connectClient('foo')
self.connectClient('bar')
if self.server_support['CASEMAPPING'] != casemapping:
raise runner.NotImplementedByController('Casemapping {} not implemented'.format(casemapping))
self.joinClient(1, name1)
self.joinClient(2, name2)
try:
@ -431,12 +567,14 @@ class testChannelCaseSensitivity(cases.BaseServerTestCase):
.format(name1, name2))
f.__name__ = 'testEquivalence__{}__{}'.format(name1, name2)
return f
def _testChannelsNotEquivalent(name1, name2):
def _testChannelsNotEquivalent(casemapping, name1, name2):
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812',
strict=True)
def f(self):
self.connectClient('foo')
self.connectClient('bar')
if self.server_support['CASEMAPPING'] != casemapping:
raise runner.NotImplementedByController('Casemapping {} not implemented'.format(casemapping))
self.joinClient(1, name1)
self.joinClient(2, name2)
try:
@ -452,7 +590,66 @@ class testChannelCaseSensitivity(cases.BaseServerTestCase):
f.__name__ = 'testEquivalence__{}__{}'.format(name1, name2)
return f
testSimpleEquivalent = _testChannelsEquivalent('#Foo', '#foo')
testSimpleNotEquivalent = _testChannelsNotEquivalent('#Foo', '#fooa')
testFancyEquivalent = _testChannelsEquivalent('#F]|oo{', '#f}\\oo[')
testFancyNotEquivalent = _testChannelsEquivalent('#F}o\\o[', '#f]o|o{')
testAsciiSimpleEquivalent = _testChannelsEquivalent('ascii', '#Foo', '#foo')
testAsciiSimpleNotEquivalent = _testChannelsNotEquivalent('ascii', '#Foo', '#fooa')
testRfcSimpleEquivalent = _testChannelsEquivalent('rfc1459', '#Foo', '#foo')
testRfcSimpleNotEquivalent = _testChannelsNotEquivalent('rfc1459', '#Foo', '#fooa')
testRfcFancyEquivalent = _testChannelsEquivalent('rfc1459', '#F]|oo{', '#f}\\oo[')
testRfcFancyNotEquivalent = _testChannelsEquivalent('rfc1459', '#F}o\\o[', '#f]o|o{')
class InviteTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testInvites(self):
"""Test some basic functionality related to INVITE and the +i mode."""
self.connectClient('foo')
self.joinChannel(1, '#chan')
self.sendLine(1, 'MODE #chan +i')
self.getMessages(1)
self.sendLine(1, 'INVITE bar #chan')
m = self.getMessage(1)
self.assertEqual(m.command, ERR_NOSUCHNICK)
self.connectClient('bar')
self.sendLine(2, 'JOIN #chan')
m = self.getMessage(2)
self.assertEqual(m.command, ERR_INVITEONLYCHAN)
self.sendLine(1, 'INVITE bar #chan')
m = self.getMessage(1)
self.assertEqual(m.command, RPL_INVITING)
# modern/ircv3 param order: inviter, invitee, channel
self.assertEqual(m.params, ['foo', 'bar', '#chan'])
m = self.getMessage(2)
self.assertEqual(m.command, 'INVITE')
self.assertTrue(m.prefix.startswith("foo")) # nickmask of inviter
self.assertEqual(m.params, ['bar', '#chan'])
# we were invited, so join should succeed now
self.joinChannel(2, '#chan')
class ChannelQuitTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testQuit(self):
"""“Once a user has joined a channel, he receives information about
all commands his server receives affecting the channel. This
includes [...] QUIT”
<https://tools.ietf.org/html/rfc2812#section-3.2.1>
"""
self.connectClient('bar')
self.joinChannel(1, '#chan')
self.connectClient('qux')
self.sendLine(2, 'JOIN #chan')
self.getMessages(2)
self.getMessages(1)
self.sendLine(2, 'QUIT :qux out')
self.getMessages(2)
m = self.getMessage(1)
self.assertEqual(m.command, 'QUIT')
self.assertTrue(m.prefix.startswith('qux')) # nickmask of quitter
self.assertIn('qux out', m.params[0])

View File

@ -4,9 +4,6 @@ Tests section 4.1 of RFC 1459.
"""
from irctest import cases
from irctest import authentication
from irctest.irc_utils.message_parser import Message
from irctest.basecontrollers import NotImplementedByController
from irctest.client_mock import ConnectionClosed
class PasswordedConnectionRegistrationTestCase(cases.BaseServerTestCase):
@ -31,6 +28,16 @@ class PasswordedConnectionRegistrationTestCase(cases.BaseServerTestCase):
self.assertNotEqual(m.command, '001',
msg='Got 001 after NICK+USER but missing PASS')
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testWrongPassword(self):
self.addClient()
self.sendLine(1, 'PASS {}'.format(self.password + "garbage"))
self.sendLine(1, 'NICK foo')
self.sendLine(1, 'USER username * * :Realname')
m = self.getRegistrationMessage(1)
self.assertNotEqual(m.command, '001',
msg='Got 001 after NICK+USER but incorrect PASS')
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testPassAfterNickuser(self):
"""“The password can and must be set before any attempt to register

View File

@ -22,7 +22,7 @@ class EchoMessageTestCase(cases.BaseServerTestCase):
# TODO: check also without this
self.sendLine(1, 'CAP REQ :echo-message{}'.format(
' server-time' if server_time else ''))
m = self.getRegistrationMessage(1)
self.getRegistrationMessage(1)
# TODO: Remove this one the trailing space issue is fixed in Charybdis
# and Mammon:
#self.assertMessageEqual(m, command='CAP',
@ -38,7 +38,7 @@ class EchoMessageTestCase(cases.BaseServerTestCase):
self.sendLine(1, 'JOIN #chan')
if not solo:
capabilities = ['server-time'] if server_time else []
capabilities = ['server-time'] if server_time else None
self.connectClient('qux', capabilities=capabilities)
self.sendLine(2, 'JOIN #chan')
@ -67,10 +67,8 @@ class EchoMessageTestCase(cases.BaseServerTestCase):
'messages differ: {} {}',
extra_format=(m1, m2))
if server_time:
self.assertEqual(m1.tags, m2.tags,
fail_msg='Tags of forwarded and echoed '
'messages differ: {} {}',
extra_format=(m1, m2))
self.assertIn('time', m1.tags, fail_msg='Echoed message is missing server time: {}', extra_format=(m1,))
self.assertIn('time', m2.tags, fail_msg='Forwarded message is missing server time: {}', extra_format=(m2,))
return f
testEchoMessagePrivmsgNoServerTime = _testEchoMessage('PRIVMSG', False, False)

View File

@ -3,7 +3,6 @@
"""
from irctest import cases
from irctest.irc_utils.message_parser import Message
class MetadataTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
def connectRegisteredClient(self, nick):
@ -29,10 +28,9 @@ class MetadataTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
def testNotLoggedIn(self):
self.connectClient('foo', capabilities=['extended-join'],
skip_if_cap_nak=True)
self.sendLine(1, 'JOIN #chan')
self.getMessages(1)
self.joinChannel(1, '#chan')
self.connectClient('bar')
self.sendLine(2, 'JOIN #chan')
self.joinChannel(2, '#chan')
m = self.getMessage(1)
self.assertMessageEqual(m, command='JOIN',
params=['#chan', '*', 'Realname'],
@ -41,16 +39,15 @@ class MetadataTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
@cases.SpecificationSelector.requiredBySpecification('IRCv3.1')
@cases.OptionalityHelper.skipUnlessHasMechanism('PLAIN')
def testNotLoggedIn(self):
def testLoggedIn(self):
self.connectClient('foo', capabilities=['extended-join'],
skip_if_cap_nak=True)
self.sendLine(1, 'JOIN #chan')
self.getMessages(1)
self.joinChannel(1, '#chan')
self.controller.registerUser(self, 'jilles', 'sesame')
self.connectRegisteredClient('bar')
self.joinChannel(2, '#chan')
self.sendLine(2, 'JOIN #chan')
m = self.getMessage(1)
self.assertMessageEqual(m, command='JOIN',
params=['#chan', 'jilles', 'Realname'],

View File

@ -0,0 +1,234 @@
"""
<https://ircv3.net/specs/extensions/labeled-response.html>
"""
from irctest import cases
from irctest.basecontrollers import NotImplementedByController
class LabeledResponsesTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledPrivmsgResponsesToMultipleClients(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(2)
self.connectClient('carl', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(3)
self.connectClient('alice', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(4)
self.sendLine(1, '@draft/label=12345 PRIVMSG bar,carl,alice :hi')
m = self.getMessage(1)
m2 = self.getMessage(2)
m3 = self.getMessage(3)
m4 = self.getMessage(4)
# ensure the label isn't sent to recipients
self.assertMessageEqual(m2, command='PRIVMSG', fail_msg='No PRIVMSG received by target 1 after sending one out')
self.assertNotIn('draft/label', m2.tags, m2, fail_msg="When sending a PRIVMSG with a label, the target users shouldn't receive the label (only the sending user should): {msg}")
self.assertMessageEqual(m3, command='PRIVMSG', fail_msg='No PRIVMSG received by target 1 after sending one out')
self.assertNotIn('draft/label', m3.tags, m3, fail_msg="When sending a PRIVMSG with a label, the target users shouldn't receive the label (only the sending user should): {msg}")
self.assertMessageEqual(m4, command='PRIVMSG', fail_msg='No PRIVMSG received by target 1 after sending one out')
self.assertNotIn('draft/label', m4.tags, m4, fail_msg="When sending a PRIVMSG with a label, the target users shouldn't receive the label (only the sending user should): {msg}")
self.assertMessageEqual(m, command='BATCH', fail_msg='No BATCH echo received after sending one out')
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledPrivmsgResponsesToClient(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(2)
self.sendLine(1, '@draft/label=12345 PRIVMSG bar :hi')
m = self.getMessage(1)
m2 = self.getMessage(2)
# ensure the label isn't sent to recipient
self.assertMessageEqual(m2, command='PRIVMSG', fail_msg='No PRIVMSG received by the target after sending one out')
self.assertNotIn('draft/label', m2.tags, m2, fail_msg="When sending a PRIVMSG with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
self.assertMessageEqual(m, command='PRIVMSG', fail_msg='No PRIVMSG echo received after sending one out')
self.assertIn('draft/label', m.tags, m, fail_msg="When sending a PRIVMSG with a label, the echo'd message didn't contain the label at all: {msg}")
self.assertEqual(m.tags['draft/label'], '12345', m, fail_msg="Echo'd PRIVMSG to a client did not contain the same label we sent it with(should be '12345'): {msg}")
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledPrivmsgResponsesToChannel(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(2)
# join channels
self.sendLine(1, 'JOIN #test')
self.getMessages(1)
self.sendLine(2, 'JOIN #test')
self.getMessages(2)
self.getMessages(1)
self.sendLine(1, '@draft/label=12345;+draft/reply=123;+draft/react=l😃l PRIVMSG #test :hi')
ms = self.getMessage(1)
mt = self.getMessage(2)
# ensure the label isn't sent to recipient
self.assertMessageEqual(mt, command='PRIVMSG', fail_msg='No PRIVMSG received by the target after sending one out')
self.assertNotIn('draft/label', mt.tags, mt, fail_msg="When sending a PRIVMSG with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
# ensure sender correctly receives msg
self.assertMessageEqual(ms, command='PRIVMSG', fail_msg="Got a message back that wasn't a PRIVMSG")
self.assertIn('draft/label', ms.tags, ms, fail_msg="When sending a PRIVMSG with a label, the source user should receive the label but didn't: {msg}")
self.assertEqual(ms.tags['draft/label'], '12345', ms, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledPrivmsgResponsesToSelf(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.sendLine(1, '@draft/label=12345 PRIVMSG foo :hi')
m1 = self.getMessage(1)
m2 = self.getMessage(1)
number_of_labels = 0
for m in [m1, m2]:
self.assertMessageEqual(m, command='PRIVMSG', fail_msg="Got a message back that wasn't a PRIVMSG")
if 'draft/label' in m.tags:
number_of_labels += 1
self.assertEqual(m.tags['draft/label'], '12345', m, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
self.assertEqual(number_of_labels, 1, m1, fail_msg="When sending a PRIVMSG to self with echo-message, we only expect one message to contain the label. Instead, {} messages had the label".format(number_of_labels))
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledNoticeResponsesToClient(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(2)
self.sendLine(1, '@draft/label=12345 NOTICE bar :hi')
m = self.getMessage(1)
m2 = self.getMessage(2)
# ensure the label isn't sent to recipient
self.assertMessageEqual(m2, command='NOTICE', fail_msg='No NOTICE received by the target after sending one out')
self.assertNotIn('draft/label', m2.tags, m2, fail_msg="When sending a NOTICE with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
self.assertMessageEqual(m, command='NOTICE', fail_msg='No NOTICE echo received after sending one out')
self.assertIn('draft/label', m.tags, m, fail_msg="When sending a NOTICE with a label, the echo'd message didn't contain the label at all: {msg}")
self.assertEqual(m.tags['draft/label'], '12345', m, fail_msg="Echo'd NOTICE to a client did not contain the same label we sent it with(should be '12345'): {msg}")
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledNoticeResponsesToChannel(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(2)
# join channels
self.sendLine(1, 'JOIN #test')
self.getMessages(1)
self.sendLine(2, 'JOIN #test')
self.getMessages(2)
self.getMessages(1)
self.sendLine(1, '@draft/label=12345;+draft/reply=123;+draft/react=l😃l NOTICE #test :hi')
ms = self.getMessage(1)
mt = self.getMessage(2)
# ensure the label isn't sent to recipient
self.assertMessageEqual(mt, command='NOTICE', fail_msg='No NOTICE received by the target after sending one out')
self.assertNotIn('draft/label', mt.tags, mt, fail_msg="When sending a NOTICE with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
# ensure sender correctly receives msg
self.assertMessageEqual(ms, command='NOTICE', fail_msg="Got a message back that wasn't a NOTICE")
self.assertIn('draft/label', ms.tags, ms, fail_msg="When sending a NOTICE with a label, the source user should receive the label but didn't: {msg}")
self.assertEqual(ms.tags['draft/label'], '12345', ms, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledNoticeResponsesToSelf(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.sendLine(1, '@draft/label=12345 NOTICE foo :hi')
m1 = self.getMessage(1)
m2 = self.getMessage(1)
number_of_labels = 0
for m in [m1, m2]:
self.assertMessageEqual(m, command='NOTICE', fail_msg="Got a message back that wasn't a NOTICE")
if 'draft/label' in m.tags:
number_of_labels += 1
self.assertEqual(m.tags['draft/label'], '12345', m, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
self.assertEqual(number_of_labels, 1, m1, fail_msg="When sending a NOTICE to self with echo-message, we only expect one message to contain the label. Instead, {} messages had the label".format(number_of_labels))
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledTagMsgResponsesToClient(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response', 'draft/message-tags-0.2'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response', 'draft/message-tags-0.2'], skip_if_cap_nak=True)
self.getMessages(2)
self.sendLine(1, '@draft/label=12345;+draft/reply=123;+draft/react=l😃l TAGMSG bar')
m = self.getMessage(1)
m2 = self.getMessage(2)
# ensure the label isn't sent to recipient
self.assertMessageEqual(m2, command='TAGMSG', fail_msg='No TAGMSG received by the target after sending one out')
self.assertNotIn('draft/label', m2.tags, m2, fail_msg="When sending a TAGMSG with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
self.assertIn('+draft/reply', m2.tags, m2, fail_msg="Reply tag wasn't present on the target user's TAGMSG: {msg}")
self.assertEqual(m2.tags['+draft/reply'], '123', m2, fail_msg="Reply tag wasn't the same on the target user's TAGMSG: {msg}")
self.assertIn('+draft/react', m2.tags, m2, fail_msg="React tag wasn't present on the target user's TAGMSG: {msg}")
self.assertEqual(m2.tags['+draft/react'], 'l😃l', m2, fail_msg="React tag wasn't the same on the target user's TAGMSG: {msg}")
self.assertMessageEqual(m, command='TAGMSG', fail_msg='No TAGMSG echo received after sending one out')
self.assertIn('draft/label', m.tags, m, fail_msg="When sending a TAGMSG with a label, the echo'd message didn't contain the label at all: {msg}")
self.assertEqual(m.tags['draft/label'], '12345', m, fail_msg="Echo'd TAGMSG to a client did not contain the same label we sent it with(should be '12345'): {msg}")
self.assertIn('+draft/reply', m.tags, m, fail_msg="Reply tag wasn't present on the source user's TAGMSG: {msg}")
self.assertEqual(m2.tags['+draft/reply'], '123', m, fail_msg="Reply tag wasn't the same on the source user's TAGMSG: {msg}")
self.assertIn('+draft/react', m.tags, m, fail_msg="React tag wasn't present on the source user's TAGMSG: {msg}")
self.assertEqual(m2.tags['+draft/react'], 'l😃l', m, fail_msg="React tag wasn't the same on the source user's TAGMSG: {msg}")
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledTagMsgResponsesToChannel(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response', 'draft/message-tags-0.2'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response', 'draft/message-tags-0.2'], skip_if_cap_nak=True)
self.getMessages(2)
# join channels
self.sendLine(1, 'JOIN #test')
self.getMessages(1)
self.sendLine(2, 'JOIN #test')
self.getMessages(2)
self.getMessages(1)
self.sendLine(1, '@draft/label=12345;+draft/reply=123;+draft/react=l😃l TAGMSG #test')
ms = self.getMessage(1)
mt = self.getMessage(2)
# ensure the label isn't sent to recipient
self.assertMessageEqual(mt, command='TAGMSG', fail_msg='No TAGMSG received by the target after sending one out')
self.assertNotIn('draft/label', mt.tags, mt, fail_msg="When sending a TAGMSG with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
# ensure sender correctly receives msg
self.assertMessageEqual(ms, command='TAGMSG', fail_msg="Got a message back that wasn't a TAGMSG")
self.assertIn('draft/label', ms.tags, ms, fail_msg="When sending a TAGMSG with a label, the source user should receive the label but didn't: {msg}")
self.assertEqual(ms.tags['draft/label'], '12345', ms, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledTagMsgResponsesToSelf(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response', 'draft/message-tags-0.2'], skip_if_cap_nak=True)
self.getMessages(1)
self.sendLine(1, '@draft/label=12345;+draft/reply=123;+draft/react=l😃l TAGMSG foo')
m1 = self.getMessage(1)
m2 = self.getMessage(1)
number_of_labels = 0
for m in [m1, m2]:
self.assertMessageEqual(m, command='TAGMSG', fail_msg="Got a message back that wasn't a TAGMSG")
if 'draft/label' in m.tags:
number_of_labels += 1
self.assertEqual(m.tags['draft/label'], '12345', m, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
self.assertEqual(number_of_labels, 1, m1, fail_msg="When sending a TAGMSG to self with echo-message, we only expect one message to contain the label. Instead, {} messages had the label".format(number_of_labels))

View File

@ -0,0 +1,63 @@
"""
Section 3.2 of RFC 2812
<https://tools.ietf.org/html/rfc2812#section-3.3>
"""
from irctest import cases
class PrivmsgTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testPrivmsg(self):
"""<https://tools.ietf.org/html/rfc2812#section-3.3.1>"""
self.connectClient('foo')
self.sendLine(1, 'JOIN #chan')
self.connectClient('bar')
self.sendLine(2, 'JOIN #chan')
self.getMessages(2) # synchronize
self.sendLine(1, 'PRIVMSG #chan :hello there')
self.getMessages(1) # synchronize
pms = [msg for msg in self.getMessages(2) if msg.command == 'PRIVMSG']
self.assertEqual(len(pms), 1)
self.assertMessageEqual(
pms[0],
command='PRIVMSG',
params=['#chan', 'hello there']
)
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testPrivmsgNonexistentChannel(self):
"""<https://tools.ietf.org/html/rfc2812#section-3.3.1>"""
self.connectClient('foo')
self.sendLine(1, 'PRIVMSG #nonexistent :hello there')
msg = self.getMessage(1)
# ERR_NOSUCHNICK, ERR_NOSUCHCHANNEL, or ERR_CANNOTSENDTOCHAN
self.assertIn(msg.command, ('401', '403', '404'))
class NoticeTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testNotice(self):
"""<https://tools.ietf.org/html/rfc2812#section-3.3.2>"""
self.connectClient('foo')
self.sendLine(1, 'JOIN #chan')
self.connectClient('bar')
self.sendLine(2, 'JOIN #chan')
self.getMessages(2) # synchronize
self.sendLine(1, 'NOTICE #chan :hello there')
self.getMessages(1) # synchronize
notices = [msg for msg in self.getMessages(2) if msg.command == 'NOTICE']
self.assertEqual(len(notices), 1)
self.assertMessageEqual(
notices[0],
command='NOTICE',
params=['#chan', 'hello there']
)
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testNoticeNonexistentChannel(self):
"""
'automatic replies MUST NEVER be sent in response to a NOTICE message'
https://tools.ietf.org/html/rfc2812#section-3.3.2>
"""
self.connectClient('foo')
self.sendLine(1, 'NOTICE #nonexistent :hello there')
self.assertEqual(self.getMessages(1), [])

View File

@ -4,7 +4,6 @@ Tests METADATA features.
"""
from irctest import cases
from irctest.irc_utils.message_parser import Message
class MetadataTestCase(cases.BaseServerTestCase):
valid_metadata_keys = {'valid_key1', 'valid_key2'}

View File

@ -5,6 +5,7 @@
from irctest import cases
from irctest.client_mock import NoMessageException
from irctest.basecontrollers import NotImplementedByController
from irctest.numerics import RPL_MONLIST, RPL_ENDOFMONLIST
class EchoMessageTestCase(cases.BaseServerTestCase):
def check_server_support(self):
@ -88,7 +89,7 @@ class EchoMessageTestCase(cases.BaseServerTestCase):
self.assertMonoffline(1, 'bar')
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testMonitorOneConnection(self):
def testMonitorOneConnectionWithQuit(self):
self.connectClient('foo')
self.check_server_support()
self.connectClient('bar')
@ -220,3 +221,37 @@ class EchoMessageTestCase(cases.BaseServerTestCase):
self.assertEqual(l, [],
fail_msg='Got response to unmonitored client: {}',
extra_format=(l,))
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testMonitorList(self):
def checkMonitorSubjects(messages, client_nick, expected_targets):
# collect all the RPL_MONLIST nicks into a set:
result = set()
for message in messages:
if message.command == RPL_MONLIST:
self.assertEqual(message.params[0], client_nick)
result.update(message.params[1].split(','))
# finally, RPL_ENDOFMONLIST should be sent
self.assertEqual(messages[-1].command, RPL_ENDOFMONLIST)
self.assertEqual(messages[-1].params[0], client_nick)
self.assertEqual(result, expected_targets)
self.connectClient('bar')
self.check_server_support()
self.sendLine(1, 'MONITOR L')
checkMonitorSubjects(self.getMessages(1), 'bar', set())
self.sendLine(1, 'MONITOR + qux')
self.getMessages(1)
self.sendLine(1, 'MONITOR L')
checkMonitorSubjects(self.getMessages(1), 'bar', {'qux',})
self.sendLine(1, 'MONITOR + bazbat')
self.getMessages(1)
self.sendLine(1, 'MONITOR L')
checkMonitorSubjects(self.getMessages(1), 'bar', {'qux', 'bazbat',})
self.sendLine(1, 'MONITOR - qux')
self.getMessages(1)
self.sendLine(1, 'MONITOR L')
checkMonitorSubjects(self.getMessages(1), 'bar', {'bazbat',})

View File

@ -0,0 +1,32 @@
"""
Tests multi-prefix.
<http://ircv3.net/specs/extensions/multi-prefix-3.1.html>
"""
from irctest import cases
class MultiPrefixTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('IRCv3.1')
def testMultiPrefix(self):
"""“When requested, the multi-prefix client capability will cause the
IRC server to send all possible prefixes which apply to a user in NAMES
and WHO output.
These prefixes MUST be in order of rank, from highest to lowest.
"""
self.connectClient('foo', capabilities=['multi-prefix'])
self.joinChannel(1, '#chan')
self.sendLine(1, 'MODE #chan +v foo')
self.getMessages(1)
#TODO(dan): Make sure +v is voice
self.sendLine(1, 'NAMES #chan')
self.assertMessageEqual(self.getMessage(1), command='353', params=['foo', '=', '#chan', '@+foo'], fail_msg='Expected NAMES response (353) with @+foo, got: {msg}')
self.getMessages(1)
self.sendLine(1, 'WHO #chan')
msg = self.getMessage(1)
self.assertEqual(msg.command, '352', msg, fail_msg='Expected WHO response (352), got: {msg}')
self.assertGreaterEqual(len(msg.params), 8, 'Expected WHO response (352) with 8 params, got: {msg}'.format(msg=msg))
self.assertTrue('@+' in msg.params[6], 'Expected WHO response (352) with "@+" in param 7, got: {msg}'.format(msg=msg))

View File

@ -0,0 +1,50 @@
"""
Regression tests for bugs in oragono.
"""
from irctest import cases
class RegressionsTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('RFC1459')
def testFailedNickChange(self):
# see oragono commit d0ded906d4ac8f
self.connectClient('alice')
self.connectClient('bob')
# bob tries to change to an in-use nickname; this MUST fail
self.sendLine(2, 'NICK alice')
ms = self.getMessages(2)
self.assertEqual(len(ms), 1)
self.assertMessageEqual(ms[0], command='433') # ERR_NICKNAMEINUSE
# bob MUST still own the bob nick, and be able to receive PRIVMSG as bob
self.sendLine(1, 'PRIVMSG bob hi')
ms = self.getMessages(1)
self.assertEqual(len(ms), 0)
ms = self.getMessages(2)
self.assertEqual(len(ms), 1)
self.assertMessageEqual(ms[0], command='PRIVMSG', params=['bob', 'hi'])
@cases.SpecificationSelector.requiredBySpecification('RFC1459')
def testCaseChanges(self):
self.connectClient('alice')
self.joinChannel(1, '#test')
self.connectClient('bob')
self.joinChannel(2, '#test')
self.getMessages(1)
self.getMessages(2)
# case change: both alice and bob should get a successful nick line
self.sendLine(1, 'NICK Alice')
ms = self.getMessages(1)
self.assertEqual(len(ms), 1)
self.assertMessageEqual(ms[0], command='NICK', params=['Alice'])
ms = self.getMessages(2)
self.assertEqual(len(ms), 1)
self.assertMessageEqual(ms[0], command='NICK', params=['Alice'])
# bob should not get notified on no-op nick change
self.sendLine(1, 'NICK Alice')
ms = self.getMessages(2)
self.assertEqual(ms, [])

View File

@ -1,7 +1,6 @@
import base64
from irctest import cases
from irctest.irc_utils.message_parser import Message
class RegistrationTestCase(cases.BaseServerTestCase):
def testRegistration(self):
@ -129,6 +128,10 @@ class SaslTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
self.sendLine(1, 'AUTHENTICATE {}'.format(authstring[0:400]))
self.sendLine(1, 'AUTHENTICATE {}'.format(authstring[400:]))
self.confirmSuccessfulAuth()
def confirmSuccessfulAuth(self):
# TODO: check username/etc in this as well, so we can apply it to other tests
# TODO: may be in the other order
m = self.getRegistrationMessage(1)
self.assertMessageEqual(m, command='900',
@ -136,7 +139,7 @@ class SaslTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
'login, but got: {msg}')
m = self.getRegistrationMessage(1)
self.assertMessageEqual(m, command='903',
fail_msg='Expected 900 (RPL_LOGGEDIN) after successful '
fail_msg='Expected 903 (RPL_SASLSUCCESS) after successful '
'login, but got: {msg}')
# TODO: add a test for when the length of the authstring is greater than 800.
@ -171,15 +174,7 @@ class SaslTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
self.sendLine(1, 'AUTHENTICATE {}'.format(authstring))
self.sendLine(1, 'AUTHENTICATE +')
# TODO: may be in the other order
m = self.getRegistrationMessage(1)
self.assertMessageEqual(m, command='900',
fail_msg='Expected 900 (RPL_LOGGEDIN) after successful '
'login, but got: {msg}')
m = self.getRegistrationMessage(1)
self.assertMessageEqual(m, command='903',
fail_msg='Expected 900 (RPL_LOGGEDIN) after successful '
'login, but got: {msg}')
self.confirmSuccessfulAuth()
# TODO: add a test for when the length of the authstring is 800.
# I don't know how to do it, because it would make the registration

View File

@ -1,62 +0,0 @@
"""
<http://ircv3.net/specs/extensions/tls-3.1.html>
"""
from irctest import cases
from irctest.basecontrollers import NotImplementedByController
class StarttlsFailTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('IRCv3.1')
def testStarttlsRequestTlsFail(self):
"""<http://ircv3.net/specs/extensions/tls-3.1.html>
"""
self.addClient()
# TODO: check also without this
self.sendLine(1, 'CAP LS')
capabilities = self.getCapLs(1)
if 'tls' not in capabilities:
raise NotImplementedByController('tls')
# TODO: check also without this
self.sendLine(1, 'CAP REQ :tls')
m = self.getRegistrationMessage(1)
# TODO: Remove this once the trailing space issue is fixed in Charybdis
# and Mammon:
#self.assertMessageEqual(m, command='CAP', params=['*', 'ACK', 'tls'],
# fail_msg='Did not ACK capability `tls`: {msg}')
self.sendLine(1, 'STARTTLS')
m = self.getRegistrationMessage(1)
self.assertMessageEqual(m, command='691',
fail_msg='Did not respond to STARTTLS with 691 whereas '
'SSL is not configured: {msg}.')
class StarttlsTestCase(cases.BaseServerTestCase):
ssl = True
def testStarttlsRequestTls(self):
"""<http://ircv3.net/specs/extensions/tls-3.1.html>
"""
self.addClient()
# TODO: check also without this
self.sendLine(1, 'CAP LS')
capabilities = self.getCapLs(1)
if 'tls' not in capabilities:
raise NotImplementedByController('tls')
# TODO: check also without this
self.sendLine(1, 'CAP REQ :tls')
m = self.getRegistrationMessage(1)
# TODO: Remove this one the trailing space issue is fixed in Charybdis
# and Mammon:
#self.assertMessageEqual(m, command='CAP', params=['*', 'ACK', 'tls'],
# fail_msg='Did not ACK capability `tls`: {msg}')
self.sendLine(1, 'STARTTLS')
m = self.getRegistrationMessage(1)
self.assertMessageEqual(m, command='670',
fail_msg='Did not respond to STARTTLS with 670: {msg}.')
self.clients[1].starttls()
self.sendLine(1, 'USER f * * :foo')
self.sendLine(1, 'NICK foo')
self.sendLine(1, 'CAP END')
self.getMessages(1)

View File

@ -0,0 +1,57 @@
"""
User commands as specified in Section 3.6 of RFC 2812:
<https://tools.ietf.org/html/rfc2812#section-3.6>
"""
from irctest import cases
from irctest.numerics import RPL_WHOISUSER, RPL_WHOISCHANNELS, RPL_AWAY, RPL_NOWAWAY, RPL_UNAWAY
class WhoisTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testWhoisUser(self):
"""Test basic WHOIS behavior"""
nick = 'myCoolNickname'
username = 'myCoolUsername'
realname = 'My Real Name'
self.addClient()
self.sendLine(1, f'NICK {nick}')
self.sendLine(1, f'USER {username} 0 * :{realname}')
self.skipToWelcome(1)
self.connectClient('otherNickname')
self.getMessages(2)
self.sendLine(2, 'WHOIS mycoolnickname')
messages = self.getMessages(2)
whois_user = messages[0]
self.assertEqual(whois_user.command, RPL_WHOISUSER)
# "<client> <nick> <username> <host> * :<realname>"
self.assertEqual(whois_user.params[1], nick)
self.assertIn(whois_user.params[2], ('~' + username, '~' + username[0:9]))
# dumb regression test for oragono/oragono#355:
self.assertNotIn(whois_user.params[3], [nick, username, '~' + username, realname])
self.assertEqual(whois_user.params[5], realname)
class AwayTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testAway(self):
self.connectClient('bar')
self.sendLine(1, "AWAY :I'm not here right now")
replies = self.getMessages(1)
self.assertIn(RPL_NOWAWAY, [msg.command for msg in replies])
self.connectClient('qux')
self.sendLine(2, "PRIVMSG bar :what's up")
replies = self.getMessages(2)
self.assertEqual(len(replies), 1)
self.assertEqual(replies[0].command, RPL_AWAY)
self.assertEqual(replies[0].params, ['qux', 'bar', "I'm not here right now"])
self.sendLine(1, "AWAY")
replies = self.getMessages(1)
self.assertIn(RPL_UNAWAY, [msg.command for msg in replies])
self.sendLine(2, "PRIVMSG bar :what's up")
replies = self.getMessages(2)
self.assertEqual(len(replies), 0)

View File

@ -1,3 +1,3 @@
limnoria > 2012.08.04 # Needs MultipleReplacer, from 1a64f105
psutil >= 3.1.0 # Fixes #640
ecdsa
pyxmpp2_scram

View File

@ -15,7 +15,7 @@ with open(os.path.join(os.path.dirname(__file__), 'requirements.txt')) as fd:
setup(
name='irctest',
version='0.1',
version='0.1.2',
author='Valentin Lorentz',
url='https://github.com/ProgVal/irctest/',
author_email='progval+irctest@progval.net',