Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/storage.py
Show First 20 Lines • Show All 122 Lines • ▼ Show 20 Lines | def _content_unique_key(self, hash, db): | ||||
""" | """ | ||||
keys = db.content_hash_keys | keys = db.content_hash_keys | ||||
if isinstance(hash, tuple): | if isinstance(hash, tuple): | ||||
return hash | return hash | ||||
return tuple([hash[k] for k in keys]) | return tuple([hash[k] for k in keys]) | ||||
@staticmethod | @staticmethod | ||||
def _normalize_content(d): | def _content_normalize(d): | ||||
d = d.copy() | d = d.copy() | ||||
if 'status' not in d: | if 'status' not in d: | ||||
d['status'] = 'visible' | d['status'] = 'visible' | ||||
if 'length' not in d: | |||||
d['length'] = -1 | |||||
return d | return d | ||||
@staticmethod | @staticmethod | ||||
def _validate_content(d): | def _content_validate(d): | ||||
"""Sanity checks on status / reason / length, that postgresql | """Sanity checks on status / reason / length, that postgresql | ||||
doesn't enforce.""" | doesn't enforce.""" | ||||
if d['status'] not in ('visible', 'absent', 'hidden'): | if d['status'] not in ('visible', 'hidden'): | ||||
raise ValueError('Invalid content status: {}'.format(d['status'])) | raise ValueError('Invalid content status: {}'.format(d['status'])) | ||||
if d['status'] != 'absent' and d.get('reason') is not None: | if d.get('reason') is not None: | ||||
raise ValueError( | raise ValueError( | ||||
'Must not provide a reason if content is not absent.') | 'Must not provide a reason if content is present.') | ||||
if d['length'] < -1: | |||||
raise ValueError('Content length must be positive or -1.') | |||||
def _filter_new_content(self, content, db=None, cur=None): | if d['length'] is None or d['length'] < 0: | ||||
"""Sort contents into buckets 'with data' and 'without data', | raise ValueError('Content length must be positive.') | ||||
and filter out those already in the database.""" | |||||
content_by_status = defaultdict(list) | |||||
for d in content: | |||||
content_by_status[d['status']].append(d) | |||||
content_with_data = content_by_status['visible'] \ | |||||
+ content_by_status['hidden'] | |||||
content_without_data = content_by_status['absent'] | |||||
missing_content = set(self.content_missing(content_with_data, | def _content_add_metadata(self, db, cur, content): | ||||
db=db, cur=cur)) | |||||
missing_skipped = set(self._content_unique_key(hashes, db) | |||||
for hashes in self.skipped_content_missing( | |||||
content_without_data, db=db, cur=cur)) | |||||
content_with_data = [ | |||||
cont for cont in content_with_data | |||||
if cont['sha1'] in missing_content] | |||||
content_without_data = [ | |||||
cont for cont in content_without_data | |||||
if self._content_unique_key(cont, db) in missing_skipped] | |||||
summary = { | |||||
'content:add': len(missing_content), | |||||
'skipped_content:add': len(missing_skipped), | |||||
} | |||||
return (content_with_data, content_without_data, summary) | |||||
def _content_add_metadata(self, db, cur, | |||||
content_with_data, content_without_data): | |||||
"""Add content to the postgresql database but not the object storage. | """Add content to the postgresql database but not the object storage. | ||||
""" | """ | ||||
if content_with_data: | |||||
# create temporary table for metadata injection | # create temporary table for metadata injection | ||||
db.mktemp('content', cur) | db.mktemp('content', cur) | ||||
db.copy_to(content_with_data, 'tmp_content', | db.copy_to(content, 'tmp_content', | ||||
db.content_add_keys, cur) | db.content_add_keys, cur) | ||||
# move metadata in place | # move metadata in place | ||||
try: | try: | ||||
db.content_add_from_temp(cur) | db.content_add_from_temp(cur) | ||||
except psycopg2.IntegrityError as e: | except psycopg2.IntegrityError as e: | ||||
from . import HashCollision | from . import HashCollision | ||||
if e.diag.sqlstate == '23505' and \ | if e.diag.sqlstate == '23505' and \ | ||||
e.diag.table_name == 'content': | e.diag.table_name == 'content': | ||||
constraint_to_hash_name = { | constraint_to_hash_name = { | ||||
'content_pkey': 'sha1', | 'content_pkey': 'sha1', | ||||
'content_sha1_git_idx': 'sha1_git', | 'content_sha1_git_idx': 'sha1_git', | ||||
'content_sha256_idx': 'sha256', | 'content_sha256_idx': 'sha256', | ||||
} | } | ||||
colliding_hash_name = constraint_to_hash_name \ | colliding_hash_name = constraint_to_hash_name \ | ||||
.get(e.diag.constraint_name) | .get(e.diag.constraint_name) | ||||
raise HashCollision(colliding_hash_name) from None | raise HashCollision(colliding_hash_name) from None | ||||
else: | else: | ||||
raise | raise | ||||
if content_without_data: | |||||
content_without_data = \ | |||||
[cont.copy() for cont in content_without_data] | |||||
origin_ids = db.origin_id_get_by_url( | |||||
[cont.get('origin') for cont in content_without_data], | |||||
cur=cur) | |||||
for (cont, origin_id) in zip(content_without_data, origin_ids): | |||||
if 'origin' in cont: | |||||
cont['origin'] = origin_id | |||||
db.mktemp('skipped_content', cur) | |||||
db.copy_to(content_without_data, 'tmp_skipped_content', | |||||
db.skipped_content_keys, cur) | |||||
# move metadata in place | |||||
db.skipped_content_add_from_temp(cur) | |||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
@db_transaction() | @db_transaction() | ||||
def content_add(self, content, db=None, cur=None): | def content_add(self, content, db=None, cur=None): | ||||
content = [dict(c.items()) for c in content] # semi-shallow copy | content = [dict(c.items()) for c in content] # semi-shallow copy | ||||
now = datetime.datetime.now(tz=datetime.timezone.utc) | now = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
for item in content: | for item in content: | ||||
item['ctime'] = now | item['ctime'] = now | ||||
content = [self._normalize_content(c) for c in content] | content = [self._content_normalize(c) for c in content] | ||||
for c in content: | for c in content: | ||||
self._validate_content(c) | self._content_validate(c) | ||||
(content_with_data, content_without_data, summary) = \ | missing = list(self.content_missing(content, key_hash='sha1_git')) | ||||
self._filter_new_content(content, db, cur) | content = [c for c in content if c['sha1_git'] in missing] | ||||
if self.journal_writer: | if self.journal_writer: | ||||
for item in content_with_data: | for item in content: | ||||
if 'data' in item: | if 'data' in item: | ||||
item = item.copy() | item = item.copy() | ||||
del item['data'] | del item['data'] | ||||
self.journal_writer.write_addition('content', item) | self.journal_writer.write_addition('content', item) | ||||
for item in content_without_data: | |||||
self.journal_writer.write_addition('content', item) | |||||
def add_to_objstorage(): | def add_to_objstorage(): | ||||
"""Add to objstorage the new missing_content | """Add to objstorage the new missing_content | ||||
Returns: | Returns: | ||||
Sum of all the content's data length pushed to the | Sum of all the content's data length pushed to the | ||||
objstorage. Content present twice is only sent once. | objstorage. Content present twice is only sent once. | ||||
""" | """ | ||||
content_bytes_added = 0 | content_bytes_added = 0 | ||||
data = {} | data = {} | ||||
for cont in content_with_data: | for cont in content: | ||||
if cont['sha1'] not in data: | if cont['sha1'] not in data: | ||||
data[cont['sha1']] = cont['data'] | data[cont['sha1']] = cont['data'] | ||||
content_bytes_added += max(0, cont['length']) | content_bytes_added += max(0, cont['length']) | ||||
# FIXME: Since we do the filtering anyway now, we might as | # FIXME: Since we do the filtering anyway now, we might as | ||||
# well make the objstorage's add_batch call return what we | # well make the objstorage's add_batch call return what we | ||||
# want here (real bytes added)... that'd simplify this... | # want here (real bytes added)... that'd simplify this... | ||||
self.objstorage.add_batch(data) | self.objstorage.add_batch(data) | ||||
return content_bytes_added | return content_bytes_added | ||||
with ThreadPoolExecutor(max_workers=1) as executor: | with ThreadPoolExecutor(max_workers=1) as executor: | ||||
added_to_objstorage = executor.submit(add_to_objstorage) | added_to_objstorage = executor.submit(add_to_objstorage) | ||||
self._content_add_metadata( | self._content_add_metadata(db, cur, content) | ||||
db, cur, content_with_data, content_without_data) | |||||
# Wait for objstorage addition before returning from the | # Wait for objstorage addition before returning from the | ||||
# transaction, bubbling up any exception | # transaction, bubbling up any exception | ||||
content_bytes_added = added_to_objstorage.result() | content_bytes_added = added_to_objstorage.result() | ||||
summary['content:add:bytes'] = content_bytes_added | return { | ||||
return summary | 'content:add': len(content), | ||||
'content:add:bytes': content_bytes_added, | |||||
} | |||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def content_update(self, content, keys=[], db=None, cur=None): | def content_update(self, content, keys=[], db=None, cur=None): | ||||
# TODO: Add a check on input keys. How to properly implement | # TODO: Add a check on input keys. How to properly implement | ||||
# this? We don't know yet the new columns. | # this? We don't know yet the new columns. | ||||
if self.journal_writer: | if self.journal_writer: | ||||
raise NotImplementedError( | raise NotImplementedError( | ||||
'content_update is not yet supported with a journal_writer.') | 'content_update is not yet supported with a journal_writer.') | ||||
db.mktemp('content', cur) | db.mktemp('content', cur) | ||||
select_keys = list(set(db.content_get_metadata_keys).union(set(keys))) | select_keys = list(set(db.content_get_metadata_keys).union(set(keys))) | ||||
db.copy_to(content, 'tmp_content', select_keys, cur) | db.copy_to(content, 'tmp_content', select_keys, cur) | ||||
db.content_update_from_temp(keys_to_update=keys, | db.content_update_from_temp(keys_to_update=keys, | ||||
cur=cur) | cur=cur) | ||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
@db_transaction() | @db_transaction() | ||||
def content_add_metadata(self, content, db=None, cur=None): | def content_add_metadata(self, content, db=None, cur=None): | ||||
content = [self._normalize_content(c) for c in content] | content = [self._content_normalize(c) for c in content] | ||||
for c in content: | for c in content: | ||||
self._validate_content(c) | self._content_validate(c) | ||||
(content_with_data, content_without_data, summary) = \ | missing = self.content_missing(content, key_hash='sha1_git') | ||||
self._filter_new_content(content, db, cur) | content = [c for c in content if c['sha1_git'] in missing] | ||||
if self.journal_writer: | if self.journal_writer: | ||||
for item in itertools.chain(content_with_data, | for item in itertools.chain(content): | ||||
content_without_data): | |||||
assert 'data' not in content | assert 'data' not in content | ||||
self.journal_writer.write_addition('content', item) | self.journal_writer.write_addition('content', item) | ||||
self._content_add_metadata( | self._content_add_metadata(db, cur, content) | ||||
db, cur, content_with_data, content_without_data) | |||||
return summary | return { | ||||
'content:add': len(content), | |||||
} | |||||
@timed | @timed | ||||
def content_get(self, content): | def content_get(self, content): | ||||
# FIXME: Make this method support slicing the `data`. | # FIXME: Make this method support slicing the `data`. | ||||
if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: | if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: | ||||
raise ValueError( | raise ValueError( | ||||
"Send at maximum %s contents." % BULK_BLOCK_CONTENT_LEN_MAX) | "Send at maximum %s contents." % BULK_BLOCK_CONTENT_LEN_MAX) | ||||
▲ Show 20 Lines • Show All 83 Lines • ▼ Show 20 Lines | class Storage(): | ||||
@timed | @timed | ||||
@db_transaction_generator() | @db_transaction_generator() | ||||
def content_missing_per_sha1_git(self, contents, db=None, cur=None): | def content_missing_per_sha1_git(self, contents, db=None, cur=None): | ||||
for obj in db.content_missing_per_sha1_git(contents, cur): | for obj in db.content_missing_per_sha1_git(contents, cur): | ||||
yield obj[0] | yield obj[0] | ||||
@timed | @timed | ||||
@db_transaction_generator() | |||||
def skipped_content_missing(self, contents, db=None, cur=None): | |||||
for content in db.skipped_content_missing(contents, cur): | |||||
yield dict(zip(db.content_hash_keys, content)) | |||||
@timed | |||||
@db_transaction() | @db_transaction() | ||||
def content_find(self, content, db=None, cur=None): | def content_find(self, content, db=None, cur=None): | ||||
if not set(content).intersection(ALGORITHMS): | if not set(content).intersection(ALGORITHMS): | ||||
raise ValueError('content keys must contain at least one of: ' | raise ValueError('content keys must contain at least one of: ' | ||||
'sha1, sha1_git, sha256, blake2s256') | 'sha1, sha1_git, sha256, blake2s256') | ||||
contents = db.content_find(sha1=content.get('sha1'), | contents = db.content_find(sha1=content.get('sha1'), | ||||
sha1_git=content.get('sha1_git'), | sha1_git=content.get('sha1_git'), | ||||
sha256=content.get('sha256'), | sha256=content.get('sha256'), | ||||
blake2s256=content.get('blake2s256'), | blake2s256=content.get('blake2s256'), | ||||
cur=cur) | cur=cur) | ||||
return [dict(zip(db.content_find_cols, content)) | return [dict(zip(db.content_find_cols, content)) | ||||
for content in contents] | for content in contents] | ||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def content_get_random(self, db=None, cur=None): | def content_get_random(self, db=None, cur=None): | ||||
return db.content_get_random(cur) | return db.content_get_random(cur) | ||||
@staticmethod | |||||
def _skipped_content_normalize(d): | |||||
d = d.copy() | |||||
if d.get('status') is None: | |||||
d['status'] = 'absent' | |||||
if d.get('length') is None: | |||||
d['length'] = -1 | |||||
return d | |||||
@staticmethod | |||||
def _skipped_content_validate(d): | |||||
"""Sanity checks on status / reason / length, that postgresql | |||||
doesn't enforce.""" | |||||
if d['status'] != 'absent': | |||||
raise ValueError('Invalid content status: {}'.format(d['status'])) | |||||
if d.get('reason') is None: | |||||
raise ValueError( | |||||
'Must provide a reason if content is absent.') | |||||
if d['length'] < -1: | |||||
raise ValueError('Content length must be positive or -1.') | |||||
def _skipped_content_add_metadata(self, db, cur, content): | |||||
content = \ | |||||
[cont.copy() for cont in content] | |||||
origin_ids = db.origin_id_get_by_url( | |||||
[cont.get('origin') for cont in content], | |||||
cur=cur) | |||||
for (cont, origin_id) in zip(content, origin_ids): | |||||
if 'origin' in cont: | |||||
cont['origin'] = origin_id | |||||
db.mktemp('skipped_content', cur) | |||||
db.copy_to(content, 'tmp_skipped_content', | |||||
db.skipped_content_keys, cur) | |||||
# move metadata in place | |||||
db.skipped_content_add_from_temp(cur) | |||||
@timed | |||||
@process_metrics | |||||
@db_transaction() | |||||
def skipped_content_add(self, content, db=None, cur=None): | |||||
content = [dict(c.items()) for c in content] # semi-shallow copy | |||||
now = datetime.datetime.now(tz=datetime.timezone.utc) | |||||
for item in content: | |||||
ardumont: can't we avoid one loop?
```
now = datetime.datetime.now(tz=datetime.timezone.utc)
content =… | |||||
Done Inline Actions — I find my code simpler. Python does not have to be purely functional :) vlorentz: I find my code simpler. Python does not have to be purely functional :) | |||||
Not Done Inline Actions — It's more to avoid one extra iteration over the list... ardumont: It's more to avoid one extra iteration over the list...
but it's skipped content so it's… | |||||
Done Inline Actions — a list comprehension is a loop too... vlorentz: a list comprehension is a loop too... | |||||
item['ctime'] = now | |||||
content = [self._skipped_content_normalize(c) for c in content] | |||||
for c in content: | |||||
self._skipped_content_validate(c) | |||||
missing_contents = self.skipped_content_missing(content) | |||||
content = [c for c in content | |||||
if any(all(c.get(algo) == missing_content.get(algo) | |||||
for algo in ALGORITHMS) | |||||
for missing_content in missing_contents)] | |||||
if self.journal_writer: | |||||
for item in content: | |||||
self.journal_writer.write_addition('content', item) | |||||
self._skipped_content_add_metadata(db, cur, content) | |||||
return { | |||||
'skipped_content:add': len(content), | |||||
} | |||||
@timed | |||||
@db_transaction_generator() | |||||
def skipped_content_missing(self, contents, db=None, cur=None): | |||||
for content in db.skipped_content_missing(contents, cur): | |||||
yield dict(zip(db.content_hash_keys, content)) | |||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
@db_transaction() | @db_transaction() | ||||
def directory_add(self, directories, db=None, cur=None): | def directory_add(self, directories, db=None, cur=None): | ||||
directories = list(directories) | directories = list(directories) | ||||
summary = {'directory:add': 0} | summary = {'directory:add': 0} | ||||
dirs = set() | dirs = set() | ||||
▲ Show 20 Lines • Show All 703 Lines • Show Last 20 Lines |
can't we avoid one loop?