irctest/irctest/client_mock.py

110 lines
4.1 KiB
Python
Raw Normal View History

2015-12-25 14:45:06 +00:00
import ssl
2015-12-21 19:13:16 +00:00
import time
import socket
from .irc_utils import message_parser
class NoMessageException(AssertionError):
    """Raised by ClientMock.getMessage when no message could be obtained."""
class ConnectionClosed(Exception):
    """Raised when a read on the connection returns no data (peer closed)."""
class ClientMock:
    """Minimal scripted IRC client used to talk to a server over a socket.

    Attributes set by ``__init__``:
      name      -- label used in the I/O trace output.
      show_io   -- when true, every connect/disconnect/sent/received line
                   is printed with a timestamp.
      inbuffer  -- list of parsed messages already read from the socket but
                   not yet handed to the caller.
      ssl       -- True once ``starttls`` has wrapped the socket.

    ``conn`` (the socket) only exists after ``connect`` has been called.
    """

    def __init__(self, name, show_io):
        self.name = name
        self.show_io = show_io
        self.inbuffer = []
        self.ssl = False

    def connect(self, hostname, port):
        """Open a TCP (IPv4) connection to ``(hostname, port)``."""
        self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.conn.settimeout(1)  # TODO: configurable
        self.conn.connect((hostname, port))
        if self.show_io:
            print('{:.3f} {}: connects to server.'.format(time.time(), self.name))

    def disconnect(self):
        """Close the connection."""
        if self.show_io:
            print('{:.3f} {}: disconnects from server.'.format(time.time(), self.name))
        self.conn.close()

    def starttls(self):
        """Wrap the existing connection in TLS.

        Must only be called once per connection (asserted).
        """
        assert not self.ssl, 'SSL already active.'
        # ssl.wrap_socket() was removed in Python 3.12; build an explicit
        # context instead.  The legacy function performed no certificate
        # or hostname verification by default, so keep that behaviour.
        # check_hostname has to be cleared before verify_mode is relaxed.
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        self.conn = context.wrap_socket(self.conn)
        self.ssl = True

    def getMessages(self, synchronize=True, assert_get_one=False):
        """Read and parse pending messages from the server.

        Messages already sitting in ``self.inbuffer`` are moved to the front
        of the returned list and the buffer is emptied.

        synchronize    -- when true, send a ``PING`` carrying a unique token
                          and keep reading until the matching ``PONG`` comes
                          back; that ``PONG`` is not included in the result.
                          When false, stop after the first batch of complete
                          lines.
        assert_get_one -- when true (and ``synchronize`` is false), keep
                          waiting across read timeouts instead of returning
                          early when nothing has been received.

        Returns a list of parsed messages (possibly empty).

        Raises ConnectionClosed if the peer closes the connection before any
        message was collected; if some were collected they are returned
        instead.
        """
        # Bound unconditionally so the PONG check below cannot hit an
        # undefined name when synchronize is false.
        token = None
        if synchronize:
            token = 'synchronize{}'.format(time.monotonic())
            self.sendLine('PING {}'.format(token))
        got_pong = False
        data = b''
        (self.inbuffer, messages) = ([], self.inbuffer)
        conn = self.conn
        try:
            while not got_pong:
                try:
                    new_data = conn.recv(4096)
                except socket.timeout:
                    if not assert_get_one and not synchronize and data == b'':
                        # Nothing new on the wire: hand back whatever was
                        # already buffered rather than dropping it.
                        return messages
                    if self.show_io:
                        print('{:.3f} waiting…'.format(time.time()))
                    time.sleep(0.1)
                    continue
                if not new_data:
                    # Connection closed
                    raise ConnectionClosed()
                data += new_data
                # Check the accumulated buffer, not just the last chunk, so
                # a CRLF split across two reads is still seen as a line end.
                if not data.endswith(b'\r\n'):
                    time.sleep(0.1)
                    continue
                if not synchronize:
                    # No PONG to wait for: one batch of full lines is enough.
                    got_pong = True
                for line in data.decode().split('\r\n'):
                    if not line:
                        continue
                    if self.show_io:
                        print('{time:.3f}{ssl} S -> {client}: {line}'.format(
                            time=time.time(),
                            ssl=' (ssl)' if self.ssl else '',
                            client=self.name,
                            line=line))
                    message = message_parser.parse_message(line + '\r\n')
                    if token is not None and message.command == 'PONG' \
                            and token in message.params:
                        got_pong = True
                    else:
                        messages.append(message)
                data = b''
        except ConnectionClosed:
            if messages:
                return messages
            raise
        return messages

    def getMessage(self, filter_pred=None, synchronize=True):
        """Return the next message, optionally the next one matching a filter.

        filter_pred -- optional callable taking a message; messages for which
                       it returns a false value are discarded.
        synchronize -- forwarded to ``getMessages`` when the buffer is empty.

        Raises NoMessageException when ``getMessages`` yields nothing.
        """
        while True:
            if not self.inbuffer:
                self.inbuffer = self.getMessages(
                    synchronize=synchronize, assert_get_one=True)
            if not self.inbuffer:
                raise NoMessageException()
            message = self.inbuffer.pop(0)  # TODO: use dequeue
            if not filter_pred or filter_pred(message):
                return message

    def sendLine(self, line):
        """Send ``line`` to the server, appending CRLF if it is missing."""
        if not line.endswith('\r\n'):
            line += '\r\n'
        encoded_line = line.encode()
        ret = self.conn.sendall(encoded_line)
        if self.ssl:
            # https://bugs.python.org/issue25951 -- affected interpreters
            # return the byte count from SSLSocket.sendall(); fixed ones
            # return None like a plain socket.  Accept both.
            assert ret is None or ret == len(encoded_line), \
                (ret, repr(encoded_line))
        else:
            assert ret is None, ret
        if self.show_io:
            print('{time:.3f}{ssl} {client} -> S: {line}'.format(
                time=time.time(),
                ssl=' (ssl)' if self.ssl else '',
                client=self.name,
                line=line.strip('\r\n')))