""" Tests section 4.1 of RFC 1459. TODO: cross-reference Modern and RFC 2812 too """ import time from irctest import cases from irctest.client_mock import ConnectionClosed from irctest.numerics import ERR_NEEDMOREPARAMS, ERR_PASSWDMISMATCH from irctest.patma import ANYLIST, ANYSTR, OptStrRe, StrRe class PasswordedConnectionRegistrationTestCase(cases.BaseServerTestCase): password = "testpassword" @cases.mark_specifications("RFC1459", "RFC2812") def testPassBeforeNickuser(self): self.addClient() self.sendLine(1, "PASS {}".format(self.password)) self.sendLine(1, "NICK foo") self.sendLine(1, "USER username * * :Realname") m = self.getRegistrationMessage(1) self.assertMessageMatch( m, command="001", fail_msg="Did not get 001 after correct PASS+NICK+USER: {msg}", ) @cases.mark_specifications("RFC1459", "RFC2812") def testNoPassword(self): self.addClient() self.sendLine(1, "NICK foo") self.sendLine(1, "USER username * * :Realname") m = self.getRegistrationMessage(1) self.assertNotEqual( m.command, "001", msg="Got 001 after NICK+USER but missing PASS" ) @cases.mark_specifications("Modern") def testWrongPassword(self): """ "If the password supplied does not match the password expected by the server, then the server SHOULD send ERR_PASSWDMISMATCH and MUST close the connection with ERROR." -- https://github.com/ircdocs/modern-irc/pull/172 """ self.addClient() self.sendLine(1, "PASS {}".format(self.password + "garbage")) self.sendLine(1, "NICK foo") self.sendLine(1, "USER username * * :Realname") m = self.getRegistrationMessage(1) self.assertNotEqual( m.command, "001", msg="Got 001 after NICK+USER but incorrect PASS" ) self.assertIn(m.command, {ERR_PASSWDMISMATCH, "ERROR"}) if m.command == "ERR_PASSWDMISMATCH": m = self.getRegistrationMessage(1) self.assertEqual( m.command, "ERROR", msg="ERR_PASSWDMISMATCH not followed by ERROR." ) @cases.mark_specifications("RFC1459", "RFC2812", strict=True) def testPassAfterNickuser(self): """“The password can and must be set before any attempt to register the connection is made.” -- “The optional password can and MUST be set before any attempt to register the connection is made. Currently this requires that user send a PASS command before sending the NICK/USER combination.” -- """ self.addClient() self.sendLine(1, "NICK foo") self.sendLine(1, "USER username * * :Realname") self.sendLine(1, "PASS {}".format(self.password)) m = self.getRegistrationMessage(1) self.assertNotEqual(m.command, "001", "Got 001 after PASS sent after NICK+USER") class ConnectionRegistrationTestCase(cases.BaseServerTestCase): def testConnectionRegistration(self): self.addClient() self.sendLine(1, "NICK foo") self.sendLine(1, "USER foo * * :foo") for numeric in ("001", "002", "003"): self.assertMessageMatch( self.getRegistrationMessage(1), command=numeric, params=["foo", ANYSTR], ) self.assertMessageMatch( self.getRegistrationMessage(1), command="004", # RPL_MYINFO params=[ "foo", "My.Little.Server", ANYSTR, # version StrRe("[a-zA-Z]+"), # user modes StrRe("[a-zA-Z]+"), # channel modes OptStrRe("[a-zA-Z]+"), # channel modes with parameter ], ) # ISUPPORT m = self.getRegistrationMessage(1) while True: self.assertMessageMatch( m, command="005", params=["foo", *ANYLIST], ) m = self.getRegistrationMessage(1) if m.command != "005": break if m.command in ("042", "396"): # RPL_YOURID / RPL_VISIBLEHOST, non-standard m = self.getRegistrationMessage(1) # LUSERS while m.command in ("250", "251", "252", "253", "254", "255", "265", "266"): m = self.getRegistrationMessage(1) if m.command == "375": # RPL_MOTDSTART self.assertMessageMatch( m, command="375", params=["foo", ANYSTR], ) while (m := self.getRegistrationMessage(1)).command == "372": self.assertMessageMatch( m, command="372", # RPL_MOTD params=["foo", ANYSTR], ) self.assertMessageMatch( m, command="376", # RPL_ENDOFMOTD params=["foo", ANYSTR], ) else: self.assertMessageMatch( m, command="422", # ERR_NOMOTD params=["foo", ANYSTR], ) # User mode if m.command == "MODE": self.assertMessageMatch( m, command="MODE", params=["foo", ANYSTR, *ANYLIST], ) m = self.getRegistrationMessage(1) elif m.command == "221": # RPL_UMODEIS self.assertMessageMatch( m, command="221", params=["foo", ANYSTR, *ANYLIST], ) m = self.getRegistrationMessage(1) else: print("Warning: missing MODE") @cases.mark_specifications("RFC1459") def testQuitDisconnects(self): """“The server must close the connection to a client which sends a QUIT message.” -- """ self.connectClient("foo") self.getMessages(1) self.sendLine(1, "QUIT") with self.assertRaises(ConnectionClosed): self.getMessages(1) # Fetch remaining messages self.getMessages(1) @cases.mark_specifications("RFC2812") @cases.xfailIfSoftware(["Charybdis", "Solanum"], "very flaky") @cases.xfailIfSoftware( ["ircu2", "Nefarious", "snircd"], "ircu2 does not send ERROR" ) def testQuitErrors(self): """“A client session is terminated with a quit message. The server acknowledges this by sending an ERROR message to the client.” -- """ self.connectClient("foo") self.getMessages(1) self.sendLine(1, "QUIT") while True: try: new_messages = self.getMessages(1) if not new_messages: break commands = {m.command for m in new_messages} except ConnectionClosed: break self.assertIn( "ERROR", commands, fail_msg="Did not receive ERROR as a reply to QUIT." ) def testNickCollision(self): """A user connects and requests the same nickname as an already registered user. """ self.connectClient("foo") self.addClient() self.sendLine(2, "NICK foo") self.sendLine(2, "USER username * * :Realname") m = self.getRegistrationMessage(2) self.assertNotEqual( m.command, "001", "Received 001 after registering with the nick of a registered user.", ) def testEarlyNickCollision(self): """Two users register simultaneously with the same nick.""" self.addClient() self.addClient() self.sendLine(1, "NICK foo") self.sendLine(2, "NICK foo") self.sendLine(1, "USER username * * :Realname") try: self.sendLine(2, "USER username * * :Realname") except (ConnectionClosed, ConnectionResetError): # Bahamut closes the connection here pass try: m1 = self.getRegistrationMessage(1) except (ConnectionClosed, ConnectionResetError): # Unreal closes the connection, see # https://bugs.unrealircd.org/view.php?id=5950 command1 = None else: command1 = m1.command try: m2 = self.getRegistrationMessage(2) except (ConnectionClosed, ConnectionResetError): # ditto command2 = None else: command2 = m2.command self.assertNotEqual( (command1, command2), ("001", "001"), "Two concurrently registering requesting the same nickname " "both got 001.", ) self.assertIn( "001", (command1, command2), "Two concurrently registering requesting the same nickname " "neither got 001.", ) @cases.xfailIfSoftware( ["ircu2", "Nefarious", "ngIRCd"], "uses a default value instead of ERR_NEEDMOREPARAMS", ) def testEmptyRealname(self): """ Syntax: " :Not enough parameters" -- https://defs.ircdocs.horse/defs/numerics.html#err-needmoreparams-461 -- https://modern.ircdocs.horse/#errneedmoreparams-461 Use of this numeric: "The minimum length of `` is 1, ie. it MUST not be empty. If it is empty, the server SHOULD reject the command with ERR_NEEDMOREPARAMS (even an empty parameter is provided)" https://github.com/ircdocs/modern-irc/issues/85 """ self.addClient() self.sendLine(1, "NICK foo") self.sendLine(1, "USER username * * :") self.assertMessageMatch( self.getRegistrationMessage(1), command=ERR_NEEDMOREPARAMS, params=[StrRe(r"(\*|foo)"), "USER", ANYSTR], ) def testNonutf8Realname(self): self.addClient() self.sendLine(1, "NICK foo") line = b"USER username * * :i\xe8rc\xe9\r\n" print("1 -> S (repr): " + repr(line)) self.clients[1].conn.sendall(line) for _ in range(10): time.sleep(1) d = self.clients[1].conn.recv(10000) self.assertTrue(d, "Server closed connection") print("S -> 1 (repr): " + repr(d)) if b" 001 " in d: break if b"ERROR " in d or b" FAIL " in d: # Rejected; nothing more to test. return for line in d.split(b"\r\n"): if line.startswith(b"PING "): line = line.replace(b"PING", b"PONG") + b"\r\n" print("1 -> S (repr): " + repr(line)) self.clients[1].conn.sendall(line) else: self.assertTrue(False, "stuck waiting") self.sendLine(1, "WHOIS foo") time.sleep(3) # for ngIRCd d = self.clients[1].conn.recv(10000) print("S -> 1 (repr): " + repr(d)) self.assertIn(b"username", d) def testNonutf8Username(self): self.addClient() self.sendLine(1, "NICK foo") self.sendLine(1, "USER 😊😊😊😊😊😊😊😊😊😊 * * :realname") for _ in range(10): time.sleep(1) d = self.clients[1].conn.recv(10000) self.assertTrue(d, "Server closed connection") print("S -> 1 (repr): " + repr(d)) if b" 001 " in d: break if b" 468" in d or b"ERROR " in d: # Rejected; nothing more to test. return for line in d.split(b"\r\n"): if line.startswith(b"PING "): line = line.replace(b"PING", b"PONG") + b"\r\n" print("1 -> S (repr): " + repr(line)) self.clients[1].conn.sendall(line) else: self.assertTrue(False, "stuck waiting") self.sendLine(1, "WHOIS foo") d = self.clients[1].conn.recv(10000) print("S -> 1 (repr): " + repr(d)) self.assertIn(b"realname", d)