Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/replay.py
Show All 9 Lines | |||||
from swh.storage import HashCollision | from swh.storage import HashCollision | ||||
from swh.model.hashutil import hash_to_hex | from swh.model.hashutil import hash_to_hex | ||||
from swh.objstorage.objstorage import ID_HASH_ALGO | from swh.objstorage.objstorage import ID_HASH_ALGO | ||||
from swh.core.statsd import statsd | from swh.core.statsd import statsd | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
# Size in bytes of a SHA1 hash; used as the default `hash_size` of
# is_hash_in_bytearray().
SHA1_SIZE = 20 | |||||
def process_replay_objects(all_objects, *, storage):
    """Insert every group of replayed objects into ``storage``.

    Args:
        all_objects: mapping whose ``items()`` yields
            ``(object_type, objects)`` pairs.
        storage: destination handed unchanged to ``_insert_objects`` for
            each pair (keyword-only).
    """
    for group in all_objects.items():
        kind, batch = group
        _insert_objects(kind, batch, storage)
def _insert_objects(object_type, objects, storage): | def _insert_objects(object_type, objects, storage): | ||||
if object_type == 'content': | if object_type == 'content': | ||||
# TODO: insert 'content' in batches | # TODO: insert 'content' in batches | ||||
Show All 24 Lines | elif object_type == 'origin_visit': | ||||
visit['type'] = visit['origin']['type'] | visit['type'] = visit['origin']['type'] | ||||
storage.origin_visit_upsert(objects) | storage.origin_visit_upsert(objects) | ||||
else: | else: | ||||
logger.warning('Received a series of %s, this should not happen', | logger.warning('Received a series of %s, this should not happen', | ||||
object_type) | object_type) | ||||
def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE):
    """
    Checks if the given hash is in the provided `array`. The array must be
    a *sorted* list of fixed-size hashes, and contain `nb_hashes` hashes
    (so its size must be `nb_hashes*hash_size` bytes).

    Args:
        hash_ (bytes): the hash to look for
        array (bytes): a sorted concatenated array of hashes (may be of
            any type supporting slice indexing, eg. :py:class:`mmap.mmap`)
        nb_hashes (int): number of hashes in the array
        hash_size (int): size of a hash (defaults to 20, for SHA1)

    Returns:
        bool: True if `hash_` is one of the first `nb_hashes` hashes of
        `array`, False otherwise (always False when `nb_hashes` is zero
        or negative).

    Raises:
        ValueError: if the length of `hash_` is not `hash_size`.

    Example:

    >>> import os
    >>> hash1 = os.urandom(20)
    >>> hash2 = os.urandom(20)
    >>> hash3 = os.urandom(20)
    >>> array = b''.join(sorted([hash1, hash2]))
    >>> is_hash_in_bytearray(hash1, array, 2)
    True
    >>> is_hash_in_bytearray(hash2, array, 2)
    True
    >>> is_hash_in_bytearray(hash3, array, 2)
    False
    >>> is_hash_in_bytearray(hash1, array, 0)
    False
    """
    if len(hash_) != hash_size:
        raise ValueError('hash_ does not match the provided hash_size.')

    # An empty search range can never contain the hash. Without this guard
    # the final comparison below would still inspect slot 0 of `array`,
    # which may hold data even though the caller declared no hashes
    # (e.g. a preallocated buffer).
    if nb_hashes <= 0:
        return False

    def get_hash(position):
        return array[position*hash_size:(position+1)*hash_size]

    # Regular dichotomy over the half-open index range [left, right).
    left = 0
    right = nb_hashes
    while left < right - 1:
        # Integer floor division: going through a float (`int(x / 2)`)
        # loses precision for very large indices.
        middle = (left + right) // 2
        pivot = get_hash(middle)
        if pivot == hash_:
            return True
        elif pivot < hash_:
            left = middle
        else:
            right = middle

    # Exactly one candidate slot remains.
    return get_hash(left) == hash_
def copy_object(obj_id, src, dst): | def copy_object(obj_id, src, dst): | ||||
statsd_name = 'swh_journal_content_replayer_%s_duration_seconds' | statsd_name = 'swh_journal_content_replayer_%s_duration_seconds' | ||||
try: | try: | ||||
with statsd.timed(statsd_name % 'get'): | with statsd.timed(statsd_name % 'get'): | ||||
obj = src.get(obj_id) | obj = src.get(obj_id) | ||||
with statsd.timed(statsd_name % 'put'): | with statsd.timed(statsd_name % 'put'): | ||||
dst.add(obj, obj_id=obj_id, check_presence=False) | dst.add(obj, obj_id=obj_id, check_presence=False) | ||||
logger.debug('copied %s', hash_to_hex(obj_id)) | logger.debug('copied %s', hash_to_hex(obj_id)) | ||||
Show All 34 Lines |