diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py
--- a/swh/dataset/journalprocessor.py
+++ b/swh/dataset/journalprocessor.py
@@ -22,6 +22,8 @@
 from swh.journal.serializers import kafka_to_value
 from swh.storage.fixer import fix_objects
 
+logger = logging.getLogger(__name__)
+
 
 class JournalClientOffsetRanges(JournalClient):
     """
@@ -62,7 +64,7 @@
     def subscribe(self):
         self.topic_name = self.subscription[0]
         time.sleep(0.1)  # https://github.com/edenhill/librdkafka/issues/1983
-        logging.debug("Changing assignment to %s", str(self.assignment))
+        logger.debug("Changing assignment to %s", str(self.assignment))
         self.consumer.assign(
             [TopicPartition(self.topic_name, pid) for pid in self.assignment]
         )
@@ -198,13 +200,18 @@
         # topic/partition available for consumption + 1
         def fetch_insert_partition_id(partition_id):
+            logger.debug("Fetching offset for partition %s", partition_id)
             tp = TopicPartition(topic_name, partition_id)
             (lo, hi) = client.consumer.get_watermark_offsets(tp)
+            logger.debug("[%s] (lo, hi)=%s", partition_id, (lo, hi))
             if hi > lo:
                 # hi == lo means there is nothing in the partition to consume.
                 # If the partition is not empty, retrieve the committed offset,
                 # if any, to use it as the lo offset.
                 committed = client.consumer.committed([tp])[0]
+                logger.debug(
+                    "[%s] committed offset: %s", partition_id, committed.offset
+                )
                 lo = max(lo, committed.offset)
                 if hi > lo:
                     # only process the partition if there are actually new
@@ -212,6 +219,7 @@
                     # offset is behind the high watermark).
                     self.offsets[partition_id] = (lo, hi)
 
+        logger.debug("Fetching partition offsets using %s processes", self.processes)
         with concurrent.futures.ThreadPoolExecutor(
             max_workers=self.processes
         ) as executor:
@@ -415,7 +423,7 @@
             try:
                 exporter.process_object(object_type, obj)
             except Exception:
-                logging.exception(
+                logger.exception(
                     "Exporter %s: error while exporting the object: %s",
                     exporter.__class__.__name__,
                     str(obj),
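
For context on the offset arithmetic the new debug lines trace, here is a minimal standalone sketch of the same watermark/committed-offset logic, written against confluent-kafka (the client library the TopicPartition calls above come from). It assumes a locally reachable broker; the broker address, group id, and topic name are hypothetical placeholders, and fetch_partition_range is an illustrative helper, not part of swh.dataset.

    import logging

    from confluent_kafka import Consumer, TopicPartition

    logger = logging.getLogger(__name__)


    def fetch_partition_range(consumer, topic, partition_id):
        """Return the (lo, hi) offsets left to consume, or None if none."""
        tp = TopicPartition(topic, partition_id)
        # The watermarks bound what the broker currently holds for the
        # partition: lo is the first available offset, hi is one past the last.
        (lo, hi) = consumer.get_watermark_offsets(tp)
        logger.debug("[%s] (lo, hi)=%s", partition_id, (lo, hi))
        if hi <= lo:
            return None  # empty partition: nothing to consume
        # A committed offset, if any, lets the group resume instead of
        # re-reading. committed() yields a negative sentinel (OFFSET_INVALID)
        # when the group never committed here, which max() discards.
        committed = consumer.committed([tp])[0]
        logger.debug("[%s] committed offset: %s", partition_id, committed.offset)
        lo = max(lo, committed.offset)
        if hi <= lo:
            return None  # already consumed up to the high watermark
        return (lo, hi)


    if __name__ == "__main__":
        logging.basicConfig(level=logging.DEBUG)
        consumer = Consumer(
            {
                "bootstrap.servers": "localhost:9092",  # hypothetical broker
                "group.id": "offset-range-demo",  # hypothetical group id
            }
        )
        # "example-topic" is a hypothetical topic name.
        print(fetch_partition_range(consumer, "example-topic", 0))

The lazy %-style arguments in the logger calls match the two pre-existing calls in the diff and defer string formatting until the DEBUG level is actually enabled.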