mirror of
https://github.com/progval/irctest.git
synced 2025-04-05 14:59:49 +00:00
111 lines
5.7 KiB
Python
111 lines
5.7 KiB
Python
import time
|
|
|
|
from irctest import cases
|
|
from irctest.irc_utils.junkdrawer import ircv3_timestamp_to_unixtime
|
|
from irctest.irc_utils.junkdrawer import to_history_message
|
|
from irctest.irc_utils.junkdrawer import random_name
|
|
|
|
|
|
def extract_playback_privmsgs(messages):
|
|
# convert the output of a playback command, drop the echo message
|
|
result = []
|
|
for msg in messages:
|
|
if msg.command == 'PRIVMSG' and msg.params[0].lower() != '*playback':
|
|
result.append(to_history_message(msg))
|
|
return result
|
|
|
|
|
|
class ZncPlaybackTestCase(cases.BaseServerTestCase):
|
|
|
|
def customizedConfig(self):
|
|
return self.controller.addMysqlToConfig()
|
|
|
|
@cases.SpecificationSelector.requiredBySpecification('Oragono')
|
|
def testZncPlayback(self):
|
|
early_time = int(time.time() - 60)
|
|
|
|
chname = random_name('#znc_channel')
|
|
bar, pw = random_name('bar'), random_name('pass')
|
|
self.controller.registerUser(self, bar, pw)
|
|
self.connectClient(bar, name=bar, capabilities=['batch', 'labeled-response', 'message-tags', 'server-time', 'echo-message'], password=pw)
|
|
self.joinChannel(bar, chname)
|
|
|
|
qux = random_name('qux')
|
|
self.connectClient(qux, name=qux, capabilities=['batch', 'labeled-response', 'message-tags', 'server-time', 'echo-message'])
|
|
self.joinChannel(qux, chname)
|
|
|
|
self.sendLine(qux, 'PRIVMSG %s :hi there' % (bar,))
|
|
dm = to_history_message([msg for msg in self.getMessages(qux) if msg.command == 'PRIVMSG'][0])
|
|
self.assertEqual(dm.text, 'hi there')
|
|
|
|
NUM_MESSAGES = 10
|
|
echo_messages = []
|
|
for i in range(NUM_MESSAGES):
|
|
self.sendLine(qux, 'PRIVMSG %s :this is message %d' % (chname, i))
|
|
echo_messages.extend(to_history_message(msg) for msg in self.getMessages(qux) if msg.command == 'PRIVMSG')
|
|
time.sleep(0.003)
|
|
self.assertEqual(len(echo_messages), NUM_MESSAGES)
|
|
|
|
self.getMessages(bar)
|
|
|
|
# reattach to 'bar'
|
|
self.connectClient(bar, name='viewer', capabilities=['batch', 'labeled-response', 'message-tags', 'server-time', 'echo-message'], password=pw)
|
|
self.sendLine('viewer', 'PRIVMSG *playback :play * %d' % (early_time,))
|
|
messages = extract_playback_privmsgs(self.getMessages('viewer'))
|
|
self.assertEqual(set(messages), set([dm] + echo_messages))
|
|
self.sendLine('viewer', 'QUIT')
|
|
self.assertDisconnected('viewer')
|
|
|
|
# reattach to 'bar', play back selectively
|
|
self.connectClient(bar, name='viewer', capabilities=['batch', 'labeled-response', 'message-tags', 'server-time', 'echo-message'], password=pw)
|
|
mid_timestamp = ircv3_timestamp_to_unixtime(echo_messages[5].time)
|
|
# exclude message 5 itself (oragono's CHATHISTORY implementation corrects for this, but znc.in/playback does not because whatever)
|
|
mid_timestamp += .001
|
|
self.sendLine('viewer', 'PRIVMSG *playback :play * %s' % (mid_timestamp,))
|
|
messages = extract_playback_privmsgs(self.getMessages('viewer'))
|
|
self.assertEqual(messages, echo_messages[6:])
|
|
self.sendLine('viewer', 'QUIT')
|
|
self.assertDisconnected('viewer')
|
|
|
|
# reattach to 'bar', play back selectively (pass a parameter and 2 timestamps)
|
|
self.connectClient(bar, name='viewer', capabilities=['batch', 'labeled-response', 'message-tags', 'server-time', 'echo-message'], password=pw)
|
|
start_timestamp = ircv3_timestamp_to_unixtime(echo_messages[2].time)
|
|
start_timestamp += .001
|
|
end_timestamp = ircv3_timestamp_to_unixtime(echo_messages[7].time)
|
|
self.sendLine('viewer', 'PRIVMSG *playback :play %s %s %s' % (chname, start_timestamp, end_timestamp,))
|
|
messages = extract_playback_privmsgs(self.getMessages('viewer'))
|
|
self.assertEqual(messages, echo_messages[3:7])
|
|
# test nicknames as targets
|
|
self.sendLine('viewer', 'PRIVMSG *playback :play %s %d' % (qux, early_time,))
|
|
messages = extract_playback_privmsgs(self.getMessages('viewer'))
|
|
self.assertEqual(messages, [dm])
|
|
self.sendLine('viewer', 'PRIVMSG *playback :play %s %d' % (qux.upper(), early_time,))
|
|
messages = extract_playback_privmsgs(self.getMessages('viewer'))
|
|
self.assertEqual(messages, [dm])
|
|
self.sendLine('viewer', 'QUIT')
|
|
self.assertDisconnected('viewer')
|
|
|
|
# test 2-argument form
|
|
self.connectClient(bar, name='viewer', capabilities=['batch', 'labeled-response', 'message-tags', 'server-time', 'echo-message'], password=pw)
|
|
self.sendLine('viewer', 'PRIVMSG *playback :play %s' % (chname,))
|
|
messages = extract_playback_privmsgs(self.getMessages('viewer'))
|
|
self.assertEqual(messages, echo_messages)
|
|
self.sendLine('viewer', 'PRIVMSG *playback :play *self')
|
|
messages = extract_playback_privmsgs(self.getMessages('viewer'))
|
|
self.assertEqual(messages, [dm])
|
|
self.sendLine('viewer', 'PRIVMSG *playback :play *')
|
|
messages = extract_playback_privmsgs(self.getMessages('viewer'))
|
|
self.assertEqual(set(messages), set([dm] + echo_messages))
|
|
self.sendLine('viewer', 'QUIT')
|
|
self.assertDisconnected('viewer')
|
|
|
|
# test limiting behavior
|
|
config = self.controller.getConfig()
|
|
config['history']['znc-maxmessages'] = 5
|
|
self.controller.rehash(self, config)
|
|
self.connectClient(bar, name='viewer', capabilities=['batch', 'labeled-response', 'message-tags', 'server-time', 'echo-message'], password=pw)
|
|
self.sendLine('viewer', 'PRIVMSG *playback :play %s %d' % (chname, int(time.time() - 60)))
|
|
messages = extract_playback_privmsgs(self.getMessages('viewer'))
|
|
# should receive the latest 5 messages
|
|
self.assertEqual(messages, echo_messages[5:])
|