diff --git a/swh/provenance/model.py b/swh/provenance/model.py --- a/swh/provenance/model.py +++ b/swh/provenance/model.py @@ -6,14 +6,17 @@ from datetime import datetime from typing import Iterable, Iterator, List, Optional +from swh.model.hashutil import hash_to_bytes +from swh.model.identifiers import origin_identifier +from swh.model.model import Sha1Git + from .archive import ArchiveInterface class OriginEntry: - def __init__(self, url: str, date: datetime, snapshot: bytes): + def __init__(self, url: str, snapshot: Sha1Git): self.url = url - # TODO: this is probably not needed and will be removed! - # self.date = date + self.id: Sha1Git = hash_to_bytes(origin_identifier({"url": self.url})) self.snapshot = snapshot self._revisions: Optional[List[RevisionEntry]] = None @@ -33,10 +36,7 @@ return (x for x in self._revisions) def __str__(self): - return ( - f"" - ) + return f"" class RevisionEntry: diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py --- a/swh/provenance/origin.py +++ b/swh/provenance/origin.py @@ -1,10 +1,9 @@ -from datetime import datetime, timezone from itertools import islice import logging import time from typing import Iterable, Iterator, List, Optional, Tuple -import iso8601 +from swh.model.model import Sha1Git from .archive import ArchiveInterface from .graph import HistoryNode, build_history_graph @@ -16,31 +15,28 @@ """Iterator over origin visit statuses typically present in the given CSV file. 
-    The input is an iterator that produces 3 elements per row:
+    The input is an iterator that produces 2 elements per row:
 
-      (url, date, snap)
+      (url, snap)
 
     where:
     - url: is the origin url of the visit
-    - date: is the date of the visit
     - snap: sha1_git of the snapshot pointed by the visit status
     """
 
     def __init__(
         self,
-        statuses: Iterable[Tuple[str, datetime, bytes]],
+        statuses: Iterable[Tuple[str, Sha1Git]],
         limit: Optional[int] = None,
     ):
-        self.statuses: Iterator[Tuple[str, datetime, bytes]]
+        self.statuses: Iterator[Tuple[str, Sha1Git]]
         if limit is not None:
             self.statuses = islice(statuses, limit)
         else:
             self.statuses = iter(statuses)
 
     def __iter__(self):
-        for url, date, snap in self.statuses:
-            date = iso8601.parse_date(date, default_timezone=timezone.utc)
-            yield OriginEntry(url, date, snap)
+        return (OriginEntry(url, snapshot) for url, snapshot in self.statuses)
 
 
 def origin_add(
@@ -50,6 +46,7 @@
 ):
     start = time.time()
     for origin in origins:
+        provenance.origin_add(origin)
         origin.retrieve_revisions(archive)
         for revision in origin.revisions:
             graph = build_history_graph(archive, provenance, revision)
@@ -59,7 +56,8 @@
     stop = time.time()
     logging.debug(
-        "Origins "
-        ";".join([origin.url + ":" + origin.snapshot.hex() for origin in origins])
-        + f" were processed in {stop - start} secs (commit took {stop - done} secs)!"
+        "Origins %s were processed in %s secs (commit took %s secs)!",
+        ";".join(f"{origin.id.hex()}:{origin.snapshot.hex()}" for origin in origins),
+        stop - start,
+        stop - done,
) diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -6,6 +6,8 @@ import psycopg2 import psycopg2.extras +from swh.model.model import Sha1Git + class ProvenanceDBBase: def __init__(self, conn: psycopg2.extensions.connection): @@ -51,6 +53,16 @@ ): self.insert_relation(relation, data[relation]) + # Insert origins + self.insert_origin( + { + sha1: data["origin"]["data"][sha1] + for sha1 in data["origin"]["added"] + }, + ) + data["origin"]["data"].clear() + data["origin"]["added"].clear() + # Insert relations from the origin-revision layer self.insert_origin_head(data["revision_in_origin"]) self.insert_revision_history(data["revision_before_revision"]) @@ -58,12 +70,12 @@ # Update preferred origins self.update_preferred_origin( { - sha1: data["revision_preferred_origin"]["data"][sha1] - for sha1 in data["revision_preferred_origin"]["added"] + sha1: data["revision_origin"]["data"][sha1] + for sha1 in data["revision_origin"]["added"] } ) - data["revision_preferred_origin"]["data"].clear() - data["revision_preferred_origin"]["added"].clear() + data["revision_origin"]["data"].clear() + data["revision_origin"]["added"].clear() return True @@ -112,7 +124,22 @@ # This might be useless! data.clear() - def insert_origin_head(self, data: Set[Tuple[bytes, str]]): + def insert_origin(self, data: Dict[Sha1Git, str]): + if data: + psycopg2.extras.execute_values( + self.cursor, + """ + LOCK TABLE ONLY origin; + INSERT INTO origin(sha1, url) VALUES %s + ON CONFLICT DO NOTHING + """, + data.items(), + ) + # XXX: not sure if Python takes a reference or a copy. + # This might be useless! 
+ data.clear() + + def insert_origin_head(self, data: Set[Tuple[Sha1Git, Sha1Git]]): if data: psycopg2.extras.execute_values( self.cursor, @@ -123,7 +150,7 @@ SELECT R.id, O.id FROM (VALUES %s) AS V(rev, org) INNER JOIN revision AS R on (R.sha1=V.rev) - INNER JOIN origin AS O on (O.url=V.org::unix_path) + INNER JOIN origin AS O on (O.sha1=V.org) """, data, ) @@ -150,10 +177,10 @@ ) data.clear() - def revision_get_preferred_origin(self, revision: bytes) -> Optional[str]: + def revision_get_preferred_origin(self, revision: Sha1Git) -> Optional[Sha1Git]: self.cursor.execute( """ - SELECT O.url + SELECT O.sha1 FROM revision AS R JOIN origin as O ON R.origin=O.id @@ -161,7 +188,7 @@ (revision,), ) row = self.cursor.fetchone() - return str(row[0], encoding="utf-8") if row is not None else None + return row[0] if row is not None else None def revision_in_history(self, revision: bytes) -> bool: self.cursor.execute( @@ -189,18 +216,18 @@ ) return self.cursor.fetchone() is not None - def update_preferred_origin(self, data: Dict[bytes, str]): + def update_preferred_origin(self, data: Dict[Sha1Git, Sha1Git]): if data: # XXX: this is assuming the revision already exists in the db! It should # be improved by allowing null dates in the revision table. psycopg2.extras.execute_values( self.cursor, """ - UPDATE revision + UPDATE revision R SET origin=O.id FROM (VALUES %s) AS V(rev, org) - INNER JOIN origin AS O on (O.url=V.org::unix_path) - WHERE sha1=V.rev + INNER JOIN origin AS O on (O.sha1=V.org) + WHERE R.sha1=V.rev """, data.items(), ) diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -6,6 +6,8 @@ import psycopg2 from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable +from swh.model.model import Sha1Git + from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry @@ -70,6 +72,9 @@ ) -> None: ... 
+    def origin_add(self, origin: OriginEntry) -> None:
+        ...
+
     def revision_add(self, revision: RevisionEntry) -> None:
         ...
 
@@ -86,7 +91,9 @@
     def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
         ...
 
-    def revision_get_preferred_origin(self, revision: RevisionEntry) -> Optional[str]:
+    def revision_get_preferred_origin(
+        self, revision: RevisionEntry
+    ) -> Optional[Sha1Git]:
         ...
 
     def revision_in_history(self, revision: RevisionEntry) -> bool:
@@ -107,8 +114,13 @@
 
 
 class OriginCache(TypedDict):
-    data: Dict[bytes, str]
-    added: Set[bytes]
+    data: Dict[Sha1Git, str]
+    added: Set[Sha1Git]
+
+
+class RevisionCache(TypedDict):
+    data: Dict[Sha1Git, Sha1Git]
+    added: Set[Sha1Git]
 
 
 class ProvenanceCache(TypedDict):
@@ -120,9 +132,10 @@
     content_in_directory: Set[Tuple[bytes, bytes, bytes]]
     directory_in_revision: Set[Tuple[bytes, bytes, bytes]]
     # these two are for the origin layer
+    origin: OriginCache
+    revision_origin: RevisionCache
     revision_before_revision: Dict[bytes, Set[bytes]]
-    revision_in_origin: Set[Tuple[bytes, str]]
-    revision_preferred_origin: OriginCache
+    revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]]
 
 
 def new_cache():
@@ -133,9 +146,10 @@
         content_in_revision=set(),
         content_in_directory=set(),
         directory_in_revision=set(),
+        origin=OriginCache(data={}, added=set()),
+        revision_origin=RevisionCache(data={}, added=set()),
         revision_before_revision={},
         revision_in_origin=set(),
-        revision_preferred_origin=OriginCache(data={}, added=set()),
     )
 
 
@@ -238,6 +252,10 @@
             cache["data"].update(self.storage.get_dates(entity, list(missing_ids)))
         return {sha1: cache["data"][sha1] for sha1 in ids if sha1 in cache["data"]}
 
+    def origin_add(self, origin: OriginEntry) -> None:
+        self.cache["origin"]["data"][origin.id] = origin.url
+        self.cache["origin"]["added"].add(origin.id)
+
     def revision_add(self, revision: RevisionEntry):
         # Add current revision to the compact DB
         assert revision.date is not None
@@ -252,17 +270,25 @@
         )
 
     def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry):
-        self.cache["revision_in_origin"].add((revision.id, origin.url))
+        self.cache["revision_in_origin"].add((revision.id, origin.id))
 
     def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
         return self.get_dates("revision", [revision.id]).get(revision.id, None)
 
-    def revision_get_preferred_origin(self, revision: RevisionEntry) -> Optional[str]:
-        if revision.id not in self.cache["revision_preferred_origin"]["data"]:
-            url = self.storage.revision_get_preferred_origin(revision.id)
-            if url is not None:
-                self.cache["revision_preferred_origin"]["data"][revision.id] = url
-        return self.cache["revision_preferred_origin"]["data"].get(revision.id)
+    def revision_get_preferred_origin(
+        self, revision: RevisionEntry
+    ) -> Optional[Sha1Git]:
+        """Return the sha1 of the preferred origin recorded for ``revision``.
+
+        The in-memory cache is consulted first; the storage is only queried on a
+        cache miss, and a non-null answer is kept for subsequent calls.
+        """
+        cache = self.cache["revision_origin"]["data"]
+        if revision.id not in cache:
+            origin = self.storage.revision_get_preferred_origin(revision.id)
+            if origin is not None:
+                cache[revision.id] = origin
+        return cache.get(revision.id)
 
     def revision_in_history(self, revision: RevisionEntry) -> bool:
         return revision.id in self.cache[
@@ -272,8 +298,8 @@
     def revision_set_preferred_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ):
-        self.cache["revision_preferred_origin"]["data"][revision.id] = origin.url
-        self.cache["revision_preferred_origin"]["added"].add(revision.id)
+        self.cache["revision_origin"]["data"][revision.id] = origin.id
+        self.cache["revision_origin"]["added"].add(revision.id)
 
     def revision_visited(self, revision: RevisionEntry) -> bool:
         return revision.id in dict(
diff --git a/swh/provenance/sql/30-schema.sql b/swh/provenance/sql/30-schema.sql
--- a/swh/provenance/sql/30-schema.sql
+++ b/swh/provenance/sql/30-schema.sql
@@ -67,9 +67,11 @@
 create table origin
 (
   id bigserial primary key, -- internal identifier of the origin
+  sha1 sha1_git unique not null, -- intrinsic identifier of the origin
   url unix_path unique not null -- url of the
origin ); comment on column origin.id is 'Origin internal identifier'; +comment on column origin.sha1 is 'Origin intrinsic identifier'; comment on column origin.url is 'URL of the origin'; -- relation tables diff --git a/swh/provenance/tests/test_origin_iterator.py b/swh/provenance/tests/test_origin_iterator.py --- a/swh/provenance/tests/test_origin_iterator.py +++ b/swh/provenance/tests/test_origin_iterator.py @@ -6,9 +6,9 @@ from swh.model.tests.swh_model_data import TEST_OBJECTS from swh.provenance.origin import CSVOriginIterator from swh.storage.algos.origin import ( - iter_origins, - iter_origin_visits, iter_origin_visit_statuses, + iter_origin_visits, + iter_origins, ) @@ -21,9 +21,7 @@ swh_storage_with_objects, origin.url, visit.visit ): if status.snapshot is not None: - origins_csv.append( - (status.origin, status.date.isoformat(), status.snapshot) - ) + origins_csv.append((status.origin, status.snapshot)) origins = list(CSVOriginIterator(origins_csv)) assert origins assert len(origins) == len( diff --git a/swh/provenance/tests/test_provenance_db.py b/swh/provenance/tests/test_provenance_db.py --- a/swh/provenance/tests/test_provenance_db.py +++ b/swh/provenance/tests/test_provenance_db.py @@ -26,9 +26,7 @@ archive = ArchiveStorage(swh_storage_with_objects) for status in TEST_OBJECTS["origin_visit_status"]: if status.snapshot is not None: - entry = OriginEntry( - url=status.origin, date=status.date, snapshot=status.snapshot - ) + entry = OriginEntry(url=status.origin, snapshot=status.snapshot) origin_add(provenance, archive, [entry]) # TODO: check some facts here