diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py
--- a/swh/dataset/journalprocessor.py
+++ b/swh/dataset/journalprocessor.py
@@ -22,6 +22,8 @@
 from swh.journal.serializers import kafka_to_value
 from swh.storage.fixer import fix_objects
 
+logger = logging.getLogger(__name__)
+
 
 class JournalClientOffsetRanges(JournalClient):
     """
@@ -62,7 +64,7 @@
     def subscribe(self):
         self.topic_name = self.subscription[0]
         time.sleep(0.1)  # https://github.com/edenhill/librdkafka/issues/1983
-        logging.debug("Changing assignment to %s", str(self.assignment))
+        logger.debug("Changing assignment to %s", str(self.assignment))
         self.consumer.assign(
             [TopicPartition(self.topic_name, pid) for pid in self.assignment]
         )
@@ -198,13 +200,18 @@
         # topic/partition available for consumption + 1
 
         def fetch_insert_partition_id(partition_id):
+            logger.debug("Fetching offset for partition %s", partition_id)
             tp = TopicPartition(topic_name, partition_id)
             (lo, hi) = client.consumer.get_watermark_offsets(tp)
+            logger.debug("[%s] (lo,hi)=(%s, %s)", partition_id, lo, hi)
             if hi > lo:
                 # hi == low means there is nothing in the partition to consume.
                 # If the partition is not empty, retrieve the committed offset,
                 # if any, to use it at lo offset.
                 committed = client.consumer.committed([tp])[0]
+                logger.debug(
+                    "[%s] committed offset: %s", partition_id, committed.offset
+                )
                 lo = max(lo, committed.offset)
             if hi > lo:
                 # do only process the partition is there are actually new
@@ -212,6 +219,9 @@
                 # offset is behind the high watermark).
                 self.offsets[partition_id] = (lo, hi)
 
+        logger.debug(
+            "Fetching partition offsets using %s processes", self.processes
+        )
         with concurrent.futures.ThreadPoolExecutor(
             max_workers=self.processes
         ) as executor:
@@ -415,7 +425,7 @@
             try:
                 exporter.process_object(object_type, obj)
             except Exception:
-                logging.exception(
+                logger.exception(
                     "Exporter %s: error while exporting the object: %s",
                     exporter.__class__.__name__,
                     str(obj),
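
For reference, a minimal standalone sketch of the offset-range logic these
debug lines instrument, assuming an already-configured confluent_kafka
Consumer; the function name partition_offset_range is illustrative and not
part of this patch:

import logging

from confluent_kafka import Consumer, TopicPartition

logger = logging.getLogger(__name__)


def partition_offset_range(consumer: Consumer, topic_name: str, partition_id: int):
    """Return the (lo, hi) offset range left to consume, or None if empty."""
    logger.debug("Fetching offset for partition %s", partition_id)
    tp = TopicPartition(topic_name, partition_id)
    # Low/high watermarks: hi == lo means the partition has nothing to consume.
    (lo, hi) = consumer.get_watermark_offsets(tp)
    logger.debug("[%s] (lo,hi)=(%s, %s)", partition_id, lo, hi)
    if hi > lo:
        # The partition is not empty: resume from the committed offset when one
        # exists. confluent_kafka reports a negative offset (OFFSET_INVALID)
        # when nothing was committed, so max() falls back to the low watermark.
        committed = consumer.committed([tp])[0]
        logger.debug("[%s] committed offset: %s", partition_id, committed.offset)
        lo = max(lo, committed.offset)
    # Only report the partition if there are actually new messages to process
    # (i.e. the committed offset is still behind the high watermark).
    if hi > lo:
        return (lo, hi)
    return None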