
Jenkins > .tox.py3.lib.python3.7.site-packages.swh.storage.tests.test_backfill::test_backfiller
Failed

TEST RESULT

Run At
Sep 15 2021, 3:20 PM
Details
swh_storage_backend_config = {'check_config': {'check_write': True}, 'cls': 'postgresql', 'db': "dbname=storage user=postgres host=127.0.0.1 port=2...riter': {'brokers': ['127.0.0.1:56057'], 'client_id': 'kafka_writer-1', 'cls': 'kafka', 'prefix': 'vgaqnifelh-1'}, ...}
kafka_prefix = 'vgaqnifelh', kafka_consumer_group = 'test-consumer-vgaqnifelh'
kafka_server = '127.0.0.1:56057'
caplog = <_pytest.logging.LogCaptureFixture object at 0x7f02453ea198>

    @patch("swh.storage.backfill.RANGE_GENERATORS", RANGE_GENERATORS)
    def test_backfiller(
        swh_storage_backend_config,
        kafka_prefix: str,
        kafka_consumer_group: str,
        kafka_server: str,
        caplog,
    ):
        prefix1 = f"{kafka_prefix}-1"
        prefix2 = f"{kafka_prefix}-2"
        journal1 = {
            "cls": "kafka",
            "brokers": [kafka_server],
            "client_id": "kafka_writer-1",
            "prefix": prefix1,
        }
        swh_storage_backend_config["journal_writer"] = journal1
        storage = get_storage(**swh_storage_backend_config)

        # fill the storage and the journal (under prefix1)
        for object_type, objects in TEST_OBJECTS.items():
            method = getattr(storage, object_type + "_add")
            method(objects)

        # now apply the backfiller on the storage to fill the journal under prefix2
        backfiller_config = {
            "journal_writer": {
                "brokers": [kafka_server],
                "client_id": "kafka_writer-2",
                "prefix": prefix2,
            },
            "storage": swh_storage_backend_config,
        }

        # Backfilling
        backfiller = JournalBackfiller(backfiller_config)
        for object_type in TEST_OBJECTS:
            backfiller.run(object_type, None, None)

        # Trace log messages for unhandled object types in the replayer
        caplog.set_level(logging.DEBUG, "swh.storage.replay")

        # now check journal content are the same under both topics
        # use the replayer scaffolding to fill storages to make is a bit easier
        # Replaying #1
        sto1 = get_storage(cls="memory")
        replayer1 = JournalClient(
            brokers=kafka_server,
            group_id=f"{kafka_consumer_group}-1",
            prefix=prefix1,
            stop_on_eof=True,
        )
        worker_fn1 = functools.partial(process_replay_objects, storage=sto1)
>       replayer1.process(worker_fn1)

.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_backfill.py:277:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:265: in process
    batch_processed, at_eof = self.handle_messages(messages, worker_fn)
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:292: in handle_messages
    worker_fn(dict(objects))
.tox/py3/lib/python3.7/site-packages/swh/storage/replay.py:62: in process_replay_objects
    _insert_objects(object_type, objects, storage)
.tox/py3/lib/python3.7/site-packages/swh/storage/replay.py:133: in _insert_objects
    collision_aware_content_add(storage.content_add_metadata, contents)
.tox/py3/lib/python3.7/site-packages/swh/storage/replay.py:86: in collision_aware_content_add
    content_add_fn(contents)
.tox/py3/lib/python3.7/site-packages/swh/storage/metrics.py:24: in d
    return f(*a, **kw)
.tox/py3/lib/python3.7/site-packages/swh/storage/metrics.py:77: in d
    r = f(*a, **kw)
.tox/py3/lib/python3.7/site-packages/swh/storage/cassandra/storage.py:278: in content_add_metadata
    return self._content_add(content, with_data=False)
.tox/py3/lib/python3.7/site-packages/swh/storage/cassandra/storage.py:221: in _content_add
    for row in rows:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <swh.storage.in_memory.InMemoryStorage object at 0x7f0244634d30>
algo = 'sha1_git'
hashes = [b'\x05\xa5\x14s\x8e\xfd\xab\x16\x17\x1a92)\xa1\x1fWJG&m']

    def _content_get_from_hashes(self, algo, hashes: List[bytes]) -> Iterable:
        """From the name of a hash algorithm and a value of that hash, looks up
        the "hash -> token" secondary table (content_by_{algo}) to get tokens.
        Then, looks up the main table (content) to get all contents with that
        token, and filters out contents whose hash doesn't match."""
        found_tokens = list(
            self._cql_runner.content_get_tokens_from_single_algo(algo, hashes)
        )
        assert all(isinstance(token, int) for token in found_tokens)

        # Query the main table ('content').
>       rows = self._cql_runner.content_get_from_tokens(found_tokens)
E       AttributeError: 'InMemoryCqlRunner' object has no attribute 'content_get_from_tokens'

.tox/py3/lib/python3.7/site-packages/swh/storage/cassandra/storage.py:172: AttributeError
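
The last frame shows where the replay breaks: the test replays into an in-memory storage (get_storage(cls="memory")), and that InMemoryStorage reuses the Cassandra storage code path (swh/storage/cassandra/storage.py), which calls self._cql_runner.content_get_from_tokens(...), a method the InMemoryCqlRunner does not provide. The sketch below is only an illustration of the kind of batch lookup that would satisfy this call; the class and per-token helper names are hypothetical, and the real InMemoryCqlRunner internals are not shown in this log.

    # Sketch only: the real InMemoryCqlRunner is part of swh.storage and its
    # storage layout may differ; the internal dict and the single-token helper
    # used here are assumptions made for illustration.
    from typing import Dict, Iterable, List


    class InMemoryCqlRunnerSketch:
        def __init__(self) -> None:
            # assumed internal layout: token -> list of content rows
            self._contents_by_token: Dict[int, List[dict]] = {}

        def content_get_from_token(self, token: int) -> Iterable[dict]:
            # assumed existing single-token accessor
            return self._contents_by_token.get(token, [])

        def content_get_from_tokens(self, tokens: List[int]) -> Iterable[dict]:
            # the attribute reported missing above: a batch variant that simply
            # chains the single-token lookups
            for token in tokens:
                yield from self.content_get_from_token(token)

Whether the actual fix adds such a batch method to InMemoryCqlRunner or instead changes the caller in cassandra/storage.py cannot be determined from this traceback alone.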