diff --git a/irctest/server_tests/chmodes/__init__.py b/irctest/server_tests/chmodes/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/irctest/server_tests/chmodes/auditorium.py b/irctest/server_tests/chmodes/auditorium.py new file mode 100644 index 0000000..8cd74a3 --- /dev/null +++ b/irctest/server_tests/chmodes/auditorium.py @@ -0,0 +1,127 @@ +import math +import time + +from irctest import cases +from irctest.irc_utils.junkdrawer import ircv3_timestamp_to_unixtime +from irctest.numerics import RPL_NAMREPLY + +MODERN_CAPS = [ + "server-time", + "message-tags", + "batch", + "labeled-response", + "echo-message", + "account-tag", +] + + +class AuditoriumTestCase(cases.BaseServerTestCase): + @cases.mark_specifications("Ergo") + def testAuditorium(self): + self.connectClient("bar", name="bar", capabilities=MODERN_CAPS) + self.joinChannel("bar", "#auditorium") + self.getMessages("bar") + self.sendLine("bar", "MODE #auditorium +u") + modelines = [msg for msg in self.getMessages("bar") if msg.command == "MODE"] + self.assertEqual(len(modelines), 1) + self.assertMessageMatch(modelines[0], params=["#auditorium", "+u"]) + + self.connectClient("guest1", name="guest1", capabilities=MODERN_CAPS) + self.joinChannel("guest1", "#auditorium") + self.getMessages("guest1") + # chanop should get a JOIN message + join_msgs = [msg for msg in self.getMessages("bar") if msg.command == "JOIN"] + self.assertEqual(len(join_msgs), 1) + self.assertMessageMatch(join_msgs[0], nick="guest1", params=["#auditorium"]) + + self.connectClient("guest2", name="guest2", capabilities=MODERN_CAPS) + self.joinChannel("guest2", "#auditorium") + self.getMessages("guest2") + # chanop should get a JOIN message + join_msgs = [msg for msg in self.getMessages("bar") if msg.command == "JOIN"] + self.assertEqual(len(join_msgs), 1) + join_msg = join_msgs[0] + self.assertMessageMatch(join_msg, nick="guest2", params=["#auditorium"]) + # oragono/oragono#1642 ; msgid should be populated, + # and the time tag should be sane + self.assertTrue(join_msg.tags.get("msgid")) + self.assertLessEqual( + math.fabs(time.time() - ircv3_timestamp_to_unixtime(join_msg.tags["time"])), + 60.0, + ) + # fellow unvoiced participant should not + unvoiced_join_msgs = [ + msg for msg in self.getMessages("guest1") if msg.command == "JOIN" + ] + self.assertEqual(len(unvoiced_join_msgs), 0) + + self.connectClient("guest3", name="guest3", capabilities=MODERN_CAPS) + self.joinChannel("guest3", "#auditorium") + self.getMessages("guest3") + + self.sendLine("bar", "PRIVMSG #auditorium hi") + echo_message = [ + msg for msg in self.getMessages("bar") if msg.command == "PRIVMSG" + ][0] + self.assertEqual(echo_message, self.getMessages("guest1")[0]) + self.assertEqual(echo_message, self.getMessages("guest2")[0]) + self.assertEqual(echo_message, self.getMessages("guest3")[0]) + + # unvoiced users can speak + self.sendLine("guest1", "PRIVMSG #auditorium :hi you") + echo_message = [ + msg for msg in self.getMessages("guest1") if msg.command == "PRIVMSG" + ][0] + self.assertEqual(self.getMessages("bar"), [echo_message]) + self.assertEqual(self.getMessages("guest2"), [echo_message]) + self.assertEqual(self.getMessages("guest3"), [echo_message]) + + def names(client): + self.sendLine(client, "NAMES #auditorium") + result = set() + for msg in self.getMessages(client): + if msg.command == RPL_NAMREPLY: + result.update(msg.params[-1].split()) + return result + + self.assertEqual(names("bar"), {"@bar", "guest1", "guest2", "guest3"}) + self.assertEqual(names("guest1"), {"@bar"}) + self.assertEqual(names("guest2"), {"@bar"}) + self.assertEqual(names("guest3"), {"@bar"}) + + self.sendLine("bar", "MODE #auditorium +v guest1") + modeLine = [msg for msg in self.getMessages("bar") if msg.command == "MODE"][0] + self.assertEqual(self.getMessages("guest1"), [modeLine]) + self.assertEqual(self.getMessages("guest2"), [modeLine]) + self.assertEqual(self.getMessages("guest3"), [modeLine]) + self.assertEqual(names("bar"), {"@bar", "+guest1", "guest2", "guest3"}) + self.assertEqual(names("guest2"), {"@bar", "+guest1"}) + self.assertEqual(names("guest3"), {"@bar", "+guest1"}) + + self.sendLine("guest1", "PART #auditorium") + part = [msg for msg in self.getMessages("guest1") if msg.command == "PART"][0] + # everyone should see voiced PART + self.assertEqual(self.getMessages("bar")[0], part) + self.assertEqual(self.getMessages("guest2")[0], part) + self.assertEqual(self.getMessages("guest3")[0], part) + + self.joinChannel("guest1", "#auditorium") + self.getMessages("guest1") + self.getMessages("bar") + + self.sendLine("guest2", "PART #auditorium") + part = [msg for msg in self.getMessages("guest2") if msg.command == "PART"][0] + self.assertEqual(self.getMessages("bar"), [part]) + # part should be hidden from unvoiced participants + self.assertEqual(self.getMessages("guest1"), []) + self.assertEqual(self.getMessages("guest3"), []) + + self.sendLine("guest3", "QUIT") + self.assertDisconnected("guest3") + # quit should be hidden from unvoiced participants + self.assertEqual( + len([msg for msg in self.getMessages("bar") if msg.command == "QUIT"]), 1 + ) + self.assertEqual( + len([msg for msg in self.getMessages("guest1") if msg.command == "QUIT"]), 0 + ) diff --git a/irctest/server_tests/chmodes/ban.py b/irctest/server_tests/chmodes/ban.py new file mode 100644 index 0000000..1d097aa --- /dev/null +++ b/irctest/server_tests/chmodes/ban.py @@ -0,0 +1,47 @@ +from irctest import cases +from irctest.numerics import ERR_BANNEDFROMCHAN + + +class BanMode(cases.BaseServerTestCase): + @cases.mark_specifications("RFC1459", "RFC2812") + def testBan(self): + """Basic ban operation""" + self.connectClient("chanop", name="chanop") + self.joinChannel("chanop", "#chan") + self.getMessages("chanop") + self.sendLine("chanop", "MODE #chan +b bar!*@*") + self.assertMessageMatch(self.getMessage("chanop"), command="MODE") + + self.connectClient( + "Bar", name="bar", capabilities=["echo-message"], skip_if_cap_nak=True + ) + self.getMessages("bar") + self.sendLine("bar", "JOIN #chan") + self.assertMessageMatch(self.getMessage("bar"), command=ERR_BANNEDFROMCHAN) + + self.sendLine("chanop", "MODE #chan -b bar!*@*") + self.assertMessageMatch(self.getMessage("chanop"), command="MODE") + + self.sendLine("bar", "JOIN #chan") + self.assertMessageMatch(self.getMessage("bar"), command="JOIN") + + @cases.mark_specifications("Ergo") + def testCaseInsensitive(self): + """Some clients allow unsetting modes if their argument matches + up to normalization""" + self.connectClient("chanop", name="chanop") + self.joinChannel("chanop", "#chan") + self.getMessages("chanop") + self.sendLine("chanop", "MODE #chan +b BAR!*@*") + self.assertMessageMatch(self.getMessage("chanop"), command="MODE") + + self.connectClient("Bar", name="bar", capabilities=["echo-message"]) + self.getMessages("bar") + self.sendLine("bar", "JOIN #chan") + self.assertMessageMatch(self.getMessage("bar"), command=ERR_BANNEDFROMCHAN) + + self.sendLine("chanop", "MODE #chan -b bar!*@*") + self.assertMessageMatch(self.getMessage("chanop"), command="MODE") + + self.sendLine("bar", "JOIN #chan") + self.assertMessageMatch(self.getMessage("bar"), command="JOIN") diff --git a/irctest/server_tests/chmodes/ergo.py b/irctest/server_tests/chmodes/ergo.py new file mode 100644 index 0000000..cf22dc6 --- /dev/null +++ b/irctest/server_tests/chmodes/ergo.py @@ -0,0 +1,124 @@ +from irctest import cases +from irctest.numerics import ERR_CANNOTSENDTOCHAN, ERR_CHANOPRIVSNEEDED + +MODERN_CAPS = [ + "server-time", + "message-tags", + "batch", + "labeled-response", + "echo-message", + "account-tag", +] + + +@cases.mark_services +class RegisteredOnlySpeakMode(cases.BaseServerTestCase): + @cases.mark_specifications("Ergo") + def testRegisteredOnlySpeakMode(self): + self.controller.registerUser(self, "evan", "sesame") + + # test the +M (only registered users and ops can speak) channel mode + self.connectClient("chanop", name="chanop") + self.joinChannel("chanop", "#chan") + self.getMessages("chanop") + self.sendLine("chanop", "MODE #chan +M") + replies = self.getMessages("chanop") + modeLines = [line for line in replies if line.command == "MODE"] + self.assertMessageMatch(modeLines[0], command="MODE", params=["#chan", "+M"]) + + self.connectClient("baz", name="baz") + self.joinChannel("baz", "#chan") + self.getMessages("chanop") + # this message should be suppressed completely by +M + self.sendLine("baz", "PRIVMSG #chan :hi from baz") + replies = self.getMessages("baz") + reply_cmds = {reply.command for reply in replies} + self.assertIn(ERR_CANNOTSENDTOCHAN, reply_cmds) + self.assertEqual(self.getMessages("chanop"), []) + + # +v exempts users from the registration requirement: + self.sendLine("chanop", "MODE #chan +v baz") + self.getMessages("chanop") + self.getMessages("baz") + self.sendLine("baz", "PRIVMSG #chan :hi again from baz") + replies = self.getMessages("baz") + # baz should not receive an error (or an echo) + self.assertEqual(replies, []) + replies = self.getMessages("chanop") + self.assertMessageMatch( + replies[0], command="PRIVMSG", params=["#chan", "hi again from baz"] + ) + + self.connectClient( + "evan", + name="evan", + account="evan", + password="sesame", + capabilities=["sasl"], + ) + self.joinChannel("evan", "#chan") + self.getMessages("baz") + self.sendLine("evan", "PRIVMSG #chan :hi from evan") + replies = self.getMessages("evan") + # evan should not receive an error (or an echo) + self.assertEqual(replies, []) + replies = self.getMessages("baz") + self.assertMessageMatch( + replies[0], command="PRIVMSG", params=["#chan", "hi from evan"] + ) + + +class OpModerated(cases.BaseServerTestCase): + @cases.mark_specifications("Ergo") + def testOpModerated(self): + # test the +U channel mode + self.connectClient("chanop", name="chanop", capabilities=MODERN_CAPS) + self.joinChannel("chanop", "#chan") + self.getMessages("chanop") + self.sendLine("chanop", "MODE #chan +U") + replies = {msg.command for msg in self.getMessages("chanop")} + self.assertIn("MODE", replies) + self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies) + + self.connectClient("baz", name="baz", capabilities=MODERN_CAPS) + self.joinChannel("baz", "#chan") + self.sendLine("baz", "PRIVMSG #chan :hi from baz") + echo = self.getMessages("baz")[0] + self.assertMessageMatch( + echo, command="PRIVMSG", params=["#chan", "hi from baz"] + ) + self.assertEqual( + [msg for msg in self.getMessages("chanop") if msg.command == "PRIVMSG"], + [echo], + ) + + self.connectClient("qux", name="qux", capabilities=MODERN_CAPS) + self.joinChannel("qux", "#chan") + self.sendLine("qux", "PRIVMSG #chan :hi from qux") + echo = self.getMessages("qux")[0] + self.assertMessageMatch( + echo, command="PRIVMSG", params=["#chan", "hi from qux"] + ) + # message is relayed to chanop but not to unprivileged + self.assertEqual( + [msg for msg in self.getMessages("chanop") if msg.command == "PRIVMSG"], + [echo], + ) + self.assertEqual( + [msg for msg in self.getMessages("baz") if msg.command == "PRIVMSG"], [] + ) + + self.sendLine("chanop", "MODE #chan +v qux") + self.getMessages("chanop") + self.sendLine("qux", "PRIVMSG #chan :hi again from qux") + echo = [msg for msg in self.getMessages("qux") if msg.command == "PRIVMSG"][0] + self.assertMessageMatch( + echo, command="PRIVMSG", params=["#chan", "hi again from qux"] + ) + self.assertEqual( + [msg for msg in self.getMessages("chanop") if msg.command == "PRIVMSG"], + [echo], + ) + self.assertEqual( + [msg for msg in self.getMessages("baz") if msg.command == "PRIVMSG"], [echo] + ) diff --git a/irctest/server_tests/chmodes/key.py b/irctest/server_tests/chmodes/key.py new file mode 100644 index 0000000..eae1ab3 --- /dev/null +++ b/irctest/server_tests/chmodes/key.py @@ -0,0 +1,134 @@ +import pytest + +from irctest import cases +from irctest.numerics import ( + ERR_BADCHANNELKEY, + ERR_INVALIDKEY, + ERR_INVALIDMODEPARAM, + ERR_UNKNOWNERROR, +) +from irctest.patma import ANYSTR + + +class KeyTestCase(cases.BaseServerTestCase): + @cases.mark_specifications("RFC1459", "RFC2812") + def testKeyNormal(self): + self.connectClient("bar") + self.joinChannel(1, "#chan") + self.sendLine(1, "MODE #chan +k beer") + self.getMessages(1) + + self.connectClient("qux") + self.getMessages(2) + self.sendLine(2, "JOIN #chan") + reply = self.getMessages(2) + self.assertNotIn("JOIN", {msg.command for msg in reply}) + self.assertIn(ERR_BADCHANNELKEY, {msg.command for msg in reply}) + + self.sendLine(2, "JOIN #chan beer") + reply = self.getMessages(2) + self.assertMessageMatch(reply[0], command="JOIN", params=["#chan"]) + + @pytest.mark.parametrize( + "key", + ["passphrase with spaces", "long" * 100, ""], + ids=["spaces", "long", "empty"], + ) + @cases.mark_specifications("RFC2812", "Modern") + def testKeyValidation(self, key): + """ + key = 1*23( %x01-05 / %x07-08 / %x0C / %x0E-1F / %x21-7F ) + ; any 7-bit US_ASCII character, + ; except NUL, CR, LF, FF, h/v TABs, and " " + -- https://tools.ietf.org/html/rfc2812#page-8 + + "Servers may validate the value (eg. to forbid spaces, as they make it harder + to use the key in `JOIN` messages). If the value is invalid, they SHOULD + return [`ERR_INVALIDMODEPARAM`](#errinvalidmodeparam-696). + However, clients MUST be able to handle any of the following: + + * [`ERR_INVALIDMODEPARAM`](#errinvalidmodeparam-696) + * [`ERR_INVALIDKEY`](#errinvalidkey-525) + * `MODE` echoed with a different key (eg. truncated or stripped of invalid + characters) + * the key changed ignored, and no `MODE` echoed if no other mode change + was valid. + " + -- https://modern.ircdocs.horse/#key-channel-mode + -- https://github.com/ircdocs/modern-irc/pull/111 + """ + self.connectClient("bar") + self.joinChannel(1, "#chan") + self.sendLine(1, f"MODE #chan +k :{key}") + + # The spec requires no space; but doesn't say what to do + # if there is one. + # Let's check the various alternatives + + replies = self.getMessages(1) + self.assertNotIn( + ERR_UNKNOWNERROR, + {msg.command for msg in replies}, + fail_msg="Sending an invalid key caused an " + "ERR_UNKNOWNERROR instead of being handled explicitly " + "(eg. ERR_INVALIDMODEPARAM or truncation): {msg}", + ) + + commands = {msg.command for msg in replies} + if {ERR_INVALIDMODEPARAM, ERR_INVALIDKEY} & commands: + # First option: ERR_INVALIDMODEPARAM (eg. Ergo) or ERR_INVALIDKEY + # (eg. ircu2) + if ERR_INVALIDMODEPARAM in commands: + command = [ + msg for msg in replies if msg.command == ERR_INVALIDMODEPARAM + ] + self.assertEqual(len(command), 1, command) + self.assertMessageMatch( + command[0], + command=ERR_INVALIDMODEPARAM, + params=["bar", "#chan", "k", "*", ANYSTR], + ) + return + + if not replies: + # MODE was ignored entirely + self.connectClient("foo") + self.sendLine(2, "JOIN #chan") + self.assertMessageMatch( + self.getMessage(2), command="JOIN", params=["#chan"] + ) + return + + # Second and third options: truncating the key (eg. UnrealIRCd) + # or replacing spaces (eg. Charybdis) + mode_commands = [msg for msg in replies if msg.command == "MODE"] + self.assertGreaterEqual( + len(mode_commands), + 1, + fail_msg="Sending an invalid key (with a space) triggered " + "neither ERR_UNKNOWNERROR, ERR_INVALIDMODEPARAM, ERR_INVALIDKEY, " + " or a MODE. Only these: {}", + extra_format=(replies,), + ) + self.assertLessEqual( + len(mode_commands), + 1, + fail_msg="Sending an invalid key (with a space) triggered " + "multiple MODE responses: {}", + extra_format=(replies,), + ) + + mode_command = mode_commands[0] + if mode_command.params == ["#chan", "+k", "passphrase"]: + key = "passphrase" + elif mode_command.params == ["#chan", "+k", "passphrasewithspaces"]: + key = "passphrasewithspaces" + elif mode_command.params[2].startswith("longlonglong"): + key = mode_command.params[2] + assert mode_command.params == ["#chan", "+k", key] + elif mode_command.params == ["#chan", "+k", "passphrase with spaces"]: + raise self.failureException("Invalid key (with a space) was not rejected.") + + self.connectClient("foo") + self.sendLine(2, f"JOIN #chan {key}") + self.assertMessageMatch(self.getMessage(2), command="JOIN", params=["#chan"]) diff --git a/irctest/server_tests/chmodes/moderated.py b/irctest/server_tests/chmodes/moderated.py new file mode 100644 index 0000000..bd2e2f9 --- /dev/null +++ b/irctest/server_tests/chmodes/moderated.py @@ -0,0 +1,37 @@ +from irctest import cases +from irctest.numerics import ERR_CANNOTSENDTOCHAN + + +class ModeratedMode(cases.BaseServerTestCase): + @cases.mark_specifications("RFC2812") + def testModeratedMode(self): + # test the +m channel mode + self.connectClient("chanop", name="chanop") + self.joinChannel("chanop", "#chan") + self.getMessages("chanop") + self.sendLine("chanop", "MODE #chan +m") + replies = self.getMessages("chanop") + modeLines = [line for line in replies if line.command == "MODE"] + self.assertMessageMatch(modeLines[0], command="MODE", params=["#chan", "+m"]) + + self.connectClient("baz", name="baz") + self.joinChannel("baz", "#chan") + self.getMessages("chanop") + # this message should be suppressed completely by +m + self.sendLine("baz", "PRIVMSG #chan :hi from baz") + replies = self.getMessages("baz") + reply_cmds = {reply.command for reply in replies} + self.assertIn(ERR_CANNOTSENDTOCHAN, reply_cmds) + self.assertEqual(self.getMessages("chanop"), []) + + # grant +v, user should be able to send messages + self.sendLine("chanop", "MODE #chan +v baz") + self.getMessages("chanop") + self.getMessages("baz") + self.sendLine("baz", "PRIVMSG #chan :hi again from baz") + self.getMessages("baz") + relays = self.getMessages("chanop") + relay = relays[0] + self.assertMessageMatch( + relay, command="PRIVMSG", params=["#chan", "hi again from baz"] + ) diff --git a/irctest/server_tests/chmodes/mute_extban.py b/irctest/server_tests/chmodes/mute_extban.py new file mode 100644 index 0000000..30fc6af --- /dev/null +++ b/irctest/server_tests/chmodes/mute_extban.py @@ -0,0 +1,267 @@ +from irctest import cases, runner +from irctest.numerics import ERR_CANNOTSENDTOCHAN, ERR_CHANOPRIVSNEEDED +from irctest.patma import ANYLIST, StrRe + + +class MuteExtban(cases.BaseServerTestCase): + """https://defs.ircdocs.horse/defs/isupport.html#extban + + It magically guesses what char the IRCd uses for mutes.""" + + def char(self): + if self.controller.extban_mute_char is None: + raise runner.ExtbanNotSupported("", "mute") + else: + return self.controller.extban_mute_char + + @cases.mark_specifications("Ergo") + def testISupport(self): + self.connectClient(1) # Fetches ISUPPORT + isupport = self.server_support + token = isupport["EXTBAN"] + prefix, comma, types = token.partition(",") + self.assertIn(self.char(), types, f"Missing '{self.char()}' in ISUPPORT EXTBAN") + self.assertEqual(prefix, "") + self.assertEqual(comma, ",") + + @cases.mark_specifications("ircdocs") + def testMuteExtban(self): + """Basic usage of mute""" + self.connectClient("chanop", name="chanop") + + isupport = self.server_support + token = isupport.get("EXTBAN", "") + prefix, comma, types = token.partition(",") + if self.char() not in types: + raise runner.ExtbanNotSupported(self.char(), "mute") + + clients = ("chanop", "bar") + + # Mute "bar" + self.joinChannel("chanop", "#chan") + self.getMessages("chanop") + self.sendLine("chanop", f"MODE #chan +b {prefix}{self.char()}:bar!*@*") + replies = {msg.command for msg in self.getMessages("chanop")} + self.assertIn("MODE", replies) + self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies) + + self.connectClient("bar", name="bar", capabilities=["echo-message"]) + self.joinChannel("bar", "#chan") + + for client in clients: + self.getMessages(client) + + # "bar" sees the MODE too + self.sendLine("bar", "MODE #chan +b") + self.assertMessageMatch( + self.getMessage("bar"), + command="367", + params=[ + "bar", + "#chan", + f"{prefix}{self.char()}:bar!*@*", + StrRe("chanop(!.*)?"), + *ANYLIST, + ], + ) + self.getMessages("bar") + + # "bar" talks: rejected + self.sendLine("bar", "PRIVMSG #chan :hi from bar") + replies = self.getMessages("bar") + replies_cmds = {msg.command for msg in replies} + self.assertNotIn("PRIVMSG", replies_cmds) + self.assertIn(ERR_CANNOTSENDTOCHAN, replies_cmds) + self.assertEqual(self.getMessages("chanop"), []) + + # remove mute on "bar" with -b + self.getMessages("chanop") + self.sendLine("chanop", f"MODE #chan -b {prefix}{self.char()}:bar!*@*") + replies = {msg.command for msg in self.getMessages("chanop")} + self.assertIn("MODE", replies) + self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies) + + # "bar" can now talk + self.sendLine("bar", "PRIVMSG #chan :hi again from bar") + replies = self.getMessages("bar") + replies_cmds = {msg.command for msg in replies} + self.assertIn("PRIVMSG", replies_cmds) + self.assertNotIn(ERR_CANNOTSENDTOCHAN, replies_cmds) + self.assertEqual( + self.getMessages("chanop"), + [msg for msg in replies if msg.command == "PRIVMSG"], + ) + + @cases.mark_specifications("ircdocs") + def testMuteExtbanVoiced(self): + """Checks +v overrides the mute""" + self.connectClient("chanop", name="chanop") + + isupport = self.server_support + token = isupport.get("EXTBAN", "") + prefix, comma, types = token.partition(",") + if self.char() not in types: + raise runner.ExtbanNotSupported(self.char(), "mute") + + clients = ("chanop", "qux") + + # Mute "qux" + self.joinChannel("chanop", "#chan") + self.getMessages("chanop") + self.sendLine("chanop", f"MODE #chan +b {prefix}{self.char()}:qux!*@*") + replies = {msg.command for msg in self.getMessages("chanop")} + self.assertIn("MODE", replies) + self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies) + + self.connectClient( + "qux", name="qux", ident="evan", capabilities=["echo-message"] + ) + self.joinChannel("qux", "#chan") + + for client in clients: + self.getMessages(client) + + # "qux" talks: rejected + self.sendLine("qux", "PRIVMSG #chan :hi from qux") + replies = self.getMessages("qux") + replies_cmds = {msg.command for msg in replies} + self.assertNotIn("PRIVMSG", replies_cmds) + self.assertIn(ERR_CANNOTSENDTOCHAN, replies_cmds) + self.assertEqual(self.getMessages("chanop"), []) + + for client in clients: + self.getMessages(client) + + # +v grants an exemption to +b + self.sendLine("chanop", "MODE #chan +v qux") + replies = {msg.command for msg in self.getMessages("chanop")} + self.assertIn("MODE", replies) + self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies) + + # so "qux" can now talk + self.sendLine("qux", "PRIVMSG #chan :hi again from qux") + replies = self.getMessages("qux") + replies_cmds = {msg.command for msg in replies} + self.assertIn("PRIVMSG", replies_cmds) + self.assertNotIn(ERR_CANNOTSENDTOCHAN, replies_cmds) + self.assertEqual( + self.getMessages("chanop"), + [msg for msg in replies if msg.command == "PRIVMSG"], + ) + + @cases.mark_specifications("ircdocs") + def testMuteExtbanExempt(self): + """Checks +e overrides the mute + + """ + self.connectClient("chanop", name="chanop") + + isupport = self.server_support + token = isupport.get("EXTBAN", "") + prefix, comma, types = token.partition(",") + if self.char() not in types: + raise runner.ExtbanNotSupported(self.char(), "mute") + if "e" not in self.server_support["CHANMODES"]: + raise runner.ChannelModeNotSupported(self.char(), "mute") + + clients = ("chanop", "qux") + + # Mute "qux" + self.joinChannel("chanop", "#chan") + self.getMessages("chanop") + self.sendLine("chanop", f"MODE #chan +b {prefix}{self.char()}:qux!*@*") + replies = {msg.command for msg in self.getMessages("chanop")} + self.assertIn("MODE", replies) + self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies) + + self.connectClient( + "qux", name="qux", ident="evan", capabilities=["echo-message"] + ) + self.joinChannel("qux", "#chan") + + for client in clients: + self.getMessages(client) + + # "qux" talks: rejected + self.sendLine("qux", "PRIVMSG #chan :hi from qux") + replies = self.getMessages("qux") + replies_cmds = {msg.command for msg in replies} + self.assertNotIn("PRIVMSG", replies_cmds) + self.assertIn(ERR_CANNOTSENDTOCHAN, replies_cmds) + self.assertEqual(self.getMessages("chanop"), []) + + for client in clients: + self.getMessages(client) + + # +e grants an exemption to +b + self.sendLine("chanop", f"MODE #chan +e {prefix}{self.char()}:*!~evan@*") + replies = {msg.command for msg in self.getMessages("chanop")} + self.assertIn("MODE", replies) + self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies) + + self.getMessages("qux") + + # so "qux" can now talk + self.sendLine("qux", "PRIVMSG #chan :thanks for mute-excepting me") + replies = self.getMessages("qux") + replies_cmds = {msg.command for msg in replies} + self.assertIn("PRIVMSG", replies_cmds) + self.assertNotIn(ERR_CANNOTSENDTOCHAN, replies_cmds) + self.assertEqual( + self.getMessages("chanop"), + [msg for msg in replies if msg.command == "PRIVMSG"], + ) + + @cases.mark_specifications("Ergo") + def testCapitalization(self): + """ + Regression test for oragono #1370: mutes not correctly enforced against + users with capital letters in their NUH + + For consistency with regular -b, which allows unsetting up to + normalization + """ + clients = ("chanop", "bar") + + self.connectClient("chanop", name="chanop") + + isupport = self.server_support + token = isupport.get("EXTBAN", "") + prefix, comma, types = token.partition(",") + + self.joinChannel("chanop", "#chan") + self.getMessages("chanop") + self.sendLine("chanop", f"MODE #chan +b {prefix}{self.char()}:BAR!*@*") + replies = {msg.command for msg in self.getMessages("chanop")} + self.assertIn("MODE", replies) + self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies) + + self.connectClient("Bar", name="bar", capabilities=["echo-message"]) + self.joinChannel("bar", "#chan") + + for client in clients: + self.getMessages(client) + + self.sendLine("bar", "PRIVMSG #chan :hi from bar") + replies = self.getMessages("bar") + replies_cmds = {msg.command for msg in replies} + self.assertNotIn("PRIVMSG", replies_cmds) + self.assertIn(ERR_CANNOTSENDTOCHAN, replies_cmds) + self.assertEqual(self.getMessages("chanop"), []) + + # remove mute with -b + self.sendLine("chanop", f"MODE #chan -b {prefix}{self.char()}:bar!*@*") + replies = {msg.command for msg in self.getMessages("chanop")} + self.assertIn("MODE", replies) + self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies) + + # "bar" can talk again + self.sendLine("bar", "PRIVMSG #chan :hi again from bar") + replies = self.getMessages("bar") + replies_cmds = {msg.command for msg in replies} + self.assertIn("PRIVMSG", replies_cmds) + self.assertNotIn(ERR_CANNOTSENDTOCHAN, replies_cmds) + self.assertEqual( + self.getMessages("chanop"), + [msg for msg in replies if msg.command == "PRIVMSG"], + ) diff --git a/irctest/server_tests/mode.py b/irctest/server_tests/mode.py deleted file mode 100644 index 0912be6..0000000 --- a/irctest/server_tests/mode.py +++ /dev/null @@ -1,720 +0,0 @@ -import math -import time - -import pytest - -from irctest import cases, runner -from irctest.irc_utils.junkdrawer import ircv3_timestamp_to_unixtime -from irctest.numerics import ( - ERR_BADCHANNELKEY, - ERR_BANNEDFROMCHAN, - ERR_CANNOTSENDTOCHAN, - ERR_CHANOPRIVSNEEDED, - ERR_INVALIDKEY, - ERR_INVALIDMODEPARAM, - ERR_UNKNOWNERROR, - RPL_NAMREPLY, -) -from irctest.patma import ANYLIST, ANYSTR, StrRe - -MODERN_CAPS = [ - "server-time", - "message-tags", - "batch", - "labeled-response", - "echo-message", - "account-tag", -] - - -class KeyTestCase(cases.BaseServerTestCase): - @cases.mark_specifications("RFC1459", "RFC2812") - def testKeyNormal(self): - self.connectClient("bar") - self.joinChannel(1, "#chan") - self.sendLine(1, "MODE #chan +k beer") - self.getMessages(1) - - self.connectClient("qux") - self.getMessages(2) - self.sendLine(2, "JOIN #chan") - reply = self.getMessages(2) - self.assertNotIn("JOIN", {msg.command for msg in reply}) - self.assertIn(ERR_BADCHANNELKEY, {msg.command for msg in reply}) - - self.sendLine(2, "JOIN #chan beer") - reply = self.getMessages(2) - self.assertMessageMatch(reply[0], command="JOIN", params=["#chan"]) - - @pytest.mark.parametrize( - "key", - ["passphrase with spaces", "long" * 100, ""], - ids=["spaces", "long", "empty"], - ) - @cases.mark_specifications("RFC2812", "Modern") - def testKeyValidation(self, key): - """ - key = 1*23( %x01-05 / %x07-08 / %x0C / %x0E-1F / %x21-7F ) - ; any 7-bit US_ASCII character, - ; except NUL, CR, LF, FF, h/v TABs, and " " - -- https://tools.ietf.org/html/rfc2812#page-8 - - "Servers may validate the value (eg. to forbid spaces, as they make it harder - to use the key in `JOIN` messages). If the value is invalid, they SHOULD - return [`ERR_INVALIDMODEPARAM`](#errinvalidmodeparam-696). - However, clients MUST be able to handle any of the following: - - * [`ERR_INVALIDMODEPARAM`](#errinvalidmodeparam-696) - * [`ERR_INVALIDKEY`](#errinvalidkey-525) - * `MODE` echoed with a different key (eg. truncated or stripped of invalid - characters) - * the key changed ignored, and no `MODE` echoed if no other mode change - was valid. - " - -- https://modern.ircdocs.horse/#key-channel-mode - -- https://github.com/ircdocs/modern-irc/pull/111 - """ - self.connectClient("bar") - self.joinChannel(1, "#chan") - self.sendLine(1, f"MODE #chan +k :{key}") - - # The spec requires no space; but doesn't say what to do - # if there is one. - # Let's check the various alternatives - - replies = self.getMessages(1) - self.assertNotIn( - ERR_UNKNOWNERROR, - {msg.command for msg in replies}, - fail_msg="Sending an invalid key caused an " - "ERR_UNKNOWNERROR instead of being handled explicitly " - "(eg. ERR_INVALIDMODEPARAM or truncation): {msg}", - ) - - commands = {msg.command for msg in replies} - if {ERR_INVALIDMODEPARAM, ERR_INVALIDKEY} & commands: - # First option: ERR_INVALIDMODEPARAM (eg. Ergo) or ERR_INVALIDKEY - # (eg. ircu2) - if ERR_INVALIDMODEPARAM in commands: - command = [ - msg for msg in replies if msg.command == ERR_INVALIDMODEPARAM - ] - self.assertEqual(len(command), 1, command) - self.assertMessageMatch( - command[0], - command=ERR_INVALIDMODEPARAM, - params=["bar", "#chan", "k", "*", ANYSTR], - ) - return - - if not replies: - # MODE was ignored entirely - self.connectClient("foo") - self.sendLine(2, "JOIN #chan") - self.assertMessageMatch( - self.getMessage(2), command="JOIN", params=["#chan"] - ) - return - - # Second and third options: truncating the key (eg. UnrealIRCd) - # or replacing spaces (eg. Charybdis) - mode_commands = [msg for msg in replies if msg.command == "MODE"] - self.assertGreaterEqual( - len(mode_commands), - 1, - fail_msg="Sending an invalid key (with a space) triggered " - "neither ERR_UNKNOWNERROR, ERR_INVALIDMODEPARAM, ERR_INVALIDKEY, " - " or a MODE. Only these: {}", - extra_format=(replies,), - ) - self.assertLessEqual( - len(mode_commands), - 1, - fail_msg="Sending an invalid key (with a space) triggered " - "multiple MODE responses: {}", - extra_format=(replies,), - ) - - mode_command = mode_commands[0] - if mode_command.params == ["#chan", "+k", "passphrase"]: - key = "passphrase" - elif mode_command.params == ["#chan", "+k", "passphrasewithspaces"]: - key = "passphrasewithspaces" - elif mode_command.params[2].startswith("longlonglong"): - key = mode_command.params[2] - assert mode_command.params == ["#chan", "+k", key] - elif mode_command.params == ["#chan", "+k", "passphrase with spaces"]: - raise self.failureException("Invalid key (with a space) was not rejected.") - - self.connectClient("foo") - self.sendLine(2, f"JOIN #chan {key}") - self.assertMessageMatch(self.getMessage(2), command="JOIN", params=["#chan"]) - - -class AuditoriumTestCase(cases.BaseServerTestCase): - @cases.mark_specifications("Ergo") - def testAuditorium(self): - self.connectClient("bar", name="bar", capabilities=MODERN_CAPS) - self.joinChannel("bar", "#auditorium") - self.getMessages("bar") - self.sendLine("bar", "MODE #auditorium +u") - modelines = [msg for msg in self.getMessages("bar") if msg.command == "MODE"] - self.assertEqual(len(modelines), 1) - self.assertMessageMatch(modelines[0], params=["#auditorium", "+u"]) - - self.connectClient("guest1", name="guest1", capabilities=MODERN_CAPS) - self.joinChannel("guest1", "#auditorium") - self.getMessages("guest1") - # chanop should get a JOIN message - join_msgs = [msg for msg in self.getMessages("bar") if msg.command == "JOIN"] - self.assertEqual(len(join_msgs), 1) - self.assertMessageMatch(join_msgs[0], nick="guest1", params=["#auditorium"]) - - self.connectClient("guest2", name="guest2", capabilities=MODERN_CAPS) - self.joinChannel("guest2", "#auditorium") - self.getMessages("guest2") - # chanop should get a JOIN message - join_msgs = [msg for msg in self.getMessages("bar") if msg.command == "JOIN"] - self.assertEqual(len(join_msgs), 1) - join_msg = join_msgs[0] - self.assertMessageMatch(join_msg, nick="guest2", params=["#auditorium"]) - # oragono/oragono#1642 ; msgid should be populated, - # and the time tag should be sane - self.assertTrue(join_msg.tags.get("msgid")) - self.assertLessEqual( - math.fabs(time.time() - ircv3_timestamp_to_unixtime(join_msg.tags["time"])), - 60.0, - ) - # fellow unvoiced participant should not - unvoiced_join_msgs = [ - msg for msg in self.getMessages("guest1") if msg.command == "JOIN" - ] - self.assertEqual(len(unvoiced_join_msgs), 0) - - self.connectClient("guest3", name="guest3", capabilities=MODERN_CAPS) - self.joinChannel("guest3", "#auditorium") - self.getMessages("guest3") - - self.sendLine("bar", "PRIVMSG #auditorium hi") - echo_message = [ - msg for msg in self.getMessages("bar") if msg.command == "PRIVMSG" - ][0] - self.assertEqual(echo_message, self.getMessages("guest1")[0]) - self.assertEqual(echo_message, self.getMessages("guest2")[0]) - self.assertEqual(echo_message, self.getMessages("guest3")[0]) - - # unvoiced users can speak - self.sendLine("guest1", "PRIVMSG #auditorium :hi you") - echo_message = [ - msg for msg in self.getMessages("guest1") if msg.command == "PRIVMSG" - ][0] - self.assertEqual(self.getMessages("bar"), [echo_message]) - self.assertEqual(self.getMessages("guest2"), [echo_message]) - self.assertEqual(self.getMessages("guest3"), [echo_message]) - - def names(client): - self.sendLine(client, "NAMES #auditorium") - result = set() - for msg in self.getMessages(client): - if msg.command == RPL_NAMREPLY: - result.update(msg.params[-1].split()) - return result - - self.assertEqual(names("bar"), {"@bar", "guest1", "guest2", "guest3"}) - self.assertEqual(names("guest1"), {"@bar"}) - self.assertEqual(names("guest2"), {"@bar"}) - self.assertEqual(names("guest3"), {"@bar"}) - - self.sendLine("bar", "MODE #auditorium +v guest1") - modeLine = [msg for msg in self.getMessages("bar") if msg.command == "MODE"][0] - self.assertEqual(self.getMessages("guest1"), [modeLine]) - self.assertEqual(self.getMessages("guest2"), [modeLine]) - self.assertEqual(self.getMessages("guest3"), [modeLine]) - self.assertEqual(names("bar"), {"@bar", "+guest1", "guest2", "guest3"}) - self.assertEqual(names("guest2"), {"@bar", "+guest1"}) - self.assertEqual(names("guest3"), {"@bar", "+guest1"}) - - self.sendLine("guest1", "PART #auditorium") - part = [msg for msg in self.getMessages("guest1") if msg.command == "PART"][0] - # everyone should see voiced PART - self.assertEqual(self.getMessages("bar")[0], part) - self.assertEqual(self.getMessages("guest2")[0], part) - self.assertEqual(self.getMessages("guest3")[0], part) - - self.joinChannel("guest1", "#auditorium") - self.getMessages("guest1") - self.getMessages("bar") - - self.sendLine("guest2", "PART #auditorium") - part = [msg for msg in self.getMessages("guest2") if msg.command == "PART"][0] - self.assertEqual(self.getMessages("bar"), [part]) - # part should be hidden from unvoiced participants - self.assertEqual(self.getMessages("guest1"), []) - self.assertEqual(self.getMessages("guest3"), []) - - self.sendLine("guest3", "QUIT") - self.assertDisconnected("guest3") - # quit should be hidden from unvoiced participants - self.assertEqual( - len([msg for msg in self.getMessages("bar") if msg.command == "QUIT"]), 1 - ) - self.assertEqual( - len([msg for msg in self.getMessages("guest1") if msg.command == "QUIT"]), 0 - ) - - -class BanMode(cases.BaseServerTestCase): - @cases.mark_specifications("RFC1459", "RFC2812") - def testBan(self): - """Basic ban operation""" - self.connectClient("chanop", name="chanop") - self.joinChannel("chanop", "#chan") - self.getMessages("chanop") - self.sendLine("chanop", "MODE #chan +b bar!*@*") - self.assertMessageMatch(self.getMessage("chanop"), command="MODE") - - self.connectClient( - "Bar", name="bar", capabilities=["echo-message"], skip_if_cap_nak=True - ) - self.getMessages("bar") - self.sendLine("bar", "JOIN #chan") - self.assertMessageMatch(self.getMessage("bar"), command=ERR_BANNEDFROMCHAN) - - self.sendLine("chanop", "MODE #chan -b bar!*@*") - self.assertMessageMatch(self.getMessage("chanop"), command="MODE") - - self.sendLine("bar", "JOIN #chan") - self.assertMessageMatch(self.getMessage("bar"), command="JOIN") - - @cases.mark_specifications("Ergo") - def testCaseInsensitive(self): - """Some clients allow unsetting modes if their argument matches - up to normalization""" - self.connectClient("chanop", name="chanop") - self.joinChannel("chanop", "#chan") - self.getMessages("chanop") - self.sendLine("chanop", "MODE #chan +b BAR!*@*") - self.assertMessageMatch(self.getMessage("chanop"), command="MODE") - - self.connectClient("Bar", name="bar", capabilities=["echo-message"]) - self.getMessages("bar") - self.sendLine("bar", "JOIN #chan") - self.assertMessageMatch(self.getMessage("bar"), command=ERR_BANNEDFROMCHAN) - - self.sendLine("chanop", "MODE #chan -b bar!*@*") - self.assertMessageMatch(self.getMessage("chanop"), command="MODE") - - self.sendLine("bar", "JOIN #chan") - self.assertMessageMatch(self.getMessage("bar"), command="JOIN") - - -class ModeratedMode(cases.BaseServerTestCase): - @cases.mark_specifications("RFC2812") - def testModeratedMode(self): - # test the +m channel mode - self.connectClient("chanop", name="chanop") - self.joinChannel("chanop", "#chan") - self.getMessages("chanop") - self.sendLine("chanop", "MODE #chan +m") - replies = self.getMessages("chanop") - modeLines = [line for line in replies if line.command == "MODE"] - self.assertMessageMatch(modeLines[0], command="MODE", params=["#chan", "+m"]) - - self.connectClient("baz", name="baz") - self.joinChannel("baz", "#chan") - self.getMessages("chanop") - # this message should be suppressed completely by +m - self.sendLine("baz", "PRIVMSG #chan :hi from baz") - replies = self.getMessages("baz") - reply_cmds = {reply.command for reply in replies} - self.assertIn(ERR_CANNOTSENDTOCHAN, reply_cmds) - self.assertEqual(self.getMessages("chanop"), []) - - # grant +v, user should be able to send messages - self.sendLine("chanop", "MODE #chan +v baz") - self.getMessages("chanop") - self.getMessages("baz") - self.sendLine("baz", "PRIVMSG #chan :hi again from baz") - self.getMessages("baz") - relays = self.getMessages("chanop") - relay = relays[0] - self.assertMessageMatch( - relay, command="PRIVMSG", params=["#chan", "hi again from baz"] - ) - - -@cases.mark_services -class RegisteredOnlySpeakMode(cases.BaseServerTestCase): - @cases.mark_specifications("Ergo") - def testRegisteredOnlySpeakMode(self): - self.controller.registerUser(self, "evan", "sesame") - - # test the +M (only registered users and ops can speak) channel mode - self.connectClient("chanop", name="chanop") - self.joinChannel("chanop", "#chan") - self.getMessages("chanop") - self.sendLine("chanop", "MODE #chan +M") - replies = self.getMessages("chanop") - modeLines = [line for line in replies if line.command == "MODE"] - self.assertMessageMatch(modeLines[0], command="MODE", params=["#chan", "+M"]) - - self.connectClient("baz", name="baz") - self.joinChannel("baz", "#chan") - self.getMessages("chanop") - # this message should be suppressed completely by +M - self.sendLine("baz", "PRIVMSG #chan :hi from baz") - replies = self.getMessages("baz") - reply_cmds = {reply.command for reply in replies} - self.assertIn(ERR_CANNOTSENDTOCHAN, reply_cmds) - self.assertEqual(self.getMessages("chanop"), []) - - # +v exempts users from the registration requirement: - self.sendLine("chanop", "MODE #chan +v baz") - self.getMessages("chanop") - self.getMessages("baz") - self.sendLine("baz", "PRIVMSG #chan :hi again from baz") - replies = self.getMessages("baz") - # baz should not receive an error (or an echo) - self.assertEqual(replies, []) - replies = self.getMessages("chanop") - self.assertMessageMatch( - replies[0], command="PRIVMSG", params=["#chan", "hi again from baz"] - ) - - self.connectClient( - "evan", - name="evan", - account="evan", - password="sesame", - capabilities=["sasl"], - ) - self.joinChannel("evan", "#chan") - self.getMessages("baz") - self.sendLine("evan", "PRIVMSG #chan :hi from evan") - replies = self.getMessages("evan") - # evan should not receive an error (or an echo) - self.assertEqual(replies, []) - replies = self.getMessages("baz") - self.assertMessageMatch( - replies[0], command="PRIVMSG", params=["#chan", "hi from evan"] - ) - - -class OpModerated(cases.BaseServerTestCase): - @cases.mark_specifications("Ergo") - def testOpModerated(self): - # test the +U channel mode - self.connectClient("chanop", name="chanop", capabilities=MODERN_CAPS) - self.joinChannel("chanop", "#chan") - self.getMessages("chanop") - self.sendLine("chanop", "MODE #chan +U") - replies = {msg.command for msg in self.getMessages("chanop")} - self.assertIn("MODE", replies) - self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies) - - self.connectClient("baz", name="baz", capabilities=MODERN_CAPS) - self.joinChannel("baz", "#chan") - self.sendLine("baz", "PRIVMSG #chan :hi from baz") - echo = self.getMessages("baz")[0] - self.assertMessageMatch( - echo, command="PRIVMSG", params=["#chan", "hi from baz"] - ) - self.assertEqual( - [msg for msg in self.getMessages("chanop") if msg.command == "PRIVMSG"], - [echo], - ) - - self.connectClient("qux", name="qux", capabilities=MODERN_CAPS) - self.joinChannel("qux", "#chan") - self.sendLine("qux", "PRIVMSG #chan :hi from qux") - echo = self.getMessages("qux")[0] - self.assertMessageMatch( - echo, command="PRIVMSG", params=["#chan", "hi from qux"] - ) - # message is relayed to chanop but not to unprivileged - self.assertEqual( - [msg for msg in self.getMessages("chanop") if msg.command == "PRIVMSG"], - [echo], - ) - self.assertEqual( - [msg for msg in self.getMessages("baz") if msg.command == "PRIVMSG"], [] - ) - - self.sendLine("chanop", "MODE #chan +v qux") - self.getMessages("chanop") - self.sendLine("qux", "PRIVMSG #chan :hi again from qux") - echo = [msg for msg in self.getMessages("qux") if msg.command == "PRIVMSG"][0] - self.assertMessageMatch( - echo, command="PRIVMSG", params=["#chan", "hi again from qux"] - ) - self.assertEqual( - [msg for msg in self.getMessages("chanop") if msg.command == "PRIVMSG"], - [echo], - ) - self.assertEqual( - [msg for msg in self.getMessages("baz") if msg.command == "PRIVMSG"], [echo] - ) - - -class MuteExtban(cases.BaseServerTestCase): - """https://defs.ircdocs.horse/defs/isupport.html#extban - - It magically guesses what char the IRCd uses for mutes.""" - - def char(self): - if self.controller.extban_mute_char is None: - raise runner.ExtbanNotSupported("", "mute") - else: - return self.controller.extban_mute_char - - @cases.mark_specifications("Ergo") - def testISupport(self): - self.connectClient(1) # Fetches ISUPPORT - isupport = self.server_support - token = isupport["EXTBAN"] - prefix, comma, types = token.partition(",") - self.assertIn(self.char(), types, f"Missing '{self.char()}' in ISUPPORT EXTBAN") - self.assertEqual(prefix, "") - self.assertEqual(comma, ",") - - @cases.mark_specifications("ircdocs") - def testMuteExtban(self): - """Basic usage of mute""" - self.connectClient("chanop", name="chanop") - - isupport = self.server_support - token = isupport.get("EXTBAN", "") - prefix, comma, types = token.partition(",") - if self.char() not in types: - raise runner.ExtbanNotSupported(self.char(), "mute") - - clients = ("chanop", "bar") - - # Mute "bar" - self.joinChannel("chanop", "#chan") - self.getMessages("chanop") - self.sendLine("chanop", f"MODE #chan +b {prefix}{self.char()}:bar!*@*") - replies = {msg.command for msg in self.getMessages("chanop")} - self.assertIn("MODE", replies) - self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies) - - self.connectClient("bar", name="bar", capabilities=["echo-message"]) - self.joinChannel("bar", "#chan") - - for client in clients: - self.getMessages(client) - - # "bar" sees the MODE too - self.sendLine("bar", "MODE #chan +b") - self.assertMessageMatch( - self.getMessage("bar"), - command="367", - params=[ - "bar", - "#chan", - f"{prefix}{self.char()}:bar!*@*", - StrRe("chanop(!.*)?"), - *ANYLIST, - ], - ) - self.getMessages("bar") - - # "bar" talks: rejected - self.sendLine("bar", "PRIVMSG #chan :hi from bar") - replies = self.getMessages("bar") - replies_cmds = {msg.command for msg in replies} - self.assertNotIn("PRIVMSG", replies_cmds) - self.assertIn(ERR_CANNOTSENDTOCHAN, replies_cmds) - self.assertEqual(self.getMessages("chanop"), []) - - # remove mute on "bar" with -b - self.getMessages("chanop") - self.sendLine("chanop", f"MODE #chan -b {prefix}{self.char()}:bar!*@*") - replies = {msg.command for msg in self.getMessages("chanop")} - self.assertIn("MODE", replies) - self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies) - - # "bar" can now talk - self.sendLine("bar", "PRIVMSG #chan :hi again from bar") - replies = self.getMessages("bar") - replies_cmds = {msg.command for msg in replies} - self.assertIn("PRIVMSG", replies_cmds) - self.assertNotIn(ERR_CANNOTSENDTOCHAN, replies_cmds) - self.assertEqual( - self.getMessages("chanop"), - [msg for msg in replies if msg.command == "PRIVMSG"], - ) - - @cases.mark_specifications("ircdocs") - def testMuteExtbanVoiced(self): - """Checks +v overrides the mute""" - self.connectClient("chanop", name="chanop") - - isupport = self.server_support - token = isupport.get("EXTBAN", "") - prefix, comma, types = token.partition(",") - if self.char() not in types: - raise runner.ExtbanNotSupported(self.char(), "mute") - - clients = ("chanop", "qux") - - # Mute "qux" - self.joinChannel("chanop", "#chan") - self.getMessages("chanop") - self.sendLine("chanop", f"MODE #chan +b {prefix}{self.char()}:qux!*@*") - replies = {msg.command for msg in self.getMessages("chanop")} - self.assertIn("MODE", replies) - self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies) - - self.connectClient( - "qux", name="qux", ident="evan", capabilities=["echo-message"] - ) - self.joinChannel("qux", "#chan") - - for client in clients: - self.getMessages(client) - - # "qux" talks: rejected - self.sendLine("qux", "PRIVMSG #chan :hi from qux") - replies = self.getMessages("qux") - replies_cmds = {msg.command for msg in replies} - self.assertNotIn("PRIVMSG", replies_cmds) - self.assertIn(ERR_CANNOTSENDTOCHAN, replies_cmds) - self.assertEqual(self.getMessages("chanop"), []) - - for client in clients: - self.getMessages(client) - - # +v grants an exemption to +b - self.sendLine("chanop", "MODE #chan +v qux") - replies = {msg.command for msg in self.getMessages("chanop")} - self.assertIn("MODE", replies) - self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies) - - # so "qux" can now talk - self.sendLine("qux", "PRIVMSG #chan :hi again from qux") - replies = self.getMessages("qux") - replies_cmds = {msg.command for msg in replies} - self.assertIn("PRIVMSG", replies_cmds) - self.assertNotIn(ERR_CANNOTSENDTOCHAN, replies_cmds) - self.assertEqual( - self.getMessages("chanop"), - [msg for msg in replies if msg.command == "PRIVMSG"], - ) - - @cases.mark_specifications("ircdocs") - def testMuteExtbanExempt(self): - """Checks +e overrides the mute - - """ - self.connectClient("chanop", name="chanop") - - isupport = self.server_support - token = isupport.get("EXTBAN", "") - prefix, comma, types = token.partition(",") - if self.char() not in types: - raise runner.ExtbanNotSupported(self.char(), "mute") - if "e" not in self.server_support["CHANMODES"]: - raise runner.ChannelModeNotSupported(self.char(), "mute") - - clients = ("chanop", "qux") - - # Mute "qux" - self.joinChannel("chanop", "#chan") - self.getMessages("chanop") - self.sendLine("chanop", f"MODE #chan +b {prefix}{self.char()}:qux!*@*") - replies = {msg.command for msg in self.getMessages("chanop")} - self.assertIn("MODE", replies) - self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies) - - self.connectClient( - "qux", name="qux", ident="evan", capabilities=["echo-message"] - ) - self.joinChannel("qux", "#chan") - - for client in clients: - self.getMessages(client) - - # "qux" talks: rejected - self.sendLine("qux", "PRIVMSG #chan :hi from qux") - replies = self.getMessages("qux") - replies_cmds = {msg.command for msg in replies} - self.assertNotIn("PRIVMSG", replies_cmds) - self.assertIn(ERR_CANNOTSENDTOCHAN, replies_cmds) - self.assertEqual(self.getMessages("chanop"), []) - - for client in clients: - self.getMessages(client) - - # +e grants an exemption to +b - self.sendLine("chanop", f"MODE #chan +e {prefix}{self.char()}:*!~evan@*") - replies = {msg.command for msg in self.getMessages("chanop")} - self.assertIn("MODE", replies) - self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies) - - self.getMessages("qux") - - # so "qux" can now talk - self.sendLine("qux", "PRIVMSG #chan :thanks for mute-excepting me") - replies = self.getMessages("qux") - replies_cmds = {msg.command for msg in replies} - self.assertIn("PRIVMSG", replies_cmds) - self.assertNotIn(ERR_CANNOTSENDTOCHAN, replies_cmds) - self.assertEqual( - self.getMessages("chanop"), - [msg for msg in replies if msg.command == "PRIVMSG"], - ) - - @cases.mark_specifications("Ergo") - def testCapitalization(self): - """ - Regression test for oragono #1370: mutes not correctly enforced against - users with capital letters in their NUH - - For consistency with regular -b, which allows unsetting up to - normalization - """ - clients = ("chanop", "bar") - - self.connectClient("chanop", name="chanop") - - isupport = self.server_support - token = isupport.get("EXTBAN", "") - prefix, comma, types = token.partition(",") - - self.joinChannel("chanop", "#chan") - self.getMessages("chanop") - self.sendLine("chanop", f"MODE #chan +b {prefix}{self.char()}:BAR!*@*") - replies = {msg.command for msg in self.getMessages("chanop")} - self.assertIn("MODE", replies) - self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies) - - self.connectClient("Bar", name="bar", capabilities=["echo-message"]) - self.joinChannel("bar", "#chan") - - for client in clients: - self.getMessages(client) - - self.sendLine("bar", "PRIVMSG #chan :hi from bar") - replies = self.getMessages("bar") - replies_cmds = {msg.command for msg in replies} - self.assertNotIn("PRIVMSG", replies_cmds) - self.assertIn(ERR_CANNOTSENDTOCHAN, replies_cmds) - self.assertEqual(self.getMessages("chanop"), []) - - # remove mute with -b - self.sendLine("chanop", f"MODE #chan -b {prefix}{self.char()}:bar!*@*") - replies = {msg.command for msg in self.getMessages("chanop")} - self.assertIn("MODE", replies) - self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies) - - # "bar" can talk again - self.sendLine("bar", "PRIVMSG #chan :hi again from bar") - replies = self.getMessages("bar") - replies_cmds = {msg.command for msg in replies} - self.assertIn("PRIVMSG", replies_cmds) - self.assertNotIn(ERR_CANNOTSENDTOCHAN, replies_cmds) - self.assertEqual( - self.getMessages("chanop"), - [msg for msg in replies if msg.command == "PRIVMSG"], - )