diff --git a/docker/origin_client/config.yml b/docker/origin_client/config.yml
--- a/docker/origin_client/config.yml
+++ b/docker/origin_client/config.yml
@@ -16,19 +16,24 @@
 
 
 archive:
-  #cls: graph
-  #url: http://graph.internal.softwareheritage.org:5009/graph
+  cls: multiplexer
+  archives:
+    - cls: graph
+      url: http://graph.internal.softwareheritage.org:5009/graph
+      storage:
+        cls: remote
+        url: http://webapp1.internal.softwareheritage.org:5002
+    - cls: api
+      storage:
+        cls: remote
+        url: http://webapp1.internal.softwareheritage.org:5002
   # cls: direct
   # db:
   #   host: swh-storage-db
   #   port: 5432
   #   dbname: swh
   #   user: guest
-  cls: api
-  storage:
-    cls: remote
-    # url: http://webapp.internal.staging.swh.network:5002/
-    url: http://webapp1.internal.softwareheritage.org:5002
+  # cls: api
 
 org_server: # origin provider
   host: origin_server
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -2,3 +2,4 @@
 swh.core[db,http] >= 0.14
 swh.model >= 2.6.1
 swh.storage
+swh.graph
diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -44,6 +44,7 @@
     elif cls == "graph":
         try:
             from swh.graph.client import RemoteGraphClient
+            from swh.storage import get_storage
 
             from .swhgraph.archive import ArchiveGraph
 
@@ -54,6 +55,12 @@
             raise EnvironmentError(
                 "Graph configuration required but module is not installed."
             )
+    elif cls == "multiplexer":
+
+        from .multiplexer.archive import ArchiveMultiplexed
+
+        archives = [get_archive(**archive) for archive in kwargs["archives"]]
+        return ArchiveMultiplexed(archives)
     else:
         raise ValueError
 
diff --git a/swh/provenance/model.py b/swh/provenance/model.py
--- a/swh/provenance/model.py
+++ b/swh/provenance/model.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2021-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -14,6 +14,9 @@
 
 
 class OriginEntry:
+
+    _revisions_count: int
+
     def __init__(self, url: str, snapshot: Sha1Git) -> None:
         self.url = url
         self.id = Origin(url=self.url).id
@@ -25,6 +28,14 @@
             self._revisions = [
                 RevisionEntry(rev) for rev in archive.snapshot_get_heads(self.snapshot)
             ]
+            self._revisions_count = len(self._revisions)
+
+    @property
+    def revision_count(self) -> int:
+        # _revisions_count is only assigned by retrieve_revisions()
+        if getattr(self, "_revisions_count", None) is None:
+            raise ValueError("retrieve_revisions was not called")
+        return self._revisions_count
 
     @property
     def revisions(self) -> Iterator[RevisionEntry]:
diff --git a/swh/provenance/multiplexer/__init__.py b/swh/provenance/multiplexer/__init__.py
new file mode 100644
diff --git a/swh/provenance/multiplexer/archive.py b/swh/provenance/multiplexer/archive.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/multiplexer/archive.py
@@ -0,0 +1,92 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+from typing import Any, Dict, Iterable, List
+
+from swh.core.statsd import statsd
+from swh.model.model import Sha1Git
+from swh.provenance.archive import ArchiveInterface
+from swh.storage.interface import StorageInterface
+
+ARCHIVE_DURATION_METRIC = "swh_provenance_archive_multiplexed_duration_seconds"
+
+LOGGER = logging.getLogger(__name__)
+
+
+class ArchiveMultiplexed:
+    """Archive that queries a list of archives in order.
+
+    Every method returns the first non-empty answer given by one of the
+    wrapped archives, or an empty list when none of them has one.
+    """
+
+    storage: StorageInterface
+
+    def __init__(self, archives: List[ArchiveInterface]) -> None:
+        self.archives = archives
+
+    @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "directory_ls"})
+    def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]:
+        """Return the first non-empty listing of directory ``id``.
+
+        Archives raising ``NotImplementedError`` are skipped.
+        """
+        for archive in self.archives:
+            try:
+                # forward minsize so the caller's size filter is honoured
+                entries = list(archive.directory_ls(id, minsize=minsize))
+            except NotImplementedError:
+                # this archive cannot list directories, try the next one
+                continue
+
+            if entries:
+                return entries
+            LOGGER.debug(
+                "No entries found for directory %s via %s", id.hex(), archive.__class__
+            )
+
+        return []
+
+    @statsd.timed(
+        metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"}
+    )
+    def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
+        """Return the first non-empty list of parents of revision ``id``."""
+        for archive in self.archives:
+            parents = list(archive.revision_get_parents(id))
+            if parents:
+                return parents
+            LOGGER.debug(
+                "No parents found for revision %s via %s", id.hex(), archive.__class__
+            )
+
+        return []
+
+    @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"})
+    def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
+        """Return the first non-empty list of heads of snapshot ``id``.
+
+        An archive that fails is logged and skipped (best effort).
+        """
+        for archive in self.archives:
+            try:
+                heads = list(archive.snapshot_get_heads(id))
+            except Exception as e:
+                LOGGER.warning(
+                    "Error retrieving heads of snapshots %s via %s: %s",
+                    id.hex(),
+                    archive.__class__,
+                    e,
+                )
+                continue
+
+            if heads:
+                return heads
+            LOGGER.debug(
+                "No heads found for snapshot %s via %s", id.hex(), archive.__class__
+            )
+
+        return []
diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py
--- a/swh/provenance/origin.py
+++ b/swh/provenance/origin.py
@@ -74,17 +74,37 @@
 def proceed_origin(
     provenance: ProvenanceInterface, archive: ArchiveInterface, origin: OriginEntry
 ) -> None:
-    LOGGER.info("Processing origin %s", origin.url)
+    LOGGER.info("Processing origin=%s", origin)
     start = datetime.now()
+
+    LOGGER.debug("Add origin")
     provenance.origin_add(origin)
+
+    LOGGER.debug("Retrieving head revisions")
     origin.retrieve_revisions(archive)
-    for revision in origin.revisions:
+    LOGGER.info("%d heads found", origin.revision_count)
+
+    for idx, revision in enumerate(origin.revisions):
+        LOGGER.info(
+            "checking revision %s (%d/%d)", revision, idx + 1, origin.revision_count
+        )
+
         if not provenance.revision_is_head(revision):
+            LOGGER.debug("revision %s not in heads", revision)
+
             graph = HistoryGraph(archive, revision)
+            LOGGER.debug("History graph built")
+
             origin_add_revision(provenance, origin, graph)
+            LOGGER.debug("Revision added")
+
         # head is treated separately
+        LOGGER.debug("Checking preferred origin")
         check_preferred_origin(provenance, origin, revision)
+
+        LOGGER.debug("Adding revision to origin")
         provenance.revision_add_to_origin(origin, revision)
+
     end = datetime.now()
     LOGGER.info("Processed origin %s in %s", origin.url, (end - start))
 
diff --git a/swh/provenance/tests/test_archive_interface.py b/swh/provenance/tests/test_archive_interface.py
--- a/swh/provenance/tests/test_archive_interface.py
+++ b/swh/provenance/tests/test_archive_interface.py
@@ -1,12 +1,13 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2021-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from collections import Counter
 from operator import itemgetter
+from typing import Any
 from typing import Counter as TCounter
-from typing import Dict, List, Set, Tuple, Type, Union
+from typing import Dict, Iterable, List, Set, Tuple, Type, Union
 
 import pytest
 
@@ -28,13 +29,28 @@
 )
 from swh.model.swhids import CoreSWHID, ExtendedObjectType, ExtendedSWHID
 from swh.provenance.archive import ArchiveInterface
+from swh.provenance.multiplexer.archive import ArchiveMultiplexed
 from swh.provenance.postgresql.archive import ArchivePostgreSQL
 from swh.provenance.storage.archive import ArchiveStorage
 from swh.provenance.swhgraph.archive import ArchiveGraph
 from swh.provenance.tests.conftest import fill_storage, load_repo_data
+from swh.storage.interface import StorageInterface
 from swh.storage.postgresql.storage import Storage
 
 
+class ArchiveNoop:
+    storage: StorageInterface
+
+    def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]:
+        return []
+
+    def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
+        return []
+
+    def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
+        return []
+
+
 def check_directory_ls(
     reference: ArchiveInterface, archive: ArchiveInterface, data: Dict[str, List[dict]]
 ) -> None:
@@ -214,3 +230,19 @@
     check_directory_ls(archive, archive_graph, data)
     check_revision_get_parents(archive, archive_graph, data)
     check_snapshot_get_heads(archive, archive_graph, data)
+
+    # test against ArchiveMultiplexed
+    archive_multiplexed = ArchiveMultiplexed(
+        [ArchiveNoop(), archive_graph, archive_api]
+    )
+    check_directory_ls(archive, archive_multiplexed, data)
+    check_revision_get_parents(archive, archive_multiplexed, data)
+    check_snapshot_get_heads(archive, archive_multiplexed, data)
+
+
+def test_noop_multiplexer():
+    archive = ArchiveMultiplexed([ArchiveNoop()])
+
+    assert not archive.directory_ls(Sha1Git(b"abcd"))
+    assert not archive.revision_get_parents(Sha1Git(b"abcd"))
+    assert not archive.snapshot_get_heads(Sha1Git(b"abcd"))
diff --git a/swh/provenance/tests/test_init.py b/swh/provenance/tests/test_init.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/tests/test_init.py
@@ -0,0 +1,28 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.provenance import get_archive
+from swh.provenance.multiplexer.archive import ArchiveMultiplexed
+from swh.provenance.storage.archive import ArchiveStorage
+from swh.provenance.swhgraph.archive import ArchiveGraph
+
+
+def test_multiplexer_configuration():
+    config = {
+        "archives": [
+            {
+                "cls": "graph",
+                "url": "graph_url",
+                "storage": {"cls": "remote", "url": "storage_graph_url"},
+            },
+            {"cls": "api", "storage": {"cls": "remote", "url": "storage_api_url"}},
+        ]
+    }
+
+    archive = get_archive(cls="multiplexer", **config)
+    assert isinstance(archive, ArchiveMultiplexed)
+    assert len(archive.archives) == 2
+    assert isinstance(archive.archives[0], ArchiveGraph)
+    assert isinstance(archive.archives[1], ArchiveStorage)
diff --git a/swh/provenance/tools/origins/client.py b/swh/provenance/tools/origins/client.py
--- a/swh/provenance/tools/origins/client.py
+++ b/swh/provenance/tools/origins/client.py
@@ -1,5 +1,10 @@
 #!/usr/bin/env python
 
+# Copyright (C) 2021-2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
 import logging
 import logging.handlers
 import multiprocessing
@@ -19,6 +24,10 @@
 
 CONFIG_ENVVAR = "SWH_CONFIG_FILENAME"
 DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None)
+LOG_FORMAT = (
+    "%(asctime)s-%(levelname)s - %(process)d - %(name)s.%(funcName)s:%(lineno)d"
+    ": %(message)s"
+)
 
 
 class Client(multiprocessing.Process):
@@ -63,7 +72,7 @@
 
 
 if __name__ == "__main__":
-    logging.basicConfig(level=logging.INFO)
+    logging.basicConfig(level=logging.WARNING, format=LOG_FORMAT)
 
     # Check parameters
     if len(sys.argv) != 2: