Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/tests/conftest.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from datetime import datetime, timedelta, timezone | from datetime import datetime | ||||
from os import path | from os import path | ||||
from typing import Any, Dict, Generator, List | from typing import Any, Dict, Generator, List | ||||
from _pytest.fixtures import SubRequest | from _pytest.fixtures import SubRequest | ||||
import msgpack | import msgpack | ||||
import psycopg2.extensions | import psycopg2.extensions | ||||
import pytest | import pytest | ||||
from pytest_postgresql.factories import postgresql | from pytest_postgresql.factories import postgresql | ||||
from swh.journal.serializers import msgpack_ext_hook | from swh.journal.serializers import msgpack_ext_hook | ||||
from swh.model.model import BaseModel | from swh.model.model import BaseModel, TimestampWithTimezone | ||||
from swh.provenance import get_provenance, get_provenance_storage | from swh.provenance import get_provenance, get_provenance_storage | ||||
from swh.provenance.archive import ArchiveInterface | from swh.provenance.archive import ArchiveInterface | ||||
from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface | from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface | ||||
from swh.provenance.storage.archive import ArchiveStorage | from swh.provenance.storage.archive import ArchiveStorage | ||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
from swh.storage.replay import OBJECT_CONVERTERS, OBJECT_FIXERS, process_replay_objects | from swh.storage.replay import OBJECT_CONVERTERS, OBJECT_FIXERS, process_replay_objects | ||||
▲ Show 20 Lines • Show All 111 Lines • ▼ Show 20 Lines | def load_repo_data(repo: str) -> Dict[str, List[dict]]: | ||||
with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj: | with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj: | ||||
unpacker = msgpack.Unpacker( | unpacker = msgpack.Unpacker( | ||||
fobj, | fobj, | ||||
raw=False, | raw=False, | ||||
ext_hook=msgpack_ext_hook, | ext_hook=msgpack_ext_hook, | ||||
strict_map_key=False, | strict_map_key=False, | ||||
timestamp=3, # convert Timestamp in datetime objects (tz UTC) | timestamp=3, # convert Timestamp in datetime objects (tz UTC) | ||||
) | ) | ||||
for objtype, objd in unpacker: | for msg in unpacker: | ||||
if len(msg) == 2: # old format | |||||
objtype, objd = msg | |||||
else: # now we should have a triplet (type, key, value) | |||||
objtype, _, objd = msg | |||||
data.setdefault(objtype, []).append(objd) | data.setdefault(objtype, []).append(objd) | ||||
return data | return data | ||||
def objs_from_dict(object_type: str, dict_repr: dict) -> BaseModel:
    """Build the model object of kind ``object_type`` from its dict form.

    When ``OBJECT_FIXERS`` has an entry for ``object_type``, that fixer is
    applied to ``dict_repr`` first; the (possibly fixed) dictionary is then
    handed to the matching entry of ``OBJECT_CONVERTERS``.
    """
    fixed_repr = (
        OBJECT_FIXERS[object_type](dict_repr)
        if object_type in OBJECT_FIXERS
        else dict_repr
    )
    return OBJECT_CONVERTERS[object_type](fixed_repr)
def ts2dt(ts: Dict[str, Any]) -> datetime:
    """Convert a timestamp dictionary into a ``datetime``.

    The dictionary is parsed with ``TimestampWithTimezone.from_dict`` and the
    resulting object is converted with its ``to_datetime`` method.
    """
    parsed = TimestampWithTimezone.from_dict(ts)
    return parsed.to_datetime()