Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9312254
D7985.id28772.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
11 KB
Subscribers
None
D7985.id28772.diff
View Options
diff --git a/docker/origin_client/config.yml b/docker/origin_client/config.yml
--- a/docker/origin_client/config.yml
+++ b/docker/origin_client/config.yml
@@ -16,19 +16,24 @@
archive:
- #cls: graph
- #url: http://graph.internal.softwareheritage.org:5009/graph
+ cls: multiplexer
+ archives:
+ - cls: graph
+ url: http://graph.internal.softwareheritage.org:5009/graph
+ storage:
+ cls: remote
+ url: http://webapp1.internal.softwareheritage.org:5002
+ - cls: api
+ storage:
+ cls: remote
+ url: http://webapp1.internal.softwareheritage.org:5002
# cls: direct
# db:
# host: swh-storage-db
# port: 5432
# dbname: swh
# user: guest
- cls: api
- storage:
- cls: remote
- # url: http://webapp.internal.staging.swh.network:5002/
- url: http://webapp1.internal.softwareheritage.org:5002
+ # cls: api
org_server: # origin provider
host: origin_server
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -2,3 +2,4 @@
swh.core[db,http] >= 0.14
swh.model >= 2.6.1
swh.storage
+swh.graph
diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -44,6 +44,7 @@
elif cls == "graph":
try:
from swh.graph.client import RemoteGraphClient
+ from swh.storage import get_storage
from .swhgraph.archive import ArchiveGraph
@@ -54,6 +55,12 @@
raise EnvironmentError(
"Graph configuration required but module is not installed."
)
+ elif cls == "multiplexer":
+
+ from .multiplexer.archive import ArchiveMultiplexed
+
+ archives = list((get_archive(**archive) for archive in kwargs["archives"]))
+ return ArchiveMultiplexed(archives)
else:
raise ValueError
diff --git a/swh/provenance/model.py b/swh/provenance/model.py
--- a/swh/provenance/model.py
+++ b/swh/provenance/model.py
@@ -19,12 +19,18 @@
self.id = Origin(url=self.url).id
self.snapshot = snapshot
self._revisions: Optional[List[RevisionEntry]] = None
+ self._revisions_count = -1
def retrieve_revisions(self, archive: ArchiveInterface) -> None:
    """Load the head revisions of ``self.snapshot`` from ``archive``.

    Does nothing when they were already loaded. Otherwise wraps every head
    returned by ``archive.snapshot_get_heads`` in a ``RevisionEntry``, caches
    the list and records its length.
    """
    if self._revisions is not None:
        return
    head_ids = archive.snapshot_get_heads(self.snapshot)
    self._revisions = [RevisionEntry(head_id) for head_id in head_ids]
    self._revisions_count = len(self._revisions)
+
@property
def revision_count(self) -> int:
    """Number of head revisions loaded, or -1 if they were not loaded yet."""
    # Derived from the cached list itself so the count can never drift from it.
    return -1 if self._revisions is None else len(self._revisions)
@property
def revisions(self) -> Iterator[RevisionEntry]:
diff --git a/swh/provenance/multiplexer/__init__.py b/swh/provenance/multiplexer/__init__.py
new file mode 100644
diff --git a/swh/provenance/multiplexer/archive.py b/swh/provenance/multiplexer/archive.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/multiplexer/archive.py
@@ -0,0 +1,63 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+
+from typing import List
+
+from swh.core.statsd import statsd
+from swh.model.model import ObjectType, Sha1Git, TargetType
+from swh.storage.interface import StorageInterface
+from swh.provenance.archive import ArchiveInterface
+from typing import Any, Dict, Iterable, Set, Tuple
+
+
+ARCHIVE_DURATION_METRIC = "swh_provenance_archive_multiplexed_duration_seconds"
+
+LOGGER = logging.getLogger(__name__)
+
+
class ArchiveMultiplexed(ArchiveInterface):
    """Archive chaining several archives and returning the first useful answer.

    Every query is sent to ``archives`` in list order, and the first non-empty
    result is returned. An archive raising ``NotImplementedError`` (it does not
    support the query) is skipped. Any other exception is logged as a warning
    and the next archive is tried. If no archive yields a non-empty result, an
    empty list is returned.
    """

    def __init__(self, archives: List[ArchiveInterface]) -> None:
        # Archives earlier in the list take precedence over later ones.
        self.archives = archives

    def _first_non_empty(self, method: str, id: Sha1Git, **kwargs: Any) -> List[Any]:
        """Return the first non-empty result of ``archive.<method>(id, **kwargs)``.

        ``method`` is the name of an ``ArchiveInterface`` method, and ``kwargs``
        are forwarded to it unchanged. Returns ``[]`` when every archive is
        empty, does not implement ``method``, or fails.
        """
        for archive in self.archives:
            try:
                # Materialize so lazily produced results are consumed (and any
                # error they raise is caught) inside this try block.
                result = list(getattr(archive, method)(id, **kwargs))
            except NotImplementedError:
                LOGGER.debug("%s not implemented by %s", method, archive.__class__)
                continue
            except Exception as e:
                # Best effort: fall back to the next archive instead of failing.
                LOGGER.warning(
                    "Error calling %s for %s via %s: %s",
                    method,
                    id.hex(),
                    archive.__class__,
                    e,
                )
                continue
            if result:
                return result
            LOGGER.debug(
                "No result from %s for %s via %s", method, id.hex(), archive.__class__
            )
        return []

    def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]:
        """List directory ``id`` using the first archive that returns entries.

        ``minsize`` is forwarded to every underlying archive.
        """
        return self._first_non_empty("directory_ls", id, minsize=minsize)

    def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
        """Return the parents of revision ``id`` from the first archive having any."""
        return self._first_non_empty("revision_get_parents", id)

    def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
        """Return the heads of snapshot ``id`` from the first archive having any."""
        return self._first_non_empty("snapshot_get_heads", id)
diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py
--- a/swh/provenance/origin.py
+++ b/swh/provenance/origin.py
@@ -74,17 +74,37 @@
def proceed_origin(
    provenance: ProvenanceInterface, archive: ArchiveInterface, origin: OriginEntry
) -> None:
    """Record provenance information for every head revision of ``origin``.

    The origin is added to ``provenance``, and its head revisions are fetched
    from ``archive``. For each head that ``provenance`` does not already report
    as a head, its history graph is built and added for the origin. Every head
    then has its preferred origin checked and is linked to the origin. Progress
    and total duration are logged.
    """
    LOGGER.info("Processing origin=%s", origin)
    start = datetime.now()

    LOGGER.debug("Add origin")
    provenance.origin_add(origin)

    LOGGER.debug("Retrieving head revisions")
    origin.retrieve_revisions(archive)
    LOGGER.info("%d heads found", origin.revision_count)

    for position, revision in enumerate(origin.revisions, start=1):
        LOGGER.info(
            "checking revision %s (%d/%d)", revision, position, origin.revision_count
        )

        if not provenance.revision_is_head(revision):
            LOGGER.debug("revision %s not in heads", revision)

            graph = HistoryGraph(archive, revision)
            LOGGER.debug("History graph built")

            origin_add_revision(provenance, origin, graph)
            LOGGER.debug("Revision added")

        # head is treated separately
        LOGGER.debug("Checking preferred origin")
        check_preferred_origin(provenance, origin, revision)

        LOGGER.debug("Adding revision to origin")
        provenance.revision_add_to_origin(origin, revision)

    end = datetime.now()
    LOGGER.info("Processed origin %s in %s", origin.url, (end - start))
diff --git a/swh/provenance/tests/test_archive_interface.py b/swh/provenance/tests/test_archive_interface.py
--- a/swh/provenance/tests/test_archive_interface.py
+++ b/swh/provenance/tests/test_archive_interface.py
@@ -6,7 +6,7 @@
from collections import Counter
from operator import itemgetter
from typing import Counter as TCounter
-from typing import Dict, List, Set, Tuple, Type, Union
+from typing import Any, Dict, Iterable, List, Set, Tuple, Type, Union
import pytest
@@ -28,6 +28,7 @@
)
from swh.model.swhids import CoreSWHID, ExtendedObjectType, ExtendedSWHID
from swh.provenance.archive import ArchiveInterface
+from swh.provenance.multiplexer.archive import ArchiveMultiplexed
from swh.provenance.postgresql.archive import ArchivePostgreSQL
from swh.provenance.storage.archive import ArchiveStorage
from swh.provenance.swhgraph.archive import ArchiveGraph
@@ -35,6 +36,17 @@
from swh.storage.postgresql.storage import Storage
class ArchiveNoop(ArchiveInterface):
    """Archive stub that knows nothing: every query returns an empty list."""

    def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
        return list()

    def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
        return list()

    def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]:
        return list()
+
+
def check_directory_ls(
reference: ArchiveInterface, archive: ArchiveInterface, data: Dict[str, List[dict]]
) -> None:
@@ -214,3 +226,19 @@
check_directory_ls(archive, archive_graph, data)
check_revision_get_parents(archive, archive_graph, data)
check_snapshot_get_heads(archive, archive_graph, data)
+
+ # test against ArchiveMultiplexer
+ archive_multiplexed = ArchiveMultiplexed(
+ [ArchiveNoop(), archive_graph, archive_api]
+ )
+ check_directory_ls(archive, archive_multiplexed, data)
+ check_revision_get_parents(archive, archive_multiplexed, data)
+ check_snapshot_get_heads(archive, archive_multiplexed, data)
+
+
def test_noop_multiplexer():
    """A multiplexer over a single empty archive answers every query with nothing."""
    unknown_id = Sha1Git(b"abcd")
    multiplexer = ArchiveMultiplexed([ArchiveNoop()])

    for query in (
        multiplexer.directory_ls,
        multiplexer.revision_get_parents,
        multiplexer.snapshot_get_heads,
    ):
        assert not query(unknown_id)
diff --git a/swh/provenance/tests/test_init.py b/swh/provenance/tests/test_init.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/tests/test_init.py
@@ -0,0 +1,40 @@
+# Copyright (C) 2019-2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from unittest.mock import patch
+
+from swh.provenance import get_archive
+from swh.provenance.multiplexer.archive import ArchiveMultiplexed
+from swh.provenance.swhgraph.archive import ArchiveGraph
+from swh.provenance.storage.archive import ArchiveStorage
+
+
def test_multiplexer_configuration():
    """get_archive(cls="multiplexer") builds one sub-archive per entry, in order."""
    graph_config = {
        "cls": "graph",
        "url": "graph_url",
        "storage": {"cls": "remote", "url": "storage_graph_url"},
    }
    api_config = {
        "cls": "api",
        "storage": {"cls": "remote", "url": "storage_api_url"},
    }

    archive = get_archive(cls="multiplexer", archives=[graph_config, api_config])

    assert isinstance(archive, ArchiveMultiplexed)
    assert len(archive.archives) == 2
    graph_archive, api_archive = archive.archives
    assert isinstance(graph_archive, ArchiveGraph)
    assert isinstance(api_archive, ArchiveStorage)
diff --git a/swh/provenance/tools/origins/client.py b/swh/provenance/tools/origins/client.py
--- a/swh/provenance/tools/origins/client.py
+++ b/swh/provenance/tools/origins/client.py
@@ -19,6 +19,10 @@
CONFIG_ENVVAR = "SWH_CONFIG_FILENAME"
DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None)
# Log line layout: time - level - pid - logger.function:line: message.
# The pid is included because clients run as separate multiprocessing.Process
# workers, so interleaved lines need to be attributable.
LOG_FORMAT = (
    "%(asctime)s - %(levelname)s - %(process)d - %(name)s.%(funcName)s:%(lineno)d"
    ": %(message)s"
)
class Client(multiprocessing.Process):
@@ -63,7 +67,7 @@
if __name__ == "__main__":
- logging.basicConfig(level=logging.INFO)
+ logging.basicConfig(level=logging.WARN, format=LOG_FORMAT)
# Check parameters
if len(sys.argv) != 2:
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Wed, Jul 2, 10:47 AM (2 w, 22 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3229825
Attached To
D7985: [provenance] Implement a naive archive multiplexer
Event Timeline
Log In to Comment