diff --git a/conftest.py b/conftest.py
index 2ea652bf..973f4979 100644
--- a/conftest.py
+++ b/conftest.py
@@ -1 +1 @@
-pytest_plugins = ["swh.auth.pytest_plugin"]
+pytest_plugins = ["swh.auth.pytest_plugin", "swh.scheduler.pytest_plugin"]
diff --git a/requirements-test.txt b/requirements-test.txt
index ec88a860..4ee14e0b 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,15 +1,16 @@
 decorator # dependency of swh.core[http]
 djangorestframework-stubs
 django-stubs
 django-test-migrations
 hypothesis
 pytest
 pytest-django
 pytest-mock
 requests-mock != 1.9.0, != 1.9.1
 swh.core[http] >= 0.0.95
 swh.loader.git >= 0.8.0
+swh-scheduler[testing] >= 0.5.0
 swh.storage >= 0.1.1
 types-docutils
 types-pyyaml
 types-requests
diff --git a/swh/web/common/management/commands/refresh_savecodenow_statuses.py b/swh/web/common/management/commands/refresh_savecodenow_statuses.py
index 0c984bd9..e697d92d 100644
--- a/swh/web/common/management/commands/refresh_savecodenow_statuses.py
+++ b/swh/web/common/management/commands/refresh_savecodenow_statuses.py
@@ -1,22 +1,63 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from typing import Set, Tuple
+
 from django.core.management.base import BaseCommand
 
+from swh.scheduler.model import ListedOrigin
+from swh.web.common.models import VISIT_STATUS_FULL, VISIT_STATUS_PARTIAL
 from swh.web.common.origin_save import refresh_save_origin_request_statuses
+from swh.web.config import get_config
+from swh.web.config import scheduler as get_scheduler
 
 
 class Command(BaseCommand):
     help = "Refresh save code now origin request statuses periodically"
 
     def handle(self, *args, **options):
+        """Refresh the status of pending save code now requests.
+
+        For the git, svn and hg visit types, origins with a full or partial visit
+        are also recorded in the scheduler as recurring origins to visit.
+
+        """
         refreshed_statuses = refresh_save_origin_request_statuses()
+        scheduler = get_scheduler()
+
+        # then register the origins with an eligible visit type and status so they
+        # get visited regularly
+        lister = scheduler.get_or_create_lister(
+            name="save-code-now", instance_name=get_config()["instance_name"]
+        )
+
+        origins: Set[Tuple[str, str]] = set()
+        listed_origins = []
+        for status in refreshed_statuses:
+            visit_type = status["visit_type"]
+            # only deal with git, svn and hg visit types
+            if visit_type == "archives":
+                continue
+            # only keep visits with a full or partial status
+            if status["visit_status"] not in (VISIT_STATUS_PARTIAL, VISIT_STATUS_FULL):
+                continue
+            origin = status["origin_url"]
+            # drop duplicates within the same batch
+            if (visit_type, origin) in origins:
+                continue
+            origins.add((visit_type, origin))
+            listed_origins.append(
+                ListedOrigin(lister_id=lister.id, visit_type=visit_type, url=origin)
+            )
+
+        if listed_origins:
+            scheduler.record_listed_origins(listed_origins)
 
         if len(refreshed_statuses) > 0:
             msg = f"Successfully updated {len(refreshed_statuses)} save request(s)."
         else:
             msg = "Nothing to do."
 
         self.stdout.write(self.style.SUCCESS(msg))
diff --git a/swh/web/config.py b/swh/web/config.py
index e8142f5e..19c4ee81 100644
--- a/swh/web/config.py
+++ b/swh/web/config.py
@@ -1,213 +1,214 @@
 # Copyright (C) 2017-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import os
 from typing import Any, Dict
 
 from swh.core import config
 from swh.counters import get_counters
 from swh.indexer.storage import get_indexer_storage
 from swh.scheduler import get_scheduler
 from swh.search import get_search
 from swh.storage import get_storage
 from swh.vault import get_vault
 from swh.web import settings
 
 SWH_WEB_INTERNAL_SERVER_NAME = "archive.internal.softwareheritage.org"
 
 STAGING_SERVER_NAMES = [
     "webapp.staging.swh.network",
     "webapp.internal.staging.swh.network",
 ]
 
 ORIGIN_VISIT_TYPES = [
     "cran",
     "deb",
     "deposit",
     "ftp",
     "hg",
     "git",
     "nixguix",
     "npm",
     "pypi",
     "svn",
     "tar",
 ]
 
 SETTINGS_DIR = os.path.dirname(settings.__file__)
 
 DEFAULT_CONFIG = {
     "allowed_hosts": ("list", []),
     "search": (
         "dict",
         {"cls": "remote", "url": "http://127.0.0.1:5010/", "timeout": 10,},
     ),
     "storage": (
         "dict",
         {"cls": "remote", "url": "http://127.0.0.1:5002/", "timeout": 10,},
     ),
     "indexer_storage": (
         "dict",
         {"cls": "remote", "url": "http://127.0.0.1:5007/", "timeout": 1,},
     ),
     "counters": (
         "dict",
         {"cls": "remote", "url": "http://127.0.0.1:5011/", "timeout": 1,},
     ),
     "log_dir": ("string", "/tmp/swh/log"),
     "debug": ("bool", False),
     "serve_assets": ("bool", False),
     "host": ("string", "127.0.0.1"),
     "port": ("int", 5004),
     "secret_key": ("string", "development key"),
     # do not display code highlighting for content > 1MB
     "content_display_max_size": ("int", 5 * 1024 * 1024),
     "snapshot_content_max_size": ("int", 1000),
     "throttling": (
         "dict",
         {
             "cache_uri": None,  # production: memcached as cache (127.0.0.1:11211)
             # development: in-memory cache so None
             "scopes": {
                 "swh_api": {
                     "limiter_rate": {"default": "120/h"},
                     "exempted_networks": ["127.0.0.0/8"],
                 },
                 "swh_api_origin_search": {
                     "limiter_rate": {"default": "10/m"},
                     "exempted_networks": ["127.0.0.0/8"],
                 },
                 "swh_vault_cooking": {
                     "limiter_rate": {"default": "120/h", "GET": "60/m"},
                     "exempted_networks": ["127.0.0.0/8"],
                 },
                 "swh_save_origin": {
                     "limiter_rate": {"default": "120/h", "POST": "10/h"},
                     "exempted_networks": ["127.0.0.0/8"],
                 },
                 "swh_api_origin_visit_latest": {
                     "limiter_rate": {"default": "700/m"},
                     "exempted_networks": ["127.0.0.0/8"],
                 },
             },
         },
     ),
     "vault": ("dict", {"cls": "remote", "args": {"url": "http://127.0.0.1:5005/",}}),
     "scheduler": ("dict", {"cls": "remote", "url": "http://127.0.0.1:5008/"}),
     "development_db": ("string", os.path.join(SETTINGS_DIR, "db.sqlite3")),
     "test_db": ("string", os.path.join(SETTINGS_DIR, "testdb.sqlite3")),
     "production_db": ("dict", {"name": "swh-web"}),
     "deposit": (
         "dict",
         {
             "private_api_url": "https://deposit.softwareheritage.org/1/private/",
             "private_api_user": "swhworker",
             "private_api_password": "",
         },
     ),
     "coverage_count_origins": ("bool", False),
     "e2e_tests_mode": ("bool", False),
     "es_workers_index_url": ("string", ""),
     "history_counters_url": (
         "string",
         "https://stats.export.softwareheritage.org/history_counters.json",
     ),
     "client_config": ("dict", {}),
     "keycloak": ("dict", {"server_url": "", "realm_name": ""}),
     "graph": (
         "dict",
         {"server_url": "http://graph.internal.softwareheritage.org:5009/graph/"},
     ),
     "status": (
         "dict",
         {
             "server_url": "https://status.softwareheritage.org/",
"https://status.softwareheritage.org/", "json_path": "1.0/status/578e5eddcdc0cc7951000520", }, ), "metadata_search_backend": ("string", "swh-indexer-storage"), # or "swh-search" "counters_backend": ("string", "swh-storage"), # or "swh-counters" "staging_server_names": ("list", STAGING_SERVER_NAMES), + "instance_name": ("str", "archive-test.softwareheritage.org"), } -swhweb_config = {} # type: Dict[str, Any] +swhweb_config: Dict[str, Any] = {} def get_config(config_file="web/web"): """Read the configuration file `config_file`. If an environment variable SWH_CONFIG_FILENAME is defined, this takes precedence over the config_file parameter. In any case, update the app with parameters (secret_key, conf) and return the parsed configuration as a dict. If no configuration file is provided, return a default configuration. """ if not swhweb_config: config_filename = os.environ.get("SWH_CONFIG_FILENAME") if config_filename: config_file = config_filename cfg = config.load_named_config(config_file, DEFAULT_CONFIG) swhweb_config.update(cfg) config.prepare_folders(swhweb_config, "log_dir") if swhweb_config.get("search"): swhweb_config["search"] = get_search(**swhweb_config["search"]) else: swhweb_config["search"] = None swhweb_config["storage"] = get_storage(**swhweb_config["storage"]) swhweb_config["vault"] = get_vault(**swhweb_config["vault"]) swhweb_config["indexer_storage"] = get_indexer_storage( **swhweb_config["indexer_storage"] ) swhweb_config["scheduler"] = get_scheduler(**swhweb_config["scheduler"]) swhweb_config["counters"] = get_counters(**swhweb_config["counters"]) return swhweb_config def search(): """Return the current application's search. """ return get_config()["search"] def storage(): """Return the current application's storage. """ return get_config()["storage"] def vault(): """Return the current application's vault. """ return get_config()["vault"] def indexer_storage(): """Return the current application's indexer storage. """ return get_config()["indexer_storage"] def scheduler(): """Return the current application's scheduler. """ return get_config()["scheduler"] def counters(): """Return the current application's counters. 
""" return get_config()["counters"] diff --git a/swh/web/tests/common/test_django_command.py b/swh/web/tests/common/test_django_command.py index 9a6de94b..3d64bb99 100644 --- a/swh/web/tests/common/test_django_command.py +++ b/swh/web/tests/common/test_django_command.py @@ -1,35 +1,177 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information +from datetime import datetime, timedelta, timezone from io import StringIO import pytest from django.core.management import call_command +from swh.core.api.classes import stream_results +from swh.web.common.models import ( + SAVE_REQUEST_ACCEPTED, + SAVE_TASK_FAILED, + SAVE_TASK_SCHEDULED, + SAVE_TASK_SUCCEEDED, + VISIT_STATUS_FAILED, + VISIT_STATUS_FULL, + VISIT_STATUS_PARTIAL, +) +from swh.web.common.typing import SaveOriginRequestInfo +from swh.web.config import get_config + +MODULE_FQDN = "swh.web.common.management.commands" +COMMAND_NAME = "refresh_savecodenow_statuses" + +AUTHORIZED_ORIGIN_URL = "https://scm.ourproject.org/anonscm/%s" + + +@pytest.fixture +def mock_refresh(mocker): + return mocker.patch( + f"{MODULE_FQDN}.{COMMAND_NAME}.refresh_save_origin_request_statuses" + ) + + +@pytest.fixture +def mock_scheduler(mocker, swh_scheduler): + mock_scheduler = mocker.patch(f"{MODULE_FQDN}.{COMMAND_NAME}.get_scheduler") + mock_scheduler.return_value = swh_scheduler + + return mock_scheduler + @pytest.mark.parametrize("nb_results", [0, 10, 20]) -def test_command_refresh__with_statuses_refreshed(mocker, nb_results): - """Refresh status command reported updated non-terminal statuses. +def test_command_refresh__with_statuses_refreshed( + mock_scheduler, mock_refresh, nb_results +): + """Refresh status command reports non-terminal statuses updates. 
""" - command_name = "refresh_savecodenow_statuses" - module_fqdn = "swh.web.common.management.commands" - mock_refresh = mocker.patch( - f"{module_fqdn}.{command_name}.refresh_save_origin_request_statuses" - ) - # fake returned refreshed status - mock_refresh.return_value = [{"": ""}] * nb_results + # fake returned refreshed status for 'archives' visit type + mock_refresh.return_value = [{"visit_type": "archives",}] * nb_results out = StringIO() - call_command(command_name, stdout=out) - - assert mock_refresh.called + call_command(COMMAND_NAME, stdout=out) actual_output = out.getvalue() if nb_results > 0: assert f"updated {nb_results}" in actual_output else: assert "Nothing" in actual_output + + assert mock_scheduler.called + assert mock_refresh.called + + +@pytest.fixture +def fake_refreshed_data(): + """Prepare test data within the scheduler and the swh-web model db + + """ + duplicated_origin_url = AUTHORIZED_ORIGIN_URL % "specific-origin" + entries = ( + [ + { + "visit_type": "archives", # ignored from recurring task scheduling + "visit_status": VISIT_STATUS_FULL, + "task_status": SAVE_TASK_SUCCEEDED, + }, + { + "visit_type": "hg", # scheduled as recurring task + "visit_status": VISIT_STATUS_PARTIAL, + "task_status": SAVE_TASK_SUCCEEDED, + }, + { + "visit_type": "svn", # scheduled as recurring task + "visit_status": VISIT_STATUS_PARTIAL, + "task_status": SAVE_TASK_SCHEDULED, + }, + { + "visit_type": "svn", # ignored from recurring task scheduling + "visit_status": VISIT_STATUS_FAILED, + "task_status": SAVE_TASK_FAILED, + }, + { + "visit_type": "hg", # ignored from recurring task scheduling + "visit_status": "created", + "task_status": SAVE_TASK_SCHEDULED, + }, + ] + + [ + { + "visit_type": "git", + "visit_status": VISIT_STATUS_FULL, + "task_status": SAVE_TASK_SUCCEEDED, + "origin": duplicated_origin_url, + } + ] + * 3 + ) # only 1 of the origin duplicates will be scheduled as recurring task + + time_now = datetime.now(tz=timezone.utc) - timedelta(days=len(entries)) + return [ + SaveOriginRequestInfo( + visit_type=meta["visit_type"], + visit_status=meta["visit_status"], + origin_url=( + meta["origin"] if "origin" in meta else AUTHORIZED_ORIGIN_URL % i + ), + save_request_date=time_now + timedelta(days=i - 1), + save_request_status=SAVE_REQUEST_ACCEPTED, + visit_date=time_now + timedelta(days=i), + save_task_status=meta["task_status"], + id=i, + loading_task_id=i, + ) + for i, meta in enumerate(entries) + ] + + +def test_command_refresh__with_recurrent_tasks_scheduling( + mock_scheduler, mock_refresh, fake_refreshed_data, swh_scheduler +): + """Refresh status command report updates of statuses. The successful ones without the + type 'archived' are also scheduled recurringly. + + """ + mock_refresh.return_value = fake_refreshed_data + + # only visit types (git, hg, svn) types with status (full, partial) are taken into + # account for scheduling, so only 3 of those matches in the fake data set. 
+    origins = set()
+    expected_nb_scheduled = 0
+    for entry in fake_refreshed_data:
+        visit_type = entry["visit_type"]
+        if visit_type == "archives":  # only deal with git, svn and hg
+            continue
+        if entry["visit_status"] not in (VISIT_STATUS_PARTIAL, VISIT_STATUS_FULL):
+            continue
+        origin = entry["origin_url"]
+        if (visit_type, origin) in origins:
+            continue
+        origins.add((visit_type, origin))
+        expected_nb_scheduled += 1
+
+    assert expected_nb_scheduled == 3
+
+    out = StringIO()
+    call_command(COMMAND_NAME, stdout=out)
+
+    actual_output = out.getvalue()
+    assert f"Successfully updated {len(fake_refreshed_data)}" in actual_output
+
+    lister = swh_scheduler.get_or_create_lister(
+        name="save-code-now", instance_name=get_config()["instance_name"]
+    )
+
+    result = list(stream_results(swh_scheduler.get_listed_origins, lister.id))
+    assert len(result) == expected_nb_scheduled
+
+    assert mock_scheduler.called
+    assert mock_refresh.called