mirror of
https://github.com/progval/irctest.git
synced 2025-04-04 22:39:50 +00:00
Merge branch 'deps'
This commit is contained in:
10
README.md
10
README.md
@ -6,16 +6,6 @@ Some of these tests may be applicable to other projects (we attempt to mark the
|
||||
|
||||
This suite needs more test cases. Contributions are welcome and are a great way to help the Oragono project!
|
||||
|
||||
## Installing
|
||||
|
||||
Clone the repo and install the relevant dependencies:
|
||||
|
||||
```
|
||||
virtualenv ./venv
|
||||
source ./venv/bin/activate
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
## Running Tests
|
||||
|
||||
Make sure the version of `oragono` you want to test is on your PATH. Then run `make`.
|
||||
|
@ -4,7 +4,6 @@ import socket
|
||||
import tempfile
|
||||
import time
|
||||
import subprocess
|
||||
import psutil
|
||||
|
||||
from .runner import NotImplementedByController
|
||||
|
||||
@ -84,7 +83,9 @@ class BaseServerController(_BaseController):
|
||||
def wait_for_port(self):
|
||||
while not self.port_open:
|
||||
time.sleep(0.1)
|
||||
for conn in psutil.Process(self.proc.pid).connections():
|
||||
if conn.laddr[1] == self.port:
|
||||
self.port_open = True
|
||||
|
||||
try:
|
||||
c = socket.create_connection(('localhost', self.port), timeout=1.0)
|
||||
c.close()
|
||||
self.port_open = True
|
||||
except Exception as e:
|
||||
continue
|
||||
|
@ -5,12 +5,11 @@ import tempfile
|
||||
import unittest
|
||||
import functools
|
||||
|
||||
import supybot.utils
|
||||
|
||||
from . import runner
|
||||
from . import client_mock
|
||||
from .irc_utils import capabilities
|
||||
from .irc_utils import message_parser
|
||||
from .irc_utils.junkdrawer import normalizeWhitespace
|
||||
from .irc_utils.sasl import sasl_plain_blob
|
||||
from .exceptions import ConnectionClosed
|
||||
from .specifications import Specifications
|
||||
@ -23,7 +22,7 @@ class _IrcTestCase(unittest.TestCase):
|
||||
method_doc = self._testMethodDoc
|
||||
if not method_doc:
|
||||
return ''
|
||||
return '\t'+supybot.utils.str.normalizeWhitespace(
|
||||
return '\t'+normalizeWhitespace(
|
||||
method_doc,
|
||||
removeNewline=False,
|
||||
).strip().replace('\n ', '\n\t')
|
||||
|
@ -1,22 +1,13 @@
|
||||
import ecdsa
|
||||
import base64
|
||||
import pyxmpp2_scram as scram
|
||||
|
||||
# TODO: figure this out if we ever implement SCRAM
|
||||
#import pyxmpp2_scram as scram
|
||||
scram = None
|
||||
|
||||
from irctest import cases
|
||||
from irctest import authentication
|
||||
from irctest.irc_utils.message_parser import Message
|
||||
|
||||
ECDSA_KEY = """
|
||||
-----BEGIN EC PARAMETERS-----
|
||||
BggqhkjOPQMBBw==
|
||||
-----END EC PARAMETERS-----
|
||||
-----BEGIN EC PRIVATE KEY-----
|
||||
MHcCAQEEIIJueQ3W2IrGbe9wKdOI75yGS7PYZSj6W4tg854hlsvmoAoGCCqGSM49
|
||||
AwEHoUQDQgAEAZmaVhNSMmV5r8FXPvKuMnqDKyIA9pDHN5TNMfiF3mMeikGgK10W
|
||||
IRX9cyi2wdYg9mUUYyh9GKdBCYHGUJAiCA==
|
||||
-----END EC PRIVATE KEY-----
|
||||
"""
|
||||
|
||||
class SaslTestCase(cases.BaseClientTestCase, cases.ClientNegociationHelper,
|
||||
cases.OptionalityHelper):
|
||||
@cases.OptionalityHelper.skipUnlessHasMechanism('PLAIN')
|
||||
@ -125,36 +116,6 @@ class SaslTestCase(cases.BaseClientTestCase, cases.ClientNegociationHelper,
|
||||
m = self.negotiateCapabilities(['sasl'], False)
|
||||
self.assertEqual(m, Message([], None, 'CAP', ['END']))
|
||||
|
||||
@cases.OptionalityHelper.skipUnlessHasMechanism('ECDSA-NIST256P-CHALLENGE')
|
||||
def testEcdsa(self):
|
||||
"""Test ECDSA authentication.
|
||||
"""
|
||||
auth = authentication.Authentication(
|
||||
mechanisms=[authentication.Mechanisms.ecdsa_nist256p_challenge],
|
||||
username='jilles',
|
||||
ecdsa_key=ECDSA_KEY,
|
||||
)
|
||||
m = self.negotiateCapabilities(['sasl'], auth=auth)
|
||||
self.assertEqual(m, Message([], None, 'AUTHENTICATE', ['ECDSA-NIST256P-CHALLENGE']))
|
||||
self.sendLine('AUTHENTICATE +')
|
||||
m = self.getMessage()
|
||||
self.assertEqual(m, Message([], None, 'AUTHENTICATE',
|
||||
['amlsbGVz'])) # jilles
|
||||
self.sendLine('AUTHENTICATE Zm9vYmFy') # foobar
|
||||
m = self.getMessage()
|
||||
self.assertMessageEqual(m, command='AUTHENTICATE')
|
||||
sk = ecdsa.SigningKey.from_pem(ECDSA_KEY)
|
||||
vk = sk.get_verifying_key()
|
||||
signature = base64.b64decode(m.params[0])
|
||||
try:
|
||||
vk.verify(signature, b'foobar')
|
||||
except ecdsa.BadSignatureError:
|
||||
raise AssertionError('Bad signature')
|
||||
self.sendLine('900 * * foo :You are now logged in.')
|
||||
self.sendLine('903 * :SASL authentication successful')
|
||||
m = self.negotiateCapabilities(['sasl'], False)
|
||||
self.assertEqual(m, Message([], None, 'CAP', ['END']))
|
||||
|
||||
@cases.OptionalityHelper.skipUnlessHasMechanism('SCRAM-SHA-256')
|
||||
def testScram(self):
|
||||
"""Test SCRAM-SHA-256 authentication.
|
||||
|
@ -1,4 +1,5 @@
|
||||
import datetime
|
||||
import re
|
||||
from collections import namedtuple
|
||||
|
||||
HistoryMessage = namedtuple('HistoryMessage', ['time', 'msgid', 'target', 'text'])
|
||||
@ -11,3 +12,38 @@ IRCV3_FORMAT_STRFTIME = "%Y-%m-%dT%H:%M:%S.%f%z"
|
||||
|
||||
def ircv3_timestamp_to_unixtime(timestamp):
|
||||
return datetime.datetime.strptime(timestamp, IRCV3_FORMAT_STRFTIME).timestamp()
|
||||
|
||||
"""
|
||||
Stolen from supybot:
|
||||
"""
|
||||
|
||||
class MultipleReplacer:
|
||||
"""Return a callable that replaces all dict keys by the associated
|
||||
value. More efficient than multiple .replace()."""
|
||||
|
||||
# We use an object instead of a lambda function because it avoids the
|
||||
# need for using the staticmethod() on the lambda function if assigning
|
||||
# it to a class in Python 3.
|
||||
def __init__(self, dict_):
|
||||
self._dict = dict_
|
||||
dict_ = dict([(re.escape(key), val) for key,val in dict_.items()])
|
||||
self._matcher = re.compile('|'.join(dict_.keys()))
|
||||
def __call__(self, s):
|
||||
return self._matcher.sub(lambda m: self._dict[m.group(0)], s)
|
||||
|
||||
def normalizeWhitespace(s, removeNewline=True):
|
||||
r"""Normalizes the whitespace in a string; \s+ becomes one space."""
|
||||
if not s:
|
||||
return str(s) # not the same reference
|
||||
starts_with_space = (s[0] in ' \n\t\r')
|
||||
ends_with_space = (s[-1] in ' \n\t\r')
|
||||
if removeNewline:
|
||||
newline_re = re.compile('[\r\n]+')
|
||||
s = ' '.join(filter(bool, newline_re.split(s)))
|
||||
s = ' '.join(filter(bool, s.split('\t')))
|
||||
s = ' '.join(filter(bool, s.split(' ')))
|
||||
if starts_with_space:
|
||||
s = ' ' + s
|
||||
if ends_with_space:
|
||||
s += ' '
|
||||
retur
|
||||
|
@ -1,6 +1,7 @@
|
||||
import re
|
||||
import collections
|
||||
import supybot.utils
|
||||
|
||||
from .junkdrawer import MultipleReplacer
|
||||
|
||||
# http://ircv3.net/specs/core/message-tags-3.2.html#escaping-values
|
||||
TAG_ESCAPE = [
|
||||
@ -10,7 +11,7 @@ TAG_ESCAPE = [
|
||||
('\r', r'\r'),
|
||||
('\n', r'\n'),
|
||||
]
|
||||
unescape_tag_value = supybot.utils.str.MultipleReplacer(
|
||||
unescape_tag_value = MultipleReplacer(
|
||||
dict(map(lambda x:(x[1],x[0]), TAG_ESCAPE)))
|
||||
|
||||
# TODO: validate host
|
||||
|
@ -1,4 +0,0 @@
|
||||
limnoria > 2012.08.04 # Needs MultipleReplacer, from 1a64f105
|
||||
psutil >= 3.1.0 # Fixes #640
|
||||
ecdsa
|
||||
pyxmpp2_scram
|
Reference in New Issue
Block a user