diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -165,15 +165,14 @@
     provenance = get_provenance(**ctx.obj["config"]["provenance"])
     # TODO: return a dictionary with proper keys for each field
-    row = provenance.content_find_first(hash_to_bytes(swhid))
-    if row is not None:
+    occur = provenance.content_find_first(hash_to_bytes(swhid))
+    if occur is not None:
         print(
-            "swh:1:cnt:{cnt}, swh:1:rev:{rev}, {date}, {path}".format(
-                cnt=hash_to_hex(row[0]),
-                rev=hash_to_hex(row[1]),
-                date=row[2],
-                path=os.fsdecode(row[3]),
-            )
+            f"swh:1:cnt:{hash_to_hex(occur.content)}, "
+            f"swh:1:rev:{hash_to_hex(occur.revision)}, "
+            f"{occur.date}, "
+            f"{occur.origin}, "
+            f"{os.fsdecode(occur.path)}"
         )
     else:
         print(f"Cannot find a content with the id {swhid}")

@@ -189,12 +188,11 @@
     provenance = get_provenance(**ctx.obj["config"]["provenance"])
     # TODO: return a dictionary with proper keys for each field
-    for row in provenance.content_find_all(hash_to_bytes(swhid), limit=limit):
+    for occur in provenance.content_find_all(hash_to_bytes(swhid), limit=limit):
         print(
-            "swh:1:cnt:{cnt}, swh:1:rev:{rev}, {date}, {path}".format(
-                cnt=hash_to_hex(row[0]),
-                rev=hash_to_hex(row[1]),
-                date=row[2],
-                path=os.fsdecode(row[3]),
-            )
+            f"swh:1:cnt:{hash_to_hex(occur.content)}, "
+            f"swh:1:rev:{hash_to_hex(occur.revision)}, "
+            f"{occur.date}, "
+            f"{occur.origin}, "
+            f"{os.fsdecode(occur.path)}"
         )

diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py
--- a/swh/provenance/origin.py
+++ b/swh/provenance/origin.py
@@ -52,7 +52,7 @@
             graph = build_history_graph(archive, provenance, revision)
             origin_add_revision(provenance, origin, graph)
         done = time.time()
-        provenance.commit()
+        provenance.flush()
         stop = time.time()
         logging.debug(
             "Origins "

diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py
--- a/swh/provenance/postgresql/provenancedb_base.py
+++ b/swh/provenance/postgresql/provenancedb_base.py
@@ -1,13 +1,15 @@
 from datetime import datetime
 import itertools
 import logging
-from typing import Any, Dict, Generator, List, Optional, Set, Tuple
+from typing import Any, Dict, Generator, List, Mapping, Optional, Set, Tuple

 import psycopg2
 import psycopg2.extras

 from swh.model.model import Sha1Git

+from ..provenance import ProvenanceResult
+

 class ProvenanceDBBase:
     def __init__(self, conn: psycopg2.extensions.connection):
@@ -31,7 +33,7 @@
     def with_path(self) -> bool:
         return self.flavor == "with-path"

-    def commit(self, data: Dict[str, Any], raise_on_commit: bool = False) -> bool:
+    def commit(self, data: Mapping[str, Any], raise_on_commit: bool = False) -> bool:
         try:
             # First insert entities
             for entity in ("content", "directory", "revision"):
@@ -87,14 +89,12 @@

         return False

-    def content_find_first(
-        self, id: Sha1Git
-    ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]:
+    def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
         ...

     def content_find_all(
         self, id: Sha1Git, limit: Optional[int] = None
-    ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]:
+    ) -> Generator[ProvenanceResult, None, None]:
         ...
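
The `Dict` -> `Mapping` loosening on `commit` above matters for type checking rather than runtime behaviour: `Dict` is invariant, while the read-only `Mapping` accepts dict-like objects such as TypedDicts. A minimal sketch of the distinction mypy enforces (not part of the patch; `Cache` is a hypothetical stand-in for the backend's cache type):

    from typing import Any, Dict, Mapping, TypedDict

    class Cache(TypedDict):
        # stand-in for the TypedDict-based cache forwarded by the backend
        content: Dict[bytes, str]

    def commit_dict(data: Dict[str, Any]) -> bool:
        return True

    def commit_mapping(data: Mapping[str, Any]) -> bool:
        return True

    cache: Cache = {"content": {}}
    commit_mapping(cache)  # accepted: a TypedDict is a Mapping[str, ...]
    commit_dict(cache)     # flagged by mypy: a TypedDict is not a Dict[str, Any]
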
     def get_dates(self, entity: str, ids: List[Sha1Git]) -> Dict[Sha1Git, datetime]:

diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py
--- a/swh/provenance/postgresql/provenancedb_with_path.py
+++ b/swh/provenance/postgresql/provenancedb_with_path.py
@@ -1,4 +1,3 @@
-from datetime import datetime
 from typing import Generator, Optional, Set, Tuple

 import psycopg2
@@ -6,66 +5,80 @@

 from swh.model.model import Sha1Git

+from ..provenance import ProvenanceResult
 from .provenancedb_base import ProvenanceDBBase


 class ProvenanceWithPathDB(ProvenanceDBBase):
-    def content_find_first(
-        self, id: Sha1Git
-    ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]:
+    def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
         self.cursor.execute(
             """
             SELECT C.sha1 AS blob,
                    R.sha1 AS rev,
                    R.date AS date,
+                   O.url AS url,
                    L.path AS path
             FROM content AS C
-            INNER JOIN content_in_revision AS CR ON (CR.content = C.id)
-            INNER JOIN location as L ON (CR.location = L.id)
-            INNER JOIN revision as R ON (CR.revision = R.id)
+            INNER JOIN content_in_revision AS CR ON (CR.content=C.id)
+            INNER JOIN location as L ON (CR.location=L.id)
+            INNER JOIN revision as R ON (CR.revision=R.id)
+            LEFT JOIN origin as O ON (R.origin=O.id)
             WHERE C.sha1=%s
-            ORDER BY date, rev, path ASC LIMIT 1
+            ORDER BY date, rev, url, path ASC LIMIT 1
             """,
             (id,),
         )
-        return self.cursor.fetchone()
+        row = self.cursor.fetchone()
+        if row:
+            return ProvenanceResult(
+                content=row[0], revision=row[1], date=row[2], origin=row[3], path=row[4]
+            )
+        else:
+            return None

     def content_find_all(
         self, id: Sha1Git, limit: Optional[int] = None
-    ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]:
+    ) -> Generator[ProvenanceResult, None, None]:
         early_cut = f"LIMIT {limit}" if limit is not None else ""
         self.cursor.execute(
             f"""
             (SELECT C.sha1 AS blob,
                     R.sha1 AS rev,
                     R.date AS date,
+                    O.url AS url,
                     L.path AS path
              FROM content AS C
-             INNER JOIN content_in_revision AS CR ON (CR.content = C.id)
-             INNER JOIN location AS L ON (CR.location = L.id)
-             INNER JOIN revision AS R ON (CR.revision = R.id)
+             INNER JOIN content_in_revision AS CR ON (CR.content=C.id)
+             INNER JOIN location AS L ON (CR.location=L.id)
+             INNER JOIN revision AS R ON (CR.revision=R.id)
+             LEFT JOIN origin as O ON (R.origin=O.id)
             WHERE C.sha1=%s)
            UNION
            (SELECT C.sha1 AS content,
                    R.sha1 AS revision,
                    R.date AS date,
+                   O.url AS url,
                    CASE DL.path
                      WHEN '' THEN CL.path
                      WHEN '.' THEN CL.path
                      ELSE (DL.path || '/' || CL.path)::unix_path
                    END AS path
             FROM content AS C
-            INNER JOIN content_in_directory AS CD ON (C.id = CD.content)
-            INNER JOIN directory_in_revision AS DR ON (CD.directory = DR.directory)
-            INNER JOIN revision AS R ON (DR.revision = R.id)
-            INNER JOIN location AS CL ON (CD.location = CL.id)
-            INNER JOIN location AS DL ON (DR.location = DL.id)
+            INNER JOIN content_in_directory AS CD ON (C.id=CD.content)
+            INNER JOIN directory_in_revision AS DR ON (CD.directory=DR.directory)
+            INNER JOIN revision AS R ON (DR.revision=R.id)
+            INNER JOIN location AS CL ON (CD.location=CL.id)
+            INNER JOIN location AS DL ON (DR.location=DL.id)
+            LEFT JOIN origin AS O ON (R.origin=O.id)
             WHERE C.sha1=%s)
-            ORDER BY date, rev, path {early_cut}
+            ORDER BY date, rev, url, path {early_cut}
             """,
             (id, id),
         )
-        yield from self.cursor.fetchall()
+        for row in self.cursor.fetchall():
+            yield ProvenanceResult(
+                content=row[0], revision=row[1], date=row[2], origin=row[3], path=row[4]
+            )

     def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]):
         """Insert entries in `relation` from `data`
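
The `CASE DL.path ... END` expression in the UNION branch above builds a blob's full path from its directory prefix (`DL.path`) and its path inside that directory (`CL.path`). A hypothetical Python equivalent of that concatenation rule, not part of the patch:

    def join_location_paths(dir_path: bytes, content_path: bytes) -> bytes:
        # Mirror of the SQL CASE: an empty or "." directory prefix means the
        # blob hangs directly off the frontier directory, so its own path wins.
        if dir_path in (b"", b"."):
            return content_path
        return dir_path + b"/" + content_path

    assert join_location_paths(b"", b"README.md") == b"README.md"
    assert join_location_paths(b".", b"README.md") == b"README.md"
    assert join_location_paths(b"src", b"main.py") == b"src/main.py"
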
diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py
--- a/swh/provenance/postgresql/provenancedb_without_path.py
+++ b/swh/provenance/postgresql/provenancedb_without_path.py
@@ -1,4 +1,3 @@
-from datetime import datetime
 from typing import Generator, Optional, Set, Tuple

 import psycopg2
@@ -6,58 +5,72 @@

 from swh.model.model import Sha1Git

+from ..provenance import ProvenanceResult
 from .provenancedb_base import ProvenanceDBBase


 class ProvenanceWithoutPathDB(ProvenanceDBBase):
-    def content_find_first(
-        self, id: Sha1Git
-    ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]:
+    def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
         self.cursor.execute(
             """
             SELECT C.sha1 AS blob,
                    R.sha1 AS rev,
                    R.date AS date,
+                   O.url AS url,
                    '\\x'::bytea as path
             FROM content AS C
-            INNER JOIN content_in_revision AS CR ON (CR.content = C.id)
-            INNER JOIN revision as R ON (CR.revision = R.id)
+            INNER JOIN content_in_revision AS CR ON (CR.content=C.id)
+            INNER JOIN revision as R ON (CR.revision=R.id)
+            LEFT JOIN origin as O ON (R.origin=O.id)
             WHERE C.sha1=%s
-            ORDER BY date, rev ASC LIMIT 1
+            ORDER BY date, rev, url ASC LIMIT 1
             """,
             (id,),
        )
-        return self.cursor.fetchone()
+        row = self.cursor.fetchone()
+        if row:
+            return ProvenanceResult(
+                content=row[0], revision=row[1], date=row[2], origin=row[3], path=row[4]
+            )
+        else:
+            return None

     def content_find_all(
         self, id: Sha1Git, limit: Optional[int] = None
-    ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]:
+    ) -> Generator[ProvenanceResult, None, None]:
         early_cut = f"LIMIT {limit}" if limit is not None else ""
         self.cursor.execute(
             f"""
             (SELECT C.sha1 AS blob,
                     R.sha1 AS rev,
                     R.date AS date,
+                    O.url AS url,
                     '\\x'::bytea as path
              FROM content AS C
-             INNER JOIN content_in_revision AS CR ON (CR.content = C.id)
-             INNER JOIN revision AS R ON (CR.revision = R.id)
+             INNER JOIN content_in_revision AS CR ON (CR.content=C.id)
+             INNER JOIN revision AS R ON (CR.revision=R.id)
+             LEFT JOIN origin as O ON (R.origin=O.id)
             WHERE C.sha1=%s)
            UNION
            (SELECT C.sha1 AS content,
                    R.sha1 AS revision,
                    R.date AS date,
+                   O.url AS url,
                    '\\x'::bytea as path
             FROM content AS C
-            INNER JOIN content_in_directory AS CD ON (C.id = CD.content)
-            INNER JOIN directory_in_revision AS DR ON (CD.directory = DR.directory)
-            INNER JOIN revision AS R ON (DR.revision = R.id)
+            INNER JOIN content_in_directory AS CD ON (C.id=CD.content)
+            INNER JOIN directory_in_revision AS DR ON (CD.directory=DR.directory)
+            INNER JOIN revision AS R ON (DR.revision=R.id)
+            LEFT JOIN origin as O ON (R.origin=O.id)
             WHERE C.sha1=%s)
-            ORDER BY date, rev, path {early_cut}
+            ORDER BY date, rev, url {early_cut}
             """,
             (id, id),
        )
-        yield from self.cursor.fetchall()
+        for row in self.cursor.fetchall():
+            yield ProvenanceResult(
+                content=row[0], revision=row[1], date=row[2], origin=row[3], path=row[4]
+            )

     def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]):
         if data:
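
With both flavors now yielding `ProvenanceResult` objects, callers can stay flavor-agnostic; only the meaning of `path` differs, since the without-path flavor always selects the empty bytea. A hedged usage sketch (`provenance` is assumed to be a backend from this module; the helper name is hypothetical):

    def first_occurrence_report(provenance, blob_sha1: bytes) -> str:
        occur = provenance.content_find_first(blob_sha1)
        if occur is None:
            return "blob not found"
        # occur.path is b"" under the without-path flavor; occur.origin stays
        # None until an origin has been recorded for the revision.
        path = occur.path.decode() if occur.path else "<path not tracked>"
        origin = occur.origin or "<origin unknown>"
        return f"rev {occur.revision.hex()} ({occur.date}) at {path} from {origin}"
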
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -11,100 +11,156 @@
 from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry


-# XXX: this protocol doesn't make much sense now that flavours have been delegated to
-# another class, lower in the callstack.
+class ProvenanceResult:
+    def __init__(
+        self,
+        content: Sha1Git,
+        revision: Sha1Git,
+        date: datetime,
+        origin: Optional[str],
+        path: bytes,
+    ) -> None:
+        self.content = content
+        self.revision = revision
+        self.date = date
+        self.origin = origin
+        self.path = path
+
+
 @runtime_checkable
 class ProvenanceInterface(Protocol):
     raise_on_commit: bool = False

-    def commit(self):
-        """Commit currently ongoing transactions in the backend DB"""
+    def flush(self) -> None:
+        """Flush internal cache to the underlying `storage`."""
         ...

     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
     ) -> None:
+        """Associate `blob` with `directory` in the provenance model. `prefix` is the
+        relative path from `directory` to `blob` (excluding `blob`'s name).
+        """
         ...

     def content_add_to_revision(
         self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
     ) -> None:
+        """Associate `blob` with `revision` in the provenance model. `prefix` is the
+        absolute path from `revision`'s root directory to `blob` (excluding `blob`'s
+        name).
+        """
         ...

-    def content_find_first(
-        self, id: Sha1Git
-    ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]:
+    def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
+        """Retrieve the first occurrence of the blob identified by `id`."""
         ...

     def content_find_all(
         self, id: Sha1Git, limit: Optional[int] = None
-    ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]:
+    ) -> Generator[ProvenanceResult, None, None]:
+        """Retrieve all the occurrences of the blob identified by `id`."""
         ...

     def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
+        """Retrieve the earliest known date of `blob`."""
         ...

     def content_get_early_dates(
         self, blobs: Iterable[FileEntry]
     ) -> Dict[Sha1Git, datetime]:
+        """Retrieve the earliest known date for each blob in `blobs`. If some blob has
+        no associated date, it is not present in the resulting dictionary.
+        """
         ...

     def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
+        """Associate `date` to `blob` as its earliest known date."""
         ...

     def directory_add_to_revision(
         self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
     ) -> None:
+        """Associate `directory` with `revision` in the provenance model. `path` is
+        the absolute path from `revision`'s root directory to `directory` (including
+        `directory`'s name).
+        """
         ...

     def directory_get_date_in_isochrone_frontier(
         self, directory: DirectoryEntry
     ) -> Optional[datetime]:
+        """Retrieve the earliest known date of `directory` as an isochrone frontier in
+        the provenance model.
+        """
         ...

     def directory_get_dates_in_isochrone_frontier(
         self, dirs: Iterable[DirectoryEntry]
     ) -> Dict[Sha1Git, datetime]:
+        """Retrieve the earliest known date for each directory in `dirs` as isochrone
+        frontiers in the provenance model. If some directory has no associated date,
+        it is not present in the resulting dictionary.
+        """
         ...

     def directory_set_date_in_isochrone_frontier(
         self, directory: DirectoryEntry, date: datetime
     ) -> None:
+        """Associate `date` to `directory` as its earliest known date as an isochrone
+        frontier in the provenance model.
+        """
         ...

     def origin_add(self, origin: OriginEntry) -> None:
+        """Add `origin` to the provenance model."""
         ...

     def revision_add(self, revision: RevisionEntry) -> None:
+        """Add `revision` to the provenance model. This implies storing `revision`'s
+        date in the model, thus `revision.date` must be a valid date.
+        """
         ...

     def revision_add_before_revision(
-        self, relative: RevisionEntry, revision: RevisionEntry
+        self, head: RevisionEntry, revision: RevisionEntry
     ) -> None:
+        """Associate `revision` to `head` as an ancestor of the latter."""
         ...

     def revision_add_to_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ) -> None:
+        """Associate `revision` to `origin` as a head revision of the latter (i.e.
+        the target of a snapshot for `origin` in the archive)."""
         ...

-    def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
+    def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]:
+        """Retrieve the date associated to `revision`."""
         ...

     def revision_get_preferred_origin(
         self, revision: RevisionEntry
     ) -> Optional[Sha1Git]:
+        """Retrieve the preferred origin associated to `revision`."""
         ...

     def revision_in_history(self, revision: RevisionEntry) -> bool:
+        """Check if `revision` is known to be an ancestor of some head revision in the
+        provenance model.
+        """
         ...

     def revision_set_preferred_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ) -> None:
+        """Associate `origin` as the preferred origin for `revision`."""
         ...

     def revision_visited(self, revision: RevisionEntry) -> bool:
+        """Check if `revision` is known to be a head revision for some origin in the
+        provenance model.
+        """
         ...

@@ -138,7 +194,7 @@
     revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]]


-def new_cache():
+def new_cache() -> ProvenanceCache:
     return ProvenanceCache(
         content=DatetimeCache(data={}, added=set()),
         directory=DatetimeCache(data={}, added=set()),
@@ -173,39 +229,37 @@
             self.storage = ProvenanceWithoutPathDB(conn)
         self.cache: ProvenanceCache = new_cache()

-    def clear_caches(self):
+    def clear_caches(self) -> None:
         self.cache = new_cache()

-    def commit(self):
+    def flush(self) -> None:
         # TODO: for now we just forward the cache. This should be improved!
         while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit):
             logging.warning(
-                f"Unable to commit cached information {self.write_cache}. Retrying..."
+                f"Unable to commit cached information {self.cache}. Retrying..."
             )
         self.clear_caches()

     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
-    ):
+    ) -> None:
         self.cache["content_in_directory"].add(
             (blob.id, directory.id, normalize(os.path.join(prefix, blob.name)))
         )

     def content_add_to_revision(
         self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
-    ):
+    ) -> None:
         self.cache["content_in_revision"].add(
             (blob.id, revision.id, normalize(os.path.join(prefix, blob.name)))
         )

-    def content_find_first(
-        self, id: Sha1Git
-    ) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]:
+    def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
         return self.storage.content_find_first(id)

     def content_find_all(
         self, id: Sha1Git, limit: Optional[int] = None
-    ) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]:
+    ) -> Generator[ProvenanceResult, None, None]:
         yield from self.storage.content_find_all(id, limit=limit)

     def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
@@ -216,13 +270,13 @@
     ) -> Dict[Sha1Git, datetime]:
         return self.get_dates("content", [blob.id for blob in blobs])

-    def content_set_early_date(self, blob: FileEntry, date: datetime):
+    def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
         self.cache["content"]["data"][blob.id] = date
         self.cache["content"]["added"].add(blob.id)

     def directory_add_to_revision(
         self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
-    ):
+    ) -> None:
         self.cache["directory_in_revision"].add(
             (directory.id, revision.id, normalize(path))
         )
@@ -239,7 +293,7 @@
     def directory_set_date_in_isochrone_frontier(
         self, directory: DirectoryEntry, date: datetime
-    ):
+    ) -> None:
         self.cache["directory"]["data"][directory.id] = date
         self.cache["directory"]["added"].add(directory.id)

@@ -261,21 +315,23 @@
         self.cache["origin"]["data"][origin.id] = origin.url
         self.cache["origin"]["added"].add(origin.id)

-    def revision_add(self, revision: RevisionEntry):
+    def revision_add(self, revision: RevisionEntry) -> None:
         self.cache["revision"]["data"][revision.id] = revision.date
         self.cache["revision"]["added"].add(revision.id)

     def revision_add_before_revision(
-        self, relative: RevisionEntry, revision: RevisionEntry
-    ):
+        self, head: RevisionEntry, revision: RevisionEntry
+    ) -> None:
         self.cache["revision_before_revision"].setdefault(revision.id, set()).add(
-            relative.id
+            head.id
         )

-    def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry):
+    def revision_add_to_origin(
+        self, origin: OriginEntry, revision: RevisionEntry
+    ) -> None:
         self.cache["revision_in_origin"].add((revision.id, origin.id))

-    def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
+    def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]:
         return self.get_dates("revision", [revision.id]).get(revision.id)

     def revision_get_preferred_origin(
@@ -295,7 +351,7 @@
     def revision_set_preferred_origin(
         self, origin: OriginEntry, revision: RevisionEntry
-    ):
+    ) -> None:
         self.cache["revision_origin"]["data"][revision.id] = origin.id
         self.cache["revision_origin"]["added"].add(revision.id)
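
For context, `@runtime_checkable` on `ProvenanceInterface` is what makes `isinstance()` checks against the protocol legal at runtime, which is convenient in tests and factory code. A reduced sketch of what that enables, using a hypothetical `Flushable` protocol rather than the real interface; note that such checks only verify method presence, not signatures:

    from typing import Protocol, runtime_checkable

    @runtime_checkable
    class Flushable(Protocol):
        def flush(self) -> None:
            ...

    class FakeBackend:
        def flush(self) -> None:
            pass

    # Structural check: FakeBackend never names Flushable, yet it conforms.
    assert isinstance(FakeBackend(), Flushable)
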
diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py
--- a/swh/provenance/revision.py
+++ b/swh/provenance/revision.py
@@ -69,7 +69,7 @@
         assert revision.date is not None
         assert revision.root is not None
         # Processed content starting from the revision's root directory.
-        date = provenance.revision_get_early_date(revision)
+        date = provenance.revision_get_date(revision)
         if date is None or revision.date < date:
             logging.debug(
                 f"Processing revisions {revision.id.hex()}"
@@ -93,16 +93,12 @@
             )
         done = time.time()
         if commit:
-            provenance.commit()
+            provenance.flush()
         stop = time.time()
         logging.debug(
             f"Revisions {';'.join([revision.id.hex() for revision in revisions])} "
             f" were processed in {stop - start} secs (commit took {stop - done} secs)!"
         )
-        # logging.critical(
-        #     ";".join([revision.id.hex() for revision in revisions])
-        #     + f",{stop - start},{stop - done}"
-        # )


 def revision_process_content(

diff --git a/swh/provenance/tests/test_conftest.py b/swh/provenance/tests/test_conftest.py
--- a/swh/provenance/tests/test_conftest.py
+++ b/swh/provenance/tests/test_conftest.py
@@ -7,7 +7,7 @@
 def test_provenance_fixture(provenance):
     """Check the 'provenance' fixture produce a working ProvenanceDB object"""
     assert provenance
-    provenance.commit()  # should be a noop
+    provenance.flush()  # should be a noop


 def test_storage(swh_storage_with_objects):

diff --git a/swh/provenance/tests/test_history_graph.py b/swh/provenance/tests/test_history_graph.py
--- a/swh/provenance/tests/test_history_graph.py
+++ b/swh/provenance/tests/test_history_graph.py
@@ -59,4 +59,4 @@
         origin_add_revision(provenance, entry, computed_graph)

     if not batch:
-        provenance.commit()
+        provenance.flush()

diff --git a/swh/provenance/tests/test_provenance_heuristics.py b/swh/provenance/tests/test_provenance_heuristics.py
--- a/swh/provenance/tests/test_provenance_heuristics.py
+++ b/swh/provenance/tests/test_provenance_heuristics.py
@@ -253,21 +253,25 @@
         for rc in synth_rev["R_C"]:
             expected_occurrences.setdefault(rc["dst"].hex(), []).append(
-                (rev_id, rev_ts, maybe_path(rc["path"]))
+                (rev_id, rev_ts, None, maybe_path(rc["path"]))
             )
         for dc in synth_rev["D_C"]:
             assert dc["prefix"] is not None  # to please mypy
             expected_occurrences.setdefault(dc["dst"].hex(), []).append(
-                (rev_id, rev_ts, maybe_path(dc["prefix"] + "/" + dc["path"]))
+                (rev_id, rev_ts, None, maybe_path(dc["prefix"] + "/" + dc["path"]))
             )

     for content_id, results in expected_occurrences.items():
         expected = [(content_id, *result) for result in results]
         db_occurrences = [
-            (blob.hex(), rev.hex(), date.timestamp(), path.decode())
-            for blob, rev, date, path in provenance.content_find_all(
-                hash_to_bytes(content_id)
-            )
+            (
+                occur.content.hex(),
+                occur.revision.hex(),
+                occur.date.timestamp(),
+                occur.origin,
+                occur.path.decode(),
+            )
+            for occur in provenance.content_find_all(hash_to_bytes(content_id))
         ]
         if provenance.storage.with_path:
             # this is not true if the db stores no path, because a same content
@@ -337,11 +341,10 @@
         # nothing to do there, this content cannot be a "first seen file"

     for content_id, (rev_id, ts, paths) in expected_first.items():
-        (r_sha1, r_rev_id, r_ts, r_path) = provenance.content_find_first(
-            hash_to_bytes(content_id)
-        )
-        assert r_sha1.hex() == content_id
-        assert r_rev_id.hex() == rev_id
-        assert r_ts.timestamp() == ts
+        occur = provenance.content_find_first(hash_to_bytes(content_id))
+        assert occur.content.hex() == content_id
+        assert occur.revision.hex() == rev_id
+        assert occur.date.timestamp() == ts
+        assert occur.origin is None
         if provenance.storage.with_path:
-            assert r_path.decode() in paths
+            assert occur.path.decode() in paths
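
Since `flush()` resets the cache after a successful commit, calling it repeatedly should be safe. A hedged pytest-style sketch in the spirit of test_conftest.py above; the `provenance` fixture and the `cache` layout are assumptions based on this patch, not an existing test:

    def test_flush_is_idempotent(provenance):
        provenance.flush()  # no-op on an empty cache
        provenance.flush()  # still a no-op; the first call reset the cache
        assert provenance.cache["content"]["data"] == {}
        assert provenance.cache["content"]["added"] == set()
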