diff --git a/swh/provenance/archive.py b/swh/provenance/archive.py index 651c972..3925cdd 100644 --- a/swh/provenance/archive.py +++ b/swh/provenance/archive.py @@ -1,27 +1,45 @@ -from typing import Any, Dict, Iterable, List +from typing import Any, Dict, Iterable from typing_extensions import Protocol, runtime_checkable +from swh.model.model import Revision, Sha1 + @runtime_checkable class ArchiveInterface(Protocol): - def directory_ls(self, id: bytes) -> List[Dict[str, Any]]: - ... + def directory_ls(self, id: Sha1) -> Iterable[Dict[str, Any]]: + """List entries for one directory. - def iter_origins(self): - ... + Args: + id: sha1 id of the directory to list entries from. - def iter_origin_visits(self, origin: str): - ... + Yields: + directory entries for such directory. - def iter_origin_visit_statuses(self, origin: str, visit: int): + """ ... - def release_get(self, ids: Iterable[bytes]): - ... + def revision_get(self, ids: Iterable[Sha1]) -> Iterable[Revision]: + """Given a list of sha1, return the revisions' information - def revision_get(self, ids: Iterable[bytes]): + Args: + revisions: list of sha1s for the revisions to be retrieved + + Yields: + revisions matching the identifiers. If a revision does + not exist, the provided sha1 is simply ignored. + + """ ... - def snapshot_get_all_branches(self, snapshot: bytes): + def snapshot_get_heads(self, id: Sha1) -> Iterable[Sha1]: + """List all revisions pointed by one snapshot. + + Args: + snapshot: the snapshot's identifier + + Yields: + sha1 ids of found revisions. + + """ ... diff --git a/swh/provenance/model.py b/swh/provenance/model.py index c80e07f..a731e5e 100644 --- a/swh/provenance/model.py +++ b/swh/provenance/model.py @@ -1,169 +1,141 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime -from typing import Iterable, Iterator, List, Optional, Set - -from swh.core.utils import grouper -from swh.model.model import ObjectType, TargetType +from typing import Iterable, Iterator, List, Optional from .archive import ArchiveInterface class OriginEntry: def __init__( self, url: str, date: datetime, snapshot: bytes, id: Optional[int] = None ): self.url = url - self.date = date + # TODO: this is probably not needed and will be removed! + # self.date = date self.snapshot = snapshot self.id = id self._revisions: Optional[List[RevisionEntry]] = None def retrieve_revisions(self, archive: ArchiveInterface): if self._revisions is None: - snapshot = archive.snapshot_get_all_branches(self.snapshot) - assert snapshot is not None - targets_set = set() - releases_set = set() - if snapshot is not None: - for branch in snapshot.branches: - if snapshot.branches[branch].target_type == TargetType.REVISION: - targets_set.add(snapshot.branches[branch].target) - elif snapshot.branches[branch].target_type == TargetType.RELEASE: - releases_set.add(snapshot.branches[branch].target) - - batchsize = 100 - for releases in grouper(releases_set, batchsize): - targets_set.update( - release.target - for release in archive.revision_get(releases) - if release is not None - and release.target_type == ObjectType.REVISION - ) - - revisions: Set[RevisionEntry] = set() - for targets in grouper(targets_set, batchsize): - revisions.update( - RevisionEntry(revision.id) - for revision in archive.revision_get(targets) - if revision is not None - ) - - self._revisions = list(revisions) + self._revisions = [ + RevisionEntry(rev) for rev in archive.snapshot_get_heads(self.snapshot) + ] @property def revisions(self) -> Iterator["RevisionEntry"]: if self._revisions is None: raise RuntimeError( "Revisions of this node has not yet been retrieved. " "Please call retrieve_revisions() before using this property." ) return (x for x in self._revisions) class RevisionEntry: def __init__( self, id: bytes, date: Optional[datetime] = None, root: Optional[bytes] = None, parents: Optional[Iterable[bytes]] = None, ): self.id = id self.date = date assert self.date is None or self.date.tzinfo is not None self.root = root self._parents = parents self._nodes: List[RevisionEntry] = [] def parents(self, archive: ArchiveInterface): if self._parents is None: revision = list(archive.revision_get([self.id])) if revision: self._parents = revision[0].parents if self._parents and not self._nodes: self._nodes = [ RevisionEntry( id=rev.id, root=rev.directory, date=rev.date.to_datetime(), parents=rev.parents, ) for rev in archive.revision_get(self._parents) - if rev is not None and rev.date is not None + if rev.date is not None ] yield from self._nodes def __str__(self): return f"" class DirectoryEntry: def __init__(self, id: bytes, name: bytes = b""): self.id = id self.name = name self._files: Optional[List[FileEntry]] = None self._dirs: Optional[List[DirectoryEntry]] = None def retrieve_children(self, archive: ArchiveInterface): if self._files is None and self._dirs is None: self._files = [] self._dirs = [] for child in archive.directory_ls(self.id): if child["type"] == "dir": self._dirs.append( DirectoryEntry(child["target"], name=child["name"]) ) elif child["type"] == "file": self._files.append(FileEntry(child["target"], child["name"])) @property def files(self) -> Iterator["FileEntry"]: if self._files is None: raise RuntimeError( "Children of this node has not yet been retrieved. " "Please call retrieve_children() before using this property." ) return (x for x in self._files) @property def dirs(self) -> Iterator["DirectoryEntry"]: if self._dirs is None: raise RuntimeError( "Children of this node has not yet been retrieved. " "Please call retrieve_children() before using this property." ) return (x for x in self._dirs) def __str__(self): return f"" def __eq__(self, other): return isinstance(other, DirectoryEntry) and (self.id, self.name) == ( other.id, other.name, ) def __hash__(self): return hash((self.id, self.name)) class FileEntry: def __init__(self, id: bytes, name: bytes): self.id = id self.name = name def __str__(self): return f"" def __eq__(self, other): return isinstance(other, FileEntry) and (self.id, self.name) == ( other.id, other.name, ) def __hash__(self): return hash((self.id, self.name)) diff --git a/swh/provenance/postgresql/archive.py b/swh/provenance/postgresql/archive.py index 2e70d89..d655ce4 100644 --- a/swh/provenance/postgresql/archive.py +++ b/swh/provenance/postgresql/archive.py @@ -1,124 +1,134 @@ -from typing import Any, Dict, Iterable, List +from typing import Any, Dict, Iterable, List, Set from methodtools import lru_cache import psycopg2 -from swh.model.model import Revision +from swh.model.model import ObjectType, Revision, Sha1, TargetType from swh.storage.postgresql.storage import Storage class ArchivePostgreSQL: def __init__(self, conn: psycopg2.extensions.connection): self.conn = conn self.storage = Storage(conn, objstorage={"cls": "memory"}) - def directory_ls(self, id: bytes) -> List[Dict[str, Any]]: + def directory_ls(self, id: Sha1) -> Iterable[Dict[str, Any]]: # TODO: only call directory_ls_internal if the id is not being queried by # someone else. Otherwise wait until results get properly cached. entries = self.directory_ls_internal(id) - return entries + yield from entries @lru_cache(maxsize=100000) - def directory_ls_internal(self, id: bytes) -> List[Dict[str, Any]]: + def directory_ls_internal(self, id: Sha1) -> List[Dict[str, Any]]: # TODO: add file size filtering with self.conn.cursor() as cursor: cursor.execute( """WITH dir AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries FROM directory WHERE id=%s), ls_d AS (SELECT dir_id, UNNEST(dir_entries) AS entry_id FROM dir), ls_f AS (SELECT dir_id, UNNEST(file_entries) AS entry_id FROM dir), ls_r AS (SELECT dir_id, UNNEST(rev_entries) AS entry_id FROM dir) (SELECT 'dir'::directory_entry_type AS type, e.target, e.name, NULL::sha1_git FROM ls_d LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) UNION (WITH known_contents AS (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id INNER JOIN content c ON e.target=c.sha1_git) SELECT * FROM known_contents UNION (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id LEFT JOIN skipped_content c ON e.target=c.sha1_git WHERE NOT EXISTS ( SELECT 1 FROM known_contents WHERE known_contents.sha1_git=e.target ) ) ) ORDER BY name """, (id,), ) return [ {"type": row[0], "target": row[1], "name": row[2]} for row in cursor.fetchall() ] - def iter_origins(self): - from swh.storage.algos.origin import iter_origins - - yield from iter_origins(self.storage) - - def iter_origin_visits(self, origin: str): - from swh.storage.algos.origin import iter_origin_visits - - # TODO: filter unused fields - yield from iter_origin_visits(self.storage, origin) - - def iter_origin_visit_statuses(self, origin: str, visit: int): - from swh.storage.algos.origin import iter_origin_visit_statuses - - # TODO: filter unused fields - yield from iter_origin_visit_statuses(self.storage, origin, visit) - - def release_get(self, ids: Iterable[bytes]): - # TODO: filter unused fields - yield from self.storage.release_get(list(ids)) - - def revision_get(self, ids: Iterable[bytes]): + def revision_get(self, ids: Iterable[Sha1]) -> Iterable[Revision]: with self.conn.cursor() as cursor: psycopg2.extras.execute_values( cursor, """ SELECT t.id, revision.date, revision.directory, ARRAY( SELECT rh.parent_id::bytea FROM revision_history rh WHERE rh.id = t.id ORDER BY rh.parent_rank ) FROM (VALUES %s) as t(sortkey, id) LEFT JOIN revision ON t.id = revision.id LEFT JOIN person author ON revision.author = author.id LEFT JOIN person committer ON revision.committer = committer.id ORDER BY sortkey """, ((sortkey, id) for sortkey, id in enumerate(ids)), ) for row in cursor.fetchall(): parents = [] for parent in row[3]: if parent: parents.append(parent) yield Revision.from_dict( { "id": row[0], "date": row[1], "directory": row[2], "parents": tuple(parents), } ) - def snapshot_get_all_branches(self, snapshot: bytes): + def snapshot_get_heads(self, id: Sha1) -> Iterable[Sha1]: + # TODO: this code is duplicated here (same as in swh.provenance.storage.archive) + # but it's just temporary. This method should actually perform a direct query to + # the SQL db of the archive. + from swh.core.utils import grouper from swh.storage.algos.snapshot import snapshot_get_all_branches - # TODO: filter unused fields - return snapshot_get_all_branches(self.storage, snapshot) + snapshot = snapshot_get_all_branches(self.storage, id) + assert snapshot is not None + + targets_set = set() + releases_set = set() + if snapshot is not None: + for branch in snapshot.branches: + if snapshot.branches[branch].target_type == TargetType.REVISION: + targets_set.add(snapshot.branches[branch].target) + elif snapshot.branches[branch].target_type == TargetType.RELEASE: + releases_set.add(snapshot.branches[branch].target) + + batchsize = 100 + for releases in grouper(releases_set, batchsize): + targets_set.update( + release.target + for release in self.storage.release_get(releases) + if release is not None and release.target_type == ObjectType.REVISION + ) + + revisions: Set[Sha1] = set() + for targets in grouper(targets_set, batchsize): + revisions.update( + revision.id + for revision in self.storage.revision_get(targets) + if revision is not None + ) + + yield from revisions diff --git a/swh/provenance/storage/archive.py b/swh/provenance/storage/archive.py index c1a1b03..92d6388 100644 --- a/swh/provenance/storage/archive.py +++ b/swh/provenance/storage/archive.py @@ -1,47 +1,53 @@ -from typing import Any, Dict, Iterable, List - -# from functools import lru_cache -from methodtools import lru_cache +from typing import Any, Dict, Iterable, Set +from swh.model.model import ObjectType, Revision, Sha1, TargetType from swh.storage.interface import StorageInterface class ArchiveStorage: def __init__(self, storage: StorageInterface): self.storage = storage - @lru_cache(maxsize=100000) - def directory_ls(self, id: bytes) -> List[Dict[str, Any]]: - # TODO: filter unused fields - return [entry for entry in self.storage.directory_ls(id)] - - def iter_origins(self): - from swh.storage.algos.origin import iter_origins - - yield from iter_origins(self.storage) - - def iter_origin_visits(self, origin: str): - from swh.storage.algos.origin import iter_origin_visits - + def directory_ls(self, id: Sha1) -> Iterable[Dict[str, Any]]: # TODO: filter unused fields - yield from iter_origin_visits(self.storage, origin) + yield from self.storage.directory_ls(id) - def iter_origin_visit_statuses(self, origin: str, visit: int): - from swh.storage.algos.origin import iter_origin_visit_statuses - - # TODO: filter unused fields - yield from iter_origin_visit_statuses(self.storage, origin, visit) - - def release_get(self, ids: Iterable[bytes]): + def revision_get(self, ids: Iterable[Sha1]) -> Iterable[Revision]: # TODO: filter unused fields - yield from self.storage.release_get(list(ids)) + yield from ( + rev for rev in self.storage.revision_get(list(ids)) if rev is not None + ) - def revision_get(self, ids: Iterable[bytes]): - # TODO: filter unused fields - yield from self.storage.revision_get(list(ids)) - - def snapshot_get_all_branches(self, snapshot: bytes): + def snapshot_get_heads(self, id: Sha1) -> Iterable[Sha1]: + from swh.core.utils import grouper from swh.storage.algos.snapshot import snapshot_get_all_branches - # TODO: filter unused fields - return snapshot_get_all_branches(self.storage, snapshot) + snapshot = snapshot_get_all_branches(self.storage, id) + assert snapshot is not None + + targets_set = set() + releases_set = set() + if snapshot is not None: + for branch in snapshot.branches: + if snapshot.branches[branch].target_type == TargetType.REVISION: + targets_set.add(snapshot.branches[branch].target) + elif snapshot.branches[branch].target_type == TargetType.RELEASE: + releases_set.add(snapshot.branches[branch].target) + + batchsize = 100 + for releases in grouper(releases_set, batchsize): + targets_set.update( + release.target + for release in self.storage.release_get(list(releases)) + if release is not None and release.target_type == ObjectType.REVISION + ) + + revisions: Set[Sha1] = set() + for targets in grouper(targets_set, batchsize): + revisions.update( + revision.id + for revision in self.storage.revision_get(list(targets)) + if revision is not None + ) + + yield from revisions