from typing import Any, Dict, Iterable

from typing_extensions import Protocol, runtime_checkable

from swh.model.model import Sha1Git


@runtime_checkable
class ArchiveInterface(Protocol):
    """Structural interface for archive accessors used by provenance code.

    Implementations may query the high-level storage API or the PostgreSQL
    database directly; callers only rely on the three methods below.
    """

    def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]:
        """List entries for one directory.

        Args:
            id: sha1 id of the directory to list entries from.

        Yields:
            dictionary of entries in such directory containing only the keys
            "name", "target" and "type".
        """
        ...

    def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
        """List parents of one revision.

        Args:
            id: sha1 id of the revision to list parents from.

        Yields:
            sha1 ids for the parents of such revision.
        """
        ...

    def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
        """List all revisions targeted by one snapshot.

        Args:
            id: sha1 id of the snapshot.

        Yields:
            sha1 ids of revisions that are a target of such snapshot.
            Revisions are guaranteed to be retrieved in chronological order.
        """
        ...
from typing import Any, Dict, Iterable, List

from methodtools import lru_cache
import psycopg2

from swh.model.model import Sha1Git
from swh.storage.postgresql.storage import Storage


class ArchivePostgreSQL:
    """Archive accessor that runs hand-written SQL directly against a
    Software Heritage PostgreSQL database, bypassing the storage API.

    Provides the same three methods as ``ArchiveInterface``:
    ``directory_ls``, ``revision_get_parents`` and ``snapshot_get_heads``.
    """

    def __init__(self, conn: psycopg2.extensions.connection):
        # Keep the raw connection for the hand-written queries below, and
        # wrap it in a Storage object (with a throwaway in-memory objstorage)
        # for any API-level access.
        self.conn = conn
        self.storage = Storage(conn, objstorage={"cls": "memory"})

    def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]:
        """List entries for the directory with sha1 ``id``.

        Yields:
            dictionaries with keys "type", "target" and "name".
        """
        # Thin generator wrapper around the cached helper: the cache stores a
        # concrete list, while callers get the iterable the interface promises.
        entries = self.directory_ls_internal(id)
        yield from entries

    # NOTE: methodtools' lru_cache is method-aware (per instance), unlike
    # functools.lru_cache which would pin instances alive via ``self``.
    @lru_cache(maxsize=100000)
    def directory_ls_internal(self, id: Sha1Git) -> List[Dict[str, Any]]:
        # TODO: add file size filtering
        with self.conn.cursor() as cursor:
            # One directory row fans out into dir/file entries via UNNEST.
            # File entries are resolved against ``content`` first; entries not
            # found there fall back to ``skipped_content`` (NULL sha1_git if
            # absent from both).
            # NOTE(review): ls_r (revision entries) is defined but never
            # selected from — revision submodule entries are not listed.
            cursor.execute(
                """
                WITH
                dir  AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries
                         FROM directory WHERE id=%s),
                ls_d AS (SELECT dir_id, UNNEST(dir_entries) AS entry_id FROM dir),
                ls_f AS (SELECT dir_id, UNNEST(file_entries) AS entry_id FROM dir),
                ls_r AS (SELECT dir_id, UNNEST(rev_entries) AS entry_id FROM dir)
                (SELECT 'dir'::directory_entry_type AS type,
                        e.target, e.name, NULL::sha1_git
                 FROM ls_d
                 LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id)
                UNION
                (WITH known_contents AS
                    (SELECT 'file'::directory_entry_type AS type,
                            e.target, e.name, c.sha1_git
                     FROM ls_f
                     LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id
                     INNER JOIN content c ON e.target=c.sha1_git)
                    SELECT * FROM known_contents
                    UNION
                    (SELECT 'file'::directory_entry_type AS type,
                            e.target, e.name, c.sha1_git
                     FROM ls_f
                     LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id
                     LEFT JOIN skipped_content c ON e.target=c.sha1_git
                     WHERE NOT EXISTS (
                         SELECT 1 FROM known_contents
                         WHERE known_contents.sha1_git=e.target
                     )
                    )
                )
                """,
                (id,),
            )
            # Project each row down to the three keys the interface promises.
            return [
                {"type": row[0], "target": row[1], "name": row[2]}
                for row in cursor.fetchall()
            ]

    def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
        """List parents of the revision with sha1 ``id``.

        Yields:
            sha1 ids of the parents, in parent_rank order.
        """
        with self.conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT RH.parent_id::bytea
                FROM revision_history AS RH
                WHERE RH.id=%s
                ORDER BY RH.parent_rank
                """,
                (id,),
            )
            # Merge revisions yield multiple rows; parent_rank preserves the
            # original parent ordering.
            yield from (row[0] for row in cursor.fetchall())

    def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
        """List all revisions targeted by the snapshot with sha1 ``id``.

        Yields:
            sha1 ids of head revisions — branches targeting a revision
            directly, plus releases resolved to the revision they target —
            deduplicated (UNION) and sorted by revision date then id, so
            heads come out in chronological order.
        """
        with self.conn.cursor() as cursor:
            cursor.execute(
                """
                WITH
                snaps AS (SELECT object_id FROM snapshot WHERE snapshot.id=%s),
                heads AS ((SELECT R.id, R.date
                           FROM snaps
                           JOIN snapshot_branches AS BS
                             ON (snaps.object_id=BS.snapshot_id)
                           JOIN snapshot_branch AS B
                             ON (BS.branch_id=B.object_id)
                           JOIN revision AS R
                             ON (B.target=R.id)
                           WHERE B.target_type='revision'::snapshot_target)
                          UNION
                          (SELECT RV.id, RV.date
                           FROM snaps
                           JOIN snapshot_branches AS BS
                             ON (snaps.object_id=BS.snapshot_id)
                           JOIN snapshot_branch AS B
                             ON (BS.branch_id=B.object_id)
                           JOIN release AS RL
                             ON (B.target=RL.id)
                           JOIN revision AS RV
                             ON (RL.target=RV.id)
                           WHERE B.target_type='release'::snapshot_target
                             AND RL.target_type='revision'::object_type)
                          ORDER BY date, id)
                SELECT id FROM heads
                """,
                (id,),
            )
            yield from (row[0] for row in cursor.fetchall())
from datetime import datetime
from typing import Any, Dict, Iterable, Set, Tuple

from swh.model.model import ObjectType, Sha1Git, TargetType
from swh.storage.interface import StorageInterface


class ArchiveStorage:
    """Archive accessor implemented on top of the high-level storage API.

    Provides the same three methods as ``ArchiveInterface``:
    ``directory_ls``, ``revision_get_parents`` and ``snapshot_get_heads``.
    """

    def __init__(self, storage: StorageInterface):
        self.storage = storage

    def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]:
        """List entries for one directory.

        Args:
            id: sha1 id of the directory to list entries from.

        Yields:
            dictionaries containing only the keys "name", "target" and
            "type" (any other keys returned by the storage are stripped).
        """
        # TODO: add file size filtering
        for entry in self.storage.directory_ls(id):
            yield {
                "name": entry["name"],
                "target": entry["target"],
                "type": entry["type"],
            }

    def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
        """List parents of one revision.

        Args:
            id: sha1 id of the revision to list parents from.

        Yields:
            sha1 ids for the parents of such revision; yields nothing when
            the revision is unknown to the storage.
        """
        rev = self.storage.revision_get([id])[0]
        if rev is not None:
            yield from rev.parents

    def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
        """List all revisions targeted by one snapshot.

        Args:
            id: sha1 id of the snapshot.

        Yields:
            sha1 ids of revisions that are a target of such snapshot, either
            directly or through a release, in chronological order of the
            revision date (ties broken by id). Undated or missing revisions
            are skipped.

        Raises:
            AssertionError: if the snapshot is not found in the storage.
        """
        from swh.core.utils import grouper
        from swh.storage.algos.snapshot import snapshot_get_all_branches

        snapshot = snapshot_get_all_branches(self.storage, id)
        assert snapshot is not None

        # Split branches into direct revision targets and release targets;
        # the latter are resolved to revisions below. (The redundant
        # ``if snapshot is not None`` guard that followed the assert has
        # been dropped — it was dead code.)
        targets_set = set()
        releases_set = set()
        for branch in snapshot.branches.values():
            if branch.target_type == TargetType.REVISION:
                targets_set.add(branch.target)
            elif branch.target_type == TargetType.RELEASE:
                releases_set.add(branch.target)

        # Resolve releases to the revisions they target, batching storage
        # calls to bound request size.
        batchsize = 100
        for releases in grouper(releases_set, batchsize):
            targets_set.update(
                release.target
                for release in self.storage.release_get(list(releases))
                if release is not None and release.target_type == ObjectType.REVISION
            )

        # Pair each known, dated revision with its date so the final sort
        # yields heads in chronological order, matching the interface's
        # documented guarantee.
        revisions: Set[Tuple[datetime, Sha1Git]] = set()
        for targets in grouper(targets_set, batchsize):
            revisions.update(
                (revision.date.to_datetime(), revision.id)
                for revision in self.storage.revision_get(list(targets))
                if revision is not None and revision.date is not None
            )

        yield from (head for _, head in sorted(revisions))