Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/tests/conftest.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from datetime import datetime, timedelta, timezone | from datetime import datetime, timedelta, timezone | ||||
from os import path | from os import path | ||||
from typing import Any, Dict, Generator, Iterable | from typing import Any, Dict, Generator, List | ||||
from _pytest.fixtures import SubRequest | from _pytest.fixtures import SubRequest | ||||
import mongomock.database | import mongomock.database | ||||
import msgpack | import msgpack | ||||
import psycopg2.extensions | import psycopg2.extensions | ||||
import pytest | import pytest | ||||
from pytest_postgresql.factories import postgresql | from pytest_postgresql.factories import postgresql | ||||
from swh.journal.serializers import msgpack_ext_hook | from swh.journal.serializers import msgpack_ext_hook | ||||
from swh.model.model import BaseModel | |||||
from swh.provenance import get_provenance, get_provenance_storage | from swh.provenance import get_provenance, get_provenance_storage | ||||
from swh.provenance.archive import ArchiveInterface | from swh.provenance.archive import ArchiveInterface | ||||
from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface | from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface | ||||
from swh.provenance.storage.archive import ArchiveStorage | from swh.provenance.storage.archive import ArchiveStorage | ||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
from swh.storage.replay import OBJECT_CONVERTERS, process_replay_objects | from swh.storage.replay import OBJECT_CONVERTERS, OBJECT_FIXERS, process_replay_objects | ||||
@pytest.fixture( | @pytest.fixture( | ||||
params=[ | params=[ | ||||
"with-path", | "with-path", | ||||
"without-path", | "without-path", | ||||
"with-path-denormalized", | "with-path-denormalized", | ||||
"without-path-denormalized", | "without-path-denormalized", | ||||
▲ Show 20 Lines • Show All 63 Lines • ▼ Show 20 Lines | |||||
@pytest.fixture | @pytest.fixture | ||||
def archive(swh_storage: StorageInterface) -> ArchiveInterface: | def archive(swh_storage: StorageInterface) -> ArchiveInterface: | ||||
"""Return an ArchiveStorage-based ArchiveInterface object""" | """Return an ArchiveStorage-based ArchiveInterface object""" | ||||
return ArchiveStorage(swh_storage) | return ArchiveStorage(swh_storage) | ||||
def fill_storage(storage: StorageInterface, data: Dict[str, List[dict]]) -> None: | |||||
objects = { | |||||
objtype: [objs_from_dict(objtype, d) for d in dicts] | |||||
for objtype, dicts in data.items() | |||||
} | |||||
process_replay_objects(objects, storage=storage) | |||||
def get_datafile(fname: str) -> str: | def get_datafile(fname: str) -> str: | ||||
return path.join(path.dirname(__file__), "data", fname) | return path.join(path.dirname(__file__), "data", fname) | ||||
def load_repo_data(repo: str) -> Dict[str, Any]: | # TODO: this should return Dict[str, List[BaseModel]] directly, but it requires | ||||
data: Dict[str, Any] = {} | # refactoring several tests | ||||
def load_repo_data(repo: str) -> Dict[str, List[dict]]: | |||||
data: Dict[str, List[dict]] = {} | |||||
with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj: | with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj: | ||||
unpacker = msgpack.Unpacker( | unpacker = msgpack.Unpacker( | ||||
fobj, | fobj, | ||||
raw=False, | raw=False, | ||||
ext_hook=msgpack_ext_hook, | ext_hook=msgpack_ext_hook, | ||||
strict_map_key=False, | strict_map_key=False, | ||||
timestamp=3, # convert Timestamp in datetime objects (tz UTC) | timestamp=3, # convert Timestamp in datetime objects (tz UTC) | ||||
) | ) | ||||
for objtype, objd in unpacker: | for objtype, objd in unpacker: | ||||
data.setdefault(objtype, []).append(objd) | data.setdefault(objtype, []).append(objd) | ||||
return data | return data | ||||
def filter_dict(d: Dict[Any, Any], keys: Iterable[Any]) -> Dict[Any, Any]: | def objs_from_dict(object_type: str, dict_repr: dict) -> BaseModel: | ||||
return {k: v for (k, v) in d.items() if k in keys} | if object_type in OBJECT_FIXERS: | ||||
dict_repr = OBJECT_FIXERS[object_type](dict_repr) | |||||
obj = OBJECT_CONVERTERS[object_type](dict_repr) | |||||
def fill_storage(storage: StorageInterface, data: Dict[str, Any]) -> None: | return obj | ||||
data = { | |||||
object_type: [OBJECT_CONVERTERS[object_type](d) for d in values] | |||||
for object_type, values in data.items() | |||||
} | |||||
process_replay_objects(data, storage=storage) | |||||
# TODO: remove this function in favour of TimestampWithTimezone.to_datetime | # TODO: remove this function in favour of TimestampWithTimezone.to_datetime | ||||
# from swh.model.model | # from swh.model.model | ||||
def ts2dt(ts: Dict[str, Any]) -> datetime: | def ts2dt(ts: Dict[str, Any]) -> datetime: | ||||
timestamp = datetime.fromtimestamp( | timestamp = datetime.fromtimestamp( | ||||
ts["timestamp"]["seconds"], timezone(timedelta(minutes=ts["offset"])) | ts["timestamp"]["seconds"], timezone(timedelta(minutes=ts["offset"])) | ||||
) | ) | ||||
return timestamp.replace(microsecond=ts["timestamp"]["microseconds"]) | return timestamp.replace(microsecond=ts["timestamp"]["microseconds"]) |