Page Menu · Home · Software Heritage

D8634.id31311.diff
No One · Temporary

D8634.id31311.diff

diff --git a/conf/web.yml b/conf/web.yml
--- a/conf/web.yml
+++ b/conf/web.yml
@@ -20,7 +20,7 @@
url: http://vault:5005/
allowed_hosts:
- - appserver
+ - "*"
debug: yes
diff --git a/env/tests.env.template b/env/tests.env.template
--- a/env/tests.env.template
+++ b/env/tests.env.template
@@ -3,3 +3,4 @@
SWH_MIRROR_TEST_KAFKA_PASSWORD=SOME_INTERESTING_SECRET
SWH_MIRROR_TEST_KAFKA_BROKER=broker1.journal.staging.swh.network:9093
SWH_MIRROR_TEST_OBJSTORAGE_URL=https://swh-prod:SOME_INTERESTING_SECRET@objstorage.softwareheritage.org/
+SWH_MIRROR_TEST_API_URL=http://127.0.0.1:5081/api/1
\ No newline at end of file
diff --git a/images/build_images.sh b/images/build_images.sh
--- a/images/build_images.sh
+++ b/images/build_images.sh
@@ -5,6 +5,7 @@
builddatetime="${builddate}-${buildtime}"
username=$(docker info | grep Username | awk '{print $2}')
+options=$(getopt -l "write-env-file:" -o "" -- "$@") || exit 1
for img in base web replayer test; do
docker build \
@@ -25,5 +26,20 @@
#docker tag softwareheritage:base-${builddate} softwareheritage:latest
+eval set -- "$options"
+while true; do
+ case "$1" in
+ --write-env-file)
+ shift
+ echo "SWH_IMAGE_TAG=${builddatetime}" > "$1"
+ ;;
+ --)
+ shift
+ break
+ ;;
+ esac
+ shift
+done
+
echo "Done creating images. You may want to use"
echo "export SWH_IMAGE_TAG=${builddatetime}"
diff --git a/mirror.yml b/mirror.yml
--- a/mirror.yml
+++ b/mirror.yml
@@ -254,8 +254,6 @@
# going above the number of partitions on the kafka cluster (so the 64
# and 254 upper limits depending on the execution environment).
replicas: 0
- restart_policy:
- condition: "none"
networks:
- swhtest-mirror
env_file:
diff --git a/pyproject.toml b/pyproject.toml
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -16,3 +16,8 @@
log_cli_level = "INFO"
log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)"
log_cli_date_format = "%Y-%m-%d %H:%M:%S"
+timeout = 1871
+timeout_method = "signal"
+
+[build-system]
+requires = ["setuptools", "wheel"]
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,5 +1,7 @@
pytest
-pytest-testinfra
pytest-dotenv
+pytest-timeout
requests
msgpack
+confluent-kafka
+python-on-whales
diff --git a/tests/conftest.py b/tests/conftest.py
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -7,14 +7,12 @@
from os import chdir, environ
from pathlib import Path
from shutil import copy, copytree
-import time
from uuid import uuid4
import pytest
-import testinfra
+from python_on_whales import DockerClient, DockerException
APIURL = "http://127.0.0.1:5080/api/1/"
-SWH_IMAGE_TAG = environ["SWH_IMAGE_TAG"]
SRC_PATH = Path(__file__).resolve().parent.parent
KAFKA_USERNAME = environ["SWH_MIRROR_TEST_KAFKA_USERNAME"]
@@ -22,6 +20,7 @@
KAFKA_BROKER = environ["SWH_MIRROR_TEST_KAFKA_BROKER"]
KAFKA_GROUPID = f"{KAFKA_USERNAME}-{uuid4()}"
OBJSTORAGE_URL = environ["SWH_MIRROR_TEST_OBJSTORAGE_URL"]
+API_URL = environ["SWH_MIRROR_TEST_API_URL"]
WFI_TIMEOUT = 60
LOGGER = logging.getLogger(__name__)
@@ -34,17 +33,18 @@
@pytest.fixture(scope="session")
-def docker_host():
- return testinfra.get_host("local://")
+def docker_client():
+ return DockerClient()
# scope='session' so we use the same container for all the tests;
@pytest.fixture(scope="session")
-def mirror_stack(request, docker_host, tmp_path_factory):
+def mirror_stack(request, docker_client, tmp_path_factory):
tmp_path = tmp_path_factory.mktemp("mirror")
copytree(SRC_PATH / "conf", tmp_path / "conf")
copytree(SRC_PATH / "env", tmp_path / "env")
copy(SRC_PATH / "mirror.yml", tmp_path)
+ Path(tmp_path / "secret").write_bytes(b"not-so-secret\n")
cwd = Path.cwd()
chdir(tmp_path)
# copy test-specific conf files
@@ -63,59 +63,57 @@
stack_name = f"swhtest_{tmp_path.name}"
LOGGER.info("Create missing secrets")
- existing_secrets = [
- line.strip()
- for line in docker_host.check_output(
- "docker secret ls --format '{{.Name}}'"
- ).splitlines()
- ]
for srv in ("storage", "web", "vault", "scheduler"):
- secret = f"swh-mirror-{srv}-db-password"
- if secret not in existing_secrets:
- LOGGER.info("Creating secret %s", secret)
- docker_host.check_output(
- f"echo not-so-secret | docker secret create {secret} -"
- )
+ secret_name = f"swh-mirror-{srv}-db-password"
+ try:
+ docker_client.secret.create(secret_name, tmp_path / "secret")
+ LOGGER.info("Created secret %s", secret_name)
+ except DockerException as e:
+ if "code = AlreadyExists" not in e.stderr:
+ raise
+
LOGGER.info("Remove config objects (if any)")
- existing_configs = [
- line.strip()
- for line in docker_host.check_output(
- "docker config ls --format '{{.Name}}'"
- ).splitlines()
- ]
- for cfg in existing_configs:
- if cfg.startswith(f"{stack_name}_"):
- docker_host.check_output(f"docker config rm {cfg}")
-
- LOGGER.info("Deploy docker stack %s", stack_name)
- docker_host.check_output(f"docker stack deploy -c mirror.yml {stack_name}")
-
- yield stack_name
-
- # breakpoint()
+ existing_configs = docker_client.config.list(
+ filters={"label=com.docker.stack.namespace": stack_name}
+ )
+ for config in existing_configs:
+ config.remove()
+
+ image_tag = environ.get("SWH_IMAGE_TAG", None)
+ assert image_tag and image_tag != "latest", (
+ "SWH_IMAGE_TAG needs to be set to a build tag "
+ "to avoid any incompatibilities in the stack"
+ )
+
+ LOGGER.info("Deploy docker stack %s with SWH_IMAGE_TAG %s", stack_name, image_tag)
+ docker_stack = docker_client.stack.deploy(stack_name, "mirror.yml")
+
+ yield docker_stack
+
if not request.config.getoption("keep_stack"):
LOGGER.info("Remove stack %s", stack_name)
- docker_host.check_output(f"docker stack rm {stack_name}")
- # wait for services to be down
- LOGGER.info("Wait for all services of %s to be down", stack_name)
- while docker_host.check_output(
- "docker service ls --format {{.Name}} "
- f"--filter label=com.docker.stack.namespace={stack_name}"
- ):
- time.sleep(0.2)
-
- # give a bit of time to docker to sync the state of service<->volumes
- # relations so the next step runs ok
- time.sleep(20)
- LOGGER.info("Remove volumes of stack %s", stack_name)
- for volume in docker_host.check_output(
- "docker volume ls --format {{.Name}} "
- f"--filter label=com.docker.stack.namespace={stack_name}"
- ).splitlines():
+ docker_stack.remove()
+ stack_containers = docker_client.container.list(
+ filters={"label=com.docker.stack.namespace": stack_name}
+ )
+
+ try:
+ LOGGER.info("Waiting for all containers of %s to be down", stack_name)
+ docker_client.container.wait(stack_containers)
+ except DockerException as e:
+ # We have a TOCTOU issue, so skip the error if some containers have already
+ # been stopped by the time we wait for them.
+ if "No such container" not in e.stderr:
+ raise
+ LOGGER.info("Remove volumes of stack %s", stack_name)
+ stack_volumes = docker_client.volume.list(
+ filters={"label=com.docker.stack.namespace": stack_name}
+ )
+ for volume in stack_volumes:
try:
- docker_host.check_output(f"docker volume rm {volume}")
- except AssertionError:
- LOGGER.error("Failed to remove volume %s", volume)
+ volume.remove()
+ except DockerException:
+ LOGGER.exception("Failed to remove volume %s", volume)
chdir(cwd)
diff --git a/tests/test_graph_replayer.py b/tests/test_graph_replayer.py
--- a/tests/test_graph_replayer.py
+++ b/tests/test_graph_replayer.py
@@ -8,86 +8,104 @@
import re
import tarfile
import time
+from typing import Dict
from urllib.parse import quote
from confluent_kafka import Consumer, KafkaException
import msgpack
+from python_on_whales import DockerException
import requests
-from .conftest import KAFKA_GROUPID, KAFKA_PASSWORD, KAFKA_USERNAME, LOGGER
+from .conftest import API_URL, KAFKA_GROUPID, KAFKA_PASSWORD, KAFKA_USERNAME, LOGGER
-SERVICES = {
+INITIAL_SERVICES_STATUS = {
+ "{}_amqp": "1/1",
"{}_content-replayer": "0/0",
- "{}_db-storage": "1/1",
- "{}_db-web": "1/1",
- "{}_grafana": "1/1",
"{}_graph-replayer": "0/0",
"{}_memcache": "1/1",
+ "{}_mailhog": "1/1",
"{}_nginx": "1/1",
"{}_objstorage": "1/1",
- "{}_prometheus": "1/1",
- "{}_prometheus-statsd-exporter": "1/1",
"{}_redis": "1/1",
"{}_storage": "1/1",
+ "{}_storage-db": "1/1",
"{}_web": "1/1",
- "{}_db-vault": "1/1",
+ "{}_web-db": "1/1",
"{}_vault": "1/1",
+ "{}_vault-db": "1/1",
"{}_vault-worker": "1/1",
"{}_scheduler": "1/1",
+ "{}_scheduler-db": "1/1",
"{}_scheduler-listener": "1/1",
"{}_scheduler-runner": "1/1",
}
-ATTEMPTS = 600
-DELAY = 1
SCALE = 2
-API_URL = "http://127.0.0.1:5081/api/1"
-
-
-def running_services(host, stack):
- all_services = host.check_output(
- "docker service ls --format '{{.Name}} {{.Replicas}}'"
- )
- return dict(
- line.split()[:2]
- for line in all_services.splitlines()
- if line.startswith(f"{stack}_")
- )
-
-
-def check_running_services(host, stack, services):
- LOGGER.info("Waiting for services %s", services)
- mirror_services_ = {}
- for i in range(ATTEMPTS):
- mirror_services = running_services(host, stack)
- mirror_services = {k: v for k, v in mirror_services.items() if k in services}
- if mirror_services == services:
- LOGGER.info("Got them all!")
- break
- if mirror_services != mirror_services_:
- LOGGER.info("Not yet there %s", mirror_services)
- mirror_services_ = mirror_services
- time.sleep(0.5)
- return mirror_services == services
-def get_logs(host, service):
- rows = host.check_output(f"docker service logs -t {service}").splitlines()
- reg = re.compile(
- rf"^(?P<timestamp>.+) {service}[.]"
- r"(?P<num>\d+)[.](?P<id>\w+)@(?P<host>\w+) +[|] "
- r"(?P<logline>.+)$"
- )
- return [m.groupdict() for m in (reg.match(row) for row in rows) if m is not None]
+def service_target_replicas(service):
+ if "Replicated" in service.spec.mode:
+ return service.spec.mode["Replicated"]["Replicas"]
+ elif "Global" in service.spec.mode:
+ return 1
+ else:
+ raise ValueError(f"Unknown mode {service.spec.mode}")
-def wait_for_log_entry(host, service, logline, occurrences=1):
- for i in range(ATTEMPTS):
- logs = get_logs(host, service)
- match = [entry for entry in logs if logline in entry["logline"]]
- if match and len(match) >= occurrences:
- return match
- time.sleep(DELAY)
- return []
+def is_task_running(task):
+ try:
+ return task.status.state == "running"
+ except DockerException as e:
+ # A task might already have disappeared before we can get its status.
+    # In that case, we know for sure it's not running.
+ if "No such object" in e.stderr:
+ return False
+ else:
+ raise
+
+
+def wait_services_status(stack, target_status: Dict[str, int]):
+ LOGGER.info("Waiting for services %s", target_status)
+ last_changed_status = {}
+ while True:
+ services = [
+ service
+ for service in stack.services()
+ if service.spec.name in target_status
+ ]
+ status = {
+ service.spec.name: "%s/%s"
+ % (
+ len([True for task in service.ps() if is_task_running(task)]),
+ service_target_replicas(service),
+ )
+ for service in services
+ }
+ if status == target_status:
+ LOGGER.info("Got them all!")
+ break
+ if status != last_changed_status:
+ LOGGER.info("Not yet there %s", status)
+ last_changed_status = status
+ time.sleep(1)
+ return status == target_status
+
+
+def wait_for_log_entry(docker_client, service, log_entry, occurrences=1):
+ count = 0
+ for stream_type, stream_content in docker_client.service.logs(
+ service, follow=True, stream=True
+ ):
+ LOGGER.debug("%s output: %s", service.spec.name, stream_content)
+ if stream_type != "stdout":
+ continue
+ count += len(
+ re.findall(
+ re.escape(log_entry.encode("us-ascii", errors="replace")),
+ stream_content,
+ )
+ )
+ if count >= occurrences:
+ break
def content_get(url, done):
@@ -316,40 +334,43 @@
return stats
-def test_mirror(host, mirror_stack):
- services = {k.format(mirror_stack): v for k, v in SERVICES.items()}
- check_running_services(host, mirror_stack, services)
+def test_mirror(docker_client, mirror_stack):
+ initial_services_status = {
+ k.format(mirror_stack.name): v for k, v in INITIAL_SERVICES_STATUS.items()
+ }
+ wait_services_status(mirror_stack, initial_services_status)
# run replayer services
for service_type in ("content", "graph"):
- service = f"{mirror_stack}_{service_type}-replayer"
- LOGGER.info("Scale %s to 1", service)
- host.check_output(f"docker service scale -d {service}=1")
- if not check_running_services(host, mirror_stack, {service: "1/1"}):
- breakpoint()
- logs = wait_for_log_entry(
- host, service, f"Starting the SWH mirror {service_type} replayer"
+ service = docker_client.service.inspect(
+ f"{mirror_stack}_{service_type}-replayer"
+ )
+ LOGGER.info("Scale %s to 1", service.spec.name)
+ service.scale(1)
+ wait_services_status(mirror_stack, {service.spec.name: "1/1"})
+ wait_for_log_entry(
+ docker_client,
+ service,
+ f"Starting the SWH mirror {service_type} replayer",
+ 1,
)
- assert len(logs) == 1
- LOGGER.info("Scale %s to %d", service, SCALE)
- host.check_output(f"docker service scale -d {service}={SCALE}")
- check_running_services(host, mirror_stack, {service: f"{SCALE}/{SCALE}"})
- logs = wait_for_log_entry(
- host, service, f"Starting the SWH mirror {service_type} replayer", SCALE
+ LOGGER.info("Scale %s to %d", service.spec.name, SCALE)
+ service.scale(SCALE)
+ wait_for_log_entry(
+ docker_client,
+ service,
+ f"Starting the SWH mirror {service_type} replayer",
+ SCALE,
)
- assert len(logs) == SCALE
# wait for the replaying to be done (stop_on_oef is true)
- LOGGER.info("Wait for %s to be done", service)
- logs = wait_for_log_entry(host, service, "Done.", SCALE)
- # >= SCALE below because replayer services may have been restarted
- # (once done) before we scale them to 0
- if not (len(logs) >= SCALE):
- breakpoint()
- assert len(logs) >= SCALE
- LOGGER.info("Scale %s to 0", service)
- check_running_services(host, mirror_stack, {service: f"0/{SCALE}"})
+ LOGGER.info("Wait for %s to be done", service.spec.name)
+ wait_for_log_entry(docker_client, service, "Done.", SCALE)
+
+ LOGGER.info("Scale %s to 0", service.spec.name)
+ service.scale(0)
+ wait_services_status(mirror_stack, {service.spec.name: "0/0"})
# TODO: check there are no error reported in redis after the replayers are done

File Metadata

Mime Type
text/plain
Expires
Wed, Jul 2, 11:12 AM (1 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3223486

Event Timeline