mirror of https://github.com/progval/irctest.git
224 lines
8.7 KiB
Python
224 lines
8.7 KiB
Python
"""
|
|
The TOPIC command (`RFC 1459
|
|
<https://datatracker.ietf.org/doc/html/rfc1459#section-4.2.1>`__,
|
|
`RFC 2812 <https://datatracker.ietf.org/doc/html/rfc2812#section-3.2.1>`__,
|
|
`Modern <https://modern.ircdocs.horse/#topic-message>`__)
|
|
"""
|
|
|
|
from irctest import cases, client_mock, runner
|
|
from irctest.numerics import ERR_CHANOPRIVSNEEDED, RPL_NOTOPIC, RPL_TOPIC, RPL_TOPICTIME
|
|
|
|
|
|
class TopicTestCase(cases.BaseServerTestCase):
|
|
@cases.mark_specifications("RFC1459", "RFC2812")
|
|
def testTopicRfc(self):
|
|
"""“Once a user has joined a channel, he receives information about
|
|
all commands his server receives affecting the channel. This
|
|
includes […] TOPIC”
|
|
-- <https://tools.ietf.org/html/rfc1459#section-4.2.1>
|
|
and <https://tools.ietf.org/html/rfc2812#section-3.2.1>
|
|
"""
|
|
self._testTopic(assert_echo=False)
|
|
|
|
@cases.mark_specifications("Modern")
|
|
def testTopicModern(self):
|
|
""" "If the topic of a channel is changed or cleared, every client in that
|
|
channel (including the author of the topic change) will receive a TOPIC command
|
|
with the new topic as argument (or an empty argument if the topic was cleared)
|
|
alerting them to how the topic has changed.
|
|
|
|
Clients joining the channel in the future will receive a RPL_TOPIC numeric (or
|
|
lack thereof) accordingly."
|
|
-- https://modern.ircdocs.horse/#topic-message
|
|
"""
|
|
self._testTopic(assert_echo=True)
|
|
|
|
def _testTopic(self, assert_echo: bool):
|
|
self.connectClient("foo")
|
|
self.joinChannel(1, "#chan")
|
|
|
|
self.connectClient("bar")
|
|
self.joinChannel(2, "#chan")
|
|
|
|
# clear waiting msgs about cli 2 joining the channel
|
|
self.getMessages(1)
|
|
self.getMessages(2)
|
|
|
|
# TODO: check foo is opped OR +t is unset
|
|
|
|
self.sendLine(1, "TOPIC #chan :T0P1C")
|
|
try:
|
|
m = self.getMessage(1)
|
|
if m.command == "482":
|
|
raise runner.ImplementationChoice(
|
|
"Channel creators are not opped by default, and "
|
|
"channel modes to no allow regular users to change "
|
|
"topic."
|
|
)
|
|
self.assertMessageMatch(m, command="TOPIC")
|
|
except client_mock.NoMessageException:
|
|
self.assertFalse(assert_echo, "TOPIC was not echoed back to the author")
|
|
# The RFCs do not say TOPIC must be echoed
|
|
pass
|
|
m = self.getMessage(2)
|
|
self.assertMessageMatch(m, command="TOPIC", params=["#chan", "T0P1C"])
|
|
|
|
@cases.mark_specifications("RFC1459", "RFC2812")
|
|
def testTopicMode(self):
|
|
"""“Once a user has joined a channel, he receives information about
|
|
all commands his server receives affecting the channel. This
|
|
includes […] TOPIC”
|
|
-- <https://tools.ietf.org/html/rfc1459#section-4.2.1>
|
|
and <https://tools.ietf.org/html/rfc2812#section-3.2.1>
|
|
"""
|
|
self.connectClient("foo")
|
|
self.joinChannel(1, "#chan")
|
|
|
|
self.connectClient("bar")
|
|
self.joinChannel(2, "#chan")
|
|
|
|
self.getMessages(1)
|
|
self.getMessages(2)
|
|
|
|
# TODO: check foo is opped
|
|
|
|
self.sendLine(1, "MODE #chan +t")
|
|
self.getMessages(1)
|
|
|
|
self.getMessages(2)
|
|
self.sendLine(2, "TOPIC #chan :T0P1C")
|
|
m = self.getMessage(2)
|
|
self.assertMessageMatch(
|
|
m, command="482", fail_msg="Non-op user was not refused use of TOPIC: {msg}"
|
|
)
|
|
self.assertEqual(self.getMessages(1), [])
|
|
|
|
self.sendLine(1, "MODE #chan -t")
|
|
self.getMessages(1)
|
|
self.sendLine(2, "TOPIC #chan :T0P1C")
|
|
try:
|
|
m = self.getMessage(2)
|
|
self.assertNotEqual(
|
|
m.command,
|
|
"482",
|
|
msg="User was refused TOPIC whereas +t was not " "set: {}".format(m),
|
|
)
|
|
except client_mock.NoMessageException:
|
|
# The RFCs do not say TOPIC must be echoed
|
|
pass
|
|
m = self.getMessage(1)
|
|
self.assertMessageMatch(m, command="TOPIC", params=["#chan", "T0P1C"])
|
|
|
|
@cases.mark_specifications("RFC2812")
|
|
def testTopicNonexistentChannel(self):
|
|
"""RFC2812 specifies ERR_NOTONCHANNEL as the correct response to TOPIC
|
|
on a nonexistent channel. The modern spec prefers ERR_NOSUCHCHANNEL.
|
|
|
|
<https://tools.ietf.org/html/rfc2812#section-3.2.4>
|
|
<http://modern.ircdocs.horse/#topic-message>
|
|
"""
|
|
self.connectClient("foo")
|
|
self.sendLine(1, "TOPIC #chan")
|
|
m = self.getMessage(1)
|
|
# either 403 ERR_NOSUCHCHANNEL or 443 ERR_NOTONCHANNEL
|
|
self.assertIn(m.command, ("403", "443"))
|
|
|
|
@cases.mark_specifications("RFC2812")
|
|
def testUnsetTopicResponses(self):
|
|
"""Test various cases related to RPL_NOTOPIC with set and unset topics."""
|
|
self.connectClient("bar")
|
|
self.sendLine(1, "JOIN #test")
|
|
messages = self.getMessages(1)
|
|
# shouldn't send RPL_NOTOPIC for a new channel
|
|
self.assertNotIn(RPL_NOTOPIC, [m.command for m in messages])
|
|
|
|
self.connectClient("baz")
|
|
self.sendLine(2, "JOIN #test")
|
|
messages = self.getMessages(2)
|
|
# topic is still unset, shouldn't send RPL_NOTOPIC on initial join
|
|
self.assertNotIn(RPL_NOTOPIC, [m.command for m in messages])
|
|
|
|
self.sendLine(2, "TOPIC #test")
|
|
messages = self.getMessages(2)
|
|
# explicit TOPIC should receive RPL_NOTOPIC
|
|
self.assertIn(RPL_NOTOPIC, [m.command for m in messages])
|
|
|
|
self.getMessages(1)
|
|
|
|
self.sendLine(1, "TOPIC #test :new topic")
|
|
# client 1 should get the new TOPIC line echoed
|
|
self.assertMessageMatch(
|
|
self.getMessage(1), command="TOPIC", params=["#test", "new topic"]
|
|
)
|
|
# client 2 should get the new TOPIC line too
|
|
self.assertMessageMatch(
|
|
self.getMessage(2), command="TOPIC", params=["#test", "new topic"]
|
|
)
|
|
|
|
# unset the topic:
|
|
self.sendLine(1, "TOPIC #test :")
|
|
# client 1 should get the new TOPIC line echoed, which has the empty arg
|
|
self.assertMessageMatch(
|
|
self.getMessage(1), command="TOPIC", params=["#test", ""]
|
|
)
|
|
# client 2 should get the new TOPIC line to
|
|
self.assertMessageMatch(
|
|
self.getMessage(2), command="TOPIC", params=["#test", ""]
|
|
)
|
|
|
|
self.connectClient("qux")
|
|
self.sendLine(3, "join #test")
|
|
messages = self.getMessages(3)
|
|
# topic is once again unset, shouldn't send RPL_NOTOPIC on initial join
|
|
self.assertNotIn(RPL_NOTOPIC, [m.command for m in messages])
|
|
|
|
|
|
class TopicPrivilegesTestCase(cases.BaseServerTestCase):
|
|
@cases.mark_specifications("RFC2812")
|
|
def testTopicPrivileges(self):
|
|
# test the +t channel mode, which prevents unprivileged users
|
|
# from changing the topic
|
|
self.connectClient("bar", name="bar")
|
|
self.joinChannel("bar", "#chan")
|
|
self.getMessages("bar")
|
|
self.sendLine("bar", "MODE #chan +t")
|
|
replies = {msg.command for msg in self.getMessages("bar")}
|
|
# success response is undefined, may be MODE or may be 324 RPL_CHANNELMODEIS,
|
|
# depending on whether this was a no-op
|
|
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
|
self.sendLine("bar", "TOPIC #chan :new topic")
|
|
replies = {msg.command for msg in self.getMessages("bar")}
|
|
self.assertIn("TOPIC", replies)
|
|
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
|
|
|
self.connectClient("qux", name="qux")
|
|
self.joinChannel("qux", "#chan")
|
|
self.getMessages("qux")
|
|
self.sendLine("qux", "TOPIC #chan :new topic")
|
|
replies = {msg.command for msg in self.getMessages("qux")}
|
|
self.assertIn(ERR_CHANOPRIVSNEEDED, replies)
|
|
self.assertNotIn("TOPIC", replies)
|
|
|
|
self.sendLine("bar", "MODE #chan +v qux")
|
|
replies = {msg.command for msg in self.getMessages("bar")}
|
|
self.assertIn("MODE", replies)
|
|
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
|
|
|
# regression test: +v cannot change the topic of a +t channel
|
|
self.sendLine("qux", "TOPIC #chan :new topic")
|
|
replies = {msg.command for msg in self.getMessages("qux")}
|
|
self.assertIn(ERR_CHANOPRIVSNEEDED, replies)
|
|
self.assertNotIn("TOPIC", replies)
|
|
|
|
# test that RPL_TOPIC and RPL_TOPICTIME are sent on join
|
|
self.connectClient("buzz", name="buzz")
|
|
self.sendLine("buzz", "JOIN #chan")
|
|
replies = self.getMessages("buzz")
|
|
rpl_topic = [msg for msg in replies if msg.command == RPL_TOPIC][0]
|
|
self.assertMessageMatch(
|
|
rpl_topic, command=RPL_TOPIC, params=["buzz", "#chan", "new topic"]
|
|
)
|
|
self.assertEqual(
|
|
len([msg for msg in replies if msg.command == RPL_TOPICTIME]), 1
|
|
)
|