def get_archive(cls: str, **kwargs) -> ArchiveInterface:
    """Get an archive object of class ``cls`` with arguments ``kwargs``.

    Args:
        cls: archive's class, one of 'api', 'direct', 'graph' or 'multiplexer'
        kwargs: dictionary of arguments passed to the archive class constructor

    Returns:
        an instance of archive object (either using swh.storage API or direct
        queries to the archive's database)

    Raises:
        :cls:`ValueError` if passed an unknown archive class.
    """
    if cls == "api":
        # Go through the swh.storage public API.
        from swh.storage import get_storage

        from .storage.archive import ArchiveStorage

        return ArchiveStorage(get_storage(**kwargs["storage"]))
    elif cls == "direct":
        # Query the archive's PostgreSQL database directly.
        from swh.core.db import BaseDb

        from .postgresql.archive import ArchivePostgreSQL

        return ArchivePostgreSQL(BaseDb.connect(**kwargs["db"]).conn)
    elif cls == "graph":
        try:
            from swh.graph.client import RemoteGraphClient
            from swh.storage import get_storage

            from .swhgraph.archive import ArchiveGraph

            graph = RemoteGraphClient(kwargs.get("url"))
            return ArchiveGraph(graph, get_storage(**kwargs["storage"]))
        except ModuleNotFoundError as err:
            # swh.graph is an optional dependency; fail with a clear message
            # and keep the original import error in the chain.
            raise EnvironmentError(
                "Graph configuration required but module is not installed."
            ) from err
    elif cls == "multiplexer":
        from .multiplexer.archive import ArchiveMultiplexed

        # Each entry of kwargs["archives"] is itself a full archive
        # configuration, resolved recursively.
        archives = [get_archive(**archive) for archive in kwargs["archives"]]
        return ArchiveMultiplexed(archives)
    else:
        raise ValueError(f"Unknown archive class: {cls!r}")
""" if cls in ["local", "postgresql"]: from .postgresql.provenance import ProvenanceStoragePostgreSql if cls == "local": warnings.warn( '"local" class is deprecated for provenance storage, please ' 'use "postgresql" class instead.', DeprecationWarning, ) raise_on_commit = kwargs.get("raise_on_commit", False) return ProvenanceStoragePostgreSql( raise_on_commit=raise_on_commit, **kwargs["db"] ) elif cls == "mongodb": from .mongo.backend import ProvenanceStorageMongoDb engine = kwargs.get("engine", "pymongo") return ProvenanceStorageMongoDb(engine=engine, **kwargs["db"]) elif cls == "rabbitmq": from .api.client import ProvenanceStorageRabbitMQClient rmq_storage = ProvenanceStorageRabbitMQClient(**kwargs) if TYPE_CHECKING: assert isinstance(rmq_storage, ProvenanceStorageInterface) return rmq_storage raise ValueError diff --git a/swh/provenance/model.py b/swh/provenance/model.py index 5ace0e6..144f2df 100644 --- a/swh/provenance/model.py +++ b/swh/provenance/model.py @@ -1,153 +1,157 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from datetime import datetime from typing import Iterable, Iterator, List, Optional from swh.model.model import Origin, Sha1Git from .archive import ArchiveInterface class OriginEntry: + + revisions_count: int + def __init__(self, url: str, snapshot: Sha1Git) -> None: self.url = url self.id = Origin(url=self.url).id self.snapshot = snapshot self._revisions: Optional[List[RevisionEntry]] = None - self._revisions_count = -1 def retrieve_revisions(self, archive: ArchiveInterface) -> None: if self._revisions is None: self._revisions = [ RevisionEntry(rev) for rev in archive.snapshot_get_heads(self.snapshot) ] self._revisions_count = len(self._revisions) 
class OriginEntry:
    """An origin (URL + snapshot) to be processed by the provenance algorithm.

    ``retrieve_revisions`` must be called before using the ``revisions`` or
    ``revision_count`` properties.
    """

    # Number of head revisions found for this origin's snapshot; None until
    # retrieve_revisions() has been called.
    _revisions_count: Optional[int]

    def __init__(self, url: str, snapshot: Sha1Git) -> None:
        self.url = url
        self.id = Origin(url=self.url).id
        self.snapshot = snapshot
        self._revisions: Optional[List[RevisionEntry]] = None
        # Bug fix: without this initialization, revision_count raised
        # AttributeError (not the intended ValueError) when accessed before
        # retrieve_revisions().
        self._revisions_count: Optional[int] = None

    def retrieve_revisions(self, archive: ArchiveInterface) -> None:
        """Fetch the head revisions of this origin's snapshot (at most once)."""
        if self._revisions is None:
            self._revisions = [
                RevisionEntry(rev) for rev in archive.snapshot_get_heads(self.snapshot)
            ]
            self._revisions_count = len(self._revisions)

    @property
    def revision_count(self) -> int:
        if self._revisions_count is None:
            raise ValueError("retrieve_revisions was not called")
        return self._revisions_count

    @property
    def revisions(self) -> Iterator[RevisionEntry]:
        if self._revisions is None:
            raise RuntimeError(
                "Revisions of this node has not yet been retrieved. "
                "Please call retrieve_revisions() before using this property."
            )
        return (x for x in self._revisions)

    def __str__(self) -> str:
        # NOTE(review): this f-string looks truncated in this view (likely
        # angle-bracket content stripped) — confirm against upstream.
        return f""
class DirectoryEntry:
    """A directory node with lazily-fetched file and sub-directory children."""

    def __init__(self, id: Sha1Git, name: bytes = b"") -> None:
        self.id = id
        self.name = name
        # Both stay None until retrieve_children() has run.
        self._files: Optional[List[FileEntry]] = None
        self._dirs: Optional[List[DirectoryEntry]] = None

    def retrieve_children(self, archive: ArchiveInterface, minsize: int = 0) -> None:
        """List this directory in the archive, splitting children by type."""
        if self._files is not None or self._dirs is not None:
            return  # already retrieved
        self._files = []
        self._dirs = []
        for entry in archive.directory_ls(self.id, minsize=minsize):
            kind = entry["type"]
            if kind == "dir":
                self._dirs.append(DirectoryEntry(entry["target"], name=entry["name"]))
            elif kind == "file":
                self._files.append(FileEntry(entry["target"], entry["name"]))

    @property
    def files(self) -> Iterator[FileEntry]:
        """Iterate over file children; retrieve_children() must have run first."""
        if self._files is None:
            raise RuntimeError(
                "Children of this node has not yet been retrieved. "
                "Please call retrieve_children() before using this property."
            )
        return iter(self._files)

    @property
    def dirs(self) -> Iterator[DirectoryEntry]:
        """Iterate over sub-directory children; retrieve_children() must have run first."""
        if self._dirs is None:
            raise RuntimeError(
                "Children of this node has not yet been retrieved. "
                "Please call retrieve_children() before using this property."
            )
        return iter(self._dirs)

    def __str__(self) -> str:
        return f""

    def __eq__(self, other) -> bool:
        if not isinstance(other, DirectoryEntry):
            return False
        return (self.id, self.name) == (other.id, other.name)

    def __hash__(self) -> int:
        return hash((self.id, self.name))
class ArchiveMultiplexed:
    """Archive implementation that delegates to a list of archive backends.

    Each query is tried against every backend in order and the first
    non-empty result wins. Backends raising ``NotImplementedError`` (or, for
    snapshot heads, any error — deliberate best-effort) are skipped.
    """

    storage: StorageInterface

    def __init__(self, archives: List[ArchiveInterface]) -> None:
        self.archives = archives

    @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "directory_ls"})
    def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]:
        """Return the entries of directory ``id`` from the first backend that has any."""
        for archive in self.archives:
            try:
                # Bug fix: forward minsize instead of silently dropping it.
                directories = list(archive.directory_ls(id, minsize=minsize))
            except NotImplementedError:
                # Bug fix: `continue` (was `pass`) so a previous iteration's
                # value can never be reused for a failing backend.
                continue
            if directories:
                return directories
            # Bug fix: this message wrongly referred to revision parents.
            LOGGER.debug(
                "No entries found for directory %s via %s",
                id.hex(),
                archive.__class__,
            )

        return []

    @statsd.timed(
        metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"}
    )
    def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
        """Return the parents of revision ``id`` from the first backend that has any."""
        for archive in self.archives:
            parents = list(archive.revision_get_parents(id))
            if parents:
                return parents
            LOGGER.debug(
                "No parents found for revision %s via %s", id.hex(), archive.__class__
            )

        return []

    @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"})
    def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
        """Return the head revisions of snapshot ``id`` from the first backend that has any."""
        for archive in self.archives:
            try:
                heads = list(archive.snapshot_get_heads(id))

                if heads:
                    return heads
                LOGGER.debug(
                    "No heads found for snapshot %s via %s", id.hex(), archive.__class__
                )
            except Exception as e:
                # Bug fix: Logger.warn is a deprecated alias of Logger.warning.
                LOGGER.warning(
                    "Error retrieving heads of snapshots %s via %s: %s",
                    id.hex(),
                    archive.__class__,
                    e,
                )

        return []
class ArchiveNoop:
    """Stub archive backend that knows nothing: every query yields an empty result.

    Used to verify that the multiplexer falls through to the next backend.
    """

    storage: StorageInterface

    def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]:
        """Pretend directory ``id`` has no entries."""
        return []

    def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
        """Pretend revision ``id`` has no parents."""
        return []

    def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
        """Pretend snapshot ``id`` has no head revisions."""
        return []
def get_object_class(object_type: str) -> Type[BaseModel]:
    """Map an object-type name from the test datasets to its swh.model class.

    Args:
        object_type: one of 'origin', 'origin_visit', 'origin_visit_status',
            'content', 'directory', 'revision', 'snapshot'

    Raises:
        ValueError: if ``object_type`` is not a known object type.
    """
    # Kept as an if/elif chain: a dispatch dict would evaluate all model
    # names eagerly, this only touches the one that matches.
    if object_type == "origin":
        return Origin
    elif object_type == "origin_visit":
        return OriginVisit
    elif object_type == "origin_visit_status":
        return OriginVisitStatus
    elif object_type == "content":
        return Content
    elif object_type == "directory":
        return Directory
    elif object_type == "revision":
        return Revision
    elif object_type == "snapshot":
        return Snapshot
    raise ValueError(f"Unknown object type: {object_type!r}")
@pytest.mark.parametrize(
    "repo",
    ("cmdbts2", "out-of-order", "with-merges"),
)
def test_archive_interface(repo: str, archive: ArchiveInterface) -> None:
    """Check every archive backend against the reference ``archive`` fixture."""
    # read data/README.md for more details on how these datasets are generated
    data = load_repo_data(repo)
    fill_storage(archive.storage, data)

    def compare_with(candidate: ArchiveInterface) -> None:
        # Run the three interface checks against the reference archive.
        check_directory_ls(archive, candidate, data)
        check_revision_get_parents(archive, candidate, data)
        check_snapshot_get_heads(archive, candidate, data)

    # test against ArchiveStorage
    archive_api = ArchiveStorage(archive.storage)
    compare_with(archive_api)

    # test against ArchivePostgreSQL
    assert isinstance(archive.storage, Storage)
    dsn = archive.storage.get_db().conn.dsn
    with BaseDb.connect(dsn).conn as conn:
        BaseDb.adapt_conn(conn)
        compare_with(ArchivePostgreSQL(conn))

    # test against ArchiveGraph: directory_ls is not implemented there
    nodes, edges = get_graph_data(data)
    graph = NaiveClient(nodes=nodes, edges=edges)
    archive_graph = ArchiveGraph(graph, archive.storage)
    with pytest.raises(NotImplementedError):
        check_directory_ls(archive, archive_graph, data)
    check_revision_get_parents(archive, archive_graph, data)
    check_snapshot_get_heads(archive, archive_graph, data)

    # test against ArchiveMultiplexer
    archive_multiplexed = ArchiveMultiplexed(
        [ArchiveNoop(), archive_graph, archive_api]
    )
    compare_with(archive_multiplexed)
class Client(multiprocessing.Process):
    """Worker process that pulls origin batches from the origin server over
    ZeroMQ and feeds them to the provenance algorithm."""

    def __init__(
        self,
        conf: Dict[str, Any],
        group: None = None,
        target: Optional[Callable[..., Any]] = ...,
        name: Optional[str] = ...,
    ) -> None:
        super().__init__(group=group, target=target, name=name)
        self.archive_conf = conf["archive"]
        self.storage_conf = conf["storage"]
        server = conf["org_server"]
        self.url = f"tcp://{server['host']}:{server['port']}"
        logging.info(f"Client {self.name} created")

    def run(self):
        """Process entry point: request batches until the server sends None."""
        logging.info(f"Client {self.name} started")
        # XXX: should we reconnect on each iteration to save resources?
        archive = get_archive(**self.archive_conf)

        ctx = zmq.Context()
        sock: zmq.Socket = ctx.socket(zmq.REQ)
        sock.connect(self.url)

        with get_provenance(**self.storage_conf) as provenance:
            while True:
                sock.send(b"NEXT")
                response = sock.recv_json()
                # A None payload is the server's end-of-work marker.
                if response is None:
                    break

                batch = [
                    OriginEntry(origin["url"], hash_to_bytes(origin["snapshot"]))
                    for origin in response
                ]
                origin_add(provenance, archive, batch)

        logging.info(f"Client {self.name} stopped")