Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/core/tests/test_loader.py
Show All 34 Lines | def prepare_origin_visit(self, *args, **kwargs): | ||||
self.storage.origin_add([ORIGIN]) | self.storage.origin_add([ORIGIN]) | ||||
visit = OriginVisit( | visit = OriginVisit( | ||||
origin=self.origin_url, date=self.visit_date, type=self.visit_type, | origin=self.origin_url, date=self.visit_date, type=self.visit_type, | ||||
) | ) | ||||
self.visit = self.storage.origin_visit_add([visit])[0] | self.visit = self.storage.origin_visit_add([visit])[0] | ||||
class DummyDVCSLoader(DummyLoader, DVCSLoader): | class DummyDVCSLoader(DummyLoader, DVCSLoader): | ||||
"""Unbuffered loader will send directly to storage new data | |||||
""" | |||||
def parse_config_file(self, *args, **kwargs): | |||||
return { | |||||
"max_content_size": 100 * 1024 * 1024, | |||||
"storage": { | |||||
"cls": "pipeline", | |||||
"steps": [{"cls": "retry",}, {"cls": "filter",}, {"cls": "memory",},], | |||||
}, | |||||
} | |||||
def get_contents(self): | def get_contents(self): | ||||
return [] | return [] | ||||
def get_directories(self): | def get_directories(self): | ||||
return [] | return [] | ||||
def get_revisions(self): | def get_revisions(self): | ||||
return [] | return [] | ||||
def get_releases(self): | def get_releases(self): | ||||
return [] | return [] | ||||
def get_snapshot(self): | def get_snapshot(self): | ||||
return Snapshot(branches={}) | return Snapshot(branches={}) | ||||
def eventful(self): | def eventful(self): | ||||
return False | return False | ||||
class DummyBaseLoader(DummyLoader, BaseLoader): | class DummyBaseLoader(DummyLoader, BaseLoader): | ||||
"""Buffered loader will send new data when threshold is reached | |||||
""" | |||||
def parse_config_file(self, *args, **kwargs): | |||||
return { | |||||
"max_content_size": 100 * 1024 * 1024, | |||||
"storage": { | |||||
"cls": "pipeline", | |||||
"steps": [ | |||||
{"cls": "retry",}, | |||||
{"cls": "filter",}, | |||||
{ | |||||
"cls": "buffer", | |||||
"min_batch_size": { | |||||
"content": 2, | |||||
"content_bytes": 8, | |||||
"directory": 2, | |||||
"revision": 2, | |||||
"release": 2, | |||||
}, | |||||
}, | |||||
{"cls": "memory",}, | |||||
], | |||||
}, | |||||
} | |||||
def store_data(self): | def store_data(self): | ||||
pass | pass | ||||
def test_base_loader(): | def test_base_loader(swh_config): | ||||
loader = DummyBaseLoader() | loader = DummyBaseLoader() | ||||
result = loader.load() | result = loader.load() | ||||
assert result == {"status": "eventful"} | assert result == {"status": "eventful"} | ||||
def test_dvcs_loader(): | def test_dvcs_loader(swh_config): | ||||
loader = DummyDVCSLoader() | loader = DummyDVCSLoader() | ||||
result = loader.load() | result = loader.load() | ||||
assert result == {"status": "eventful"} | assert result == {"status": "eventful"} | ||||
def test_loader_logger_default_name(): | def test_loader_logger_default_name(swh_config): | ||||
loader = DummyBaseLoader() | loader = DummyBaseLoader() | ||||
assert isinstance(loader.log, logging.Logger) | assert isinstance(loader.log, logging.Logger) | ||||
assert loader.log.name == "swh.loader.core.tests.test_loader.DummyBaseLoader" | assert loader.log.name == "swh.loader.core.tests.test_loader.DummyBaseLoader" | ||||
loader = DummyDVCSLoader() | loader = DummyDVCSLoader() | ||||
assert isinstance(loader.log, logging.Logger) | assert isinstance(loader.log, logging.Logger) | ||||
assert loader.log.name == "swh.loader.core.tests.test_loader.DummyDVCSLoader" | assert loader.log.name == "swh.loader.core.tests.test_loader.DummyDVCSLoader" | ||||
def test_loader_logger_with_name(): | def test_loader_logger_with_name(swh_config): | ||||
loader = DummyBaseLoader("some.logger.name") | loader = DummyBaseLoader("some.logger.name") | ||||
assert isinstance(loader.log, logging.Logger) | assert isinstance(loader.log, logging.Logger) | ||||
assert loader.log.name == "some.logger.name" | assert loader.log.name == "some.logger.name" | ||||
def test_loader_save_data_path(tmp_path): | def test_loader_save_data_path(tmp_path, swh_config): | ||||
loader = DummyBaseLoader("some.logger.name.1") | loader = DummyBaseLoader("some.logger.name.1") | ||||
url = "http://bitbucket.org/something" | url = "http://bitbucket.org/something" | ||||
loader.origin = Origin(url=url) | loader.origin = Origin(url=url) | ||||
loader.visit_date = datetime.datetime(year=2019, month=10, day=1) | loader.visit_date = datetime.datetime(year=2019, month=10, day=1) | ||||
loader.config = { | loader.config = { | ||||
"save_data_path": tmp_path, | "save_data_path": tmp_path, | ||||
} | } | ||||
Show All 29 Lines | |||||
class DummyDVCSLoaderExc(DummyDVCSLoader): | class DummyDVCSLoaderExc(DummyDVCSLoader): | ||||
"""A loader which raises an exception when loading some contents""" | """A loader which raises an exception when loading some contents""" | ||||
def get_contents(self): | def get_contents(self): | ||||
raise RuntimeError("Failed to get contents!") | raise RuntimeError("Failed to get contents!") | ||||
def test_dvcs_loader_exc_partial_visit(caplog): | def test_dvcs_loader_exc_partial_visit(swh_config, caplog): | ||||
logger_name = "dvcsloaderexc" | logger_name = "dvcsloaderexc" | ||||
caplog.set_level(logging.ERROR, logger=logger_name) | caplog.set_level(logging.ERROR, logger=logger_name) | ||||
loader = DummyDVCSLoaderExc(logging_class=logger_name) | loader = DummyDVCSLoaderExc(logging_class=logger_name) | ||||
result = loader.load() | result = loader.load() | ||||
assert result == {"status": "failed"} | assert result == {"status": "failed"} | ||||
Show All 14 Lines | |||||
class DummyDVCSLoaderStorageExc(DummyDVCSLoader): | class DummyDVCSLoaderStorageExc(DummyDVCSLoader): | ||||
"""A loader which raises an exception when loading some contents""" | """A loader which raises an exception when loading some contents""" | ||||
def __init__(self, *args, **kwargs): | def __init__(self, *args, **kwargs): | ||||
super().__init__(*args, **kwargs) | super().__init__(*args, **kwargs) | ||||
self.storage = BrokenStorageProxy(self.storage) | self.storage = BrokenStorageProxy(self.storage) | ||||
def test_dvcs_loader_storage_exc_partial_visit(caplog): | def test_dvcs_loader_storage_exc_partial_visit(swh_config, caplog): | ||||
logger_name = "dvcsloaderexc" | logger_name = "dvcsloaderexc" | ||||
caplog.set_level(logging.ERROR, logger=logger_name) | caplog.set_level(logging.ERROR, logger=logger_name) | ||||
loader = DummyDVCSLoaderStorageExc(logging_class=logger_name) | loader = DummyDVCSLoaderStorageExc(logging_class=logger_name) | ||||
result = loader.load() | result = loader.load() | ||||
assert result == {"status": "failed"} | assert result == {"status": "failed"} | ||||
_check_load_failure(caplog, loader, RuntimeError, "Failed to add snapshot!") | _check_load_failure(caplog, loader, RuntimeError, "Failed to add snapshot!") |