diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py
index 87e1a14..8dce3f7 100644
--- a/swh/provenance/postgresql/provenancedb_base.py
+++ b/swh/provenance/postgresql/provenancedb_base.py
@@ -1,318 +1,320 @@
 from datetime import datetime
 import itertools
 import logging
 from typing import Any, Dict, List, Optional

 import psycopg2
 import psycopg2.extras

 from ..model import DirectoryEntry, FileEntry
 from ..origin import OriginEntry
 from ..revision import RevisionEntry


 class ProvenanceDBBase:
     def __init__(self, conn: psycopg2.extensions.connection):
         # TODO: consider adding a mutex for thread safety
         conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
         conn.set_session(autocommit=True)
         self.conn = conn
         self.cursor = self.conn.cursor()
+        # XXX: not sure this is the best place to do it!
+        self.cursor.execute("SET timezone TO 'UTC'")
         self.insert_cache: Dict[str, Any] = {}
         self.remove_cache: Dict[str, Any] = {}
         self.select_cache: Dict[str, Any] = {}
         self.clear_caches()

     def clear_caches(self):
         self.insert_cache = {
             "content": dict(),
             "content_early_in_rev": set(),
             "content_in_dir": set(),
             "directory": dict(),
             "directory_in_rev": set(),
             "revision": dict(),
             "revision_before_rev": list(),
             "revision_in_org": list(),
         }
         self.remove_cache = {"directory": dict()}
         self.select_cache = {"content": dict(), "directory": dict(), "revision": dict()}

     def commit(self):
         result = False
         try:
             self.insert_all()
             self.clear_caches()
             result = True
         except Exception as error:
             # Unexpected error occurred, rollback all changes and log message
             logging.error(f"Unexpected error: {error}")
         return result

     def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
         # First check if the date is being modified by the current transaction.
         date = self.insert_cache["content"].get(blob.id, None)
         if date is None:
             # If not, check whether it's been queried before
             date = self.select_cache["content"].get(blob.id, None)
             if date is None:
                 # Otherwise, query the database and cache the value
                 self.cursor.execute(
                     """SELECT date FROM content WHERE sha1=%s""", (blob.id,)
                 )
                 row = self.cursor.fetchone()
                 date = row[0] if row is not None else None
                 self.select_cache["content"][blob.id] = date
         return date

     def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]:
         dates = {}
         pending = []
         for blob in blobs:
             # First check if the date is being modified by the current transaction.
             date = self.insert_cache["content"].get(blob.id, None)
             if date is not None:
                 dates[blob.id] = date
             else:
                 # If not, check whether it's been queried before
                 date = self.select_cache["content"].get(blob.id, None)
                 if date is not None:
                     dates[blob.id] = date
                 else:
                     pending.append(blob.id)
         if pending:
             # Otherwise, query the database and cache the values
             values = ", ".join(itertools.repeat("%s", len(pending)))
             self.cursor.execute(
                 f"""SELECT sha1, date FROM content WHERE sha1 IN ({values})""",
                 tuple(pending),
             )
             for row in self.cursor.fetchall():
                 dates[row[0]] = row[1]
                 self.select_cache["content"][row[0]] = row[1]
         return dates

     def content_set_early_date(self, blob: FileEntry, date: datetime):
         self.insert_cache["content"][blob.id] = date
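
     # The content_get_early_date(s) helpers above share one lookup discipline:
     # pending writes in insert_cache shadow everything, previously fetched
     # values in select_cache avoid a round trip, and only on a double miss is
     # the database queried (with the result memoized in select_cache). The
     # directory and revision getters below follow the same pattern.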
date = self.insert_cache["directory"].get(directory.id, None) if date is None and directory.id not in self.remove_cache["directory"]: # If not, check whether it's been query before date = self.select_cache["directory"].get(directory.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM directory WHERE sha1=%s""", (directory.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["directory"][directory.id] = date return date def directory_get_dates_in_isochrone_frontier( self, dirs: List[DirectoryEntry] ) -> Dict[bytes, datetime]: dates = {} pending = [] for directory in dirs: # First check if the date is being modified by current transection. date = self.insert_cache["directory"].get(directory.id, None) if date is not None: dates[directory.id] = date elif directory.id not in self.remove_cache["directory"]: # If not, check whether it's been query before date = self.select_cache["directory"].get(directory.id, None) if date is not None: dates[directory.id] = date else: pending.append(directory.id) if pending: # Otherwise, query the database and cache the values values = ", ".join(itertools.repeat("%s", len(pending))) self.cursor.execute( f"""SELECT sha1, date FROM directory WHERE sha1 IN ({values})""", tuple(pending), ) for row in self.cursor.fetchall(): dates[row[0]] = row[1] self.select_cache["directory"][row[0]] = row[1] return dates def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry): self.remove_cache["directory"][directory.id] = None self.insert_cache["directory"].pop(directory.id, None) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ): self.insert_cache["directory"][directory.id] = date self.remove_cache["directory"].pop(directory.id, None) def insert_all(self): # Performe insertions with cached information if self.insert_cache["content"]: psycopg2.extras.execute_values( self.cursor, """ LOCK TABLE ONLY content; INSERT INTO content(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,content.date) """, self.insert_cache["content"].items(), ) self.insert_cache["content"].clear() if self.insert_cache["directory"]: psycopg2.extras.execute_values( self.cursor, """ LOCK TABLE ONLY directory; INSERT INTO directory(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,directory.date) """, self.insert_cache["directory"].items(), ) self.insert_cache["directory"].clear() if self.insert_cache["revision"]: psycopg2.extras.execute_values( self.cursor, """ LOCK TABLE ONLY revision; INSERT INTO revision(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,revision.date) """, self.insert_cache["revision"].items(), ) self.insert_cache["revision"].clear() # Relations should come after ids for elements were resolved if self.insert_cache["content_early_in_rev"]: self.insert_location("content", "revision", "content_early_in_rev") if self.insert_cache["content_in_dir"]: self.insert_location("content", "directory", "content_in_dir") if self.insert_cache["directory_in_rev"]: self.insert_location("directory", "revision", "directory_in_rev") # if self.insert_cache["revision_before_rev"]: # psycopg2.extras.execute_values( # self.cursor, # """ # LOCK TABLE ONLY revision_before_rev; # INSERT INTO revision_before_rev VALUES %s # ON CONFLICT DO NOTHING # """, # self.insert_cache["revision_before_rev"], # ) # self.insert_cache["revision_before_rev"].clear() # 
if self.insert_cache["revision_in_org"]: # psycopg2.extras.execute_values( # self.cursor, # """ # LOCK TABLE ONLY revision_in_org; # INSERT INTO revision_in_org VALUES %s # ON CONFLICT DO NOTHING # """, # self.insert_cache["revision_in_org"], # ) # self.insert_cache["revision_in_org"].clear() def origin_get_id(self, origin: OriginEntry) -> int: if origin.id is None: # Insert origin in the DB and return the assigned id self.cursor.execute( """ LOCK TABLE ONLY origin; INSERT INTO origin(url) VALUES (%s) ON CONFLICT DO NOTHING RETURNING id """, (origin.url,), ) return self.cursor.fetchone()[0] else: return origin.id def revision_add(self, revision: RevisionEntry): # Add current revision to the compact DB self.insert_cache["revision"][revision.id] = revision.date def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ): self.insert_cache["revision_before_rev"].append((revision.id, relative.id)) def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): self.insert_cache["revision_in_org"].append((revision.id, origin.id)) def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: date = self.insert_cache["revision"].get(revision.id, None) if date is None: # If not, check whether it's been query before date = self.select_cache["revision"].get(revision.id, None) if date is None: # Otherwise, query the database and cache the value self.cursor.execute( """SELECT date FROM revision WHERE sha1=%s""", (revision.id,) ) row = self.cursor.fetchone() date = row[0] if row is not None else None self.select_cache["revision"][revision.id] = date return date def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: # TODO: adapt this method to consider cached values self.cursor.execute( """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision.id,) ) row = self.cursor.fetchone() # None means revision is not in database; # 0 means revision has no preferred origin return row[0] if row is not None and row[0] != 0 else None def revision_in_history(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values self.cursor.execute( """ SELECT 1 FROM revision_before_rev JOIN revision ON revision.id=revision_before_rev.prev WHERE revision.sha1=%s """, (revision.id,), ) return self.cursor.fetchone() is not None def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ): # TODO: adapt this method to consider cached values self.cursor.execute( """UPDATE revision SET org=%s WHERE sha1=%s""", (origin.id, revision.id) ) def revision_visited(self, revision: RevisionEntry) -> bool: # TODO: adapt this method to consider cached values self.cursor.execute( """ SELECT 1 FROM revision_in_org JOIN revision ON revision.id=revision_in_org.rev WHERE revision.sha1=%s """, (revision.id,), ) return self.cursor.fetchone() is not None diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index 36e5357..2df1b53 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,442 +1,445 @@ from datetime import datetime, timezone import logging import os from typing import Dict, Generator, List, Optional, Tuple from typing_extensions import Protocol, runtime_checkable from swh.model.hashutil import hash_to_hex from .archive import ArchiveInterface from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry UTCMIN = datetime.min.replace(tzinfo=timezone.utc) @runtime_checkable class ProvenanceInterface(Protocol): def commit(self): 
"""Commit currently ongoing transactions in the backend DB""" ... def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: ... def content_find_first( self, blobid: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: ... def content_find_all( self, blobid: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: ... def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: ... def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]: ... def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: ... def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: ... def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: ... def directory_get_dates_in_isochrone_frontier( self, dirs: List[DirectoryEntry] ) -> Dict[bytes, datetime]: ... def directory_invalidate_in_isochrone_frontier( self, directory: DirectoryEntry ) -> None: ... def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: ... def origin_get_id(self, origin: OriginEntry) -> int: ... def revision_add(self, revision: RevisionEntry) -> None: ... def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ) -> None: ... def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: ... def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: ... def revision_in_history(self, revision: RevisionEntry) -> bool: ... def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: ... def revision_visited(self, revision: RevisionEntry) -> bool: ... def flatten_directory( archive: ArchiveInterface, provenance: ProvenanceInterface, directory: DirectoryEntry, ) -> None: stack = [(directory, b"")] while stack: current, prefix = stack.pop() for child in current.ls(archive): if isinstance(child, FileEntry): # Add content to the directory with the computed prefix. provenance.content_add_to_directory(directory, child, prefix) elif isinstance(child, DirectoryEntry): # Recursively walk the child directory. stack.append((child, os.path.join(prefix, child.name))) def origin_add( archive: ArchiveInterface, provenance: ProvenanceInterface, origin: OriginEntry ) -> None: # TODO: refactor to iterate over origin visit statuses and commit only once # per status. origin.id = provenance.origin_get_id(origin) for revision in origin.revisions: origin_add_revision(archive, provenance, origin, revision) # Commit after each revision provenance.commit() # TODO: verify this! def origin_add_revision( archive: ArchiveInterface, provenance: ProvenanceInterface, origin: OriginEntry, revision: RevisionEntry, ) -> None: stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)] while stack: relative, current = stack.pop() # Check if current revision has no preferred origin and update if necessary. 
 def origin_add_revision(
     archive: ArchiveInterface,
     provenance: ProvenanceInterface,
     origin: OriginEntry,
     revision: RevisionEntry,
 ) -> None:
     stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)]
     while stack:
         relative, current = stack.pop()
         # Check if current revision has no preferred origin and update if necessary.
         preferred = provenance.revision_get_preferred_origin(current)
         if preferred is None:
             provenance.revision_set_preferred_origin(origin, current)
         ########################################################################
         if relative is None:
             # This revision is pointed directly by the origin.
             visited = provenance.revision_visited(current)
             provenance.revision_add_to_origin(origin, current)
             if not visited:
                 stack.append((current, current))
         else:
             # This revision is a parent of another one in the history of the
             # relative revision.
             for parent in current.parents(archive):
                 visited = provenance.revision_visited(parent)
                 if not visited:
                     # The parent revision has never been seen before pointing
                     # directly to an origin.
                     known = provenance.revision_in_history(parent)
                     if known:
                         # The parent revision is already known in some other
                         # revision's history. We should point it directly to
                         # the origin and (eventually) walk its history.
                         stack.append((None, parent))
                     else:
                         # The parent revision was never seen before. We should
                         # walk its history and associate it with the same
                         # relative revision.
                         provenance.revision_add_before_revision(relative, parent)
                         stack.append((relative, parent))
                 else:
                     # The parent revision already points to an origin, so its
                     # history was properly processed before. We just need to
                     # make sure it points to the current origin as well.
                     provenance.revision_add_to_origin(origin, parent)


 def revision_add(
     provenance: ProvenanceInterface,
     archive: ArchiveInterface,
     revision: RevisionEntry,
     lower: bool = True,
     mindepth: int = 1,
 ) -> None:
     assert revision.date is not None
     assert revision.root is not None
-    logging.debug(f"Processing revision {hash_to_hex(revision.id)}...")
     # Process content starting from the revision's root directory.
     date = provenance.revision_get_early_date(revision)
     if date is None or revision.date < date:
+        logging.debug(
+            f"Processing revision {hash_to_hex(revision.id)}"
+            f" (known date {date} / revision date {revision.date})..."
+        )
         provenance.revision_add(revision)
         # TODO: add file size filtering
         revision_process_content(
             archive,
             provenance,
             revision,
             DirectoryEntry(revision.root, b""),
             lower=lower,
             mindepth=mindepth,
         )
-    # TODO: improve this! Maybe using a max attempt counter?
-    # Ideally Provenance class should guarantee that a commit never fails.
-    logging.debug(f"Attempt to commit revision {hash_to_hex(revision.id)}...")
-    while not provenance.commit():
-        logging.warning(
-            f"Could not commit revision {hash_to_hex(revision.id)}. Retrying..."
-        )
-    logging.debug(f"Revision {hash_to_hex(revision.id)} successfully committed!")
+        # TODO: improve this! Maybe using a max attempt counter?
+        # Ideally Provenance class should guarantee that a commit never fails.
+        logging.debug(f"Attempt to commit revision {hash_to_hex(revision.id)}...")
+        while not provenance.commit():
+            logging.warning(
+                f"Could not commit revision {hash_to_hex(revision.id)}. Retrying..."
+            )
+        logging.debug(f"Revision {hash_to_hex(revision.id)} successfully committed!")
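
 # Since the commit retry loop now sits inside the `if` branch, revision_add
 # is a complete no-op (not even a commit attempt) for revisions whose stored
 # date is already earlier than or equal to revision.date.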


 class IsochroneNode:
     def __init__(
         self, entry: DirectoryEntry, dates: Dict[bytes, datetime] = {}, depth: int = 0
     ):
         self.entry = entry
         self.depth = depth
         self.date = dates.get(self.entry.id, None)
         self.known = self.date is not None
         self.children: List[IsochroneNode] = []
         self.maxdate: Optional[datetime] = None

     def add_child(
         self, child: DirectoryEntry, dates: Dict[bytes, datetime] = {}
     ) -> "IsochroneNode":
         assert isinstance(self.entry, DirectoryEntry) and self.date is None
         node = IsochroneNode(child, dates=dates, depth=self.depth + 1)
         self.children.append(node)
         return node
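
 # Field semantics for IsochroneNode: `date` is the date recorded for this
 # entry in the isochrone frontier (if any), `known` records whether such a
 # date exists, and `maxdate` is filled in afterwards by build_isochrone_graph
 # as the maximum known or assigned date over the node's whole subtree.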


 def build_isochrone_graph(
     archive: ArchiveInterface,
     provenance: ProvenanceInterface,
     revision: RevisionEntry,
     directory: DirectoryEntry,
 ) -> IsochroneNode:
     assert revision.date is not None
     assert revision.root == directory.id
     # Build the nodes structure
     root = IsochroneNode(directory)
     root.date = provenance.directory_get_date_in_isochrone_frontier(directory)
     root.known = root.date is not None
     stack = [root]
     logging.debug(
         f"Recursively creating graph for revision {hash_to_hex(revision.id)}..."
     )
     while stack:
         current = stack.pop()
         assert isinstance(current.entry, DirectoryEntry)
         if current.date is None or current.date > revision.date:
             # If the current directory has an associated date in the isochrone
             # frontier that is greater than the current revision's one, it
             # should be ignored as the revision is being processed out of order.
             if current.date is not None and current.date > revision.date:
                 provenance.directory_invalidate_in_isochrone_frontier(current.entry)
                 current.date = None
                 current.known = False
             # Pre-query all known dates for content/directories in the current
             # directory for the provenance object to have them cached and
             # (potentially) improve performance.
             ddates = provenance.directory_get_dates_in_isochrone_frontier(
                 [
                     child
                     for child in current.entry.ls(archive)
                     if isinstance(child, DirectoryEntry)
                 ]
             )
             fdates = provenance.content_get_early_dates(
                 [
                     child
                     for child in current.entry.ls(archive)
                     if isinstance(child, FileEntry)
                 ]
             )
             for child in current.entry.ls(archive):
                 # Recursively analyse directory nodes.
                 if isinstance(child, DirectoryEntry):
                     node = current.add_child(child, dates=ddates)
                     stack.append(node)
                 else:
                     # WARNING: there is a type checking issue here!
                     current.add_child(child, dates=fdates)
     logging.debug(
         f"Isochrone graph for revision {hash_to_hex(revision.id)} successfully created!"
     )
     # Precalculate max known date for each node in the graph (only directory
     # nodes are pushed to the stack).
     stack = [root]
     logging.debug(f"Computing maxdates for revision {hash_to_hex(revision.id)}...")
     while stack:
         current = stack.pop()
         # The current directory node is known if it already has an assigned
         # date (i.e. it was already seen as an isochrone frontier).
         if not current.known:
             if any(map(lambda child: child.maxdate is None, current.children)):
                 # Current node needs to be analysed again after its children.
                 stack.append(current)
                 for child in current.children:
                     if isinstance(child.entry, FileEntry):
                         # A file node is known if it already has an assigned
                         # date (i.e. it was processed before).
                         if child.known:
                             assert child.date is not None
                             # Just use its known date.
                             child.maxdate = child.date
                         else:
                             # Use current revision date.
                             child.maxdate = revision.date
                     else:
                         stack.append(child)
             else:
                 maxdates = [
                     child.maxdate
                     for child in current.children
                     if child.maxdate is not None  # mostly to please mypy
                 ]
                 current.maxdate = max(maxdates) if maxdates else UTCMIN
                 # If all content is already known, update current directory info.
                 current.known = all(map(lambda child: child.known, current.children))
         else:
             # Directory node in the frontier, just use its known date.
             current.maxdate = current.date
     logging.debug(
         f"Maxdates for revision {hash_to_hex(revision.id)} successfully computed!"
     )
     return root


 def revision_process_content(
     archive: ArchiveInterface,
     provenance: ProvenanceInterface,
     revision: RevisionEntry,
     root: DirectoryEntry,
     lower: bool = True,
     mindepth: int = 1,
 ):
     assert revision.date is not None
     logging.debug(
         f"Building isochrone graph for revision {hash_to_hex(revision.id)}..."
     )
     stack = [(build_isochrone_graph(archive, provenance, revision, root), root.name)]
     logging.debug(
         f"Isochrone graph for revision {hash_to_hex(revision.id)} successfully built!"
     )
     while stack:
         current, path = stack.pop()
         assert isinstance(current.entry, DirectoryEntry)
         if current.date is not None:
             assert current.date <= revision.date
             # Current directory is an outer isochrone frontier for a previously
             # processed revision. It should be reused as is.
             provenance.directory_add_to_revision(revision, current.entry, path)
         else:
             # Current directory is not an outer isochrone frontier for any
             # previous revision. It might be eligible for this one.
             if is_new_frontier(current, revision, lower=lower, mindepth=mindepth):
                 assert current.maxdate is not None
                 # The outer frontier should be moved to the current position in
                 # the isochrone graph. This is the first time this directory is
                 # found in the isochrone frontier.
                 provenance.directory_set_date_in_isochrone_frontier(
                     current.entry, current.maxdate
                 )
                 provenance.directory_add_to_revision(revision, current.entry, path)
                 flatten_directory(archive, provenance, current.entry)
             else:
                 # No point moving the frontier here. Either there are no files
                 # or they are being seen for the first time here. Add all blobs
                 # to the current revision, updating dates if necessary, and
                 # recursively analyse subdirectories as candidates for the
                 # outer frontier.
                 for child in current.children:
                     if isinstance(child.entry, FileEntry):
                         blob = child.entry
                         if child.date is None or revision.date < child.date:
                             provenance.content_set_early_date(blob, revision.date)
                         provenance.content_add_to_revision(revision, blob, path)
                     else:
                         stack.append((child, os.path.join(path, child.entry.name)))
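
 # The helper below decides frontier eligibility: with the default mindepth=1
 # the root directory (depth 0) can never become a frontier itself, and with
 # lower=True a directory holding no direct blobs is rejected as well, which
 # pushes the frontier further down the tree.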
 def is_new_frontier(
     node: IsochroneNode, revision: RevisionEntry, lower: bool = True, mindepth: int = 1
 ) -> bool:
     assert node.maxdate is not None and revision.date is not None
     # The only real condition for a directory to be a frontier is that its
     # content is already known and its maxdate is less than (or equal to) the
     # current revision's date. Checking mindepth is meant to skip root
     # directories (or any arbitrary depth) to improve the result. The lower
     # option tries to maximize the reuse rate of previously defined frontiers
     # by keeping them low in the directory tree.
     return (
         node.known  # all content in node was already seen before
         and node.maxdate <= revision.date  # all content is earlier than revision
         and node.depth >= mindepth  # current node is deeper than the min allowed depth
         and (has_blobs(node) if lower else True)  # there is at least one blob in it
     )


 def has_blobs(node: IsochroneNode) -> bool:
     # We may want to look for files in different ways to decide whether to
     # define a frontier or not:
     # 1. Only files in current node:
     return any(map(lambda child: isinstance(child.entry, FileEntry), node.children))
     # 2. Files anywhere in the isochrone graph
     # stack = [node]
     # while stack:
     #     current = stack.pop()
     #     if any(
     #         map(lambda child: isinstance(child.entry, FileEntry), current.children)):
     #         return True
     #     else:
     #         # All children are directory entries.
     #         stack.extend(current.children)
     # return False
     # 3. Files in the intermediate directories between current node and any
     #    previously defined frontier:
     # TODO: complete this case!
     # return any(
     #     map(lambda child: isinstance(child.entry, FileEntry), node.children)
     # ) or all(
     #     map(
     #         lambda child: (
     #             not (isinstance(child.entry, DirectoryEntry) and child.date is None)
     #         )
     #         or has_blobs(child),
     #         node.children,
     #     )
     # )
diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py
index 802c303..7500acb 100644
--- a/swh/provenance/tests/conftest.py
+++ b/swh/provenance/tests/conftest.py
@@ -1,301 +1,301 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import glob
 from os import path
 import re
 from typing import Iterable, Iterator, List

 import pytest
 from typing_extensions import TypedDict

 from swh.core.api.serializers import msgpack_loads
 from swh.core.db import BaseDb
 from swh.core.db.pytest_plugin import postgresql_fact
 from swh.core.utils import numfile_sortkey as sortkey
 from swh.model.model import Content, Directory, DirectoryEntry, Revision
 from swh.model.tests.swh_model_data import TEST_OBJECTS
 import swh.provenance
 from swh.provenance.postgresql.archive import ArchivePostgreSQL
 from swh.provenance.storage.archive import ArchiveStorage

 SQL_DIR = path.join(path.dirname(swh.provenance.__file__), "sql")
 SQL_FILES = [
     sqlfile
     for sqlfile in sorted(glob.glob(path.join(SQL_DIR, "*.sql")), key=sortkey)
     if "-without-path-" not in sqlfile
 ]

 provenance_db = postgresql_fact(
-    "postgresql_proc", db_name="provenance", dump_files=SQL_FILES
+    "postgresql_proc", dbname="provenance", dump_files=SQL_FILES
 )


 @pytest.fixture
 def provenance(provenance_db):
     """return a working and initialized provenance db"""
     from swh.provenance.postgresql.provenancedb_with_path import (
         ProvenanceWithPathDB as ProvenanceDB,
     )

     BaseDb.adapt_conn(provenance_db)
     return ProvenanceDB(provenance_db)
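
 # Illustrative usage of the fixture above in a test (assuming an otherwise
 # empty provenance database):
 #
 #     def test_commit_on_empty_caches(provenance):
 #         # insert_all() has nothing to flush, so commit() succeeds
 #         assert provenance.commit()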
""" for obj_type in ( "content", "skipped_content", "directory", "revision", "release", "snapshot", "origin", "origin_visit", "origin_visit_status", ): getattr(swh_storage, f"{obj_type}_add")(TEST_OBJECTS[obj_type]) return swh_storage @pytest.fixture def archive_direct(swh_storage_with_objects): return ArchivePostgreSQL(swh_storage_with_objects.get_db().conn) @pytest.fixture def archive_api(swh_storage_with_objects): return ArchiveStorage(swh_storage_with_objects) @pytest.fixture def archive(swh_storage_with_objects): """Return a ArchivePostgreSQL based StorageInterface object""" # this is a workaround to prevent tests from hanging because of an unclosed # transaction. # TODO: refactor the ArchivePostgreSQL to properly deal with # transactions and get rif of this fixture archive = ArchivePostgreSQL(conn=swh_storage_with_objects.get_db().conn) yield archive archive.conn.rollback() def get_datafile(fname): return path.join(path.dirname(__file__), "data", fname) @pytest.fixture def CMDBTS_data(): # imported git tree is https://github.com/grouss/CMDBTS rev 4c5551b496 # ([xxx] is the timestamp): # o - [1609757158] first commit 35ccb8dd1b53d2d8a5c1375eb513ef2beaa79ae5 # | `- README.md * 43f3c871310a8e524004e91f033e7fb3b0bc8475 # o - [1610644094] Reset Empty repository 840b91df68e9549c156942ddd5002111efa15604 # | # o - [1610644094] R0000 9e36e095b79e36a3da104ce272989b39cd68aefd # | `- Red/Blue/Green/a * 6dc7e44ead5c0e300fe94448c3e046dfe33ad4d1 # o - [1610644097] R0001 bfbfcc72ae7fc35d6941386c36280512e6b38440 # | |- Red/Blue/Green/a 6dc7e44ead5c0e300fe94448c3e046dfe33ad4d1 # | `- Red/Blue/Green/b * 9f6e04be05297905f1275d3f4e0bb0583458b2e8 # o - [1610644099] R0002 0a31c9d509783abfd08f9fdfcd3acae20f17dfd0 # | |- Red/Blue/Green/a 6dc7e44ead5c0e300fe94448c3e046dfe33ad4d1 # | |- Red/Blue/Green/b 9f6e04be05297905f1275d3f4e0bb0583458b2e8 # | `- Red/Blue/c * a28fa70e725ebda781e772795ca080cd737b823c # o - [1610644101] R0003 ca6ec564c69efd2e5c70fb05486fd3f794765a04 # | |- Red/Green/a 6dc7e44ead5c0e300fe94448c3e046dfe33ad4d1 # | |- Red/Green/b 9f6e04be05297905f1275d3f4e0bb0583458b2e8 # | `- Red/a 6dc7e44ead5c0e300fe94448c3e046dfe33ad4d1 # o - [1610644103] R0004 fc6e10b7d41b1d56a94091134e3683ce91e80d91 # | |- Red/Blue/Green/a 6dc7e44ead5c0e300fe94448c3e046dfe33ad4d1 # | |- Red/Blue/Green/b 9f6e04be05297905f1275d3f4e0bb0583458b2e8 # | `- Red/Blue/c a28fa70e725ebda781e772795ca080cd737b823c # o - [1610644105] R0005 1d1fcf1816a8a2a77f9b1f342ba11d0fe9fd7f17 # | `- Purple/d * c0229d305adf3edf49f031269a70e3e87665fe88 # o - [1610644107] R0006 9a71f967ae1a125be9b6569cc4eccec0aecabb7c # | `- Purple/Brown/Purple/d c0229d305adf3edf49f031269a70e3e87665fe88 # o - [1610644109] R0007 4fde4ea4494a630030a4bda99d03961d9add00c7 # | |- Dark/Brown/Purple/d c0229d305adf3edf49f031269a70e3e87665fe88 # | `- Dark/d c0229d305adf3edf49f031269a70e3e87665fe88 # o - [1610644111] R0008 ba00e89d47dc820bb32c783af7123ffc6e58b56d # | |- Dark/Brown/Purple/d c0229d305adf3edf49f031269a70e3e87665fe88 # | |- Dark/Brown/Purple/e c0229d305adf3edf49f031269a70e3e87665fe88 # | `- Dark/a 6dc7e44ead5c0e300fe94448c3e046dfe33ad4d1 # o - [1610644113] R0009 55d4dc9471de6144f935daf3c38878155ca274d5 # | |- Dark/Brown/Purple/f * 94ba40161084e8b80943accd9d24e1f9dd47189b # | |- Dark/Brown/Purple/g 94ba40161084e8b80943accd9d24e1f9dd47189b # | `- Dark/f 94ba40161084e8b80943accd9d24e1f9dd47189b # o - [1610644116] R0010 a8939755d0be76cfea136e9e5ebce9bc51c49fef # | |- Dark/Brown/Purple/f 94ba40161084e8b80943accd9d24e1f9dd47189b # | |- Dark/Brown/Purple/g 
     # |   `- Dark/h  *  5e8f9ceaee9dafae2e3210e254fdf170295f8b5b
     # o - [1610644118] R0011  ca1774a07b6e02c1caa7ae678924efa9259ee7c6
     # |   |- Paris/Brown/Purple/f  94ba40161084e8b80943accd9d24e1f9dd47189b
     # |   |- Paris/Brown/Purple/g  94ba40161084e8b80943accd9d24e1f9dd47189b
     # |   `- Paris/i  *  bbd54b961764094b13f10cef733e3725d0a834c3
     # o - [1610644120] R0012  611fe71d75b6ea151b06e3845c09777acc783d82
     # |   |- Paris/Berlin/Purple/f  94ba40161084e8b80943accd9d24e1f9dd47189b
     # |   |- Paris/Berlin/Purple/g  94ba40161084e8b80943accd9d24e1f9dd47189b
     # |   `- Paris/j  *  7ce4fe9a22f589fa1656a752ea371b0ebc2106b1
     # o - [1610644122] R0013  4c5551b4969eb2160824494d40b8e1f6187fc01e
     #     |- Paris/Berlin/Purple/f  94ba40161084e8b80943accd9d24e1f9dd47189b
     #     |- Paris/Berlin/Purple/g  94ba40161084e8b80943accd9d24e1f9dd47189b
     #     |- Paris/Munich/Purple/f  94ba40161084e8b80943accd9d24e1f9dd47189b
     #     |- Paris/Munich/Purple/g  94ba40161084e8b80943accd9d24e1f9dd47189b
     #     |- Paris/Purple/f  94ba40161084e8b80943accd9d24e1f9dd47189b
     #     |- Paris/Purple/g  94ba40161084e8b80943accd9d24e1f9dd47189b
     #     `- Paris/k  *  cb79b39935c9392fa5193d9f84a6c35dc9c22c75
     data = {"revision": [], "directory": [], "content": []}
     with open(get_datafile("CMDBTS.msgpack"), "rb") as fobj:
         for etype, value in msgpack_loads(fobj.read()):
             data[etype].append(value)
     return data


 def filter_dict(d, keys):
     return {k: v for (k, v) in d.items() if k in keys}


 @pytest.fixture
 def storage_and_CMDBTS(swh_storage, CMDBTS_data):
     swh_storage.content_add_metadata(
         Content.from_dict(content) for content in CMDBTS_data["content"]
     )
     swh_storage.directory_add(
         [
             Directory(
                 entries=tuple(
                     [
                         DirectoryEntry.from_dict(
                             filter_dict(entry, ("name", "type", "target", "perms"))
                         )
                         for entry in dir["entries"]
                     ]
                 )
             )
             for dir in CMDBTS_data["directory"]
         ]
     )
     swh_storage.revision_add(
         Revision.from_dict(revision) for revision in CMDBTS_data["revision"]
     )
     return swh_storage, CMDBTS_data


 class SynthRelation(TypedDict):
     path: str
     src: bytes
     dst: bytes
     rel_ts: float


 class SynthRevision(TypedDict):
     sha1: bytes
     date: float
     msg: str
     R_C: List[SynthRelation]
     R_D: List[SynthRelation]
     D_C: List[SynthRelation]


 def synthetic_result(filename: str) -> Iterator[SynthRevision]:
     """Generates dict representations of synthetic revisions found in the
     synthetic file (from the data/ directory) given as argument of the
     generator.

     Each generated SynthRevision (typed dict) has the following elements:

         "sha1": (bytes) sha1 of the revision,
         "date": (float) timestamp of the revision,
         "msg": (str) commit message of the revision,
         "R_C": (list) new R---C relations added by this revision
         "R_D": (list) new R-D relations added by this revision
         "D_C": (list) new D-C relations added by this revision

     Each relation above is a SynthRelation typed dict with:

         "path": (str) location
         "src": (bytes) sha1 of the source of the relation
         "dst": (bytes) sha1 of the destination of the relation
         "rel_ts": (float) timestamp of the target of the relation
                   (related to the timestamp of the revision)

     """
     with open(get_datafile(filename), "r") as fobj:
         yield from _parse_synthetic_file(fobj)
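
 # A 'synthetic' file is line-oriented; an illustrative revision block (the
 # field layout is inferred from the regex in _parse_synthetic_file below)
 # could look like:
 #
 #   R0000 |       |                    | R 9e36e095b79e36a3da104ce272989b39cd68aefd | 1610644094
 #         | R---C | + Red/Blue/Green/a | C 6dc7e44ead5c0e300fe94448c3e046dfe33ad4d1 | 0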
""" regs = [ "(?PR[0-9]{4})?", "(?P[^| ]*)", "([+] )?(?P[^| +]*?)[/]?", "(?P[RDC]) (?P[0-9a-z]{40})", "(?P-?[0-9]+(.[0-9]+)?)", ] regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *$") current_rev: List[dict] = [] for m in (regex.match(line) for line in fobj): if m: d = m.groupdict() if d["revname"]: if current_rev: yield _mk_synth_rev(current_rev) current_rev.clear() current_rev.append(d) if current_rev: yield _mk_synth_rev(current_rev) def _mk_synth_rev(synth_rev) -> SynthRevision: assert synth_rev[0]["type"] == "R" rev = SynthRevision( sha1=bytes.fromhex(synth_rev[0]["sha1"]), date=float(synth_rev[0]["ts"]), msg=synth_rev[0]["revname"], R_C=[], R_D=[], D_C=[], ) for row in synth_rev[1:]: if row["reltype"] == "R---C": assert row["type"] == "C" rev["R_C"].append( SynthRelation( path=row["path"], src=rev["sha1"], dst=bytes.fromhex(row["sha1"]), rel_ts=float(row["ts"]), ) ) elif row["reltype"] == "R-D": assert row["type"] == "D" rev["R_D"].append( SynthRelation( path=row["path"], src=rev["sha1"], dst=bytes.fromhex(row["sha1"]), rel_ts=float(row["ts"]), ) ) elif row["reltype"] == "D-C": assert row["type"] == "C" rev["D_C"].append( SynthRelation( path=row["path"], src=rev["R_D"][-1]["dst"], dst=bytes.fromhex(row["sha1"]), rel_ts=float(row["ts"]), ) ) return rev