mirror of
https://github.com/progval/irctest.git
synced 2025-04-05 06:49:47 +00:00
Split irctest/server_tests/mode.py into a subpackage
It was getting too big
This commit is contained in:
0
irctest/server_tests/chmodes/__init__.py
Normal file
0
irctest/server_tests/chmodes/__init__.py
Normal file
127
irctest/server_tests/chmodes/auditorium.py
Normal file
127
irctest/server_tests/chmodes/auditorium.py
Normal file
@ -0,0 +1,127 @@
|
||||
import math
|
||||
import time
|
||||
|
||||
from irctest import cases
|
||||
from irctest.irc_utils.junkdrawer import ircv3_timestamp_to_unixtime
|
||||
from irctest.numerics import RPL_NAMREPLY
|
||||
|
||||
MODERN_CAPS = [
|
||||
"server-time",
|
||||
"message-tags",
|
||||
"batch",
|
||||
"labeled-response",
|
||||
"echo-message",
|
||||
"account-tag",
|
||||
]
|
||||
|
||||
|
||||
class AuditoriumTestCase(cases.BaseServerTestCase):
|
||||
@cases.mark_specifications("Ergo")
|
||||
def testAuditorium(self):
|
||||
self.connectClient("bar", name="bar", capabilities=MODERN_CAPS)
|
||||
self.joinChannel("bar", "#auditorium")
|
||||
self.getMessages("bar")
|
||||
self.sendLine("bar", "MODE #auditorium +u")
|
||||
modelines = [msg for msg in self.getMessages("bar") if msg.command == "MODE"]
|
||||
self.assertEqual(len(modelines), 1)
|
||||
self.assertMessageMatch(modelines[0], params=["#auditorium", "+u"])
|
||||
|
||||
self.connectClient("guest1", name="guest1", capabilities=MODERN_CAPS)
|
||||
self.joinChannel("guest1", "#auditorium")
|
||||
self.getMessages("guest1")
|
||||
# chanop should get a JOIN message
|
||||
join_msgs = [msg for msg in self.getMessages("bar") if msg.command == "JOIN"]
|
||||
self.assertEqual(len(join_msgs), 1)
|
||||
self.assertMessageMatch(join_msgs[0], nick="guest1", params=["#auditorium"])
|
||||
|
||||
self.connectClient("guest2", name="guest2", capabilities=MODERN_CAPS)
|
||||
self.joinChannel("guest2", "#auditorium")
|
||||
self.getMessages("guest2")
|
||||
# chanop should get a JOIN message
|
||||
join_msgs = [msg for msg in self.getMessages("bar") if msg.command == "JOIN"]
|
||||
self.assertEqual(len(join_msgs), 1)
|
||||
join_msg = join_msgs[0]
|
||||
self.assertMessageMatch(join_msg, nick="guest2", params=["#auditorium"])
|
||||
# oragono/oragono#1642 ; msgid should be populated,
|
||||
# and the time tag should be sane
|
||||
self.assertTrue(join_msg.tags.get("msgid"))
|
||||
self.assertLessEqual(
|
||||
math.fabs(time.time() - ircv3_timestamp_to_unixtime(join_msg.tags["time"])),
|
||||
60.0,
|
||||
)
|
||||
# fellow unvoiced participant should not
|
||||
unvoiced_join_msgs = [
|
||||
msg for msg in self.getMessages("guest1") if msg.command == "JOIN"
|
||||
]
|
||||
self.assertEqual(len(unvoiced_join_msgs), 0)
|
||||
|
||||
self.connectClient("guest3", name="guest3", capabilities=MODERN_CAPS)
|
||||
self.joinChannel("guest3", "#auditorium")
|
||||
self.getMessages("guest3")
|
||||
|
||||
self.sendLine("bar", "PRIVMSG #auditorium hi")
|
||||
echo_message = [
|
||||
msg for msg in self.getMessages("bar") if msg.command == "PRIVMSG"
|
||||
][0]
|
||||
self.assertEqual(echo_message, self.getMessages("guest1")[0])
|
||||
self.assertEqual(echo_message, self.getMessages("guest2")[0])
|
||||
self.assertEqual(echo_message, self.getMessages("guest3")[0])
|
||||
|
||||
# unvoiced users can speak
|
||||
self.sendLine("guest1", "PRIVMSG #auditorium :hi you")
|
||||
echo_message = [
|
||||
msg for msg in self.getMessages("guest1") if msg.command == "PRIVMSG"
|
||||
][0]
|
||||
self.assertEqual(self.getMessages("bar"), [echo_message])
|
||||
self.assertEqual(self.getMessages("guest2"), [echo_message])
|
||||
self.assertEqual(self.getMessages("guest3"), [echo_message])
|
||||
|
||||
def names(client):
|
||||
self.sendLine(client, "NAMES #auditorium")
|
||||
result = set()
|
||||
for msg in self.getMessages(client):
|
||||
if msg.command == RPL_NAMREPLY:
|
||||
result.update(msg.params[-1].split())
|
||||
return result
|
||||
|
||||
self.assertEqual(names("bar"), {"@bar", "guest1", "guest2", "guest3"})
|
||||
self.assertEqual(names("guest1"), {"@bar"})
|
||||
self.assertEqual(names("guest2"), {"@bar"})
|
||||
self.assertEqual(names("guest3"), {"@bar"})
|
||||
|
||||
self.sendLine("bar", "MODE #auditorium +v guest1")
|
||||
modeLine = [msg for msg in self.getMessages("bar") if msg.command == "MODE"][0]
|
||||
self.assertEqual(self.getMessages("guest1"), [modeLine])
|
||||
self.assertEqual(self.getMessages("guest2"), [modeLine])
|
||||
self.assertEqual(self.getMessages("guest3"), [modeLine])
|
||||
self.assertEqual(names("bar"), {"@bar", "+guest1", "guest2", "guest3"})
|
||||
self.assertEqual(names("guest2"), {"@bar", "+guest1"})
|
||||
self.assertEqual(names("guest3"), {"@bar", "+guest1"})
|
||||
|
||||
self.sendLine("guest1", "PART #auditorium")
|
||||
part = [msg for msg in self.getMessages("guest1") if msg.command == "PART"][0]
|
||||
# everyone should see voiced PART
|
||||
self.assertEqual(self.getMessages("bar")[0], part)
|
||||
self.assertEqual(self.getMessages("guest2")[0], part)
|
||||
self.assertEqual(self.getMessages("guest3")[0], part)
|
||||
|
||||
self.joinChannel("guest1", "#auditorium")
|
||||
self.getMessages("guest1")
|
||||
self.getMessages("bar")
|
||||
|
||||
self.sendLine("guest2", "PART #auditorium")
|
||||
part = [msg for msg in self.getMessages("guest2") if msg.command == "PART"][0]
|
||||
self.assertEqual(self.getMessages("bar"), [part])
|
||||
# part should be hidden from unvoiced participants
|
||||
self.assertEqual(self.getMessages("guest1"), [])
|
||||
self.assertEqual(self.getMessages("guest3"), [])
|
||||
|
||||
self.sendLine("guest3", "QUIT")
|
||||
self.assertDisconnected("guest3")
|
||||
# quit should be hidden from unvoiced participants
|
||||
self.assertEqual(
|
||||
len([msg for msg in self.getMessages("bar") if msg.command == "QUIT"]), 1
|
||||
)
|
||||
self.assertEqual(
|
||||
len([msg for msg in self.getMessages("guest1") if msg.command == "QUIT"]), 0
|
||||
)
|
47
irctest/server_tests/chmodes/ban.py
Normal file
47
irctest/server_tests/chmodes/ban.py
Normal file
@ -0,0 +1,47 @@
|
||||
from irctest import cases
|
||||
from irctest.numerics import ERR_BANNEDFROMCHAN
|
||||
|
||||
|
||||
class BanMode(cases.BaseServerTestCase):
|
||||
@cases.mark_specifications("RFC1459", "RFC2812")
|
||||
def testBan(self):
|
||||
"""Basic ban operation"""
|
||||
self.connectClient("chanop", name="chanop")
|
||||
self.joinChannel("chanop", "#chan")
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("chanop", "MODE #chan +b bar!*@*")
|
||||
self.assertMessageMatch(self.getMessage("chanop"), command="MODE")
|
||||
|
||||
self.connectClient(
|
||||
"Bar", name="bar", capabilities=["echo-message"], skip_if_cap_nak=True
|
||||
)
|
||||
self.getMessages("bar")
|
||||
self.sendLine("bar", "JOIN #chan")
|
||||
self.assertMessageMatch(self.getMessage("bar"), command=ERR_BANNEDFROMCHAN)
|
||||
|
||||
self.sendLine("chanop", "MODE #chan -b bar!*@*")
|
||||
self.assertMessageMatch(self.getMessage("chanop"), command="MODE")
|
||||
|
||||
self.sendLine("bar", "JOIN #chan")
|
||||
self.assertMessageMatch(self.getMessage("bar"), command="JOIN")
|
||||
|
||||
@cases.mark_specifications("Ergo")
|
||||
def testCaseInsensitive(self):
|
||||
"""Some clients allow unsetting modes if their argument matches
|
||||
up to normalization"""
|
||||
self.connectClient("chanop", name="chanop")
|
||||
self.joinChannel("chanop", "#chan")
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("chanop", "MODE #chan +b BAR!*@*")
|
||||
self.assertMessageMatch(self.getMessage("chanop"), command="MODE")
|
||||
|
||||
self.connectClient("Bar", name="bar", capabilities=["echo-message"])
|
||||
self.getMessages("bar")
|
||||
self.sendLine("bar", "JOIN #chan")
|
||||
self.assertMessageMatch(self.getMessage("bar"), command=ERR_BANNEDFROMCHAN)
|
||||
|
||||
self.sendLine("chanop", "MODE #chan -b bar!*@*")
|
||||
self.assertMessageMatch(self.getMessage("chanop"), command="MODE")
|
||||
|
||||
self.sendLine("bar", "JOIN #chan")
|
||||
self.assertMessageMatch(self.getMessage("bar"), command="JOIN")
|
124
irctest/server_tests/chmodes/ergo.py
Normal file
124
irctest/server_tests/chmodes/ergo.py
Normal file
@ -0,0 +1,124 @@
|
||||
from irctest import cases
|
||||
from irctest.numerics import ERR_CANNOTSENDTOCHAN, ERR_CHANOPRIVSNEEDED
|
||||
|
||||
MODERN_CAPS = [
|
||||
"server-time",
|
||||
"message-tags",
|
||||
"batch",
|
||||
"labeled-response",
|
||||
"echo-message",
|
||||
"account-tag",
|
||||
]
|
||||
|
||||
|
||||
@cases.mark_services
|
||||
class RegisteredOnlySpeakMode(cases.BaseServerTestCase):
|
||||
@cases.mark_specifications("Ergo")
|
||||
def testRegisteredOnlySpeakMode(self):
|
||||
self.controller.registerUser(self, "evan", "sesame")
|
||||
|
||||
# test the +M (only registered users and ops can speak) channel mode
|
||||
self.connectClient("chanop", name="chanop")
|
||||
self.joinChannel("chanop", "#chan")
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("chanop", "MODE #chan +M")
|
||||
replies = self.getMessages("chanop")
|
||||
modeLines = [line for line in replies if line.command == "MODE"]
|
||||
self.assertMessageMatch(modeLines[0], command="MODE", params=["#chan", "+M"])
|
||||
|
||||
self.connectClient("baz", name="baz")
|
||||
self.joinChannel("baz", "#chan")
|
||||
self.getMessages("chanop")
|
||||
# this message should be suppressed completely by +M
|
||||
self.sendLine("baz", "PRIVMSG #chan :hi from baz")
|
||||
replies = self.getMessages("baz")
|
||||
reply_cmds = {reply.command for reply in replies}
|
||||
self.assertIn(ERR_CANNOTSENDTOCHAN, reply_cmds)
|
||||
self.assertEqual(self.getMessages("chanop"), [])
|
||||
|
||||
# +v exempts users from the registration requirement:
|
||||
self.sendLine("chanop", "MODE #chan +v baz")
|
||||
self.getMessages("chanop")
|
||||
self.getMessages("baz")
|
||||
self.sendLine("baz", "PRIVMSG #chan :hi again from baz")
|
||||
replies = self.getMessages("baz")
|
||||
# baz should not receive an error (or an echo)
|
||||
self.assertEqual(replies, [])
|
||||
replies = self.getMessages("chanop")
|
||||
self.assertMessageMatch(
|
||||
replies[0], command="PRIVMSG", params=["#chan", "hi again from baz"]
|
||||
)
|
||||
|
||||
self.connectClient(
|
||||
"evan",
|
||||
name="evan",
|
||||
account="evan",
|
||||
password="sesame",
|
||||
capabilities=["sasl"],
|
||||
)
|
||||
self.joinChannel("evan", "#chan")
|
||||
self.getMessages("baz")
|
||||
self.sendLine("evan", "PRIVMSG #chan :hi from evan")
|
||||
replies = self.getMessages("evan")
|
||||
# evan should not receive an error (or an echo)
|
||||
self.assertEqual(replies, [])
|
||||
replies = self.getMessages("baz")
|
||||
self.assertMessageMatch(
|
||||
replies[0], command="PRIVMSG", params=["#chan", "hi from evan"]
|
||||
)
|
||||
|
||||
|
||||
class OpModerated(cases.BaseServerTestCase):
|
||||
@cases.mark_specifications("Ergo")
|
||||
def testOpModerated(self):
|
||||
# test the +U channel mode
|
||||
self.connectClient("chanop", name="chanop", capabilities=MODERN_CAPS)
|
||||
self.joinChannel("chanop", "#chan")
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("chanop", "MODE #chan +U")
|
||||
replies = {msg.command for msg in self.getMessages("chanop")}
|
||||
self.assertIn("MODE", replies)
|
||||
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
||||
|
||||
self.connectClient("baz", name="baz", capabilities=MODERN_CAPS)
|
||||
self.joinChannel("baz", "#chan")
|
||||
self.sendLine("baz", "PRIVMSG #chan :hi from baz")
|
||||
echo = self.getMessages("baz")[0]
|
||||
self.assertMessageMatch(
|
||||
echo, command="PRIVMSG", params=["#chan", "hi from baz"]
|
||||
)
|
||||
self.assertEqual(
|
||||
[msg for msg in self.getMessages("chanop") if msg.command == "PRIVMSG"],
|
||||
[echo],
|
||||
)
|
||||
|
||||
self.connectClient("qux", name="qux", capabilities=MODERN_CAPS)
|
||||
self.joinChannel("qux", "#chan")
|
||||
self.sendLine("qux", "PRIVMSG #chan :hi from qux")
|
||||
echo = self.getMessages("qux")[0]
|
||||
self.assertMessageMatch(
|
||||
echo, command="PRIVMSG", params=["#chan", "hi from qux"]
|
||||
)
|
||||
# message is relayed to chanop but not to unprivileged
|
||||
self.assertEqual(
|
||||
[msg for msg in self.getMessages("chanop") if msg.command == "PRIVMSG"],
|
||||
[echo],
|
||||
)
|
||||
self.assertEqual(
|
||||
[msg for msg in self.getMessages("baz") if msg.command == "PRIVMSG"], []
|
||||
)
|
||||
|
||||
self.sendLine("chanop", "MODE #chan +v qux")
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("qux", "PRIVMSG #chan :hi again from qux")
|
||||
echo = [msg for msg in self.getMessages("qux") if msg.command == "PRIVMSG"][0]
|
||||
self.assertMessageMatch(
|
||||
echo, command="PRIVMSG", params=["#chan", "hi again from qux"]
|
||||
)
|
||||
self.assertEqual(
|
||||
[msg for msg in self.getMessages("chanop") if msg.command == "PRIVMSG"],
|
||||
[echo],
|
||||
)
|
||||
self.assertEqual(
|
||||
[msg for msg in self.getMessages("baz") if msg.command == "PRIVMSG"], [echo]
|
||||
)
|
134
irctest/server_tests/chmodes/key.py
Normal file
134
irctest/server_tests/chmodes/key.py
Normal file
@ -0,0 +1,134 @@
|
||||
import pytest
|
||||
|
||||
from irctest import cases
|
||||
from irctest.numerics import (
|
||||
ERR_BADCHANNELKEY,
|
||||
ERR_INVALIDKEY,
|
||||
ERR_INVALIDMODEPARAM,
|
||||
ERR_UNKNOWNERROR,
|
||||
)
|
||||
from irctest.patma import ANYSTR
|
||||
|
||||
|
||||
class KeyTestCase(cases.BaseServerTestCase):
|
||||
@cases.mark_specifications("RFC1459", "RFC2812")
|
||||
def testKeyNormal(self):
|
||||
self.connectClient("bar")
|
||||
self.joinChannel(1, "#chan")
|
||||
self.sendLine(1, "MODE #chan +k beer")
|
||||
self.getMessages(1)
|
||||
|
||||
self.connectClient("qux")
|
||||
self.getMessages(2)
|
||||
self.sendLine(2, "JOIN #chan")
|
||||
reply = self.getMessages(2)
|
||||
self.assertNotIn("JOIN", {msg.command for msg in reply})
|
||||
self.assertIn(ERR_BADCHANNELKEY, {msg.command for msg in reply})
|
||||
|
||||
self.sendLine(2, "JOIN #chan beer")
|
||||
reply = self.getMessages(2)
|
||||
self.assertMessageMatch(reply[0], command="JOIN", params=["#chan"])
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"key",
|
||||
["passphrase with spaces", "long" * 100, ""],
|
||||
ids=["spaces", "long", "empty"],
|
||||
)
|
||||
@cases.mark_specifications("RFC2812", "Modern")
|
||||
def testKeyValidation(self, key):
|
||||
"""
|
||||
key = 1*23( %x01-05 / %x07-08 / %x0C / %x0E-1F / %x21-7F )
|
||||
; any 7-bit US_ASCII character,
|
||||
; except NUL, CR, LF, FF, h/v TABs, and " "
|
||||
-- https://tools.ietf.org/html/rfc2812#page-8
|
||||
|
||||
"Servers may validate the value (eg. to forbid spaces, as they make it harder
|
||||
to use the key in `JOIN` messages). If the value is invalid, they SHOULD
|
||||
return [`ERR_INVALIDMODEPARAM`](#errinvalidmodeparam-696).
|
||||
However, clients MUST be able to handle any of the following:
|
||||
|
||||
* [`ERR_INVALIDMODEPARAM`](#errinvalidmodeparam-696)
|
||||
* [`ERR_INVALIDKEY`](#errinvalidkey-525)
|
||||
* `MODE` echoed with a different key (eg. truncated or stripped of invalid
|
||||
characters)
|
||||
* the key changed ignored, and no `MODE` echoed if no other mode change
|
||||
was valid.
|
||||
"
|
||||
-- https://modern.ircdocs.horse/#key-channel-mode
|
||||
-- https://github.com/ircdocs/modern-irc/pull/111
|
||||
"""
|
||||
self.connectClient("bar")
|
||||
self.joinChannel(1, "#chan")
|
||||
self.sendLine(1, f"MODE #chan +k :{key}")
|
||||
|
||||
# The spec requires no space; but doesn't say what to do
|
||||
# if there is one.
|
||||
# Let's check the various alternatives
|
||||
|
||||
replies = self.getMessages(1)
|
||||
self.assertNotIn(
|
||||
ERR_UNKNOWNERROR,
|
||||
{msg.command for msg in replies},
|
||||
fail_msg="Sending an invalid key caused an "
|
||||
"ERR_UNKNOWNERROR instead of being handled explicitly "
|
||||
"(eg. ERR_INVALIDMODEPARAM or truncation): {msg}",
|
||||
)
|
||||
|
||||
commands = {msg.command for msg in replies}
|
||||
if {ERR_INVALIDMODEPARAM, ERR_INVALIDKEY} & commands:
|
||||
# First option: ERR_INVALIDMODEPARAM (eg. Ergo) or ERR_INVALIDKEY
|
||||
# (eg. ircu2)
|
||||
if ERR_INVALIDMODEPARAM in commands:
|
||||
command = [
|
||||
msg for msg in replies if msg.command == ERR_INVALIDMODEPARAM
|
||||
]
|
||||
self.assertEqual(len(command), 1, command)
|
||||
self.assertMessageMatch(
|
||||
command[0],
|
||||
command=ERR_INVALIDMODEPARAM,
|
||||
params=["bar", "#chan", "k", "*", ANYSTR],
|
||||
)
|
||||
return
|
||||
|
||||
if not replies:
|
||||
# MODE was ignored entirely
|
||||
self.connectClient("foo")
|
||||
self.sendLine(2, "JOIN #chan")
|
||||
self.assertMessageMatch(
|
||||
self.getMessage(2), command="JOIN", params=["#chan"]
|
||||
)
|
||||
return
|
||||
|
||||
# Second and third options: truncating the key (eg. UnrealIRCd)
|
||||
# or replacing spaces (eg. Charybdis)
|
||||
mode_commands = [msg for msg in replies if msg.command == "MODE"]
|
||||
self.assertGreaterEqual(
|
||||
len(mode_commands),
|
||||
1,
|
||||
fail_msg="Sending an invalid key (with a space) triggered "
|
||||
"neither ERR_UNKNOWNERROR, ERR_INVALIDMODEPARAM, ERR_INVALIDKEY, "
|
||||
" or a MODE. Only these: {}",
|
||||
extra_format=(replies,),
|
||||
)
|
||||
self.assertLessEqual(
|
||||
len(mode_commands),
|
||||
1,
|
||||
fail_msg="Sending an invalid key (with a space) triggered "
|
||||
"multiple MODE responses: {}",
|
||||
extra_format=(replies,),
|
||||
)
|
||||
|
||||
mode_command = mode_commands[0]
|
||||
if mode_command.params == ["#chan", "+k", "passphrase"]:
|
||||
key = "passphrase"
|
||||
elif mode_command.params == ["#chan", "+k", "passphrasewithspaces"]:
|
||||
key = "passphrasewithspaces"
|
||||
elif mode_command.params[2].startswith("longlonglong"):
|
||||
key = mode_command.params[2]
|
||||
assert mode_command.params == ["#chan", "+k", key]
|
||||
elif mode_command.params == ["#chan", "+k", "passphrase with spaces"]:
|
||||
raise self.failureException("Invalid key (with a space) was not rejected.")
|
||||
|
||||
self.connectClient("foo")
|
||||
self.sendLine(2, f"JOIN #chan {key}")
|
||||
self.assertMessageMatch(self.getMessage(2), command="JOIN", params=["#chan"])
|
37
irctest/server_tests/chmodes/moderated.py
Normal file
37
irctest/server_tests/chmodes/moderated.py
Normal file
@ -0,0 +1,37 @@
|
||||
from irctest import cases
|
||||
from irctest.numerics import ERR_CANNOTSENDTOCHAN
|
||||
|
||||
|
||||
class ModeratedMode(cases.BaseServerTestCase):
|
||||
@cases.mark_specifications("RFC2812")
|
||||
def testModeratedMode(self):
|
||||
# test the +m channel mode
|
||||
self.connectClient("chanop", name="chanop")
|
||||
self.joinChannel("chanop", "#chan")
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("chanop", "MODE #chan +m")
|
||||
replies = self.getMessages("chanop")
|
||||
modeLines = [line for line in replies if line.command == "MODE"]
|
||||
self.assertMessageMatch(modeLines[0], command="MODE", params=["#chan", "+m"])
|
||||
|
||||
self.connectClient("baz", name="baz")
|
||||
self.joinChannel("baz", "#chan")
|
||||
self.getMessages("chanop")
|
||||
# this message should be suppressed completely by +m
|
||||
self.sendLine("baz", "PRIVMSG #chan :hi from baz")
|
||||
replies = self.getMessages("baz")
|
||||
reply_cmds = {reply.command for reply in replies}
|
||||
self.assertIn(ERR_CANNOTSENDTOCHAN, reply_cmds)
|
||||
self.assertEqual(self.getMessages("chanop"), [])
|
||||
|
||||
# grant +v, user should be able to send messages
|
||||
self.sendLine("chanop", "MODE #chan +v baz")
|
||||
self.getMessages("chanop")
|
||||
self.getMessages("baz")
|
||||
self.sendLine("baz", "PRIVMSG #chan :hi again from baz")
|
||||
self.getMessages("baz")
|
||||
relays = self.getMessages("chanop")
|
||||
relay = relays[0]
|
||||
self.assertMessageMatch(
|
||||
relay, command="PRIVMSG", params=["#chan", "hi again from baz"]
|
||||
)
|
267
irctest/server_tests/chmodes/mute_extban.py
Normal file
267
irctest/server_tests/chmodes/mute_extban.py
Normal file
@ -0,0 +1,267 @@
|
||||
from irctest import cases, runner
|
||||
from irctest.numerics import ERR_CANNOTSENDTOCHAN, ERR_CHANOPRIVSNEEDED
|
||||
from irctest.patma import ANYLIST, StrRe
|
||||
|
||||
|
||||
class MuteExtban(cases.BaseServerTestCase):
|
||||
"""https://defs.ircdocs.horse/defs/isupport.html#extban
|
||||
|
||||
It magically guesses what char the IRCd uses for mutes."""
|
||||
|
||||
def char(self):
|
||||
if self.controller.extban_mute_char is None:
|
||||
raise runner.ExtbanNotSupported("", "mute")
|
||||
else:
|
||||
return self.controller.extban_mute_char
|
||||
|
||||
@cases.mark_specifications("Ergo")
|
||||
def testISupport(self):
|
||||
self.connectClient(1) # Fetches ISUPPORT
|
||||
isupport = self.server_support
|
||||
token = isupport["EXTBAN"]
|
||||
prefix, comma, types = token.partition(",")
|
||||
self.assertIn(self.char(), types, f"Missing '{self.char()}' in ISUPPORT EXTBAN")
|
||||
self.assertEqual(prefix, "")
|
||||
self.assertEqual(comma, ",")
|
||||
|
||||
@cases.mark_specifications("ircdocs")
|
||||
def testMuteExtban(self):
|
||||
"""Basic usage of mute"""
|
||||
self.connectClient("chanop", name="chanop")
|
||||
|
||||
isupport = self.server_support
|
||||
token = isupport.get("EXTBAN", "")
|
||||
prefix, comma, types = token.partition(",")
|
||||
if self.char() not in types:
|
||||
raise runner.ExtbanNotSupported(self.char(), "mute")
|
||||
|
||||
clients = ("chanop", "bar")
|
||||
|
||||
# Mute "bar"
|
||||
self.joinChannel("chanop", "#chan")
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("chanop", f"MODE #chan +b {prefix}{self.char()}:bar!*@*")
|
||||
replies = {msg.command for msg in self.getMessages("chanop")}
|
||||
self.assertIn("MODE", replies)
|
||||
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
||||
|
||||
self.connectClient("bar", name="bar", capabilities=["echo-message"])
|
||||
self.joinChannel("bar", "#chan")
|
||||
|
||||
for client in clients:
|
||||
self.getMessages(client)
|
||||
|
||||
# "bar" sees the MODE too
|
||||
self.sendLine("bar", "MODE #chan +b")
|
||||
self.assertMessageMatch(
|
||||
self.getMessage("bar"),
|
||||
command="367",
|
||||
params=[
|
||||
"bar",
|
||||
"#chan",
|
||||
f"{prefix}{self.char()}:bar!*@*",
|
||||
StrRe("chanop(!.*)?"),
|
||||
*ANYLIST,
|
||||
],
|
||||
)
|
||||
self.getMessages("bar")
|
||||
|
||||
# "bar" talks: rejected
|
||||
self.sendLine("bar", "PRIVMSG #chan :hi from bar")
|
||||
replies = self.getMessages("bar")
|
||||
replies_cmds = {msg.command for msg in replies}
|
||||
self.assertNotIn("PRIVMSG", replies_cmds)
|
||||
self.assertIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
|
||||
self.assertEqual(self.getMessages("chanop"), [])
|
||||
|
||||
# remove mute on "bar" with -b
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("chanop", f"MODE #chan -b {prefix}{self.char()}:bar!*@*")
|
||||
replies = {msg.command for msg in self.getMessages("chanop")}
|
||||
self.assertIn("MODE", replies)
|
||||
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
||||
|
||||
# "bar" can now talk
|
||||
self.sendLine("bar", "PRIVMSG #chan :hi again from bar")
|
||||
replies = self.getMessages("bar")
|
||||
replies_cmds = {msg.command for msg in replies}
|
||||
self.assertIn("PRIVMSG", replies_cmds)
|
||||
self.assertNotIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
|
||||
self.assertEqual(
|
||||
self.getMessages("chanop"),
|
||||
[msg for msg in replies if msg.command == "PRIVMSG"],
|
||||
)
|
||||
|
||||
@cases.mark_specifications("ircdocs")
|
||||
def testMuteExtbanVoiced(self):
|
||||
"""Checks +v overrides the mute"""
|
||||
self.connectClient("chanop", name="chanop")
|
||||
|
||||
isupport = self.server_support
|
||||
token = isupport.get("EXTBAN", "")
|
||||
prefix, comma, types = token.partition(",")
|
||||
if self.char() not in types:
|
||||
raise runner.ExtbanNotSupported(self.char(), "mute")
|
||||
|
||||
clients = ("chanop", "qux")
|
||||
|
||||
# Mute "qux"
|
||||
self.joinChannel("chanop", "#chan")
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("chanop", f"MODE #chan +b {prefix}{self.char()}:qux!*@*")
|
||||
replies = {msg.command for msg in self.getMessages("chanop")}
|
||||
self.assertIn("MODE", replies)
|
||||
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
||||
|
||||
self.connectClient(
|
||||
"qux", name="qux", ident="evan", capabilities=["echo-message"]
|
||||
)
|
||||
self.joinChannel("qux", "#chan")
|
||||
|
||||
for client in clients:
|
||||
self.getMessages(client)
|
||||
|
||||
# "qux" talks: rejected
|
||||
self.sendLine("qux", "PRIVMSG #chan :hi from qux")
|
||||
replies = self.getMessages("qux")
|
||||
replies_cmds = {msg.command for msg in replies}
|
||||
self.assertNotIn("PRIVMSG", replies_cmds)
|
||||
self.assertIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
|
||||
self.assertEqual(self.getMessages("chanop"), [])
|
||||
|
||||
for client in clients:
|
||||
self.getMessages(client)
|
||||
|
||||
# +v grants an exemption to +b
|
||||
self.sendLine("chanop", "MODE #chan +v qux")
|
||||
replies = {msg.command for msg in self.getMessages("chanop")}
|
||||
self.assertIn("MODE", replies)
|
||||
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
||||
|
||||
# so "qux" can now talk
|
||||
self.sendLine("qux", "PRIVMSG #chan :hi again from qux")
|
||||
replies = self.getMessages("qux")
|
||||
replies_cmds = {msg.command for msg in replies}
|
||||
self.assertIn("PRIVMSG", replies_cmds)
|
||||
self.assertNotIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
|
||||
self.assertEqual(
|
||||
self.getMessages("chanop"),
|
||||
[msg for msg in replies if msg.command == "PRIVMSG"],
|
||||
)
|
||||
|
||||
@cases.mark_specifications("ircdocs")
|
||||
def testMuteExtbanExempt(self):
|
||||
"""Checks +e overrides the mute
|
||||
|
||||
<https://defs.ircdocs.horse/defs/chanmodes.html#e-ban-exception>"""
|
||||
self.connectClient("chanop", name="chanop")
|
||||
|
||||
isupport = self.server_support
|
||||
token = isupport.get("EXTBAN", "")
|
||||
prefix, comma, types = token.partition(",")
|
||||
if self.char() not in types:
|
||||
raise runner.ExtbanNotSupported(self.char(), "mute")
|
||||
if "e" not in self.server_support["CHANMODES"]:
|
||||
raise runner.ChannelModeNotSupported(self.char(), "mute")
|
||||
|
||||
clients = ("chanop", "qux")
|
||||
|
||||
# Mute "qux"
|
||||
self.joinChannel("chanop", "#chan")
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("chanop", f"MODE #chan +b {prefix}{self.char()}:qux!*@*")
|
||||
replies = {msg.command for msg in self.getMessages("chanop")}
|
||||
self.assertIn("MODE", replies)
|
||||
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
||||
|
||||
self.connectClient(
|
||||
"qux", name="qux", ident="evan", capabilities=["echo-message"]
|
||||
)
|
||||
self.joinChannel("qux", "#chan")
|
||||
|
||||
for client in clients:
|
||||
self.getMessages(client)
|
||||
|
||||
# "qux" talks: rejected
|
||||
self.sendLine("qux", "PRIVMSG #chan :hi from qux")
|
||||
replies = self.getMessages("qux")
|
||||
replies_cmds = {msg.command for msg in replies}
|
||||
self.assertNotIn("PRIVMSG", replies_cmds)
|
||||
self.assertIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
|
||||
self.assertEqual(self.getMessages("chanop"), [])
|
||||
|
||||
for client in clients:
|
||||
self.getMessages(client)
|
||||
|
||||
# +e grants an exemption to +b
|
||||
self.sendLine("chanop", f"MODE #chan +e {prefix}{self.char()}:*!~evan@*")
|
||||
replies = {msg.command for msg in self.getMessages("chanop")}
|
||||
self.assertIn("MODE", replies)
|
||||
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
||||
|
||||
self.getMessages("qux")
|
||||
|
||||
# so "qux" can now talk
|
||||
self.sendLine("qux", "PRIVMSG #chan :thanks for mute-excepting me")
|
||||
replies = self.getMessages("qux")
|
||||
replies_cmds = {msg.command for msg in replies}
|
||||
self.assertIn("PRIVMSG", replies_cmds)
|
||||
self.assertNotIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
|
||||
self.assertEqual(
|
||||
self.getMessages("chanop"),
|
||||
[msg for msg in replies if msg.command == "PRIVMSG"],
|
||||
)
|
||||
|
||||
@cases.mark_specifications("Ergo")
|
||||
def testCapitalization(self):
|
||||
"""
|
||||
Regression test for oragono #1370: mutes not correctly enforced against
|
||||
users with capital letters in their NUH
|
||||
|
||||
For consistency with regular -b, which allows unsetting up to
|
||||
normalization
|
||||
"""
|
||||
clients = ("chanop", "bar")
|
||||
|
||||
self.connectClient("chanop", name="chanop")
|
||||
|
||||
isupport = self.server_support
|
||||
token = isupport.get("EXTBAN", "")
|
||||
prefix, comma, types = token.partition(",")
|
||||
|
||||
self.joinChannel("chanop", "#chan")
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("chanop", f"MODE #chan +b {prefix}{self.char()}:BAR!*@*")
|
||||
replies = {msg.command for msg in self.getMessages("chanop")}
|
||||
self.assertIn("MODE", replies)
|
||||
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
||||
|
||||
self.connectClient("Bar", name="bar", capabilities=["echo-message"])
|
||||
self.joinChannel("bar", "#chan")
|
||||
|
||||
for client in clients:
|
||||
self.getMessages(client)
|
||||
|
||||
self.sendLine("bar", "PRIVMSG #chan :hi from bar")
|
||||
replies = self.getMessages("bar")
|
||||
replies_cmds = {msg.command for msg in replies}
|
||||
self.assertNotIn("PRIVMSG", replies_cmds)
|
||||
self.assertIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
|
||||
self.assertEqual(self.getMessages("chanop"), [])
|
||||
|
||||
# remove mute with -b
|
||||
self.sendLine("chanop", f"MODE #chan -b {prefix}{self.char()}:bar!*@*")
|
||||
replies = {msg.command for msg in self.getMessages("chanop")}
|
||||
self.assertIn("MODE", replies)
|
||||
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
||||
|
||||
# "bar" can talk again
|
||||
self.sendLine("bar", "PRIVMSG #chan :hi again from bar")
|
||||
replies = self.getMessages("bar")
|
||||
replies_cmds = {msg.command for msg in replies}
|
||||
self.assertIn("PRIVMSG", replies_cmds)
|
||||
self.assertNotIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
|
||||
self.assertEqual(
|
||||
self.getMessages("chanop"),
|
||||
[msg for msg in replies if msg.command == "PRIVMSG"],
|
||||
)
|
@ -1,720 +0,0 @@
|
||||
import math
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from irctest import cases, runner
|
||||
from irctest.irc_utils.junkdrawer import ircv3_timestamp_to_unixtime
|
||||
from irctest.numerics import (
|
||||
ERR_BADCHANNELKEY,
|
||||
ERR_BANNEDFROMCHAN,
|
||||
ERR_CANNOTSENDTOCHAN,
|
||||
ERR_CHANOPRIVSNEEDED,
|
||||
ERR_INVALIDKEY,
|
||||
ERR_INVALIDMODEPARAM,
|
||||
ERR_UNKNOWNERROR,
|
||||
RPL_NAMREPLY,
|
||||
)
|
||||
from irctest.patma import ANYLIST, ANYSTR, StrRe
|
||||
|
||||
MODERN_CAPS = [
|
||||
"server-time",
|
||||
"message-tags",
|
||||
"batch",
|
||||
"labeled-response",
|
||||
"echo-message",
|
||||
"account-tag",
|
||||
]
|
||||
|
||||
|
||||
class KeyTestCase(cases.BaseServerTestCase):
|
||||
@cases.mark_specifications("RFC1459", "RFC2812")
|
||||
def testKeyNormal(self):
|
||||
self.connectClient("bar")
|
||||
self.joinChannel(1, "#chan")
|
||||
self.sendLine(1, "MODE #chan +k beer")
|
||||
self.getMessages(1)
|
||||
|
||||
self.connectClient("qux")
|
||||
self.getMessages(2)
|
||||
self.sendLine(2, "JOIN #chan")
|
||||
reply = self.getMessages(2)
|
||||
self.assertNotIn("JOIN", {msg.command for msg in reply})
|
||||
self.assertIn(ERR_BADCHANNELKEY, {msg.command for msg in reply})
|
||||
|
||||
self.sendLine(2, "JOIN #chan beer")
|
||||
reply = self.getMessages(2)
|
||||
self.assertMessageMatch(reply[0], command="JOIN", params=["#chan"])
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"key",
|
||||
["passphrase with spaces", "long" * 100, ""],
|
||||
ids=["spaces", "long", "empty"],
|
||||
)
|
||||
@cases.mark_specifications("RFC2812", "Modern")
|
||||
def testKeyValidation(self, key):
|
||||
"""
|
||||
key = 1*23( %x01-05 / %x07-08 / %x0C / %x0E-1F / %x21-7F )
|
||||
; any 7-bit US_ASCII character,
|
||||
; except NUL, CR, LF, FF, h/v TABs, and " "
|
||||
-- https://tools.ietf.org/html/rfc2812#page-8
|
||||
|
||||
"Servers may validate the value (eg. to forbid spaces, as they make it harder
|
||||
to use the key in `JOIN` messages). If the value is invalid, they SHOULD
|
||||
return [`ERR_INVALIDMODEPARAM`](#errinvalidmodeparam-696).
|
||||
However, clients MUST be able to handle any of the following:
|
||||
|
||||
* [`ERR_INVALIDMODEPARAM`](#errinvalidmodeparam-696)
|
||||
* [`ERR_INVALIDKEY`](#errinvalidkey-525)
|
||||
* `MODE` echoed with a different key (eg. truncated or stripped of invalid
|
||||
characters)
|
||||
* the key changed ignored, and no `MODE` echoed if no other mode change
|
||||
was valid.
|
||||
"
|
||||
-- https://modern.ircdocs.horse/#key-channel-mode
|
||||
-- https://github.com/ircdocs/modern-irc/pull/111
|
||||
"""
|
||||
self.connectClient("bar")
|
||||
self.joinChannel(1, "#chan")
|
||||
self.sendLine(1, f"MODE #chan +k :{key}")
|
||||
|
||||
# The spec requires no space; but doesn't say what to do
|
||||
# if there is one.
|
||||
# Let's check the various alternatives
|
||||
|
||||
replies = self.getMessages(1)
|
||||
self.assertNotIn(
|
||||
ERR_UNKNOWNERROR,
|
||||
{msg.command for msg in replies},
|
||||
fail_msg="Sending an invalid key caused an "
|
||||
"ERR_UNKNOWNERROR instead of being handled explicitly "
|
||||
"(eg. ERR_INVALIDMODEPARAM or truncation): {msg}",
|
||||
)
|
||||
|
||||
commands = {msg.command for msg in replies}
|
||||
if {ERR_INVALIDMODEPARAM, ERR_INVALIDKEY} & commands:
|
||||
# First option: ERR_INVALIDMODEPARAM (eg. Ergo) or ERR_INVALIDKEY
|
||||
# (eg. ircu2)
|
||||
if ERR_INVALIDMODEPARAM in commands:
|
||||
command = [
|
||||
msg for msg in replies if msg.command == ERR_INVALIDMODEPARAM
|
||||
]
|
||||
self.assertEqual(len(command), 1, command)
|
||||
self.assertMessageMatch(
|
||||
command[0],
|
||||
command=ERR_INVALIDMODEPARAM,
|
||||
params=["bar", "#chan", "k", "*", ANYSTR],
|
||||
)
|
||||
return
|
||||
|
||||
if not replies:
|
||||
# MODE was ignored entirely
|
||||
self.connectClient("foo")
|
||||
self.sendLine(2, "JOIN #chan")
|
||||
self.assertMessageMatch(
|
||||
self.getMessage(2), command="JOIN", params=["#chan"]
|
||||
)
|
||||
return
|
||||
|
||||
# Second and third options: truncating the key (eg. UnrealIRCd)
|
||||
# or replacing spaces (eg. Charybdis)
|
||||
mode_commands = [msg for msg in replies if msg.command == "MODE"]
|
||||
self.assertGreaterEqual(
|
||||
len(mode_commands),
|
||||
1,
|
||||
fail_msg="Sending an invalid key (with a space) triggered "
|
||||
"neither ERR_UNKNOWNERROR, ERR_INVALIDMODEPARAM, ERR_INVALIDKEY, "
|
||||
" or a MODE. Only these: {}",
|
||||
extra_format=(replies,),
|
||||
)
|
||||
self.assertLessEqual(
|
||||
len(mode_commands),
|
||||
1,
|
||||
fail_msg="Sending an invalid key (with a space) triggered "
|
||||
"multiple MODE responses: {}",
|
||||
extra_format=(replies,),
|
||||
)
|
||||
|
||||
mode_command = mode_commands[0]
|
||||
if mode_command.params == ["#chan", "+k", "passphrase"]:
|
||||
key = "passphrase"
|
||||
elif mode_command.params == ["#chan", "+k", "passphrasewithspaces"]:
|
||||
key = "passphrasewithspaces"
|
||||
elif mode_command.params[2].startswith("longlonglong"):
|
||||
key = mode_command.params[2]
|
||||
assert mode_command.params == ["#chan", "+k", key]
|
||||
elif mode_command.params == ["#chan", "+k", "passphrase with spaces"]:
|
||||
raise self.failureException("Invalid key (with a space) was not rejected.")
|
||||
|
||||
self.connectClient("foo")
|
||||
self.sendLine(2, f"JOIN #chan {key}")
|
||||
self.assertMessageMatch(self.getMessage(2), command="JOIN", params=["#chan"])
|
||||
|
||||
|
||||
class AuditoriumTestCase(cases.BaseServerTestCase):
|
||||
@cases.mark_specifications("Ergo")
|
||||
def testAuditorium(self):
|
||||
self.connectClient("bar", name="bar", capabilities=MODERN_CAPS)
|
||||
self.joinChannel("bar", "#auditorium")
|
||||
self.getMessages("bar")
|
||||
self.sendLine("bar", "MODE #auditorium +u")
|
||||
modelines = [msg for msg in self.getMessages("bar") if msg.command == "MODE"]
|
||||
self.assertEqual(len(modelines), 1)
|
||||
self.assertMessageMatch(modelines[0], params=["#auditorium", "+u"])
|
||||
|
||||
self.connectClient("guest1", name="guest1", capabilities=MODERN_CAPS)
|
||||
self.joinChannel("guest1", "#auditorium")
|
||||
self.getMessages("guest1")
|
||||
# chanop should get a JOIN message
|
||||
join_msgs = [msg for msg in self.getMessages("bar") if msg.command == "JOIN"]
|
||||
self.assertEqual(len(join_msgs), 1)
|
||||
self.assertMessageMatch(join_msgs[0], nick="guest1", params=["#auditorium"])
|
||||
|
||||
self.connectClient("guest2", name="guest2", capabilities=MODERN_CAPS)
|
||||
self.joinChannel("guest2", "#auditorium")
|
||||
self.getMessages("guest2")
|
||||
# chanop should get a JOIN message
|
||||
join_msgs = [msg for msg in self.getMessages("bar") if msg.command == "JOIN"]
|
||||
self.assertEqual(len(join_msgs), 1)
|
||||
join_msg = join_msgs[0]
|
||||
self.assertMessageMatch(join_msg, nick="guest2", params=["#auditorium"])
|
||||
# oragono/oragono#1642 ; msgid should be populated,
|
||||
# and the time tag should be sane
|
||||
self.assertTrue(join_msg.tags.get("msgid"))
|
||||
self.assertLessEqual(
|
||||
math.fabs(time.time() - ircv3_timestamp_to_unixtime(join_msg.tags["time"])),
|
||||
60.0,
|
||||
)
|
||||
# fellow unvoiced participant should not
|
||||
unvoiced_join_msgs = [
|
||||
msg for msg in self.getMessages("guest1") if msg.command == "JOIN"
|
||||
]
|
||||
self.assertEqual(len(unvoiced_join_msgs), 0)
|
||||
|
||||
self.connectClient("guest3", name="guest3", capabilities=MODERN_CAPS)
|
||||
self.joinChannel("guest3", "#auditorium")
|
||||
self.getMessages("guest3")
|
||||
|
||||
self.sendLine("bar", "PRIVMSG #auditorium hi")
|
||||
echo_message = [
|
||||
msg for msg in self.getMessages("bar") if msg.command == "PRIVMSG"
|
||||
][0]
|
||||
self.assertEqual(echo_message, self.getMessages("guest1")[0])
|
||||
self.assertEqual(echo_message, self.getMessages("guest2")[0])
|
||||
self.assertEqual(echo_message, self.getMessages("guest3")[0])
|
||||
|
||||
# unvoiced users can speak
|
||||
self.sendLine("guest1", "PRIVMSG #auditorium :hi you")
|
||||
echo_message = [
|
||||
msg for msg in self.getMessages("guest1") if msg.command == "PRIVMSG"
|
||||
][0]
|
||||
self.assertEqual(self.getMessages("bar"), [echo_message])
|
||||
self.assertEqual(self.getMessages("guest2"), [echo_message])
|
||||
self.assertEqual(self.getMessages("guest3"), [echo_message])
|
||||
|
||||
def names(client):
|
||||
self.sendLine(client, "NAMES #auditorium")
|
||||
result = set()
|
||||
for msg in self.getMessages(client):
|
||||
if msg.command == RPL_NAMREPLY:
|
||||
result.update(msg.params[-1].split())
|
||||
return result
|
||||
|
||||
self.assertEqual(names("bar"), {"@bar", "guest1", "guest2", "guest3"})
|
||||
self.assertEqual(names("guest1"), {"@bar"})
|
||||
self.assertEqual(names("guest2"), {"@bar"})
|
||||
self.assertEqual(names("guest3"), {"@bar"})
|
||||
|
||||
self.sendLine("bar", "MODE #auditorium +v guest1")
|
||||
modeLine = [msg for msg in self.getMessages("bar") if msg.command == "MODE"][0]
|
||||
self.assertEqual(self.getMessages("guest1"), [modeLine])
|
||||
self.assertEqual(self.getMessages("guest2"), [modeLine])
|
||||
self.assertEqual(self.getMessages("guest3"), [modeLine])
|
||||
self.assertEqual(names("bar"), {"@bar", "+guest1", "guest2", "guest3"})
|
||||
self.assertEqual(names("guest2"), {"@bar", "+guest1"})
|
||||
self.assertEqual(names("guest3"), {"@bar", "+guest1"})
|
||||
|
||||
self.sendLine("guest1", "PART #auditorium")
|
||||
part = [msg for msg in self.getMessages("guest1") if msg.command == "PART"][0]
|
||||
# everyone should see voiced PART
|
||||
self.assertEqual(self.getMessages("bar")[0], part)
|
||||
self.assertEqual(self.getMessages("guest2")[0], part)
|
||||
self.assertEqual(self.getMessages("guest3")[0], part)
|
||||
|
||||
self.joinChannel("guest1", "#auditorium")
|
||||
self.getMessages("guest1")
|
||||
self.getMessages("bar")
|
||||
|
||||
self.sendLine("guest2", "PART #auditorium")
|
||||
part = [msg for msg in self.getMessages("guest2") if msg.command == "PART"][0]
|
||||
self.assertEqual(self.getMessages("bar"), [part])
|
||||
# part should be hidden from unvoiced participants
|
||||
self.assertEqual(self.getMessages("guest1"), [])
|
||||
self.assertEqual(self.getMessages("guest3"), [])
|
||||
|
||||
self.sendLine("guest3", "QUIT")
|
||||
self.assertDisconnected("guest3")
|
||||
# quit should be hidden from unvoiced participants
|
||||
self.assertEqual(
|
||||
len([msg for msg in self.getMessages("bar") if msg.command == "QUIT"]), 1
|
||||
)
|
||||
self.assertEqual(
|
||||
len([msg for msg in self.getMessages("guest1") if msg.command == "QUIT"]), 0
|
||||
)
|
||||
|
||||
|
||||
class BanMode(cases.BaseServerTestCase):
|
||||
@cases.mark_specifications("RFC1459", "RFC2812")
|
||||
def testBan(self):
|
||||
"""Basic ban operation"""
|
||||
self.connectClient("chanop", name="chanop")
|
||||
self.joinChannel("chanop", "#chan")
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("chanop", "MODE #chan +b bar!*@*")
|
||||
self.assertMessageMatch(self.getMessage("chanop"), command="MODE")
|
||||
|
||||
self.connectClient(
|
||||
"Bar", name="bar", capabilities=["echo-message"], skip_if_cap_nak=True
|
||||
)
|
||||
self.getMessages("bar")
|
||||
self.sendLine("bar", "JOIN #chan")
|
||||
self.assertMessageMatch(self.getMessage("bar"), command=ERR_BANNEDFROMCHAN)
|
||||
|
||||
self.sendLine("chanop", "MODE #chan -b bar!*@*")
|
||||
self.assertMessageMatch(self.getMessage("chanop"), command="MODE")
|
||||
|
||||
self.sendLine("bar", "JOIN #chan")
|
||||
self.assertMessageMatch(self.getMessage("bar"), command="JOIN")
|
||||
|
||||
@cases.mark_specifications("Ergo")
|
||||
def testCaseInsensitive(self):
|
||||
"""Some clients allow unsetting modes if their argument matches
|
||||
up to normalization"""
|
||||
self.connectClient("chanop", name="chanop")
|
||||
self.joinChannel("chanop", "#chan")
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("chanop", "MODE #chan +b BAR!*@*")
|
||||
self.assertMessageMatch(self.getMessage("chanop"), command="MODE")
|
||||
|
||||
self.connectClient("Bar", name="bar", capabilities=["echo-message"])
|
||||
self.getMessages("bar")
|
||||
self.sendLine("bar", "JOIN #chan")
|
||||
self.assertMessageMatch(self.getMessage("bar"), command=ERR_BANNEDFROMCHAN)
|
||||
|
||||
self.sendLine("chanop", "MODE #chan -b bar!*@*")
|
||||
self.assertMessageMatch(self.getMessage("chanop"), command="MODE")
|
||||
|
||||
self.sendLine("bar", "JOIN #chan")
|
||||
self.assertMessageMatch(self.getMessage("bar"), command="JOIN")
|
||||
|
||||
|
||||
class ModeratedMode(cases.BaseServerTestCase):
|
||||
@cases.mark_specifications("RFC2812")
|
||||
def testModeratedMode(self):
|
||||
# test the +m channel mode
|
||||
self.connectClient("chanop", name="chanop")
|
||||
self.joinChannel("chanop", "#chan")
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("chanop", "MODE #chan +m")
|
||||
replies = self.getMessages("chanop")
|
||||
modeLines = [line for line in replies if line.command == "MODE"]
|
||||
self.assertMessageMatch(modeLines[0], command="MODE", params=["#chan", "+m"])
|
||||
|
||||
self.connectClient("baz", name="baz")
|
||||
self.joinChannel("baz", "#chan")
|
||||
self.getMessages("chanop")
|
||||
# this message should be suppressed completely by +m
|
||||
self.sendLine("baz", "PRIVMSG #chan :hi from baz")
|
||||
replies = self.getMessages("baz")
|
||||
reply_cmds = {reply.command for reply in replies}
|
||||
self.assertIn(ERR_CANNOTSENDTOCHAN, reply_cmds)
|
||||
self.assertEqual(self.getMessages("chanop"), [])
|
||||
|
||||
# grant +v, user should be able to send messages
|
||||
self.sendLine("chanop", "MODE #chan +v baz")
|
||||
self.getMessages("chanop")
|
||||
self.getMessages("baz")
|
||||
self.sendLine("baz", "PRIVMSG #chan :hi again from baz")
|
||||
self.getMessages("baz")
|
||||
relays = self.getMessages("chanop")
|
||||
relay = relays[0]
|
||||
self.assertMessageMatch(
|
||||
relay, command="PRIVMSG", params=["#chan", "hi again from baz"]
|
||||
)
|
||||
|
||||
|
||||
@cases.mark_services
|
||||
class RegisteredOnlySpeakMode(cases.BaseServerTestCase):
|
||||
@cases.mark_specifications("Ergo")
|
||||
def testRegisteredOnlySpeakMode(self):
|
||||
self.controller.registerUser(self, "evan", "sesame")
|
||||
|
||||
# test the +M (only registered users and ops can speak) channel mode
|
||||
self.connectClient("chanop", name="chanop")
|
||||
self.joinChannel("chanop", "#chan")
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("chanop", "MODE #chan +M")
|
||||
replies = self.getMessages("chanop")
|
||||
modeLines = [line for line in replies if line.command == "MODE"]
|
||||
self.assertMessageMatch(modeLines[0], command="MODE", params=["#chan", "+M"])
|
||||
|
||||
self.connectClient("baz", name="baz")
|
||||
self.joinChannel("baz", "#chan")
|
||||
self.getMessages("chanop")
|
||||
# this message should be suppressed completely by +M
|
||||
self.sendLine("baz", "PRIVMSG #chan :hi from baz")
|
||||
replies = self.getMessages("baz")
|
||||
reply_cmds = {reply.command for reply in replies}
|
||||
self.assertIn(ERR_CANNOTSENDTOCHAN, reply_cmds)
|
||||
self.assertEqual(self.getMessages("chanop"), [])
|
||||
|
||||
# +v exempts users from the registration requirement:
|
||||
self.sendLine("chanop", "MODE #chan +v baz")
|
||||
self.getMessages("chanop")
|
||||
self.getMessages("baz")
|
||||
self.sendLine("baz", "PRIVMSG #chan :hi again from baz")
|
||||
replies = self.getMessages("baz")
|
||||
# baz should not receive an error (or an echo)
|
||||
self.assertEqual(replies, [])
|
||||
replies = self.getMessages("chanop")
|
||||
self.assertMessageMatch(
|
||||
replies[0], command="PRIVMSG", params=["#chan", "hi again from baz"]
|
||||
)
|
||||
|
||||
self.connectClient(
|
||||
"evan",
|
||||
name="evan",
|
||||
account="evan",
|
||||
password="sesame",
|
||||
capabilities=["sasl"],
|
||||
)
|
||||
self.joinChannel("evan", "#chan")
|
||||
self.getMessages("baz")
|
||||
self.sendLine("evan", "PRIVMSG #chan :hi from evan")
|
||||
replies = self.getMessages("evan")
|
||||
# evan should not receive an error (or an echo)
|
||||
self.assertEqual(replies, [])
|
||||
replies = self.getMessages("baz")
|
||||
self.assertMessageMatch(
|
||||
replies[0], command="PRIVMSG", params=["#chan", "hi from evan"]
|
||||
)
|
||||
|
||||
|
||||
class OpModerated(cases.BaseServerTestCase):
|
||||
@cases.mark_specifications("Ergo")
|
||||
def testOpModerated(self):
|
||||
# test the +U channel mode
|
||||
self.connectClient("chanop", name="chanop", capabilities=MODERN_CAPS)
|
||||
self.joinChannel("chanop", "#chan")
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("chanop", "MODE #chan +U")
|
||||
replies = {msg.command for msg in self.getMessages("chanop")}
|
||||
self.assertIn("MODE", replies)
|
||||
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
||||
|
||||
self.connectClient("baz", name="baz", capabilities=MODERN_CAPS)
|
||||
self.joinChannel("baz", "#chan")
|
||||
self.sendLine("baz", "PRIVMSG #chan :hi from baz")
|
||||
echo = self.getMessages("baz")[0]
|
||||
self.assertMessageMatch(
|
||||
echo, command="PRIVMSG", params=["#chan", "hi from baz"]
|
||||
)
|
||||
self.assertEqual(
|
||||
[msg for msg in self.getMessages("chanop") if msg.command == "PRIVMSG"],
|
||||
[echo],
|
||||
)
|
||||
|
||||
self.connectClient("qux", name="qux", capabilities=MODERN_CAPS)
|
||||
self.joinChannel("qux", "#chan")
|
||||
self.sendLine("qux", "PRIVMSG #chan :hi from qux")
|
||||
echo = self.getMessages("qux")[0]
|
||||
self.assertMessageMatch(
|
||||
echo, command="PRIVMSG", params=["#chan", "hi from qux"]
|
||||
)
|
||||
# message is relayed to chanop but not to unprivileged
|
||||
self.assertEqual(
|
||||
[msg for msg in self.getMessages("chanop") if msg.command == "PRIVMSG"],
|
||||
[echo],
|
||||
)
|
||||
self.assertEqual(
|
||||
[msg for msg in self.getMessages("baz") if msg.command == "PRIVMSG"], []
|
||||
)
|
||||
|
||||
self.sendLine("chanop", "MODE #chan +v qux")
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("qux", "PRIVMSG #chan :hi again from qux")
|
||||
echo = [msg for msg in self.getMessages("qux") if msg.command == "PRIVMSG"][0]
|
||||
self.assertMessageMatch(
|
||||
echo, command="PRIVMSG", params=["#chan", "hi again from qux"]
|
||||
)
|
||||
self.assertEqual(
|
||||
[msg for msg in self.getMessages("chanop") if msg.command == "PRIVMSG"],
|
||||
[echo],
|
||||
)
|
||||
self.assertEqual(
|
||||
[msg for msg in self.getMessages("baz") if msg.command == "PRIVMSG"], [echo]
|
||||
)
|
||||
|
||||
|
||||
class MuteExtban(cases.BaseServerTestCase):
|
||||
"""https://defs.ircdocs.horse/defs/isupport.html#extban
|
||||
|
||||
It magically guesses what char the IRCd uses for mutes."""
|
||||
|
||||
def char(self):
|
||||
if self.controller.extban_mute_char is None:
|
||||
raise runner.ExtbanNotSupported("", "mute")
|
||||
else:
|
||||
return self.controller.extban_mute_char
|
||||
|
||||
@cases.mark_specifications("Ergo")
|
||||
def testISupport(self):
|
||||
self.connectClient(1) # Fetches ISUPPORT
|
||||
isupport = self.server_support
|
||||
token = isupport["EXTBAN"]
|
||||
prefix, comma, types = token.partition(",")
|
||||
self.assertIn(self.char(), types, f"Missing '{self.char()}' in ISUPPORT EXTBAN")
|
||||
self.assertEqual(prefix, "")
|
||||
self.assertEqual(comma, ",")
|
||||
|
||||
@cases.mark_specifications("ircdocs")
|
||||
def testMuteExtban(self):
|
||||
"""Basic usage of mute"""
|
||||
self.connectClient("chanop", name="chanop")
|
||||
|
||||
isupport = self.server_support
|
||||
token = isupport.get("EXTBAN", "")
|
||||
prefix, comma, types = token.partition(",")
|
||||
if self.char() not in types:
|
||||
raise runner.ExtbanNotSupported(self.char(), "mute")
|
||||
|
||||
clients = ("chanop", "bar")
|
||||
|
||||
# Mute "bar"
|
||||
self.joinChannel("chanop", "#chan")
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("chanop", f"MODE #chan +b {prefix}{self.char()}:bar!*@*")
|
||||
replies = {msg.command for msg in self.getMessages("chanop")}
|
||||
self.assertIn("MODE", replies)
|
||||
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
||||
|
||||
self.connectClient("bar", name="bar", capabilities=["echo-message"])
|
||||
self.joinChannel("bar", "#chan")
|
||||
|
||||
for client in clients:
|
||||
self.getMessages(client)
|
||||
|
||||
# "bar" sees the MODE too
|
||||
self.sendLine("bar", "MODE #chan +b")
|
||||
self.assertMessageMatch(
|
||||
self.getMessage("bar"),
|
||||
command="367",
|
||||
params=[
|
||||
"bar",
|
||||
"#chan",
|
||||
f"{prefix}{self.char()}:bar!*@*",
|
||||
StrRe("chanop(!.*)?"),
|
||||
*ANYLIST,
|
||||
],
|
||||
)
|
||||
self.getMessages("bar")
|
||||
|
||||
# "bar" talks: rejected
|
||||
self.sendLine("bar", "PRIVMSG #chan :hi from bar")
|
||||
replies = self.getMessages("bar")
|
||||
replies_cmds = {msg.command for msg in replies}
|
||||
self.assertNotIn("PRIVMSG", replies_cmds)
|
||||
self.assertIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
|
||||
self.assertEqual(self.getMessages("chanop"), [])
|
||||
|
||||
# remove mute on "bar" with -b
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("chanop", f"MODE #chan -b {prefix}{self.char()}:bar!*@*")
|
||||
replies = {msg.command for msg in self.getMessages("chanop")}
|
||||
self.assertIn("MODE", replies)
|
||||
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
||||
|
||||
# "bar" can now talk
|
||||
self.sendLine("bar", "PRIVMSG #chan :hi again from bar")
|
||||
replies = self.getMessages("bar")
|
||||
replies_cmds = {msg.command for msg in replies}
|
||||
self.assertIn("PRIVMSG", replies_cmds)
|
||||
self.assertNotIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
|
||||
self.assertEqual(
|
||||
self.getMessages("chanop"),
|
||||
[msg for msg in replies if msg.command == "PRIVMSG"],
|
||||
)
|
||||
|
||||
@cases.mark_specifications("ircdocs")
|
||||
def testMuteExtbanVoiced(self):
|
||||
"""Checks +v overrides the mute"""
|
||||
self.connectClient("chanop", name="chanop")
|
||||
|
||||
isupport = self.server_support
|
||||
token = isupport.get("EXTBAN", "")
|
||||
prefix, comma, types = token.partition(",")
|
||||
if self.char() not in types:
|
||||
raise runner.ExtbanNotSupported(self.char(), "mute")
|
||||
|
||||
clients = ("chanop", "qux")
|
||||
|
||||
# Mute "qux"
|
||||
self.joinChannel("chanop", "#chan")
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("chanop", f"MODE #chan +b {prefix}{self.char()}:qux!*@*")
|
||||
replies = {msg.command for msg in self.getMessages("chanop")}
|
||||
self.assertIn("MODE", replies)
|
||||
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
||||
|
||||
self.connectClient(
|
||||
"qux", name="qux", ident="evan", capabilities=["echo-message"]
|
||||
)
|
||||
self.joinChannel("qux", "#chan")
|
||||
|
||||
for client in clients:
|
||||
self.getMessages(client)
|
||||
|
||||
# "qux" talks: rejected
|
||||
self.sendLine("qux", "PRIVMSG #chan :hi from qux")
|
||||
replies = self.getMessages("qux")
|
||||
replies_cmds = {msg.command for msg in replies}
|
||||
self.assertNotIn("PRIVMSG", replies_cmds)
|
||||
self.assertIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
|
||||
self.assertEqual(self.getMessages("chanop"), [])
|
||||
|
||||
for client in clients:
|
||||
self.getMessages(client)
|
||||
|
||||
# +v grants an exemption to +b
|
||||
self.sendLine("chanop", "MODE #chan +v qux")
|
||||
replies = {msg.command for msg in self.getMessages("chanop")}
|
||||
self.assertIn("MODE", replies)
|
||||
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
||||
|
||||
# so "qux" can now talk
|
||||
self.sendLine("qux", "PRIVMSG #chan :hi again from qux")
|
||||
replies = self.getMessages("qux")
|
||||
replies_cmds = {msg.command for msg in replies}
|
||||
self.assertIn("PRIVMSG", replies_cmds)
|
||||
self.assertNotIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
|
||||
self.assertEqual(
|
||||
self.getMessages("chanop"),
|
||||
[msg for msg in replies if msg.command == "PRIVMSG"],
|
||||
)
|
||||
|
||||
@cases.mark_specifications("ircdocs")
|
||||
def testMuteExtbanExempt(self):
|
||||
"""Checks +e overrides the mute
|
||||
|
||||
<https://defs.ircdocs.horse/defs/chanmodes.html#e-ban-exception>"""
|
||||
self.connectClient("chanop", name="chanop")
|
||||
|
||||
isupport = self.server_support
|
||||
token = isupport.get("EXTBAN", "")
|
||||
prefix, comma, types = token.partition(",")
|
||||
if self.char() not in types:
|
||||
raise runner.ExtbanNotSupported(self.char(), "mute")
|
||||
if "e" not in self.server_support["CHANMODES"]:
|
||||
raise runner.ChannelModeNotSupported(self.char(), "mute")
|
||||
|
||||
clients = ("chanop", "qux")
|
||||
|
||||
# Mute "qux"
|
||||
self.joinChannel("chanop", "#chan")
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("chanop", f"MODE #chan +b {prefix}{self.char()}:qux!*@*")
|
||||
replies = {msg.command for msg in self.getMessages("chanop")}
|
||||
self.assertIn("MODE", replies)
|
||||
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
||||
|
||||
self.connectClient(
|
||||
"qux", name="qux", ident="evan", capabilities=["echo-message"]
|
||||
)
|
||||
self.joinChannel("qux", "#chan")
|
||||
|
||||
for client in clients:
|
||||
self.getMessages(client)
|
||||
|
||||
# "qux" talks: rejected
|
||||
self.sendLine("qux", "PRIVMSG #chan :hi from qux")
|
||||
replies = self.getMessages("qux")
|
||||
replies_cmds = {msg.command for msg in replies}
|
||||
self.assertNotIn("PRIVMSG", replies_cmds)
|
||||
self.assertIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
|
||||
self.assertEqual(self.getMessages("chanop"), [])
|
||||
|
||||
for client in clients:
|
||||
self.getMessages(client)
|
||||
|
||||
# +e grants an exemption to +b
|
||||
self.sendLine("chanop", f"MODE #chan +e {prefix}{self.char()}:*!~evan@*")
|
||||
replies = {msg.command for msg in self.getMessages("chanop")}
|
||||
self.assertIn("MODE", replies)
|
||||
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
||||
|
||||
self.getMessages("qux")
|
||||
|
||||
# so "qux" can now talk
|
||||
self.sendLine("qux", "PRIVMSG #chan :thanks for mute-excepting me")
|
||||
replies = self.getMessages("qux")
|
||||
replies_cmds = {msg.command for msg in replies}
|
||||
self.assertIn("PRIVMSG", replies_cmds)
|
||||
self.assertNotIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
|
||||
self.assertEqual(
|
||||
self.getMessages("chanop"),
|
||||
[msg for msg in replies if msg.command == "PRIVMSG"],
|
||||
)
|
||||
|
||||
@cases.mark_specifications("Ergo")
|
||||
def testCapitalization(self):
|
||||
"""
|
||||
Regression test for oragono #1370: mutes not correctly enforced against
|
||||
users with capital letters in their NUH
|
||||
|
||||
For consistency with regular -b, which allows unsetting up to
|
||||
normalization
|
||||
"""
|
||||
clients = ("chanop", "bar")
|
||||
|
||||
self.connectClient("chanop", name="chanop")
|
||||
|
||||
isupport = self.server_support
|
||||
token = isupport.get("EXTBAN", "")
|
||||
prefix, comma, types = token.partition(",")
|
||||
|
||||
self.joinChannel("chanop", "#chan")
|
||||
self.getMessages("chanop")
|
||||
self.sendLine("chanop", f"MODE #chan +b {prefix}{self.char()}:BAR!*@*")
|
||||
replies = {msg.command for msg in self.getMessages("chanop")}
|
||||
self.assertIn("MODE", replies)
|
||||
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
||||
|
||||
self.connectClient("Bar", name="bar", capabilities=["echo-message"])
|
||||
self.joinChannel("bar", "#chan")
|
||||
|
||||
for client in clients:
|
||||
self.getMessages(client)
|
||||
|
||||
self.sendLine("bar", "PRIVMSG #chan :hi from bar")
|
||||
replies = self.getMessages("bar")
|
||||
replies_cmds = {msg.command for msg in replies}
|
||||
self.assertNotIn("PRIVMSG", replies_cmds)
|
||||
self.assertIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
|
||||
self.assertEqual(self.getMessages("chanop"), [])
|
||||
|
||||
# remove mute with -b
|
||||
self.sendLine("chanop", f"MODE #chan -b {prefix}{self.char()}:bar!*@*")
|
||||
replies = {msg.command for msg in self.getMessages("chanop")}
|
||||
self.assertIn("MODE", replies)
|
||||
self.assertNotIn(ERR_CHANOPRIVSNEEDED, replies)
|
||||
|
||||
# "bar" can talk again
|
||||
self.sendLine("bar", "PRIVMSG #chan :hi again from bar")
|
||||
replies = self.getMessages("bar")
|
||||
replies_cmds = {msg.command for msg in replies}
|
||||
self.assertIn("PRIVMSG", replies_cmds)
|
||||
self.assertNotIn(ERR_CANNOTSENDTOCHAN, replies_cmds)
|
||||
self.assertEqual(
|
||||
self.getMessages("chanop"),
|
||||
[msg for msg in replies if msg.command == "PRIVMSG"],
|
||||
)
|
Reference in New Issue
Block a user