Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9312296
D7985.id28798.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
12 KB
Subscribers
None
D7985.id28798.diff
View Options
diff --git a/docker/origin_client/config.yml b/docker/origin_client/config.yml
--- a/docker/origin_client/config.yml
+++ b/docker/origin_client/config.yml
@@ -16,19 +16,24 @@
archive:
- #cls: graph
- #url: http://graph.internal.softwareheritage.org:5009/graph
+ cls: multiplexer
+ archives:
+ - cls: graph
+ url: http://graph.internal.softwareheritage.org:5009/graph
+ storage:
+ cls: remote
+ url: http://webapp1.internal.softwareheritage.org:5002
+ - cls: api
+ storage:
+ cls: remote
+ url: http://webapp1.internal.softwareheritage.org:5002
# cls: direct
# db:
# host: swh-storage-db
# port: 5432
# dbname: swh
# user: guest
- cls: api
- storage:
- cls: remote
- # url: http://webapp.internal.staging.swh.network:5002/
- url: http://webapp1.internal.softwareheritage.org:5002
+ # cls: api
org_server: # origin provider
host: origin_server
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -2,3 +2,4 @@
swh.core[db,http] >= 0.14
swh.model >= 2.6.1
swh.storage
+swh.graph
diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -44,6 +44,7 @@
elif cls == "graph":
try:
from swh.graph.client import RemoteGraphClient
+ from swh.storage import get_storage
from .swhgraph.archive import ArchiveGraph
@@ -54,6 +55,12 @@
raise EnvironmentError(
"Graph configuration required but module is not installed."
)
+ elif cls == "multiplexer":
+
+ from .multiplexer.archive import ArchiveMultiplexed
+
+ archives = list((get_archive(**archive) for archive in kwargs["archives"]))
+ return ArchiveMultiplexed(archives)
else:
raise ValueError
diff --git a/swh/provenance/model.py b/swh/provenance/model.py
--- a/swh/provenance/model.py
+++ b/swh/provenance/model.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -14,6 +14,9 @@
class OriginEntry:
+
+ revisions_count: int
+
def __init__(self, url: str, snapshot: Sha1Git) -> None:
self.url = url
self.id = Origin(url=self.url).id
@@ -25,6 +28,13 @@
self._revisions = [
RevisionEntry(rev) for rev in archive.snapshot_get_heads(self.snapshot)
]
+ self._revisions_count = len(self._revisions)
+
+ @property
+ def revision_count(self) -> int:
+ if self._revisions_count is None:
+ raise ValueError("retrieve_revisions was not called")
+ return self._revisions_count
@property
def revisions(self) -> Iterator[RevisionEntry]:
diff --git a/swh/provenance/multiplexer/__init__.py b/swh/provenance/multiplexer/__init__.py
new file mode 100644
diff --git a/swh/provenance/multiplexer/archive.py b/swh/provenance/multiplexer/archive.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/multiplexer/archive.py
@@ -0,0 +1,77 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+from typing import Any, Dict, Iterable, List
+
+from swh.core.statsd import statsd
+from swh.model.model import Sha1Git
+from swh.provenance.archive import ArchiveInterface
+from swh.storage.interface import StorageInterface
+
+ARCHIVE_DURATION_METRIC = "swh_provenance_archive_multiplexed_duration_seconds"
+
+LOGGER = logging.getLogger(__name__)
+
+
+class ArchiveMultiplexed:
+ storage: StorageInterface
+
+ def __init__(self, archives: List[ArchiveInterface]) -> None:
+ self.archives = archives
+
+ @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "directory_ls"})
+ def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]:
+ directories = None
+ for archive in self.archives:
+ try:
+ directories = list(archive.directory_ls(id))
+ except NotImplementedError:
+ pass
+
+ if directories:
+ return directories
+ LOGGER.debug(
+ "No parents found for revision %s via %s", id.hex(), archive.__class__
+ )
+
+ return []
+
+ @statsd.timed(
+ metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"}
+ )
+ def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
+
+ for archive in self.archives:
+ parents = list(archive.revision_get_parents(id))
+ if parents:
+ return parents
+ LOGGER.debug(
+ "No parents found for revision %s via %s", id.hex(), archive.__class__
+ )
+
+ return []
+
+ @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"})
+ def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
+ for archive in self.archives:
+
+ try:
+ heads = list(archive.snapshot_get_heads(id))
+
+ if heads:
+ return heads
+ LOGGER.debug(
+ "No heads found for snapshot %s via %s", str(id), archive.__class__
+ )
+ except Exception as e:
+ LOGGER.warn(
+ "Error retrieving heads of snapshots %s via %s: %s",
+ id.hex(),
+ archive.__class__,
+ e,
+ )
+
+ return []
diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py
--- a/swh/provenance/origin.py
+++ b/swh/provenance/origin.py
@@ -74,17 +74,37 @@
def proceed_origin(
provenance: ProvenanceInterface, archive: ArchiveInterface, origin: OriginEntry
) -> None:
- LOGGER.info("Processing origin %s", origin.url)
+ LOGGER.info("Processing origin=%s", origin)
start = datetime.now()
+
+ LOGGER.debug("Add origin")
provenance.origin_add(origin)
+
+ LOGGER.debug("Retrieving head revisions")
origin.retrieve_revisions(archive)
- for revision in origin.revisions:
+ LOGGER.info("%d heads founds", origin.revision_count)
+
+ for idx, revision in enumerate(origin.revisions):
+ LOGGER.info(
+ "checking revision %s (%d/%d)", revision, idx + 1, origin.revision_count
+ )
+
if not provenance.revision_is_head(revision):
+ LOGGER.debug("revision %s not in heads", revision)
+
graph = HistoryGraph(archive, revision)
+ LOGGER.debug("History graph built")
+
origin_add_revision(provenance, origin, graph)
+ LOGGER.debug("Revision added")
+
# head is treated separately
+ LOGGER.debug("Checking preferred origin")
check_preferred_origin(provenance, origin, revision)
+
+ LOGGER.debug("Adding revision to origin")
provenance.revision_add_to_origin(origin, revision)
+
end = datetime.now()
LOGGER.info("Processed origin %s in %s", origin.url, (end - start))
diff --git a/swh/provenance/tests/test_archive_interface.py b/swh/provenance/tests/test_archive_interface.py
--- a/swh/provenance/tests/test_archive_interface.py
+++ b/swh/provenance/tests/test_archive_interface.py
@@ -1,12 +1,13 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import Counter
from operator import itemgetter
+from typing import Any
from typing import Counter as TCounter
-from typing import Dict, List, Set, Tuple, Type, Union
+from typing import Dict, Iterable, List, Set, Tuple, Type, Union
import pytest
@@ -28,13 +29,28 @@
)
from swh.model.swhids import CoreSWHID, ExtendedObjectType, ExtendedSWHID
from swh.provenance.archive import ArchiveInterface
+from swh.provenance.multiplexer.archive import ArchiveMultiplexed
from swh.provenance.postgresql.archive import ArchivePostgreSQL
from swh.provenance.storage.archive import ArchiveStorage
from swh.provenance.swhgraph.archive import ArchiveGraph
from swh.provenance.tests.conftest import fill_storage, load_repo_data
+from swh.storage.interface import StorageInterface
from swh.storage.postgresql.storage import Storage
+class ArchiveNoop:
+ storage: StorageInterface
+
+ def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]:
+ return []
+
+ def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
+ return []
+
+ def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
+ return []
+
+
def check_directory_ls(
reference: ArchiveInterface, archive: ArchiveInterface, data: Dict[str, List[dict]]
) -> None:
@@ -214,3 +230,19 @@
check_directory_ls(archive, archive_graph, data)
check_revision_get_parents(archive, archive_graph, data)
check_snapshot_get_heads(archive, archive_graph, data)
+
+ # test against ArchiveMultiplexed
+ archive_multiplexed = ArchiveMultiplexed(
+ [ArchiveNoop(), archive_graph, archive_api]
+ )
+ check_directory_ls(archive, archive_multiplexed, data)
+ check_revision_get_parents(archive, archive_multiplexed, data)
+ check_snapshot_get_heads(archive, archive_multiplexed, data)
+
+
+def test_noop_multiplexer():
+ archive = ArchiveMultiplexed([ArchiveNoop()])
+
+ assert not archive.directory_ls(Sha1Git(b"abcd"))
+ assert not archive.revision_get_parents(Sha1Git(b"abcd"))
+ assert not archive.snapshot_get_heads(Sha1Git(b"abcd"))
diff --git a/swh/provenance/tests/test_init.py b/swh/provenance/tests/test_init.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/tests/test_init.py
@@ -0,0 +1,28 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.provenance import get_archive
+from swh.provenance.multiplexer.archive import ArchiveMultiplexed
+from swh.provenance.storage.archive import ArchiveStorage
+from swh.provenance.swhgraph.archive import ArchiveGraph
+
+
+def test_multiplexer_configuration():
+ config = {
+ "archives": [
+ {
+ "cls": "graph",
+ "url": "graph_url",
+ "storage": {"cls": "remote", "url": "storage_graph_url"},
+ },
+ {"cls": "api", "storage": {"cls": "remote", "url": "storage_api_url"}},
+ ]
+ }
+
+ archive = get_archive(cls="multiplexer", **config)
+ assert isinstance(archive, ArchiveMultiplexed)
+ assert len(archive.archives) == 2
+ assert isinstance(archive.archives[0], ArchiveGraph)
+ assert isinstance(archive.archives[1], ArchiveStorage)
diff --git a/swh/provenance/tools/origins/client.py b/swh/provenance/tools/origins/client.py
--- a/swh/provenance/tools/origins/client.py
+++ b/swh/provenance/tools/origins/client.py
@@ -1,5 +1,10 @@
#!/usr/bin/env python
+# Copyright (C) 2021-2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
import logging
import logging.handlers
import multiprocessing
@@ -19,6 +24,10 @@
CONFIG_ENVVAR = "SWH_CONFIG_FILENAME"
DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None)
+LOG_FORMAT = (
+ "%(asctime)s-%(levelname)s - %(process)d - %(name)s.%(funcName)s:%(lineno)d"
+ ": %(message)s"
+)
class Client(multiprocessing.Process):
@@ -63,7 +72,7 @@
if __name__ == "__main__":
- logging.basicConfig(level=logging.INFO)
+ logging.basicConfig(level=logging.WARN, format=LOG_FORMAT)
# Check parameters
if len(sys.argv) != 2:
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Wed, Jul 2, 10:48 AM (1 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3227245
Attached To
D7985: [provenance] Implement a naive archive multiplexer
Event Timeline
Log In to Comment