D1825.id6152.diff

diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -149,9 +149,6 @@
@click.option('--max-messages', '-m', default=None, type=int,
help='Maximum number of objects to replay. Default is to '
'run forever.')
-@click.option('--concurrency', type=int,
- default=8,
- help='Concurrentcy level.')
@click.option('--broker', 'brokers', type=str, multiple=True,
help='Kafka broker to connect to.'
'(deprecated, use the config file instead)')
@@ -164,7 +161,7 @@
@click.option('--exclude-sha1-file', default=None, type=click.File('rb'),
help='File containing a sorted array of hashes to be excluded.')
@click.pass_context
-def content_replay(ctx, max_messages, concurrency,
+def content_replay(ctx, max_messages,
brokers, prefix, group_id, exclude_sha1_file):
"""Fill a destination Object Storage (typically a mirror) by reading a Journal
and retrieving objects from an existing source ObjStorage.
@@ -214,7 +211,6 @@
worker_fn = functools.partial(process_replay_objects_content,
src=objstorage_src,
dst=objstorage_dst,
- concurrency=concurrency,
exclude_fn=exclude_fn)
try:
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -5,7 +5,6 @@
from time import time
import logging
-from concurrent.futures import ThreadPoolExecutor
from swh.core.statsd import statsd
from swh.model.identifiers import normalize_timestamp
@@ -259,11 +258,12 @@
len(obj))
except Exception:
obj = ''
- logger.exception('Failed to copy %s', hash_to_hex(obj_id))
+ logger.error('Failed to copy %s', hash_to_hex(obj_id))
+ raise
return len(obj)
-def process_replay_objects_content(all_objects, *, src, dst, concurrency=8,
+def process_replay_objects_content(all_objects, *, src, dst,
exclude_fn=None):
"""
Takes a list of records from Kafka (see
@@ -312,26 +312,26 @@
vol = []
nb_skipped = 0
t0 = time()
- with ThreadPoolExecutor(max_workers=concurrency) as executor:
- for (object_type, objects) in all_objects.items():
- if object_type != 'content':
- logger.warning(
- 'Received a series of %s, this should not happen',
- object_type)
- continue
- for obj in objects:
- obj_id = obj[ID_HASH_ALGO]
- if obj['status'] != 'visible':
- nb_skipped += 1
- logger.debug('skipped %s (status=%s)',
- hash_to_hex(obj_id), obj['status'])
- elif exclude_fn and exclude_fn(obj):
- nb_skipped += 1
- logger.debug('skipped %s (manually excluded)',
- hash_to_hex(obj_id))
- else:
- fut = executor.submit(copy_object, obj_id, src, dst)
- fut.add_done_callback(lambda fn: vol.append(fn.result()))
+
+ for (object_type, objects) in all_objects.items():
+ if object_type != 'content':
+ logger.warning(
+ 'Received a series of %s, this should not happen',
+ object_type)
+ continue
+ for obj in objects:
+ obj_id = obj[ID_HASH_ALGO]
+ if obj['status'] != 'visible':
+ nb_skipped += 1
+ logger.debug('skipped %s (status=%s)',
+ hash_to_hex(obj_id), obj['status'])
+ elif exclude_fn and exclude_fn(obj):
+ nb_skipped += 1
+ logger.debug('skipped %s (manually excluded)',
+ hash_to_hex(obj_id))
+ else:
+ vol.append(copy_object(obj_id, src, dst))
+
dt = time() - t0
logger.info(
'processed %s content objects in %.1fsec '
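
Below is a minimal usage sketch (not part of the diff) of how the now-sequential process_replay_objects_content gets wired up, mirroring the functools.partial call in swh/journal/cli.py above. The InMemoryObjStorage class is a hypothetical stand-in for a real swh.objstorage backend; only the keyword arguments visible in the diff are taken as given, and the get()/add() shape it exposes is an assumption about what copy_object() needs.

# A minimal sketch, assuming swh.journal is installed. InMemoryObjStorage is a
# hypothetical dict-backed object storage used only for illustration.
import functools

from swh.journal.replay import process_replay_objects_content


class InMemoryObjStorage:
    """Hypothetical stand-in exposing the get()/add() calls copy_object()
    is assumed to make; not a real swh.objstorage backend."""

    def __init__(self):
        self._data = {}

    def get(self, obj_id):
        return self._data[obj_id]

    def add(self, content, obj_id=None, check_presence=True):
        self._data[obj_id] = content


objstorage_src = InMemoryObjStorage()
objstorage_dst = InMemoryObjStorage()

# Same shape as the CLI after this diff: no concurrency argument; each object
# is copied sequentially in the calling thread, and a failed copy now raises
# instead of only being logged.
worker_fn = functools.partial(process_replay_objects_content,
                              src=objstorage_src,
                              dst=objstorage_dst,
                              exclude_fn=None)

# `all_objects` is a mapping {object_type: [records]} as handed over by the
# journal client; a toy 'content' record (hypothetical sha1) would look like:
# worker_fn({'content': [{'sha1': b'\x00' * 20, 'status': 'visible'}]})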
