Merge pull request #34 from slingamn/wip

miscellaneous updates
This commit is contained in:
Daniel Oaks
2020-04-10 07:52:53 +10:00
committed by GitHub
20 changed files with 1208 additions and 251 deletions

View File

@ -11,6 +11,7 @@ from . import runner
from . import client_mock
from .irc_utils import capabilities
from .irc_utils import message_parser
from .irc_utils.sasl import sasl_plain_blob
from .exceptions import ConnectionClosed
from .specifications import Specifications
@ -235,12 +236,15 @@ class BaseServerTestCase(_IrcTestCase):
invalid_metadata_keys = frozenset()
def setUp(self):
super().setUp()
config = None
if hasattr(self, 'customizedConfig'):
config = self.customizedConfig()
self.server_support = {}
self.find_hostname_and_port()
self.controller.run(self.hostname, self.port, password=self.password,
valid_metadata_keys=self.valid_metadata_keys,
invalid_metadata_keys=self.invalid_metadata_keys,
ssl=self.ssl)
ssl=self.ssl, config=config)
self.clients = {}
def tearDown(self):
self.controller.kill()
@ -324,7 +328,7 @@ class BaseServerTestCase(_IrcTestCase):
return result
def connectClient(self, nick, name=None, capabilities=None,
skip_if_cap_nak=False, show_io=None):
skip_if_cap_nak=False, show_io=None, password=None):
client = self.addClient(name, show_io=show_io)
if capabilities is not None and 0 < len(capabilities):
self.sendLine(client, 'CAP REQ :{}'.format(' '.join(capabilities)))
@ -341,6 +345,9 @@ class BaseServerTestCase(_IrcTestCase):
else:
raise
self.sendLine(client, 'CAP END')
if password is not None:
self.sendLine(client, 'AUTHENTICATE PLAIN')
self.sendLine(client, sasl_plain_blob(nick, password))
self.sendLine(client, 'NICK {}'.format(nick))
self.sendLine(client, 'USER username * * :Realname')

View File

@ -1,85 +1,126 @@
import copy
import json
import os
import subprocess
from irctest.basecontrollers import NotImplementedByController
from irctest.basecontrollers import BaseServerController, DirectoryBasedController
TEMPLATE_CONFIG = """
network:
name: OragonoTest
OPER_PWD = 'frenchfries'
server:
name: oragono.test
listen:
- "{hostname}:{port}"
{tls}
BASE_CONFIG = {
"network": {
"name": "OragonoTest",
},
check-ident: false
"server": {
"name": "oragono.test",
"listeners": {},
"max-sendq": "16k",
"connection-limits": {
"enabled": True,
"cidr-len-ipv4": 32,
"cidr-len-ipv6": 64,
"ips-per-subnet": 1,
"exempted": ["localhost"],
},
"connection-throttling": {
"enabled": True,
"cidr-len-ipv4": 32,
"cidr-len-ipv6": 64,
"ips-per-subnet": 16,
"duration": "10m",
"max-connections": 1,
"ban-duration": "10m",
"ban-message": "Try again later",
"exempted": ["localhost"],
},
},
password: {hashed_password}
'accounts': {
'authentication-enabled': True,
'multiclient': {
'allowed-by-default': True,
'enabled': True,
'always-on': 'disabled',
},
'registration': {
'bcrypt-cost': 4,
'enabled': True,
'enabled-callbacks': ['none'],
'verify-timeout': '120h',
},
},
max-sendq: 16k
"channels": {
"registration": {"enabled": True,},
},
allow-plaintext-resume: true
"datastore": {
"path": None,
},
connection-limits:
cidr-len-ipv4: 24
cidr-len-ipv6: 120
ips-per-subnet: 16
'limits': {
'awaylen': 200,
'chan-list-modes': 60,
'channellen': 64,
'kicklen': 390,
'linelen': {'rest': 2048,},
'monitor-entries': 100,
'nicklen': 32,
'topiclen': 390,
'whowas-entries': 100,
'multiline': {'max-bytes': 4096, 'max-lines': 32,},
},
exempted:
- "127.0.0.1/8"
- "::1/128"
"history": {
"enabled": True,
"channel-length": 128,
"client-length": 128,
"chathistory-maxmessages": 100,
},
connection-throttling:
enabled: true
cidr-len-ipv4: 32
cidr-len-ipv6: 128
duration: 10m
max-connections: 12
ban-duration: 10m
ban-message: You have attempted to connect too many times within a short duration. Wait a while, and you will be able to connect.
'oper-classes': {
'server-admin': {
'title': 'Server Admin',
'capabilities': [
"oper:local_kill",
"oper:local_ban",
"oper:local_unban",
"nofakelag",
"oper:remote_kill",
"oper:remote_ban",
"oper:remote_unban",
"oper:rehash",
"oper:die",
"accreg",
"sajoin",
"samode",
"vhosts",
"chanreg",
],
},
},
exempted:
- "127.0.0.1/8"
- "::1/128"
'opers': {
'root': {
'class': 'server-admin',
'whois-line': 'is a server admin',
# OPER_PWD
'password': '$2a$04$3GzUZB5JapaAbwn7sogpOu9NSiLOgnozVllm2e96LiNPrm61ZsZSq',
},
},
}
accounts:
registration:
enabled: true
verify-timeout: "120h"
enabled-callbacks:
- none # no verification needed, will instantly register successfully
allow-multiple-per-connection: true
bcrypt-cost: 4
authentication-enabled: true
channels:
registration:
enabled: true
datastore:
path: {directory}/ircd.db
limits:
nicklen: 32
channellen: 64
awaylen: 200
kicklen: 390
topiclen: 390
monitor-entries: 100
whowas-entries: 100
chan-list-modes: 60
linelen:
tags: 2048
rest: 2048
history:
enabled: true
channel-length: 128
client-length: 128
"""
LOGGING_CONFIG = {
"logging": [
{
"method": "stderr",
"level": "debug",
"type": "*",
},
]
}
def hash_password(password):
if isinstance(password, str):
@ -95,47 +136,46 @@ class OragonoController(BaseServerController, DirectoryBasedController):
supported_sasl_mechanisms = {
'PLAIN',
}
def create_config(self):
super().create_config()
def kill_proc(self):
self.proc.kill()
def run(self, hostname, port, password=None, ssl=False,
restricted_metadata_keys=None,
valid_metadata_keys=None, invalid_metadata_keys=None):
valid_metadata_keys=None, invalid_metadata_keys=None, config=None):
if valid_metadata_keys or invalid_metadata_keys:
raise NotImplementedByController(
'Defining valid and invalid METADATA keys.')
self.create_config()
tls_config = ""
if config is None:
config = copy.deepcopy(BASE_CONFIG)
self.port = port
bind_address = "127.0.0.1:%s" % (port,)
listener_conf = None # plaintext
if ssl:
self.key_path = os.path.join(self.directory, 'ssl.key')
self.pem_path = os.path.join(self.directory, 'ssl.pem')
tls_config = 'tls-listeners:\n ":{port}":\n key: {key}\n cert: {pem}'.format(
port=port,
key=self.key_path,
pem=self.pem_path,
)
assert self.proc is None
self.port = port
hashed_password = '' # oragono will understand this as 'no password required'
listener_conf = {"tls": {"cert": self.pem_path, "key": self.key_path},}
config['server']['listeners'][bind_address] = listener_conf
config['datastore']['path'] = os.path.join(self.directory, 'ircd.db')
if password is not None:
hashed_password = hash_password(password)
with self.open_file('server.yml') as fd:
fd.write(TEMPLATE_CONFIG.format(
directory=self.directory,
hostname=hostname,
port=port,
tls=tls_config,
hashed_password=hashed_password,
))
config['server']['password'] = hash_password(password)
assert self.proc is None
self._config_path = os.path.join(self.directory, 'server.yml')
self._config = config
self._write_config()
subprocess.call(['oragono', 'initdb',
'--conf', os.path.join(self.directory, 'server.yml'), '--quiet'])
'--conf', self._config_path, '--quiet'])
subprocess.call(['oragono', 'mkcerts',
'--conf', os.path.join(self.directory, 'server.yml'), '--quiet'])
'--conf', self._config_path, '--quiet'])
self.proc = subprocess.Popen(['oragono', 'run',
'--conf', os.path.join(self.directory, 'server.yml'), '--quiet'])
'--conf', self._config_path, '--quiet'])
def registerUser(self, case, username, password=None):
# XXX: Move this somewhere else when
@ -143,18 +183,74 @@ class OragonoController(BaseServerController, DirectoryBasedController):
# part of the specification
client = case.addClient(show_io=False)
case.sendLine(client, 'CAP LS 302')
case.sendLine(client, 'NICK registration_user')
case.sendLine(client, 'NICK ' + username)
case.sendLine(client, 'USER r e g :user')
case.sendLine(client, 'CAP END')
while case.getRegistrationMessage(client).command != '001':
pass
case.getMessages(client)
case.sendLine(client, 'ACC REGISTER {} * {}'.format(
username, password))
case.sendLine(client, 'NS REGISTER ' + password)
msg = case.getMessage(client)
assert msg.command == '920', msg
assert msg.params == [username, 'Account created']
case.sendLine(client, 'QUIT')
case.assertDisconnected(client)
def _write_config(self):
with open(self._config_path, 'w') as fd:
json.dump(self._config, fd)
def baseConfig(self):
return copy.deepcopy(BASE_CONFIG)
def getConfig(self):
return copy.deepcopy(self._config)
def addLoggingToConfig(self, config):
config.update(LOGGING_CONFIG)
return config
def addMysqlToConfig(self, config=None):
mysql_password = os.getenv('MYSQL_PASSWORD')
if not mysql_password:
return None
if config is None:
config = self.baseConfig()
config['datastore']['mysql'] = {
"enabled": True,
"host": "localhost",
"user": "oragono",
"password": mysql_password,
"history-database": "oragono_history",
"timeout": "3s",
}
config['accounts']['multiclient'] = {
'enabled': True,
'allowed-by-default': True,
'always-on': 'disabled',
}
config['history']['persistent'] = {
"enabled": True,
"unregistered-channels": True,
"registered-channels": "opt-out",
"direct-messages": "opt-out",
}
return config
def rehash(self, case, config):
self._config = config
self._write_config()
client = 'operator_for_rehash'
case.connectClient(nick=client, name=client)
case.sendLine(client, 'OPER root %s' % (OPER_PWD,))
case.sendLine(client, 'REHASH')
case.getMessages(client)
case.sendLine(client, 'QUIT')
case.assertDisconnected(client)
def enable_debug_logging(self, case):
config = self.getConfig()
config.update(LOGGING_CONFIG)
self.rehash(case, config)
def get_irctest_controller_class():
return OragonoController

View File

@ -0,0 +1,13 @@
import datetime
from collections import namedtuple
HistoryMessage = namedtuple('HistoryMessage', ['time', 'msgid', 'target', 'text'])
def to_history_message(msg):
return HistoryMessage(time=msg.tags.get('time'), msgid=msg.tags.get('msgid'), target=msg.params[0], text=msg.params[1])
# thanks jess!
IRCV3_FORMAT_STRFTIME = "%Y-%m-%dT%H:%M:%S.%f%z"
def ircv3_timestamp_to_unixtime(timestamp):
return datetime.datetime.strptime(timestamp, IRCV3_FORMAT_STRFTIME).timestamp()

View File

@ -14,7 +14,7 @@ unescape_tag_value = supybot.utils.str.MultipleReplacer(
dict(map(lambda x:(x[1],x[0]), TAG_ESCAPE)))
# TODO: validate host
tag_key_validator = re.compile('(\S+/)?[a-zA-Z0-9-]+')
tag_key_validator = re.compile(r'\+?(\S+/)?[a-zA-Z0-9-]+')
def parse_tags(s):
tags = {}
@ -42,7 +42,7 @@ def parse_message(s):
(tags, s) = s.split(' ', 1)
tags = parse_tags(tags[1:])
else:
tags = []
tags = {}
if ' :' in s:
(other_tokens, trailing_param) = s.split(' :', 1)
tokens = list(filter(bool, other_tokens.split(' '))) + [trailing_param]

View File

@ -0,0 +1,4 @@
import secrets
def random_name(base):
return base + '-' + secrets.token_hex(8)

View File

@ -0,0 +1,6 @@
import base64
def sasl_plain_blob(username, passphrase):
blob = base64.b64encode(b'\x00'.join((username.encode('utf-8'), username.encode('utf-8'), passphrase.encode('utf-8'))))
blobstr = blob.decode('ascii')
return f'AUTHENTICATE {blobstr}'

View File

@ -0,0 +1,122 @@
from irctest import cases
from irctest.irc_utils.sasl import sasl_plain_blob
from irctest.numerics import RPL_WELCOME
from irctest.numerics import ERR_NICKNAMEINUSE
class Bouncer(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testBouncer(self):
"""Test basic bouncer functionality."""
self.controller.registerUser(self, 'observer', 'observerpassword')
self.controller.registerUser(self, 'testuser', 'mypassword')
self.connectClient('observer')
self.joinChannel(1, '#chan')
self.sendLine(1, 'NICKSERV IDENTIFY observer observerpassword')
self.sendLine(1, 'CAP REQ :message-tags server-time')
self.getMessages(1)
self.addClient()
self.sendLine(2, 'CAP LS 302')
self.sendLine(2, 'AUTHENTICATE PLAIN')
self.sendLine(2, sasl_plain_blob('testuser', 'mypassword'))
self.sendLine(2, 'NICK testnick')
self.sendLine(2, 'USER a 0 * a')
self.sendLine(2, 'CAP REQ :server-time message-tags')
self.sendLine(2, 'CAP END')
messages = self.getMessages(2)
welcomes = [message for message in messages if message.command == RPL_WELCOME]
self.assertEqual(len(welcomes), 1)
# should see a regburst for testnick
self.assertEqual(welcomes[0].params[0], 'testnick')
self.joinChannel(2, '#chan')
self.addClient()
self.sendLine(3, 'CAP LS 302')
self.sendLine(3, 'AUTHENTICATE PLAIN')
self.sendLine(3, sasl_plain_blob('testuser', 'mypassword'))
self.sendLine(3, 'NICK testnick')
self.sendLine(3, 'USER a 0 * a')
self.sendLine(3, 'CAP REQ :server-time message-tags account-tag')
self.sendLine(3, 'CAP END')
messages = self.getMessages(3)
welcomes = [message for message in messages if message.command == RPL_WELCOME]
self.assertEqual(len(welcomes), 1)
# should see the *same* regburst for testnick
self.assertEqual(welcomes[0].params[0], 'testnick')
joins = [message for message in messages if message.command == 'JOIN']
# we should be automatically joined to #chan
self.assertEqual(joins[0].params[0], '#chan')
# disable multiclient in nickserv
self.sendLine(3, 'NS SET MULTICLIENT OFF')
self.getMessages(3)
self.addClient()
self.sendLine(4, 'CAP LS 302')
self.sendLine(4, 'AUTHENTICATE PLAIN')
self.sendLine(4, sasl_plain_blob('testuser', 'mypassword'))
self.sendLine(4, 'NICK testnick')
self.sendLine(4, 'USER a 0 * a')
self.sendLine(4, 'CAP REQ :server-time message-tags')
self.sendLine(4, 'CAP END')
# with multiclient disabled, we should not be able to attach to the nick
messages = self.getMessages(4)
welcomes = [message for message in messages if message.command == RPL_WELCOME]
self.assertEqual(len(welcomes), 0)
errors = [message for message in messages if message.command == ERR_NICKNAMEINUSE]
self.assertEqual(len(errors), 1)
self.sendLine(1, '@+clientOnlyTag=Value PRIVMSG #chan :hey')
self.getMessages(1)
messagesfortwo = [msg for msg in self.getMessages(2) if msg.command == 'PRIVMSG']
messagesforthree = [msg for msg in self.getMessages(3) if msg.command == 'PRIVMSG']
self.assertEqual(len(messagesfortwo), 1)
self.assertEqual(len(messagesforthree), 1)
messagefortwo = messagesfortwo[0]
messageforthree = messagesforthree[0]
self.assertEqual(messagefortwo.params, ['#chan', 'hey'])
self.assertEqual(messageforthree.params, ['#chan', 'hey'])
self.assertIn('time', messagefortwo.tags)
self.assertNotIn('account', messagefortwo.tags)
self.assertIn('time', messageforthree.tags)
# 3 has account-tag, 2 doesn't
self.assertIn('account', messageforthree.tags)
# should get same msgid
self.assertEqual(messagefortwo.tags['msgid'], messageforthree.tags['msgid'])
# test that copies of sent messages go out to other sessions
self.sendLine(2, 'PRIVMSG observer :this is a direct message')
self.getMessages(2)
messageForRecipient = [msg for msg in self.getMessages(1) if msg.command == 'PRIVMSG'][0]
copyForOtherSession = [msg for msg in self.getMessages(3) if msg.command == 'PRIVMSG'][0]
self.assertEqual(messageForRecipient.params, copyForOtherSession.params)
self.assertEqual(messageForRecipient.tags['msgid'], copyForOtherSession.tags['msgid'])
self.sendLine(2, 'QUIT :two out')
quitLines = [msg for msg in self.getMessages(2) if msg.command == 'QUIT']
self.assertEqual(len(quitLines), 1)
self.assertIn('two out', quitLines[0].params[0])
# neither the observer nor the other attached session should see a quit here
quitLines = [msg for msg in self.getMessages(1) if msg.command == 'QUIT']
self.assertEqual(quitLines, [])
quitLines = [msg for msg in self.getMessages(3) if msg.command == 'QUIT']
self.assertEqual(quitLines, [])
# session 3 should be untouched at this point
self.sendLine(1, '@+clientOnlyTag=Value PRIVMSG #chan :hey again')
self.getMessages(1)
messagesforthree = [msg for msg in self.getMessages(3) if msg.command == 'PRIVMSG']
self.assertEqual(len(messagesforthree), 1)
self.assertMessageEqual(messagesforthree[0], command='PRIVMSG', params=['#chan', 'hey again'])
self.sendLine(3, 'QUIT :three out')
quitLines = [msg for msg in self.getMessages(3) if msg.command == 'QUIT']
self.assertEqual(len(quitLines), 1)
self.assertIn('three out', quitLines[0].params[0])
# observer should see *this* quit
quitLines = [msg for msg in self.getMessages(1) if msg.command == 'QUIT']
self.assertEqual(len(quitLines), 1)
self.assertIn('three out', quitLines[0].params[0])

View File

@ -7,7 +7,7 @@ from irctest import cases
from irctest import client_mock
from irctest import runner
from irctest.irc_utils import ambiguities
from irctest.numerics import RPL_NOTOPIC, RPL_NAMREPLY, RPL_INVITING, ERR_NOSUCHCHANNEL, ERR_NOTONCHANNEL, ERR_CHANOPRIVSNEEDED, ERR_NOSUCHNICK, ERR_INVITEONLYCHAN
from irctest.numerics import RPL_NOTOPIC, RPL_NAMREPLY, RPL_INVITING, ERR_NOSUCHCHANNEL, ERR_NOTONCHANNEL, ERR_CHANOPRIVSNEEDED, ERR_NOSUCHNICK, ERR_INVITEONLYCHAN, ERR_CANNOTSENDTOCHAN
class JoinTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812',
@ -137,6 +137,30 @@ class JoinTestCase(cases.BaseServerTestCase):
'"foo" with an optional "+" or "@" prefix, but got: '
'{msg}')
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testNormalPart(self):
self.connectClient('bar')
self.sendLine(1, 'JOIN #chan')
m = self.getMessage(1)
self.assertMessageEqual(m, command='JOIN', params=['#chan'])
self.connectClient('baz')
self.sendLine(2, 'JOIN #chan')
m = self.getMessage(2)
self.assertMessageEqual(m, command='JOIN', params=['#chan'])
# skip the rest of the JOIN burst:
self.getMessages(1)
self.getMessages(2)
self.sendLine(1, 'PART #chan :bye everyone')
# both the PART'ing client and the other channel member should receive a PART line:
m = self.getMessage(1)
self.assertMessageEqual(m, command='PART', params=['#chan', 'bye everyone'])
m = self.getMessage(2)
self.assertMessageEqual(m, command='PART', params=['#chan', 'bye everyone'])
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testTopic(self):
"""“Once a user has joined a channel, he receives information about
@ -623,3 +647,30 @@ class ChannelQuitTestCase(cases.BaseServerTestCase):
self.assertEqual(m.command, 'QUIT')
self.assertTrue(m.prefix.startswith('qux')) # nickmask of quitter
self.assertIn('qux out', m.params[0])
class NoCTCPTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testQuit(self):
self.connectClient('bar')
self.joinChannel(1, '#chan')
self.sendLine(1, 'MODE #chan +C')
self.getMessages(1)
self.connectClient('qux')
self.joinChannel(2, '#chan')
self.getMessages(2)
self.sendLine(1, 'PRIVMSG #chan :\x01ACTION hi\x01')
self.getMessages(1)
ms = self.getMessages(2)
self.assertEqual(len(ms), 1)
self.assertMessageEqual(ms[0], command='PRIVMSG', params=['#chan', '\x01ACTION hi\x01'])
self.sendLine(1, 'PRIVMSG #chan :\x01PING 1473523796 918320\x01')
ms = self.getMessages(1)
self.assertEqual(len(ms), 1)
self.assertMessageEqual(ms[0], command=ERR_CANNOTSENDTOCHAN)
ms = self.getMessages(2)
self.assertEqual(ms, [])

View File

@ -0,0 +1,268 @@
import secrets
import time
from irctest import cases
from irctest.irc_utils.junkdrawer import to_history_message
from irctest.irc_utils.random import random_name
CHATHISTORY_CAP = 'draft/chathistory'
EVENT_PLAYBACK_CAP = 'draft/event-playback'
MYSQL_PASSWORD = ""
def validate_chathistory_batch(msgs):
batch_tag = None
closed_batch_tag = None
result = []
for msg in msgs:
if msg.command == "BATCH":
batch_param = msg.params[0]
if batch_tag is None and batch_param[0] == '+':
batch_tag = batch_param[1:]
elif batch_param[0] == '-':
closed_batch_tag = batch_param[1:]
elif msg.command == "PRIVMSG" and batch_tag is not None and msg.tags.get("batch") == batch_tag:
result.append(to_history_message(msg))
assert batch_tag == closed_batch_tag
return result
class ChathistoryTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testEmptyBatch(self):
bar = random_name('bar')
self.controller.registerUser(self, bar, bar)
self.connectClient(bar, name=bar, capabilities=['batch', 'labeled-response', 'message-tags', 'server-time'], password=bar)
self.getMessages(bar)
# no chathistory results SHOULD result in an empty batch:
self.sendLine(bar, 'CHATHISTORY LATEST * * 10')
msgs = self.getMessages(bar)
self.assertEqual([msg.command for msg in msgs], ['BATCH', 'BATCH'])
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testMessagesToSelf(self):
bar = random_name('bar')
self.controller.registerUser(self, bar, bar)
self.connectClient(bar, name=bar, capabilities=['batch', 'labeled-response', 'message-tags', 'server-time'], password=bar)
self.getMessages(bar)
messages = []
self.sendLine(bar, 'PRIVMSG %s :this is a privmsg sent to myself' % (bar,))
replies = [msg for msg in self.getMessages(bar) if msg.command == 'PRIVMSG']
self.assertEqual(len(replies), 1)
msg = replies[0]
self.assertEqual(msg.params, [bar, 'this is a privmsg sent to myself'])
messages.append(to_history_message(msg))
self.sendLine(bar, 'CAP REQ echo-message')
self.getMessages(bar)
self.sendLine(bar, 'PRIVMSG %s :this is a second privmsg sent to myself' % (bar,))
replies = [msg for msg in self.getMessages(bar) if msg.command == 'PRIVMSG']
# two messages, the echo and the delivery
self.assertEqual(len(replies), 2)
self.assertEqual(replies[0].params, [bar, 'this is a second privmsg sent to myself'])
messages.append(to_history_message(replies[0]))
# messages should be otherwise identical
self.assertEqual(to_history_message(replies[0]), to_history_message(replies[1]))
self.sendLine(bar, '@label=xyz PRIVMSG %s :this is a third privmsg sent to myself' % (bar,))
replies = [msg for msg in self.getMessages(bar) if msg.command == 'PRIVMSG']
self.assertEqual(len(replies), 2)
# exactly one of the replies MUST be labeled
echo = [msg for msg in replies if msg.tags.get('label') == 'xyz'][0]
delivery = [msg for msg in replies if msg.tags.get('label') is None][0]
self.assertEqual(echo.params, [bar, 'this is a third privmsg sent to myself'])
messages.append(to_history_message(echo))
self.assertEqual(to_history_message(echo), to_history_message(delivery))
# should receive exactly 3 messages in the correct order, no duplicates
self.sendLine(bar, 'CHATHISTORY LATEST * * 10')
replies = [msg for msg in self.getMessages(bar) if msg.command == 'PRIVMSG']
self.assertEqual([to_history_message(msg) for msg in replies], messages)
self.sendLine(bar, 'CHATHISTORY LATEST %s * 10' % (bar,))
replies = [msg for msg in self.getMessages(bar) if msg.command == 'PRIVMSG']
self.assertEqual([to_history_message(msg) for msg in replies], messages)
def validate_echo_messages(self, num_messages, echo_messages):
# sanity checks: should have received the correct number of echo messages,
# all with distinct time tags (because we slept) and msgids
self.assertEqual(len(echo_messages), num_messages)
self.assertEqual(len(set(msg.msgid for msg in echo_messages)), num_messages)
self.assertEqual(len(set(msg.time for msg in echo_messages)), num_messages)
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testChathistory(self):
self.connectClient('bar', capabilities=['message-tags', 'server-time', 'echo-message', 'batch', 'labeled-response', CHATHISTORY_CAP, EVENT_PLAYBACK_CAP])
chname = '#' + secrets.token_hex(12)
self.joinChannel(1, chname)
self.getMessages(1)
NUM_MESSAGES = 10
echo_messages = []
for i in range(NUM_MESSAGES):
self.sendLine(1, 'PRIVMSG %s :this is message %d' % (chname, i))
echo_messages.extend(to_history_message(msg) for msg in self.getMessages(1))
time.sleep(0.002)
self.validate_echo_messages(NUM_MESSAGES, echo_messages)
self.validate_chathistory(echo_messages, 1, chname)
def customizedConfig(self):
return self.controller.addMysqlToConfig()
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testChathistoryDMs(self):
c1 = secrets.token_hex(12)
c2 = secrets.token_hex(12)
self.controller.registerUser(self, c1, c1)
self.controller.registerUser(self, c2, c2)
self.connectClient(c1, capabilities=['message-tags', 'server-time', 'echo-message', 'batch', 'labeled-response', CHATHISTORY_CAP, EVENT_PLAYBACK_CAP], password=c1)
self.connectClient(c2, capabilities=['message-tags', 'server-time', 'echo-message', 'batch', 'labeled-response', CHATHISTORY_CAP, EVENT_PLAYBACK_CAP], password=c2)
self.getMessages(1)
self.getMessages(2)
NUM_MESSAGES = 10
echo_messages = []
for i in range(NUM_MESSAGES):
user = (i % 2) + 1
if user == 1:
target = c2
else:
target = c1
self.getMessages(user)
self.sendLine(user, 'PRIVMSG %s :this is message %d' % (target, i))
echo_messages.extend(to_history_message(msg) for msg in self.getMessages(user))
time.sleep(0.002)
self.validate_echo_messages(NUM_MESSAGES, echo_messages)
self.validate_chathistory(echo_messages, 1, c2)
self.validate_chathistory(echo_messages, 1, '*')
self.validate_chathistory(echo_messages, 2, c1)
self.validate_chathistory(echo_messages, 2, '*')
c3 = secrets.token_hex(12)
self.connectClient(c3, capabilities=['message-tags', 'server-time', 'echo-message', 'batch', 'labeled-response', CHATHISTORY_CAP, EVENT_PLAYBACK_CAP])
self.sendLine(1, 'PRIVMSG %s :this is a message in a separate conversation' % (c3,))
self.sendLine(3, 'PRIVMSG %s :i agree that this is a separate conversation' % (c1,))
# 3 received the first message as a delivery and the second as an echo
new_convo = [to_history_message(msg) for msg in self.getMessages(3) if msg.command == 'PRIVMSG']
self.assertEqual([msg.text for msg in new_convo], ['this is a message in a separate conversation', 'i agree that this is a separate conversation'])
# messages should be stored and retrievable by c1, even though c3 is not registered
self.getMessages(1)
self.sendLine(1, 'CHATHISTORY LATEST %s * 10' % (c3,))
results = [to_history_message(msg) for msg in self.getMessages(1) if msg.command == 'PRIVMSG']
self.assertEqual(results, new_convo)
# additional messages with c3 should not show up in the c1-c2 history:
self.validate_chathistory(echo_messages, 1, c2)
self.validate_chathistory(echo_messages, 2, c1)
self.validate_chathistory(echo_messages, 2, c1.upper())
# regression test for #833
self.sendLine(3, 'QUIT')
self.assertDisconnected(3)
# register c3 as an account, then attempt to retrieve the conversation history with c1
self.controller.registerUser(self, c3, c3)
self.connectClient(c3, name=c3, capabilities=['message-tags', 'server-time', 'echo-message', 'batch', 'labeled-response', CHATHISTORY_CAP, EVENT_PLAYBACK_CAP], password=c3)
self.getMessages(c3)
self.sendLine(c3, 'CHATHISTORY LATEST %s * 10' % (c1,))
results = [to_history_message(msg) for msg in self.getMessages(c3) if msg.command == 'PRIVMSG']
# should get nothing
self.assertEqual(results, [])
def validate_chathistory(self, echo_messages, user, chname):
INCLUSIVE_LIMIT = len(echo_messages) * 2
self.sendLine(user, "CHATHISTORY LATEST %s * %d" % (chname, INCLUSIVE_LIMIT))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual(echo_messages, result)
self.sendLine(user, "CHATHISTORY LATEST %s * %d" % (chname, 5))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual(echo_messages[-5:], result)
self.sendLine(user, "CHATHISTORY LATEST %s * %d" % (chname, 1))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual(echo_messages[-1:], result)
self.sendLine(user, "CHATHISTORY LATEST %s msgid=%s %d" % (chname, echo_messages[4].msgid, INCLUSIVE_LIMIT))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual(echo_messages[5:], result)
self.sendLine(user, "CHATHISTORY LATEST %s timestamp=%s %d" % (chname, echo_messages[4].time, INCLUSIVE_LIMIT))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual(echo_messages[5:], result)
self.sendLine(user, "CHATHISTORY BEFORE %s msgid=%s %d" % (chname, echo_messages[6].msgid, INCLUSIVE_LIMIT))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual(echo_messages[:6], result)
self.sendLine(user, "CHATHISTORY BEFORE %s timestamp=%s %d" % (chname, echo_messages[6].time, INCLUSIVE_LIMIT))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual(echo_messages[:6], result)
self.sendLine(user, "CHATHISTORY BEFORE %s timestamp=%s %d" % (chname, echo_messages[6].time, 2))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual(echo_messages[4:6], result)
self.sendLine(user, "CHATHISTORY AFTER %s msgid=%s %d" % (chname, echo_messages[3].msgid, INCLUSIVE_LIMIT))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual(echo_messages[4:], result)
self.sendLine(user, "CHATHISTORY AFTER %s timestamp=%s %d" % (chname, echo_messages[3].time, INCLUSIVE_LIMIT))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual(echo_messages[4:], result)
self.sendLine(user, "CHATHISTORY AFTER %s timestamp=%s %d" % (chname, echo_messages[3].time, 3))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual(echo_messages[4:7], result)
# BETWEEN forwards and backwards
self.sendLine(user, "CHATHISTORY BETWEEN %s msgid=%s msgid=%s %d" % (chname, echo_messages[0].msgid, echo_messages[-1].msgid, INCLUSIVE_LIMIT))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual(echo_messages[1:-1], result)
self.sendLine(user, "CHATHISTORY BETWEEN %s msgid=%s msgid=%s %d" % (chname, echo_messages[-1].msgid, echo_messages[0].msgid, INCLUSIVE_LIMIT))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual(echo_messages[1:-1], result)
# BETWEEN forwards and backwards with a limit, should get different results this time
self.sendLine(user, "CHATHISTORY BETWEEN %s msgid=%s msgid=%s %d" % (chname, echo_messages[0].msgid, echo_messages[-1].msgid, 3))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual(echo_messages[1:4], result)
self.sendLine(user, "CHATHISTORY BETWEEN %s msgid=%s msgid=%s %d" % (chname, echo_messages[-1].msgid, echo_messages[0].msgid, 3))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual(echo_messages[-4:-1], result)
# same stuff again but with timestamps
self.sendLine(user, "CHATHISTORY BETWEEN %s timestamp=%s timestamp=%s %d" % (chname, echo_messages[0].time, echo_messages[-1].time, INCLUSIVE_LIMIT))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual(echo_messages[1:-1], result)
self.sendLine(user, "CHATHISTORY BETWEEN %s timestamp=%s timestamp=%s %d" % (chname, echo_messages[-1].time, echo_messages[0].time, INCLUSIVE_LIMIT))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual(echo_messages[1:-1], result)
self.sendLine(user, "CHATHISTORY BETWEEN %s timestamp=%s timestamp=%s %d" % (chname, echo_messages[0].time, echo_messages[-1].time, 3))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual(echo_messages[1:4], result)
self.sendLine(user, "CHATHISTORY BETWEEN %s timestamp=%s timestamp=%s %d" % (chname, echo_messages[-1].time, echo_messages[0].time, 3))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual(echo_messages[-4:-1], result)
# AROUND
self.sendLine(user, "CHATHISTORY AROUND %s msgid=%s %d" % (chname, echo_messages[7].msgid, 1))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual([echo_messages[7]], result)
self.sendLine(user, "CHATHISTORY AROUND %s msgid=%s %d" % (chname, echo_messages[7].msgid, 3))
result = validate_chathistory_batch(self.getMessages(user))
self.assertEqual(echo_messages[6:9], result)
self.sendLine(user, "CHATHISTORY AROUND %s timestamp=%s %d" % (chname, echo_messages[7].time, 3))
result = validate_chathistory_batch(self.getMessages(user))
self.assertIn(echo_messages[7], result)

View File

@ -0,0 +1,32 @@
from irctest import cases
from irctest.numerics import RPL_WELCOME, ERR_NICKNAMEINUSE
class ConfusablesTestCase(cases.BaseServerTestCase):
def customizedConfig(self):
config = self.controller.baseConfig()
config['accounts']['nick-reservation'] = {
'enabled': True,
'method': 'strict',
}
return config
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testConfusableNicks(self):
self.controller.registerUser(self, 'evan', 'sesame')
self.addClient(1)
# U+0435 in place of e:
self.sendLine(1, 'NICK еvan')
self.sendLine(1, 'USER a 0 * a')
messages = self.getMessages(1)
commands = set(msg.command for msg in messages)
self.assertNotIn(RPL_WELCOME, commands)
self.assertIn(ERR_NICKNAMEINUSE, commands)
self.connectClient('evan', name='evan', password='sesame')
# should be able to switch to the confusable nick
self.sendLine('evan', 'NICK еvan')
messages = self.getMessages('evan')
commands = set(msg.command for msg in messages)
self.assertIn('NICK', commands)

View File

@ -4,6 +4,30 @@
from irctest import cases
from irctest.basecontrollers import NotImplementedByController
from irctest.irc_utils.random import random_name
class DMEchoMessageTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testDirectMessageEcho(self):
bar = random_name('bar')
self.connectClient(bar, name=bar, capabilities=['batch', 'labeled-response', 'echo-message', 'message-tags', 'server-time'])
self.getMessages(bar)
qux = random_name('qux')
self.connectClient(qux, name=qux, capabilities=['batch', 'labeled-response', 'echo-message', 'message-tags', 'server-time'])
self.getMessages(qux)
self.sendLine(bar, '@label=xyz;+example-client-tag=example-value PRIVMSG %s :hi there' % (qux,))
echo = self.getMessages(bar)[0]
delivery = self.getMessages(qux)[0]
self.assertEqual(delivery.params, [qux, 'hi there'])
self.assertEqual(delivery.params, echo.params)
self.assertEqual(delivery.tags['msgid'], echo.tags['msgid'])
self.assertEqual(echo.tags['label'], 'xyz')
self.assertEqual(delivery.tags['+example-client-tag'], 'example-value')
self.assertEqual(delivery.tags['+example-client-tag'], echo.tags['+example-client-tag'])
class EchoMessageTestCase(cases.BaseServerTestCase):
def _testEchoMessage(command, solo, server_time):

View File

@ -9,16 +9,16 @@ from irctest import cases
class LabeledResponsesTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledPrivmsgResponsesToMultipleClients(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.connectClient('foo', capabilities=['batch', 'echo-message', 'labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'labeled-response'], skip_if_cap_nak=True)
self.getMessages(2)
self.connectClient('carl', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.connectClient('carl', capabilities=['batch', 'echo-message', 'labeled-response'], skip_if_cap_nak=True)
self.getMessages(3)
self.connectClient('alice', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.connectClient('alice', capabilities=['batch', 'echo-message', 'labeled-response'], skip_if_cap_nak=True)
self.getMessages(4)
self.sendLine(1, '@draft/label=12345 PRIVMSG bar,carl,alice :hi')
self.sendLine(1, '@label=12345 PRIVMSG bar,carl,alice :hi')
m = self.getMessage(1)
m2 = self.getMessage(2)
m3 = self.getMessage(3)
@ -26,38 +26,38 @@ class LabeledResponsesTestCase(cases.BaseServerTestCase, cases.OptionalityHelper
# ensure the label isn't sent to recipients
self.assertMessageEqual(m2, command='PRIVMSG', fail_msg='No PRIVMSG received by target 1 after sending one out')
self.assertNotIn('draft/label', m2.tags, m2, fail_msg="When sending a PRIVMSG with a label, the target users shouldn't receive the label (only the sending user should): {msg}")
self.assertNotIn('label', m2.tags, m2, fail_msg="When sending a PRIVMSG with a label, the target users shouldn't receive the label (only the sending user should): {msg}")
self.assertMessageEqual(m3, command='PRIVMSG', fail_msg='No PRIVMSG received by target 1 after sending one out')
self.assertNotIn('draft/label', m3.tags, m3, fail_msg="When sending a PRIVMSG with a label, the target users shouldn't receive the label (only the sending user should): {msg}")
self.assertNotIn('label', m3.tags, m3, fail_msg="When sending a PRIVMSG with a label, the target users shouldn't receive the label (only the sending user should): {msg}")
self.assertMessageEqual(m4, command='PRIVMSG', fail_msg='No PRIVMSG received by target 1 after sending one out')
self.assertNotIn('draft/label', m4.tags, m4, fail_msg="When sending a PRIVMSG with a label, the target users shouldn't receive the label (only the sending user should): {msg}")
self.assertNotIn('label', m4.tags, m4, fail_msg="When sending a PRIVMSG with a label, the target users shouldn't receive the label (only the sending user should): {msg}")
self.assertMessageEqual(m, command='BATCH', fail_msg='No BATCH echo received after sending one out')
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledPrivmsgResponsesToClient(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.connectClient('foo', capabilities=['batch', 'echo-message', 'labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'labeled-response'], skip_if_cap_nak=True)
self.getMessages(2)
self.sendLine(1, '@draft/label=12345 PRIVMSG bar :hi')
self.sendLine(1, '@label=12345 PRIVMSG bar :hi')
m = self.getMessage(1)
m2 = self.getMessage(2)
# ensure the label isn't sent to recipient
self.assertMessageEqual(m2, command='PRIVMSG', fail_msg='No PRIVMSG received by the target after sending one out')
self.assertNotIn('draft/label', m2.tags, m2, fail_msg="When sending a PRIVMSG with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
self.assertNotIn('label', m2.tags, m2, fail_msg="When sending a PRIVMSG with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
self.assertMessageEqual(m, command='PRIVMSG', fail_msg='No PRIVMSG echo received after sending one out')
self.assertIn('draft/label', m.tags, m, fail_msg="When sending a PRIVMSG with a label, the echo'd message didn't contain the label at all: {msg}")
self.assertEqual(m.tags['draft/label'], '12345', m, fail_msg="Echo'd PRIVMSG to a client did not contain the same label we sent it with(should be '12345'): {msg}")
self.assertIn('label', m.tags, m, fail_msg="When sending a PRIVMSG with a label, the echo'd message didn't contain the label at all: {msg}")
self.assertEqual(m.tags['label'], '12345', m, fail_msg="Echo'd PRIVMSG to a client did not contain the same label we sent it with(should be '12345'): {msg}")
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledPrivmsgResponsesToChannel(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.connectClient('foo', capabilities=['batch', 'echo-message', 'labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'labeled-response'], skip_if_cap_nak=True)
self.getMessages(2)
# join channels
@ -67,61 +67,61 @@ class LabeledResponsesTestCase(cases.BaseServerTestCase, cases.OptionalityHelper
self.getMessages(2)
self.getMessages(1)
self.sendLine(1, '@draft/label=12345;+draft/reply=123;+draft/react=l😃l PRIVMSG #test :hi')
self.sendLine(1, '@label=12345;+draft/reply=123;+draft/react=l😃l PRIVMSG #test :hi')
ms = self.getMessage(1)
mt = self.getMessage(2)
# ensure the label isn't sent to recipient
self.assertMessageEqual(mt, command='PRIVMSG', fail_msg='No PRIVMSG received by the target after sending one out')
self.assertNotIn('draft/label', mt.tags, mt, fail_msg="When sending a PRIVMSG with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
self.assertNotIn('label', mt.tags, mt, fail_msg="When sending a PRIVMSG with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
# ensure sender correctly receives msg
self.assertMessageEqual(ms, command='PRIVMSG', fail_msg="Got a message back that wasn't a PRIVMSG")
self.assertIn('draft/label', ms.tags, ms, fail_msg="When sending a PRIVMSG with a label, the source user should receive the label but didn't: {msg}")
self.assertEqual(ms.tags['draft/label'], '12345', ms, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
self.assertIn('label', ms.tags, ms, fail_msg="When sending a PRIVMSG with a label, the source user should receive the label but didn't: {msg}")
self.assertEqual(ms.tags['label'], '12345', ms, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledPrivmsgResponsesToSelf(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.connectClient('foo', capabilities=['batch', 'echo-message', 'labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.sendLine(1, '@draft/label=12345 PRIVMSG foo :hi')
self.sendLine(1, '@label=12345 PRIVMSG foo :hi')
m1 = self.getMessage(1)
m2 = self.getMessage(1)
number_of_labels = 0
for m in [m1, m2]:
self.assertMessageEqual(m, command='PRIVMSG', fail_msg="Got a message back that wasn't a PRIVMSG")
if 'draft/label' in m.tags:
if 'label' in m.tags:
number_of_labels += 1
self.assertEqual(m.tags['draft/label'], '12345', m, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
self.assertEqual(m.tags['label'], '12345', m, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
self.assertEqual(number_of_labels, 1, m1, fail_msg="When sending a PRIVMSG to self with echo-message, we only expect one message to contain the label. Instead, {} messages had the label".format(number_of_labels))
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledNoticeResponsesToClient(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.connectClient('foo', capabilities=['batch', 'echo-message', 'labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'labeled-response'], skip_if_cap_nak=True)
self.getMessages(2)
self.sendLine(1, '@draft/label=12345 NOTICE bar :hi')
self.sendLine(1, '@label=12345 NOTICE bar :hi')
m = self.getMessage(1)
m2 = self.getMessage(2)
# ensure the label isn't sent to recipient
self.assertMessageEqual(m2, command='NOTICE', fail_msg='No NOTICE received by the target after sending one out')
self.assertNotIn('draft/label', m2.tags, m2, fail_msg="When sending a NOTICE with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
self.assertNotIn('label', m2.tags, m2, fail_msg="When sending a NOTICE with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
self.assertMessageEqual(m, command='NOTICE', fail_msg='No NOTICE echo received after sending one out')
self.assertIn('draft/label', m.tags, m, fail_msg="When sending a NOTICE with a label, the echo'd message didn't contain the label at all: {msg}")
self.assertEqual(m.tags['draft/label'], '12345', m, fail_msg="Echo'd NOTICE to a client did not contain the same label we sent it with(should be '12345'): {msg}")
self.assertIn('label', m.tags, m, fail_msg="When sending a NOTICE with a label, the echo'd message didn't contain the label at all: {msg}")
self.assertEqual(m.tags['label'], '12345', m, fail_msg="Echo'd NOTICE to a client did not contain the same label we sent it with(should be '12345'): {msg}")
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledNoticeResponsesToChannel(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.connectClient('foo', capabilities=['batch', 'echo-message', 'labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'labeled-response'], skip_if_cap_nak=True)
self.getMessages(2)
# join channels
@ -131,59 +131,59 @@ class LabeledResponsesTestCase(cases.BaseServerTestCase, cases.OptionalityHelper
self.getMessages(2)
self.getMessages(1)
self.sendLine(1, '@draft/label=12345;+draft/reply=123;+draft/react=l😃l NOTICE #test :hi')
self.sendLine(1, '@label=12345;+draft/reply=123;+draft/react=l😃l NOTICE #test :hi')
ms = self.getMessage(1)
mt = self.getMessage(2)
# ensure the label isn't sent to recipient
self.assertMessageEqual(mt, command='NOTICE', fail_msg='No NOTICE received by the target after sending one out')
self.assertNotIn('draft/label', mt.tags, mt, fail_msg="When sending a NOTICE with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
self.assertNotIn('label', mt.tags, mt, fail_msg="When sending a NOTICE with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
# ensure sender correctly receives msg
self.assertMessageEqual(ms, command='NOTICE', fail_msg="Got a message back that wasn't a NOTICE")
self.assertIn('draft/label', ms.tags, ms, fail_msg="When sending a NOTICE with a label, the source user should receive the label but didn't: {msg}")
self.assertEqual(ms.tags['draft/label'], '12345', ms, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
self.assertIn('label', ms.tags, ms, fail_msg="When sending a NOTICE with a label, the source user should receive the label but didn't: {msg}")
self.assertEqual(ms.tags['label'], '12345', ms, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledNoticeResponsesToSelf(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response'], skip_if_cap_nak=True)
self.connectClient('foo', capabilities=['batch', 'echo-message', 'labeled-response'], skip_if_cap_nak=True)
self.getMessages(1)
self.sendLine(1, '@draft/label=12345 NOTICE foo :hi')
self.sendLine(1, '@label=12345 NOTICE foo :hi')
m1 = self.getMessage(1)
m2 = self.getMessage(1)
number_of_labels = 0
for m in [m1, m2]:
self.assertMessageEqual(m, command='NOTICE', fail_msg="Got a message back that wasn't a NOTICE")
if 'draft/label' in m.tags:
if 'label' in m.tags:
number_of_labels += 1
self.assertEqual(m.tags['draft/label'], '12345', m, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
self.assertEqual(m.tags['label'], '12345', m, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
self.assertEqual(number_of_labels, 1, m1, fail_msg="When sending a NOTICE to self with echo-message, we only expect one message to contain the label. Instead, {} messages had the label".format(number_of_labels))
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledTagMsgResponsesToClient(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response', 'message-tags'], skip_if_cap_nak=True)
self.connectClient('foo', capabilities=['batch', 'echo-message', 'labeled-response', 'message-tags'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response', 'message-tags'], skip_if_cap_nak=True)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'labeled-response', 'message-tags'], skip_if_cap_nak=True)
self.getMessages(2)
self.sendLine(1, '@draft/label=12345;+draft/reply=123;+draft/react=l😃l TAGMSG bar')
self.sendLine(1, '@label=12345;+draft/reply=123;+draft/react=l😃l TAGMSG bar')
m = self.getMessage(1)
m2 = self.getMessage(2)
# ensure the label isn't sent to recipient
self.assertMessageEqual(m2, command='TAGMSG', fail_msg='No TAGMSG received by the target after sending one out')
self.assertNotIn('draft/label', m2.tags, m2, fail_msg="When sending a TAGMSG with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
self.assertNotIn('label', m2.tags, m2, fail_msg="When sending a TAGMSG with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
self.assertIn('+draft/reply', m2.tags, m2, fail_msg="Reply tag wasn't present on the target user's TAGMSG: {msg}")
self.assertEqual(m2.tags['+draft/reply'], '123', m2, fail_msg="Reply tag wasn't the same on the target user's TAGMSG: {msg}")
self.assertIn('+draft/react', m2.tags, m2, fail_msg="React tag wasn't present on the target user's TAGMSG: {msg}")
self.assertEqual(m2.tags['+draft/react'], 'l😃l', m2, fail_msg="React tag wasn't the same on the target user's TAGMSG: {msg}")
self.assertMessageEqual(m, command='TAGMSG', fail_msg='No TAGMSG echo received after sending one out')
self.assertIn('draft/label', m.tags, m, fail_msg="When sending a TAGMSG with a label, the echo'd message didn't contain the label at all: {msg}")
self.assertEqual(m.tags['draft/label'], '12345', m, fail_msg="Echo'd TAGMSG to a client did not contain the same label we sent it with(should be '12345'): {msg}")
self.assertIn('label', m.tags, m, fail_msg="When sending a TAGMSG with a label, the echo'd message didn't contain the label at all: {msg}")
self.assertEqual(m.tags['label'], '12345', m, fail_msg="Echo'd TAGMSG to a client did not contain the same label we sent it with(should be '12345'): {msg}")
self.assertIn('+draft/reply', m.tags, m, fail_msg="Reply tag wasn't present on the source user's TAGMSG: {msg}")
self.assertEqual(m2.tags['+draft/reply'], '123', m, fail_msg="Reply tag wasn't the same on the source user's TAGMSG: {msg}")
self.assertIn('+draft/react', m.tags, m, fail_msg="React tag wasn't present on the source user's TAGMSG: {msg}")
@ -191,9 +191,9 @@ class LabeledResponsesTestCase(cases.BaseServerTestCase, cases.OptionalityHelper
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledTagMsgResponsesToChannel(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response', 'message-tags'], skip_if_cap_nak=True)
self.connectClient('foo', capabilities=['batch', 'echo-message', 'labeled-response', 'message-tags'], skip_if_cap_nak=True)
self.getMessages(1)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response', 'message-tags'], skip_if_cap_nak=True)
self.connectClient('bar', capabilities=['batch', 'echo-message', 'labeled-response', 'message-tags'], skip_if_cap_nak=True)
self.getMessages(2)
# join channels
@ -203,43 +203,43 @@ class LabeledResponsesTestCase(cases.BaseServerTestCase, cases.OptionalityHelper
self.getMessages(2)
self.getMessages(1)
self.sendLine(1, '@draft/label=12345;+draft/reply=123;+draft/react=l😃l TAGMSG #test')
self.sendLine(1, '@label=12345;+draft/reply=123;+draft/react=l😃l TAGMSG #test')
ms = self.getMessage(1)
mt = self.getMessage(2)
# ensure the label isn't sent to recipient
self.assertMessageEqual(mt, command='TAGMSG', fail_msg='No TAGMSG received by the target after sending one out')
self.assertNotIn('draft/label', mt.tags, mt, fail_msg="When sending a TAGMSG with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
self.assertNotIn('label', mt.tags, mt, fail_msg="When sending a TAGMSG with a label, the target user shouldn't receive the label (only the sending user should): {msg}")
# ensure sender correctly receives msg
self.assertMessageEqual(ms, command='TAGMSG', fail_msg="Got a message back that wasn't a TAGMSG")
self.assertIn('draft/label', ms.tags, ms, fail_msg="When sending a TAGMSG with a label, the source user should receive the label but didn't: {msg}")
self.assertEqual(ms.tags['draft/label'], '12345', ms, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
self.assertIn('label', ms.tags, ms, fail_msg="When sending a TAGMSG with a label, the source user should receive the label but didn't: {msg}")
self.assertEqual(ms.tags['label'], '12345', ms, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testLabeledTagMsgResponsesToSelf(self):
self.connectClient('foo', capabilities=['batch', 'echo-message', 'draft/labeled-response', 'message-tags'], skip_if_cap_nak=True)
self.connectClient('foo', capabilities=['batch', 'echo-message', 'labeled-response', 'message-tags'], skip_if_cap_nak=True)
self.getMessages(1)
self.sendLine(1, '@draft/label=12345;+draft/reply=123;+draft/react=l😃l TAGMSG foo')
self.sendLine(1, '@label=12345;+draft/reply=123;+draft/react=l😃l TAGMSG foo')
m1 = self.getMessage(1)
m2 = self.getMessage(1)
number_of_labels = 0
for m in [m1, m2]:
self.assertMessageEqual(m, command='TAGMSG', fail_msg="Got a message back that wasn't a TAGMSG")
if 'draft/label' in m.tags:
if 'label' in m.tags:
number_of_labels += 1
self.assertEqual(m.tags['draft/label'], '12345', m, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
self.assertEqual(m.tags['label'], '12345', m, fail_msg="Echo'd label doesn't match the label we sent (should be '12345'): {msg}")
self.assertEqual(number_of_labels, 1, m1, fail_msg="When sending a TAGMSG to self with echo-message, we only expect one message to contain the label. Instead, {} messages had the label".format(number_of_labels))
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testBatchedJoinMessages(self):
self.connectClient('bar', capabilities=['batch', 'draft/labeled-response', 'message-tags', 'server-time'], skip_if_cap_nak=True)
self.connectClient('bar', capabilities=['batch', 'labeled-response', 'message-tags', 'server-time'], skip_if_cap_nak=True)
self.getMessages(1)
self.sendLine(1, '@draft/label=12345 JOIN #xyz')
self.sendLine(1, '@label=12345 JOIN #xyz')
m = self.getMessages(1)
# we expect at least join and names lines, which must be batched
@ -253,8 +253,8 @@ class LabeledResponsesTestCase(cases.BaseServerTestCase, cases.OptionalityHelper
batch_id = batch_start.params[0][1:]
# batch id MUST be alphanumerics and hyphens
self.assertTrue(re.match(r'^[A-Za-z0-9\-]+$', batch_id) is not None, 'batch id must be alphanumerics and hyphens, got %r' % (batch_id,))
self.assertEqual(batch_start.params[1], 'draft/labeled-response')
self.assertEqual(batch_start.tags.get('draft/label'), '12345')
self.assertEqual(batch_start.params[1], 'labeled-response')
self.assertEqual(batch_start.tags.get('label'), '12345')
# valid BATCH end line
batch_end = m[-1]
@ -266,36 +266,32 @@ class LabeledResponsesTestCase(cases.BaseServerTestCase, cases.OptionalityHelper
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testNoBatchForSingleMessage(self):
self.connectClient('bar', capabilities=['batch', 'draft/labeled-response', 'message-tags', 'server-time'])
self.connectClient('bar', capabilities=['batch', 'labeled-response', 'message-tags', 'server-time'])
self.getMessages(1)
self.sendLine(1, '@draft/label=98765 PING adhoctestline')
self.sendLine(1, '@label=98765 PING adhoctestline')
# no BATCH should be initiated for a one-line response, it should just be labeled
ms = self.getMessages(1)
self.assertEqual(len(ms), 1)
m = ms[0]
self.assertMessageEqual(m, command='PONG', params=['adhoctestline'])
# check the label
self.assertEqual(m.tags.get('draft/label'), '98765')
self.assertEqual(m.tags.get('label'), '98765')
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testEmptyBatchForNoResponse(self):
self.connectClient('bar', capabilities=['batch', 'draft/labeled-response', 'message-tags', 'server-time'])
self.connectClient('bar', capabilities=['batch', 'labeled-response', 'message-tags', 'server-time'])
self.getMessages(1)
# PONG never receives a response
self.sendLine(1, '@draft/label=98765 PONG adhoctestline')
self.sendLine(1, '@label=98765 PONG adhoctestline')
# "If no response is required, an empty batch MUST be sent."
# https://ircv3.net/specs/extensions/labeled-response.html
# labeled-response: "Servers MUST respond with a labeled
# `ACK` message when a client sends a labeled command that normally
# produces no response."
ms = self.getMessages(1)
self.assertEqual(len(ms), 2)
batch_start, batch_end = ms
self.assertEqual(len(ms), 1)
ack = ms[0]
self.assertEqual(batch_start.command, 'BATCH')
self.assertEqual(batch_start.tags.get('draft/label'), '98765')
self.assertTrue(batch_start.params[0].startswith('+'))
batch_id = batch_start.params[0][1:]
self.assertEqual(batch_end.command, 'BATCH')
self.assertEqual(batch_end.params[0], '-' + batch_id)
self.assertEqual(ack.command, 'ACK')
self.assertEqual(ack.tags.get('label'), '98765')

View File

@ -0,0 +1,75 @@
"""
draft/multiline
"""
from irctest import cases
CAP_NAME = 'draft/multiline'
BATCH_TYPE = 'draft/multiline'
CONCAT_TAG = 'draft/multiline-concat'
base_caps = ['message-tags', 'batch', 'echo-message', 'server-time', 'labeled-response']
class MultilineTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testBasic(self):
self.connectClient('alice', capabilities=(base_caps + [CAP_NAME]))
self.joinChannel(1, '#test')
self.connectClient('bob', capabilities=(base_caps + [CAP_NAME]))
self.joinChannel(2, '#test')
self.connectClient('charlie', capabilities=base_caps)
self.joinChannel(3, '#test')
self.getMessages(1)
self.getMessages(2)
self.getMessages(3)
self.sendLine(1, '@label=xyz BATCH +123 %s #test' % (BATCH_TYPE,))
self.sendLine(1, '@batch=123 PRIVMSG #test hello')
self.sendLine(1, '@batch=123 PRIVMSG #test :#how is ')
self.sendLine(1, '@batch=123;%s PRIVMSG #test :everyone?' % (CONCAT_TAG,))
self.sendLine(1, 'BATCH -123')
echo = self.getMessages(1)
batchStart, batchEnd = echo[0], echo[-1]
self.assertEqual(batchStart.command, 'BATCH')
self.assertEqual(batchStart.tags.get('label'), 'xyz')
self.assertEqual(len(batchStart.params), 3)
self.assertEqual(batchStart.params[1], CAP_NAME)
self.assertEqual(batchStart.params[2], "#test")
self.assertEqual(batchEnd.command, 'BATCH')
self.assertEqual(batchStart.params[0][1:], batchEnd.params[0][1:])
msgid = batchStart.tags.get('msgid')
time = batchStart.tags.get('time')
assert msgid
assert time
privmsgs = echo[1:-1]
for msg in privmsgs:
self.assertMessageEqual(msg, command='PRIVMSG')
self.assertNotIn('msgid', msg.tags)
self.assertNotIn('time', msg.tags)
self.assertIn(CONCAT_TAG, echo[3].tags)
relay = self.getMessages(2)
batchStart, batchEnd = relay[0], relay[-1]
self.assertEqual(batchStart.command, 'BATCH')
self.assertEqual(batchEnd.command, 'BATCH')
self.assertEqual(batchStart.params[0][1:], batchEnd.params[0][1:])
self.assertEqual(batchStart.tags.get('msgid'), msgid)
self.assertEqual(batchStart.tags.get('time'), time)
privmsgs = relay[1:-1]
for msg in privmsgs:
self.assertMessageEqual(msg, command='PRIVMSG')
self.assertNotIn('msgid', msg.tags)
self.assertNotIn('time', msg.tags)
self.assertIn(CONCAT_TAG, relay[3].tags)
fallback_relay = self.getMessages(3)
relayed_fmsgids = []
for msg in fallback_relay:
self.assertMessageEqual(msg, command='PRIVMSG')
relayed_fmsgids.append(msg.tags.get('msgid'))
self.assertEqual(msg.tags.get('time'), time)
self.assertNotIn(CONCAT_TAG, msg.tags)
self.assertEqual(relayed_fmsgids, [msgid] + [None]*(len(fallback_relay)-1))

View File

@ -25,3 +25,55 @@ class RegressionsTestCase(cases.BaseServerTestCase):
ms = self.getMessages(2)
self.assertEqual(len(ms), 1)
self.assertMessageEqual(ms[0], command='PRIVMSG', params=['bob', 'hi'])
@cases.SpecificationSelector.requiredBySpecification('RFC1459')
def testCaseChanges(self):
self.connectClient('alice')
self.joinChannel(1, '#test')
self.connectClient('bob')
self.joinChannel(2, '#test')
self.getMessages(1)
self.getMessages(2)
# case change: both alice and bob should get a successful nick line
self.sendLine(1, 'NICK Alice')
ms = self.getMessages(1)
self.assertEqual(len(ms), 1)
self.assertMessageEqual(ms[0], command='NICK', params=['Alice'])
ms = self.getMessages(2)
self.assertEqual(len(ms), 1)
self.assertMessageEqual(ms[0], command='NICK', params=['Alice'])
# bob should not get notified on no-op nick change
self.sendLine(1, 'NICK Alice')
ms = self.getMessages(2)
self.assertEqual(ms, [])
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testTagCap(self):
# regression test for oragono #754
self.connectClient('alice', capabilities=['message-tags', 'batch', 'echo-message', 'server-time'])
self.connectClient('bob')
self.getMessages(1)
self.getMessages(2)
self.sendLine(1, '@+draft/reply=ct95w3xemz8qj9du2h74wp8pee PRIVMSG bob :hey yourself')
ms = self.getMessages(1)
self.assertEqual(len(ms), 1)
self.assertMessageEqual(ms[0], command='PRIVMSG', params=['bob', 'hey yourself'])
self.assertEqual(ms[0].tags.get('+draft/reply'), 'ct95w3xemz8qj9du2h74wp8pee')
ms = self.getMessages(2)
self.assertEqual(len(ms), 1)
self.assertMessageEqual(ms[0], command='PRIVMSG', params=['bob', 'hey yourself'])
self.assertEqual(ms[0].tags, {})
self.sendLine(2, 'CAP REQ :message-tags server-time')
self.getMessages(2)
self.sendLine(1, '@+draft/reply=tbxqauh9nykrtpa3n6icd9whan PRIVMSG bob :hey again')
self.getMessages(1)
ms = self.getMessages(2)
# now bob has the tags cap, so he should receive the tags
self.assertEqual(len(ms), 1)
self.assertMessageEqual(ms[0], command='PRIVMSG', params=['bob', 'hey again'])
self.assertEqual(ms[0].tags.get('+draft/reply'), 'tbxqauh9nykrtpa3n6icd9whan')

View File

@ -2,39 +2,45 @@
<https://github.com/DanielOaks/ircv3-specifications/blob/master+resume/extensions/resume.md>
"""
import secrets
from irctest import cases
from irctest.numerics import RPL_AWAY
ANCIENT_TIMESTAMP = '2006-01-02T15:04:05.999Z'
class ResumeTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testNoResumeByDefault(self):
self.connectClient('bar', capabilities=['batch', 'echo-message', 'draft/labeled-response'])
self.connectClient('bar', capabilities=['batch', 'echo-message', 'labeled-response'])
ms = self.getMessages(1)
resume_messages = [m for m in ms if m.command == 'RESUME']
self.assertEqual(resume_messages, [], 'should not see RESUME messages unless explicitly negotiated')
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testResume(self):
self.connectClient('bar', capabilities=['batch', 'draft/labeled-response', 'server-time'])
chname = '#' + secrets.token_hex(12)
self.connectClient('bar', capabilities=['batch', 'labeled-response', 'server-time'])
ms = self.getMessages(1)
welcome = self.connectClient('baz', capabilities=['batch', 'draft/labeled-response', 'server-time', 'draft/resume-0.3'])
welcome = self.connectClient('baz', capabilities=['batch', 'labeled-response', 'server-time', 'draft/resume-0.5'])
resume_messages = [m for m in welcome if m.command == 'RESUME']
self.assertEqual(len(resume_messages), 1)
self.assertEqual(resume_messages[0].params[0], 'TOKEN')
token = resume_messages[0].params[1]
self.joinChannel(1, '#xyz')
self.joinChannel(2, '#xyz')
self.sendLine(1, 'PRIVMSG #xyz :hello friends')
self.joinChannel(1, chname)
self.joinChannel(2, chname)
self.sendLine(1, 'PRIVMSG %s :hello friends' % (chname,))
self.sendLine(1, 'PRIVMSG baz :hello friend singular')
self.getMessages(1)
# should receive these messages
privmsgs = [m for m in self.getMessages(2) if m.command == 'PRIVMSG']
self.assertEqual(len(privmsgs), 2)
privmsgs.sort(key=lambda m: m.params[0])
self.assertMessageEqual(privmsgs[0], command='PRIVMSG', params=['#xyz', 'hello friends'])
self.assertMessageEqual(privmsgs[0], command='PRIVMSG', params=[chname, 'hello friends'])
self.assertMessageEqual(privmsgs[1], command='PRIVMSG', params=['baz', 'hello friend singular'])
channelMsgTime = privmsgs[0].tags.get('time')
@ -43,14 +49,14 @@ class ResumeTestCase(cases.BaseServerTestCase):
bad_token = 'a' * len(token)
self.addClient()
self.sendLine(3, 'CAP LS')
self.sendLine(3, 'CAP REQ :batch draft/labeled-response server-time draft/resume-0.3')
self.sendLine(3, 'CAP REQ :batch labeled-response server-time draft/resume-0.5')
self.sendLine(3, 'NICK tempnick')
self.sendLine(3, 'USER tempuser 0 * tempuser')
self.sendLine(3, 'RESUME ' + bad_token + ' 2006-01-02T15:04:05.999Z')
self.sendLine(3, ' '.join(('RESUME', bad_token, ANCIENT_TIMESTAMP)))
# resume with a bad token MUST fail
ms = self.getMessages(3)
resume_err_messages = [m for m in ms if m.command == 'RESUME' and m.params[0] == 'ERR']
resume_err_messages = [m for m in ms if m.command == 'FAIL' and m.params[:2] == ['RESUME', 'INVALID_TOKEN']]
self.assertEqual(len(resume_err_messages), 1)
# however, registration should proceed with the alternative nick
self.sendLine(3, 'CAP END')
@ -59,36 +65,112 @@ class ResumeTestCase(cases.BaseServerTestCase):
self.addClient()
self.sendLine(4, 'CAP LS')
self.sendLine(4, 'CAP REQ :batch draft/labeled-response server-time draft/resume-0.3')
self.sendLine(4, 'CAP REQ :batch labeled-response server-time draft/resume-0.5')
self.sendLine(4, 'NICK tempnick_')
self.sendLine(4, 'USER tempuser 0 * tempuser')
# resume with a timestamp in the distant past
self.sendLine(4, 'RESUME ' + token + ' 2006-01-02T15:04:05.999Z')
self.sendLine(4, ' '.join(('RESUME', token, ANCIENT_TIMESTAMP)))
# successful resume does not require CAP END:
# https://github.com/ircv3/ircv3-specifications/pull/306/files#r255318883
ms = self.getMessages(4)
# now, do a valid resume with the correct token
resume_messages = [m for m in ms if m.command == 'RESUME']
self.assertEqual(len(resume_messages), 2)
self.assertEqual(resume_messages[0].params[0], 'TOKEN')
new_token = resume_messages[0].params[1]
self.assertNotEqual(token, new_token, 'should receive a new, strong resume token; instead got ' + new_token)
# success message
self.assertMessageEqual(resume_messages[1], command='RESUME', params=['SUCCESS', 'baz'])
# test replay of messages
privmsgs = [m for m in ms if m.command == 'PRIVMSG' and m.prefix.startswith('bar')]
self.assertEqual(len(privmsgs), 2)
privmsgs.sort(key=lambda m: m.params[0])
self.assertMessageEqual(privmsgs[0], command='PRIVMSG', params=['#xyz', 'hello friends'])
self.assertMessageEqual(privmsgs[0], command='PRIVMSG', params=[chname, 'hello friends'])
self.assertMessageEqual(privmsgs[1], command='PRIVMSG', params=['baz', 'hello friend singular'])
# should replay with the original server-time
# TODO this probably isn't testing anything because the timestamp only has second resolution,
# hence will typically match by accident
self.assertEqual(privmsgs[0].tags.get('time'), channelMsgTime)
# legacy client should receive a QUIT and a JOIN
quit, join = [m for m in self.getMessages(1) if m.command in ('QUIT', 'JOIN')]
self.assertEqual(quit.command, 'QUIT')
self.assertTrue(quit.prefix.startswith('baz'))
self.assertMessageEqual(join, command='JOIN', params=[chname])
self.assertTrue(join.prefix.startswith('baz'))
# original client should have been disconnected
self.assertDisconnected(2)
# new client should be receiving PRIVMSG sent to baz
self.sendLine(1, 'PRIVMSG baz :hello again')
self.getMessages(1)
self.assertMessageEqual(self.getMessage(4), command='PRIVMSG', params=['baz', 'hello again'])
# test chain-resuming (resuming the resumed connection, using the new token)
self.addClient()
self.sendLine(5, 'CAP LS')
self.sendLine(5, 'CAP REQ :batch labeled-response server-time draft/resume-0.5')
self.sendLine(5, 'NICK tempnick_')
self.sendLine(5, 'USER tempuser 0 * tempuser')
self.sendLine(5, 'RESUME ' + new_token)
ms = self.getMessages(5)
resume_messages = [m for m in ms if m.command == 'RESUME']
self.assertEqual(len(resume_messages), 2)
self.assertEqual(resume_messages[0].params[0], 'TOKEN')
new_new_token = resume_messages[0].params[1]
self.assertNotEqual(token, new_new_token, 'should receive a new, strong resume token; instead got ' + new_new_token)
self.assertNotEqual(new_token, new_new_token, 'should receive a new, strong resume token; instead got ' + new_new_token)
# success message
self.assertMessageEqual(resume_messages[1], command='RESUME', params=['SUCCESS', 'baz'])
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testBRB(self):
chname = '#' + secrets.token_hex(12)
self.connectClient('bar', capabilities=['batch', 'labeled-response', 'message-tags', 'server-time', 'draft/resume-0.5'])
ms = self.getMessages(1)
self.joinChannel(1, chname)
welcome = self.connectClient('baz', capabilities=['batch', 'labeled-response', 'server-time', 'draft/resume-0.5'])
resume_messages = [m for m in welcome if m.command == 'RESUME']
self.assertEqual(len(resume_messages), 1)
self.assertEqual(resume_messages[0].params[0], 'TOKEN')
token = resume_messages[0].params[1]
self.joinChannel(2, chname)
self.getMessages(1)
self.sendLine(2, 'BRB :software upgrade')
# should receive, e.g., `BRB 210` (number of seconds)
ms = [m for m in self.getMessages(2) if m.command == 'BRB']
self.assertEqual(len(ms), 1)
self.assertGreater(int(ms[0].params[0]), 1)
# BRB disconnects you
self.assertDisconnected(2)
# without sending a QUIT line to friends
self.assertEqual(self.getMessages(1), [])
self.sendLine(1, 'PRIVMSG baz :hey there')
# BRB message should be sent as an away message
self.assertMessageEqual(self.getMessage(1), command=RPL_AWAY, params=['bar', 'baz', 'software upgrade'])
self.addClient(3)
self.sendLine(3, 'CAP REQ :batch account-tag message-tags draft/resume-0.5')
self.sendLine(3, ' '.join(('RESUME', token, ANCIENT_TIMESTAMP)))
ms = self.getMessages(3)
resume_messages = [m for m in ms if m.command == 'RESUME']
self.assertEqual(len(resume_messages), 2)
self.assertEqual(resume_messages[0].params[0], 'TOKEN')
self.assertMessageEqual(resume_messages[1], command='RESUME', params=['SUCCESS', 'baz'])
privmsgs = [m for m in ms if m.command == 'PRIVMSG' and m.prefix.startswith('bar')]
self.assertEqual(len(privmsgs), 1)
self.assertMessageEqual(privmsgs[0], params=['baz', 'hey there'])
# friend with the resume cap should receive a RESUMED message
resumed_messages = [m for m in self.getMessages(1) if m.command == 'RESUMED']
self.assertEqual(len(resumed_messages), 1)
self.assertTrue(resumed_messages[0].prefix.startswith('baz'))

View File

@ -0,0 +1,55 @@
from irctest import cases
from irctest.numerics import ERR_CANNOTSENDRP
from irctest.irc_utils.random import random_name
class RoleplayTestCase(cases.BaseServerTestCase):
def customizedConfig(self):
config = self.controller.baseConfig()
config['roleplay'] = {
'enabled': True,
}
return self.controller.addMysqlToConfig(config)
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testRoleplay(self):
bar = random_name('bar')
qux = random_name('qux')
chan = random_name('#chan')
self.connectClient(bar, name=bar, capabilities=['batch', 'labeled-response', 'message-tags', 'server-time'])
self.connectClient(qux, name=qux, capabilities=['batch', 'labeled-response', 'message-tags', 'server-time'])
self.joinChannel(bar, chan)
self.joinChannel(qux, chan)
self.getMessages(bar)
# roleplay should be forbidden because we aren't +E yet
self.sendLine(bar, 'NPC %s bilbo too much bread' % (chan,))
reply = self.getMessages(bar)[0]
self.assertEqual(reply.command, ERR_CANNOTSENDRP)
self.sendLine(bar, 'MODE %s +E' % (chan,))
reply = self.getMessages(bar)[0]
self.assertEqual(reply.command, 'MODE')
self.assertMessageEqual(reply, command='MODE', params=[chan, '+E'])
self.getMessages(qux)
self.sendLine(bar, 'NPC %s bilbo too much bread' % (chan,))
reply = self.getMessages(bar)[0]
self.assertEqual(reply.command, 'PRIVMSG')
self.assertEqual(reply.params[0], chan)
self.assertTrue(reply.prefix.startswith('*bilbo*!'))
self.assertIn('too much bread', reply.params[1])
reply = self.getMessages(qux)[0]
self.assertEqual(reply.command, 'PRIVMSG')
self.assertEqual(reply.params[0], chan)
self.assertTrue(reply.prefix.startswith('*bilbo*!'))
self.assertIn('too much bread', reply.params[1])
# test history storage
self.sendLine(qux, 'CHATHISTORY LATEST %s * 10' % (chan,))
reply = [msg for msg in self.getMessages(qux) if msg.command == 'PRIVMSG' and 'bilbo' in msg.prefix][0]
self.assertEqual(reply.command, 'PRIVMSG')
self.assertEqual(reply.params[0], chan)
self.assertTrue(reply.prefix.startswith('*bilbo*!'))
self.assertIn('too much bread', reply.params[1])

View File

@ -1,62 +0,0 @@
"""
<http://ircv3.net/specs/extensions/tls-3.1.html>
"""
from irctest import cases
from irctest.basecontrollers import NotImplementedByController
class StarttlsFailTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('IRCv3.1')
def testStarttlsRequestTlsFail(self):
"""<http://ircv3.net/specs/extensions/tls-3.1.html>
"""
self.addClient()
# TODO: check also without this
self.sendLine(1, 'CAP LS')
capabilities = self.getCapLs(1)
if 'tls' not in capabilities:
raise NotImplementedByController('starttls')
# TODO: check also without this
self.sendLine(1, 'CAP REQ :tls')
m = self.getRegistrationMessage(1)
# TODO: Remove this once the trailing space issue is fixed in Charybdis
# and Mammon:
#self.assertMessageEqual(m, command='CAP', params=['*', 'ACK', 'tls'],
# fail_msg='Did not ACK capability `tls`: {msg}')
self.sendLine(1, 'STARTTLS')
m = self.getRegistrationMessage(1)
self.assertMessageEqual(m, command='691',
fail_msg='Did not respond to STARTTLS with 691 whereas '
'SSL is not configured: {msg}.')
class StarttlsTestCase(cases.BaseServerTestCase):
ssl = True
def testStarttlsRequestTls(self):
"""<http://ircv3.net/specs/extensions/tls-3.1.html>
"""
self.addClient()
# TODO: check also without this
self.sendLine(1, 'CAP LS')
capabilities = self.getCapLs(1)
if 'tls' not in capabilities:
raise NotImplementedByController('starttls')
# TODO: check also without this
self.sendLine(1, 'CAP REQ :tls')
m = self.getRegistrationMessage(1)
# TODO: Remove this one the trailing space issue is fixed in Charybdis
# and Mammon:
#self.assertMessageEqual(m, command='CAP', params=['*', 'ACK', 'tls'],
# fail_msg='Did not ACK capability `tls`: {msg}')
self.sendLine(1, 'STARTTLS')
m = self.getRegistrationMessage(1)
self.assertMessageEqual(m, command='670',
fail_msg='Did not respond to STARTTLS with 670: {msg}.')
self.clients[1].starttls()
self.sendLine(1, 'USER f * * :foo')
self.sendLine(1, 'NICK foo')
self.sendLine(1, 'CAP END')
self.getMessages(1)

View File

@ -0,0 +1,50 @@
from irctest import cases
from irctest.numerics import RPL_ISUPPORT
from irctest.numerics import RPL_NAMREPLY
class StatusmsgTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testInIsupport(self):
"""Check that the expected STATUSMSG parameter appears in our isupport list."""
self.addClient()
self.sendLine(1, 'USER foo foo foo :foo')
self.sendLine(1, 'NICK bar')
self.skipToWelcome(1)
messages = self.getMessages(1)
isupport = set()
for message in messages:
if message.command == RPL_ISUPPORT:
isupport.update(message.params)
self.assertIn('STATUSMSG=~&@%+', isupport)
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testStatusmsg(self):
"""Test that STATUSMSG are sent to the intended recipients, with the intended prefixes."""
self.connectClient('chanop')
self.joinChannel(1, '#chan')
self.getMessages(1)
self.connectClient('joe')
self.joinChannel(2, '#chan')
self.getMessages(2)
self.connectClient('schmoe')
self.sendLine(3, 'join #chan')
messages = self.getMessages(3)
names = set()
for message in messages:
if message.command == RPL_NAMREPLY:
names.update(set(message.params[-1].split()))
# chanop should be opped
self.assertEqual(names, {'@chanop', 'joe', 'schmoe'}, f'unexpected names: {names}')
self.sendLine(3, 'privmsg @#chan :this message is for operators')
self.getMessages(3)
# check the operator's messages
statusMsg = self.getMessage(1, filter_pred=lambda m:m.command == 'PRIVMSG')
self.assertMessageEqual(statusMsg, params=['@#chan', 'this message is for operators'])
# check the non-operator's messages
unprivilegedMessages = [msg for msg in self.getMessages(2) if msg.command == 'PRIVMSG']
self.assertEqual(len(unprivilegedMessages), 0)

View File

@ -39,7 +39,7 @@ class InvisibleTestCase(cases.BaseServerTestCase):
def testInvisibleWhois(self):
"""Test interaction between MODE +i and RPL_WHOISCHANNELS."""
self.connectClient('userOne')
self.sendLine(1, 'JOIN #xyz')
self.joinChannel(1, '#xyz')
self.connectClient('userTwo')
self.getMessages(2)

View File

@ -0,0 +1,86 @@
import time
from irctest import cases
from irctest.irc_utils.junkdrawer import ircv3_timestamp_to_unixtime
from irctest.irc_utils.junkdrawer import to_history_message
from irctest.irc_utils.random import random_name
class ZncPlaybackTestCase(cases.BaseServerTestCase):
def customizedConfig(self):
return self.controller.addMysqlToConfig()
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testZncPlayback(self):
early_time = int(time.time() - 60)
chname = random_name('#znc_channel')
bar = random_name('bar')
self.controller.registerUser(self, bar, bar)
self.connectClient(bar, name=bar, capabilities=['batch', 'labeled-response', 'message-tags', 'server-time', 'echo-message'], password=bar)
self.joinChannel(bar, chname)
qux = random_name('qux')
self.connectClient(qux, name=qux, capabilities=['batch', 'labeled-response', 'message-tags', 'server-time', 'echo-message'])
self.joinChannel(qux, chname)
self.sendLine(qux, 'PRIVMSG %s :hi there' % (bar,))
dm = to_history_message([msg for msg in self.getMessages(qux) if msg.command == 'PRIVMSG'][0])
self.assertEqual(dm.text, 'hi there')
NUM_MESSAGES = 10
echo_messages = []
for i in range(NUM_MESSAGES):
self.sendLine(qux, 'PRIVMSG %s :this is message %d' % (chname, i))
echo_messages.extend(to_history_message(msg) for msg in self.getMessages(qux) if msg.command == 'PRIVMSG')
time.sleep(0.003)
self.assertEqual(len(echo_messages), NUM_MESSAGES)
self.getMessages(bar)
# reattach to 'bar'
self.connectClient(bar, name='viewer', capabilities=['batch', 'labeled-response', 'message-tags', 'server-time', 'echo-message'], password=bar)
self.sendLine('viewer', 'PRIVMSG *playback :play * %d' % (early_time,))
messages = [to_history_message(msg) for msg in self.getMessages('viewer') if msg.command == 'PRIVMSG']
self.assertEqual(set(messages), set([dm] + echo_messages))
self.sendLine('viewer', 'QUIT')
self.assertDisconnected('viewer')
# reattach to 'bar', play back selectively
self.connectClient(bar, name='viewer', capabilities=['batch', 'labeled-response', 'message-tags', 'server-time', 'echo-message'], password=bar)
mid_timestamp = ircv3_timestamp_to_unixtime(echo_messages[5].time)
# exclude message 5 itself (oragono's CHATHISTORY implementation corrects for this, but znc.in/playback does not because whatever)
mid_timestamp += .001
self.sendLine('viewer', 'PRIVMSG *playback :play * %s' % (mid_timestamp,))
messages = [to_history_message(msg) for msg in self.getMessages('viewer') if msg.command == 'PRIVMSG']
self.assertEqual(messages, echo_messages[6:])
self.sendLine('viewer', 'QUIT')
self.assertDisconnected('viewer')
# reattach to 'bar', play back selectively (pass a parameter and 2 timestamps)
self.connectClient(bar, name='viewer', capabilities=['batch', 'labeled-response', 'message-tags', 'server-time', 'echo-message'], password=bar)
start_timestamp = ircv3_timestamp_to_unixtime(echo_messages[2].time)
start_timestamp += .001
end_timestamp = ircv3_timestamp_to_unixtime(echo_messages[7].time)
self.sendLine('viewer', 'PRIVMSG *playback :play %s %s %s' % (chname, start_timestamp, end_timestamp,))
messages = [to_history_message(msg) for msg in self.getMessages('viewer') if msg.command == 'PRIVMSG']
self.assertEqual(messages, echo_messages[3:7])
# test nicknames as targets
self.sendLine('viewer', 'PRIVMSG *playback :play %s %d' % (qux, early_time,))
messages = [to_history_message(msg) for msg in self.getMessages('viewer') if msg.command == 'PRIVMSG']
self.assertEqual(messages, [dm])
self.sendLine('viewer', 'PRIVMSG *playback :play %s %d' % (qux.upper(), early_time,))
messages = [to_history_message(msg) for msg in self.getMessages('viewer') if msg.command == 'PRIVMSG']
self.assertEqual(messages, [dm])
self.sendLine('viewer', 'QUIT')
self.assertDisconnected('viewer')
# test limiting behavior
config = self.controller.getConfig()
config['history']['znc-maxmessages'] = 5
self.controller.rehash(self, config)
self.connectClient(bar, name='viewer', capabilities=['batch', 'labeled-response', 'message-tags', 'server-time', 'echo-message'], password=bar)
self.sendLine('viewer', 'PRIVMSG *playback :play %s %d' % (chname, int(time.time() - 60)))
messages = [to_history_message(msg) for msg in self.getMessages('viewer') if msg.command == 'PRIVMSG']
# should receive the latest 5 messages
self.assertEqual(messages, echo_messages[5:])