diff --git a/swh/provenance/archive.py b/swh/provenance/archive.py --- a/swh/provenance/archive.py +++ b/swh/provenance/archive.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List +from typing import Any, Dict, Iterable, List from typing_extensions import Protocol, runtime_checkable @@ -17,10 +17,10 @@ def iter_origin_visit_statuses(self, origin: str, visit: int): ... - def release_get(self, ids: List[bytes]): + def release_get(self, ids: Iterable[bytes]): ... - def revision_get(self, ids: List[bytes]): + def revision_get(self, ids: Iterable[bytes]): ... def snapshot_get_all_branches(self, snapshot: bytes): diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -156,7 +156,7 @@ provenance = get_provenance(**ctx.obj["config"]["provenance"]) for origin in FileOriginIterator(filename, archive, limit=limit): - origin_add(provenance, origin) + origin_add(archive, provenance, origin) @cli.command(name="find-first") diff --git a/swh/provenance/model.py b/swh/provenance/model.py --- a/swh/provenance/model.py +++ b/swh/provenance/model.py @@ -1,32 +1,75 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from datetime import datetime +from typing import Iterable, List, Optional, Union + from .archive import ArchiveInterface -class TreeEntry: - def __init__(self, id: bytes, name: bytes): +class OriginEntry: + def __init__(self, url, revisions: Iterable["RevisionEntry"], id=None): self.id = id - self.name = name + self.url = url + self.revisions = revisions -class DirectoryEntry(TreeEntry): - def __init__(self, archive: ArchiveInterface, id: bytes, name: bytes): - super().__init__(id, name) - self.archive = archive - self.children = None +class RevisionEntry: + def __init__( + self, + id: bytes, + date: Optional[datetime] = None, + root: Optional[bytes] = None, + parents: Optional[Iterable[bytes]] = None, + ): + self.id = id + self.date = date + assert self.date is None or self.date.tzinfo is not None + self.root = root + self._parents = parents + self._nodes: List[RevisionEntry] = [] - def __iter__(self): - if self.children is None: - self.children = [] - for child in self.archive.directory_ls(self.id): - if child["type"] == "dir": - self.children.append( - DirectoryEntry(self.archive, child["target"], child["name"]) + def parents(self, archive: ArchiveInterface): + if self._parents is None: + # XXX: no check is done to ensure node.id is a known revision in + # the SWH archive + self._parents = archive.revision_get([self.id])[0].parents + if self._parents: + self._nodes = [ + RevisionEntry( + id=rev.id, + root=rev.directory, + date=rev.date, + parents=rev.parents, ) + for rev in archive.revision_get(self._parents) + if rev + ] + yield from self._nodes - elif child["type"] == "file": - self.children.append(FileEntry(child["target"], child["name"])) - return iter(self.children) +class DirectoryEntry: + def __init__(self, id: bytes, name: bytes): + self.id = id + self.name = name + self._children: Optional[List[Union[DirectoryEntry, FileEntry]]] = None + + def ls(self, archive: ArchiveInterface): + if self._children is None: + self._children = [] + for child in archive.directory_ls(self.id): + if child["type"] == "dir": + self._children.append( + DirectoryEntry(child["target"], child["name"]) + ) + elif child["type"] == "file": + self._children.append(FileEntry(child["target"], child["name"])) + yield from self._children -class FileEntry(TreeEntry): - pass +class FileEntry: + def __init__(self, id: bytes, name: bytes): + self.id = id + self.name = name diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py --- a/swh/provenance/origin.py +++ b/swh/provenance/origin.py @@ -3,31 +3,13 @@ from swh.model.model import ObjectType, Origin, TargetType from .archive import ArchiveInterface -from .revision import RevisionEntry - - -class OriginEntry: - def __init__(self, url, revisions, id=None): - self.id = id - self.url = url - self.revisions = revisions - +from .model import OriginEntry, RevisionEntry ################################################################################ ################################################################################ -class OriginIterator: - """Iterator interface.""" - - def __iter__(self): - pass - - def __next__(self): - pass - - -class FileOriginIterator(OriginIterator): +class FileOriginIterator: """Iterator over origins present in the given CSV file.""" def __init__( @@ -35,7 +17,6 @@ ): self.file = open(filename) self.limit = limit - # self.mutex = threading.Lock() self.archive = archive def __iter__(self): @@ -49,7 +30,6 @@ def __init__(self, archive: ArchiveInterface, limit: Optional[int] = None): self.limit = limit - # self.mutex = threading.Lock() self.archive = archive def __iter__(self): @@ -81,31 +61,26 @@ # This is done to keep the query in release_get small, hence avoiding # a timeout. batchsize = 100 - releases = list(releases_set) - while releases: - for release in archive.release_get(releases[:batchsize]): + while releases_set: + releases = [ + releases_set.pop() for i in range(batchsize) if releases_set + ] + for release in archive.release_get(releases): if release is not None: if release.target_type == ObjectType.REVISION: targets_set.add(release.target) - releases[:batchsize] = [] # This is done to keep the query in revision_get small, hence avoiding # a timeout. revisions = set() - targets = list(targets_set) - while targets: - for revision in archive.revision_get(targets[:batchsize]): + while targets_set: + targets = [ + targets_set.pop() for i in range(batchsize) if targets_set + ] + for revision in archive.revision_get(targets): if revision is not None: - parents = list( - map( - lambda id: RevisionEntry(archive, id), - revision.parents, - ) - ) - revisions.add( - RevisionEntry(archive, revision.id, parents=parents) - ) - targets[:batchsize] = [] + revisions.add(RevisionEntry(revision.id)) + # target_set |= set(revision.parents) yield OriginEntry(status.origin, list(revisions)) diff --git a/swh/provenance/postgresql/archive.py b/swh/provenance/postgresql/archive.py --- a/swh/provenance/postgresql/archive.py +++ b/swh/provenance/postgresql/archive.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List +from typing import Any, Dict, Iterable, List from methodtools import lru_cache import psycopg2 @@ -67,10 +67,10 @@ def iter_origin_visit_statuses(self, origin: str, visit: int): raise NotImplementedError - def release_get(self, ids: List[bytes]): + def release_get(self, ids: Iterable[bytes]): raise NotImplementedError - def revision_get(self, ids: List[bytes]): + def revision_get(self, ids: Iterable[bytes]): raise NotImplementedError def snapshot_get_all_branches(self, snapshot: bytes): diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -5,9 +5,7 @@ from typing_extensions import Protocol, runtime_checkable from .archive import ArchiveInterface -from .model import DirectoryEntry, FileEntry, TreeEntry -from .origin import OriginEntry -from .revision import RevisionEntry +from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry UTCMIN = datetime.min.replace(tzinfo=timezone.utc) @@ -107,32 +105,40 @@ def directory_process_content( - provenance: ProvenanceInterface, directory: DirectoryEntry, relative: DirectoryEntry + archive: ArchiveInterface, + provenance: ProvenanceInterface, + directory: DirectoryEntry, + relative: DirectoryEntry, ) -> None: stack = [(directory, b"")] while stack: current, prefix = stack.pop() - for child in iter(current): + for child in current.ls(archive): if isinstance(child, FileEntry): # Add content to the relative directory with the computed prefix. provenance.content_add_to_directory(relative, child, prefix) - else: + elif isinstance(child, DirectoryEntry): # Recursively walk the child directory. stack.append((child, os.path.join(prefix, child.name))) -def origin_add(provenance: ProvenanceInterface, origin: OriginEntry) -> None: +def origin_add( + archive: ArchiveInterface, provenance: ProvenanceInterface, origin: OriginEntry +) -> None: # TODO: refactor to iterate over origin visit statuses and commit only once # per status. origin.id = provenance.origin_get_id(origin) for revision in origin.revisions: - origin_add_revision(provenance, origin, revision) + origin_add_revision(archive, provenance, origin, revision) # Commit after each revision provenance.commit() # TODO: verify this! def origin_add_revision( - provenance: ProvenanceInterface, origin: OriginEntry, revision: RevisionEntry + archive: ArchiveInterface, + provenance: ProvenanceInterface, + origin: OriginEntry, + revision: RevisionEntry, ) -> None: stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)] @@ -157,7 +163,7 @@ else: # This revision is a parent of another one in the history of the # relative revision. - for parent in iter(current): + for parent in current.parents(archive): visited = provenance.revision_visited(parent) if not visited: @@ -198,9 +204,10 @@ provenance.revision_add(revision) # TODO: add file size filtering revision_process_content( + archive, provenance, revision, - DirectoryEntry(archive, revision.root, b""), + DirectoryEntry(revision.root, b""), lower=lower, mindepth=mindepth, ) @@ -212,7 +219,7 @@ class IsochroneNode: def __init__( - self, entry: TreeEntry, dates: Dict[bytes, datetime] = {}, depth: int = 0 + self, entry: DirectoryEntry, dates: Dict[bytes, datetime] = {}, depth: int = 0 ): self.entry = entry self.depth = depth @@ -221,7 +228,7 @@ self.maxdate: Optional[datetime] = None def add_child( - self, child: TreeEntry, dates: Dict[bytes, datetime] = {} + self, child: DirectoryEntry, dates: Dict[bytes, datetime] = {} ) -> "IsochroneNode": assert isinstance(self.entry, DirectoryEntry) and self.date is None node = IsochroneNode(child, dates=dates, depth=self.depth + 1) @@ -230,9 +237,14 @@ def build_isochrone_graph( - provenance: ProvenanceInterface, revision: RevisionEntry, directory: DirectoryEntry + archive: ArchiveInterface, + provenance: ProvenanceInterface, + revision: RevisionEntry, + directory: DirectoryEntry, ) -> IsochroneNode: assert revision.date is not None + assert revision.root == directory.id + # Build the nodes structure root = IsochroneNode(directory) root.date = provenance.directory_get_date_in_isochrone_frontier(directory) @@ -251,12 +263,20 @@ # for the provenance object to have them cached and (potentially) improve # performance. ddates = provenance.directory_get_dates_in_isochrone_frontier( - [child for child in current.entry if isinstance(child, DirectoryEntry)] + [ + child + for child in current.entry.ls(archive) + if isinstance(child, DirectoryEntry) + ] ) fdates = provenance.content_get_early_dates( - [child for child in current.entry if isinstance(child, FileEntry)] + [ + child + for child in current.entry.ls(archive) + if isinstance(child, FileEntry) + ] ) - for child in current.entry: + for child in current.entry.ls(archive): # Recursively analyse directory nodes. if isinstance(child, DirectoryEntry): node = current.add_child(child, dates=ddates) @@ -298,6 +318,7 @@ def revision_process_content( + archive: ArchiveInterface, provenance: ProvenanceInterface, revision: RevisionEntry, root: DirectoryEntry, @@ -305,7 +326,7 @@ mindepth: int = 1, ): assert revision.date is not None - stack = [(build_isochrone_graph(provenance, revision, root), root.name)] + stack = [(build_isochrone_graph(archive, provenance, revision, root), root.name)] while stack: current, path = stack.pop() assert isinstance(current.entry, DirectoryEntry) @@ -327,7 +348,10 @@ ) provenance.directory_add_to_revision(revision, current.entry, path) directory_process_content( - provenance, directory=current.entry, relative=current.entry, + archive, + provenance, + directory=current.entry, + relative=current.entry, ) else: # No point moving the frontier here. Either there are no files or they diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py --- a/swh/provenance/revision.py +++ b/swh/provenance/revision.py @@ -1,47 +1,13 @@ from datetime import datetime, timezone -import iso8601 from itertools import islice import threading from typing import Iterable, Iterator, Optional, Tuple -from swh.model.hashutil import hash_to_bytes - -from .archive import ArchiveInterface - - -class RevisionEntry: - def __init__( - self, - archive: ArchiveInterface, - id: bytes, - date: Optional[datetime] = None, - root: Optional[bytes] = None, - parents: Optional[list] = None, - ): - self.archive = archive - self.id = id - self.date = date - assert self.date is None or self.date.tzinfo is not None - self.parents = parents - self.root = root - - def __iter__(self): - if self.parents is None: - self.parents = [] - for parent in self.archive.revision_get([self.id]): - if parent is not None: - self.parents.append( - RevisionEntry( - self.archive, - parent.id, - parents=[ - RevisionEntry(self.archive, id) for id in parent.parents - ], - ) - ) - - return iter(self.parents) +import iso8601 +from swh.model.hashutil import hash_to_bytes +from swh.provenance.archive import ArchiveInterface +from swh.provenance.model import RevisionEntry ######################################################################################## ######################################################################################## @@ -84,7 +50,7 @@ if date.tzinfo is None: date = date.replace(tzinfo=timezone.utc) return RevisionEntry( - self.archive, hash_to_bytes(id), date=date, root=hash_to_bytes(root), + hash_to_bytes(id), date=date, root=hash_to_bytes(root), ) diff --git a/swh/provenance/storage/archive.py b/swh/provenance/storage/archive.py --- a/swh/provenance/storage/archive.py +++ b/swh/provenance/storage/archive.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List +from typing import Any, Dict, Iterable, List # from functools import lru_cache from methodtools import lru_cache @@ -32,13 +32,13 @@ # TODO: filter unused fields yield from iter_origin_visit_statuses(self.storage, origin, visit) - def release_get(self, ids: List[bytes]): + def release_get(self, ids: Iterable[bytes]): # TODO: filter unused fields - yield from self.storage.release_get(ids) + yield from self.storage.release_get(list(ids)) - def revision_get(self, ids: List[bytes]): + def revision_get(self, ids: Iterable[bytes]): # TODO: filter unused fields - yield from self.storage.revision_get(ids) + yield from self.storage.revision_get(list(ids)) def snapshot_get_all_branches(self, snapshot: bytes): from swh.storage.algos.snapshot import snapshot_get_all_branches diff --git a/swh/provenance/tests/test_provenance_db.py b/swh/provenance/tests/test_provenance_db.py --- a/swh/provenance/tests/test_provenance_db.py +++ b/swh/provenance/tests/test_provenance_db.py @@ -8,9 +8,10 @@ import pytest from swh.model.tests.swh_model_data import TEST_OBJECTS +from swh.provenance.model import RevisionEntry from swh.provenance.origin import OriginEntry from swh.provenance.provenance import origin_add, revision_add -from swh.provenance.revision import RevisionEntry +from swh.provenance.storage.archive import ArchiveStorage from swh.provenance.tests.conftest import synthetic_result @@ -26,7 +27,7 @@ """Test the ProvenanceDB.origin_add() method""" for origin in TEST_OBJECTS["origin"]: entry = OriginEntry(url=origin.url, revisions=[]) - origin_add(provenance, entry) + origin_add(ArchiveStorage(swh_storage_with_objects), provenance, entry) # TODO: check some facts here @@ -37,11 +38,9 @@ # do it twice, there should be no change in results for revision in data["revision"]: entry = RevisionEntry( - archive_pg, id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], - parents=revision["parents"], ) revision_add(provenance, archive_pg, entry) @@ -83,11 +82,7 @@ storage, data = storage_and_CMDBTS for revision in data["revision"]: entry = RevisionEntry( - archive_pg, - id=revision["id"], - date=ts2dt(revision["date"]), - root=revision["directory"], - parents=revision["parents"], + id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) revision_add(provenance, archive_pg, entry) @@ -173,9 +168,7 @@ ("synthetic_noroot_upper.txt", {"lower": False, "mindepth": 1}), ), ) -def test_provenance_db( - provenance, storage_and_CMDBTS, archive_pg, syntheticfile, args -): +def test_provenance_db(provenance, storage_and_CMDBTS, archive_pg, syntheticfile, args): storage, data = storage_and_CMDBTS revisions = {rev["id"]: rev for rev in data["revision"]} @@ -197,14 +190,11 @@ for synth_rev in synthetic_result(syntheticfile): revision = revisions[synth_rev["sha1"]] entry = RevisionEntry( - archive_pg, - id=revision["id"], - date=ts2dt(revision["date"]), - root=revision["directory"], - parents=revision["parents"], + id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) revision_add(provenance, archive_pg, entry, **args) + # import pdb; pdb.set_trace() # each "entry" in the synth file is one new revision rows["revision"].add(synth_rev["sha1"]) assert len(rows["revision"]) == db_count("revision")