diff --git a/swh/provenance/archive.py b/swh/provenance/archive.py
--- a/swh/provenance/archive.py
+++ b/swh/provenance/archive.py
@@ -1,27 +1,20 @@
-from typing import Any, Dict, Iterable, List
+from typing import Any, Dict, Iterable, Optional
 
 from typing_extensions import Protocol, runtime_checkable
 
+from swh.model.model import Release, Revision, Snapshot
+
 
 @runtime_checkable
 class ArchiveInterface(Protocol):
-    def directory_ls(self, id: bytes) -> List[Dict[str, Any]]:
-        ...
-
-    def iter_origins(self):
-        ...
-
-    def iter_origin_visits(self, origin: str):
-        ...
-
-    def iter_origin_visit_statuses(self, origin: str, visit: int):
+    def directory_ls(self, id: bytes) -> Iterable[Dict[str, Any]]:
         ...
 
-    def release_get(self, ids: Iterable[bytes]):
+    def release_get(self, ids: Iterable[bytes]) -> Iterable[Release]:
         ...
 
-    def revision_get(self, ids: Iterable[bytes]):
+    def revision_get(self, ids: Iterable[bytes]) -> Iterable[Revision]:
         ...
 
-    def snapshot_get_all_branches(self, snapshot: bytes):
+    def snapshot_get_all_branches(self, snapshot: bytes) -> Optional[Snapshot]:
         ...
diff --git a/swh/provenance/model.py b/swh/provenance/model.py
--- a/swh/provenance/model.py
+++ b/swh/provenance/model.py
@@ -3,7 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from datetime import datetime
+from datetime import datetime, timedelta, timezone
 from typing import Iterable, Iterator, List, Optional, Set
 
 from swh.core.utils import grouper
@@ -39,7 +39,7 @@
         for releases in grouper(releases_set, batchsize):
             targets_set.update(
                 release.target
-                for release in archive.revision_get(releases)
+                for release in archive.release_get(releases)
                 if release is not None
                 and release.target_type == ObjectType.REVISION
             )
@@ -85,16 +85,22 @@
             if revision:
                 self._parents = list(revision)[0].parents
         if self._parents and not self._nodes:
-            self._nodes = [
-                RevisionEntry(
-                    id=rev.id,
-                    root=rev.directory,
-                    date=rev.date,
-                    parents=rev.parents,
-                )
-                for rev in archive.revision_get(self._parents)
-                if rev
-            ]
+            for rev in archive.revision_get(self._parents):
+                if rev.date is not None:
+                    date = datetime.fromtimestamp(
+                        rev.date.timestamp.seconds,
+                        timezone(timedelta(minutes=rev.date.offset)),
+                    )
+                    self._nodes.append(
+                        RevisionEntry(
+                            id=rev.id,
+                            root=rev.directory,
+                            date=date.replace(
+                                microsecond=rev.date.timestamp.microseconds
+                            ),
+                            parents=rev.parents,
+                        )
+                    )
         yield from self._nodes
 
     def __str__(self):
diff --git a/swh/provenance/postgresql/archive.py b/swh/provenance/postgresql/archive.py
--- a/swh/provenance/postgresql/archive.py
+++ b/swh/provenance/postgresql/archive.py
@@ -1,9 +1,9 @@
-from typing import Any, Dict, Iterable, List
+from typing import Any, Dict, Iterable, List, Optional
 
 from methodtools import lru_cache
 import psycopg2
 
-from swh.model.model import Revision
+from swh.model.model import Release, Revision, Snapshot
 from swh.storage.postgresql.storage import Storage
 
 
@@ -12,11 +12,11 @@
         self.conn = conn
         self.storage = Storage(conn, objstorage={"cls": "memory"})
 
-    def directory_ls(self, id: bytes) -> List[Dict[str, Any]]:
+    def directory_ls(self, id: bytes) -> Iterable[Dict[str, Any]]:
         # TODO: only call directory_ls_internal if the id is not being queried by
         # someone else. Otherwise wait until results get properly cached.
         entries = self.directory_ls_internal(id)
-        return entries
+        yield from entries
 
     @lru_cache(maxsize=100000)
     def directory_ls_internal(self, id: bytes) -> List[Dict[str, Any]]:
@@ -62,28 +62,13 @@
                 for row in cursor.fetchall()
             ]
 
-    def iter_origins(self):
-        from swh.storage.algos.origin import iter_origins
-
-        yield from iter_origins(self.storage)
-
-    def iter_origin_visits(self, origin: str):
-        from swh.storage.algos.origin import iter_origin_visits
-
-        # TODO: filter unused fields
-        yield from iter_origin_visits(self.storage, origin)
-
-    def iter_origin_visit_statuses(self, origin: str, visit: int):
-        from swh.storage.algos.origin import iter_origin_visit_statuses
-
-        # TODO: filter unused fields
-        yield from iter_origin_visit_statuses(self.storage, origin, visit)
-
-    def release_get(self, ids: Iterable[bytes]):
+    def release_get(self, ids: Iterable[bytes]) -> Iterable[Release]:
         # TODO: filter unused fields
-        yield from self.storage.release_get(list(ids))
+        yield from (
+            rel for rel in self.storage.release_get(list(ids)) if rel is not None
+        )
 
-    def revision_get(self, ids: Iterable[bytes]):
+    def revision_get(self, ids: Iterable[bytes]) -> Iterable[Revision]:
         with self.conn.cursor() as cursor:
             psycopg2.extras.execute_values(
                 cursor,
@@ -117,7 +102,7 @@
                 }
             )
 
-    def snapshot_get_all_branches(self, snapshot: bytes):
+    def snapshot_get_all_branches(self, snapshot: bytes) -> Optional[Snapshot]:
         from swh.storage.algos.snapshot import snapshot_get_all_branches
 
         # TODO: filter unused fields
diff --git a/swh/provenance/storage/archive.py b/swh/provenance/storage/archive.py
--- a/swh/provenance/storage/archive.py
+++ b/swh/provenance/storage/archive.py
@@ -1,8 +1,6 @@
-from typing import Any, Dict, Iterable, List
-
-# from functools import lru_cache
-from methodtools import lru_cache
+from typing import Any, Dict, Iterable, Optional
 
+from swh.model.model import Release, Revision, Snapshot
 from swh.storage.interface import StorageInterface
 
 
@@ -10,37 +8,23 @@
     def __init__(self, storage: StorageInterface):
         self.storage = storage
 
-    @lru_cache(maxsize=100000)
-    def directory_ls(self, id: bytes) -> List[Dict[str, Any]]:
-        # TODO: filter unused fields
-        return [entry for entry in self.storage.directory_ls(id)]
-
-    def iter_origins(self):
-        from swh.storage.algos.origin import iter_origins
-
-        yield from iter_origins(self.storage)
-
-    def iter_origin_visits(self, origin: str):
-        from swh.storage.algos.origin import iter_origin_visits
-
-        # TODO: filter unused fields
-        yield from iter_origin_visits(self.storage, origin)
-
-    def iter_origin_visit_statuses(self, origin: str, visit: int):
-        from swh.storage.algos.origin import iter_origin_visit_statuses
-
+    def directory_ls(self, id: bytes) -> Iterable[Dict[str, Any]]:
         # TODO: filter unused fields
-        yield from iter_origin_visit_statuses(self.storage, origin, visit)
+        yield from self.storage.directory_ls(id)
 
-    def release_get(self, ids: Iterable[bytes]):
+    def release_get(self, ids: Iterable[bytes]) -> Iterable[Release]:
         # TODO: filter unused fields
-        yield from self.storage.release_get(list(ids))
+        yield from (
+            rel for rel in self.storage.release_get(list(ids)) if rel is not None
+        )
 
-    def revision_get(self, ids: Iterable[bytes]):
+    def revision_get(self, ids: Iterable[bytes]) -> Iterable[Revision]:
         # TODO: filter unused fields
-        yield from self.storage.revision_get(list(ids))
+        yield from (
+            rev for rev in self.storage.revision_get(list(ids)) if rev is not None
+        )
 
-    def snapshot_get_all_branches(self, snapshot: bytes):
+    def snapshot_get_all_branches(self, snapshot: bytes) -> Optional[Snapshot]:
         from swh.storage.algos.snapshot import snapshot_get_all_branches
 
         # TODO: filter unused fields
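
Note on the ArchiveInterface change: because the Protocol is decorated with @runtime_checkable, isinstance() can be used to check that a backend (such as the postgresql or storage implementations above) provides all four methods; the check only looks at method names, not at the new return-type annotations. A minimal sketch, assuming swh.provenance is importable; the DummyArchive class is hypothetical and exists only for this check.

from typing import Any, Dict, Iterable, Optional

from swh.provenance.archive import ArchiveInterface


class DummyArchive:
    # Hypothetical stand-in backend: only the four method names matter
    # for the runtime_checkable isinstance() check to pass.
    def directory_ls(self, id: bytes) -> Iterable[Dict[str, Any]]:
        return iter(())

    def release_get(self, ids: Iterable[bytes]) -> Iterable:
        return iter(())

    def revision_get(self, ids: Iterable[bytes]) -> Iterable:
        return iter(())

    def snapshot_get_all_branches(self, snapshot: bytes) -> Optional[Any]:
        return None


assert isinstance(DummyArchive(), ArchiveInterface)  # structural check only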
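
The parents() hunk in swh/provenance/model.py now rebuilds an aware datetime from the storage-level date representation (epoch seconds plus microseconds, with a UTC offset in minutes). A standalone sketch of that conversion, assuming rev.date is shaped as used above (.timestamp.seconds, .timestamp.microseconds, .offset); the numeric values below are made up.

from datetime import datetime, timedelta, timezone


def to_datetime(seconds: int, microseconds: int, offset_minutes: int) -> datetime:
    # Attach the timezone derived from the offset in minutes, then restore
    # the sub-second precision that fromtimestamp() was not given.
    tz = timezone(timedelta(minutes=offset_minutes))
    date = datetime.fromtimestamp(seconds, tz)
    return date.replace(microsecond=microseconds)


# Made-up example: some epoch value, 250000 microseconds, UTC+01:00.
print(to_datetime(1614682800, 250000, 60))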
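
In swh/provenance/postgresql/archive.py, directory_ls() now yields entries while the expensive lookup stays cached as a list in directory_ls_internal(). A minimal sketch of that pattern, using functools.lru_cache on plain functions instead of the methodtools variant the module imports, with made-up names and data.

from functools import lru_cache
from typing import Any, Dict, Iterable, List


@lru_cache(maxsize=100000)
def ls_internal(dir_id: bytes) -> List[Dict[str, Any]]:
    # Stand-in for the real database query; runs once per directory id.
    return [{"type": "file", "name": b"README", "target": dir_id}]


def directory_ls(dir_id: bytes) -> Iterable[Dict[str, Any]]:
    # Each caller gets a fresh iterator over the cached list.
    yield from ls_internal(dir_id)


entries = list(directory_ls(b"\x00" * 20))
print(entries)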
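
Both archive implementations now filter the storage results: swh.storage's release_get()/revision_get() return None placeholders for ids that are not found, and dropping those is what lets the interface promise Iterable[Release]/Iterable[Revision] rather than iterables that may contain None. A small sketch of the same generator-expression pattern, with made-up values standing in for model objects.

from typing import Iterable, Optional, TypeVar

T = TypeVar("T")


def drop_missing(results: Iterable[Optional[T]]) -> Iterable[T]:
    # Keep only actual hits; unknown ids come back as None from storage.
    yield from (obj for obj in results if obj is not None)


# e.g. storage.revision_get([known_id, unknown_id]) -> [<Revision ...>, None]
assert list(drop_missing(["rev-a", None, "rev-b"])) == ["rev-a", "rev-b"]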