Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/storage.py
Show First 20 Lines • Show All 148 Lines • ▼ Show 20 Lines | def content_add(self, content): | ||||
if missing_content: | if missing_content: | ||||
# create temporary table for metadata injection | # create temporary table for metadata injection | ||||
db.mktemp('content', cur) | db.mktemp('content', cur) | ||||
content_filtered = (cont for cont in content_with_data | content_filtered = (cont for cont in content_with_data | ||||
if cont['sha1'] in missing_content) | if cont['sha1'] in missing_content) | ||||
db.copy_to(content_filtered, 'tmp_content', | db.copy_to(content_filtered, 'tmp_content', | ||||
db.content_get_metadata_keys, cur) | db.content_get_metadata_keys, cur=cur) | ||||
# move metadata in place | # move metadata in place | ||||
try: | try: | ||||
db.content_add_from_temp(cur) | db.content_add_from_temp(cur) | ||||
except psycopg2.IntegrityError as e: | except psycopg2.IntegrityError as e: | ||||
from . import HashCollision | from . import HashCollision | ||||
if e.diag.sqlstate == '23505' and \ | if e.diag.sqlstate == '23505' and \ | ||||
e.diag.table_name == 'content': | e.diag.table_name == 'content': | ||||
Show All 11 Lines | def content_add(self, content): | ||||
if missing_skipped: | if missing_skipped: | ||||
missing_filtered = ( | missing_filtered = ( | ||||
cont for cont in content_without_data | cont for cont in content_without_data | ||||
if _unique_key(cont) in missing_skipped | if _unique_key(cont) in missing_skipped | ||||
) | ) | ||||
db.mktemp('skipped_content', cur) | db.mktemp('skipped_content', cur) | ||||
db.copy_to(missing_filtered, 'tmp_skipped_content', | db.copy_to(missing_filtered, 'tmp_skipped_content', | ||||
db.skipped_content_keys, cur) | db.skipped_content_keys, cur=cur) | ||||
# move metadata in place | # move metadata in place | ||||
db.skipped_content_add_from_temp(cur) | db.skipped_content_add_from_temp(cur) | ||||
# Wait for objstorage addition before returning from the | # Wait for objstorage addition before returning from the | ||||
# transaction, bubbling up any exception | # transaction, bubbling up any exception | ||||
added_to_objstorage.result() | added_to_objstorage.result() | ||||
Show All 18 Lines | def content_update(self, content, keys=[], db=None, cur=None): | ||||
new hash column | new hash column | ||||
""" | """ | ||||
# TODO: Add a check on input keys. How to properly implement | # TODO: Add a check on input keys. How to properly implement | ||||
# this? We don't know yet the new columns. | # this? We don't know yet the new columns. | ||||
db.mktemp('content', cur) | db.mktemp('content', cur) | ||||
select_keys = list(set(db.content_get_metadata_keys).union(set(keys))) | select_keys = list(set(db.content_get_metadata_keys).union(set(keys))) | ||||
db.copy_to(content, 'tmp_content', select_keys, cur) | db.copy_to(content, 'tmp_content', select_keys, cur=cur) | ||||
db.content_update_from_temp(keys_to_update=keys, | db.content_update_from_temp(keys_to_update=keys, | ||||
cur=cur) | cur=cur) | ||||
def content_get(self, content): | def content_get(self, content): | ||||
"""Retrieve in bulk contents and their data. | """Retrieve in bulk contents and their data. | ||||
This generator yields exactly as many items than provided sha1 | This generator yields exactly as many items than provided sha1 | ||||
identifiers, but callers should not assume this will always be true. | identifiers, but callers should not assume this will always be true. | ||||
▲ Show 20 Lines • Show All 144 Lines • ▼ Show 20 Lines | def skipped_content_missing(self, content, db=None, cur=None): | ||||
Returns: | Returns: | ||||
iterable: missing signatures | iterable: missing signatures | ||||
""" | """ | ||||
keys = db.content_hash_keys | keys = db.content_hash_keys | ||||
db.mktemp('skipped_content', cur) | db.mktemp('skipped_content', cur) | ||||
db.copy_to(content, 'tmp_skipped_content', | db.copy_to(content, 'tmp_skipped_content', | ||||
keys + ['length', 'reason'], cur) | keys + ['length', 'reason'], cur=cur) | ||||
yield from db.skipped_content_missing_from_temp(cur) | yield from db.skipped_content_missing_from_temp(cur) | ||||
@db_transaction() | @db_transaction() | ||||
def content_find(self, content, db=None, cur=None): | def content_find(self, content, db=None, cur=None): | ||||
"""Find a content hash in db. | """Find a content hash in db. | ||||
Args: | Args: | ||||
▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | def directory_add(self, directories): | ||||
if not dirs_missing: | if not dirs_missing: | ||||
return | return | ||||
db = self.get_db() | db = self.get_db() | ||||
with db.transaction() as cur: | with db.transaction() as cur: | ||||
# Copy directory ids | # Copy directory ids | ||||
dirs_missing_dict = ({'id': dir} for dir in dirs_missing) | dirs_missing_dict = ({'id': dir} for dir in dirs_missing) | ||||
db.mktemp('directory', cur) | db.mktemp('directory', cur) | ||||
db.copy_to(dirs_missing_dict, 'tmp_directory', ['id'], cur) | db.copy_to(dirs_missing_dict, 'tmp_directory', ['id'], cur=cur) | ||||
# Copy entries | # Copy entries | ||||
for entry_type, entry_list in dir_entries.items(): | for entry_type, entry_list in dir_entries.items(): | ||||
entries = itertools.chain.from_iterable( | entries = itertools.chain.from_iterable( | ||||
entries_for_dir | entries_for_dir | ||||
for dir_id, entries_for_dir | for dir_id, entries_for_dir | ||||
in entry_list.items() | in entry_list.items() | ||||
if dir_id in dirs_missing) | if dir_id in dirs_missing) | ||||
db.mktemp_dir_entry(entry_type) | db.mktemp_dir_entry(entry_type) | ||||
db.copy_to( | db.copy_to( | ||||
entries, | entries, | ||||
'tmp_directory_entry_%s' % entry_type, | 'tmp_directory_entry_%s' % entry_type, | ||||
['target', 'name', 'perms', 'dir_id'], | ['target', 'name', 'perms', 'dir_id'], | ||||
cur, | cur=cur, | ||||
) | ) | ||||
# Do the final copy | # Do the final copy | ||||
db.directory_add_from_temp(cur) | db.directory_add_from_temp(cur) | ||||
@db_transaction_generator() | @db_transaction_generator() | ||||
def directory_missing(self, directories, db=None, cur=None): | def directory_missing(self, directories, db=None, cur=None): | ||||
"""List directories missing from storage | """List directories missing from storage | ||||
▲ Show 20 Lines • Show All 93 Lines • ▼ Show 20 Lines | def revision_add(self, revisions): | ||||
revisions_filtered = ( | revisions_filtered = ( | ||||
converters.revision_to_db(revision) for revision in revisions | converters.revision_to_db(revision) for revision in revisions | ||||
if revision['id'] in revisions_missing) | if revision['id'] in revisions_missing) | ||||
parents_filtered = [] | parents_filtered = [] | ||||
db.copy_to( | db.copy_to( | ||||
revisions_filtered, 'tmp_revision', db.revision_add_cols, | revisions_filtered, 'tmp_revision', db.revision_add_cols, | ||||
cur, | cur=cur, | ||||
lambda rev: parents_filtered.extend(rev['parents'])) | item_cb=lambda rev: parents_filtered.extend(rev['parents'])) | ||||
db.revision_add_from_temp(cur) | db.revision_add_from_temp(cur) | ||||
print('PARENT FILTERED', parents_filtered) | |||||
db.copy_to(parents_filtered, 'revision_history', | db.copy_to(parents_filtered, 'revision_history', | ||||
ardumont: print! | |||||
['id', 'parent_id', 'parent_rank'], cur) | ['id', 'parent_id', 'parent_rank'], cur=cur) | ||||
@db_transaction_generator() | @db_transaction_generator() | ||||
def revision_missing(self, revisions, db=None, cur=None): | def revision_missing(self, revisions, db=None, cur=None): | ||||
"""List revisions missing from storage | """List revisions missing from storage | ||||
Args: | Args: | ||||
revisions (iterable): revision ids | revisions (iterable): revision ids | ||||
▲ Show 20 Lines • Show All 96 Lines • ▼ Show 20 Lines | def release_add(self, releases): | ||||
db.mktemp_release(cur) | db.mktemp_release(cur) | ||||
releases_filtered = ( | releases_filtered = ( | ||||
converters.release_to_db(release) for release in releases | converters.release_to_db(release) for release in releases | ||||
if release['id'] in releases_missing | if release['id'] in releases_missing | ||||
) | ) | ||||
db.copy_to(releases_filtered, 'tmp_release', db.release_add_cols, | db.copy_to(releases_filtered, 'tmp_release', db.release_add_cols, | ||||
cur) | cur=cur) | ||||
db.release_add_from_temp(cur) | db.release_add_from_temp(cur) | ||||
@db_transaction_generator() | @db_transaction_generator() | ||||
def release_missing(self, releases, db=None, cur=None): | def release_missing(self, releases, db=None, cur=None): | ||||
"""List releases missing from storage | """List releases missing from storage | ||||
Args: | Args: | ||||
▲ Show 20 Lines • Show All 62 Lines • ▼ Show 20 Lines | def snapshot_add(self, origin, visit, snapshot, | ||||
'name': name, | 'name': name, | ||||
'target': info['target'] if info else None, | 'target': info['target'] if info else None, | ||||
'target_type': info['target_type'] if info else None, | 'target_type': info['target_type'] if info else None, | ||||
} | } | ||||
for name, info in snapshot['branches'].items() | for name, info in snapshot['branches'].items() | ||||
), | ), | ||||
'tmp_snapshot_branch', | 'tmp_snapshot_branch', | ||||
['name', 'target', 'target_type'], | ['name', 'target', 'target_type'], | ||||
cur, | cur=cur, | ||||
) | ) | ||||
if not db.origin_visit_exists(origin, visit): | if not db.origin_visit_exists(origin, visit): | ||||
raise ValueError('Not origin visit with ids (%s, %s)' % | raise ValueError('Not origin visit with ids (%s, %s)' % | ||||
(origin, visit)) | (origin, visit)) | ||||
db.snapshot_add(origin, visit, snapshot['id'], cur) | db.snapshot_add(origin, visit, snapshot['id'], cur) | ||||
@db_transaction(statement_timeout=2000) | @db_transaction(statement_timeout=2000) | ||||
▲ Show 20 Lines • Show All 556 Lines • ▼ Show 20 Lines | def tool_add(self, tools, db=None, cur=None): | ||||
:class:`dict`: All the tools inserted in storage | :class:`dict`: All the tools inserted in storage | ||||
(including the internal ``id``). The order of the list is not | (including the internal ``id``). The order of the list is not | ||||
guaranteed to match the order of the initial list. | guaranteed to match the order of the initial list. | ||||
""" | """ | ||||
db.mktemp_tool(cur) | db.mktemp_tool(cur) | ||||
db.copy_to(tools, 'tmp_tool', | db.copy_to(tools, 'tmp_tool', | ||||
['name', 'version', 'configuration'], | ['name', 'version', 'configuration'], | ||||
cur) | cur=cur) | ||||
tools = db.tool_add_from_temp(cur) | tools = db.tool_add_from_temp(cur) | ||||
for line in tools: | for line in tools: | ||||
yield dict(zip(db.tool_cols, line)) | yield dict(zip(db.tool_cols, line)) | ||||
@db_transaction(statement_timeout=500) | @db_transaction(statement_timeout=500) | ||||
def tool_get(self, tool, db=None, cur=None): | def tool_get(self, tool, db=None, cur=None): | ||||
"""Retrieve tool information. | """Retrieve tool information. | ||||
▲ Show 20 Lines • Show All 121 Lines • Show Last 20 Lines |
print!