mirror of
https://github.com/progval/irctest.git
synced 2025-04-05 06:49:47 +00:00
213 lines
8.5 KiB
Python
213 lines
8.5 KiB
Python
"""
|
|
The PRIVMSG and NOTICE commands.
|
|
"""
|
|
|
|
from irctest import cases
|
|
from irctest.numerics import ERR_INPUTTOOLONG
|
|
from irctest.patma import ANYSTR
|
|
|
|
|
|
class PrivmsgTestCase(cases.BaseServerTestCase):
|
|
@cases.mark_specifications("RFC1459", "RFC2812")
|
|
def testPrivmsg(self):
|
|
"""<https://tools.ietf.org/html/rfc2812#section-3.3.1>"""
|
|
self.connectClient("foo")
|
|
self.sendLine(1, "JOIN #chan")
|
|
self.getMessages(1) # synchronize
|
|
self.connectClient("bar")
|
|
self.sendLine(2, "JOIN #chan")
|
|
self.getMessages(2) # synchronize
|
|
self.sendLine(1, "PRIVMSG #chan :hello there")
|
|
self.getMessages(1) # synchronize
|
|
pms = [msg for msg in self.getMessages(2) if msg.command == "PRIVMSG"]
|
|
self.assertEqual(len(pms), 1)
|
|
self.assertMessageMatch(
|
|
pms[0], command="PRIVMSG", params=["#chan", "hello there"]
|
|
)
|
|
|
|
@cases.mark_specifications("RFC1459", "RFC2812")
|
|
def testPrivmsgNonexistentChannel(self):
|
|
"""<https://tools.ietf.org/html/rfc2812#section-3.3.1>"""
|
|
self.connectClient("foo")
|
|
self.sendLine(1, "PRIVMSG #nonexistent :hello there")
|
|
msg = self.getMessage(1)
|
|
# ERR_NOSUCHNICK, ERR_NOSUCHCHANNEL, or ERR_CANNOTSENDTOCHAN
|
|
self.assertIn(msg.command, ("401", "403", "404"))
|
|
|
|
@cases.mark_specifications("RFC1459", "RFC2812")
|
|
def testPrivmsgToUser(self):
|
|
"""<https://tools.ietf.org/html/rfc2812#section-3.3.1>"""
|
|
self.connectClient("foo")
|
|
self.connectClient("bar")
|
|
self.sendLine(1, "PRIVMSG bar :hey there!")
|
|
self.getMessages(1)
|
|
pms = [msg for msg in self.getMessages(2) if msg.command == "PRIVMSG"]
|
|
self.assertEqual(len(pms), 1)
|
|
self.assertMessageMatch(pms[0], command="PRIVMSG", params=["bar", "hey there!"])
|
|
|
|
@cases.mark_specifications("RFC1459", "RFC2812")
|
|
def testPrivmsgNonexistentUser(self):
|
|
"""<https://tools.ietf.org/html/rfc2812#section-3.3.1>"""
|
|
self.connectClient("foo")
|
|
self.sendLine(1, "PRIVMSG bar :hey there!")
|
|
msg = self.getMessage(1)
|
|
# ERR_NOSUCHNICK: 401 <sender> <recipient> :No such nick
|
|
self.assertMessageMatch(msg, command="401", params=["foo", "bar", ANYSTR])
|
|
|
|
@cases.mark_specifications("RFC1459", "RFC2812", "Modern")
|
|
@cases.xfailIfSoftware(
|
|
["irc2"],
|
|
"replies with ERR_NEEDMOREPARAMS instead of ERR_NOTEXTTOSEND",
|
|
)
|
|
def testEmptyPrivmsg(self):
|
|
self.connectClient("foo")
|
|
self.sendLine(1, "JOIN #chan")
|
|
self.getMessages(1) # synchronize
|
|
self.connectClient("bar")
|
|
self.sendLine(2, "JOIN #chan")
|
|
self.getMessages(2) # synchronize
|
|
self.getMessages(1) # synchronize
|
|
self.sendLine(1, "PRIVMSG #chan :")
|
|
|
|
self.assertMessageMatch(
|
|
self.getMessage(1),
|
|
command="412", # ERR_NOTEXTTOSEND
|
|
params=["foo", ANYSTR],
|
|
)
|
|
self.assertEqual(self.getMessages(2), [])
|
|
|
|
|
|
class NoticeTestCase(cases.BaseServerTestCase):
|
|
@cases.mark_specifications("RFC1459", "RFC2812")
|
|
def testNotice(self):
|
|
"""<https://tools.ietf.org/html/rfc2812#section-3.3.2>"""
|
|
self.connectClient("foo")
|
|
self.sendLine(1, "JOIN #chan")
|
|
self.connectClient("bar")
|
|
self.sendLine(2, "JOIN #chan")
|
|
self.getMessages(2) # synchronize
|
|
self.sendLine(1, "NOTICE #chan :hello there")
|
|
self.getMessages(1) # synchronize
|
|
notices = [msg for msg in self.getMessages(2) if msg.command == "NOTICE"]
|
|
self.assertEqual(len(notices), 1)
|
|
self.assertMessageMatch(
|
|
notices[0], command="NOTICE", params=["#chan", "hello there"]
|
|
)
|
|
|
|
@cases.mark_specifications("RFC1459", "RFC2812")
|
|
@cases.xfailIfSoftware(
|
|
["InspIRCd"],
|
|
"replies with ERR_NOSUCHCHANNEL to NOTICE to non-existent channels",
|
|
)
|
|
@cases.xfailIfSoftware(
|
|
["UnrealIRCd"],
|
|
"replies with ERR_NOSUCHCHANNEL to NOTICE to non-existent channels: "
|
|
"https://bugs.unrealircd.org/view.php?id=5949",
|
|
)
|
|
def testNoticeNonexistentChannel(self):
|
|
"""
|
|
"automatic replies must never be
|
|
sent in response to a NOTICE message. This rule applies to servers
|
|
too - they must not send any error reply back to the client on
|
|
receipt of a notice"
|
|
<https://tools.ietf.org/html/rfc1459#section-4.4.2>
|
|
|
|
'automatic replies MUST NEVER be sent in response to a NOTICE message.
|
|
This rule applies to servers too - they MUST NOT send any error repl
|
|
back to the client on receipt of a notice."
|
|
<https://tools.ietf.org/html/rfc2812#section-3.3.2>
|
|
"""
|
|
self.connectClient("foo")
|
|
self.sendLine(1, "NOTICE #nonexistent :hello there")
|
|
self.assertEqual(self.getMessages(1), [])
|
|
|
|
|
|
class TagsTestCase(cases.BaseServerTestCase):
|
|
@cases.mark_capabilities("message-tags")
|
|
@cases.xfailIf(
|
|
lambda self: bool(
|
|
self.controller.software_name == "UnrealIRCd"
|
|
and self.controller.software_version == 5
|
|
),
|
|
"UnrealIRCd <6.0.7 dropped messages with excessively large tags: "
|
|
"https://bugs.unrealircd.org/view.php?id=5947",
|
|
)
|
|
def testLineTooLong(self):
|
|
self.connectClient("bar", capabilities=["message-tags"], skip_if_cap_nak=True)
|
|
self.connectClient(
|
|
"recver", capabilities=["message-tags"], skip_if_cap_nak=True
|
|
)
|
|
self.joinChannel(1, "#xyz")
|
|
monsterMessage = "@+clientOnlyTagExample=" + "a" * 4096 + " PRIVMSG #xyz hi!"
|
|
self.sendLine(1, monsterMessage)
|
|
replies = self.getMessages(1)
|
|
self.assertIn(ERR_INPUTTOOLONG, set(reply.command for reply in replies))
|
|
self.assertEqual(self.getMessages(2), [], "overflowing message was relayed")
|
|
|
|
|
|
class LengthLimitTestCase(cases.BaseServerTestCase):
|
|
@cases.mark_specifications("Ergo")
|
|
def testLineAtLimit(self):
|
|
self.connectClient("bar", name="bar")
|
|
self.getMessages("bar")
|
|
line = "PING " + ("x" * (512 - 7))
|
|
# this line is exactly as the limit, after including \r\n:
|
|
self.assertEqual(len(line), 510)
|
|
# oragono should accept and process this message. the outgoing PONG
|
|
# will be truncated due to the addition of the server name as source
|
|
# and initial parameter; this is fine:
|
|
self.sendLine("bar", line)
|
|
result = self.getMessage("bar", synchronize=False)
|
|
self.assertMessageMatch(result, command="PONG")
|
|
self.assertIn("x" * 450, result.params[-1])
|
|
|
|
@cases.mark_specifications("Ergo")
|
|
def testLineBeyondLimit(self):
|
|
self.connectClient("bar", name="bar")
|
|
self.getMessages("bar")
|
|
line = "PING " + ("x" * (512 - 6))
|
|
# this line is one over the limit after including \r\n:
|
|
self.assertEqual(len(line), 511)
|
|
# oragono should reject this message for exceeding the length limit:
|
|
self.sendLine("bar", line)
|
|
result = self.getMessage("bar", synchronize=False)
|
|
self.assertMessageMatch(result, command=ERR_INPUTTOOLONG)
|
|
# we should not be disconnected and should be able to join a channel
|
|
self.joinChannel("bar", "#test_channel")
|
|
|
|
|
|
class NoCTCPModeTestCase(cases.BaseServerTestCase):
|
|
@cases.mark_specifications("Ergo")
|
|
def testNoCTCPMode(self):
|
|
self.connectClient("bar", "bar")
|
|
self.connectClient("qux", "qux")
|
|
# CTCP is not blocked by default:
|
|
self.sendLine("qux", "PRIVMSG bar :\x01VERSION\x01")
|
|
self.getMessages("qux")
|
|
relay = [msg for msg in self.getMessages("bar") if msg.command == "PRIVMSG"][0]
|
|
self.assertMessageMatch(
|
|
relay, command="PRIVMSG", params=["bar", "\x01VERSION\x01"]
|
|
)
|
|
|
|
# set the no-CTCP user mode on bar:
|
|
self.sendLine("bar", "MODE bar +T")
|
|
replies = self.getMessages("bar")
|
|
umode_line = [msg for msg in replies if msg.command == "MODE"][0]
|
|
self.assertMessageMatch(umode_line, command="MODE", params=["bar", "+T"])
|
|
|
|
# CTCP is now blocked:
|
|
self.sendLine("qux", "PRIVMSG bar :\x01VERSION\x01")
|
|
self.getMessages("qux")
|
|
self.assertEqual(self.getMessages("bar"), [])
|
|
|
|
# normal PRIVMSG go through:
|
|
self.sendLine("qux", "PRIVMSG bar :please just tell me your client version")
|
|
self.getMessages("qux")
|
|
relay = self.getMessages("bar")[0]
|
|
self.assertMessageMatch(
|
|
relay,
|
|
command="PRIVMSG",
|
|
nick="qux",
|
|
params=["bar", "please just tell me your client version"],
|
|
)
|