diff --git a/docker/origin_client/config.yml b/docker/origin_client/config.yml
--- a/docker/origin_client/config.yml
+++ b/docker/origin_client/config.yml
@@ -16,19 +16,24 @@


 archive:
-  #cls: graph
-  #url: http://graph.internal.softwareheritage.org:5009/graph
+  cls: multiplexer
+  archives:
+  - cls: graph
+    url: http://graph.internal.softwareheritage.org:5009/graph
+    storage:
+      cls: remote
+      url: http://webapp1.internal.softwareheritage.org:5002
+  - cls: api
+    storage:
+      cls: remote
+      url: http://webapp1.internal.softwareheritage.org:5002
   # cls: direct
   # db:
   # host: swh-storage-db
   # port: 5432
   # dbname: swh
   # user: guest
-  cls: api
-  storage:
-    cls: remote
-    # url: http://webapp.internal.staging.swh.network:5002/
-    url: http://webapp1.internal.softwareheritage.org:5002
+  # cls: api

 org_server:  # origin provider
   host: origin_server
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -2,3 +2,4 @@
 swh.core[db,http] >= 0.14
 swh.model >= 2.6.1
 swh.storage
+swh.graph
diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -44,6 +44,7 @@
     elif cls == "graph":
         try:
             from swh.graph.client import RemoteGraphClient
+            from swh.storage import get_storage

             from .swhgraph.archive import ArchiveGraph

@@ -54,6 +55,12 @@
             raise EnvironmentError(
                 "Graph configuration required but module is not installed."
             )
+    elif cls == "multiplexer":
+
+        from .multiplexer.archive import ArchiveMultiplexed
+
+        archives = [get_archive(**archive) for archive in kwargs["archives"]]
+        return ArchiveMultiplexed(archives)
     else:
         raise ValueError

diff --git a/swh/provenance/model.py b/swh/provenance/model.py
--- a/swh/provenance/model.py
+++ b/swh/provenance/model.py
@@ -19,12 +19,19 @@
         self.id = Origin(url=self.url).id
         self.snapshot = snapshot
         self._revisions: Optional[List[RevisionEntry]] = None
+        self._revisions_count = -1

     def retrieve_revisions(self, archive: ArchiveInterface) -> None:
         if self._revisions is None:
             self._revisions = [
                 RevisionEntry(rev) for rev in archive.snapshot_get_heads(self.snapshot)
             ]
+            self._revisions_count = len(self._revisions)
+
+    @property
+    def revision_count(self) -> int:
+        """Number of head revisions, or -1 until retrieve_revisions() has run."""
+        return self._revisions_count

     @property
     def revisions(self) -> Iterator[RevisionEntry]:
diff --git a/swh/provenance/multiplexer/__init__.py b/swh/provenance/multiplexer/__init__.py
new file mode 100644
diff --git a/swh/provenance/multiplexer/archive.py b/swh/provenance/multiplexer/archive.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/multiplexer/archive.py
@@ -0,0 +1,95 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+
+from typing import List
+
+from swh.core.statsd import statsd
+from swh.model.model import ObjectType, Sha1Git, TargetType
+from swh.storage.interface import StorageInterface
+from swh.provenance.archive import ArchiveInterface
+from typing import Any, Dict, Iterable, Set, Tuple
+
+
+ARCHIVE_DURATION_METRIC = "swh_provenance_archive_multiplexed_duration_seconds"
+
+LOGGER = logging.getLogger(__name__)
+
+
+class ArchiveMultiplexed(ArchiveInterface):
+    """Archive that queries several backends in order and falls back on the next.
+
+    Every method asks the wrapped archives one after the other and returns the
+    first non-empty answer; an empty list is returned when no backend has one.
+    """
+
+    def __init__(self, archives: List[ArchiveInterface]) -> None:
+        self.archives = archives
+
+    def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]:
+        """List directory `id` using the first backend that returns entries.
+
+        Backends raising NotImplementedError are skipped.
+        """
+        for archive in self.archives:
+            try:
+                # Forward minsize: dropping it would silently disable filtering.
+                entries = list(archive.directory_ls(id, minsize))
+            except NotImplementedError:
+                LOGGER.debug(
+                    "directory_ls is not implemented by %s", archive.__class__
+                )
+                continue
+
+            if entries:
+                return entries
+            LOGGER.debug(
+                "No entries found for directory %s via %s",
+                id.hex(),
+                archive.__class__,
+            )
+
+        return []
+
+    def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
+        """Return the parents of revision `id` from the first backend knowing any."""
+        for archive in self.archives:
+            parents = list(archive.revision_get_parents(id))
+            if parents:
+                return parents
+            LOGGER.debug(
+                "No parents found for revision %s via %s",
+                id.hex(),
+                archive.__class__,
+            )
+
+        return []
+
+    def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
+        """Return the heads of snapshot `id` from the first backend knowing any.
+
+        A failing backend is logged and skipped so the next one can answer.
+        """
+        for archive in self.archives:
+            try:
+                heads = list(archive.snapshot_get_heads(id))
+            except Exception as e:
+                # Deliberate best-effort: fall back on the next backend.
+                LOGGER.warning(
+                    "Error retrieving heads of snapshots %s via %s: %s",
+                    id.hex(),
+                    archive.__class__,
+                    e,
+                )
+                continue
+
+            if heads:
+                return heads
+            LOGGER.debug(
+                "No heads found for snapshot %s via %s", id.hex(), archive.__class__
+            )
+
+        return []
diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py
--- a/swh/provenance/origin.py
+++ b/swh/provenance/origin.py
@@ -74,17 +74,37 @@
 def proceed_origin(
     provenance: ProvenanceInterface, archive: ArchiveInterface, origin: OriginEntry
 ) -> None:
-    LOGGER.info("Processing origin %s", origin.url)
+    LOGGER.info("Processing origin=%s", origin)
     start = datetime.now()
+
+    LOGGER.debug("Add origin")
     provenance.origin_add(origin)
+
+    LOGGER.debug("Retrieving head revisions")
     origin.retrieve_revisions(archive)
-    for revision in origin.revisions:
+    LOGGER.info("%d heads found", origin.revision_count)
+
+    for idx, revision in enumerate(origin.revisions):
+        LOGGER.info(
+            "checking revision %s (%d/%d)", revision, idx + 1, origin.revision_count
+        )
+
         if not provenance.revision_is_head(revision):
+            LOGGER.debug("revision %s not in heads", revision)
+
             graph = HistoryGraph(archive, revision)
+            LOGGER.debug("History graph built")
+
             origin_add_revision(provenance, origin, graph)
+            LOGGER.debug("Revision added")
+
         # head is treated separately
+        LOGGER.debug("Checking preferred origin")
         check_preferred_origin(provenance, origin, revision)
+
+        LOGGER.debug("Adding revision to origin")
         provenance.revision_add_to_origin(origin, revision)
+
     end = datetime.now()
     LOGGER.info("Processed origin %s in %s", origin.url, (end - start))

diff --git a/swh/provenance/tests/test_archive_interface.py b/swh/provenance/tests/test_archive_interface.py
--- a/swh/provenance/tests/test_archive_interface.py
+++ b/swh/provenance/tests/test_archive_interface.py
@@ -6,7 +6,7 @@
 from collections import Counter
 from operator import itemgetter
 from typing import Counter as TCounter
-from typing import Dict, List, Set, Tuple, Type, Union
+from typing import Any, Dict, Iterable, List, Set, Tuple, Type, Union

 import pytest

@@ -28,6 +28,7 @@
 )
 from swh.model.swhids import CoreSWHID, ExtendedObjectType, ExtendedSWHID
 from swh.provenance.archive import ArchiveInterface
+from swh.provenance.multiplexer.archive import ArchiveMultiplexed
 from swh.provenance.postgresql.archive import ArchivePostgreSQL
 from swh.provenance.storage.archive import ArchiveStorage
 from swh.provenance.swhgraph.archive import ArchiveGraph
@@ -35,6 +36,17 @@
 from swh.storage.postgresql.storage import Storage


+class ArchiveNoop(ArchiveInterface):
+    def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]:
+        return []
+
+    def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
+        return []
+
+    def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
+        return []
+
+
 def check_directory_ls(
     reference: ArchiveInterface, archive: ArchiveInterface, data: Dict[str, List[dict]]
 ) -> None:
@@ -214,3 +226,19 @@
     check_directory_ls(archive, archive_graph, data)
     check_revision_get_parents(archive, archive_graph, data)
     check_snapshot_get_heads(archive, archive_graph, data)
+
+    # test against ArchiveMultiplexer
+    archive_multiplexed = ArchiveMultiplexed(
+        [ArchiveNoop(), archive_graph, archive_api]
+    )
+    check_directory_ls(archive, archive_multiplexed, data)
+    check_revision_get_parents(archive, archive_multiplexed, data)
+    check_snapshot_get_heads(archive, archive_multiplexed, data)
+
+
+def test_noop_multiplexer():
+    archive = ArchiveMultiplexed([ArchiveNoop()])
+
+    assert not archive.directory_ls(Sha1Git(b"abcd"))
+    assert not archive.revision_get_parents(Sha1Git(b"abcd"))
+    assert not archive.snapshot_get_heads(Sha1Git(b"abcd"))
diff --git a/swh/provenance/tests/test_init.py b/swh/provenance/tests/test_init.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/tests/test_init.py
@@ -0,0 +1,40 @@
+# Copyright (C) 2019-2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from unittest.mock import patch
+
+from swh.provenance import get_archive
+from swh.provenance.multiplexer.archive import ArchiveMultiplexed
+from swh.provenance.swhgraph.archive import ArchiveGraph
+from swh.provenance.storage.archive import ArchiveStorage
+
+
+def test_multiplexer_configuration():
+    config = {
+        "archives": [
+            {
+                "cls": "graph",
+                "url": "graph_url",
+                "storage": {
+                    "cls": "remote",
+                    "url": "storage_graph_url"
+                }
+            },
+            {
+                "cls": "api",
+                "storage": {
+                    "cls": "remote",
+                    "url": "storage_api_url"
+                }
+            }
+
+        ]
+    }
+
+    archive = get_archive(cls="multiplexer", **config)
+    assert isinstance(archive, ArchiveMultiplexed)
+    assert len(archive.archives) == 2
+    assert isinstance(archive.archives[0], ArchiveGraph)
+    assert isinstance(archive.archives[1], ArchiveStorage)
diff --git a/swh/provenance/tools/origins/client.py b/swh/provenance/tools/origins/client.py
--- a/swh/provenance/tools/origins/client.py
+++ b/swh/provenance/tools/origins/client.py
@@ -19,6 +19,10 @@
 CONFIG_ENVVAR = "SWH_CONFIG_FILENAME"

 DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None)
+LOG_FORMAT = (
+    "%(asctime)s-%(levelname)s - %(process)d - %(name)s.%(funcName)s:%(lineno)d"
+    ": %(message)s"
+)


 class Client(multiprocessing.Process):
@@ -63,7 +67,7 @@


 if __name__ == "__main__":
-    logging.basicConfig(level=logging.INFO)
+    logging.basicConfig(level=logging.WARNING, format=LOG_FORMAT)

     # Check parameters
     if len(sys.argv) != 2: