Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/core/tests/test_loader.py
Show All 16 Lines | from swh.loader.core.loader import ( | ||||
SENTRY_VISIT_TYPE_TAG_NAME, | SENTRY_VISIT_TYPE_TAG_NAME, | ||||
BaseLoader, | BaseLoader, | ||||
ContentLoader, | ContentLoader, | ||||
DirectoryLoader, | DirectoryLoader, | ||||
DVCSLoader, | DVCSLoader, | ||||
) | ) | ||||
from swh.loader.core.metadata_fetchers import MetadataFetcherProtocol | from swh.loader.core.metadata_fetchers import MetadataFetcherProtocol | ||||
from swh.loader.exception import NotFound, UnsupportedChecksumComputation | from swh.loader.exception import NotFound, UnsupportedChecksumComputation | ||||
from swh.loader.tests import assert_last_visit_matches | from swh.loader.tests import assert_last_visit_matches, get_stats | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
MetadataAuthority, | MetadataAuthority, | ||||
MetadataAuthorityType, | MetadataAuthorityType, | ||||
MetadataFetcher, | MetadataFetcher, | ||||
Origin, | Origin, | ||||
RawExtrinsicMetadata, | RawExtrinsicMetadata, | ||||
Snapshot, | Snapshot, | ||||
SnapshotBranch, | |||||
TargetType, | |||||
) | ) | ||||
import swh.storage.exc | import swh.storage.exc | ||||
from .conftest import compute_hashes, compute_nar_hashes, nix_store_missing | from .conftest import compute_hashes, compute_nar_hashes, nix_store_missing | ||||
ORIGIN = Origin(url="some-url") | ORIGIN = Origin(url="some-url") | ||||
PARENT_ORIGIN = Origin(url="base-origin-url") | PARENT_ORIGIN = Origin(url="base-origin-url") | ||||
▲ Show 20 Lines • Show All 55 Lines • ▼ Show 20 Lines | class DummyDVCSLoader(DummyLoader, DVCSLoader): | ||||
def eventful(self): | def eventful(self): | ||||
return False | return False | ||||
class DummyBaseLoader(DummyLoader, BaseLoader): | class DummyBaseLoader(DummyLoader, BaseLoader): | ||||
"""Buffered loader will send new data when threshold is reached""" | """Buffered loader will send new data when threshold is reached""" | ||||
def store_data(self): | def store_data(self, create_partial_snapshot: bool = False): | ||||
pass | pass | ||||
class DummyMetadataFetcher: | class DummyMetadataFetcher: | ||||
SUPPORTED_LISTERS = {"fake-forge"} | SUPPORTED_LISTERS = {"fake-forge"} | ||||
FETCHER_NAME = "fake-forge" | FETCHER_NAME = "fake-forge" | ||||
def __init__(self, origin, credentials, lister_name, lister_instance_name): | def __init__(self, origin, credentials, lister_name, lister_instance_name): | ||||
▲ Show 20 Lines • Show All 321 Lines • ▼ Show 20 Lines | def test_dvcs_loader_exc_partial_visit(swh_storage, caplog): | ||||
_check_load_failure( | _check_load_failure( | ||||
caplog, | caplog, | ||||
loader, | loader, | ||||
RuntimeError, | RuntimeError, | ||||
"Failed to get contents!", | "Failed to get contents!", | ||||
) | ) | ||||
class DummyDVCSLoaderWithSnapshots(DummyDVCSLoader): | |||||
"""Dummy DVCS loader which simulates one visit with multiple snapshots creation | |||||
during ingestion.""" | |||||
call = 0 | |||||
def get_snapshot(self): | |||||
"""Simulate a different built snapshot depending on the loader state.""" | |||||
if self.call == 0: | |||||
return Snapshot(branches={}) | |||||
else: | |||||
# Another dummy snapshot | |||||
return Snapshot( | |||||
branches={ | |||||
b"alias": SnapshotBranch( | |||||
target=hash_to_bytes(b"0" * 20), | |||||
target_type=TargetType.DIRECTORY, | |||||
) | |||||
} | |||||
) | |||||
def fetch_data(self) -> bool: | |||||
# Simulate we fetched data but we need some more to fetch. | |||||
# The first time, it will be True, after that, False | |||||
return self.call == 0 | |||||
def store_data(self, create_partial_snapshot: bool = False): | |||||
"""Store data and simulate we need more data to fetch. This will be True the | |||||
first call time, False after that. | |||||
""" | |||||
# Let's store data as is | |||||
super().store_data(create_partial_snapshot) | |||||
# and then return some more data to fetch for the first call. That will end up | |||||
# calling the initial store_data again (and create a 2nd snapshot). Later call | |||||
# just returns False beyond the 2nd call | |||||
self.call += 1 | |||||
def test_dvcs_loader_ingestion_with_partial_snapshots(swh_storage): | |||||
loader = DummyDVCSLoaderWithSnapshots( | |||||
swh_storage, "dummy-url", create_partial_snapshot=True | |||||
) | |||||
result = loader.load() | |||||
# loading failed | |||||
assert result == {"status": "eventful"} | |||||
expected_stats = { | |||||
"origin": 1, # only 1 origin | |||||
"origin_visit": 1, # with 1 visit | |||||
"snapshot": 1 + 1, # but 2 snapshots | |||||
} | |||||
actual_stats = get_stats(swh_storage) | |||||
for key in expected_stats.keys(): | |||||
assert actual_stats[key] == expected_stats[key] | |||||
class BrokenStorageProxy: | class BrokenStorageProxy: | ||||
def __init__(self, storage): | def __init__(self, storage): | ||||
self.storage = storage | self.storage = storage | ||||
def __getattr__(self, attr): | def __getattr__(self, attr): | ||||
return getattr(self.storage, attr) | return getattr(self.storage, attr) | ||||
def snapshot_add(self, snapshots): | def snapshot_add(self, snapshots): | ||||
▲ Show 20 Lines • Show All 377 Lines • Show Last 20 Lines |