Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/tests/conftest.py
Show All 14 Lines | |||||
from pytest_postgresql.factories import postgresql | from pytest_postgresql.factories import postgresql | ||||
from swh.journal.serializers import msgpack_ext_hook | from swh.journal.serializers import msgpack_ext_hook | ||||
from swh.provenance import get_provenance, get_provenance_storage | from swh.provenance import get_provenance, get_provenance_storage | ||||
from swh.provenance.archive import ArchiveInterface | from swh.provenance.archive import ArchiveInterface | ||||
from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface | from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface | ||||
from swh.provenance.storage.archive import ArchiveStorage | from swh.provenance.storage.archive import ArchiveStorage | ||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
from swh.storage.replay import process_replay_objects | from swh.storage.replay import OBJECT_CONVERTERS, process_replay_objects | ||||
@pytest.fixture( | @pytest.fixture( | ||||
params=[ | params=[ | ||||
"with-path", | "with-path", | ||||
"without-path", | "without-path", | ||||
"with-path-denormalized", | "with-path-denormalized", | ||||
"without-path-denormalized", | "without-path-denormalized", | ||||
▲ Show 20 Lines • Show All 67 Lines • ▼ Show 20 Lines | def archive(swh_storage: StorageInterface) -> ArchiveInterface: | ||||
"""Return an ArchiveStorage-based ArchiveInterface object""" | """Return an ArchiveStorage-based ArchiveInterface object""" | ||||
return ArchiveStorage(swh_storage) | return ArchiveStorage(swh_storage) | ||||
def get_datafile(fname: str) -> str:
    """Return the path of ``fname`` inside the ``data`` directory next to this file"""
    tests_dir = path.dirname(__file__)
    return path.join(tests_dir, "data", fname)
def load_repo_data(repo: str) -> Dict[str, Any]:
    """Read ``<repo>.msgpack`` from the data directory, grouped by object type

    The file is consumed as a stream of ``(object type, object)`` pairs; the
    returned dict maps each object type to the list of its objects, in the
    order they appear in the file.
    """
    # NOTE(review): a reviewer pointed out that the value type could be refined
    # to ``List[Any]`` here (and in fill_storage's signature) -- TODO confirm.
    objects_by_type: Dict[str, Any] = {}
    datafile = get_datafile(f"{repo}.msgpack")
    with open(datafile, "rb") as stream:
        records = msgpack.Unpacker(
            stream,
            raw=False,
            ext_hook=msgpack_ext_hook,
            strict_map_key=False,
            timestamp=3,  # convert Timestamp in datetime objects (tz UTC)
        )
        # The unpacker reads lazily, so it must be drained while the file is open.
        for kind, payload in records:
            if kind not in objects_by_type:
                objects_by_type[kind] = []
            objects_by_type[kind].append(payload)
    return objects_by_type
def filter_dict(d: Dict[Any, Any], keys: Iterable[Any]) -> Dict[Any, Any]:
    """Return a new dict with only the entries of ``d`` whose key is in ``keys``"""
    # NOTE(review): a reviewer remarked that this helper should have been
    # removed earlier -- confirm whether it is still used before relying on it.
    kept: Dict[Any, Any] = {}
    for key, value in d.items():
        if key in keys:
            kept[key] = value
    return kept
def fill_storage(storage: StorageInterface, data: Dict[str, Any]) -> None:
    """Convert the objects in ``data`` and replay them into ``storage``

    ``data`` maps an object type to a list of values; each value is passed
    through the ``OBJECT_CONVERTERS`` entry for its object type before the
    whole mapping is handed to ``process_replay_objects``.
    """
    # NOTE(review): the review discussion suggested also applying OBJECT_FIXERS
    # (as ModelObjectDeserializer.convert reportedly does); it was judged
    # non-critical because the serialized data can be regenerated -- TODO confirm.
    converted: Dict[str, Any] = {}
    for object_type, values in data.items():
        # The converter is looked up per item, so an object type with no
        # values never touches OBJECT_CONVERTERS.
        converted[object_type] = [
            OBJECT_CONVERTERS[object_type](d) for d in values
        ]
    process_replay_objects(converted, storage=storage)
# TODO: remove this function in favour of TimestampWithTimezone.to_datetime | # TODO: remove this function in favour of TimestampWithTimezone.to_datetime | ||||
# from swh.model.model | # from swh.model.model | ||||
def ts2dt(ts: Dict[str, Any]) -> datetime:
    """Build a timezone-aware datetime from a timestamp dict

    ``ts`` is read as ``{"timestamp": {"seconds": ..., "microseconds": ...},
    "offset": ...}``, where ``offset`` is used as a UTC offset in minutes.
    """
    stamp = ts["timestamp"]
    seconds = stamp["seconds"]
    tz = timezone(timedelta(minutes=ts["offset"]))
    moment = datetime.fromtimestamp(seconds, tz)
    # The microseconds are stored separately, so they are set afterwards.
    return moment.replace(microsecond=stamp["microseconds"])
Thanks to this fix, I've realized that the type here can actually be refined to `Dict[str, List[Any]]`. The same applies to the declaration of `fill_storage`.