add a test for message-tags

This commit is contained in:
Shivaram Lingamneni 2020-11-26 00:25:52 -05:00
parent b7975ada46
commit d741ab86d5
5 changed files with 84 additions and 9 deletions

View File

@ -24,7 +24,7 @@ class ClientMock:
assert not self.ssl, 'SSL already active.'
self.conn = ssl.wrap_socket(self.conn)
self.ssl = True
def getMessages(self, synchronize=True, assert_get_one=False):
def getMessages(self, synchronize=True, assert_get_one=False, raw=False):
if synchronize:
token = 'synchronize{}'.format(time.monotonic())
self.sendLine('PING {}'.format(token))
@ -64,12 +64,15 @@ class ClientMock:
ssl=' (ssl)' if self.ssl else '',
client=self.name,
line=line))
message = message_parser.parse_message(line + '\r\n')
message = message_parser.parse_message(line)
if message.command == 'PONG' and \
token in message.params:
got_pong = True
else:
messages.append(message)
if raw:
messages.append(line)
else:
messages.append(message)
data = b''
except ConnectionClosed:
if messages:
@ -78,11 +81,11 @@ class ClientMock:
raise
else:
return messages
def getMessage(self, filter_pred=None, synchronize=True):
def getMessage(self, filter_pred=None, synchronize=True, raw=False):
while True:
if not self.inbuffer:
self.inbuffer = self.getMessages(
synchronize=synchronize, assert_get_one=True)
synchronize=synchronize, assert_get_one=True, raw=raw)
if not self.inbuffer:
raise NoMessageException()
message = self.inbuffer.pop(0) # TODO: use dequeue

View File

@ -37,8 +37,7 @@ def parse_message(s):
http://tools.ietf.org/html/rfc1459#section-2.3.1
and
http://ircv3.net/specs/core/message-tags-3.2.html"""
assert s.endswith('\r\n'), 'Message does not end with CR LF: {!r}'.format(s)
s = s[0:-2]
s = s.rstrip('\r\n')
if s.startswith('@'):
(tags, s) = s.split(' ', 1)
tags = parse_tags(tags[1:])

View File

@ -0,0 +1,71 @@
"""
https://ircv3.net/specs/extensions/message-tags.html
"""
from irctest import cases
from irctest.irc_utils.message_parser import parse_message
class MessageTagsTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
@cases.SpecificationSelector.requiredBySpecification('message-tags')
def testBasic(self):
def getAllMessages():
for name in ['alice', 'bob', 'carol']:
self.getMessages(name)
def assertNoTags(line):
# tags start with '@', without tags we start with the prefix,
# which begins with ':'
self.assertEqual(line[0], ':')
msg = parse_message(line)
self.assertEqual(msg.tags, {})
return msg
self.connectClient('alice', name='alice', capabilities=['message-tags'])
self.joinChannel('alice', '#test')
self.connectClient('bob', name='bob', capabilities=['message-tags', 'echo-message'])
self.joinChannel('bob', '#test')
self.connectClient('carol', name='carol')
self.joinChannel('carol', '#test')
getAllMessages()
self.sendLine('alice', '@+baz=bat;fizz=buzz PRIVMSG #test hi')
self.getMessages('alice')
bob_msg = self.getMessage('bob')
carol_line = self.getMessage('carol', raw=True)
self.assertMessageEqual(bob_msg, command='PRIVMSG', params=['#test', 'hi'])
self.assertEqual(bob_msg.tags['+baz'], "bat")
self.assertIn('msgid', bob_msg.tags)
# should not relay a non-client-only tag
self.assertNotIn('fizz', bob_msg.tags)
# carol MUST NOT receive tags
carol_msg = assertNoTags(carol_line)
self.assertMessageEqual(carol_msg, command='PRIVMSG', params=['#test', 'hi'])
getAllMessages()
self.sendLine('bob', '@+bat=baz;+fizz=buzz PRIVMSG #test :hi yourself')
bob_msg = self.getMessage('bob') # bob has echo-message
alice_msg = self.getMessage('alice')
carol_line = self.getMessage('carol', raw=True)
carol_msg = assertNoTags(carol_line)
for msg in [alice_msg, bob_msg, carol_msg]:
self.assertMessageEqual(msg, command='PRIVMSG', params=['#test', 'hi yourself'])
for msg in [alice_msg, bob_msg]:
self.assertEqual(msg.tags['+bat'], 'baz')
self.assertEqual(msg.tags['+fizz'], 'buzz')
self.assertTrue(alice_msg.tags['msgid'])
self.assertEqual(alice_msg.tags['msgid'], bob_msg.tags['msgid'])
# test TAGMSG and basic escaping
self.sendLine('bob', '@+buzz=fizz\:buzz;cat=dog;+steel=wootz TAGMSG #test')
bob_msg = self.getMessage('bob') # bob has echo-message
alice_msg = self.getMessage('alice')
# carol MUST NOT receive TAGMSG at all
self.assertEqual(self.getMessages('carol'), [])
for msg in [alice_msg, bob_msg]:
self.assertMessageEqual(alice_msg, command='TAGMSG', params=['#test'])
self.assertEqual(msg.tags['+buzz'], 'fizz;buzz')
self.assertEqual(msg.tags['+steel'], 'wootz')
self.assertNotIn('cat', msg.tags)
self.assertTrue(alice_msg.tags['msgid'])
self.assertEqual(alice_msg.tags['msgid'], bob_msg.tags['msgid'])

View File

@ -12,7 +12,7 @@ base_caps = ['message-tags', 'batch', 'echo-message', 'server-time', 'labeled-re
class MultilineTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
@cases.SpecificationSelector.requiredBySpecification('Oragono')
@cases.SpecificationSelector.requiredBySpecification('multiline')
def testBasic(self):
self.connectClient('alice', capabilities=(base_caps + [CAP_NAME]))
self.joinChannel(1, '#test')
@ -75,7 +75,7 @@ class MultilineTestCase(cases.BaseServerTestCase, cases.OptionalityHelper):
self.assertEqual(relayed_fmsgids, [msgid] + [None]*(len(fallback_relay)-1))
@cases.SpecificationSelector.requiredBySpecification('Oragono')
@cases.SpecificationSelector.requiredBySpecification('multiline')
def testBlankLines(self):
self.connectClient('alice', capabilities=(base_caps + [CAP_NAME]))
self.joinChannel(1, '#test')

View File

@ -9,6 +9,8 @@ class Specifications(enum.Enum):
IRC302 = 'IRCv3.2'
IRC302Deprecated = 'IRCv3.2-deprecated'
Oragono = 'Oragono'
Multiline = 'multiline'
MessageTags = 'message-tags'
@classmethod
def of_name(cls, name):