diff --git a/debian/control b/debian/control --- a/debian/control +++ b/debian/control @@ -30,8 +30,8 @@ python3-swh.objstorage (>= 0.0.17~), ${misc:Depends}, ${python3:Depends} -Breaks: python3-swh.archiver (<< 0.0.3~), - python3-swh.indexer (<< 0.0.48~), +Breaks: python3-swh.archiver (<< 0.0.4~), + python3-swh.indexer (<< 0.0.51~), python3-swh.vault (<< 0.0.19~) Description: Software Heritage storage utilities diff --git a/swh/storage/common.py b/swh/storage/common.py --- a/swh/storage/common.py +++ b/swh/storage/common.py @@ -6,35 +6,66 @@ import functools -def db_transaction(meth): +def apply_options(cursor, options): + """Applies the given PostgreSQL client options to the given cursor. + + Returns a dictionary mapping each changed option to its previous value.""" + old_options = {} + for option, value in options.items(): + cursor.execute('SHOW %s' % option) + old_value = cursor.fetchall()[0][0] + if old_value != value: + cursor.execute('SET LOCAL %s TO %%s' % option, (value,)) + old_options[option] = old_value + return old_options + + +def db_transaction(**client_options): """decorator to execute Storage methods within DB transactions - The decorated method must accept a `cur` keyword argument + The decorated method must accept `cur` and `db` keyword arguments + + Client options are applied with `SET LOCAL` on the PostgreSQL server """ - @functools.wraps(meth) - def _meth(self, *args, **kwargs): - if 'cur' in kwargs and kwargs['cur']: - return meth(self, *args, **kwargs) - else: - db = self.get_db() - with db.transaction() as cur: - return meth(self, *args, db=db, cur=cur, **kwargs) - return _meth - - -def db_transaction_generator(meth): + def decorator(meth, __client_options=client_options): + @functools.wraps(meth) + def _meth(self, *args, **kwargs): + if 'cur' in kwargs and kwargs['cur']: + cur = kwargs['cur'] + old_options = apply_options(cur, __client_options) + ret = meth(self, *args, **kwargs) + apply_options(cur, old_options) + return ret + else: + db = 
self.get_db() + with db.transaction() as cur: + apply_options(cur, __client_options) + return meth(self, *args, db=db, cur=cur, **kwargs) + return _meth + + return decorator + + +def db_transaction_generator(**client_options): """decorator to execute Storage methods within DB transactions, while returning a generator - The decorated method must accept a `cur` keyword argument + The decorated method must accept `cur` and `db` keyword arguments + Client options are applied with `SET LOCAL` on the PostgreSQL server """ - @functools.wraps(meth) - def _meth(self, *args, **kwargs): - if 'cur' in kwargs and kwargs['cur']: - yield from meth(self, *args, **kwargs) - else: - db = self.get_db() - with db.transaction() as cur: - yield from meth(self, *args, db=db, cur=cur, **kwargs) - return _meth + def decorator(meth, __client_options=client_options): + @functools.wraps(meth) + def _meth(self, *args, **kwargs): + if 'cur' in kwargs and kwargs['cur']: + cur = kwargs['cur'] + old_options = apply_options(cur, __client_options) + yield from meth(self, *args, **kwargs) + apply_options(cur, old_options) + else: + db = self.get_db() + with db.transaction() as cur: + apply_options(cur, __client_options) + yield from meth(self, *args, db=db, cur=cur, **kwargs) + return _meth + return decorator diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2017 The Software Heritage developers +# Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -175,7 +175,7 @@ # transaction, bubbling up any exception _ = added_to_objstorage.result() - @db_transaction + @db_transaction() def content_update(self, content, keys=[], db=None, cur=None): """Update content blobs to the storage. 
Does nothing for unknown contents or skipped ones. @@ -236,7 +236,7 @@ yield {'sha1': obj_id, 'data': data} - @db_transaction_generator + @db_transaction_generator(statement_timeout=500) def content_get_metadata(self, content, db=None, cur=None): """Retrieve content metadata in bulk @@ -251,7 +251,7 @@ for content_metadata in db.content_get_metadata_from_temp(cur): yield dict(zip(db.content_get_metadata_keys, content_metadata)) - @db_transaction_generator + @db_transaction_generator() def content_missing(self, content, key_hash='sha1', db=None, cur=None): """List content missing from storage @@ -289,7 +289,7 @@ for obj in db.content_missing_from_temp(cur): yield obj[key_hash_idx] - @db_transaction_generator + @db_transaction_generator() def content_missing_per_sha1(self, contents, db=None, cur=None): """List content missing from storage based only on sha1. @@ -307,7 +307,7 @@ for obj in db.content_missing_per_sha1_from_temp(cur): yield obj[0] - @db_transaction_generator + @db_transaction_generator() def skipped_content_missing(self, content, db=None, cur=None): """List skipped_content missing from storage @@ -327,7 +327,7 @@ yield from db.skipped_content_missing_from_temp(cur) - @db_transaction + @db_transaction() def content_find(self, content, db=None, cur=None): """Find a content hash in db. @@ -423,7 +423,7 @@ # Do the final copy db.directory_add_from_temp(cur) - @db_transaction_generator + @db_transaction_generator() def directory_missing(self, directories, db=None, cur=None): """List directories missing from storage @@ -444,7 +444,7 @@ for obj in db.directory_missing_from_temp(cur): yield obj[0] - @db_transaction_generator + @db_transaction_generator() def directory_get(self, directories, db=None, cur=None): """Get information on directories. 
@@ -465,7 +465,7 @@ for line in dirs: yield dict(zip(keys, line)) - @db_transaction_generator + @db_transaction_generator(statement_timeout=2000) def directory_ls(self, directory, recursive=False, db=None, cur=None): """Get entries for one directory. @@ -485,7 +485,7 @@ for line in res_gen: yield dict(zip(db.directory_ls_cols, line)) - @db_transaction + @db_transaction(statement_timeout=2000) def directory_entry_get_by_path(self, directory, paths, db=None, cur=None): """Get the directory entry (either file or dir) from directory with path. @@ -561,7 +561,7 @@ db.copy_to(parents_filtered, 'revision_history', ['id', 'parent_id', 'parent_rank'], cur) - @db_transaction_generator + @db_transaction_generator() def revision_missing(self, revisions, db=None, cur=None): """List revisions missing from storage @@ -577,7 +577,7 @@ for obj in db.revision_missing_from_temp(cur): yield obj[0] - @db_transaction_generator + @db_transaction_generator(statement_timeout=500) def revision_get(self, revisions, db=None, cur=None): """Get all revisions from storage @@ -600,7 +600,7 @@ continue yield data - @db_transaction_generator + @db_transaction_generator(statement_timeout=2000) def revision_log(self, revisions, limit=None, db=None, cur=None): """Fetch revision entry from the given root revisions. @@ -621,7 +621,7 @@ continue yield data - @db_transaction_generator + @db_transaction_generator(statement_timeout=2000) def revision_shortlog(self, revisions, limit=None, db=None, cur=None): """Fetch the shortlog for the given revisions @@ -636,7 +636,7 @@ yield from db.revision_shortlog(revisions, limit, cur) - @db_transaction_generator + @db_transaction_generator(statement_timeout=2000) def revision_log_by(self, origin_id, branch_name=None, timestamp=None, limit=None, db=None, cur=None): """Fetch revision entry from the actual origin_id's latest revision. 
@@ -708,7 +708,7 @@ db.release_add_from_temp(cur) - @db_transaction_generator + @db_transaction_generator() def release_missing(self, releases, db=None, cur=None): """List releases missing from storage @@ -725,7 +725,7 @@ for obj in db.release_missing_from_temp(cur): yield obj[0] - @db_transaction_generator + @db_transaction_generator(statement_timeout=500) def release_get(self, releases, db=None, cur=None): """Given a list of sha1, return the releases's information @@ -753,7 +753,7 @@ dict(zip(db.release_get_cols, release)) ) - @db_transaction + @db_transaction() def snapshot_add(self, origin, visit, snapshot, back_compat=False, db=None, cur=None): """Add a snapshot for the given origin/visit couple @@ -822,7 +822,7 @@ self.occurrence_add(occurrences, db=db, cur=cur) - @db_transaction + @db_transaction(statement_timeout=2000) def snapshot_get(self, snapshot_id, db=None, cur=None): """Get the snapshot with the given id @@ -852,7 +852,7 @@ return None - @db_transaction + @db_transaction(statement_timeout=2000) def snapshot_get_by_origin_visit(self, origin, visit, db=None, cur=None): """Get the snapshot for the given origin visit @@ -881,7 +881,7 @@ return None - @db_transaction + @db_transaction(statement_timeout=2000) def snapshot_get_latest(self, origin, allowed_statuses=None, db=None, cur=None): """Get the latest snapshot for the given origin, optionally only from visits @@ -905,7 +905,7 @@ origin_visit = dict(zip(db.origin_visit_get_cols, origin_visit)) return self.snapshot_get(origin_visit['snapshot'], db=db, cur=cur) - @db_transaction + @db_transaction() def occurrence_add(self, occurrences, db=None, cur=None): """Add occurrences to the storage @@ -930,7 +930,7 @@ db.occurrence_history_add_from_temp(cur) - @db_transaction_generator + @db_transaction_generator(statement_timeout=2000) def occurrence_get(self, origin_id, db=None, cur=None): """Retrieve occurrence information per origin_id. 
@@ -949,7 +949,7 @@ 'target_type': line[3], } - @db_transaction + @db_transaction() def origin_visit_add(self, origin, ts, db=None, cur=None): """Add an origin_visit for the origin at ts with status 'ongoing'. @@ -973,7 +973,7 @@ 'visit': db.origin_visit_add(origin, ts, cur) } - @db_transaction + @db_transaction() def origin_visit_update(self, origin, visit_id, status, metadata=None, db=None, cur=None): """Update an origin_visit's status. @@ -990,7 +990,7 @@ """ return db.origin_visit_update(origin, visit_id, status, metadata, cur) - @db_transaction_generator + @db_transaction_generator(statement_timeout=500) def origin_visit_get(self, origin, last_visit=None, limit=None, db=None, cur=None): """Retrieve all the origin's visit's information. @@ -1011,7 +1011,7 @@ data = dict(zip(db.origin_visit_get_cols, line)) yield data - @db_transaction + @db_transaction(statement_timeout=2000) def origin_visit_get_by(self, origin, visit, db=None, cur=None): """Retrieve origin visit's information. @@ -1046,7 +1046,7 @@ return ori_visit - @db_transaction_generator + @db_transaction_generator(statement_timeout=500) def revision_get_by(self, origin_id, branch_name=None, @@ -1077,7 +1077,7 @@ continue yield data - @db_transaction_generator + @db_transaction_generator(statement_timeout=500) def release_get_by(self, origin_id, limit=None, db=None, cur=None): """Given an origin id, return all the tag objects pointing to heads of origin_id. @@ -1097,7 +1097,7 @@ ) yield data - @db_transaction + @db_transaction(statement_timeout=2000) def object_find_by_sha1_git(self, ids, db=None, cur=None): """Return the objects found with the given ids. @@ -1125,7 +1125,7 @@ origin_keys = ['id', 'type', 'url', 'lister', 'project'] - @db_transaction + @db_transaction(statement_timeout=500) def origin_get(self, origin, db=None, cur=None): """Return the origin either identified by its id or its tuple (type, url). 
@@ -1166,7 +1166,7 @@ return dict(zip(self.origin_keys, ori)) return None - @db_transaction_generator + @db_transaction_generator() def origin_search(self, url_pattern, offset=0, limit=50, regexp=False, db=None, cur=None): """Search for origins whose urls contain a provided string pattern @@ -1188,7 +1188,7 @@ regexp, cur): yield dict(zip(self.origin_keys, origin)) - @db_transaction + @db_transaction() def _person_add(self, person, db=None, cur=None): """Add a person in storage. @@ -1206,7 +1206,7 @@ """ return db.person_add(person) - @db_transaction_generator + @db_transaction_generator(statement_timeout=500) def person_get(self, person, db=None, cur=None): """Return the persons identified by their ids. @@ -1220,7 +1220,7 @@ for person in db.person_get(person): yield dict(zip(db.person_get_cols, person)) - @db_transaction + @db_transaction() def origin_add(self, origins, db=None, cur=None): """Add origins to the storage @@ -1242,7 +1242,7 @@ return ret - @db_transaction + @db_transaction() def origin_add_one(self, origin, db=None, cur=None): """Add origin to the storage @@ -1264,7 +1264,7 @@ return db.origin_add(origin['type'], origin['url'], cur) - @db_transaction + @db_transaction() def fetch_history_start(self, origin_id, db=None, cur=None): """Add an entry for origin origin_id in fetch_history. Returns the id of the added fetch_history entry @@ -1276,7 +1276,7 @@ return db.create_fetch_history(fetch_history, cur) - @db_transaction + @db_transaction() def fetch_history_end(self, fetch_history_id, data, db=None, cur=None): """Close the fetch_history entry with id `fetch_history_id`, replacing its data with `data`. @@ -1293,13 +1293,13 @@ db.update_fetch_history(fetch_history, cur) - @db_transaction + @db_transaction() def fetch_history_get(self, fetch_history_id, db=None, cur=None): """Get the fetch_history entry with id `fetch_history_id`. 
""" return db.get_fetch_history(fetch_history_id, cur) - @db_transaction + @db_transaction() def entity_add(self, entities, db=None, cur=None): """Add the given entitites to the database (in entity_history). @@ -1330,7 +1330,7 @@ db.copy_to(entities, 'tmp_entity_history', cols, cur) db.entity_history_add_from_temp() - @db_transaction_generator + @db_transaction_generator() def entity_get_from_lister_metadata(self, entities, db=None, cur=None): """Fetch entities from the database, matching with the lister and associated metadata. @@ -1373,7 +1373,7 @@ 'lister_metadata': entities[i], } - @db_transaction_generator + @db_transaction_generator(statement_timeout=2000) def entity_get(self, uuid, db=None, cur=None): """Returns the list of entity per its uuid identifier and also its parent hierarchy. @@ -1389,7 +1389,7 @@ for entity in db.entity_get(uuid, cur): yield dict(zip(db.entity_cols, entity)) - @db_transaction + @db_transaction(statement_timeout=500) def entity_get_one(self, uuid, db=None, cur=None): """Returns one entity using its uuid identifier. 
@@ -1406,7 +1406,7 @@ else: return None - @db_transaction + @db_transaction(statement_timeout=500) def stat_counters(self, db=None, cur=None): """compute statistics about the number of tuples in various tables @@ -1417,7 +1417,7 @@ """ return {k: v for (k, v) in db.stat_counters()} - @db_transaction + @db_transaction() def origin_metadata_add(self, origin_id, ts, provider, tool, metadata, db=None, cur=None): """ Add an origin_metadata for the origin at ts with provenance and @@ -1439,7 +1439,7 @@ return db.origin_metadata_add(origin_id, ts, provider, tool, metadata, cur) - @db_transaction_generator + @db_transaction_generator(statement_timeout=500) def origin_metadata_get_by(self, origin_id, provider_type=None, db=None, cur=None): """Retrieve list of all origin_metadata entries for the origin_id @@ -1465,7 +1465,7 @@ for line in db.origin_metadata_get_by(origin_id, provider_type, cur): yield dict(zip(db.origin_metadata_get_cols, line)) - @db_transaction_generator + @db_transaction_generator() def tool_add(self, tools, db=None, cur=None): """Add new tools to the storage. @@ -1493,7 +1493,7 @@ for line in tools: yield dict(zip(db.tool_cols, line)) - @db_transaction + @db_transaction(statement_timeout=500) def tool_get(self, tool, db=None, cur=None): """Retrieve tool information. 
@@ -1517,20 +1517,20 @@ return None return dict(zip(db.tool_cols, idx)) - @db_transaction + @db_transaction() def metadata_provider_add(self, provider_name, provider_type, provider_url, metadata, db=None, cur=None): return db.metadata_provider_add(provider_name, provider_type, provider_url, metadata, cur) - @db_transaction + @db_transaction() def metadata_provider_get(self, provider_id, db=None, cur=None): result = db.metadata_provider_get(provider_id) if not result: return None return dict(zip(db.metadata_provider_cols, result)) - @db_transaction + @db_transaction() def metadata_provider_get_by(self, provider, db=None, cur=None): result = db.metadata_provider_get_by(provider['provider_name'], provider['provider_url'])