diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
index 8705725..7368f72 100644
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -1,28 +1,38 @@
-from .archive import ArchiveInterface
-from .postgresql.archive import ArchivePostgreSQL
+from typing import TYPE_CHECKING
+
 from .postgresql.db_utils import connect
-from .storage.archive import ArchiveStorage
-from .provenance import ProvenanceInterface
+
+if TYPE_CHECKING:
+    from swh.provenance.archive import ArchiveInterface
+    from swh.provenance.provenance import ProvenanceInterface
 
 
-def get_archive(cls: str, **kwargs) -> ArchiveInterface:
+def get_archive(cls: str, **kwargs) -> "ArchiveInterface":
     if cls == "api":
+        from swh.provenance.storage.archive import ArchiveStorage
+
         return ArchiveStorage(**kwargs["storage"])
     elif cls == "direct":
+        from swh.provenance.postgresql.archive import ArchivePostgreSQL
+
         conn = connect(kwargs["db"])
         return ArchivePostgreSQL(conn)
     else:
         raise NotImplementedError
 
 
-def get_provenance(cls: str, **kwargs) -> ProvenanceInterface:
+def get_provenance(cls: str, **kwargs) -> "ProvenanceInterface":
     if cls == "local":
         conn = connect(kwargs["db"])
         if kwargs.get("with_path", True):
-            from .postgresql.provenance_with_path import ProvenanceWithPathDB
+            from swh.provenance.postgresql.provenancedb_with_path import (
+                ProvenanceWithPathDB,
+            )
 
             return ProvenanceWithPathDB(conn)
         else:
-            from .postgresql.provenance_without_path import ProvenanceWithoutPathDB
+            from swh.provenance.postgresql.provenancedb_without_path import (
+                ProvenanceWithoutPathDB,
+            )
 
             return ProvenanceWithoutPathDB(conn)
     else:
         raise NotImplementedError
diff --git a/swh/provenance/archive.py b/swh/provenance/archive.py
index 86fa12b..42ca2f0 100644
--- a/swh/provenance/archive.py
+++ b/swh/provenance/archive.py
@@ -1,27 +1,27 @@
 from typing import Any, Dict, List
 
+from typing_extensions import Protocol, runtime_checkable
+
 
-class ArchiveInterface:
-    def __init__(self, **kwargs):
-        raise NotImplementedError
-
+@runtime_checkable
+class ArchiveInterface(Protocol):
     def directory_ls(self, id: bytes) -> List[Dict[str, Any]]:
-        raise NotImplementedError
+        ...
 
     def iter_origins(self):
-        raise NotImplementedError
+        ...
 
     def iter_origin_visits(self, origin: str):
-        raise NotImplementedError
+        ...
 
     def iter_origin_visit_statuses(self, origin: str, visit: int):
-        raise NotImplementedError
+        ...
 
     def release_get(self, ids: List[bytes]):
-        raise NotImplementedError
+        ...
 
     def revision_get(self, ids: List[bytes]):
-        raise NotImplementedError
+        ...
 
     def snapshot_get_all_branches(self, snapshot: bytes):
-        raise NotImplementedError
+        ...
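Note: with ArchiveInterface declared as a runtime_checkable Protocol, implementations conform by shape rather than by inheritance. A minimal sketch of what that buys (the MinimalArchive class below is hypothetical, for illustration only; runtime isinstance checks on a Protocol verify method presence, not signatures):

    from typing import Any, Dict, List

    from swh.provenance.archive import ArchiveInterface

    class MinimalArchive:
        """Hypothetical stub that satisfies ArchiveInterface without inheriting it."""

        def directory_ls(self, id: bytes) -> List[Dict[str, Any]]:
            return []

        def iter_origins(self):
            return iter(())

        def iter_origin_visits(self, origin: str):
            return iter(())

        def iter_origin_visit_statuses(self, origin: str, visit: int):
            return iter(())

        def release_get(self, ids: List[bytes]):
            return []

        def revision_get(self, ids: List[bytes]):
            return []

        def snapshot_get_all_branches(self, snapshot: bytes):
            return None

    # Passes because every protocol method is present (structural check only).
    assert isinstance(MinimalArchive(), ArchiveInterface)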
diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py
index 994f8ed..94555ea 100644
--- a/swh/provenance/postgresql/provenancedb_base.py
+++ b/swh/provenance/postgresql/provenancedb_base.py
@@ -1,299 +1,298 @@
+from datetime import datetime
 import itertools
 import logging
+from typing import Any, Dict, List, Optional
+
 import psycopg2
 import psycopg2.extras
 
 from ..model import DirectoryEntry, FileEntry
 from ..origin import OriginEntry
-from ..provenance import ProvenanceInterface
 from ..revision import RevisionEntry
-from datetime import datetime
-from typing import Any, Dict, List, Optional
 
 
-class ProvenanceDBBase(ProvenanceInterface):
+class ProvenanceDBBase:
     def __init__(self, conn: psycopg2.extensions.connection):
         # TODO: consider adding a mutex for thread safety
         conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
         self.conn = conn
         self.cursor = self.conn.cursor()
         self.insert_cache: Dict[str, Any] = {}
         self.remove_cache: Dict[str, Any] = {}
         self.select_cache: Dict[str, Any] = {}
         self.clear_caches()
 
     def clear_caches(self):
         self.insert_cache = {
             "content": dict(),
             "content_early_in_rev": set(),
             "content_in_dir": set(),
             "directory": dict(),
             "directory_in_rev": set(),
             "revision": dict(),
             "revision_before_rev": list(),
             "revision_in_org": list(),
         }
         self.remove_cache = {"directory": dict()}
         self.select_cache = {"content": dict(), "directory": dict(), "revision": dict()}
 
     def commit(self):
         result = False
         try:
             self.insert_all()
             self.clear_caches()
             result = True
         except Exception as error:
             # An unexpected error occurred: roll back all changes and log a message.
             logging.error(f"Unexpected error: {error}")
         return result
 
     def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
         # First check if the date is being modified by the current transaction.
         date = self.insert_cache["content"].get(blob.id, None)
         if date is None:
             # If not, check whether it has been queried before.
             date = self.select_cache["content"].get(blob.id, None)
             if date is None:
                 # Otherwise, query the database and cache the value.
                 self.cursor.execute(
                     """SELECT date FROM content WHERE sha1=%s""", (blob.id,)
                 )
                 row = self.cursor.fetchone()
                 date = row[0] if row is not None else None
                 self.select_cache["content"][blob.id] = date
         return date
 
     def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]:
         dates = {}
         pending = []
         for blob in blobs:
             # First check if the date is being modified by the current transaction.
             date = self.insert_cache["content"].get(blob.id, None)
             if date is not None:
                 dates[blob.id] = date
             else:
                 # If not, check whether it has been queried before.
                 date = self.select_cache["content"].get(blob.id, None)
                 if date is not None:
                     dates[blob.id] = date
                 else:
                     pending.append(blob.id)
         if pending:
             # Otherwise, query the database and cache the values.
             values = ", ".join(itertools.repeat("%s", len(pending)))
             self.cursor.execute(
                 f"""SELECT sha1, date FROM content WHERE sha1 IN ({values})""",
                 tuple(pending),
             )
             for row in self.cursor.fetchall():
                 dates[row[0]] = row[1]
                 self.select_cache["content"][row[0]] = row[1]
         return dates
 
     def content_set_early_date(self, blob: FileEntry, date: datetime):
         self.insert_cache["content"][blob.id] = date
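Note: the getters above share one lookup discipline — pending writes in insert_cache shadow previously fetched rows in select_cache, which in turn shadow the database, and even a miss (None) is memoized. A generic sketch of that pattern (names are illustrative, not part of the module):

    from typing import Callable, Dict, Optional, TypeVar

    K = TypeVar("K")
    V = TypeVar("V")

    def tiered_get(
        key: K,
        insert_cache: Dict[K, V],
        select_cache: Dict[K, V],
        fetch: Callable[[K], Optional[V]],
    ) -> Optional[V]:
        # 1. A value staged for insertion wins: it is the most recent state.
        if key in insert_cache:
            return insert_cache[key]
        # 2. Fall back to rows already read in this session.
        if key in select_cache:
            return select_cache[key]
        # 3. Hit the database once, then memoize even a miss (None).
        value = fetch(key)
        select_cache[key] = value
        return value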
     def directory_get_date_in_isochrone_frontier(
         self, directory: DirectoryEntry
     ) -> Optional[datetime]:
         # First check if the date is being modified by the current transaction.
         date = self.insert_cache["directory"].get(directory.id, None)
         if date is None and directory.id not in self.remove_cache["directory"]:
             # If not, check whether it has been queried before.
             date = self.select_cache["directory"].get(directory.id, None)
             if date is None:
                 # Otherwise, query the database and cache the value.
                 self.cursor.execute(
                     """SELECT date FROM directory WHERE sha1=%s""", (directory.id,)
                 )
                 row = self.cursor.fetchone()
                 date = row[0] if row is not None else None
                 self.select_cache["directory"][directory.id] = date
         return date
 
     def directory_get_dates_in_isochrone_frontier(
         self, dirs: List[DirectoryEntry]
     ) -> Dict[bytes, datetime]:
         dates = {}
         pending = []
         for directory in dirs:
             # First check if the date is being modified by the current transaction.
             date = self.insert_cache["directory"].get(directory.id, None)
             if date is not None:
                 dates[directory.id] = date
             elif directory.id not in self.remove_cache["directory"]:
                 # If not, check whether it has been queried before.
                 date = self.select_cache["directory"].get(directory.id, None)
                 if date is not None:
                     dates[directory.id] = date
                 else:
                     pending.append(directory.id)
         if pending:
             # Otherwise, query the database and cache the values.
             values = ", ".join(itertools.repeat("%s", len(pending)))
             self.cursor.execute(
                 f"""SELECT sha1, date FROM directory WHERE sha1 IN ({values})""",
                 tuple(pending),
             )
             for row in self.cursor.fetchall():
                 dates[row[0]] = row[1]
                 self.select_cache["directory"][row[0]] = row[1]
         return dates
 
     def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry):
         self.remove_cache["directory"][directory.id] = None
         self.insert_cache["directory"].pop(directory.id, None)
 
     def directory_set_date_in_isochrone_frontier(
         self, directory: DirectoryEntry, date: datetime
     ):
         self.insert_cache["directory"][directory.id] = date
         self.remove_cache["directory"].pop(directory.id, None)
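Note: insert_all below flushes each per-table cache in a single execute_values round-trip, and the ON CONFLICT clause keeps the earliest date via LEAST. A standalone sketch of that upsert pattern (the connection parameters are placeholders, not taken from this diff):

    from datetime import datetime

    import psycopg2
    import psycopg2.extras

    # Hypothetical connection, for illustration only.
    conn = psycopg2.connect(dbname="provenance")
    cursor = conn.cursor()

    # id -> earliest date observed so far in this batch
    rows = {b"sha1-a": datetime(2020, 1, 1), b"sha1-b": datetime(2019, 6, 1)}

    psycopg2.extras.execute_values(
        cursor,
        """LOCK TABLE ONLY content;
           INSERT INTO content(sha1, date) VALUES %s
           ON CONFLICT (sha1) DO
           UPDATE SET date=LEAST(EXCLUDED.date, content.date)""",
        rows.items(),
    )
    # LEAST(...) means a re-insert can only move a date backwards, never forwards.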
     def insert_all(self):
         # Perform insertions with cached information.
         if self.insert_cache["content"]:
             psycopg2.extras.execute_values(
                 self.cursor,
                 """LOCK TABLE ONLY content;
                    INSERT INTO content(sha1, date) VALUES %s
                    ON CONFLICT (sha1) DO
                    UPDATE SET date=LEAST(EXCLUDED.date,content.date)""",
                 self.insert_cache["content"].items(),
             )
             self.insert_cache["content"].clear()
 
         if self.insert_cache["directory"]:
             psycopg2.extras.execute_values(
                 self.cursor,
                 """LOCK TABLE ONLY directory;
                    INSERT INTO directory(sha1, date) VALUES %s
                    ON CONFLICT (sha1) DO
                    UPDATE SET date=LEAST(EXCLUDED.date,directory.date)""",
                 self.insert_cache["directory"].items(),
             )
             self.insert_cache["directory"].clear()
 
         if self.insert_cache["revision"]:
             psycopg2.extras.execute_values(
                 self.cursor,
                 """LOCK TABLE ONLY revision;
                    INSERT INTO revision(sha1, date) VALUES %s
                    ON CONFLICT (sha1) DO
                    UPDATE SET date=LEAST(EXCLUDED.date,revision.date)""",
                 self.insert_cache["revision"].items(),
             )
             self.insert_cache["revision"].clear()
 
         # Relations should be inserted after the ids of the elements they
         # refer to have been resolved.
         if self.insert_cache["content_early_in_rev"]:
             self.insert_location("content", "revision", "content_early_in_rev")
 
         if self.insert_cache["content_in_dir"]:
             self.insert_location("content", "directory", "content_in_dir")
 
         if self.insert_cache["directory_in_rev"]:
             self.insert_location("directory", "revision", "directory_in_rev")
 
         # if self.insert_cache["revision_before_rev"]:
         #     psycopg2.extras.execute_values(
         #         self.cursor,
         #         """INSERT INTO revision_before_rev VALUES %s
         #            ON CONFLICT DO NOTHING""",
         #         self.insert_cache["revision_before_rev"],
         #     )
         #     self.insert_cache["revision_before_rev"].clear()
 
         # if self.insert_cache["revision_in_org"]:
         #     psycopg2.extras.execute_values(
         #         self.cursor,
         #         """INSERT INTO revision_in_org VALUES %s
         #            ON CONFLICT DO NOTHING""",
         #         self.insert_cache["revision_in_org"],
         #     )
         #     self.insert_cache["revision_in_org"].clear()
 
     def origin_get_id(self, origin: OriginEntry) -> int:
         if origin.id is None:
             # Insert origin in the DB and return the assigned id.
             self.cursor.execute(
                 """INSERT INTO origin (url) VALUES (%s)
                    ON CONFLICT DO NOTHING
                    RETURNING id""",
                 (origin.url,),
             )
             return self.cursor.fetchone()[0]
         else:
             return origin.id
 
     def revision_add(self, revision: RevisionEntry):
         # Add current revision to the compact DB.
         self.insert_cache["revision"][revision.id] = revision.date
 
     def revision_add_before_revision(
         self, relative: RevisionEntry, revision: RevisionEntry
     ):
         self.insert_cache["revision_before_rev"].append((revision.id, relative.id))
 
     def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry):
         self.insert_cache["revision_in_org"].append((revision.id, origin.id))
 
     def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
         # First check if the date is being modified by the current transaction.
         date = self.insert_cache["revision"].get(revision.id, None)
         if date is None:
             # If not, check whether it has been queried before.
             date = self.select_cache["revision"].get(revision.id, None)
             if date is None:
                 # Otherwise, query the database and cache the value.
                 self.cursor.execute(
                     """SELECT date FROM revision WHERE sha1=%s""", (revision.id,)
                 )
                 row = self.cursor.fetchone()
                 date = row[0] if row is not None else None
                 self.select_cache["revision"][revision.id] = date
         return date
 
     def revision_get_preferred_origin(self, revision: RevisionEntry) -> int:
         # TODO: adapt this method to consider cached values
         self.cursor.execute(
             """SELECT COALESCE(org, 0) FROM revision WHERE sha1=%s""", (revision.id,)
         )
         row = self.cursor.fetchone()
         # None means the revision is not in the database;
         # 0 means the revision has no preferred origin.
         return row[0] if row is not None and row[0] != 0 else None
 
     def revision_in_history(self, revision: RevisionEntry) -> bool:
         # TODO: adapt this method to consider cached values
         self.cursor.execute(
             """SELECT 1
                FROM revision_before_rev
                JOIN revision
                  ON revision.id=revision_before_rev.prev
                WHERE revision.sha1=%s""",
             (revision.id,),
         )
         return self.cursor.fetchone() is not None
 
     def revision_set_preferred_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ):
         # TODO: adapt this method to consider cached values
         self.cursor.execute(
             """UPDATE revision SET org=%s WHERE sha1=%s""", (origin.id, revision.id)
         )
 
     def revision_visited(self, revision: RevisionEntry) -> bool:
         # TODO: adapt this method to consider cached values
         self.cursor.execute(
             """SELECT 1
                FROM revision_in_org
                JOIN revision
                  ON revision.id=revision_in_org.rev
                WHERE revision.sha1=%s""",
             (revision.id,),
         )
         return self.cursor.fetchone() is not None
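Note: taken together, the class defers every write to insert_cache and only touches the database when commit() runs insert_all; on failure, commit() logs the exception and returns False rather than raising. A hedged usage sketch (the DSN and the SimpleNamespace stand-in for a FileEntry are illustrative only):

    from datetime import datetime
    from types import SimpleNamespace

    import psycopg2

    from swh.provenance.postgresql.provenancedb_base import ProvenanceDBBase

    # Hypothetical DSN, for illustration only.
    conn = psycopg2.connect("dbname=provenance")
    db = ProvenanceDBBase(conn)

    # A stand-in with the one attribute content_set_early_date reads (blob.id).
    blob = SimpleNamespace(id=b"some-sha1")

    # The write is staged in insert_cache; no SQL is issued yet.
    db.content_set_early_date(blob, datetime(2021, 1, 1))

    # insert_all() runs here, one execute_values round-trip per table.
    if not db.commit():
        # commit() logs the exception and reports failure via its return value.
        raise RuntimeError("provenance flush failed")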
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
index 6af9730..715596a 100644
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -1,362 +1,368 @@
-import os
 from datetime import datetime
+import os
 from typing import Dict, Generator, List, Optional, Tuple
 
+from typing_extensions import Protocol, runtime_checkable
+
 from .archive import ArchiveInterface
 from .model import DirectoryEntry, FileEntry, TreeEntry
 from .origin import OriginEntry
 from .revision import RevisionEntry
 
 
 # TODO: consider moving to path utils file together with normalize.
 def is_child(path: bytes, prefix: bytes) -> bool:
     return path != prefix and os.path.dirname(path) == prefix
 
 
-class ProvenanceInterface:
-    def __init__(self, **kwargs):
-        raise NotImplementedError
-
+@runtime_checkable
+class ProvenanceInterface(Protocol):
     def commit(self):
-        raise NotImplementedError
+        """Commit currently ongoing transactions in the backend DB"""
+        ...
 
     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
-    ):
-        raise NotImplementedError
+    ) -> None:
+        ...
 
     def content_add_to_revision(
         self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
-    ):
-        raise NotImplementedError
+    ) -> None:
+        ...
 
     def content_find_first(
         self, blobid: bytes
     ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
-        raise NotImplementedError
+        ...
 
     def content_find_all(
         self, blobid: bytes
     ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
-        raise NotImplementedError
+        ...
 
     def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
-        raise NotImplementedError
+        ...
 
     def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]:
-        raise NotImplementedError
+        ...
 
-    def content_set_early_date(self, blob: FileEntry, date: datetime):
-        raise NotImplementedError
+    def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
+        ...
 
     def directory_add_to_revision(
         self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
-    ):
-        raise NotImplementedError
+    ) -> None:
+        ...
 
     def directory_get_date_in_isochrone_frontier(
         self, directory: DirectoryEntry
     ) -> Optional[datetime]:
-        raise NotImplementedError
+        ...
 
     def directory_get_dates_in_isochrone_frontier(
         self, dirs: List[DirectoryEntry]
     ) -> Dict[bytes, datetime]:
-        raise NotImplementedError
+        ...
 
-    def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry):
-        raise NotImplementedError
+    def directory_invalidate_in_isochrone_frontier(
+        self, directory: DirectoryEntry
+    ) -> None:
+        ...
 
     def directory_set_date_in_isochrone_frontier(
         self, directory: DirectoryEntry, date: datetime
-    ):
-        raise NotImplementedError
+    ) -> None:
+        ...
 
     def origin_get_id(self, origin: OriginEntry) -> int:
-        raise NotImplementedError
+        ...
 
-    def revision_add(self, revision: RevisionEntry):
-        raise NotImplementedError
+    def revision_add(self, revision: RevisionEntry) -> None:
+        ...
 
     def revision_add_before_revision(
         self, relative: RevisionEntry, revision: RevisionEntry
-    ):
-        raise NotImplementedError
+    ) -> None:
+        ...
 
-    def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry):
-        raise NotImplementedError
+    def revision_add_to_origin(
+        self, origin: OriginEntry, revision: RevisionEntry
+    ) -> None:
+        ...
 
     def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
-        raise NotImplementedError
+        ...
 
     def revision_get_preferred_origin(self, revision: RevisionEntry) -> int:
-        raise NotImplementedError
+        ...
 
     def revision_in_history(self, revision: RevisionEntry) -> bool:
-        raise NotImplementedError
+        ...
 
     def revision_set_preferred_origin(
         self, origin: OriginEntry, revision: RevisionEntry
-    ):
-        raise NotImplementedError
+    ) -> None:
+        ...
 
     def revision_visited(self, revision: RevisionEntry) -> bool:
-        raise NotImplementedError
+        ...
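Note: because ProvenanceInterface is a Protocol, backends such as ProvenanceDBBase satisfy it by merely providing these methods, and callers can type against the interface alone. A small sketch of structural typing at work (the ingest function is hypothetical):

    from swh.provenance.provenance import ProvenanceInterface
    from swh.provenance.revision import RevisionEntry

    def ingest(provenance: ProvenanceInterface, revision: RevisionEntry) -> None:
        # Any object offering these methods type-checks here under mypy,
        # whether or not it inherits from ProvenanceInterface.
        provenance.revision_add(revision)
        provenance.commit()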
 def directory_process_content(
     provenance: ProvenanceInterface, directory: DirectoryEntry, relative: DirectoryEntry
-):
+) -> None:
     stack = [(directory, b"")]
     while stack:
         current, prefix = stack.pop()
         for child in iter(current):
             if isinstance(child, FileEntry):
                 # Add content to the relative directory with the computed prefix.
                 provenance.content_add_to_directory(relative, child, prefix)
             else:
                 # Recursively walk the child directory.
                 stack.append((child, os.path.join(prefix, child.name)))
 
 
-def origin_add(provenance: ProvenanceInterface, origin: OriginEntry):
+def origin_add(provenance: ProvenanceInterface, origin: OriginEntry) -> None:
     # TODO: refactor to iterate over origin visit statuses and commit only once
     # per status.
     origin.id = provenance.origin_get_id(origin)
     for revision in origin.revisions:
         origin_add_revision(provenance, origin, revision)
         # Commit after each revision
         provenance.commit()  # TODO: verify this!
 
 
 def origin_add_revision(
     provenance: ProvenanceInterface, origin: OriginEntry, revision: RevisionEntry
-):
+) -> None:
     stack: List[Tuple[Optional[RevisionEntry], RevisionEntry]] = [(None, revision)]
     while stack:
         relative, current = stack.pop()
         # Check if current revision has no preferred origin and update if necessary.
         preferred = provenance.revision_get_preferred_origin(current)
         if preferred is None:
             provenance.revision_set_preferred_origin(origin, current)
         ########################################################################
         if relative is None:
             # This revision is pointed directly by the origin.
             visited = provenance.revision_visited(current)
             provenance.revision_add_to_origin(origin, current)
             if not visited:
                 stack.append((current, current))
         else:
             # This revision is a parent of another one in the history of the
             # relative revision.
             for parent in iter(current):
                 visited = provenance.revision_visited(parent)
                 if not visited:
                     # The parent revision has never been seen before pointing
                     # directly to an origin.
                     known = provenance.revision_in_history(parent)
                     if known:
                         # The parent revision is already known in some other
                         # revision's history. We should point it directly to
                         # the origin and (eventually) walk its history.
                         stack.append((None, parent))
                     else:
                         # The parent revision was never seen before. We should
                         # walk its history and associate it with the same
                         # relative revision.
                         provenance.revision_add_before_revision(relative, parent)
                         stack.append((relative, parent))
                 else:
                     # The parent revision already points to an origin, so its
                     # history was properly processed before. We just need to
                     # make sure it points to the current origin as well.
                     provenance.revision_add_to_origin(origin, parent)
 
 
 def revision_add(
     provenance: ProvenanceInterface, archive: ArchiveInterface, revision: RevisionEntry
-):
+) -> None:
     assert revision.date is not None
     assert revision.root is not None
     # Process content starting from the revision's root directory.
     date = provenance.revision_get_early_date(revision)
     if date is None or revision.date < date:
         provenance.revision_add(revision)
         # TODO: add file size filtering
         revision_process_content(
             provenance, revision, DirectoryEntry(archive, revision.root, b"")
         )
     # TODO: improve this! Maybe using a max attempt counter?
     # Ideally the Provenance class should guarantee that a commit never fails.
     while not provenance.commit():
         continue
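Note: revision_add ends by retrying provenance.commit() in an unbounded loop; the TODO above it asks for a max-attempt counter, which might look roughly like this (a sketch under that assumption, not code from this diff):

    from swh.provenance.provenance import ProvenanceInterface

    def commit_with_retries(
        provenance: ProvenanceInterface, max_attempts: int = 3
    ) -> None:
        # Bounded variant of `while not provenance.commit(): continue`.
        for _ in range(max_attempts):
            if provenance.commit():
                return
        raise RuntimeError(f"commit failed after {max_attempts} attempts")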
 class IsochroneNode:
     def __init__(self, entry: TreeEntry, dates: Dict[bytes, datetime] = {}):
         self.entry = entry
         self.date = dates.get(self.entry.id, None)
         self.children: List[IsochroneNode] = []
         self.maxdate: Optional[datetime] = None
 
     def add_child(
         self, child: TreeEntry, dates: Dict[bytes, datetime] = {}
     ) -> "IsochroneNode":
         assert isinstance(self.entry, DirectoryEntry) and self.date is None
         node = IsochroneNode(child, dates=dates)
         self.children.append(node)
         return node
 
 
 def build_isochrone_graph(
     provenance: ProvenanceInterface, revision: RevisionEntry, directory: DirectoryEntry
-):
+) -> IsochroneNode:
     assert revision.date is not None
     # Build the nodes structure.
     root = IsochroneNode(directory)
     root.date = provenance.directory_get_date_in_isochrone_frontier(directory)
     stack = [root]
     while stack:
         current = stack.pop()
         assert isinstance(current.entry, DirectoryEntry)
         if current.date is None or current.date >= revision.date:
             # If the current directory has an associated date in the isochrone
             # frontier that is greater than or equal to the current revision's,
             # it should be ignored as the revision is being processed out of order.
             if current.date is not None and current.date >= revision.date:
                 provenance.directory_invalidate_in_isochrone_frontier(current.entry)
                 current.date = None
             # Pre-query all known dates for content/directories in the current
             # directory so the provenance object has them cached and
             # (potentially) improves performance.
             ddates = provenance.directory_get_dates_in_isochrone_frontier(
                 [child for child in current.entry if isinstance(child, DirectoryEntry)]
             )
             fdates = provenance.content_get_early_dates(
                 [child for child in current.entry if isinstance(child, FileEntry)]
             )
             for child in current.entry:
                 # Recursively analyse directory nodes.
                 if isinstance(child, DirectoryEntry):
                     node = current.add_child(child, dates=ddates)
                     stack.append(node)
                 else:
                     current.add_child(child, dates=fdates)
     # Precalculate the max known date for each node in the graph.
     stack = [root]
     while stack:
         current = stack.pop()
         if current.date is None:
             if any(map(lambda child: child.maxdate is None, current.children)):
                 # Current node needs to be analysed again after its children.
                 stack.append(current)
                 for child in current.children:
                     if isinstance(child.entry, FileEntry):
                         if child.date is not None:
                             # File node that has been seen before: just use its
                             # known date.
                             child.maxdate = child.date
                         else:
                             # File node that has never been seen before: use the
                             # current revision date.
                             child.maxdate = revision.date
                     else:
                         # Recursively analyse directory nodes.
                         stack.append(child)
             else:
                 maxdates = []
                 for child in current.children:
                     assert child.maxdate is not None
                     maxdates.append(child.maxdate)
                 current.maxdate = max(maxdates) if maxdates else revision.date
         else:
             # Directory node in the frontier: just use its known date.
             current.maxdate = current.date
     return root
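Note: the second pass of build_isochrone_graph computes maxdate bottom-up — files keep their known date or default to the revision date, and a directory takes the maximum over its children. A toy check of that rule with plain datetimes (simplified stand-ins, not the module's classes):

    from datetime import datetime

    revision_date = datetime(2021, 5, 1)

    # Leaf files: one seen before (2019), one new (counts as revision_date).
    seen_file = datetime(2019, 3, 1)
    new_file = revision_date

    # The parent directory's maxdate is the max over its children, so a single
    # new file pulls the directory up to the revision date and keeps it out of
    # the frontier (is_new_frontier requires maxdate < revision.date).
    maxdate = max(seen_file, new_file)
    assert maxdate == revision_date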
 def revision_process_content(
     provenance: ProvenanceInterface, revision: RevisionEntry, root: DirectoryEntry
 ):
     assert revision.date is not None
     stack = [(build_isochrone_graph(provenance, revision, root), root.name)]
     while stack:
         current, path = stack.pop()
+        assert isinstance(current.entry, DirectoryEntry)
         if current.date is not None:
             assert current.date < revision.date
             # Current directory is an outer isochrone frontier for a previously
             # processed revision. It should be reused as is.
             provenance.directory_add_to_revision(revision, current.entry, path)
         else:
             # Current directory is not an outer isochrone frontier for any
             # previous revision. It might be eligible for this one.
             if is_new_frontier(current, revision):
                 assert current.maxdate is not None
                 # The outer frontier should be moved to the current position in
                 # the isochrone graph. This is the first time this directory is
                 # found in the isochrone frontier.
                 provenance.directory_set_date_in_isochrone_frontier(
                     current.entry, current.maxdate
                 )
                 provenance.directory_add_to_revision(revision, current.entry, path)
                 directory_process_content(
                     provenance,
                     directory=current.entry,
                     relative=current.entry,
                 )
             else:
                 # No point moving the frontier here. Either there are no files
                 # or they are being seen for the first time here. Add all blobs
                 # to the current revision, updating their dates if necessary,
                 # and recursively analyse subdirectories as candidates for the
                 # outer frontier.
                 for child in current.children:
                     if isinstance(child.entry, FileEntry):
                         blob = child.entry
                         if child.date is None or revision.date < child.date:
                             provenance.content_set_early_date(blob, revision.date)
                         provenance.content_add_to_revision(revision, blob, path)
                     else:
                         stack.append((child, os.path.join(path, child.entry.name)))
 
 
 def is_new_frontier(node: IsochroneNode, revision: RevisionEntry) -> bool:
     assert node.maxdate is not None and revision.date is not None
     # Using the following condition we should get an algorithm equivalent to
     # the old version, where frontiers are pushed up in the tree whenever
     # possible.
     return node.maxdate < revision.date
     # return node.maxdate < revision.date and has_blobs(node)
 
 
 def has_blobs(node: IsochroneNode) -> bool:
     # We may want to look for files in different ways to decide whether to
     # define a frontier or not:
     # 1. Only files in the current node:
     #    return any(
     #        map(lambda child: isinstance(child.entry, FileEntry), node.children)
     #    )
     # 2. Files anywhere in the isochrone graph:
     #    stack = [node]
     #    while stack:
     #        current = stack.pop()
     #        if any(
     #            map(
     #                lambda child: isinstance(child.entry, FileEntry),
     #                current.children,
     #            )
     #        ):
     #            return True
     #        else:
     #            # All children are directory entries.
     #            stack.extend(current.children)
     #    return False
     # 3. Files in the intermediate directories between the current node and
     #    any previously defined frontier:
     return (
         any(map(lambda child: isinstance(child.entry, FileEntry), node.children))
         or all(
             map(
                 lambda child: (
                     not (isinstance(child.entry, DirectoryEntry) and child.date is None)
                 )
                 or has_blobs(child),
                 node.children,
             )
         )
     )
diff --git a/swh/provenance/storage/archive.py b/swh/provenance/storage/archive.py
index 003412f..e31de7c 100644
--- a/swh/provenance/storage/archive.py
+++ b/swh/provenance/storage/archive.py
@@ -1,49 +1,47 @@
-from typing import List
+from typing import Any, Dict, List
 
 # from functools import lru_cache
 from methodtools import lru_cache
 
 from swh.storage import get_storage
 
-from ..archive import ArchiveInterface
-
 
-class ArchiveStorage(ArchiveInterface):
+class ArchiveStorage:
     def __init__(self, cls: str, **kwargs):
         self.storage = get_storage(cls, **kwargs)
 
     @lru_cache(maxsize=1000000)
-    def directory_ls(self, id: bytes):
+    def directory_ls(self, id: bytes) -> List[Dict[str, Any]]:
         # TODO: filter unused fields
         return [entry for entry in self.storage.directory_ls(id)]
 
     def iter_origins(self):
         from swh.storage.algos.origin import iter_origins
 
         yield from iter_origins(self.storage)
 
     def iter_origin_visits(self, origin: str):
         from swh.storage.algos.origin import iter_origin_visits
 
         # TODO: filter unused fields
         yield from iter_origin_visits(self.storage, origin)
 
     def iter_origin_visit_statuses(self, origin: str, visit: int):
         from swh.storage.algos.origin import iter_origin_visit_statuses
 
         # TODO: filter unused fields
         yield from iter_origin_visit_statuses(self.storage, origin, visit)
 
     def release_get(self, ids: List[bytes]):
         # TODO: filter unused fields
         yield from self.storage.release_get(ids)
 
     def revision_get(self, ids: List[bytes]):
         # TODO: filter unused fields
         yield from self.storage.revision_get(ids)
 
     def snapshot_get_all_branches(self, snapshot: bytes):
         from swh.storage.algos.snapshot import snapshot_get_all_branches
 
         # TODO: filter unused fields
         return snapshot_get_all_branches(self.storage, snapshot)
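Note: with the interfaces imported only under TYPE_CHECKING, get_archive and get_provenance are the runtime entry points to this package. A hedged usage sketch (the storage URL, database parameters, and their exact shapes are placeholders, not taken from this diff):

    from swh.provenance import get_archive, get_provenance

    # Placeholder configuration; the expected shapes of "storage" and "db"
    # depend on swh.storage.get_storage and postgresql.db_utils.connect.
    archive = get_archive(
        "api", storage={"cls": "remote", "url": "http://localhost:5002"}
    )
    provenance = get_provenance("local", db={"dbname": "provenance"}, with_path=True)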