diff --git a/swh/storage/api/client.py b/swh/storage/api/client.py --- a/swh/storage/api/client.py +++ b/swh/storage/api/client.py @@ -3,255 +3,22 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import warnings - from swh.core.api import RPCClient from ..exc import StorageAPIError +from ..storage import Storage class RemoteStorage(RPCClient): """Proxy to a remote storage API""" api_exception = StorageAPIError - - def check_config(self, *, check_write): - return self.post('check_config', {'check_write': check_write}) + backend_class = Storage def reset(self): return self.post('reset', {}) - def content_add(self, content): - return self.post('content/add', {'content': content}) - - def content_add_metadata(self, content): - return self.post('content/add_metadata', {'content': content}) - - def content_update(self, content, keys=[]): - return self.post('content/update', {'content': content, - 'keys': keys}) - - def content_missing(self, content, key_hash='sha1'): - return self.post('content/missing', {'content': content, - 'key_hash': key_hash}) - - def content_missing_per_sha1(self, contents): - return self.post('content/missing/sha1', {'contents': contents}) - - def skipped_content_missing(self, contents): - return self.post('content/skipped/missing', {'contents': contents}) - - def content_get(self, content): - return self.post('content/data', {'content': content}) - - def content_get_metadata(self, content): - return self.post('content/metadata', {'content': content}) - - def content_get_range(self, start, end, limit=1000): - return self.post('content/range', {'start': start, - 'end': end, - 'limit': limit}) - - def content_find(self, content): - return self.post('content/present', {'content': content}) - - def directory_add(self, directories): - return self.post('directory/add', {'directories': directories}) - - def directory_missing(self, directories): - return self.post('directory/missing', {'directories': directories}) - - def directory_ls(self, directory, recursive=False): - return self.post('directory/ls', {'directory': directory, - 'recursive': recursive}) - - def revision_get(self, revisions): - return self.post('revision', {'revisions': revisions}) - - def revision_log(self, revisions, limit=None): - return self.post('revision/log', {'revisions': revisions, - 'limit': limit}) - - def revision_shortlog(self, revisions, limit=None): - return self.post('revision/shortlog', {'revisions': revisions, - 'limit': limit}) - - def revision_add(self, revisions): - return self.post('revision/add', {'revisions': revisions}) - - def revision_missing(self, revisions): - return self.post('revision/missing', {'revisions': revisions}) - - def release_add(self, releases): - return self.post('release/add', {'releases': releases}) - - def release_get(self, releases): - return self.post('release', {'releases': releases}) - - def release_missing(self, releases): - return self.post('release/missing', {'releases': releases}) - - def object_find_by_sha1_git(self, ids): - return self.post('object/find_by_sha1_git', {'ids': ids}) - - def snapshot_add(self, snapshots): - return self.post('snapshot/add', {'snapshots': snapshots}) - - def snapshot_get(self, snapshot_id): - return self.post('snapshot', { - 'snapshot_id': snapshot_id - }) - - def snapshot_get_by_origin_visit(self, origin, visit): - return self.post('snapshot/by_origin_visit', { - 'origin': origin, - 'visit': visit - }) - - def snapshot_get_latest(self, origin, 
allowed_statuses=None): - return self.post('snapshot/latest', { - 'origin': origin, - 'allowed_statuses': allowed_statuses - }) - - def snapshot_count_branches(self, snapshot_id): - return self.post('snapshot/count_branches', { - 'snapshot_id': snapshot_id - }) - - def snapshot_get_branches(self, snapshot_id, branches_from=b'', - branches_count=1000, target_types=None): - return self.post('snapshot/get_branches', { - 'snapshot_id': snapshot_id, - 'branches_from': branches_from, - 'branches_count': branches_count, - 'target_types': target_types - }) - - def origin_get(self, origins=None, *, origin=None): - if origin is None: - if origins is None: - raise TypeError('origin_get expected 1 argument') - else: - assert origins is None - origins = origin - warnings.warn("argument 'origin' of origin_get was renamed " - "to 'origins' in v0.0.123.", - DeprecationWarning) - return self.post('origin/get', {'origins': origins}) - - def origin_search(self, url_pattern, offset=0, limit=50, regexp=False, - with_visit=False): - return self.post('origin/search', {'url_pattern': url_pattern, - 'offset': offset, - 'limit': limit, - 'regexp': regexp, - 'with_visit': with_visit}) - - def origin_count(self, url_pattern, regexp=False, with_visit=False): - return self.post('origin/count', {'url_pattern': url_pattern, - 'regexp': regexp, - 'with_visit': with_visit}) - - def origin_get_range(self, origin_from=1, origin_count=100): - return self.post('origin/get_range', {'origin_from': origin_from, - 'origin_count': origin_count}) - - def origin_add(self, origins): - return self.post('origin/add_multi', {'origins': origins}) - - def origin_add_one(self, origin): - return self.post('origin/add', {'origin': origin}) - - def origin_visit_add(self, origin, date, type): - return self.post( - 'origin/visit/add', - {'origin': origin, 'date': date, 'type': type}) - - def origin_visit_update(self, origin, visit_id, status=None, - metadata=None, snapshot=None): - return self.post('origin/visit/update', {'origin': origin, - 'visit_id': visit_id, - 'status': status, - 'metadata': metadata, - 'snapshot': snapshot}) - - def origin_visit_upsert(self, visits): - return self.post('origin/visit/upsert', {'visits': visits}) - - def origin_visit_get(self, origin, last_visit=None, limit=None): - return self.post('origin/visit/get', { - 'origin': origin, 'last_visit': last_visit, 'limit': limit}) - - def origin_visit_find_by_date(self, origin, visit_date, limit=None): - return self.post('origin/visit/find_by_date', { - 'origin': origin, 'visit_date': visit_date}) - - def origin_visit_get_by(self, origin, visit): - return self.post('origin/visit/getby', {'origin': origin, - 'visit': visit}) - - def origin_visit_get_latest(self, origin, allowed_statuses=None, - require_snapshot=False): - return self.post( - 'origin/visit/get_latest', - {'origin': origin, 'allowed_statuses': allowed_statuses, - 'require_snapshot': require_snapshot}) - def stat_counters(self): return self.get('stat/counters') def refresh_stat_counters(self): return self.get('stat/refresh') - - def directory_entry_get_by_path(self, directory, paths): - return self.post('directory/path', dict(directory=directory, - paths=paths)) - - def tool_add(self, tools): - return self.post('tool/add', {'tools': tools}) - - def tool_get(self, tool): - return self.post('tool/data', {'tool': tool}) - - def origin_metadata_add(self, origin_url, ts, provider, tool, metadata): - return self.post('origin/metadata/add', {'origin_url': origin_url, - 'ts': ts, - 'provider': provider, - 'tool': tool, 
- 'metadata': metadata}) - - def origin_metadata_get_by(self, origin_url, provider_type=None): - return self.post('origin/metadata/get', { - 'origin_url': origin_url, - 'provider_type': provider_type - }) - - def metadata_provider_add(self, provider_name, provider_type, provider_url, - metadata): - return self.post('provider/add', {'provider_name': provider_name, - 'provider_type': provider_type, - 'provider_url': provider_url, - 'metadata': metadata}) - - def metadata_provider_get(self, provider_id): - return self.post('provider/get', {'provider_id': provider_id}) - - def metadata_provider_get_by(self, provider): - return self.post('provider/getby', {'provider': provider}) - - def diff_directories(self, from_dir, to_dir, track_renaming=False): - return self.post('algos/diff_directories', - {'from_dir': from_dir, - 'to_dir': to_dir, - 'track_renaming': track_renaming}) - - def diff_revisions(self, from_rev, to_rev, track_renaming=False): - return self.post('algos/diff_revisions', - {'from_rev': from_rev, - 'to_rev': to_rev, - 'track_renaming': track_renaming}) - - def diff_revision(self, revision, track_renaming=False): - return self.post('algos/diff_revision', - {'revision': revision, - 'track_renaming': track_renaming}) diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -16,12 +16,7 @@ import psycopg2 import psycopg2.pool -from . import converters -from .common import db_transaction_generator, db_transaction -from .db import Db -from .exc import StorageDBError -from .algos import diff - +from swh.core.api import remote_api_endpoint from swh.model.hashutil import ALGORITHMS, hash_to_bytes from swh.objstorage import get_objstorage from swh.objstorage.exc import ObjNotFoundError @@ -31,6 +26,12 @@ get_journal_writer = None # type: ignore # mypy limitation, see https://github.com/python/mypy/issues/1153 +from . import converters +from .common import db_transaction_generator, db_transaction +from .db import Db +from .exc import StorageDBError +from .algos import diff + # Max block size of contents to return BULK_BLOCK_CONTENT_LEN_MAX = 10000 @@ -94,8 +95,9 @@ if db: self.put_db(db) + @remote_api_endpoint('check_config') @db_transaction() - def check_config(self, *, check_write, db, cur): + def check_config(self, *, check_write, db=None, cur=None): """Check that the storage is configured and ready to go.""" if not self.objstorage.check_config(check_write=check_write): @@ -149,7 +151,7 @@ if d['length'] < -1: raise ValueError('Content length must be positive or -1.') - def _filter_new_content(self, content, db, cur): + def _filter_new_content(self, content, db=None, cur=None): """Sort contents into buckets 'with data' and 'without data', and filter out those already in the database.""" content_by_status = defaultdict(list) @@ -225,8 +227,9 @@ # move metadata in place db.skipped_content_add_from_temp(cur) + @remote_api_endpoint('content/add') @db_transaction() - def content_add(self, content, db, cur): + def content_add(self, content, db=None, cur=None): """Add content blobs to the storage Note: in case of DB errors, objects might have already been added to @@ -319,6 +322,7 @@ summary['content:add:bytes'] = content_bytes_added return summary + @remote_api_endpoint('content/update') @db_transaction() def content_update(self, content, keys=[], db=None, cur=None): """Update content blobs to the storage. 
Does nothing for unknown @@ -353,8 +357,9 @@ db.content_update_from_temp(keys_to_update=keys, cur=cur) + @remote_api_endpoint('content/add_metadata') @db_transaction() - def content_add_metadata(self, content, db, cur): + def content_add_metadata(self, content, db=None, cur=None): """Add content metadata to the storage (like `content_add`, but without inserting to the objstorage). @@ -398,6 +403,7 @@ return summary + @remote_api_endpoint('content/data') def content_get(self, content): """Retrieve in bulk contents and their data. @@ -435,6 +441,7 @@ yield {'sha1': obj_id, 'data': data} + @remote_api_endpoint('content/range') @db_transaction() def content_get_range(self, start, end, limit=1000, db=None, cur=None): """Retrieve contents within range [start, end] bound by limit. @@ -474,6 +481,7 @@ 'next': next_content, } + @remote_api_endpoint('content/metadata') @db_transaction_generator(statement_timeout=500) def content_get_metadata(self, content, db=None, cur=None): """Retrieve content metadata in bulk @@ -487,6 +495,7 @@ for metadata in db.content_get_metadata_from_sha1s(content, cur): yield dict(zip(db.content_get_metadata_keys, metadata)) + @remote_api_endpoint('content/missing') @db_transaction_generator() def content_missing(self, content, key_hash='sha1', db=None, cur=None): """List content missing from storage @@ -522,6 +531,7 @@ for obj in db.content_missing_from_list(content, cur): yield obj[key_hash_idx] + @remote_api_endpoint('content/missing/sha1') @db_transaction_generator() def content_missing_per_sha1(self, contents, db=None, cur=None): """List content missing from storage based only on sha1. @@ -539,6 +549,7 @@ for obj in db.content_missing_per_sha1(contents, cur): yield obj[0] + @remote_api_endpoint('content/skipped/missing') @db_transaction_generator() def skipped_content_missing(self, contents, db=None, cur=None): """List skipped_content missing from storage @@ -554,6 +565,7 @@ for content in db.skipped_content_missing(contents, cur): yield dict(zip(db.content_hash_keys, content)) + @remote_api_endpoint('content/present') @db_transaction() def content_find(self, content, db=None, cur=None): """Find a content hash in db. @@ -584,8 +596,9 @@ return [dict(zip(db.content_find_cols, content)) for content in contents] + @remote_api_endpoint('directory/add') @db_transaction() - def directory_add(self, directories, db, cur): + def directory_add(self, directories, db=None, cur=None): """Add directories to the storage Args: @@ -669,6 +682,7 @@ return summary + @remote_api_endpoint('directory/missing') @db_transaction_generator() def directory_missing(self, directories, db=None, cur=None): """List directories missing from storage @@ -683,6 +697,7 @@ for obj in db.directory_missing_from_list(directories, cur): yield obj[0] + @remote_api_endpoint('directory/ls') @db_transaction_generator(statement_timeout=20000) def directory_ls(self, directory, recursive=False, db=None, cur=None): """Get entries for one directory. @@ -706,6 +721,7 @@ for line in res_gen: yield dict(zip(db.directory_ls_cols, line)) + @remote_api_endpoint('directory/path') @db_transaction(statement_timeout=2000) def directory_entry_get_by_path(self, directory, paths, db=None, cur=None): """Get the directory entry (either file or dir) from directory with path. 
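The decorators being added throughout this file are what allow the hand-written proxy methods to be dropped from client.py: `remote_api_endpoint` only records a URL path on each method, and `RPCClient` presumably scans `backend_class` for those markers to generate the corresponding HTTP proxies. A minimal, self-contained sketch of that presumed mechanism follows; it is not the actual swh.core.api code, and names such as `RPCClientSketch` are illustrative only.

    import functools
    import inspect


    def remote_api_endpoint(path):
        """Mark a backend method as callable remotely under the given URL path."""
        def decorator(meth):
            meth._endpoint_path = path
            return meth
        return decorator


    class RPCClientSketch:
        """Generate one HTTP proxy per decorated method of ``backend_class``."""

        backend_class = None

        def __init_subclass__(cls, **kwargs):
            super().__init_subclass__(**kwargs)
            if cls.backend_class is None:
                return
            for name, meth in vars(cls.backend_class).items():
                path = getattr(meth, '_endpoint_path', None)
                if path is None or name in vars(cls):
                    continue
                sig = inspect.signature(meth)

                def proxy(self, *args, _path=path, _sig=sig, **kwargs):
                    # Bind the caller's arguments to the backend method's
                    # parameter names and send them as the POST body, much
                    # like the hand-written methods removed from client.py.
                    bound = _sig.bind(self, *args, **kwargs)
                    data = dict(bound.arguments)
                    data.pop('self', None)
                    return self.post(_path, data)

                setattr(cls, name, functools.wraps(meth)(proxy))

        def post(self, path, data):  # stand-in for RPCClient.post
            raise NotImplementedError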
@@ -723,8 +739,9 @@ if res: return dict(zip(db.directory_ls_cols, res)) + @remote_api_endpoint('revision/add') @db_transaction() - def revision_add(self, revisions, db, cur): + def revision_add(self, revisions, db=None, cur=None): """Add revisions to the storage Args: @@ -795,6 +812,7 @@ return {'revision:add': len(revisions_missing)} + @remote_api_endpoint('revision/missing') @db_transaction_generator() def revision_missing(self, revisions, db=None, cur=None): """List revisions missing from storage @@ -812,6 +830,7 @@ for obj in db.revision_missing_from_list(revisions, cur): yield obj[0] + @remote_api_endpoint('revision') @db_transaction_generator(statement_timeout=1000) def revision_get(self, revisions, db=None, cur=None): """Get all revisions from storage @@ -833,6 +852,7 @@ continue yield data + @remote_api_endpoint('revision/log') @db_transaction_generator(statement_timeout=2000) def revision_log(self, revisions, limit=None, db=None, cur=None): """Fetch revision entry from the given root revisions. @@ -854,6 +874,7 @@ continue yield data + @remote_api_endpoint('revision/shortlog') @db_transaction_generator(statement_timeout=2000) def revision_shortlog(self, revisions, limit=None, db=None, cur=None): """Fetch the shortlog for the given revisions @@ -869,8 +890,9 @@ yield from db.revision_shortlog(revisions, limit, cur) + @remote_api_endpoint('release/add') @db_transaction() - def release_add(self, releases, db, cur): + def release_add(self, releases, db=None, cur=None): """Add releases to the storage Args: @@ -926,6 +948,7 @@ return {'release:add': len(releases_missing)} + @remote_api_endpoint('release/missing') @db_transaction_generator() def release_missing(self, releases, db=None, cur=None): """List releases missing from storage @@ -943,6 +966,7 @@ for obj in db.release_missing_from_list(releases, cur): yield obj[0] + @remote_api_endpoint('release') @db_transaction_generator(statement_timeout=500) def release_get(self, releases, db=None, cur=None): """Given a list of sha1, return the releases's information @@ -961,6 +985,7 @@ ) yield data if data['target_type'] else None + @remote_api_endpoint('snapshot/add') @db_transaction() def snapshot_add(self, snapshots, db=None, cur=None): """Add snapshots to the storage. 
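The other recurring change in this file, turning positional `db, cur` parameters into `db=None, cur=None`, keeps the Python signatures callable without a database handle: `db_transaction` and `db_transaction_generator` inject those arguments server-side, so neither local callers nor the generated RPC proxies ever pass them. A simplified stand-in for that decorator is sketched below to show the injection; it assumes the `get_db`/`put_db` helpers visible earlier in this file plus a hypothetical `db.transaction()` context manager, and is not the actual swh.storage.common implementation.

    import functools


    def db_transaction_sketch(statement_timeout=None):
        """Simplified stand-in for swh.storage.common.db_transaction."""
        def decorator(meth):
            @functools.wraps(meth)
            def wrapper(self, *args, db=None, cur=None, **kwargs):
                if db is not None and cur is not None:
                    # Already inside a transaction: reuse the handle, as
                    # snapshot_get does when delegating to
                    # snapshot_get_branches(snapshot_id, db=db, cur=cur).
                    return meth(self, *args, db=db, cur=cur, **kwargs)
                db = self.get_db()
                try:
                    with db.transaction() as cur:  # hypothetical helper
                        return meth(self, *args, db=db, cur=cur, **kwargs)
                finally:
                    self.put_db(db)
            return wrapper
        return decorator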
@@ -1024,6 +1049,7 @@ return {'snapshot:add': count} + @remote_api_endpoint('snapshot') @db_transaction(statement_timeout=2000) def snapshot_get(self, snapshot_id, db=None, cur=None): """Get the content, possibly partial, of a snapshot with the given id @@ -1050,6 +1076,7 @@ return self.snapshot_get_branches(snapshot_id, db=db, cur=cur) + @remote_api_endpoint('snapshot/by_origin_visit') @db_transaction(statement_timeout=2000) def snapshot_get_by_origin_visit(self, origin, visit, db=None, cur=None): """Get the content, possibly partial, of a snapshot for the given origin visit @@ -1083,6 +1110,7 @@ return None + @remote_api_endpoint('snapshot/latest') @db_transaction(statement_timeout=4000) def snapshot_get_latest(self, origin, allowed_statuses=None, db=None, cur=None): @@ -1130,6 +1158,7 @@ 'last origin visit references an unknown snapshot') return snapshot + @remote_api_endpoint('snapshot/count_branches') @db_transaction(statement_timeout=2000) def snapshot_count_branches(self, snapshot_id, db=None, cur=None): """Count the number of branches in the snapshot with the given id @@ -1144,6 +1173,7 @@ return dict([bc for bc in db.snapshot_count_branches(snapshot_id, cur)]) + @remote_api_endpoint('snapshot/get_branches') @db_transaction(statement_timeout=2000) def snapshot_get_branches(self, snapshot_id, branches_from=b'', branches_count=1000, target_types=None, @@ -1209,6 +1239,7 @@ return None + @remote_api_endpoint('origin/visit/add') @db_transaction() def origin_visit_add(self, origin, date, type, db=None, cur=None): @@ -1247,6 +1278,7 @@ 'visit': visit_id, } + @remote_api_endpoint('origin/visit/update') @db_transaction() def origin_visit_update(self, origin, visit_id, status=None, metadata=None, snapshot=None, @@ -1290,6 +1322,7 @@ db.origin_visit_update(origin_url, visit_id, updates, cur) + @remote_api_endpoint('origin/visit/upsert') @db_transaction() def origin_visit_upsert(self, visits, db=None, cur=None): """Add a origin_visits with a specific id and with all its data. @@ -1323,6 +1356,7 @@ # TODO: upsert them all in a single query db.origin_visit_upsert(**visit, cur=cur) + @remote_api_endpoint('origin/visit/get') @db_transaction_generator(statement_timeout=500) def origin_visit_get(self, origin, last_visit=None, limit=None, db=None, cur=None): @@ -1344,6 +1378,7 @@ data = dict(zip(db.origin_visit_get_cols, line)) yield data + @remote_api_endpoint('origin/visit/find_by_date') @db_transaction(statement_timeout=500) def origin_visit_find_by_date(self, origin, visit_date, db=None, cur=None): """Retrieves the origin visit whose date is closest to the provided @@ -1362,6 +1397,7 @@ if line: return dict(zip(db.origin_visit_get_cols, line)) + @remote_api_endpoint('origin/visit/getby') @db_transaction(statement_timeout=500) def origin_visit_get_by(self, origin, visit, db=None, cur=None): """Retrieve origin visit's information. @@ -1380,6 +1416,7 @@ return dict(zip(db.origin_visit_get_cols, ori_visit)) + @remote_api_endpoint('origin/visit/get_latest') @db_transaction(statement_timeout=4000) def origin_visit_get_latest( self, origin, allowed_statuses=None, require_snapshot=False, @@ -1414,6 +1451,7 @@ if origin_visit: return dict(zip(db.origin_visit_get_cols, origin_visit)) + @remote_api_endpoint('object/find_by_sha1_git') @db_transaction(statement_timeout=2000) def object_find_by_sha1_git(self, ids, db=None, cur=None): """Return the objects found with the given ids. 
@@ -1440,6 +1478,7 @@ return ret + @remote_api_endpoint('origin/get') @db_transaction(statement_timeout=500) def origin_get(self, origins, db=None, cur=None): """Return origins, either all identified by their ids or all @@ -1488,6 +1527,7 @@ else: return [None if res['url'] is None else res for res in results] + @remote_api_endpoint('origin/get_range') @db_transaction_generator() def origin_get_range(self, origin_from=1, origin_count=100, db=None, cur=None): @@ -1507,6 +1547,7 @@ for origin in db.origin_get_range(origin_from, origin_count, cur): yield dict(zip(db.origin_get_range_cols, origin)) + @remote_api_endpoint('origin/search') @db_transaction_generator() def origin_search(self, url_pattern, offset=0, limit=50, regexp=False, with_visit=False, db=None, cur=None): @@ -1531,6 +1572,7 @@ regexp, with_visit, cur): yield dict(zip(db.origin_cols, origin)) + @remote_api_endpoint('origin/count') @db_transaction() def origin_count(self, url_pattern, regexp=False, with_visit=False, db=None, cur=None): @@ -1550,6 +1592,7 @@ """ return db.origin_count(url_pattern, regexp, with_visit, cur) + @remote_api_endpoint('origin/add_multi') @db_transaction() def origin_add(self, origins, db=None, cur=None): """Add origins to the storage @@ -1571,6 +1614,7 @@ return origins + @remote_api_endpoint('origin/add') @db_transaction() def origin_add_one(self, origin, db=None, cur=None): """Add origin to the storage @@ -1629,6 +1673,7 @@ for key in keys: cur.execute('select * from swh_update_counter(%s)', (key,)) + @remote_api_endpoint('origin/metadata/add') @db_transaction() def origin_metadata_add(self, origin_url, ts, provider, tool, metadata, db=None, cur=None): @@ -1648,6 +1693,7 @@ db.origin_metadata_add(origin_url, ts, provider, tool, metadata, cur) + @remote_api_endpoint('origin/metadata/get') @db_transaction_generator(statement_timeout=500) def origin_metadata_get_by(self, origin_url, provider_type=None, db=None, cur=None): @@ -1673,6 +1719,7 @@ for line in db.origin_metadata_get_by(origin_url, provider_type, cur): yield dict(zip(db.origin_metadata_get_cols, line)) + @remote_api_endpoint('tool/add') @db_transaction() def tool_add(self, tools, db=None, cur=None): """Add new tools to the storage. @@ -1700,6 +1747,7 @@ tools = db.tool_add_from_temp(cur) return [dict(zip(db.tool_cols, line)) for line in tools] + @remote_api_endpoint('tool/data') @db_transaction(statement_timeout=500) def tool_get(self, tool, db=None, cur=None): """Retrieve tool information. 
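End to end, the public API is unchanged: a `RemoteStorage` with `backend_class = Storage` exposes the same method names and arguments as before, only now they are derived from the decorated `Storage` methods. A hypothetical usage sketch, assuming the client constructor accepts the server URL as in other swh RPC clients:

    from swh.storage.api.client import RemoteStorage

    storage = RemoteStorage('http://localhost:5002/')
    if storage.check_config(check_write=False):
        # Query by sha1 exactly as with the previous hand-written client.
        missing = storage.content_missing_per_sha1([b'\x00' * 20])
        print(list(missing))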
@@ -1724,6 +1772,7 @@ return None return dict(zip(db.tool_cols, idx)) + @remote_api_endpoint('provider/add') @db_transaction() def metadata_provider_add(self, provider_name, provider_type, provider_url, metadata, db=None, cur=None): @@ -1741,6 +1790,7 @@ return db.metadata_provider_add(provider_name, provider_type, provider_url, metadata, cur) + @remote_api_endpoint('provider/get') @db_transaction() def metadata_provider_get(self, provider_id, db=None, cur=None): """Get a metadata provider @@ -1757,6 +1807,7 @@ return None return dict(zip(db.metadata_provider_cols, result)) + @remote_api_endpoint('provider/getby') @db_transaction() def metadata_provider_get_by(self, provider, db=None, cur=None): """Get a metadata provider @@ -1776,6 +1827,7 @@ return None return dict(zip(db.metadata_provider_cols, result)) + @remote_api_endpoint('algos/diff_directories') def diff_directories(self, from_dir, to_dir, track_renaming=False): """Compute the list of file changes introduced between two arbitrary directories (insertion / deletion / modification / renaming of files). @@ -1792,6 +1844,7 @@ """ return diff.diff_directories(self, from_dir, to_dir, track_renaming) + @remote_api_endpoint('algos/diff_revisions') def diff_revisions(self, from_rev, to_rev, track_renaming=False): """Compute the list of file changes introduced between two arbitrary revisions (insertion / deletion / modification / renaming of files). @@ -1808,6 +1861,7 @@ """ return diff.diff_revisions(self, from_rev, to_rev, track_renaming) + @remote_api_endpoint('algos/diff_revision') def diff_revision(self, revision, track_renaming=False): """Compute the list of file changes introduced by a specific revision (insertion / deletion / modification / renaming of files) by comparing