Page MenuHomeSoftware Heritage

D7985.id28772.diff
No OneTemporary

D7985.id28772.diff

diff --git a/docker/origin_client/config.yml b/docker/origin_client/config.yml
--- a/docker/origin_client/config.yml
+++ b/docker/origin_client/config.yml
@@ -16,19 +16,24 @@
archive:
- #cls: graph
- #url: http://graph.internal.softwareheritage.org:5009/graph
+ cls: multiplexer
+ archives:
+ - cls: graph
+ url: http://graph.internal.softwareheritage.org:5009/graph
+ storage:
+ cls: remote
+ url: http://webapp1.internal.softwareheritage.org:5002
+ - cls: api
+ storage:
+ cls: remote
+ url: http://webapp1.internal.softwareheritage.org:5002
# cls: direct
# db:
# host: swh-storage-db
# port: 5432
# dbname: swh
# user: guest
- cls: api
- storage:
- cls: remote
- # url: http://webapp.internal.staging.swh.network:5002/
- url: http://webapp1.internal.softwareheritage.org:5002
+ # cls: api
org_server: # origin provider
host: origin_server
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -2,3 +2,4 @@
swh.core[db,http] >= 0.14
swh.model >= 2.6.1
swh.storage
+swh.graph
diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -44,6 +44,7 @@
elif cls == "graph":
try:
from swh.graph.client import RemoteGraphClient
+ from swh.storage import get_storage
from .swhgraph.archive import ArchiveGraph
@@ -54,6 +55,12 @@
raise EnvironmentError(
"Graph configuration required but module is not installed."
)
+ elif cls == "multiplexer":
+
+ from .multiplexer.archive import ArchiveMultiplexed
+
+ archives = list((get_archive(**archive) for archive in kwargs["archives"]))
+ return ArchiveMultiplexed(archives)
else:
raise ValueError
diff --git a/swh/provenance/model.py b/swh/provenance/model.py
--- a/swh/provenance/model.py
+++ b/swh/provenance/model.py
@@ -19,12 +19,18 @@
self.id = Origin(url=self.url).id
self.snapshot = snapshot
self._revisions: Optional[List[RevisionEntry]] = None
+ self._revisions_count = -1
def retrieve_revisions(self, archive: ArchiveInterface) -> None:
if self._revisions is None:
self._revisions = [
RevisionEntry(rev) for rev in archive.snapshot_get_heads(self.snapshot)
]
+ self._revisions_count = len(self._revisions)
+
+ @property
+ def revision_count(self) -> int:
+ return self._revisions_count
@property
def revisions(self) -> Iterator[RevisionEntry]:
diff --git a/swh/provenance/multiplexer/__init__.py b/swh/provenance/multiplexer/__init__.py
new file mode 100644
diff --git a/swh/provenance/multiplexer/archive.py b/swh/provenance/multiplexer/archive.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/multiplexer/archive.py
@@ -0,0 +1,91 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+from typing import Any, Dict, Iterable, List
+
+from swh.model.model import Sha1Git
+from swh.provenance.archive import ArchiveInterface
+
+ARCHIVE_DURATION_METRIC = "swh_provenance_archive_multiplexed_duration_seconds"
+
+LOGGER = logging.getLogger(__name__)
+
+
+class ArchiveMultiplexed(ArchiveInterface):
+    """Archive backend that forwards each query to several archives in turn.
+
+    Archives are tried in the order given to the constructor and the first
+    non-empty answer is returned. An empty list is returned when no archive
+    yields a result; how an error raised by an archive is handled depends on
+    the query (see each method).
+    """
+
+    def __init__(self, archives: List[ArchiveInterface]) -> None:
+        self.archives = archives
+
+    def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]:
+        """Return the entries of directory ``id`` from the first archive having any.
+
+        ``minsize`` is forwarded unchanged to each archive. Archives raising
+        ``NotImplementedError`` are skipped; any other error propagates.
+        """
+        for archive in self.archives:
+            try:
+                entries = list(archive.directory_ls(id, minsize=minsize))
+            except NotImplementedError:
+                LOGGER.debug("directory_ls not supported by %s", archive.__class__)
+                continue
+            if entries:
+                return entries
+            LOGGER.debug(
+                "No entries found for directory %s via %s", id.hex(), archive.__class__
+            )
+        return []
+
+    def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
+        """Return the parents of revision ``id`` from the first archive having any.
+
+        Archives raising ``NotImplementedError`` are skipped; any other error
+        propagates.
+        """
+        for archive in self.archives:
+            try:
+                parents = list(archive.revision_get_parents(id))
+            except NotImplementedError:
+                LOGGER.debug(
+                    "revision_get_parents not supported by %s", archive.__class__
+                )
+                continue
+            if parents:
+                return parents
+            LOGGER.debug(
+                "No parents found for revision %s via %s", id.hex(), archive.__class__
+            )
+        return []
+
+    def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
+        """Return the heads of snapshot ``id`` from the first archive having any.
+
+        Best effort: an archive raising any exception is logged and skipped so
+        that the remaining archives are still consulted.
+        """
+        for archive in self.archives:
+            try:
+                heads = list(archive.snapshot_get_heads(id))
+            except Exception as e:
+                LOGGER.warning(
+                    "Error retrieving heads of snapshot %s via %s: %s",
+                    id.hex(),
+                    archive.__class__,
+                    e,
+                )
+                continue
+            if heads:
+                return heads
+            LOGGER.debug(
+                "No heads found for snapshot %s via %s", id.hex(), archive.__class__
+            )
+        return []
diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py
--- a/swh/provenance/origin.py
+++ b/swh/provenance/origin.py
@@ -74,17 +74,37 @@
def proceed_origin(
provenance: ProvenanceInterface, archive: ArchiveInterface, origin: OriginEntry
) -> None:
- LOGGER.info("Processing origin %s", origin.url)
+ LOGGER.info("Processing origin=%s", origin)
start = datetime.now()
+
+ LOGGER.debug("Add origin")
provenance.origin_add(origin)
+
+ LOGGER.debug("Retrieving head revisions")
origin.retrieve_revisions(archive)
- for revision in origin.revisions:
+ LOGGER.info("%d heads founds", origin.revision_count)
+
+ for idx, revision in enumerate(origin.revisions):
+ LOGGER.info(
+ "checking revision %s (%d/%d)", revision, idx + 1, origin.revision_count
+ )
+
if not provenance.revision_is_head(revision):
+ LOGGER.debug("revision %s not in heads", revision)
+
graph = HistoryGraph(archive, revision)
+ LOGGER.debug("History graph built")
+
origin_add_revision(provenance, origin, graph)
+ LOGGER.debug("Revision added")
+
# head is treated separately
+ LOGGER.debug("Checking preferred origin")
check_preferred_origin(provenance, origin, revision)
+
+ LOGGER.debug("Adding revision to origin")
provenance.revision_add_to_origin(origin, revision)
+
end = datetime.now()
LOGGER.info("Processed origin %s in %s", origin.url, (end - start))
diff --git a/swh/provenance/tests/test_archive_interface.py b/swh/provenance/tests/test_archive_interface.py
--- a/swh/provenance/tests/test_archive_interface.py
+++ b/swh/provenance/tests/test_archive_interface.py
@@ -6,7 +6,7 @@
from collections import Counter
from operator import itemgetter
from typing import Counter as TCounter
-from typing import Dict, List, Set, Tuple, Type, Union
+from typing import Any, Dict, Iterable, List, Set, Tuple, Type, Union
import pytest
@@ -28,6 +28,7 @@
)
from swh.model.swhids import CoreSWHID, ExtendedObjectType, ExtendedSWHID
from swh.provenance.archive import ArchiveInterface
+from swh.provenance.multiplexer.archive import ArchiveMultiplexed
from swh.provenance.postgresql.archive import ArchivePostgreSQL
from swh.provenance.storage.archive import ArchiveStorage
from swh.provenance.swhgraph.archive import ArchiveGraph
@@ -35,6 +36,17 @@
from swh.storage.postgresql.storage import Storage
+class ArchiveNoop(ArchiveInterface):
+ def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]:
+ return []
+
+ def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
+ return []
+
+ def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
+ return []
+
+
def check_directory_ls(
reference: ArchiveInterface, archive: ArchiveInterface, data: Dict[str, List[dict]]
) -> None:
@@ -214,3 +226,19 @@
check_directory_ls(archive, archive_graph, data)
check_revision_get_parents(archive, archive_graph, data)
check_snapshot_get_heads(archive, archive_graph, data)
+
+ # test against ArchiveMultiplexer
+ archive_multiplexed = ArchiveMultiplexed(
+ [ArchiveNoop(), archive_graph, archive_api]
+ )
+ check_directory_ls(archive, archive_multiplexed, data)
+ check_revision_get_parents(archive, archive_multiplexed, data)
+ check_snapshot_get_heads(archive, archive_multiplexed, data)
+
+
+def test_noop_multiplexer():
+ archive = ArchiveMultiplexed([ArchiveNoop()])
+
+ assert not archive.directory_ls(Sha1Git(b"abcd"))
+ assert not archive.revision_get_parents(Sha1Git(b"abcd"))
+ assert not archive.snapshot_get_heads(Sha1Git(b"abcd"))
diff --git a/swh/provenance/tests/test_init.py b/swh/provenance/tests/test_init.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/tests/test_init.py
@@ -0,0 +1,32 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.provenance import get_archive
+from swh.provenance.multiplexer.archive import ArchiveMultiplexed
+from swh.provenance.storage.archive import ArchiveStorage
+from swh.provenance.swhgraph.archive import ArchiveGraph
+
+
+def test_multiplexer_configuration():
+    """Check that ``get_archive`` wraps each configured archive, in order."""
+    config = {
+        "archives": [
+            {
+                "cls": "graph",
+                "url": "graph_url",
+                "storage": {"cls": "remote", "url": "storage_graph_url"},
+            },
+            {
+                "cls": "api",
+                "storage": {"cls": "remote", "url": "storage_api_url"},
+            },
+        ]
+    }
+
+    archive = get_archive(cls="multiplexer", **config)
+    assert isinstance(archive, ArchiveMultiplexed)
+    assert len(archive.archives) == 2
+    assert isinstance(archive.archives[0], ArchiveGraph)
+    assert isinstance(archive.archives[1], ArchiveStorage)
diff --git a/swh/provenance/tools/origins/client.py b/swh/provenance/tools/origins/client.py
--- a/swh/provenance/tools/origins/client.py
+++ b/swh/provenance/tools/origins/client.py
@@ -19,6 +19,10 @@
CONFIG_ENVVAR = "SWH_CONFIG_FILENAME"
DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None)
+LOG_FORMAT = (
+ "%(asctime)s-%(levelname)s - %(process)d - %(name)s.%(funcName)s:%(lineno)d"
+ ": %(message)s"
+)
class Client(multiprocessing.Process):
@@ -63,7 +67,7 @@
if __name__ == "__main__":
- logging.basicConfig(level=logging.INFO)
+ logging.basicConfig(level=logging.WARN, format=LOG_FORMAT)
# Check parameters
if len(sys.argv) != 2:

File Metadata

Mime Type
text/plain
Expires
Wed, Jul 2, 10:47 AM (2 w, 22 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3229825

Event Timeline