Changeset View
Standalone View
swh/storage/storage.py
Show All 17 Lines | |||||
from swh.model.hashutil import ALGORITHMS | from swh.model.hashutil import ALGORITHMS | ||||
from swh.objstorage import get_objstorage | from swh.objstorage import get_objstorage | ||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
# Max block size of contents to return | # Max block size of contents to return | ||||
BULK_BLOCK_CONTENT_LEN_MAX = 10000 | BULK_BLOCK_CONTENT_LEN_MAX = 10000 | ||||
CONTENT_HASH_KEYS = ['sha1', 'sha1_git', 'sha256', 'blake2s256'] | |||||
class Storage(): | class Storage(): | ||||
"""SWH storage proxy, encompassing DB and object storage | """SWH storage proxy, encompassing DB and object storage | ||||
""" | """ | ||||
def __init__(self, db, objstorage): | def __init__(self, db, objstorage): | ||||
""" | """ | ||||
Args: | Args: | ||||
▲ Show 20 Lines • Show All 49 Lines • ▼ Show 20 Lines | def content_add(self, content): | ||||
checksum | checksum | ||||
- status (str): one of visible, hidden, absent | - status (str): one of visible, hidden, absent | ||||
- reason (str): if status = absent, the reason why | - reason (str): if status = absent, the reason why | ||||
- origin (int): if status = absent, the origin we saw the | - origin (int): if status = absent, the origin we saw the | ||||
content in | content in | ||||
""" | """ | ||||
db = self.db | db = self.db | ||||
def _unique_key(hash, keys=CONTENT_HASH_KEYS): | |||||
"""Given a hash (tuple or dict), return a unique key from the | |||||
aggregation of keys. | |||||
""" | |||||
if isinstance(hash, tuple): | |||||
return hash | |||||
return tuple([hash[k] for k in keys]) | |||||
content_by_status = defaultdict(list) | content_by_status = defaultdict(list) | ||||
for d in content: | for d in content: | ||||
olasd: This won't work if one of the values is `None`, which might happen (the git loader currently… | |||||
Done Inline Actions
Oh, right!
Yes, indeed, adding the case to the test makes it break!
Right. I did not know we could use tuples in sets, awesome, thanks. That's way clearer. And \o/ This also had the fortunate effect of revealing a reverted bug fix in my SQL. (cf. swh_skipped_content_add function...) ardumont: > This won't work if one of the values is None, which might happen (the git loader currently…
if 'status' not in d: | if 'status' not in d: | ||||
d['status'] = 'visible' | d['status'] = 'visible' | ||||
if 'length' not in d: | if 'length' not in d: | ||||
d['length'] = -1 | d['length'] = -1 | ||||
content_by_status[d['status']].append(d) | content_by_status[d['status']].append(d) | ||||
content_with_data = content_by_status['visible'] | content_with_data = content_by_status['visible'] | ||||
content_without_data = content_by_status['absent'] | content_without_data = content_by_status['absent'] | ||||
missing_content = set(self.content_missing(content_with_data)) | missing_content = set(self.content_missing(content_with_data)) | ||||
missing_skipped = set( | missing_skipped = set(_unique_key(hashes) for hashes | ||||
Done Inline ActionsThis whole logic seems buggy. Why would sha1_git be "more important" than other checksums here? olasd: This whole logic seems buggy. Why would sha1_git be "more important" than other checksums here? | |||||
Done Inline ActionsI don't know the initial reason. I assumed it was to be able to use sets. I kept that set logic but used an aggregate key of all hashes (in a specific order). ardumont: I don't know the initial reason. I assumed it was to be able to use sets.
I kept that set… | |||||
sha1_git for sha1, sha1_git, sha256 | in self.skipped_content_missing( | ||||
in self.skipped_content_missing(content_without_data)) | content_without_data)) | ||||
with db.transaction() as cur: | with db.transaction() as cur: | ||||
if missing_content: | if missing_content: | ||||
# create temporary table for metadata injection | # create temporary table for metadata injection | ||||
db.mktemp('content', cur) | db.mktemp('content', cur) | ||||
def add_to_objstorage(cont): | def add_to_objstorage(cont): | ||||
self.objstorage.add(cont['data'], | self.objstorage.add(cont['data'], | ||||
obj_id=cont['sha1']) | obj_id=cont['sha1']) | ||||
content_filtered = (cont for cont in content_with_data | content_filtered = (cont for cont in content_with_data | ||||
if cont['sha1'] in missing_content) | if cont['sha1'] in missing_content) | ||||
db.copy_to(content_filtered, 'tmp_content', | db.copy_to(content_filtered, 'tmp_content', | ||||
['sha1', 'sha1_git', 'sha256', 'length', 'status'], | db.content_get_metadata_keys, | ||||
cur, item_cb=add_to_objstorage) | cur, item_cb=add_to_objstorage) | ||||
Done Inline ActionsThis should get factored out in db. (db.Db.content_keys?) olasd: This should get factored out in db. (`db.Db.content_keys`?) | |||||
Done Inline Actionsdb.Db.content_get_metadata_keys already holds the same list. ardumont: db.Db.content_get_metadata_keys already holds the same list. | |||||
# move metadata in place | # move metadata in place | ||||
db.content_add_from_temp(cur) | db.content_add_from_temp(cur) | ||||
if missing_skipped: | if missing_skipped: | ||||
missing_filtered = (cont for cont in content_without_data | missing_filtered = [cont for cont in content_without_data | ||||
if cont['sha1_git'] in missing_skipped) | if _unique_key(cont) in missing_skipped] | ||||
db.mktemp('skipped_content', cur) | db.mktemp('skipped_content', cur) | ||||
db.copy_to(missing_filtered, 'tmp_skipped_content', | db.copy_to(missing_filtered, 'tmp_skipped_content', | ||||
['sha1', 'sha1_git', 'sha256', 'length', | db.skipped_content_keys, cur) | ||||
'reason', 'status', 'origin'], cur) | |||||
Done Inline ActionsSame here (db.Db.skipped_content_keys?) olasd: Same here (`db.Db.skipped_content_keys`?) | |||||
Done Inline Actionsright. ardumont: right. | |||||
# move metadata in place | # move metadata in place | ||||
db.skipped_content_add_from_temp(cur) | db.skipped_content_add_from_temp(cur) | ||||
@db_transaction | @db_transaction | ||||
def content_update(self, content, keys=[], cur=None): | def content_update(self, content, keys=[], cur=None): | ||||
"""Update content blobs to the storage. Does nothing for unknown | """Update content blobs to the storage. Does nothing for unknown | ||||
contents or skipped ones. | contents or skipped ones. | ||||
▲ Show 20 Lines • Show All 84 Lines • ▼ Show 20 Lines | def content_missing(self, content, key_hash='sha1', cur=None): | ||||
an iterable of `key_hash`es missing from the storage | an iterable of `key_hash`es missing from the storage | ||||
Raises: | Raises: | ||||
TODO: an exception when we get a hash collision. | TODO: an exception when we get a hash collision. | ||||
""" | """ | ||||
db = self.db | db = self.db | ||||
keys = ['sha1', 'sha1_git', 'sha256'] | keys = CONTENT_HASH_KEYS | ||||
if key_hash not in keys: | if key_hash not in CONTENT_HASH_KEYS: | ||||
raise ValueError("key_hash should be one of %s" % keys) | raise ValueError("key_hash should be one of %s" % keys) | ||||
key_hash_idx = keys.index(key_hash) | key_hash_idx = keys.index(key_hash) | ||||
# Create temporary table for metadata injection | # Create temporary table for metadata injection | ||||
db.mktemp('content', cur) | db.mktemp('content', cur) | ||||
db.copy_to(content, 'tmp_content', keys + ['length'], cur) | db.copy_to(content, 'tmp_content', keys + ['length'], cur) | ||||
Show All 27 Lines | def skipped_content_missing(self, content, cur=None): | ||||
Args: | Args: | ||||
content: iterable of dictionaries containing the data for each | content: iterable of dictionaries containing the data for each | ||||
checksum algorithm. | checksum algorithm. | ||||
Returns: | Returns: | ||||
an iterable of signatures missing from the storage | an iterable of signatures missing from the storage | ||||
""" | """ | ||||
keys = ['sha1', 'sha1_git', 'sha256'] | keys = CONTENT_HASH_KEYS | ||||
db = self.db | db = self.db | ||||
db.mktemp('skipped_content', cur) | db.mktemp('skipped_content', cur) | ||||
db.copy_to(content, 'tmp_skipped_content', | db.copy_to(content, 'tmp_skipped_content', | ||||
keys + ['length', 'reason'], cur) | keys + ['length', 'reason'], cur) | ||||
yield from db.skipped_content_missing_from_temp(cur) | yield from db.skipped_content_missing_from_temp(cur) | ||||
Show All 15 Lines | def content_find(self, content, cur=None): | ||||
ValueError in case the key of the dictionary is not sha1, sha1_git | ValueError in case the key of the dictionary is not sha1, sha1_git | ||||
nor sha256. | nor sha256. | ||||
""" | """ | ||||
db = self.db | db = self.db | ||||
if not set(content).intersection(ALGORITHMS): | if not set(content).intersection(ALGORITHMS): | ||||
raise ValueError('content keys must contain at least one of: ' | raise ValueError('content keys must contain at least one of: ' | ||||
'sha1, sha1_git, sha256') | 'sha1, sha1_git, sha256, blake2s256') | ||||
c = db.content_find(sha1=content.get('sha1'), | c = db.content_find(sha1=content.get('sha1'), | ||||
sha1_git=content.get('sha1_git'), | sha1_git=content.get('sha1_git'), | ||||
sha256=content.get('sha256'), | sha256=content.get('sha256'), | ||||
blake2s256=content.get('blake2s256'), | |||||
cur=cur) | cur=cur) | ||||
if c: | if c: | ||||
keys = ['sha1', 'sha1_git', 'sha256', 'length', 'ctime', 'status'] | return dict(zip(db.content_find_cols, c)) | ||||
return dict(zip(keys, c)) | |||||
return None | return None | ||||
@db_transaction_generator | @db_transaction_generator | ||||
def content_find_provenance(self, content, cur=None): | def content_find_provenance(self, content, cur=None): | ||||
"""Find content's provenance information. | """Find content's provenance information. | ||||
Args: | Args: | ||||
content: a dictionary entry representing one content hash. | content: a dictionary entry representing one content hash. | ||||
▲ Show 20 Lines • Show All 1,273 Lines • Show Last 20 Lines |
This won't work if one of the values is None, which might happen (the git loader currently computes all checksums for contents that are too big, but nothing imposes it). That constraint should also be reflected in tests.
It should work if you make the "unique key" a tuple of values (return tuple(values) instead of b''.join(values)).