D8634: Prepare the tests to run in Jenkins
D8634.id31311.diff (15 KB, text/plain)
diff --git a/conf/web.yml b/conf/web.yml
--- a/conf/web.yml
+++ b/conf/web.yml
@@ -20,7 +20,7 @@
url: http://vault:5005/
allowed_hosts:
- - appserver
+ - "*"
debug: yes
diff --git a/env/tests.env.template b/env/tests.env.template
--- a/env/tests.env.template
+++ b/env/tests.env.template
@@ -3,3 +3,4 @@
SWH_MIRROR_TEST_KAFKA_PASSWORD=SOME_INTERESTING_SECRET
SWH_MIRROR_TEST_KAFKA_BROKER=broker1.journal.staging.swh.network:9093
SWH_MIRROR_TEST_OBJSTORAGE_URL=https://swh-prod:SOME_INTERESTING_SECRET@objstorage.softwareheritage.org/
+SWH_MIRROR_TEST_API_URL=http://127.0.0.1:5081/api/1
\ No newline at end of file
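The added SWH_MIRROR_TEST_API_URL points the tests at the web front end published by the mirror stack on port 5081; it replaces the hard-coded API_URL constant removed from tests/test_graph_replayer.py below. A minimal sketch of preparing an environment file from this template, assuming pytest-dotenv is configured to load it:

    # copy the template, then replace the SOME_INTERESTING_SECRET placeholders
    cp env/tests.env.template env/tests.env
    # for a manual run, the same variables can be exported into the shell:
    set -a; . env/tests.env; set +a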
diff --git a/images/build_images.sh b/images/build_images.sh
--- a/images/build_images.sh
+++ b/images/build_images.sh
@@ -5,6 +5,7 @@
builddatetime="${builddate}-${buildtime}"
username=$(docker info | grep Username | awk '{print $2}')
+options=$(getopt -l "write-env-file:" -o "" -- "$@") || exit 1
for img in base web replayer test; do
docker build \
@@ -25,5 +26,20 @@
#docker tag softwareheritage:base-${builddate} softwareheritage:latest
+eval set -- "$options"
+while true; do
+ case "$1" in
+ --write-env-file)
+ shift
+ echo "SWH_IMAGE_TAG=${builddatetime}" > "$1"
+ ;;
+ --)
+ shift
+ break
+ ;;
+ esac
+ shift
+done
+
echo "Done creating images. You may want to use"
echo "export SWH_IMAGE_TAG=${builddatetime}"
diff --git a/mirror.yml b/mirror.yml
--- a/mirror.yml
+++ b/mirror.yml
@@ -254,8 +254,6 @@
# going above the number of partitions on the kafka cluster (so the 64
# and 254 upper limits depending on the execution environment).
replicas: 0
- restart_policy:
- condition: "none"
networks:
- swhtest-mirror
env_file:
diff --git a/pyproject.toml b/pyproject.toml
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -16,3 +16,8 @@
log_cli_level = "INFO"
log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)"
log_cli_date_format = "%Y-%m-%d %H:%M:%S"
+timeout = 1871
+timeout_method = "signal"
+
+[build-system]
+requires = ["setuptools", "wheel"]
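With pytest-timeout configured this way, any test running longer than 1871 seconds is aborted. The "signal" method delivers SIGALRM inside the test, so session fixtures such as mirror_stack still get a chance to tear the stack down; it only works on Unix, in the main thread. The limit can also be overridden for a one-off run (300 here is an arbitrary example value):

    pytest --timeout=300 tests/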
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,5 +1,7 @@
pytest
-pytest-testinfra
pytest-dotenv
+pytest-timeout
requests
msgpack
+confluent-kafka
+python-on-whales
diff --git a/tests/conftest.py b/tests/conftest.py
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -7,14 +7,12 @@
from os import chdir, environ
from pathlib import Path
from shutil import copy, copytree
-import time
from uuid import uuid4
import pytest
-import testinfra
+from python_on_whales import DockerClient, DockerException
APIURL = "http://127.0.0.1:5080/api/1/"
-SWH_IMAGE_TAG = environ["SWH_IMAGE_TAG"]
SRC_PATH = Path(__file__).resolve().parent.parent
KAFKA_USERNAME = environ["SWH_MIRROR_TEST_KAFKA_USERNAME"]
@@ -22,6 +20,7 @@
KAFKA_BROKER = environ["SWH_MIRROR_TEST_KAFKA_BROKER"]
KAFKA_GROUPID = f"{KAFKA_USERNAME}-{uuid4()}"
OBJSTORAGE_URL = environ["SWH_MIRROR_TEST_OBJSTORAGE_URL"]
+API_URL = environ["SWH_MIRROR_TEST_API_URL"]
WFI_TIMEOUT = 60
LOGGER = logging.getLogger(__name__)
@@ -34,17 +33,18 @@
@pytest.fixture(scope="session")
-def docker_host():
- return testinfra.get_host("local://")
+def docker_client():
+ return DockerClient()
# scope='session' so we use the same container for all the tests;
@pytest.fixture(scope="session")
-def mirror_stack(request, docker_host, tmp_path_factory):
+def mirror_stack(request, docker_client, tmp_path_factory):
tmp_path = tmp_path_factory.mktemp("mirror")
copytree(SRC_PATH / "conf", tmp_path / "conf")
copytree(SRC_PATH / "env", tmp_path / "env")
copy(SRC_PATH / "mirror.yml", tmp_path)
+ Path(tmp_path / "secret").write_bytes(b"not-so-secret\n")
cwd = Path.cwd()
chdir(tmp_path)
# copy test-specific conf files
@@ -63,59 +63,57 @@
stack_name = f"swhtest_{tmp_path.name}"
LOGGER.info("Create missing secrets")
- existing_secrets = [
- line.strip()
- for line in docker_host.check_output(
- "docker secret ls --format '{{.Name}}'"
- ).splitlines()
- ]
for srv in ("storage", "web", "vault", "scheduler"):
- secret = f"swh-mirror-{srv}-db-password"
- if secret not in existing_secrets:
- LOGGER.info("Creating secret %s", secret)
- docker_host.check_output(
- f"echo not-so-secret | docker secret create {secret} -"
- )
+ secret_name = f"swh-mirror-{srv}-db-password"
+ try:
+ docker_client.secret.create(secret_name, tmp_path / "secret")
+ LOGGER.info("Created secret %s", secret_name)
+ except DockerException as e:
+ if "code = AlreadyExists" not in e.stderr:
+ raise
+
LOGGER.info("Remove config objects (if any)")
- existing_configs = [
- line.strip()
- for line in docker_host.check_output(
- "docker config ls --format '{{.Name}}'"
- ).splitlines()
- ]
- for cfg in existing_configs:
- if cfg.startswith(f"{stack_name}_"):
- docker_host.check_output(f"docker config rm {cfg}")
-
- LOGGER.info("Deploy docker stack %s", stack_name)
- docker_host.check_output(f"docker stack deploy -c mirror.yml {stack_name}")
-
- yield stack_name
-
- # breakpoint()
+ existing_configs = docker_client.config.list(
+ filters={"label=com.docker.stack.namespace": stack_name}
+ )
+ for config in existing_configs:
+ config.remove()
+
+ image_tag = environ.get("SWH_IMAGE_TAG", None)
+ assert image_tag and image_tag != "latest", (
+ "SWH_IMAGE_TAG needs to be set to a build tag "
+ "to avoid any incompatibilities in the stack"
+ )
+
+ LOGGER.info("Deploy docker stack %s with SWH_IMAGE_TAG %s", stack_name, image_tag)
+ docker_stack = docker_client.stack.deploy(stack_name, "mirror.yml")
+
+ yield docker_stack
+
if not request.config.getoption("keep_stack"):
LOGGER.info("Remove stack %s", stack_name)
- docker_host.check_output(f"docker stack rm {stack_name}")
- # wait for services to be down
- LOGGER.info("Wait for all services of %s to be down", stack_name)
- while docker_host.check_output(
- "docker service ls --format {{.Name}} "
- f"--filter label=com.docker.stack.namespace={stack_name}"
- ):
- time.sleep(0.2)
-
- # give a bit of time to docker to sync the state of service<->volumes
- # relations so the next step runs ok
- time.sleep(20)
- LOGGER.info("Remove volumes of stack %s", stack_name)
- for volume in docker_host.check_output(
- "docker volume ls --format {{.Name}} "
- f"--filter label=com.docker.stack.namespace={stack_name}"
- ).splitlines():
+ docker_stack.remove()
+ stack_containers = docker_client.container.list(
+ filters={"label=com.docker.stack.namespace": stack_name}
+ )
+
+ try:
+ LOGGER.info("Waiting for all containers of %s to be down", stack_name)
+ docker_client.container.wait(stack_containers)
+ except DockerException as e:
+ # We have a TOCTOU issue, so skip the error if some containers have already
+ # been stopped by the time we wait for them.
+ if "No such container" not in e.stderr:
+ raise
+ LOGGER.info("Remove volumes of stack %s", stack_name)
+ stack_volumes = docker_client.volume.list(
+ filters={"label=com.docker.stack.namespace": stack_name}
+ )
+ for volume in stack_volumes:
try:
- docker_host.check_output(f"docker volume rm {volume}")
- except AssertionError:
- LOGGER.error("Failed to remove volume %s", volume)
+ volume.remove()
+ except DockerException:
+ LOGGER.exception("Failed to remove volume %s", volume)
chdir(cwd)
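The fixture now drives Docker through python_on_whales instead of shelling out via testinfra, but the calls map directly onto the docker CLI invocations the removed code used. A rough CLI transcript of what the fixture does (the stack name swhtest_mirror0 and the image tag are made-up examples; real names derive from the pytest tmp_path and the build):

    printf 'not-so-secret\n' > secret
    docker secret create swh-mirror-storage-db-password ./secret   # repeated for web, vault, scheduler
    export SWH_IMAGE_TAG=20220914-120000                           # must be a build tag, not "latest"
    docker stack deploy -c mirror.yml swhtest_mirror0
    # ... run the tests ...
    docker stack rm swhtest_mirror0
    docker volume ls -q --filter label=com.docker.stack.namespace=swhtest_mirror0 \
        | xargs -r docker volume rm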
diff --git a/tests/test_graph_replayer.py b/tests/test_graph_replayer.py
--- a/tests/test_graph_replayer.py
+++ b/tests/test_graph_replayer.py
@@ -8,86 +8,104 @@
import re
import tarfile
import time
+from typing import Dict
from urllib.parse import quote
from confluent_kafka import Consumer, KafkaException
import msgpack
+from python_on_whales import DockerException
import requests
-from .conftest import KAFKA_GROUPID, KAFKA_PASSWORD, KAFKA_USERNAME, LOGGER
+from .conftest import API_URL, KAFKA_GROUPID, KAFKA_PASSWORD, KAFKA_USERNAME, LOGGER
-SERVICES = {
+INITIAL_SERVICES_STATUS = {
+ "{}_amqp": "1/1",
"{}_content-replayer": "0/0",
- "{}_db-storage": "1/1",
- "{}_db-web": "1/1",
- "{}_grafana": "1/1",
"{}_graph-replayer": "0/0",
"{}_memcache": "1/1",
+ "{}_mailhog": "1/1",
"{}_nginx": "1/1",
"{}_objstorage": "1/1",
- "{}_prometheus": "1/1",
- "{}_prometheus-statsd-exporter": "1/1",
"{}_redis": "1/1",
"{}_storage": "1/1",
+ "{}_storage-db": "1/1",
"{}_web": "1/1",
- "{}_db-vault": "1/1",
+ "{}_web-db": "1/1",
"{}_vault": "1/1",
+ "{}_vault-db": "1/1",
"{}_vault-worker": "1/1",
"{}_scheduler": "1/1",
+ "{}_scheduler-db": "1/1",
"{}_scheduler-listener": "1/1",
"{}_scheduler-runner": "1/1",
}
-ATTEMPTS = 600
-DELAY = 1
SCALE = 2
-API_URL = "http://127.0.0.1:5081/api/1"
-
-
-def running_services(host, stack):
- all_services = host.check_output(
- "docker service ls --format '{{.Name}} {{.Replicas}}'"
- )
- return dict(
- line.split()[:2]
- for line in all_services.splitlines()
- if line.startswith(f"{stack}_")
- )
-
-
-def check_running_services(host, stack, services):
- LOGGER.info("Waiting for services %s", services)
- mirror_services_ = {}
- for i in range(ATTEMPTS):
- mirror_services = running_services(host, stack)
- mirror_services = {k: v for k, v in mirror_services.items() if k in services}
- if mirror_services == services:
- LOGGER.info("Got them all!")
- break
- if mirror_services != mirror_services_:
- LOGGER.info("Not yet there %s", mirror_services)
- mirror_services_ = mirror_services
- time.sleep(0.5)
- return mirror_services == services
-def get_logs(host, service):
- rows = host.check_output(f"docker service logs -t {service}").splitlines()
- reg = re.compile(
- rf"^(?P<timestamp>.+) {service}[.]"
- r"(?P<num>\d+)[.](?P<id>\w+)@(?P<host>\w+) +[|] "
- r"(?P<logline>.+)$"
- )
- return [m.groupdict() for m in (reg.match(row) for row in rows) if m is not None]
+def service_target_replicas(service):
+ if "Replicated" in service.spec.mode:
+ return service.spec.mode["Replicated"]["Replicas"]
+ elif "Global" in service.spec.mode:
+ return 1
+ else:
+ raise ValueError(f"Unknown mode {service.spec.mode}")
-def wait_for_log_entry(host, service, logline, occurrences=1):
- for i in range(ATTEMPTS):
- logs = get_logs(host, service)
- match = [entry for entry in logs if logline in entry["logline"]]
- if match and len(match) >= occurrences:
- return match
- time.sleep(DELAY)
- return []
+def is_task_running(task):
+ try:
+ return task.status.state == "running"
+ except DockerException as e:
+ # A task might already have disappeared before we can get its status.
+ # In that case, we know for sure it’s not running.
+ if "No such object" in e.stderr:
+ return False
+ else:
+ raise
+
+
+def wait_services_status(stack, target_status: Dict[str, int]):
+ LOGGER.info("Waiting for services %s", target_status)
+ last_changed_status = {}
+ while True:
+ services = [
+ service
+ for service in stack.services()
+ if service.spec.name in target_status
+ ]
+ status = {
+ service.spec.name: "%s/%s"
+ % (
+ len([True for task in service.ps() if is_task_running(task)]),
+ service_target_replicas(service),
+ )
+ for service in services
+ }
+ if status == target_status:
+ LOGGER.info("Got them all!")
+ break
+ if status != last_changed_status:
+ LOGGER.info("Not yet there %s", status)
+ last_changed_status = status
+ time.sleep(1)
+ return status == target_status
+
+
+def wait_for_log_entry(docker_client, service, log_entry, occurrences=1):
+ count = 0
+ for stream_type, stream_content in docker_client.service.logs(
+ service, follow=True, stream=True
+ ):
+ LOGGER.debug("%s output: %s", service.spec.name, stream_content)
+ if stream_type != "stdout":
+ continue
+ count += len(
+ re.findall(
+ re.escape(log_entry.encode("us-ascii", errors="replace")),
+ stream_content,
+ )
+ )
+ if count >= occurrences:
+ break
def content_get(url, done):
@@ -316,40 +334,43 @@
return stats
-def test_mirror(host, mirror_stack):
- services = {k.format(mirror_stack): v for k, v in SERVICES.items()}
- check_running_services(host, mirror_stack, services)
+def test_mirror(docker_client, mirror_stack):
+ initial_services_status = {
+ k.format(mirror_stack.name): v for k, v in INITIAL_SERVICES_STATUS.items()
+ }
+ wait_services_status(mirror_stack, initial_services_status)
# run replayer services
for service_type in ("content", "graph"):
- service = f"{mirror_stack}_{service_type}-replayer"
- LOGGER.info("Scale %s to 1", service)
- host.check_output(f"docker service scale -d {service}=1")
- if not check_running_services(host, mirror_stack, {service: "1/1"}):
- breakpoint()
- logs = wait_for_log_entry(
- host, service, f"Starting the SWH mirror {service_type} replayer"
+ service = docker_client.service.inspect(
+ f"{mirror_stack}_{service_type}-replayer"
+ )
+ LOGGER.info("Scale %s to 1", service.spec.name)
+ service.scale(1)
+ wait_services_status(mirror_stack, {service.spec.name: "1/1"})
+ wait_for_log_entry(
+ docker_client,
+ service,
+ f"Starting the SWH mirror {service_type} replayer",
+ 1,
)
- assert len(logs) == 1
- LOGGER.info("Scale %s to %d", service, SCALE)
- host.check_output(f"docker service scale -d {service}={SCALE}")
- check_running_services(host, mirror_stack, {service: f"{SCALE}/{SCALE}"})
- logs = wait_for_log_entry(
- host, service, f"Starting the SWH mirror {service_type} replayer", SCALE
+ LOGGER.info("Scale %s to %d", service.spec.name, SCALE)
+ service.scale(SCALE)
+ wait_for_log_entry(
+ docker_client,
+ service,
+ f"Starting the SWH mirror {service_type} replayer",
+ SCALE,
)
- assert len(logs) == SCALE
# wait for the replaying to be done (stop_on_eof is true)
- LOGGER.info("Wait for %s to be done", service)
- logs = wait_for_log_entry(host, service, "Done.", SCALE)
- # >= SCALE below because replayer services may have been restarted
- # (once done) before we scale them to 0
- if not (len(logs) >= SCALE):
- breakpoint()
- assert len(logs) >= SCALE
- LOGGER.info("Scale %s to 0", service)
- check_running_services(host, mirror_stack, {service: f"0/{SCALE}"})
+ LOGGER.info("Wait for %s to be done", service.spec.name)
+ wait_for_log_entry(docker_client, service, "Done.", SCALE)
+
+ LOGGER.info("Scale %s to 0", service.spec.name)
+ service.scale(0)
+ wait_services_status(mirror_stack, {service.spec.name: "0/0"})
# TODO: check there are no error reported in redis after the replayers are done
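The test follows the same pattern through python_on_whales; expressed with the docker CLI commands that the removed helpers wrapped, one pass of the replayer loop looks roughly like this (stack name again hypothetical):

    docker service scale swhtest_mirror0_graph-replayer=1
    docker service logs -t swhtest_mirror0_graph-replayer   # wait for "Starting the SWH mirror graph replayer"
    docker service scale swhtest_mirror0_graph-replayer=2   # SCALE
    # each replica logs "Done." once it hits end of stream; then scale back down
    docker service scale swhtest_mirror0_graph-replayer=0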