Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/tests/conftest.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from datetime import datetime, timedelta, timezone | |||||
from os import path | from os import path | ||||
from typing import Any, Dict, Iterable, Iterator | from typing import Any, Dict, Iterable, Iterator | ||||
from _pytest.fixtures import SubRequest | from _pytest.fixtures import SubRequest | ||||
import msgpack | import msgpack | ||||
import psycopg2.extensions | import psycopg2.extensions | ||||
import pytest | import pytest | ||||
from swh.journal.serializers import msgpack_ext_hook | from swh.journal.serializers import msgpack_ext_hook | ||||
from swh.model.tests.swh_model_data import TEST_OBJECTS | |||||
from swh.provenance import get_provenance, get_provenance_storage | from swh.provenance import get_provenance, get_provenance_storage | ||||
from swh.provenance.api.client import RemoteProvenanceStorage | from swh.provenance.api.client import RemoteProvenanceStorage | ||||
import swh.provenance.api.server as server | import swh.provenance.api.server as server | ||||
from swh.provenance.archive import ArchiveInterface | from swh.provenance.archive import ArchiveInterface | ||||
from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface | from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface | ||||
from swh.provenance.postgresql.archive import ArchivePostgreSQL | |||||
from swh.provenance.storage.archive import ArchiveStorage | from swh.provenance.storage.archive import ArchiveStorage | ||||
from swh.storage.postgresql.storage import Storage | from swh.storage.interface import StorageInterface | ||||
from swh.storage.replay import process_replay_objects | from swh.storage.replay import process_replay_objects | ||||
@pytest.fixture( | @pytest.fixture( | ||||
params=[ | params=[ | ||||
"with-path", | "with-path", | ||||
"without-path", | "without-path", | ||||
"with-path-denormalized", | "with-path-denormalized", | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | ) -> ProvenanceInterface: | ||||
else: | else: | ||||
# in test sessions, we DO want to raise any exception occurring at commit time | # in test sessions, we DO want to raise any exception occurring at commit time | ||||
prov = get_provenance(cls=request.param, db=populated_db, raise_on_commit=True) | prov = get_provenance(cls=request.param, db=populated_db, raise_on_commit=True) | ||||
return prov | return prov | ||||
@pytest.fixture | @pytest.fixture | ||||
def swh_storage_with_objects(swh_storage: Storage) -> Storage: | def archive(swh_storage: StorageInterface) -> ArchiveInterface: | ||||
"""return a Storage object (postgresql-based by default) with a few of each | """Return an ArchiveStorage-based ArchiveInterface object""" | ||||
object type in it | return ArchiveStorage(swh_storage) | ||||
The inserted content comes from swh.model.tests.swh_model_data. | |||||
""" | |||||
for obj_type in ( | |||||
"content", | |||||
"skipped_content", | |||||
"directory", | |||||
"revision", | |||||
"release", | |||||
"snapshot", | |||||
"origin", | |||||
"origin_visit", | |||||
"origin_visit_status", | |||||
): | |||||
getattr(swh_storage, f"{obj_type}_add")(TEST_OBJECTS[obj_type]) | |||||
return swh_storage | |||||
@pytest.fixture | |||||
def archive_direct(swh_storage_with_objects: Storage) -> ArchiveInterface: | |||||
return ArchivePostgreSQL(swh_storage_with_objects.get_db().conn) | |||||
@pytest.fixture | |||||
def archive_api(swh_storage_with_objects: Storage) -> ArchiveInterface: | |||||
return ArchiveStorage(swh_storage_with_objects) | |||||
@pytest.fixture(params=["archive", "db"]) | |||||
def archive(request, swh_storage_with_objects: Storage) -> Iterator[ArchiveInterface]: | |||||
"""Return a ArchivePostgreSQL based StorageInterface object""" | |||||
# this is a workaround to prevent tests from hanging because of an unclosed | |||||
# transaction. | |||||
# TODO: refactor the ArchivePostgreSQL to properly deal with | |||||
# transactions and get rid of this fixture | |||||
if request.param == "db": | |||||
archive = ArchivePostgreSQL(conn=swh_storage_with_objects.get_db().conn) | |||||
yield archive | |||||
archive.conn.rollback() | |||||
else: | |||||
yield ArchiveStorage(swh_storage_with_objects) | |||||
def get_datafile(fname: str) -> str: | def get_datafile(fname: str) -> str: | ||||
return path.join(path.dirname(__file__), "data", fname) | return path.join(path.dirname(__file__), "data", fname) | ||||
def load_repo_data(repo: str) -> Dict[str, Any]: | def load_repo_data(repo: str) -> Dict[str, Any]: | ||||
data: Dict[str, Any] = {} | data: Dict[str, Any] = {} | ||||
Show All 9 Lines | with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj: | ||||
data.setdefault(objtype, []).append(objd) | data.setdefault(objtype, []).append(objd) | ||||
return data | return data | ||||
def filter_dict(d: Dict[Any, Any], keys: Iterable[Any]) -> Dict[Any, Any]: | def filter_dict(d: Dict[Any, Any], keys: Iterable[Any]) -> Dict[Any, Any]: | ||||
return {k: v for (k, v) in d.items() if k in keys} | return {k: v for (k, v) in d.items() if k in keys} | ||||
def fill_storage(storage: Storage, data: Dict[str, Any]) -> None: | def fill_storage(storage: StorageInterface, data: Dict[str, Any]) -> None: | ||||
process_replay_objects(data, storage=storage) | process_replay_objects(data, storage=storage) | ||||
# TODO: remove this function in favour of TimestampWithTimezone.to_datetime | |||||
# from swh.model.model | |||||
def ts2dt(ts: Dict[str, Any]) -> datetime: | |||||
timestamp = datetime.fromtimestamp( | |||||
ts["timestamp"]["seconds"], timezone(timedelta(minutes=ts["offset"])) | |||||
) | |||||
return timestamp.replace(microsecond=ts["timestamp"]["microseconds"]) |