diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py
--- a/swh/dataset/journalprocessor.py
+++ b/swh/dataset/journalprocessor.py
@@ -7,7 +7,6 @@
 import concurrent.futures
 from concurrent.futures import FIRST_EXCEPTION, ProcessPoolExecutor
 import contextlib
-from hashlib import sha1
 import logging
 import multiprocessing
 from pathlib import Path
@@ -21,8 +20,6 @@
 from swh.dataset.utils import LevelDBSet
 from swh.journal.client import JournalClient
 from swh.journal.serializers import kafka_to_value
-from swh.model.hashutil import hash_to_hex
-from swh.model.model import Origin
 from swh.storage.fixer import fix_objects
 
@@ -392,13 +389,16 @@
         fixed_objects_by_partition = collections.defaultdict(list)
         for message in message_list:
             fixed_objects_by_partition[message.partition()].extend(
-                fix_objects(object_type, [kafka_to_value(message.value())])
+                zip(
+                    [message.key()],
+                    fix_objects(object_type, [kafka_to_value(message.value())]),
+                )
             )
         for partition, objects in fixed_objects_by_partition.items():
-            for obj in objects:
-                self.process_message(object_type, partition, obj)
+            for (key, obj) in objects:
+                self.process_message(object_type, partition, key, obj)
 
-    def process_message(self, object_type, partition, obj):
+    def process_message(self, object_type, partition, obj_key, obj):
         """
         Process a single incoming Kafka message if the object it refers to
         has not been processed yet.
@@ -406,27 +406,8 @@
         It uses an on-disk set to make sure that each object is only ever
         processed once.
         """
-        if object_type == "origin_visit":
-            origin_id = hash_to_hex(Origin(url=obj["origin"]).id)
-            visit = obj["visit"]
-            node_id = sha1(f"{origin_id}:{visit}".encode()).digest()
-        elif object_type == "origin_visit_status":
-            if obj["status"] not in ("partial", "full"):
-                # Temporary visit object, not useful for the exports
-                return
-            origin_id = hash_to_hex(Origin(url=obj["origin"]).id)
-            visit = obj["visit"]
-            ts = obj["date"].timestamp()
-            node_id = sha1(f"{origin_id}:{visit}:{ts}".encode()).digest()
-        elif object_type == "origin":
-            node_id = sha1(obj["url"].encode()).digest()
-        elif object_type in ("content", "skipped_content"):
-            node_id = obj["sha1_git"]
-        else:
-            node_id = obj["id"]
-
-        node_set = self.get_node_set_for_object(partition, node_id)
-        if not node_set.add(node_id):
+        node_set = self.get_node_set_for_object(partition, obj_key)
+        if not node_set.add(obj_key):
             # Node already processed, skipping.
             return
 
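
For context, a minimal, self-contained sketch of the deduplication pattern this diff moves to: the Kafka message key itself serves as the uniqueness token, instead of a node_id recomputed per object type (sha1 of origin URL, visit timestamps, etc.). FakeMessage and InMemorySet below are hypothetical stand-ins, not swh.dataset or confluent_kafka APIs.

# Illustrative sketch only (not part of the patch above).
import collections
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Stand-in for a Kafka message exposing partition(), key() and value()."""

    _partition: int
    _key: bytes
    _value: dict

    def partition(self) -> int:
        return self._partition

    def key(self) -> bytes:
        return self._key

    def value(self) -> dict:
        return self._value


class InMemorySet:
    """Stand-in for the on-disk LevelDBSet: add() returns False on duplicates."""

    def __init__(self) -> None:
        self._seen: set = set()

    def add(self, key: bytes) -> bool:
        if key in self._seen:
            return False
        self._seen.add(key)
        return True


def process(messages):
    node_sets = collections.defaultdict(InMemorySet)
    by_partition = collections.defaultdict(list)
    for message in messages:
        # Carry the key alongside the deserialized object, mirroring the
        # zip() introduced in the diff.
        by_partition[message.partition()].append((message.key(), message.value()))
    for partition, objects in by_partition.items():
        for key, obj in objects:
            if not node_sets[partition].add(key):
                continue  # duplicate key: object already processed
            print(f"exporting {obj} from partition {partition}")


if __name__ == "__main__":
    process(
        [
            FakeMessage(0, b"\x01", {"url": "https://example.com"}),
            FakeMessage(0, b"\x01", {"url": "https://example.com"}),  # duplicate
            FakeMessage(1, b"\x02", {"url": "https://example.org"}),
        ]
    )

Relying on the journal key avoids re-hashing and removes the per-object-type special-casing the old code needed, which is why the sha1/hash_to_hex/Origin imports become unused and are dropped.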