Move ClientMock to its own file.

This commit is contained in:
Valentin Lorentz
2015-12-21 20:13:16 +01:00
parent d2616dc7d7
commit 3a3b504762
3 changed files with 90 additions and 81 deletions

View File

@ -6,16 +6,12 @@ import collections
import supybot.utils
from . import client_mock
from . import authentication
from . import optional_extensions
from .irc_utils import message_parser
from .irc_utils import capabilities
class ConnectionClosed(Exception):
pass
class NoMessageException(AssertionError):
pass
class _IrcTestCase(unittest.TestCase):
"""Base class for test cases."""
controllerClass = None # Will be set by __main__.py
@ -182,80 +178,6 @@ class ClientNegociationHelper:
else:
return m
class Client:
def __init__(self, name, show_io):
self.name = name
self.show_io = show_io
self.inbuffer = []
def connect(self, hostname, port):
self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.conn.settimeout(1) # TODO: configurable
self.conn.connect((hostname, port))
if self.show_io:
print('{:.3f} {}: connects to server.'.format(time.time(), self.name))
def disconnect(self):
if self.show_io:
print('{:.3f} {}: disconnects from server.'.format(time.time(), self.name))
self.conn.close()
def getMessages(self, synchronize=True, assert_get_one=False):
if synchronize:
token = 'synchronize{}'.format(time.monotonic())
self.sendLine('PING {}'.format(token))
got_pong = False
data = b''
messages = []
conn = self.conn
while not got_pong:
try:
new_data = conn.recv(4096)
except socket.timeout:
if not assert_get_one and not synchronize and data == b'':
# Received nothing
return []
if self.show_io:
print('{:.3f} waiting…'.format(time.time()))
time.sleep(0.1)
continue
else:
if not new_data:
# Connection closed
raise ConnectionClosed()
data += new_data
if not new_data.endswith(b'\r\n'):
time.sleep(0.1)
continue
if not synchronize:
got_pong = True
for line in data.decode().split('\r\n'):
if line:
if self.show_io:
print('{:.3f} S -> {}: {}'.format(time.time(), self.name, line.strip()))
message = message_parser.parse_message(line + '\r\n')
if message.command == 'PONG' and \
token in message.params:
got_pong = True
else:
messages.append(message)
data = b''
return messages
def getMessage(self, filter_pred=None, synchronize=True):
while True:
if not self.inbuffer:
self.inbuffer = self.getMessages(
synchronize=synchronize, assert_get_one=True)
if not self.inbuffer:
raise NoMessageException()
message = self.inbuffer.pop(0) # TODO: use dequeue
if not filter_pred or filter_pred(message):
return message
def sendLine(self, line):
ret = self.conn.sendall(line.encode())
assert ret is None
if not line.endswith('\r\n'):
ret = self.conn.sendall(b'\r\n')
assert ret is None
if self.show_io:
print('{:.3f} {} -> S: {}'.format(time.time(), self.name, line.strip()))
class BaseServerTestCase(_IrcTestCase):
"""Basic class for server tests. Handles spawning a server and exchanging
@ -283,7 +205,8 @@ class BaseServerTestCase(_IrcTestCase):
if not name:
name = max(map(int, list(self.clients)+[0]))+1
show_io = show_io if show_io is not None else self.show_io
self.clients[name] = Client(name=name, show_io=show_io)
self.clients[name] = client_mock.ClientMock(name=name,
show_io=show_io)
self.clients[name].connect(self.hostname, self.port)
return name

85
irctest/client_mock.py Normal file
View File

@ -0,0 +1,85 @@
import time
import socket
from .irc_utils import message_parser
class NoMessageException(AssertionError):
pass
class ConnectionClosed(Exception):
pass
class ClientMock:
def __init__(self, name, show_io):
self.name = name
self.show_io = show_io
self.inbuffer = []
def connect(self, hostname, port):
self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.conn.settimeout(1) # TODO: configurable
self.conn.connect((hostname, port))
if self.show_io:
print('{:.3f} {}: connects to server.'.format(time.time(), self.name))
def disconnect(self):
if self.show_io:
print('{:.3f} {}: disconnects from server.'.format(time.time(), self.name))
self.conn.close()
def getMessages(self, synchronize=True, assert_get_one=False):
if synchronize:
token = 'synchronize{}'.format(time.monotonic())
self.sendLine('PING {}'.format(token))
got_pong = False
data = b''
messages = []
conn = self.conn
while not got_pong:
try:
new_data = conn.recv(4096)
except socket.timeout:
if not assert_get_one and not synchronize and data == b'':
# Received nothing
return []
if self.show_io:
print('{:.3f} waiting…'.format(time.time()))
time.sleep(0.1)
continue
else:
if not new_data:
# Connection closed
raise ConnectionClosed()
data += new_data
if not new_data.endswith(b'\r\n'):
time.sleep(0.1)
continue
if not synchronize:
got_pong = True
for line in data.decode().split('\r\n'):
if line:
if self.show_io:
print('{:.3f} S -> {}: {}'.format(time.time(), self.name, line.strip()))
message = message_parser.parse_message(line + '\r\n')
if message.command == 'PONG' and \
token in message.params:
got_pong = True
else:
messages.append(message)
data = b''
return messages
def getMessage(self, filter_pred=None, synchronize=True):
while True:
if not self.inbuffer:
self.inbuffer = self.getMessages(
synchronize=synchronize, assert_get_one=True)
if not self.inbuffer:
raise NoMessageException()
message = self.inbuffer.pop(0) # TODO: use dequeue
if not filter_pred or filter_pred(message):
return message
def sendLine(self, line):
ret = self.conn.sendall(line.encode())
assert ret is None
if not line.endswith('\r\n'):
ret = self.conn.sendall(b'\r\n')
assert ret is None
if self.show_io:
print('{:.3f} {} -> S: {}'.format(time.time(), self.name, line.strip()))

View File

@ -7,6 +7,7 @@ from irctest import cases
from irctest import authentication
from irctest.irc_utils.message_parser import Message
from irctest.basecontrollers import NotImplementedByController
from irctest.client_mock import ConnectionClosed
class PasswordedConnectionRegistrationTestCase(cases.BaseServerTestCase):
password = 'testpassword'
@ -53,7 +54,7 @@ class ConnectionRegistrationTestCase(cases.BaseServerTestCase):
self.connectClient('foo')
self.getMessages(1)
self.sendLine(1, 'QUIT')
self.assertRaises(cases.ConnectionClosed, self.getMessages, 1)
self.assertRaises(ConnectionClosed, self.getMessages, 1)
def testNickCollision(self):
"""A user connects and requests the same nickname as an already