diff --git a/swh/provenance/model.py b/swh/provenance/model.py index 9464271..eb8c444 100644 --- a/swh/provenance/model.py +++ b/swh/provenance/model.py @@ -1,161 +1,158 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from typing import Iterable, Iterator, List, Optional from .archive import ArchiveInterface class OriginEntry: - def __init__( - self, url: str, date: datetime, snapshot: bytes, id: Optional[int] = None - ): + def __init__(self, url: str, date: datetime, snapshot: bytes): self.url = url # TODO: this is probably not needed and will be removed! # self.date = date self.snapshot = snapshot - self.id = id self._revisions: Optional[List[RevisionEntry]] = None def retrieve_revisions(self, archive: ArchiveInterface): if self._revisions is None: self._revisions = [ RevisionEntry(rev) for rev in archive.snapshot_get_heads(self.snapshot) ] @property def revisions(self) -> Iterator["RevisionEntry"]: if self._revisions is None: raise RuntimeError( "Revisions of this node has not yet been retrieved. " "Please call retrieve_revisions() before using this property." ) return (x for x in self._revisions) def __str__(self): return ( f"" ) class RevisionEntry: def __init__( self, id: bytes, date: Optional[datetime] = None, root: Optional[bytes] = None, parents: Optional[Iterable[bytes]] = None, ): self.id = id self.date = date assert self.date is None or self.date.tzinfo is not None self.root = root self._parents_ids = parents self._parents_entries: Optional[List[RevisionEntry]] = None def retrieve_parents(self, archive: ArchiveInterface): if self._parents_entries is None: if self._parents_ids is None: revision = list(archive.revision_get([self.id])) if revision: self._parents_ids = revision[0].parents else: self._parents_ids = [] self._parents_entries = [ RevisionEntry( id=rev.id, root=rev.directory, date=rev.date.to_datetime(), parents=rev.parents, ) for rev in archive.revision_get(self._parents_ids) if rev.date is not None ] @property def parents(self) -> Iterator["RevisionEntry"]: if self._parents_entries is None: raise RuntimeError( "Parents of this node has not yet been retrieved. " "Please call retrieve_parents() before using this property." ) return (x for x in self._parents_entries) def __str__(self): return ( f"" ) class DirectoryEntry: def __init__(self, id: bytes, name: bytes = b""): self.id = id self.name = name self._files: Optional[List[FileEntry]] = None self._dirs: Optional[List[DirectoryEntry]] = None def retrieve_children(self, archive: ArchiveInterface): if self._files is None and self._dirs is None: self._files = [] self._dirs = [] for child in archive.directory_ls(self.id): if child["type"] == "dir": self._dirs.append( DirectoryEntry(child["target"], name=child["name"]) ) elif child["type"] == "file": self._files.append(FileEntry(child["target"], child["name"])) @property def files(self) -> Iterator["FileEntry"]: if self._files is None: raise RuntimeError( "Children of this node has not yet been retrieved. " "Please call retrieve_children() before using this property." ) return (x for x in self._files) @property def dirs(self) -> Iterator["DirectoryEntry"]: if self._dirs is None: raise RuntimeError( "Children of this node has not yet been retrieved. " "Please call retrieve_children() before using this property." ) return (x for x in self._dirs) def __str__(self): return f"" def __eq__(self, other): return isinstance(other, DirectoryEntry) and (self.id, self.name) == ( other.id, other.name, ) def __hash__(self): return hash((self.id, self.name)) class FileEntry: def __init__(self, id: bytes, name: bytes): self.id = id self.name = name def __str__(self): return f"" def __eq__(self, other): return isinstance(other, FileEntry) and (self.id, self.name) == ( other.id, other.name, ) def __hash__(self): return hash((self.id, self.name)) diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py index f36c7db..3f89efc 100644 --- a/swh/provenance/origin.py +++ b/swh/provenance/origin.py @@ -1,106 +1,104 @@ from datetime import datetime, timezone from itertools import islice import logging import time from typing import Iterable, Iterator, List, Optional, Tuple import iso8601 from .archive import ArchiveInterface from .graph import HistoryNode, build_history_graph from .model import OriginEntry, RevisionEntry from .provenance import ProvenanceInterface class CSVOriginIterator: """Iterator over origin visit statuses typically present in the given CSV file. The input is an iterator that produces 3 elements per row: (url, date, snap) where: - url: is the origin url of the visit - date: is the date of the visit - snap: sha1_git of the snapshot pointed by the visit status """ def __init__( self, statuses: Iterable[Tuple[str, datetime, bytes]], limit: Optional[int] = None, ): self.statuses: Iterator[Tuple[str, datetime, bytes]] if limit is not None: self.statuses = islice(statuses, limit) else: self.statuses = iter(statuses) def __iter__(self): for url, date, snap in self.statuses: date = iso8601.parse_date(date, default_timezone=timezone.utc) yield OriginEntry(url, date, snap) def origin_add( provenance: ProvenanceInterface, archive: ArchiveInterface, origins: List[OriginEntry], ): start = time.time() for origin in origins: origin.retrieve_revisions(archive) for revision in origin.revisions: graph = build_history_graph(archive, provenance, revision) origin_add_revision(provenance, origin, graph) done = time.time() provenance.commit() stop = time.time() logging.debug( "Origins " ";".join([origin.url + ":" + origin.snapshot.hex() for origin in origins]) + f" were processed in {stop - start} secs (commit took {stop - done} secs)!" ) def origin_add_revision( provenance: ProvenanceInterface, origin: OriginEntry, graph: HistoryNode, ): - origin.id = provenance.origin_get_id(origin) - # head is treated separately since it should always be added to the given origin head = graph.entry check_preferred_origin(provenance, origin, head) provenance.revision_add_to_origin(origin, head) # head's history should be recursively iterated starting from its parents stack = list(graph.parents) while stack: current = stack.pop() check_preferred_origin(provenance, origin, current.entry) if current.visited: # if current revision was already visited just add it to the current origin # and stop recursion (its history has already been flattened) provenance.revision_add_to_origin(origin, current.entry) else: # if current revision was not visited before create a link between it and # the head, and recursively walk its history provenance.revision_add_before_revision(head, current.entry) for parent in current.parents: stack.append(parent) def check_preferred_origin( provenance: ProvenanceInterface, origin: OriginEntry, revision: RevisionEntry, ): # if the revision has no preferred origin just set the given origin as the # preferred one. TODO: this should be improved in the future! preferred = provenance.revision_get_preferred_origin(revision) if preferred is None: provenance.revision_set_preferred_origin(origin, revision) diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py index 8ab3842..293dd93 100644 --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -1,215 +1,207 @@ from datetime import datetime import itertools import logging from typing import Any, Dict, Generator, List, Optional, Set, Tuple import psycopg2 import psycopg2.extras class ProvenanceDBBase: def __init__(self, conn: psycopg2.extensions.connection): conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) conn.set_session(autocommit=True) self.conn = conn self.cursor = self.conn.cursor() # XXX: not sure this is the best place to do it! self.cursor.execute("SET timezone TO 'UTC'") self._flavor: Optional[str] = None @property def flavor(self) -> str: if self._flavor is None: self.cursor.execute("select swh_get_dbflavor()") self._flavor = self.cursor.fetchone()[0] assert self._flavor is not None return self._flavor @property def with_path(self) -> bool: return self.flavor == "with-path" def commit(self, data: Dict[str, Any], raise_on_commit: bool = False) -> bool: try: # First insert entities for entity in ("content", "directory", "revision"): self.insert_entity( entity, { sha1: data[entity]["data"][sha1] for sha1 in data[entity]["added"] }, ) data[entity]["data"].clear() data[entity]["added"].clear() # Relations should come after ids for entities were resolved for relation in ( "content_in_revision", "content_in_directory", "directory_in_revision", ): self.insert_relation(relation, data[relation]) # Insert relations from the origin-revision layer self.insert_origin_head(data["revision_in_origin"]) self.insert_revision_history(data["revision_before_revision"]) # Update preferred origins self.update_preferred_origin( { sha1: data["revision_preferred_origin"]["data"][sha1] for sha1 in data["revision_preferred_origin"]["added"] } ) data["revision_preferred_origin"]["data"].clear() data["revision_preferred_origin"]["added"].clear() return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if raise_on_commit: raise return False def content_find_first( self, blob: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: ... def content_find_all( self, blob: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: ... def get_dates(self, entity: str, ids: List[bytes]) -> Dict[bytes, datetime]: dates = {} if ids: values = ", ".join(itertools.repeat("%s", len(ids))) self.cursor.execute( f"""SELECT sha1, date FROM {entity} WHERE sha1 IN ({values})""", tuple(ids), ) dates.update(self.cursor.fetchall()) return dates def insert_entity(self, entity: str, data: Dict[bytes, datetime]): if data: psycopg2.extras.execute_values( self.cursor, f""" LOCK TABLE ONLY {entity}; INSERT INTO {entity}(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) """, data.items(), ) # XXX: not sure if Python takes a reference or a copy. # This might be useless! data.clear() - def insert_origin_head(self, data: Dict[bytes, int]): + def insert_origin_head(self, data: Set[Tuple[bytes, str]]): if data: psycopg2.extras.execute_values( self.cursor, # XXX: not clear how conflicts are handled here! """ LOCK TABLE ONLY revision_in_origin; INSERT INTO revision_in_origin - SELECT R.id, V.org + SELECT R.id, O.id FROM (VALUES %s) AS V(rev, org) INNER JOIN revision AS R on (R.sha1=V.rev) + INNER JOIN origin AS O on (O.url=V.org::unix_path) """, data, ) data.clear() def insert_relation(self, relation: str, data: Set[Tuple[bytes, bytes, bytes]]): ... def insert_revision_history(self, data: Dict[bytes, bytes]): if data: values = [[(prev, next) for next in data[prev]] for prev in data] psycopg2.extras.execute_values( self.cursor, # XXX: not clear how conflicts are handled here! """ LOCK TABLE ONLY revision_before_revision; INSERT INTO revision_before_revision SELECT P.id, N.id FROM (VALUES %s) AS V(prev, next) INNER JOIN revision AS P on (P.sha1=V.prev) INNER JOIN revision AS N on (N.sha1=V.next) """, tuple(sum(values, [])), ) data.clear() - def origin_get_id(self, url: str) -> int: - # Insert origin in the DB and return the assigned id - # XXX: not sure this works as expected if url is already in the db! + def revision_get_preferred_origin(self, revision: bytes) -> Optional[str]: self.cursor.execute( """ - LOCK TABLE ONLY origin; - INSERT INTO origin(url) VALUES (%s) - ON CONFLICT DO NOTHING - RETURNING id - """, - (url,), - ) - return self.cursor.fetchone()[0] - - def revision_get_preferred_origin(self, revision: bytes) -> Optional[int]: - self.cursor.execute( - """SELECT COALESCE(origin, 0) FROM revision WHERE sha1=%s""", (revision,) + SELECT O.url + FROM revision AS R + JOIN origin as O + ON R.origin=O.id + WHERE R.sha1=%s""", + (revision,), ) row = self.cursor.fetchone() - # None means revision is not in database; - # 0 means revision has no preferred origin - return row[0] if row is not None and row[0] != 0 else None + return str(row[0], encoding="utf-8") if row is not None else None def revision_in_history(self, revision: bytes) -> bool: self.cursor.execute( """ SELECT 1 FROM revision_before_revision JOIN revision ON revision.id=revision_before_revision.prev WHERE revision.sha1=%s """, (revision,), ) return self.cursor.fetchone() is not None def revision_visited(self, revision: bytes) -> bool: self.cursor.execute( """ SELECT 1 FROM revision_in_origin JOIN revision ON revision.id=revision_in_origin.revision WHERE revision.sha1=%s """, (revision,), ) return self.cursor.fetchone() is not None - def update_preferred_origin(self, data: Dict[bytes, int]): + def update_preferred_origin(self, data: Dict[bytes, str]): if data: # XXX: this is assuming the revision already exists in the db! It should # be improved by allowing null dates in the revision table. psycopg2.extras.execute_values( self.cursor, """ UPDATE revision - SET origin=V.org - FROM (VALUES %s) AS V(rev, org) - WHERE sha1=V.rev + SET origin=O.id + FROM (VALUES %s) AS V(rev, org) + INNER JOIN origin AS O on (O.url=V.org::unix_path) + WHERE sha1=V.rev """, data.items(), ) data.clear() diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index 76f4ef7..a72bb16 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,296 +1,285 @@ from datetime import datetime import logging import os from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple import psycopg2 from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry # XXX: this protocol doesn't make much sense now that flavours have been delegated to # another class, lower in the callstack. @runtime_checkable class ProvenanceInterface(Protocol): raise_on_commit: bool = False def commit(self): """Commit currently ongoing transactions in the backend DB""" ... def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_find_first( self, blob: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: ... def content_find_all( self, blob: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: ... def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: ... def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[bytes, datetime]: ... def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: ... def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: ... def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: ... def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[bytes, datetime]: ... def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: ... - def origin_get_id(self, origin: OriginEntry) -> int: - ... - def revision_add(self, revision: RevisionEntry) -> None: ... def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ) -> None: ... def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: ... - def revision_get_preferred_origin(self, revision: RevisionEntry) -> Optional[int]: + def revision_get_preferred_origin(self, revision: RevisionEntry) -> Optional[str]: ... def revision_in_history(self, revision: RevisionEntry) -> bool: ... def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_visited(self, revision: RevisionEntry) -> bool: ... class DatetimeCache(TypedDict): data: Dict[bytes, datetime] added: Set[bytes] class OriginCache(TypedDict): - data: Dict[bytes, int] # TODO: we should switch to use Url instead + data: Dict[bytes, str] added: Set[bytes] class ProvenanceCache(TypedDict): content: DatetimeCache directory: DatetimeCache revision: DatetimeCache # below are insertion caches only content_in_revision: Set[Tuple[bytes, bytes, bytes]] content_in_directory: Set[Tuple[bytes, bytes, bytes]] directory_in_revision: Set[Tuple[bytes, bytes, bytes]] # these two are for the origin layer revision_before_revision: Dict[bytes, Set[bytes]] - revision_in_origin: Set[Tuple[bytes, int]] + revision_in_origin: Set[Tuple[bytes, str]] revision_preferred_origin: OriginCache def new_cache(): return ProvenanceCache( content=DatetimeCache(data={}, added=set()), directory=DatetimeCache(data={}, added=set()), revision=DatetimeCache(data={}, added=set()), content_in_revision=set(), content_in_directory=set(), directory_in_revision=set(), revision_before_revision={}, revision_in_origin=set(), revision_preferred_origin=OriginCache(data={}, added=set()), ) # TODO: maybe move this to a separate file class ProvenanceBackend: raise_on_commit: bool = False def __init__(self, conn: psycopg2.extensions.connection): from .postgresql.provenancedb_base import ProvenanceDBBase # TODO: this class should not know what the actual used DB is. self.storage: ProvenanceDBBase flavor = ProvenanceDBBase(conn).flavor if flavor == "with-path": from .postgresql.provenancedb_with_path import ProvenanceWithPathDB self.storage = ProvenanceWithPathDB(conn) else: from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB self.storage = ProvenanceWithoutPathDB(conn) self.cache: ProvenanceCache = new_cache() def clear_caches(self): self.cache = new_cache() def commit(self): # TODO: for now we just forward the cache. This should be improved! while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit): logging.warning( f"Unable to commit cached information {self.write_cache}. Retrying..." ) self.clear_caches() def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ): self.cache["content_in_directory"].add( (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ): self.cache["content_in_revision"].add( (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) ) def content_find_first( self, blob: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: return self.storage.content_find_first(blob) def content_find_all( self, blob: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: yield from self.storage.content_find_all(blob, limit=limit) def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: return self.get_dates("content", [blob.id]).get(blob.id, None) def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[bytes, datetime]: return self.get_dates("content", [blob.id for blob in blobs]) def content_set_early_date(self, blob: FileEntry, date: datetime): self.cache["content"]["data"][blob.id] = date self.cache["content"]["added"].add(blob.id) def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ): self.cache["directory_in_revision"].add( (directory.id, revision.id, normalize(path)) ) def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: return self.get_dates("directory", [directory.id]).get(directory.id, None) def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[bytes, datetime]: return self.get_dates("directory", [directory.id for directory in dirs]) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ): self.cache["directory"]["data"][directory.id] = date self.cache["directory"]["added"].add(directory.id) def get_dates( self, entity: Literal["content", "revision", "directory"], ids: List[bytes] ) -> Dict[bytes, datetime]: cache = self.cache[entity] missing_ids = set(id for id in ids if id not in cache) if missing_ids: cache["data"].update(self.storage.get_dates(entity, list(missing_ids))) return {sha1: cache["data"][sha1] for sha1 in ids if sha1 in cache["data"]} - def origin_get_id(self, origin: OriginEntry) -> int: - if origin.id is None: - return self.storage.origin_get_id(origin.url) - else: - return origin.id - def revision_add(self, revision: RevisionEntry): # Add current revision to the compact DB assert revision.date is not None self.cache["revision"]["data"][revision.id] = revision.date self.cache["revision"]["added"].add(revision.id) def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ): self.cache["revision_before_revision"].setdefault(revision.id, set()).add( relative.id ) def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): - assert origin.id is not None - self.cache["revision_in_origin"].add((revision.id, origin.id)) + self.cache["revision_in_origin"].add((revision.id, origin.url)) def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: return self.get_dates("revision", [revision.id]).get(revision.id, None) - def revision_get_preferred_origin(self, revision: RevisionEntry) -> Optional[int]: + def revision_get_preferred_origin(self, revision: RevisionEntry) -> Optional[str]: if revision.id not in self.cache["revision_preferred_origin"]["data"]: - origin = self.storage.revision_get_preferred_origin(revision.id) - if origin is not None: - self.cache["revision_preferred_origin"]["data"][revision.id] = origin + url = self.storage.revision_get_preferred_origin(revision.id) + if url is not None: + self.cache["revision_preferred_origin"]["data"][revision.id] = url return self.cache["revision_preferred_origin"]["data"].get(revision.id) def revision_in_history(self, revision: RevisionEntry) -> bool: return revision.id in self.cache[ "revision_before_revision" ] or self.storage.revision_in_history(revision.id) def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ): - assert origin.id is not None - self.cache["revision_preferred_origin"]["data"][revision.id] = origin.id + self.cache["revision_preferred_origin"]["data"][revision.id] = origin.url self.cache["revision_preferred_origin"]["added"].add(revision.id) def revision_visited(self, revision: RevisionEntry) -> bool: return revision.id in dict( self.cache["revision_in_origin"] ) or self.storage.revision_visited(revision.id) def normalize(path: bytes) -> bytes: return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path