swh/storage/storage.py
@@ … 21 lines elided … @@
 from swh.objstorage import get_objstorage
 from swh.objstorage.exc import ObjNotFoundError

 try:
     from swh.journal.writer import get_journal_writer
 except ImportError:
     get_journal_writer = None  # type: ignore
     # mypy limitation, see https://github.com/python/mypy/issues/1153

-from . import converters
+from . import converters, HashCollision
 from .common import db_transaction_generator, db_transaction
 from .db import Db
-from .exc import StorageDBError
+from .exc import StorageArgumentException, StorageDBError
 from .algos import diff
 from .metrics import timed, send_metric, process_metrics
 from .utils import get_partition_bounds_bytes

 # Max block size of contents to return
 BULK_BLOCK_CONTENT_LEN_MAX = 10000
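The net effect of this import change for callers: bad arguments now raise the module's own StorageArgumentException instead of the builtin ValueError/TypeError, so they can be told apart from infrastructure failures such as StorageDBError. A minimal sketch of the calling pattern this enables; the `storage` object and `contents` payload are assumptions, not part of this changeset:

from swh.storage.exc import StorageArgumentException, StorageDBError

def safe_content_add(storage, contents, retries=3):
    """Retry on DB trouble, but fail fast on malformed input."""
    for attempt in range(retries):
        try:
            return storage.content_add_metadata(contents)
        except StorageArgumentException:
            raise  # caller bug: retrying the same payload cannot help
        except StorageDBError:
            if attempt == retries - 1:
                raise  # give up after the last attempt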
@@ … 94 lines elided … @@ def _content_normalize(d):
         return d

     @staticmethod
     def _content_validate(d):
         """Sanity checks on status / reason / length, that postgresql
         doesn't enforce."""
         if d['status'] not in ('visible', 'hidden'):
-            raise ValueError('Invalid content status: {}'.format(d['status']))
+            raise StorageArgumentException(
+                'Invalid content status: {}'.format(d['status']))

         if d.get('reason') is not None:
-            raise ValueError(
+            raise StorageArgumentException(
                 'Must not provide a reason if content is present.')

         if d['length'] is None or d['length'] < 0:
-            raise ValueError('Content length must be positive.')
+            raise StorageArgumentException('Content length must be positive.')
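To make the rules above concrete, a small sketch of inputs this validation accepts and rejects; calling the private helper directly is for illustration only, and the hash fields a real content dict would also carry are omitted:

from swh.storage.storage import Storage
from swh.storage.exc import StorageArgumentException

for d in [
    {'status': 'visible', 'length': 42},               # accepted
    {'status': 'absent', 'length': 42},                # rejected: bad status
    {'status': 'hidden', 'reason': 'x', 'length': 1},  # rejected: reason set
    {'status': 'visible', 'length': None},             # rejected: no length
]:
    try:
        Storage._content_validate(d)
        print('ok      :', d)
    except StorageArgumentException as e:
        print('rejected:', e)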
     def _content_add_metadata(self, db, cur, content):
         """Add content to the postgresql database but not the object storage.
         """
         # create temporary table for metadata injection
         db.mktemp('content', cur)

         db.copy_to(content, 'tmp_content',
                    db.content_add_keys, cur)

         # move metadata in place
         try:
             db.content_add_from_temp(cur)
         except psycopg2.IntegrityError as e:
-            from . import HashCollision
             if e.diag.sqlstate == '23505' and \
                     e.diag.table_name == 'content':
                 constraint_to_hash_name = {
                     'content_pkey': 'sha1',
                     'content_sha1_git_idx': 'sha1_git',
                     'content_sha256_idx': 'sha256',
                 }
                 colliding_hash_name = constraint_to_hash_name \

@@ … 97 lines elided … @@ def content_add_metadata(self, content, db=None, cur=None):
         return {
             'content:add': len(content),
         }
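With the IntegrityError translation above, a colliding insert surfaces to callers as HashCollision. A hedged sketch of reacting to it; only the exception type and `content_add_metadata` come from this changeset, the one-by-one retry policy is invented for illustration:

from swh.storage import HashCollision

def add_metadata_skipping_collisions(storage, contents):
    """Add contents one by one, collecting hash collisions instead of dying."""
    collided = []
    for content in contents:
        try:
            storage.content_add_metadata([content])
        except HashCollision as e:
            # Two distinct contents share a sha1 / sha1_git / sha256 value.
            collided.append((content, e))
    return collided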
     @timed
     def content_get(self, content):
         # FIXME: Make this method support slicing the `data`.
         if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
-            raise ValueError(
+            raise StorageArgumentException(
                 "Send at maximum %s contents." % BULK_BLOCK_CONTENT_LEN_MAX)

         for obj_id in content:
             try:
                 data = self.objstorage.get(obj_id)
             except ObjNotFoundError:
                 yield None
                 continue

             yield {'sha1': obj_id, 'data': data}
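Consuming content_get then looks like the sketch below. Note the two failure modes: oversized batches (over BULK_BLOCK_CONTENT_LEN_MAX) now raise StorageArgumentException up front, while blobs absent from the object storage yield None in place:

def fetch_blobs(storage, sha1s):
    """Yield (sha1, data) pairs, skipping blobs missing from objstorage."""
    for result in storage.content_get(sha1s):
        if result is not None:  # None marks a swallowed ObjNotFoundError
            yield result['sha1'], result['data']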
     @timed
     @db_transaction()
     def content_get_range(self, start, end, limit=1000, db=None, cur=None):
         if limit is None:
-            raise ValueError('Development error: limit should not be None')
+            raise StorageArgumentException('limit should not be None')
         contents = []
         next_content = None
         for counter, content_row in enumerate(
                 db.content_get_range(start, end, limit+1, cur)):
             content = dict(zip(db.content_get_metadata_keys, content_row))
             if counter >= limit:
                 # take the last content as the start of the next page
                 next_content = content['sha1']
                 break
             contents.append(content)
         return {
             'contents': contents,
             'next': next_content,
         }
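The `next` cursor returned above is the sha1 where the following page starts (the limit+1-th row, deliberately excluded from `contents`). A sketch of walking a full range page by page:

def iter_content_range(storage, start, end, batch=1000):
    """Iterate all contents in [start, end], batch rows at a time."""
    while start is not None:
        page = storage.content_get_range(start, end, limit=batch)
        yield from page['contents']
        start = page['next']  # None once the range is exhausted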
     @timed
     @db_transaction()
     def content_get_partition(
             self, partition_id: int, nb_partitions: int, limit: int = 1000,
             page_token: str = None, db=None, cur=None):
         if limit is None:
-            raise ValueError('Development error: limit should not be None')
+            raise StorageArgumentException('limit should not be None')
         (start, end) = get_partition_bounds_bytes(
             partition_id, nb_partitions, SHA1_SIZE)
         if page_token:
             start = hash_to_bytes(page_token)
         if end is None:
             end = b'\xff'*SHA1_SIZE
         result = self.content_get_range(start, end, limit)
         result2 = {

@@ … 16 lines elided … @@ def content_get_metadata(
         return result
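Putting content_get_partition to work: each worker claims one partition of the sha1 space and pages through it. The continuation-token key name ('next_page_token') is an assumption here, since the `result2` dict above is cut off by the fold:

def iter_partition(storage, partition_id, nb_partitions, batch=1000):
    """Page through one partition of the sha1 space."""
    token = None
    while True:
        page = storage.content_get_partition(
            partition_id, nb_partitions, limit=batch, page_token=token)
        yield from page['contents']
        token = page.get('next_page_token')  # assumed key name
        if token is None:
            break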
     @timed
     @db_transaction_generator()
     def content_missing(self, content, key_hash='sha1', db=None, cur=None):
         keys = db.content_hash_keys
         if key_hash not in keys:
-            raise ValueError("key_hash should be one of %s" % keys)
+            raise StorageArgumentException(
+                "key_hash should be one of %s" % keys)
         key_hash_idx = keys.index(key_hash)

         if not content:
             return

         for obj in db.content_missing_from_list(content, cur):
             yield obj[key_hash_idx]
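A typical use of content_missing is deduplicating before an add; `key_hash` must be one of the database's hash keys or the call now raises StorageArgumentException. A sketch, assuming each dict in `contents` carries the usual hash fields:

def filter_unknown(storage, contents):
    """Keep only the contents whose sha1 the archive does not know yet."""
    missing = set(storage.content_missing(contents, key_hash='sha1'))
    return [c for c in contents if c['sha1'] in missing]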
@@ … 9 lines elided … @@ class Storage():
     def content_missing_per_sha1_git(self, contents, db=None, cur=None):
         for obj in db.content_missing_per_sha1_git(contents, cur):
             yield obj[0]

     @timed
     @db_transaction()
     def content_find(self, content, db=None, cur=None):
         if not set(content).intersection(ALGORITHMS):
-            raise ValueError('content keys must contain at least one of: '
-                             'sha1, sha1_git, sha256, blake2s256')
+            raise StorageArgumentException(
+                'content keys must contain at least one of: '
+                'sha1, sha1_git, sha256, blake2s256')
         contents = db.content_find(sha1=content.get('sha1'),
                                    sha1_git=content.get('sha1_git'),
                                    sha256=content.get('sha256'),
                                    blake2s256=content.get('blake2s256'),
                                    cur=cur)
         return [dict(zip(db.content_find_cols, content))
                 for content in contents]
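content_find now rejects a query dict lacking all four supported hashes with StorageArgumentException. A minimal lookup sketch over the API shown above:

def find_one(storage, sha1_git):
    """Return the first known content matching a sha1_git, or None."""
    rows = storage.content_find({'sha1_git': sha1_git})
    return rows[0] if rows else None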
@@ … 15 lines elided … @@ def _skipped_content_normalize(d):
         return d

     @staticmethod
     def _skipped_content_validate(d):
         """Sanity checks on status / reason / length, that postgresql
         doesn't enforce."""
         if d['status'] != 'absent':
-            raise ValueError('Invalid content status: {}'.format(d['status']))
+            raise StorageArgumentException(
+                'Invalid content status: {}'.format(d['status']))

         if d.get('reason') is None:
-            raise ValueError(
+            raise StorageArgumentException(
                 'Must provide a reason if content is absent.')

         if d['length'] < -1:
-            raise ValueError('Content length must be positive or -1.')
+            raise StorageArgumentException(
+                'Content length must be positive or -1.')
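A skipped content is thus the mirror image of a present one: status must be 'absent', a reason is mandatory, and length may be -1 when unknown. For illustration, a dict shaped to pass this validation (hash fields elided):

skipped = {
    'status': 'absent',                      # the only status accepted here
    'reason': 'Content exceeds size limit',  # mandatory for absent contents
    'length': -1,                            # -1 means "length unknown"
    # plus the usual hash keys (sha1, sha1_git, sha256, ...), elided
}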
     def _skipped_content_add_metadata(self, db, cur, content):
         content = \
             [cont.copy() for cont in content]
         origin_ids = db.origin_id_get_by_url(
             [cont.get('origin') for cont in content],
             cur=cur)
         for (cont, origin_id) in zip(content, origin_ids):

@@ … 57 lines elided … @@ def directory_add(self, directories, db=None, cur=None):
         for cur_dir in directories:
             dir_id = cur_dir['id']
             dirs.add(dir_id)
             for src_entry in cur_dir['entries']:
                 entry = src_entry.copy()
                 entry['dir_id'] = dir_id
                 if entry['type'] not in ('file', 'dir', 'rev'):
-                    raise ValueError(
+                    raise StorageArgumentException(
                         'Entry type must be file, dir, or rev; not %s'
                         % entry['type'])
                 dir_entries[entry['type']][dir_id].append(entry)

         dirs_missing = set(self.directory_missing(dirs, db=db, cur=cur))
         if not dirs_missing:
             return summary
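For reference, a directory payload shaped to satisfy the entry-type check above. Only 'id', 'entries' and 'type' appear in this hunk; the other entry fields and the placeholder ids are assumptions drawn from the swh data model:

directory = {
    'id': b'\x01' * 20,  # placeholder sha1_git of the directory
    'entries': [
        # 'type' must be 'file', 'dir' or 'rev'; anything else now raises
        # StorageArgumentException instead of ValueError.
        {'name': b'README', 'type': 'file',
         'target': b'\x02' * 20, 'perms': 0o100644},
        {'name': b'src', 'type': 'dir',
         'target': b'\x03' * 20, 'perms': 0o040000},
    ],
}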
@@ … 271 lines elided … @@ def snapshot_get_latest(self, origin, allowed_statuses=None, db=None,
         origin_visit = self.origin_visit_get_latest(
             origin, allowed_statuses=allowed_statuses, require_snapshot=True,
             db=db, cur=cur)
         if origin_visit and origin_visit['snapshot']:
             snapshot = self.snapshot_get(
                 origin_visit['snapshot'], db=db, cur=cur)
             if not snapshot:
-                raise ValueError(
+                raise StorageArgumentException(
                     'last origin visit references an unknown snapshot')
             return snapshot
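A usage sketch for snapshot_get_latest; a dangling snapshot reference now surfaces as StorageArgumentException rather than a bare ValueError. The origin URL and the 'full' status value are illustrative assumptions:

def latest_full_snapshot(storage, origin_url):
    """Fetch the snapshot of the latest completed visit, or None."""
    return storage.snapshot_get_latest(
        origin_url, allowed_statuses=['full'])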
     @timed
     @db_transaction(statement_timeout=2000)
     def snapshot_count_branches(self, snapshot_id, db=None, cur=None):
         return dict([bc for bc in
                      db.snapshot_count_branches(snapshot_id, cur)])

@@ … 71 lines elided … @@ def origin_visit_add(self, origin, date, type,
         }

     @timed
     @db_transaction()
     def origin_visit_update(self, origin, visit_id, status=None,
                             metadata=None, snapshot=None,
                             db=None, cur=None):
         if not isinstance(origin, str):
-            raise TypeError('origin must be a string, not %r' % (origin,))
+            raise StorageArgumentException(
+                'origin must be a string, not %r' % (origin,))
         origin_url = origin
         visit = db.origin_visit_get(origin_url, visit_id, cur=cur)

         if not visit:
-            raise ValueError('Invalid visit_id for this origin.')
+            raise StorageArgumentException('Invalid visit_id for this origin.')

         visit = dict(zip(db.origin_visit_get_cols, visit))

         updates = {}
         if status and status != visit['status']:
             updates['status'] = status
         if metadata and metadata != visit['metadata']:
             updates['metadata'] = metadata
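Calling origin_visit_update after this change: the origin must be the URL string (anything else now raises StorageArgumentException), and an unknown visit id is an argument error too. A hedged sketch; the 'full' status and the snapshot id are placeholders:

def close_visit(storage, origin_url, visit_id, snapshot_id):
    """Mark a visit as finished and attach its snapshot."""
    storage.origin_visit_update(
        origin_url,        # must be a str URL, not an origin id
        visit_id,
        status='full',     # example status value
        snapshot=snapshot_id)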
@@ … 10 lines elided … @@ class Storage():
     @timed
     @db_transaction()
     def origin_visit_upsert(self, visits, db=None, cur=None):
         visits = copy.deepcopy(visits)
         for visit in visits:
             if isinstance(visit['date'], str):
                 visit['date'] = dateutil.parser.parse(visit['date'])
             if not isinstance(visit['origin'], str):
-                raise TypeError("visit['origin'] must be a string, not %r"
-                                % (visit['origin'],))
+                raise StorageArgumentException(
+                    "visit['origin'] must be a string, not %r"
+                    % (visit['origin'],))

         if self.journal_writer:
             for visit in visits:
                 self.journal_writer.write_addition('origin_visit', visit)

         for visit in visits:
             # TODO: upsert them all in a single query
             db.origin_visit_upsert(**visit, cur=cur)
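The upsert accepts visit dicts whose 'date' may still be an ISO string (parsed above with dateutil) but whose 'origin' must already be a URL string. An illustrative payload; the exact field set beyond 'origin' and 'date' is an assumption based on the origin_visit columns, not shown in this hunk:

visit = {
    'origin': 'https://example.org/repo.git',  # must be a str, checked above
    'visit': 1,
    'date': '2020-01-01T12:00:00+00:00',       # str is parsed by dateutil
    'type': 'git',
    'status': 'full',
    'metadata': None,
    'snapshot': None,
}
# storage.origin_visit_upsert([visit])  # call shape taken from this hunk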
@@ … 99 lines elided … @@ def origin_get_range(self, origin_from=1, origin_count=100,
         yield dict(zip(db.origin_get_range_cols, origin))

     @timed
     @db_transaction()
     def origin_list(self, page_token: Optional[str] = None, limit: int = 100,
                     *, db=None, cur=None) -> dict:
         page_token = page_token or '0'
         if not isinstance(page_token, str):
-            raise TypeError('page_token must be a string.')
+            raise StorageArgumentException('page_token must be a string.')
         origin_from = int(page_token)
         result: Dict[str, Any] = {
             'origins': [
                 dict(zip(db.origin_get_range_cols, origin))
                 for origin in db.origin_get_range(origin_from, limit, cur)
             ],
         }

@@ … 156 lines elided … @@
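Finally, origin_list pages with a stringified integer token ('0' to start). A sketch of draining the listing; the continuation-token key is assumed to be 'next_page_token', since the tail of the function is folded above:

def iter_origins(storage, batch=100):
    """Yield every origin, one page at a time."""
    token = None
    while True:
        page = storage.origin_list(page_token=token, limit=batch)
        yield from page['origins']
        token = page.get('next_page_token')  # assumed key name
        if token is None:
            break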