diff --git a/swh/provenance/postgresql/provenance.py b/swh/provenance/postgresql/provenance.py index 1f788ff..f1e0900 100644 --- a/swh/provenance/postgresql/provenance.py +++ b/swh/provenance/postgresql/provenance.py @@ -1,374 +1,375 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime import itertools import logging from typing import Dict, Generator, Iterable, Optional, Set, Tuple import psycopg2.extensions import psycopg2.extras from typing_extensions import Literal from swh.core.db import BaseDb from swh.model.model import Sha1Git from ..interface import ( EntityType, ProvenanceResult, RelationData, RelationType, RevisionData, ) class ProvenanceStoragePostgreSql: def __init__( self, conn: psycopg2.extensions.connection, raise_on_commit: bool = False ) -> None: BaseDb.adapt_conn(conn) conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) conn.set_session(autocommit=True) self.conn = conn self.cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) # XXX: not sure this is the best place to do it! sql = "SET timezone TO 'UTC'" self.cursor.execute(sql) self._flavor: Optional[str] = None self.raise_on_commit = raise_on_commit @property def flavor(self) -> str: if self._flavor is None: sql = "SELECT swh_get_dbflavor() AS flavor" self.cursor.execute(sql) self._flavor = self.cursor.fetchone()["flavor"] assert self._flavor is not None return self._flavor def with_path(self) -> bool: return "with-path" in self.flavor @property def denormalized(self) -> bool: return "denormalized" in self.flavor def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("content", dates) def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return self._entity_get_date("content", ids) def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("directory", dates) def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return self._entity_get_date("directory", ids) def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: sql = f"SELECT sha1 FROM {entity.value}" self.cursor.execute(sql) return {row["sha1"] for row in self.cursor.fetchall()} def location_get(self) -> Set[bytes]: - sql = "SELECT encode(location.path::bytea, 'escape') AS path FROM location" + sql = "SELECT location.path AS path FROM location" self.cursor.execute(sql) return {row["path"] for row in self.cursor.fetchall()} def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: try: if urls: sql = """ LOCK TABLE ONLY origin; INSERT INTO origin(sha1, url) VALUES %s ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, urls.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: urls: Dict[Sha1Git, str] = {} sha1s = tuple(ids) if sha1s: # TODO: consider splitting this query in several ones if sha1s is too big! values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, url FROM origin WHERE sha1 IN ({values}) """ self.cursor.execute(sql, sha1s) urls.update( (row["sha1"], row["url"].decode()) for row in self.cursor.fetchall() ) return urls def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("revision", dates) def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: sql = "SELECT * FROM swh_provenance_content_find_first(%s)" self.cursor.execute(sql, (id,)) row = self.cursor.fetchone() return ProvenanceResult(**row) if row is not None else None def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: sql = "SELECT * FROM swh_provenance_content_find_all(%s, %s)" self.cursor.execute(sql, (id, limit)) yield from (ProvenanceResult(**row) for row in self.cursor.fetchall()) def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: try: if origins: sql = """ LOCK TABLE ONLY revision; INSERT INTO revision(sha1, origin) (SELECT V.rev AS sha1, O.id AS origin FROM (VALUES %s) AS V(rev, org) JOIN origin AS O ON (O.sha1=V.org)) ON CONFLICT (sha1) DO UPDATE SET origin=EXCLUDED.origin """ psycopg2.extras.execute_values(self.cursor, sql, origins.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: result: Dict[Sha1Git, RevisionData] = {} sha1s = tuple(ids) if sha1s: # TODO: consider splitting this query in several ones if sha1s is too big! values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" - SELECT sha1, date, origin - FROM revision - WHERE sha1 IN ({values}) + SELECT R.sha1, R.date, O.sha1 AS origin + FROM revision AS R + LEFT JOIN origin AS O ON (O.id=R.origin) + WHERE R.sha1 IN ({values}) """ self.cursor.execute(sql, sha1s) result.update( (row["sha1"], RevisionData(date=row["date"], origin=row["origin"])) for row in self.cursor.fetchall() ) return result def relation_add( self, relation: RelationType, data: Iterable[RelationData] ) -> bool: try: rows = tuple((rel.src, rel.dst, rel.path) for rel in data) if rows: table = relation.value src, *_, dst = table.split("_") if src != "origin": # Origin entries should be inserted previously as they require extra # non-null information srcs = tuple(set((sha1,) for (sha1, _, _) in rows)) sql = f""" LOCK TABLE ONLY {src}; INSERT INTO {src}(sha1) VALUES %s ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, srcs) if dst != "origin": # Origin entries should be inserted previously as they require extra # non-null information dsts = tuple(set((sha1,) for (_, sha1, _) in rows)) sql = f""" LOCK TABLE ONLY {dst}; INSERT INTO {dst}(sha1) VALUES %s ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, dsts) joins = [ f"INNER JOIN {src} AS S ON (S.sha1=V.src)", f"INNER JOIN {dst} AS D ON (D.sha1=V.dst)", ] nope = (RelationType.REV_BEFORE_REV, RelationType.REV_IN_ORG) selected = ["S.id"] if self.denormalized and relation not in nope: selected.append("ARRAY_AGG(D.id)") else: selected.append("D.id") if self._relation_uses_location_table(relation): locations = tuple(set((path,) for (_, _, path) in rows)) sql = """ LOCK TABLE ONLY location; INSERT INTO location(path) VALUES %s ON CONFLICT (path) DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, locations) joins.append("INNER JOIN location AS L ON (L.path=V.path)") if self.denormalized: selected.append("ARRAY_AGG(L.id)") else: selected.append("L.id") sql_l = [ f"INSERT INTO {table}", f" SELECT {', '.join(selected)}", " FROM (VALUES %s) AS V(src, dst, path)", *joins, ] if self.denormalized and relation not in nope: sql_l.append("GROUP BY S.id") sql_l.append( f"""ON CONFLICT ({src}) DO UPDATE SET {dst}=ARRAY( SELECT UNNEST({table}.{dst} || EXCLUDED.{dst}) ), location=ARRAY( SELECT UNNEST({relation.value}.location || EXCLUDED.location) ) """ ) else: sql_l.append("ON CONFLICT DO NOTHING") sql = "\n".join(sql_l) psycopg2.extras.execute_values(self.cursor, sql, rows) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def relation_get( self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False ) -> Set[RelationData]: return self._relation_get(relation, ids, reverse) def relation_get_all(self, relation: RelationType) -> Set[RelationData]: return self._relation_get(relation, None) def _entity_get_date( self, entity: Literal["content", "directory", "revision"], ids: Iterable[Sha1Git], ) -> Dict[Sha1Git, datetime]: dates: Dict[Sha1Git, datetime] = {} sha1s = tuple(ids) if sha1s: # TODO: consider splitting this query in several ones if sha1s is too big! values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, date FROM {entity} WHERE sha1 IN ({values}) """ self.cursor.execute(sql, sha1s) dates.update((row["sha1"], row["date"]) for row in self.cursor.fetchall()) return dates def _entity_set_date( self, entity: Literal["content", "directory", "revision"], data: Dict[Sha1Git, datetime], ) -> bool: try: if data: sql = f""" LOCK TABLE ONLY {entity}; INSERT INTO {entity}(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) """ psycopg2.extras.execute_values(self.cursor, sql, data.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def _relation_get( self, relation: RelationType, ids: Optional[Iterable[Sha1Git]], reverse: bool = False, ) -> Set[RelationData]: result: Set[RelationData] = set() sha1s: Optional[Tuple[Tuple[Sha1Git, ...]]] if ids is not None: sha1s = (tuple(ids),) where = f"WHERE {'S' if not reverse else 'D'}.sha1 IN %s" else: sha1s = None where = "" aggreg_dst = self.denormalized and relation in ( RelationType.CNT_EARLY_IN_REV, RelationType.CNT_IN_DIR, RelationType.DIR_IN_REV, ) if sha1s is None or sha1s[0]: table = relation.value src, *_, dst = table.split("_") # TODO: improve this! if src == "revision" and dst == "revision": src_field = "prev" dst_field = "next" else: src_field = src dst_field = dst if aggreg_dst: revloc = f"UNNEST(R.{dst_field}) AS dst" if self._relation_uses_location_table(relation): revloc += ", UNNEST(R.location) AS path" else: revloc = f"R.{dst_field} AS dst" if self._relation_uses_location_table(relation): revloc += ", R.location AS path" inner_sql = f""" SELECT S.sha1 AS src, {revloc} FROM {table} AS R INNER JOIN {src} AS S ON (S.id=R.{src_field}) """ if where != "" and not reverse: inner_sql += where if self._relation_uses_location_table(relation): loc = "L.path AS path" else: loc = "NULL AS path" sql = f""" SELECT CL.src, D.sha1 AS dst, {loc} FROM ({inner_sql}) AS CL INNER JOIN {dst} AS D ON (D.id=CL.dst) """ if self._relation_uses_location_table(relation): sql += "INNER JOIN location AS L ON (L.id=CL.path)" if where != "" and reverse: sql += where self.cursor.execute(sql, sha1s) result.update(RelationData(**row) for row in self.cursor.fetchall()) return result def _relation_uses_location_table(self, relation: RelationType) -> bool: if self.with_path(): src = relation.value.split("_")[0] return src in ("content", "directory") return False diff --git a/swh/provenance/tests/test_revision_content_layer.py b/swh/provenance/tests/test_revision_content_layer.py index 2aa17d4..4d59114 100644 --- a/swh/provenance/tests/test_revision_content_layer.py +++ b/swh/provenance/tests/test_revision_content_layer.py @@ -1,447 +1,447 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple import pytest from typing_extensions import TypedDict from swh.model.hashutil import hash_to_bytes from swh.model.model import Sha1Git from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import EntityType, ProvenanceInterface, RelationType from swh.provenance.model import RevisionEntry from swh.provenance.revision import revision_add from swh.provenance.tests.conftest import ( fill_storage, get_datafile, load_repo_data, ts2dt, ) class SynthRelation(TypedDict): prefix: Optional[str] path: str src: Sha1Git dst: Sha1Git rel_ts: float class SynthRevision(TypedDict): sha1: Sha1Git date: float msg: str R_C: List[SynthRelation] R_D: List[SynthRelation] D_C: List[SynthRelation] def synthetic_revision_content_result(filename: str) -> Iterator[SynthRevision]: """Generates dict representations of synthetic revisions found in the synthetic file (from the data/ directory) given as argument of the generator. Generated SynthRevision (typed dict) with the following elements: "sha1": (Sha1Git) sha1 of the revision, "date": (float) timestamp of the revision, "msg": (str) commit message of the revision, "R_C": (list) new R---C relations added by this revision "R_D": (list) new R-D relations added by this revision "D_C": (list) new D-C relations added by this revision Each relation above is a SynthRelation typed dict with: "path": (str) location "src": (Sha1Git) sha1 of the source of the relation "dst": (Sha1Git) sha1 of the destination of the relation "rel_ts": (float) timestamp of the target of the relation (related to the timestamp of the revision) """ with open(get_datafile(filename), "r") as fobj: yield from _parse_synthetic_revision_content_file(fobj) def _parse_synthetic_revision_content_file( fobj: Iterable[str], ) -> Iterator[SynthRevision]: """Read a 'synthetic' file and generate a dict representation of the synthetic revision for each revision listed in the synthetic file. """ regs = [ "(?PR[0-9]{2,4})?", "(?P[^| ]*)", "([+] )?(?P[^| +]*?)[/]?", "(?P[RDC]) (?P[0-9a-f]{40})", "(?P-?[0-9]+(.[0-9]+)?)", ] regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$") current_rev: List[dict] = [] for m in (regex.match(line) for line in fobj): if m: d = m.groupdict() if d["revname"]: if current_rev: yield _mk_synth_rev(current_rev) current_rev.clear() current_rev.append(d) if current_rev: yield _mk_synth_rev(current_rev) def _mk_synth_rev(synth_rev: List[Dict[str, str]]) -> SynthRevision: assert synth_rev[0]["type"] == "R" rev = SynthRevision( sha1=hash_to_bytes(synth_rev[0]["sha1"]), date=float(synth_rev[0]["ts"]), msg=synth_rev[0]["revname"], R_C=[], R_D=[], D_C=[], ) current_path = None # path of the last R-D relation we parsed, used a prefix for next D-C # relations for row in synth_rev[1:]: if row["reltype"] == "R---C": assert row["type"] == "C" rev["R_C"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = None elif row["reltype"] == "R-D": assert row["type"] == "D" rev["R_D"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = row["path"] elif row["reltype"] == "D-C": assert row["type"] == "C" rev["D_C"].append( SynthRelation( prefix=current_path, path=row["path"], src=rev["R_D"][-1]["dst"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) return rev @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) def test_revision_content_result( provenance: ProvenanceInterface, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(archive.storage, data) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) revisions = {rev["id"]: rev for rev in data["revision"]} rows: Dict[str, Set[Any]] = { "content": set(), "content_in_directory": set(), "content_in_revision": set(), "directory": set(), "directory_in_revision": set(), "location": set(), "revision": set(), } def maybe_path(path: str) -> Optional[bytes]: if provenance.storage.with_path(): return path.encode("utf-8") return None for synth_rev in synthetic_revision_content_result(syntheticfile): revision = revisions[synth_rev["sha1"]] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth) # each "entry" in the synth file is one new revision rows["revision"].add(synth_rev["sha1"]) assert rows["revision"] == provenance.storage.entity_get_all( EntityType.REVISION ), synth_rev["msg"] # check the timestamp of the revision rev_ts = synth_rev["date"] rev_data = provenance.storage.revision_get([synth_rev["sha1"]])[ synth_rev["sha1"] ] assert ( rev_data.date is not None and rev_ts == rev_data.date.timestamp() ), synth_rev["msg"] # this revision might have added new content objects rows["content"] |= set(x["dst"] for x in synth_rev["R_C"]) rows["content"] |= set(x["dst"] for x in synth_rev["D_C"]) assert rows["content"] == provenance.storage.entity_get_all( EntityType.CONTENT ), synth_rev["msg"] # check for R-C (direct) entries # these are added directly in the content_early_in_rev table rows["content_in_revision"] |= set( (x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["R_C"] ) assert rows["content_in_revision"] == { (rel.src, rel.dst, rel.path) for rel in provenance.storage.relation_get_all( RelationType.CNT_EARLY_IN_REV ) }, synth_rev["msg"] # check timestamps for rc in synth_rev["R_C"]: assert ( rev_ts + rc["rel_ts"] == provenance.storage.content_get([rc["dst"]])[rc["dst"]].timestamp() ), synth_rev["msg"] # check directories # each directory stored in the provenance index is an entry # in the "directory" table... rows["directory"] |= set(x["dst"] for x in synth_rev["R_D"]) assert rows["directory"] == provenance.storage.entity_get_all( EntityType.DIRECTORY ), synth_rev["msg"] # ... + a number of rows in the "directory_in_rev" table... # check for R-D entries rows["directory_in_revision"] |= set( (x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["R_D"] ) assert rows["directory_in_revision"] == { (rel.src, rel.dst, rel.path) for rel in provenance.storage.relation_get_all(RelationType.DIR_IN_REV) }, synth_rev["msg"] # check timestamps for rd in synth_rev["R_D"]: assert ( rev_ts + rd["rel_ts"] == provenance.storage.directory_get([rd["dst"]])[rd["dst"]].timestamp() ), synth_rev["msg"] # ... + a number of rows in the "content_in_dir" table # for content of the directory. # check for D-C entries rows["content_in_directory"] |= set( (x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["D_C"] ) assert rows["content_in_directory"] == { (rel.src, rel.dst, rel.path) for rel in provenance.storage.relation_get_all(RelationType.CNT_IN_DIR) }, synth_rev["msg"] # check timestamps for dc in synth_rev["D_C"]: assert ( rev_ts + dc["rel_ts"] == provenance.storage.content_get([dc["dst"]])[dc["dst"]].timestamp() ), synth_rev["msg"] if provenance.storage.with_path(): # check for location entries - rows["location"] |= set(x["path"] for x in synth_rev["R_C"]) - rows["location"] |= set(x["path"] for x in synth_rev["D_C"]) - rows["location"] |= set(x["path"] for x in synth_rev["R_D"]) + rows["location"] |= set(x["path"].encode() for x in synth_rev["R_C"]) + rows["location"] |= set(x["path"].encode() for x in synth_rev["D_C"]) + rows["location"] |= set(x["path"].encode() for x in synth_rev["R_D"]) assert rows["location"] == provenance.storage.location_get(), synth_rev[ "msg" ] @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def test_provenance_heuristics_content_find_all( provenance: ProvenanceInterface, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, batch: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(archive.storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] def maybe_path(path: str) -> str: if provenance.storage.with_path(): return path return "" if batch: revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) else: for revision in revisions: revision_add( provenance, archive, [revision], lower=lower, mindepth=mindepth ) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_occurrences: Dict[str, List[Tuple[str, float, Optional[str], str]]] = {} for synth_rev in synthetic_revision_content_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: expected_occurrences.setdefault(rc["dst"].hex(), []).append( (rev_id, rev_ts, None, maybe_path(rc["path"])) ) for dc in synth_rev["D_C"]: assert dc["prefix"] is not None # to please mypy expected_occurrences.setdefault(dc["dst"].hex(), []).append( (rev_id, rev_ts, None, maybe_path(dc["prefix"] + "/" + dc["path"])) ) for content_id, results in expected_occurrences.items(): expected = [(content_id, *result) for result in results] db_occurrences = [ ( occur.content.hex(), occur.revision.hex(), occur.date.timestamp(), occur.origin, occur.path.decode(), ) for occur in provenance.content_find_all(hash_to_bytes(content_id)) ] if provenance.storage.with_path(): # this is not true if the db stores no path, because a same content # that appears several times in a given revision may be reported # only once by content_find_all() assert len(db_occurrences) == len(expected) assert set(db_occurrences) == set(expected) @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def test_provenance_heuristics_content_find_first( provenance: ProvenanceInterface, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, batch: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(archive.storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] if batch: revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) else: for revision in revisions: revision_add( provenance, archive, [revision], lower=lower, mindepth=mindepth ) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_first: Dict[str, Tuple[str, float, List[str]]] = {} # dict of tuples (blob_id, rev_id, [path, ...]) the third element for path # is a list because a content can be added at several places in a single # revision, in which case the result of content_find_first() is one of # those path, but we have no guarantee which one it will return. for synth_rev in synthetic_revision_content_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: sha1 = rc["dst"].hex() if sha1 not in expected_first: assert rc["rel_ts"] == 0 expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) else: if rev_ts == expected_first[sha1][1]: expected_first[sha1][2].append(rc["path"]) elif rev_ts < expected_first[sha1][1]: expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) for dc in synth_rev["D_C"]: sha1 = rc["dst"].hex() assert sha1 in expected_first # nothing to do there, this content cannot be a "first seen file" for content_id, (rev_id, ts, paths) in expected_first.items(): occur = provenance.content_find_first(hash_to_bytes(content_id)) assert occur is not None assert occur.content.hex() == content_id assert occur.revision.hex() == rev_id assert occur.date.timestamp() == ts assert occur.origin is None if provenance.storage.with_path(): assert occur.path.decode() in paths