diff --git a/swh/provenance/archive.py b/swh/provenance/archive.py index b79b2ce..dfa8eb6 100644 --- a/swh/provenance/archive.py +++ b/swh/provenance/archive.py @@ -1,45 +1,44 @@ from typing import Any, Dict, Iterable from typing_extensions import Protocol, runtime_checkable -from swh.model.model import Revision, Sha1Git +from swh.model.model import Sha1Git @runtime_checkable class ArchiveInterface(Protocol): def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]: """List entries for one directory. Args: id: sha1 id of the directory to list entries from. Yields: directory entries for such directory. """ ... - def revision_get(self, ids: Iterable[Sha1Git]) -> Iterable[Revision]: - """Given a list of sha1, return the revisions' information + def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: + """List parents of one revision. Args: - revisions: list of sha1s for the revisions to be retrieved + id: sha1 id of the revision to list parents from. Yields: - revisions matching the identifiers. If a revision does - not exist, the provided sha1 is simply ignored. + sha1 ids for the parents for such revision. """ ... def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: """List all revisions pointed by one snapshot. Args: id: sha1 id of the snapshot. Yields: sha1 ids of found revisions. """ ... 
diff --git a/swh/provenance/model.py b/swh/provenance/model.py index 7001e2b..38253f9 100644 --- a/swh/provenance/model.py +++ b/swh/provenance/model.py @@ -1,158 +1,144 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from typing import Iterable, Iterator, List, Optional from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import origin_identifier from swh.model.model import Sha1Git from .archive import ArchiveInterface class OriginEntry: def __init__(self, url: str, snapshot: Sha1Git): self.url = url self.id: Sha1Git = hash_to_bytes(origin_identifier({"url": self.url})) self.snapshot = snapshot self._revisions: Optional[List[RevisionEntry]] = None def retrieve_revisions(self, archive: ArchiveInterface): if self._revisions is None: self._revisions = [ RevisionEntry(rev) for rev in archive.snapshot_get_heads(self.snapshot) ] @property def revisions(self) -> Iterator["RevisionEntry"]: if self._revisions is None: raise RuntimeError( "Revisions of this node has not yet been retrieved. " "Please call retrieve_revisions() before using this property." 
) return (x for x in self._revisions) def __str__(self): return f"" class RevisionEntry: def __init__( self, id: Sha1Git, date: Optional[datetime] = None, root: Optional[Sha1Git] = None, parents: Optional[Iterable[Sha1Git]] = None, ): self.id = id self.date = date assert self.date is None or self.date.tzinfo is not None self.root = root self._parents_ids = parents self._parents_entries: Optional[List[RevisionEntry]] = None def retrieve_parents(self, archive: ArchiveInterface): if self._parents_entries is None: if self._parents_ids is None: - revision = list(archive.revision_get([self.id])) - if revision: - self._parents_ids = revision[0].parents - else: - self._parents_ids = [] - - self._parents_entries = [ - RevisionEntry( - id=rev.id, - root=rev.directory, - date=rev.date.to_datetime(), - parents=rev.parents, - ) - for rev in archive.revision_get(self._parents_ids) - if rev.date is not None - ] + self._parents_ids = archive.revision_get_parents(self.id) + self._parents_entries = [RevisionEntry(id) for id in self._parents_ids] @property def parents(self) -> Iterator["RevisionEntry"]: if self._parents_entries is None: raise RuntimeError( "Parents of this node has not yet been retrieved. " "Please call retrieve_parents() before using this property." 
) return (x for x in self._parents_entries) def __str__(self): return ( f"" ) class DirectoryEntry: def __init__(self, id: Sha1Git, name: bytes = b""): self.id = id self.name = name self._files: Optional[List[FileEntry]] = None self._dirs: Optional[List[DirectoryEntry]] = None def retrieve_children(self, archive: ArchiveInterface): if self._files is None and self._dirs is None: self._files = [] self._dirs = [] for child in archive.directory_ls(self.id): if child["type"] == "dir": self._dirs.append( DirectoryEntry(child["target"], name=child["name"]) ) elif child["type"] == "file": self._files.append(FileEntry(child["target"], child["name"])) @property def files(self) -> Iterator["FileEntry"]: if self._files is None: raise RuntimeError( "Children of this node has not yet been retrieved. " "Please call retrieve_children() before using this property." ) return (x for x in self._files) @property def dirs(self) -> Iterator["DirectoryEntry"]: if self._dirs is None: raise RuntimeError( "Children of this node has not yet been retrieved. " "Please call retrieve_children() before using this property." 
) return (x for x in self._dirs) def __str__(self): return f"" def __eq__(self, other): return isinstance(other, DirectoryEntry) and (self.id, self.name) == ( other.id, other.name, ) def __hash__(self): return hash((self.id, self.name)) class FileEntry: def __init__(self, id: Sha1Git, name: bytes): self.id = id self.name = name def __str__(self): return f"" def __eq__(self, other): return isinstance(other, FileEntry) and (self.id, self.name) == ( other.id, other.name, ) def __hash__(self): return hash((self.id, self.name)) diff --git a/swh/provenance/postgresql/archive.py b/swh/provenance/postgresql/archive.py index 95f3bec..559dcb9 100644 --- a/swh/provenance/postgresql/archive.py +++ b/swh/provenance/postgresql/archive.py @@ -1,134 +1,114 @@ from typing import Any, Dict, Iterable, List, Set from methodtools import lru_cache import psycopg2 -from swh.model.model import ObjectType, Revision, Sha1Git, TargetType +from swh.model.model import ObjectType, Sha1Git, TargetType from swh.storage.postgresql.storage import Storage class ArchivePostgreSQL: def __init__(self, conn: psycopg2.extensions.connection): self.conn = conn self.storage = Storage(conn, objstorage={"cls": "memory"}) def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]: # TODO: only call directory_ls_internal if the id is not being queried by # someone else. Otherwise wait until results get properly cached. 
entries = self.directory_ls_internal(id) yield from entries @lru_cache(maxsize=100000) def directory_ls_internal(self, id: Sha1Git) -> List[Dict[str, Any]]: # TODO: add file size filtering with self.conn.cursor() as cursor: cursor.execute( """WITH dir AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries FROM directory WHERE id=%s), ls_d AS (SELECT dir_id, UNNEST(dir_entries) AS entry_id FROM dir), ls_f AS (SELECT dir_id, UNNEST(file_entries) AS entry_id FROM dir), ls_r AS (SELECT dir_id, UNNEST(rev_entries) AS entry_id FROM dir) (SELECT 'dir'::directory_entry_type AS type, e.target, e.name, NULL::sha1_git FROM ls_d LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) UNION (WITH known_contents AS (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id INNER JOIN content c ON e.target=c.sha1_git) SELECT * FROM known_contents UNION (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id LEFT JOIN skipped_content c ON e.target=c.sha1_git WHERE NOT EXISTS ( SELECT 1 FROM known_contents WHERE known_contents.sha1_git=e.target ) ) ) ORDER BY name """, (id,), ) return [ {"type": row[0], "target": row[1], "name": row[2]} for row in cursor.fetchall() ] - def revision_get(self, ids: Iterable[Sha1Git]) -> Iterable[Revision]: + def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: with self.conn.cursor() as cursor: - psycopg2.extras.execute_values( - cursor, + cursor.execute( """ - SELECT t.id, revision.date, revision.directory, - ARRAY( - SELECT rh.parent_id::bytea - FROM revision_history rh - WHERE rh.id = t.id - ORDER BY rh.parent_rank - ) - FROM (VALUES %s) as t(sortkey, id) - LEFT JOIN revision ON t.id = revision.id - LEFT JOIN person author ON revision.author = author.id - LEFT JOIN person committer ON revision.committer = committer.id - ORDER BY sortkey + SELECT RH.parent_id::bytea 
+ FROM revision_history AS RH + WHERE RH.id=%s + ORDER BY RH.parent_rank """, - ((sortkey, id) for sortkey, id in enumerate(ids)), + (id,), ) - for row in cursor.fetchall(): - parents = [] - for parent in row[3]: - if parent: - parents.append(parent) - yield Revision.from_dict( - { - "id": row[0], - "date": row[1], - "directory": row[2], - "parents": tuple(parents), - } - ) + # One row per parent revision (merges have several), already ordered by parent_rank + yield from (row[0] for row in cursor.fetchall()) def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: # TODO: this code is duplicated here (same as in swh.provenance.storage.archive) # but it's just temporary. This method should actually perform a direct query to # the SQL db of the archive. from swh.core.utils import grouper from swh.storage.algos.snapshot import snapshot_get_all_branches snapshot = snapshot_get_all_branches(self.storage, id) assert snapshot is not None targets_set = set() releases_set = set() if snapshot is not None: for branch in snapshot.branches: if snapshot.branches[branch].target_type == TargetType.REVISION: targets_set.add(snapshot.branches[branch].target) elif snapshot.branches[branch].target_type == TargetType.RELEASE: releases_set.add(snapshot.branches[branch].target) batchsize = 100 for releases in grouper(releases_set, batchsize): targets_set.update( release.target for release in self.storage.release_get(releases) if release is not None and release.target_type == ObjectType.REVISION ) revisions: Set[Sha1Git] = set() for targets in grouper(targets_set, batchsize): revisions.update( revision.id for revision in self.storage.revision_get(targets) if revision is not None ) yield from revisions diff --git a/swh/provenance/storage/archive.py b/swh/provenance/storage/archive.py index 9dff527..4d83c5c 100644 --- a/swh/provenance/storage/archive.py +++ b/swh/provenance/storage/archive.py @@ -1,53 +1,52 @@ from typing import Any, Dict, Iterable, Set -from swh.model.model import ObjectType, Revision, Sha1Git, TargetType 
+from swh.model.model import ObjectType, Sha1Git, TargetType from swh.storage.interface import StorageInterface class ArchiveStorage: def __init__(self, storage: StorageInterface): self.storage = storage def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]: # TODO: filter unused fields yield from self.storage.directory_ls(id) - def revision_get(self, ids: Iterable[Sha1Git]) -> Iterable[Revision]: - # TODO: filter unused fields - yield from ( - rev for rev in self.storage.revision_get(list(ids)) if rev is not None - ) + def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: + rev = self.storage.revision_get([id])[0] + if rev is not None: + yield from rev.parents def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: from swh.core.utils import grouper from swh.storage.algos.snapshot import snapshot_get_all_branches snapshot = snapshot_get_all_branches(self.storage, id) assert snapshot is not None targets_set = set() releases_set = set() if snapshot is not None: for branch in snapshot.branches: if snapshot.branches[branch].target_type == TargetType.REVISION: targets_set.add(snapshot.branches[branch].target) elif snapshot.branches[branch].target_type == TargetType.RELEASE: releases_set.add(snapshot.branches[branch].target) batchsize = 100 for releases in grouper(releases_set, batchsize): targets_set.update( release.target for release in self.storage.release_get(list(releases)) if release is not None and release.target_type == ObjectType.REVISION ) revisions: Set[Sha1Git] = set() for targets in grouper(targets_set, batchsize): revisions.update( revision.id for revision in self.storage.revision_get(list(targets)) if revision is not None ) yield from revisions