Files
irctest/irctest/server_tests/test_channel_operations.py
2021-02-08 17:47:11 -05:00

1044 lines
45 KiB
Python

"""
Section 3.2 of RFC 2812
<https://tools.ietf.org/html/rfc2812#section-3.2>
"""
from irctest import cases
from irctest import client_mock
from irctest import runner
from irctest.irc_utils import ambiguities
from irctest.numerics import RPL_TOPIC, RPL_TOPICTIME, RPL_NOTOPIC, RPL_NAMREPLY, RPL_INVITING
from irctest.numerics import ERR_NOSUCHCHANNEL, ERR_NOTONCHANNEL, ERR_CHANOPRIVSNEEDED, ERR_NOSUCHNICK, ERR_INVITEONLYCHAN, ERR_CANNOTSENDTOCHAN, ERR_BADCHANNELKEY, ERR_INVALIDMODEPARAM, ERR_UNKNOWNERROR
MODERN_CAPS = ['server-time', 'message-tags', 'batch', 'labeled-response', 'echo-message', 'account-tag']
class JoinTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812',
strict=True)
def testJoinAllMessages(self):
"""“If a JOIN is successful, the user receives a JOIN message as
confirmation and is then sent the channel's topic (using RPL_TOPIC) and
the list of users who are on the channel (using RPL_NAMREPLY), which
MUST include the user joining.”
-- <https://tools.ietf.org/html/rfc2812#section-3.2.1>
“If a JOIN is successful, the user is then sent the channel's topic
(using RPL_TOPIC) and the list of users who are on the channel (using
RPL_NAMREPLY), which must include the user joining.”
-- <https://tools.ietf.org/html/rfc1459#section-4.2.1>
"""
self.connectClient('foo')
self.sendLine(1, 'JOIN #chan')
received_commands = {m.command for m in self.getMessages(1)}
expected_commands = {
'353', # RPL_NAMREPLY
'366', # RPL_ENDOFNAMES
}
self.assertTrue(expected_commands.issubset(received_commands),
'Server sent {} commands, but at least {} were expected.'
.format(received_commands, expected_commands))
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testJoinNamreply(self):
"""“353 RPL_NAMREPLY
"( "=" / "*" / "@" ) <channel>
:[ "@" / "+" ] <nick> *( " " [ "@" / "+" ] <nick> )”
-- <https://tools.ietf.org/html/rfc2812#section-5.2>
This test makes a user join and check what is sent to them.
"""
self.connectClient('foo')
self.sendLine(1, 'JOIN #chan')
for m in self.getMessages(1):
if m.command == '353':
self.assertIn(len(m.params), (3, 4), m,
fail_msg='RPL_NAM_REPLY with number of arguments '
'<3 or >4: {msg}')
params = ambiguities.normalize_namreply_params(m.params)
self.assertIn(params[1], '=*@', m,
fail_msg='Bad channel prefix: {item} not in {list}: {msg}')
self.assertEqual(params[2], '#chan', m,
fail_msg='Bad channel name: {got} instead of '
'{expects}: {msg}')
self.assertIn(params[3], {'foo', '@foo', '+foo'}, m,
fail_msg='Bad user list: should contain only user '
'"foo" with an optional "+" or "@" prefix, but got: '
'{msg}')
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testPartNotInEmptyChannel(self):
"""“442 ERR_NOTONCHANNEL
"<channel> :You're not on that channel"
- Returned by the server whenever a client tries to
perform a channel effecting command for which the
client isn't a member.”
-- <https://tools.ietf.org/html/rfc1459#section-6.1>
and <https://tools.ietf.org/html/rfc2812#section-5.2>
According to RFCs, ERR_NOSUCHCHANNEL should only be used for invalid
channel names:
“403 ERR_NOSUCHCHANNEL
"<channel name> :No such channel"
- Used to indicate the given channel name is invalid.”
-- <https://tools.ietf.org/html/rfc1459#section-6.1>
and <https://tools.ietf.org/html/rfc2812#section-5.2>
However, many implementations use 479 instead, so let's allow it.
<http://danieloaks.net/irc-defs/defs/ircnumerics.html#403>
<http://danieloaks.net/irc-defs/defs/ircnumerics.html#479>
"""
self.connectClient('foo')
self.sendLine(1, 'PART #chan')
m = self.getMessage(1)
self.assertIn(m.command, {'442', '403'}, m, # ERR_NOTONCHANNEL, ERR_NOSUCHCHANNEL
fail_msg='Expected ERR_NOTONCHANNEL (442) or '
'ERR_NOSUCHCHANNEL (403) after PARTing an empty channel '
'one is not on, but got: {msg}')
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testPartNotInNonEmptyChannel(self):
self.connectClient('foo')
self.connectClient('bar')
self.sendLine(1, 'JOIN #chan')
self.getMessages(1) # Synchronize
self.sendLine(2, 'PART #chan')
m = self.getMessage(2)
self.assertMessageEqual(m, command='442', # ERR_NOTONCHANNEL
fail_msg='Expected ERR_NOTONCHANNEL (442) '
'after PARTing a non-empty channel '
'one is not on, but got: {msg}')
self.assertEqual(self.getMessages(2), [])
testPartNotInNonEmptyChannel.__doc__ = testPartNotInEmptyChannel.__doc__
def testJoinTwice(self):
self.connectClient('foo')
self.sendLine(1, 'JOIN #chan')
m = self.getMessage(1)
self.assertMessageEqual(m, command='JOIN', params=['#chan'])
self.getMessages(1)
self.sendLine(1, 'JOIN #chan')
# Note that there may be no message. Both RFCs require replies only
# if the join is successful, or has an error among the given set.
for m in self.getMessages(1):
if m.command == '353':
self.assertIn(len(m.params), (3, 4), m,
fail_msg='RPL_NAM_REPLY with number of arguments '
'<3 or >4: {msg}')
params = ambiguities.normalize_namreply_params(m.params)
self.assertIn(params[1], '=*@', m,
fail_msg='Bad channel prefix: {item} not in {list}: {msg}')
self.assertEqual(params[2], '#chan', m,
fail_msg='Bad channel name: {got} instead of '
'{expects}: {msg}')
self.assertIn(params[3], {'foo', '@foo', '+foo'}, m,
fail_msg='Bad user list after user "foo" joined twice '
'the same channel: should contain only user '
'"foo" with an optional "+" or "@" prefix, but got: '
'{msg}')
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testNormalPart(self):
self.connectClient('bar')
self.sendLine(1, 'JOIN #chan')
m = self.getMessage(1)
self.assertMessageEqual(m, command='JOIN', params=['#chan'])
self.connectClient('baz')
self.sendLine(2, 'JOIN #chan')
m = self.getMessage(2)
self.assertMessageEqual(m, command='JOIN', params=['#chan'])
# skip the rest of the JOIN burst:
self.getMessages(1)
self.getMessages(2)
self.sendLine(1, 'PART #chan :bye everyone')
# both the PART'ing client and the other channel member should receive a PART line:
m = self.getMessage(1)
self.assertMessageEqual(m, command='PART', params=['#chan', 'bye everyone'])
m = self.getMessage(2)
self.assertMessageEqual(m, command='PART', params=['#chan', 'bye everyone'])
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testTopic(self):
"""“Once a user has joined a channel, he receives information about
all commands his server receives affecting the channel. This
includes […] TOPIC”
-- <https://tools.ietf.org/html/rfc1459#section-4.2.1>
and <https://tools.ietf.org/html/rfc2812#section-3.2.1>
"""
self.connectClient('foo')
self.joinChannel(1, '#chan')
self.connectClient('bar')
self.joinChannel(2, '#chan')
# clear waiting msgs about cli 2 joining the channel
self.getMessages(1)
self.getMessages(2)
# TODO: check foo is opped OR +t is unset
self.sendLine(1, 'TOPIC #chan :T0P1C')
try:
m = self.getMessage(1)
if m.command == '482':
raise runner.ImplementationChoice(
'Channel creators are not opped by default, and '
'channel modes to no allow regular users to change '
'topic.')
self.assertMessageEqual(m, command='TOPIC')
except client_mock.NoMessageException:
# The RFCs do not say TOPIC must be echoed
pass
m = self.getMessage(2)
self.assertMessageEqual(m, command='TOPIC', params=['#chan', 'T0P1C'])
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testTopicMode(self):
"""“Once a user has joined a channel, he receives information about
all commands his server receives affecting the channel. This
includes […] TOPIC”
-- <https://tools.ietf.org/html/rfc1459#section-4.2.1>
and <https://tools.ietf.org/html/rfc2812#section-3.2.1>
"""
self.connectClient('foo')
self.joinChannel(1, '#chan')
self.connectClient('bar')
self.joinChannel(2, '#chan')
self.getMessages(1)
self.getMessages(2)
# TODO: check foo is opped
self.sendLine(1, 'MODE #chan +t')
self.getMessages(1)
self.sendLine(2, 'TOPIC #chan :T0P1C')
m = self.getMessage(2)
self.assertMessageEqual(m, command='482',
fail_msg='Non-op user was not refused use of TOPIC: {msg}')
self.assertEqual(self.getMessages(1), [])
self.sendLine(1, 'MODE #chan -t')
self.getMessages(1)
self.sendLine(2, 'TOPIC #chan :T0P1C')
try:
m = self.getMessage(2)
self.assertNotEqual(m.command, '482',
msg='User was refused TOPIC whereas +t was not '
'set: {}'.format(m))
except client_mock.NoMessageException:
# The RFCs do not say TOPIC must be echoed
pass
m = self.getMessage(1)
self.assertMessageEqual(m, command='TOPIC', params=['#chan', 'T0P1C'])
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testTopicNonexistentChannel(self):
"""RFC2812 specifies ERR_NOTONCHANNEL as the correct response to TOPIC
on a nonexistent channel. The modern spec prefers ERR_NOSUCHCHANNEL.
<https://tools.ietf.org/html/rfc2812#section-3.2.4>
<http://modern.ircdocs.horse/#topic-message>
"""
self.connectClient('foo')
self.sendLine(1, 'TOPIC #chan')
m = self.getMessage(1)
# either 403 ERR_NOSUCHCHANNEL or 443 ERR_NOTONCHANNEL
self.assertIn(m.command, ('403', '443'))
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testUnsetTopicResponses(self):
"""Test various cases related to RPL_NOTOPIC with set and unset topics."""
self.connectClient('bar')
self.sendLine(1, 'JOIN #test')
messages = self.getMessages(1)
# shouldn't send RPL_NOTOPIC for a new channel
self.assertNotIn(RPL_NOTOPIC, [m.command for m in messages])
self.connectClient('baz')
self.sendLine(2, 'JOIN #test')
messages = self.getMessages(2)
# topic is still unset, shouldn't send RPL_NOTOPIC on initial join
self.assertNotIn(RPL_NOTOPIC, [m.command for m in messages])
self.sendLine(2, 'TOPIC #test')
messages = self.getMessages(2)
# explicit TOPIC should receive RPL_NOTOPIC
self.assertIn(RPL_NOTOPIC, [m.command for m in messages])
self.sendLine(1, 'TOPIC #test :new topic')
self.getMessages(1)
# client 2 should get the new TOPIC line
messages = [message for message in self.getMessages(2) if message.command == 'TOPIC']
self.assertEqual(len(messages), 1)
self.assertEqual(messages[0].params, ['#test', 'new topic'])
# unset the topic:
self.sendLine(1, 'TOPIC #test :')
self.getMessages(1)
self.connectClient('qux')
self.sendLine(3, 'join #test')
messages = self.getMessages(3)
# topic is once again unset, shouldn't send RPL_NOTOPIC on initial join
self.assertNotIn(RPL_NOTOPIC, [m.command for m in messages])
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testListEmpty(self):
"""<https://tools.ietf.org/html/rfc1459#section-4.2.6>
<https://tools.ietf.org/html/rfc2812#section-3.2.6>
"""
self.connectClient('foo')
self.connectClient('bar')
self.getMessages(1)
self.sendLine(2, 'LIST')
m = self.getMessage(2)
if m.command == '321':
# skip RPL_LISTSTART
m = self.getMessage(2)
self.assertNotEqual(m.command, '322', # RPL_LIST
'LIST response gives (at least) one channel, whereas there '
'is none.')
self.assertMessageEqual(m, command='323', # RPL_LISTEND
fail_msg='Second reply to LIST is not 322 (RPL_LIST) '
'or 323 (RPL_LISTEND), or but: {msg}')
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testListOne(self):
"""When a channel exists, LIST should get it in a reply.
<https://tools.ietf.org/html/rfc1459#section-4.2.6>
<https://tools.ietf.org/html/rfc2812#section-3.2.6>
"""
self.connectClient('foo')
self.connectClient('bar')
self.sendLine(1, 'JOIN #chan')
self.getMessages(1)
self.sendLine(2, 'LIST')
m = self.getMessage(2)
if m.command == '321':
# skip RPL_LISTSTART
m = self.getMessage(2)
self.assertNotEqual(m.command, '323', # RPL_LISTEND
fail_msg='LIST response ended (ie. 323, aka RPL_LISTEND) '
'without listing any channel, whereas there is one.')
self.assertMessageEqual(m, command='322', # RPL_LIST
fail_msg='Second reply to LIST is not 322 (RPL_LIST), '
'nor 323 (RPL_LISTEND) but: {msg}')
m = self.getMessage(2)
self.assertNotEqual(m.command, '322', # RPL_LIST
fail_msg='LIST response gives (at least) two channels, '
'whereas there is only one.')
self.assertMessageEqual(m, command='323', # RPL_LISTEND
fail_msg='Third reply to LIST is not 322 (RPL_LIST) '
'or 323 (RPL_LISTEND), or but: {msg}')
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812')
def testKickSendsMessages(self):
"""“Once a user has joined a channel, he receives information about
all commands his server receives affecting the channel. This
includes […] KICK”
-- <https://tools.ietf.org/html/rfc1459#section-4.2.1>
and <https://tools.ietf.org/html/rfc2812#section-3.2.1>
"""
self.connectClient('foo')
self.joinChannel(1, '#chan')
self.connectClient('bar')
self.joinChannel(2, '#chan')
self.connectClient('baz')
self.joinChannel(3, '#chan')
# TODO: check foo is an operator
self.getMessages(1)
self.getMessages(2)
self.getMessages(3)
self.sendLine(1, 'KICK #chan bar :bye')
try:
m = self.getMessage(1)
if m.command == '482':
raise runner.ImplementationChoice(
'Channel creators are not opped by default.')
self.assertMessageEqual(m, command='KICK')
except client_mock.NoMessageException:
# The RFCs do not say KICK must be echoed
pass
m = self.getMessage(2)
self.assertMessageEqual(m, command='KICK',
params=['#chan', 'bar', 'bye'])
m = self.getMessage(3)
self.assertMessageEqual(m, command='KICK',
params=['#chan', 'bar', 'bye'])
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testKickPrivileges(self):
"""Test who has the ability to kick / what error codes are sent for invalid kicks."""
self.connectClient('foo')
self.sendLine(1, 'JOIN #chan')
self.getMessages(1)
self.connectClient('bar')
self.sendLine(2, 'JOIN #chan')
messages = self.getMessages(2)
names = set()
for message in messages:
if message.command == RPL_NAMREPLY:
names.update(set(message.params[-1].split()))
# assert foo is opped
self.assertIn('@foo', names, f'unexpected names: {names}')
self.connectClient('baz')
self.sendLine(3, 'KICK #chan bar')
replies = set(m.command for m in self.getMessages(3))
self.assertTrue(
ERR_NOTONCHANNEL in replies or ERR_CHANOPRIVSNEEDED in replies or ERR_NOSUCHCHANNEL in replies,
f'did not receive acceptable error code for kick from outside channel: {replies}')
self.joinChannel(3, '#chan')
self.getMessages(3)
self.sendLine(3, 'KICK #chan bar')
replies = set(m.command for m in self.getMessages(3))
# now we're a channel member so we should receive ERR_CHANOPRIVSNEEDED
self.assertIn(ERR_CHANOPRIVSNEEDED, replies)
self.sendLine(1, 'MODE #chan +o baz')
self.getMessages(1)
# should be able to kick an unprivileged user:
self.sendLine(3, 'KICK #chan bar')
# should be able to kick an operator:
self.sendLine(3, 'KICK #chan foo')
baz_replies = set(m.command for m in self.getMessages(3))
self.assertNotIn(ERR_CHANOPRIVSNEEDED, baz_replies)
kick_targets = [m.params[1] for m in self.getMessages(1) if m.command == 'KICK']
# foo should see bar and foo being kicked
self.assertTrue(any(target.startswith('foo') for target in kick_targets), f'unexpected kick targets: {kick_targets}')
self.assertTrue(any(target.startswith('bar') for target in kick_targets), f'unexpected kick targets: {kick_targets}')
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testKickNonexistentChannel(self):
"""“Kick command [...] Numeric replies: [...] ERR_NOSUCHCHANNEL."""
self.connectClient('foo')
self.sendLine(1, 'KICK #chan nick')
m = self.getMessage(1)
# should return ERR_NOSUCHCHANNEL
self.assertMessageEqual(m, command='403')
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testDoubleKickMessages(self):
"""“The server MUST NOT send KICK messages with multiple channels or
users to clients. This is necessarily to maintain backward
compatibility with old client software.”
-- https://tools.ietf.org/html/rfc2812#section-3.2.8
"""
self.connectClient('foo')
self.joinChannel(1, '#chan')
self.connectClient('bar')
self.joinChannel(2, '#chan')
self.connectClient('baz')
self.joinChannel(3, '#chan')
self.connectClient('qux')
self.joinChannel(4, '#chan')
# TODO: check foo is an operator
# Synchronize
self.getMessages(1)
self.getMessages(2)
self.getMessages(3)
self.getMessages(4)
self.sendLine(1, 'KICK #chan,#chan bar,baz :bye')
try:
m = self.getMessage(1)
if m.command == '482':
raise runner.OptionalExtensionNotSupported(
'Channel creators are not opped by default.')
if m.command in {'401', '403'}:
raise runner.NotImplementedByController(
'Multi-target KICK')
except client_mock.NoMessageException:
# The RFCs do not say KICK must be echoed
pass
mgroup = self.getMessages(4)
self.assertGreaterEqual(len(mgroup), 2)
m1, m2 = mgroup[:2]
for m in m1, m2:
self.assertEqual(m.command, 'KICK')
self.assertEqual(len(m.params), 3)
self.assertEqual(m.params[0], '#chan')
self.assertEqual(m.params[2], 'bye')
if (m1.params[1] == 'bar' and m2.params[1] == 'baz') or (m1.params[1] == 'baz' and m2.params[1] == 'bar'):
... # success
else:
raise AssertionError('Middle params [{}, {}] are not correct.'.format(m1.params[1], m2.params[1]))
@cases.SpecificationSelector.requiredBySpecification('RFC-deprecated')
def testInviteNonExistingChannelTransmitted(self):
"""“There is no requirement that the channel the target user is being
invited to must exist or be a valid channel.”
-- <https://tools.ietf.org/html/rfc1459#section-4.2.7>
and <https://tools.ietf.org/html/rfc2812#section-3.2.7>
“Only the user inviting and the user being invited will receive
notification of the invitation.”
-- <https://tools.ietf.org/html/rfc2812#section-3.2.7>
"""
self.connectClient('foo')
self.connectClient('bar')
self.getMessages(1)
self.getMessages(2)
self.sendLine(1, 'INVITE #chan bar')
self.getMessages(1)
l = self.getMessages(2)
self.assertNotEqual(l, [],
fail_msg='After using “INVITE #chan bar” while #chan does '
'not exist, “bar” received nothing.')
self.assertMessageEqual(l[0], command='INVITE',
params=['#chan', 'bar'],
fail_msg='After “foo” invited “bar” do non-existing channel '
'#chan, “bar” should have received “INVITE #chan bar” but '
'got this instead: {msg}')
@cases.SpecificationSelector.requiredBySpecification('RFC-deprecated')
def testInviteNonExistingChannelEchoed(self):
"""“There is no requirement that the channel the target user is being
invited to must exist or be a valid channel.”
-- <https://tools.ietf.org/html/rfc1459#section-4.2.7>
and <https://tools.ietf.org/html/rfc2812#section-3.2.7>
“Only the user inviting and the user being invited will receive
notification of the invitation.”
-- <https://tools.ietf.org/html/rfc2812#section-3.2.7>
"""
self.connectClient('foo')
self.connectClient('bar')
self.getMessages(1)
self.getMessages(2)
self.sendLine(1, 'INVITE #chan bar')
l = self.getMessages(1)
self.assertNotEqual(l, [],
fail_msg='After using “INVITE #chan bar” while #chan does '
'not exist, the author received nothing.')
self.assertMessageEqual(l[0], command='INVITE',
params=['#chan', 'bar'],
fail_msg='After “foo” invited “bar” do non-existing channel '
'#chan, “foo” should have received “INVITE #chan bar” but '
'got this instead: {msg}')
class testChannelCaseSensitivity(cases.BaseServerTestCase):
def _testChannelsEquivalent(casemapping, name1, name2):
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812',
strict=True)
def f(self):
self.connectClient('foo')
self.connectClient('bar')
if self.server_support['CASEMAPPING'] != casemapping:
raise runner.NotImplementedByController('Casemapping {} not implemented'.format(casemapping))
self.joinClient(1, name1)
self.joinClient(2, name2)
try:
m = self.getMessage(1)
self.assertMessageEqual(m, command='JOIN',
nick='bar')
except client_mock.NoMessageException:
raise AssertionError(
'Channel names {} and {} are not equivalent.'
.format(name1, name2))
f.__name__ = 'testEquivalence__{}__{}'.format(name1, name2)
return f
def _testChannelsNotEquivalent(casemapping, name1, name2):
@cases.SpecificationSelector.requiredBySpecification('RFC1459', 'RFC2812',
strict=True)
def f(self):
self.connectClient('foo')
self.connectClient('bar')
if self.server_support['CASEMAPPING'] != casemapping:
raise runner.NotImplementedByController('Casemapping {} not implemented'.format(casemapping))
self.joinClient(1, name1)
self.joinClient(2, name2)
try:
m = self.getMessage(1)
except client_mock.NoMessageException:
pass
else:
self.assertMessageEqual(m, command='JOIN',
nick='bar') # This should always be true
raise AssertionError(
'Channel names {} and {} are equivalent.'
.format(name1, name2))
f.__name__ = 'testEquivalence__{}__{}'.format(name1, name2)
return f
testAsciiSimpleEquivalent = _testChannelsEquivalent('ascii', '#Foo', '#foo')
testAsciiSimpleNotEquivalent = _testChannelsNotEquivalent('ascii', '#Foo', '#fooa')
testRfcSimpleEquivalent = _testChannelsEquivalent('rfc1459', '#Foo', '#foo')
testRfcSimpleNotEquivalent = _testChannelsNotEquivalent('rfc1459', '#Foo', '#fooa')
testRfcFancyEquivalent = _testChannelsEquivalent('rfc1459', '#F]|oo{', '#f}\\oo[')
testRfcFancyNotEquivalent = _testChannelsEquivalent('rfc1459', '#F}o\\o[', '#f]o|o{')
class InviteTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('IRCv3.2')
def testInvites(self):
"""Test some basic functionality related to INVITE and the +i mode."""
self.connectClient('foo')
self.joinChannel(1, '#chan')
self.sendLine(1, 'MODE #chan +i')
self.getMessages(1)
self.sendLine(1, 'INVITE bar #chan')
m = self.getMessage(1)
self.assertEqual(m.command, ERR_NOSUCHNICK)
self.connectClient('bar')
self.sendLine(2, 'JOIN #chan')
m = self.getMessage(2)
self.assertEqual(m.command, ERR_INVITEONLYCHAN)
self.sendLine(1, 'INVITE bar #chan')
m = self.getMessage(1)
self.assertEqual(m.command, RPL_INVITING)
# modern/ircv3 param order: inviter, invitee, channel
self.assertEqual(m.params, ['foo', 'bar', '#chan'])
m = self.getMessage(2)
self.assertEqual(m.command, 'INVITE')
self.assertTrue(m.prefix.startswith("foo")) # nickmask of inviter
self.assertEqual(m.params, ['bar', '#chan'])
# we were invited, so join should succeed now
self.joinChannel(2, '#chan')
class ChannelQuitTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testQuit(self):
"""“Once a user has joined a channel, he receives information about
all commands his server receives affecting the channel. This
includes [...] QUIT”
<https://tools.ietf.org/html/rfc2812#section-3.2.1>
"""
self.connectClient('bar')
self.joinChannel(1, '#chan')
self.connectClient('qux')
self.sendLine(2, 'JOIN #chan')
self.getMessages(2)
self.getMessages(1)
self.sendLine(2, 'QUIT :qux out')
self.getMessages(2)
m = self.getMessage(1)
self.assertEqual(m.command, 'QUIT')
self.assertTrue(m.prefix.startswith('qux')) # nickmask of quitter
self.assertIn('qux out', m.params[0])
class NoCTCPTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testQuit(self):
self.connectClient('bar')
self.joinChannel(1, '#chan')
self.sendLine(1, 'MODE #chan +C')
self.getMessages(1)
self.connectClient('qux')
self.joinChannel(2, '#chan')
self.getMessages(2)
self.sendLine(1, 'PRIVMSG #chan :\x01ACTION hi\x01')
self.getMessages(1)
ms = self.getMessages(2)
self.assertEqual(len(ms), 1)
self.assertMessageEqual(ms[0], command='PRIVMSG', params=['#chan', '\x01ACTION hi\x01'])
self.sendLine(1, 'PRIVMSG #chan :\x01PING 1473523796 918320\x01')
ms = self.getMessages(1)
self.assertEqual(len(ms), 1)
self.assertMessageEqual(ms[0], command=ERR_CANNOTSENDTOCHAN)
ms = self.getMessages(2)
self.assertEqual(ms, [])
class KeyTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testKeyNormal(self):
self.connectClient('bar')
self.joinChannel(1, '#chan')
self.sendLine(1, 'MODE #chan +k beer')
self.getMessages(1)
self.connectClient('qux')
self.getMessages(2)
self.sendLine(2, 'JOIN #chan')
reply = self.getMessages(2)
self.assertNotIn('JOIN', {msg.command for msg in reply})
self.assertIn(ERR_BADCHANNELKEY, {msg.command for msg in reply})
self.sendLine(2, 'JOIN #chan beer')
reply = self.getMessages(2)
self.assertMessageEqual(reply[0], command='JOIN', params=['#chan'])
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testKeyValidation(self):
# oragono issue #1021
self.connectClient('bar')
self.joinChannel(1, '#chan')
self.sendLine(1, 'MODE #chan +k :invalid channel passphrase')
reply = self.getMessages(1)
self.assertNotIn(ERR_UNKNOWNERROR, {msg.command for msg in reply})
self.assertIn(ERR_INVALIDMODEPARAM, {msg.command for msg in reply})
class AuditoriumTestCase(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testAuditorium(self):
self.connectClient('bar', name='bar', capabilities=MODERN_CAPS)
self.joinChannel('bar', '#auditorium')
self.getMessages('bar')
self.sendLine('bar', 'MODE #auditorium +u')
modelines = [msg for msg in self.getMessages('bar') if msg.command == 'MODE']
self.assertEqual(len(modelines), 1)
self.assertMessageEqual(modelines[0], params=['#auditorium', '+u'])
self.connectClient('guest1', name='guest1', capabilities=MODERN_CAPS)
self.joinChannel('guest1', '#auditorium')
self.getMessages('guest1')
# chanop should get a JOIN message
join_msgs = [msg for msg in self.getMessages('bar') if msg.command == 'JOIN']
self.assertEqual(len(join_msgs), 1)
self.assertMessageEqual(join_msgs[0], nick='guest1', params=['#auditorium'])
self.connectClient('guest2', name='guest2', capabilities=MODERN_CAPS)
self.joinChannel('guest2', '#auditorium')
self.getMessages('guest2')
# chanop should get a JOIN message
join_msgs = [msg for msg in self.getMessages('bar') if msg.command == 'JOIN']
self.assertEqual(len(join_msgs), 1)
self.assertMessageEqual(join_msgs[0], nick='guest2', params=['#auditorium'])
# fellow unvoiced participant should not
unvoiced_join_msgs = [msg for msg in self.getMessages('guest1') if msg.command == 'JOIN']
self.assertEqual(len(unvoiced_join_msgs), 0)
self.connectClient('guest3', name='guest3', capabilities=MODERN_CAPS)
self.joinChannel('guest3', '#auditorium')
self.getMessages('guest3')
self.sendLine('bar', 'PRIVMSG #auditorium hi')
echo_message = [msg for msg in self.getMessages('bar') if msg.command == 'PRIVMSG'][0]
self.assertEqual(echo_message, self.getMessages('guest1')[0])
self.assertEqual(echo_message, self.getMessages('guest2')[0])
self.assertEqual(echo_message, self.getMessages('guest3')[0])
# unvoiced users can speak
self.sendLine('guest1', 'PRIVMSG #auditorium :hi you')
echo_message = [msg for msg in self.getMessages('guest1') if msg.command == 'PRIVMSG'][0]
self.assertEqual(self.getMessages('bar'), [echo_message])
self.assertEqual(self.getMessages('guest2'), [echo_message])
self.assertEqual(self.getMessages('guest3'), [echo_message])
def names(client):
self.sendLine(client, 'NAMES #auditorium')
result = set()
for msg in self.getMessages(client):
if msg.command == RPL_NAMREPLY:
result.update(msg.params[-1].split())
return result
self.assertEqual(names('bar'), {'@bar', 'guest1', 'guest2', 'guest3'})
self.assertEqual(names('guest1'), {'@bar',})
self.assertEqual(names('guest2'), {'@bar',})
self.assertEqual(names('guest3'), {'@bar',})
self.sendLine('bar', 'MODE #auditorium +v guest1')
modeLine = [msg for msg in self.getMessages('bar') if msg.command == 'MODE'][0]
self.assertEqual(self.getMessages('guest1'), [modeLine])
self.assertEqual(self.getMessages('guest2'), [modeLine])
self.assertEqual(self.getMessages('guest3'), [modeLine])
self.assertEqual(names('bar'), {'@bar', '+guest1', 'guest2', 'guest3'})
self.assertEqual(names('guest2'), {'@bar', '+guest1'})
self.assertEqual(names('guest3'), {'@bar', '+guest1'})
self.sendLine('guest1', 'PART #auditorium')
part = [msg for msg in self.getMessages('guest1') if msg.command == 'PART'][0]
# everyone should see voiced PART
self.assertEqual(self.getMessages('bar')[0], part)
self.assertEqual(self.getMessages('guest2')[0], part)
self.assertEqual(self.getMessages('guest3')[0], part)
self.joinChannel('guest1', '#auditorium')
self.getMessages('guest1')
self.getMessages('bar')
self.sendLine('guest2', 'PART #auditorium')
part = [msg for msg in self.getMessages('guest2') if msg.command == 'PART'][0]
self.assertEqual(self.getMessages('bar'), [part])
# part should be hidden from unvoiced participants
self.assertEqual(self.getMessages('guest1'), [])
self.assertEqual(self.getMessages('guest3'), [])
self.sendLine('guest3', 'QUIT')
self.assertDisconnected('guest3')
# quit should be hidden from unvoiced participants
self.assertEqual(len([msg for msg in self.getMessages('bar') if msg.command =='QUIT']), 1)
self.assertEqual(len([msg for msg in self.getMessages('guest1') if msg.command =='QUIT']), 0)
class TopicPrivileges(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testTopicPrivileges(self):
# test the +t channel mode, which prevents unprivileged users from changing the topic
self.connectClient('bar', name='bar')
self.joinChannel('bar', '#chan')
self.getMessages('bar')
self.sendLine('bar', 'MODE #chan +t')
replies = {msg.command for msg in self.getMessages('bar')}
# success response is undefined, may be MODE or may be 324 RPL_CHANNELMODEIS,
# depending on whether this was a no-op
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
self.sendLine('bar', 'TOPIC #chan :new topic')
replies = {msg.command for msg in self.getMessages('bar')}
self.assertIn('TOPIC', replies)
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
self.connectClient('qux', name='qux')
self.joinChannel('qux', '#chan')
self.getMessages('qux')
self.sendLine('qux', 'TOPIC #chan :new topic')
replies = {msg.command for msg in self.getMessages('qux')}
self.assertIn(ERR_CHANOPRIVSNEEDED, replies)
self.assertNotIn('TOPIC', replies)
self.sendLine('bar', 'MODE #chan +v qux')
replies = {msg.command for msg in self.getMessages('bar')}
self.assertIn('MODE', replies)
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
# regression test: +v cannot change the topic of a +t channel
self.sendLine('qux', 'TOPIC #chan :new topic')
replies = {msg.command for msg in self.getMessages('qux')}
self.assertIn(ERR_CHANOPRIVSNEEDED, replies)
self.assertNotIn('TOPIC', replies)
# test that RPL_TOPIC and RPL_TOPICTIME are sent on join
self.connectClient('buzz', name='buzz')
self.sendLine('buzz', 'JOIN #chan')
replies = self.getMessages('buzz')
rpl_topic = [msg for msg in replies if msg.command == RPL_TOPIC][0]
self.assertMessageEqual(rpl_topic, command=RPL_TOPIC, params=['buzz', '#chan', 'new topic'])
self.assertEqual(len([msg for msg in replies if msg.command == RPL_TOPICTIME]), 1)
class ModeratedMode(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('RFC2812')
def testModeratedMode(self):
# test the +m channel mode
self.connectClient('chanop', name='chanop')
self.joinChannel('chanop', '#chan')
self.getMessages('chanop')
self.sendLine('chanop', 'MODE #chan +m')
replies = self.getMessages('chanop')
modeLines = [line for line in replies if line.command == 'MODE']
self.assertMessageEqual(modeLines[0], command='MODE', params=['#chan', '+m'])
self.connectClient('baz', name='baz')
self.joinChannel('baz', '#chan')
self.getMessages('chanop')
# this message should be suppressed completely by +m
self.sendLine('baz', 'PRIVMSG #chan :hi from baz')
replies = self.getMessages('baz')
reply_cmds = {reply.command for reply in replies}
self.assertIn(ERR_CANNOTSENDTOCHAN, reply_cmds)
self.assertEqual(self.getMessages('chanop'), [])
# grant +v, user should be able to send messages
self.sendLine('chanop', 'MODE #chan +v baz')
self.getMessages('chanop')
self.getMessages('baz')
self.sendLine('baz', 'PRIVMSG #chan :hi again from baz')
self.getMessages('baz')
relays = self.getMessages('chanop')
relay = relays[0]
self.assertMessageEqual(relay, command='PRIVMSG', params=['#chan', 'hi again from baz'])
class OpModerated(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testOpModerated(self):
# test the +U channel mode
self.connectClient('chanop', name='chanop', capabilities=MODERN_CAPS)
self.joinChannel('chanop', '#chan')
self.getMessages('chanop')
self.sendLine('chanop', 'MODE #chan +U')
replies = {msg.command for msg in self.getMessages('chanop')}
self.assertIn('MODE', replies)
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
self.connectClient('baz', name='baz', capabilities=MODERN_CAPS)
self.joinChannel('baz', '#chan')
self.sendLine('baz', 'PRIVMSG #chan :hi from baz')
echo = self.getMessages('baz')[0]
self.assertMessageEqual(echo, command='PRIVMSG', params=['#chan', 'hi from baz'])
self.assertEqual([msg for msg in self.getMessages('chanop') if msg.command == 'PRIVMSG'], [echo])
self.connectClient('qux', name='qux', capabilities=MODERN_CAPS)
self.joinChannel('qux', '#chan')
self.sendLine('qux', 'PRIVMSG #chan :hi from qux')
echo = self.getMessages('qux')[0]
self.assertMessageEqual(echo, command='PRIVMSG', params=['#chan', 'hi from qux'])
# message is relayed to chanop but not to unprivileged
self.assertEqual([msg for msg in self.getMessages('chanop') if msg.command == 'PRIVMSG'], [echo])
self.assertEqual([msg for msg in self.getMessages('baz') if msg.command == 'PRIVMSG'], [])
self.sendLine('chanop', 'MODE #chan +v qux')
self.getMessages('chanop')
self.sendLine('qux', 'PRIVMSG #chan :hi again from qux')
echo = [msg for msg in self.getMessages('qux') if msg.command == 'PRIVMSG'][0]
self.assertMessageEqual(echo, command='PRIVMSG', params=['#chan', 'hi again from qux'])
self.assertEqual([msg for msg in self.getMessages('chanop') if msg.command == 'PRIVMSG'], [echo])
self.assertEqual([msg for msg in self.getMessages('baz') if msg.command == 'PRIVMSG'], [echo])
class MuteExtban(cases.BaseServerTestCase):
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testISupport(self):
isupport = self.getISupport()
token = isupport['EXTBAN']
prefix, comma, types = token.partition(',')
self.assertEqual(prefix, '')
self.assertEqual(comma, ',')
self.assertIn('m', types)
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testMuteExtban(self):
clients = ('chanop', 'bar', 'qux')
self.connectClient('chanop', name='chanop', capabilities=MODERN_CAPS)
self.joinChannel('chanop', '#chan')
self.getMessages('chanop')
self.sendLine('chanop', 'MODE #chan +b m:bar!*@*')
self.sendLine('chanop', 'MODE #chan +b m:qux!*@*')
replies = {msg.command for msg in self.getMessages('chanop')}
self.assertIn('MODE', replies)
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
self.connectClient('bar', name='bar', capabilities=MODERN_CAPS)
self.joinChannel('bar', '#chan')
self.connectClient('qux', name='qux', capabilities=MODERN_CAPS, ident='evan')
self.joinChannel('qux', '#chan')
for client in clients:
self.getMessages(client)
self.sendLine('bar', 'PRIVMSG #chan :hi from bar')
replies = self.getMessages('bar')
replies_cmds = {msg.command for msg in replies}
self.assertNotIn('PRIVMSG', replies_cmds)
self.assertIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
self.assertEqual(self.getMessages('chanop'), [])
self.sendLine('qux', 'PRIVMSG #chan :hi from qux')
replies = self.getMessages('qux')
replies_cmds = {msg.command for msg in replies}
self.assertNotIn('PRIVMSG', replies_cmds)
self.assertIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
self.assertEqual(self.getMessages('chanop'), [])
# remove mute with -b
self.sendLine('chanop', 'MODE #chan -b m:bar!*@*')
self.getMessages('chanop')
self.sendLine('bar', 'PRIVMSG #chan :hi again from bar')
replies = self.getMessages('bar')
replies_cmds = {msg.command for msg in replies}
self.assertIn('PRIVMSG', replies_cmds)
self.assertNotIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
self.assertEqual(self.getMessages('chanop'), [msg for msg in replies if msg.command == 'PRIVMSG'])
for client in clients:
self.getMessages(client)
# +v grants an exemption to +b
self.sendLine('chanop', 'MODE #chan +v qux')
self.getMessages('chanop')
self.sendLine('qux', 'PRIVMSG #chan :hi again from qux')
replies = self.getMessages('qux')
replies_cmds = {msg.command for msg in replies}
self.assertIn('PRIVMSG', replies_cmds)
self.assertNotIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
self.assertEqual(self.getMessages('chanop'), [msg for msg in replies if msg.command == 'PRIVMSG'])
self.sendLine('qux', 'PART #chan')
self.sendLine('qux', 'JOIN #chan')
self.getMessages('qux')
self.sendLine('chanop', 'MODE #chan +e m:*!~evan@*')
self.getMessages('chanop')
# +e grants an exemption to +b
self.sendLine('qux', 'PRIVMSG #chan :thanks for mute-excepting me')
replies = self.getMessages('qux')
replies_cmds = {msg.command for msg in replies}
self.assertIn('PRIVMSG', replies_cmds)
self.assertNotIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
self.assertEqual(self.getMessages('chanop'), [msg for msg in replies if msg.command == 'PRIVMSG'])
@cases.SpecificationSelector.requiredBySpecification('Oragono')
def testIssue1370(self):
# regression test for oragono #1370: mutes not correctly enforced against
# users with capital letters in their NUH
clients = ('chanop', 'bar')
self.connectClient('chanop', name='chanop', capabilities=MODERN_CAPS)
self.joinChannel('chanop', '#chan')
self.getMessages('chanop')
self.sendLine('chanop', 'MODE #chan +b m:BAR!*@*')
replies = {msg.command for msg in self.getMessages('chanop')}
self.assertIn('MODE', replies)
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
self.connectClient('Bar', name='bar', capabilities=MODERN_CAPS)
self.joinChannel('bar', '#chan')
for client in clients:
self.getMessages(client)
self.sendLine('bar', 'PRIVMSG #chan :hi from bar')
replies = self.getMessages('bar')
replies_cmds = {msg.command for msg in replies}
self.assertNotIn('PRIVMSG', replies_cmds)
self.assertIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
self.assertEqual(self.getMessages('chanop'), [])
# remove mute with -b
self.sendLine('chanop', 'MODE #chan -b m:bar!*@*')
self.getMessages('chanop')
self.sendLine('bar', 'PRIVMSG #chan :hi again from bar')
replies = self.getMessages('bar')
replies_cmds = {msg.command for msg in replies}
self.assertIn('PRIVMSG', replies_cmds)
self.assertNotIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
self.assertEqual(self.getMessages('chanop'), [msg for msg in replies if msg.command == 'PRIVMSG'])