Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/replay.py
Show All 25 Lines | |||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseContent, BaseModel, Content, Directory, Origin, OriginVisit, Revision, | BaseContent, BaseModel, Content, Directory, Origin, OriginVisit, Revision, | ||||
SHA1_SIZE, SkippedContent, Snapshot, Release | SHA1_SIZE, SkippedContent, Snapshot, Release | ||||
) | ) | ||||
from swh.objstorage.objstorage import ( | from swh.objstorage.objstorage import ( | ||||
ID_HASH_ALGO, ObjNotFoundError, ObjStorage, | ID_HASH_ALGO, ObjNotFoundError, ObjStorage, | ||||
) | ) | ||||
from swh.storage import HashCollision | from swh.storage.exc import HashCollision | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total" | GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total" | ||||
GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds" | GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds" | ||||
CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total" | CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total" | ||||
CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total" | CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total" | ||||
CONTENT_BYTES_METRIC = "swh_content_replayer_bytes" | CONTENT_BYTES_METRIC = "swh_content_replayer_bytes" | ||||
Show All 37 Lines | def collision_aware_content_add( | ||||
""" | """ | ||||
if not contents: | if not contents: | ||||
return | return | ||||
colliding_content_hashes: List[Dict[str, Any]] = [] | colliding_content_hashes: List[Dict[str, Any]] = [] | ||||
while True: | while True: | ||||
try: | try: | ||||
content_add_fn(contents) | content_add_fn(contents) | ||||
except HashCollision as e: | except HashCollision as e: | ||||
algo, hash_id, colliding_hashes = e.args | |||||
hash_id = hash_to_hex(hash_id) | |||||
colliding_content_hashes.append({ | colliding_content_hashes.append({ | ||||
'algo': algo, | 'algo': e.algo, | ||||
'hash': hash_to_hex(hash_id), | 'hash': e.hash_id, # hex hash id | ||||
'objects': [{k: hash_to_hex(v) for k, v in collision.items()} | 'objects': e.colliding_contents # hex hashes | ||||
for collision in colliding_hashes] | |||||
}) | }) | ||||
colliding_hashes = e.colliding_content_hashes() | |||||
# Drop the colliding contents from the transaction | # Drop the colliding contents from the transaction | ||||
contents = [c for c in contents | contents = [c for c in contents | ||||
if c.hashes() not in colliding_hashes] | if c.hashes() not in colliding_hashes] | ||||
else: | else: | ||||
# Successfully added contents, we are done | # Successfully added contents, we are done | ||||
break | break | ||||
if colliding_content_hashes: | if colliding_content_hashes: | ||||
for collision in colliding_content_hashes: | for collision in colliding_content_hashes: | ||||
▲ Show 20 Lines • Show All 292 Lines • Show Last 20 Lines |