irctest/irctest/controllers/sable.py

498 lines
15 KiB
Python

import os
from pathlib import Path
import shutil
import signal
import subprocess
import tempfile
import time
from typing import Optional, Type
from irctest.basecontrollers import (
BaseServerController,
BaseServicesController,
DirectoryBasedController,
NotImplementedByController,
)
from irctest.cases import BaseServerTestCase
from irctest.exceptions import NoMessageException
from irctest.patma import ANYSTR
GEN_CERTS = """
mkdir -p useless_openssl_data/
cat > openssl.cnf <<EOF
[ ca ]
default_ca = CA_default # The default ca section
[ CA_default ]
new_certs_dir = useless_openssl_data/
database = useless_openssl_data/db
policy = policy_anything
serial = useless_openssl_data/serial
copy_extensions = copy
email_in_dn = no
rand_serial = no
[ policy_anything ]
countryName = optional
stateOrProvinceName = optional
localityName = optional
organizationName = optional
organizationalUnitName = optional
commonName = supplied
emailAddress = optional
[ usr_cert ]
subjectAltName=subject:copy
EOF
rm -f useless_openssl_data/db
touch useless_openssl_data/db
echo 01 > useless_openssl_data/serial
# Generate CA
openssl req -x509 -nodes -newkey rsa:2048 -batch \
-subj "/CN=Test CA" \
-outform PEM -out ca_cert.pem \
-keyout ca_cert.key
for server in $*; do
openssl genrsa -traditional \
-out $server.key \
2048
openssl req -nodes -batch -new \
-addext "subjectAltName = DNS:$server" \
-key $server.key \
-outform PEM -out server_$server.req
openssl ca -config openssl.cnf -days 3650 -md sha512 -batch \
-subj /CN=$server \
-keyfile ca_cert.key -cert ca_cert.pem \
-in server_$server.req \
-out $server.pem
openssl x509 -sha1 -in $server.pem -fingerprint -noout \
| sed "s/.*=//" | sed "s/://g" | tr '[:upper:]' '[:lower:]' > $server.pem.sha1
done
rm -r useless_openssl_data/
"""
_certs_dir = None
def certs_dir() -> Path:
global _certs_dir
if _certs_dir is None:
certs_dir = tempfile.TemporaryDirectory()
(Path(certs_dir.name) / "gen_certs.sh").write_text(GEN_CERTS)
subprocess.run(
["bash", "gen_certs.sh", "My.Little.Server", "My.Little.Services"],
cwd=certs_dir.name,
check=True,
)
_certs_dir = certs_dir
return Path(_certs_dir.name)
NETWORK_CONFIG = """
{
"fanout": 1,
"ca_file": "%(certs_dir)s/ca_cert.pem",
"peers": [
{ "name": "My.Little.Services", "address": "%(services_hostname)s:%(services_port)s", "fingerprint": "%(services_cert_sha1)s" },
{ "name": "My.Little.Server", "address": "%(server1_hostname)s:%(server1_port)s", "fingerprint": "%(server1_cert_sha1)s" }
]
}
"""
NETWORK_CONFIG_CONFIG = """
{
"opers": [
{
"name": "operuser",
// echo -n "operpassword" | openssl passwd -6 -stdin
"hash": "$6$z5yA.OfGliDoi/R2$BgSsguS6bxAsPSCygDisgDw5JZuo5.88eU3Hyc7/4OaNpeKIxWGjOggeHzOl0xLiZg1vfwxXjOTFN14wG5vNI."
}
],
"alias_users": [
%(services_alias_users)s
],
"default_roles": {
"builtin:op": [
"always_send",
"op_self", "op_grant", "voice_self", "voice_grant",
"receive_op", "receive_voice", "receive_opmod",
"topic", "kick", "set_simple_mode", "set_key",
"rename",
"ban_view", "ban_add", "ban_remove_any",
"quiet_view", "quiet_add", "quiet_remove_any",
"exempt_view", "exempt_add", "exempt_remove_any",
"invite_self", "invite_other",
"invex_view", "invex_add", "invex_remove_any"
],
"builtin:voice": [
"always_send",
"voice_self",
"receive_voice",
"ban_view", "quiet_view"
],
"builtin:all": [
"ban_view", "quiet_view"
]
},
"debug_mode": true
}
"""
SERVICES_ALIAS_USERS = """
{
"nick": "ChanServ",
"user": "ChanServ",
"host": "services.",
"realname": "Channel services compatibility layer",
"command_alias": "CS"
},
{
"nick": "NickServ",
"user": "NickServ",
"host": "services.",
"realname": "Account services compatibility layer",
"command_alias": "NS"
}
"""
SERVER_CONFIG = """
{
"server_id": 1,
"server_name": "My.Little.Server",
"management": {
"address": "%(server1_management_hostname)s:%(server1_management_port)s",
"client_ca": "%(certs_dir)s/ca_cert.pem",
"authorised_fingerprints": [
{ "name": "user1", "fingerprint": "435bc6db9f22e84ba5d9652432154617c9509370" },
],
},
"server": {
"listeners": [
{ "address": "%(c2s_hostname)s:%(c2s_port)s" },
],
},
"event_log": {
"event_expiry": 300, // five minutes, for local testing
},
"tls_config": {
"key_file": "%(certs_dir)s/My.Little.Server.key",
"cert_file": "%(certs_dir)s/My.Little.Server.pem",
},
"node_config": {
"listen_addr": "%(server1_hostname)s:%(server1_port)s",
"cert_file": "%(certs_dir)s/My.Little.Server.pem",
"key_file": "%(certs_dir)s/My.Little.Server.key",
},
"log": {
"dir": "log/server1/",
"module-levels": {
"": "debug",
"sable_ircd": "trace",
},
"targets": [
{
"target": "stdout",
"level": "trace",
"modules": [ "sable", "audit", "client_listener" ],
},
],
},
}
"""
SERVICES_CONFIG = """
{
"server_id": 99,
"server_name": "My.Little.Services",
"management": {
"address": "%(services_management_hostname)s:%(services_management_port)s",
"client_ca": "%(certs_dir)s/ca_cert.pem",
"authorised_fingerprints": [
{ "name": "user1", "fingerprint": "435bc6db9f22e84ba5d9652432154617c9509370" }
]
},
"server": {
"database": "test_database.json",
"default_roles": {
"builtin:founder": [
"founder", "access_view", "access_edit", "role_view", "role_edit",
"op_self", "op_grant",
"voice_self", "voice_grant",
"always_send",
"invite_self", "invite_other",
"receive_op", "receive_voice", "receive_opmod",
"topic", "kick", "set_simple_mode", "set_key",
"rename",
"ban_view", "ban_add", "ban_remove_any",
"quiet_view", "quiet_add", "quiet_remove_any",
"exempt_view", "exempt_add", "exempt_remove_any",
"invex_view", "invex_add", "invex_remove_any"
],
"builtin:op": [
"always_send",
"receive_op", "receive_voice", "receive_opmod",
"topic", "kick", "set_simple_mode", "set_key",
"rename",
"ban_view", "ban_add", "ban_remove_any",
"quiet_view", "quiet_add", "quiet_remove_any",
"exempt_view", "exempt_add", "exempt_remove_any",
"invex_view", "invex_add", "invex_remove_any"
],
"builtin:voice": [
"always_send", "voice_self", "receive_voice"
]
},
"password_hash": {
"algorithm": "bcrypt", // Only "bcrypt" is supported for now
"cost": 4, // Exponentially faster than the default 12
},
},
"event_log": {
"event_expiry": 300, // five minutes, for local testing
},
"tls_config": {
"key_file": "%(certs_dir)s/My.Little.Services.key",
"cert_file": "%(certs_dir)s/My.Little.Services.pem"
},
"node_config": {
"listen_addr": "%(services_hostname)s:%(services_port)s",
"cert_file": "%(certs_dir)s/My.Little.Services.pem",
"key_file": "%(certs_dir)s/My.Little.Services.key"
},
"log": {
"dir": "log/services/",
"module-levels": {
"": "debug"
},
"targets": [
{
"target": "stdout",
"level": "debug",
"modules": [ "sable_services" ]
}
]
}
}
"""
class SableController(BaseServerController, DirectoryBasedController):
software_name = "Sable"
supported_sasl_mechanisms = {"PLAIN"}
sync_sleep_time = 0.1
"""Sable processes commands very quickly, but responses for commands changing the
state may be sent after later commands for messages which don't."""
def run(
self,
hostname: str,
port: int,
*,
password: Optional[str],
ssl: bool,
run_services: bool,
faketime: Optional[str],
) -> None:
if password is not None:
raise NotImplementedByController("PASS command")
if ssl:
raise NotImplementedByController("SSL")
if self.test_config.account_registration_before_connect:
raise NotImplementedByController("account-registration with before-connect")
if self.test_config.account_registration_requires_email:
raise NotImplementedByController("account-registration with email-required")
assert self.proc is None
self.port = port
self.create_config()
assert self.directory
(self.directory / "configs").mkdir()
c2s_hostname = hostname
c2s_port = port
del hostname, port
# base controller expects this to check for NickServ presence itself
self.hostname = c2s_hostname
self.port = c2s_port
(server1_hostname, server1_port) = self.get_hostname_and_port()
(services_hostname, services_port) = self.get_hostname_and_port()
# Sable requires inbound connections to match the configured hostname,
# so we can't configure 0.0.0.0
server1_hostname = services_hostname = "127.0.0.1"
(
server1_management_hostname,
server1_management_port,
) = self.get_hostname_and_port()
(
services_management_hostname,
services_management_port,
) = self.get_hostname_and_port()
self.template_vars = dict(
certs_dir=certs_dir(),
c2s_hostname=c2s_hostname,
c2s_port=c2s_port,
server1_hostname=server1_hostname,
server1_port=server1_port,
server1_cert_sha1=(certs_dir() / "My.Little.Server.pem.sha1")
.read_text()
.strip(),
server1_management_hostname=server1_management_hostname,
server1_management_port=server1_management_port,
services_hostname=services_hostname,
services_port=services_port,
services_cert_sha1=(certs_dir() / "My.Little.Services.pem.sha1")
.read_text()
.strip(),
services_management_hostname=services_management_hostname,
services_management_port=services_management_port,
services_alias_users=SERVICES_ALIAS_USERS if run_services else "",
)
with self.open_file("configs/network.conf") as fd:
fd.write(NETWORK_CONFIG % self.template_vars)
with self.open_file("configs/network_config.conf") as fd:
fd.write(NETWORK_CONFIG_CONFIG % self.template_vars)
with self.open_file("configs/server1.conf") as fd:
fd.write(SERVER_CONFIG % self.template_vars)
if faketime and shutil.which("faketime"):
faketime_cmd = ["faketime", "-f", faketime]
self.faketime_enabled = True
else:
faketime_cmd = []
self.proc = subprocess.Popen(
[
*faketime_cmd,
"sable_ircd",
"--foreground",
"--server-conf",
self.directory / "configs/server1.conf",
"--network-conf",
self.directory / "configs/network.conf",
"--bootstrap-network",
self.directory / "configs/network_config.conf",
],
cwd=self.directory,
preexec_fn=os.setsid,
)
self.pgroup_id = os.getpgid(self.proc.pid)
if run_services:
self.services_controller = SableServicesController(self.test_config, self)
self.services_controller.run(
protocol="sable",
server_hostname=services_hostname,
server_port=services_port,
)
def kill_proc(self) -> None:
os.killpg(self.pgroup_id, signal.SIGKILL)
super().kill_proc()
def registerUser(
self,
case: BaseServerTestCase, # type: ignore
username: str,
password: Optional[str] = None,
) -> None:
# XXX: Move this somewhere else when
# https://github.com/ircv3/ircv3-specifications/pull/152 becomes
# part of the specification
if not case.run_services:
raise ValueError(
"Attempted to register a nick, but `run_services` it not True."
)
assert password
client = case.addClient(show_io=True)
case.sendLine(client, "NICK " + username)
case.sendLine(client, "USER r e g :user")
while case.getRegistrationMessage(client).command != "001":
pass
case.getMessages(client)
case.sendLine(
client,
f"REGISTER * * {password}",
)
for _ in range(100):
time.sleep(0.1)
try:
msg = case.getMessage(client)
except NoMessageException:
continue
case.assertMessageMatch(
msg, command="REGISTER", params=["SUCCESS", username, ANYSTR]
)
break
else:
raise NoMessageException()
case.sendLine(client, "QUIT")
case.assertDisconnected(client)
class SableServicesController(BaseServicesController):
server_controller: SableController
software_name = "Sable Services"
def run(self, protocol: str, server_hostname: str, server_port: int) -> None:
assert protocol == "sable"
assert self.server_controller.directory is not None
with self.server_controller.open_file("configs/services.conf") as fd:
fd.write(SERVICES_CONFIG % self.server_controller.template_vars)
self.proc = subprocess.Popen(
[
"sable_services",
"--foreground",
"--server-conf",
self.server_controller.directory / "configs/services.conf",
"--network-conf",
self.server_controller.directory / "configs/network.conf",
],
cwd=self.server_controller.directory,
preexec_fn=os.setsid,
)
self.pgroup_id = os.getpgid(self.proc.pid)
def kill_proc(self) -> None:
os.killpg(self.pgroup_id, signal.SIGKILL)
super().kill_proc()
def get_irctest_controller_class() -> Type[SableController]:
return SableController