diff --git a/sql/archiver/swh-archiver-func.sql b/sql/archiver/swh-archiver-func.sql index 0274bd1..625ed28 100644 --- a/sql/archiver/swh-archiver-func.sql +++ b/sql/archiver/swh-archiver-func.sql @@ -1,108 +1,127 @@ create or replace function swh_mktemp_content_archive() returns void language sql as $$ create temporary table tmp_content_archive ( like content_archive including defaults ) on commit drop; alter table tmp_content_archive drop column copies; alter table tmp_content_archive drop column num_present; $$; COMMENT ON FUNCTION swh_mktemp_content_archive() IS 'Create temporary table content_archive'; create or replace function swh_content_archive_missing(backend_name text) returns setof sha1 language plpgsql as $$ begin return query select content_id from tmp_content_archive tmp where exists ( select 1 from content_archive c where tmp.content_id = c.content_id and (not c.copies ? backend_name or c.copies @> jsonb_build_object(backend_name, '{"status": "missing"}'::jsonb)) ); end $$; COMMENT ON FUNCTION swh_content_archive_missing(text) IS 'Filter missing data from a specific backend'; create or replace function swh_content_archive_unknown() returns setof sha1 language plpgsql as $$ begin return query select content_id from tmp_content_archive tmp where not exists ( select 1 from content_archive c where tmp.content_id = c.content_id ); end $$; COMMENT ON FUNCTION swh_content_archive_unknown() IS 'Retrieve list of unknown sha1s'; CREATE OR REPLACE FUNCTION count_copies(from_id bytea, to_id bytea) returns void language sql as $$ with sample as ( select content_id, copies from content_archive where content_id > from_id and content_id <= to_id ), data as ( select substring(content_id from 19) as bucket, jbe.key as archive from sample join lateral jsonb_each(copies) jbe on true where jbe.value->>'status' = 'present' ), bucketed as ( select bucket, archive, count(*) as count from data group by bucket, archive ) update content_archive_counts cac set count = cac.count + bucketed.count from bucketed where cac.archive = bucketed.archive and cac.bucket = bucketed.bucket; $$; comment on function count_copies(bytea, bytea) is 'Count the objects between from_id and to_id, add the results to content_archive_counts'; CREATE OR REPLACE FUNCTION init_content_archive_counts() returns void language sql as $$ insert into content_archive_counts ( select id, decode(lpad(to_hex(bucket), 4, '0'), 'hex')::bucket as bucket, 0 as count from archive join lateral generate_series(0, 65535) bucket on true ) on conflict (archive, bucket) do nothing; $$; comment on function init_content_archive_counts() is 'Initialize the content archive counts for the registered archives'; create type content_archive_count as ( archive text, count bigint ); create or replace function get_content_archive_counts() returns setof content_archive_count language sql as $$ select archive, sum(count)::bigint from content_archive_counts group by archive order by archive; $$; comment on function get_content_archive_counts() is 'Get count for each archive'; --- Add new content_archive from temporary table, skipping duplicates. 
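-- Illustration (assumed usage, not defined by this change): the counting
-- machinery above is typically driven in ranges over the sha1 keyspace;
-- the bounds below are arbitrary examples.
--
--   select init_content_archive_counts();
--   select count_copies('\x00'::bytea, '\xff'::bytea);
--   select * from get_content_archive_counts();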
+-- create a temporary table called tmp_TBLNAME, mimicking existing table +-- TBLNAME +create or replace function swh_mktemp(tblname regclass) + returns void + language plpgsql +as $$ +begin + execute format(' + create temporary table tmp_%1$I + (like %1$I including defaults) + on commit drop; + ', tblname); + return; +end +$$; + +comment on function swh_mktemp(regclass) is 'Helper function to create a temporary table mimicking the existing one'; + +-- Helper function to insert new entries in content_archive from a +-- temporary table skipping duplicates. create or replace function swh_content_archive_add() returns void language plpgsql as $$ begin insert into content_archive (content_id, copies, num_present) select distinct content_id, copies, num_present from tmp_content_archive on conflict(content_id) do nothing; return; end $$; comment on function swh_content_archive_add() is 'Helper function to insert new entry in content_archive'; diff --git a/sql/archiver/upgrades/009.sql b/sql/archiver/upgrades/009.sql index 96932a4..5a3133b 100644 --- a/sql/archiver/upgrades/009.sql +++ b/sql/archiver/upgrades/009.sql @@ -1,22 +1,42 @@ --- SWH DB schema upgrade +-- SWH Archiver DB schema upgrade -- from_version: 8 -- to_version: 9 --- description: Add helper function to create new entries in content_archive table +-- description: Add helper functions to create temporary table and insert new entries in content_archive table -INSERT INTO dbversion(version, release, description) -VALUES(9, now(), 'Work In Progress'); +insert into dbversion(version, release, description) +values(9, now(), 'Work In Progress'); +-- create a temporary table called tmp_TBLNAME, mimicking existing +-- table TBLNAME +create or replace function swh_mktemp(tblname regclass) + returns void + language plpgsql +as $$ +begin + execute format(' + create temporary table tmp_%1$I + (like %1$I including defaults) + on commit drop; + ', tblname); + return; +end +$$; + +comment on function swh_mktemp(regclass) is 'Helper function to create a temporary table mimicking the existing one'; + +-- Helper function to insert new entries in content_archive from a +-- temporary table skipping duplicates. create or replace function swh_content_archive_add() returns void language plpgsql as $$ begin insert into content_archive (content_id, copies, num_present) select distinct content_id, copies, num_present from tmp_content_archive on conflict(content_id) do nothing; return; end $$; comment on function swh_content_archive_add() is 'Helper function to insert new entry in content_archive'; diff --git a/swh/storage/archiver/db.py b/swh/storage/archiver/db.py index 6b20e03..b130d3b 100644 --- a/swh/storage/archiver/db.py +++ b/swh/storage/archiver/db.py @@ -1,284 +1,251 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import json import time from swh.core import hashutil from swh.storage.db import BaseDb, cursor_to_bytes, stored_procedure class ArchiverDb(BaseDb): """Proxy to the SWH's archiver DB """ def archive_ls(self, cur=None): """ Get all the archives registered on the server. Yields: a tuple (server_id, server_url) for each archive server. 
""" cur = self._cursor(cur) cur.execute("SELECT * FROM archive") yield from cursor_to_bytes(cur) def content_archive_get(self, content_id, cur=None): """ Get the archival status of a content in a specific server. Retrieve from the database the archival status of the given content in the given archive server. Args: content_id: the sha1 of the content. Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ query = """SELECT content_id, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'present' ORDER BY key ) AS present, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing, array( SELECT value->'mtime' FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing_mtime FROM content_archive WHERE content_id = %s ORDER BY content_id """ cur = self._cursor(cur) cur.execute(query, (content_id,)) row = cur.fetchone() if not row: return None content_id, present, ongoing, mtimes = row return (content_id, present, dict(zip(ongoing, mtimes))) def content_archive_get_copies(self, last_content=None, limit=1000, cur=None): """Get the list of copies for `limit` contents starting after `last_content`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ query = """SELECT content_id, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'present' ORDER BY key ) AS present, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing, array( SELECT value->'mtime' FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing_mtime FROM content_archive WHERE content_id > %s ORDER BY content_id LIMIT %s """ if last_content is None: last_content = b'' cur = self._cursor(cur) cur.execute(query, (last_content, limit)) for content_id, present, ongoing, mtimes in cursor_to_bytes(cur): yield (content_id, present, dict(zip(ongoing, mtimes))) def content_archive_get_unarchived_copies( self, retention_policy, last_content=None, limit=1000, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Yields only copies with number of present smaller than `retention policy`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. retention_policy: number of required present copies limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. 
""" query = """SELECT content_id, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'present' ORDER BY key ) AS present, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing, array( SELECT value->'mtime' FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing_mtime FROM content_archive WHERE content_id > %s AND num_present < %s ORDER BY content_id LIMIT %s """ if last_content is None: last_content = b'' cur = self._cursor(cur) cur.execute(query, (last_content, retention_policy, limit)) for content_id, present, ongoing, mtimes in cursor_to_bytes(cur): yield (content_id, present, dict(zip(ongoing, mtimes))) @stored_procedure('swh_mktemp_content_archive') def mktemp_content_archive(self, cur=None): """Trigger the creation of the temporary table tmp_content_archive during the lifetime of the transaction. """ pass - @stored_procedure('swh_add_content_archive') - def add_content_archive_from_temp(self, cur=None): + @stored_procedure('swh_content_archive_add') + def content_archive_add_from_temp(self, cur=None): """Add new content archive entries from temporary table. Use from archiver.storage module: self.db.mktemp_content_archive() # copy data over to the temp table self.db.copy_to([{'colname': id0}, {'colname': id1}], 'tmp_cache_content', ['colname'], cur) # insert into the main table self.db.add_content_archive_from_temp(cur) """ pass def content_archive_get_missing(self, backend_name, cur=None): """Retrieve the content missing from backend_name. """ cur = self._cursor(cur) cur.execute("select * from swh_content_archive_missing(%s)", (backend_name,)) yield from cursor_to_bytes(cur) def content_archive_get_unknown(self, cur=None): """Retrieve unknown sha1 from archiver db. """ cur = self._cursor(cur) cur.execute('select * from swh_content_archive_unknown()') yield from cursor_to_bytes(cur) def content_archive_update(self, content_id, archive_id, new_status=None, cur=None): """ Update the status of an archive content and set its mtime to Change the mtime of an archived content for the given archive and set it's mtime to the current time. Args: content_id (str): content sha1 archive_id (str): name of the archive new_status (str): one of 'missing', 'present' or 'ongoing'. this status will replace the previous one. If not given, the function only change the mtime of the content for the given archive. """ if isinstance(content_id, bytes): content_id = '\\x%s' % hashutil.hash_to_hex(content_id) if new_status is not None: query = """UPDATE content_archive SET copies=jsonb_set( copies, '{%s}', '{"status":"%s", "mtime":%d}' ) WHERE content_id='%s' """ % (archive_id, new_status, int(time.time()), content_id) else: query = """ UPDATE content_archive SET copies=jsonb_set(copies, '{%s,mtime}', '%d') WHERE content_id='%s' """ % (archive_id, int(time.time())) cur = self._cursor(cur) cur.execute(query) - - def content_archive_add( - self, content_id, sources_present, sources_missing, cur=None): - """Add content archive entry for the content content_id. - The status is: - - present for all sources in sources_present. - - missing for all sources in sources_missing. 
- - """ - - if isinstance(content_id, bytes): - content_id = '\\x%s' % hashutil.hash_to_hex(content_id) - - copies = {} - num_present = 0 - for source in sources_present: - copies[source] = { - "status": "present", - "mtime": int(time.time()), - } - num_present += 1 - - for source in sources_missing: - copies[source] = { - "status": "missing", - } - - query = """INSERT INTO content_archive(content_id, copies, num_present) - VALUES('%s', '%s', %s) - """ % (content_id, json.dumps(copies), num_present) - cur = self._cursor(cur) - cur.execute(query) diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py index 457e132..8d62b64 100644 --- a/swh/storage/archiver/director.py +++ b/swh/storage/archiver/director.py @@ -1,300 +1,299 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import click import sys from swh.core import config, utils, hashutil from swh.objstorage import get_objstorage from swh.scheduler.utils import get_task from . import tasks # noqa from .storage import ArchiverStorage class ArchiverDirectorBase(config.SWHConfig, metaclass=abc.ABCMeta): """Abstract Director class An archiver director is in charge of dispatching batch of contents to archiver workers (for them to archive). Inherit from this class and provide: - ADDITIONAL_CONFIG: Some added configuration needed for the director to work - CONFIG_BASE_FILENAME: relative path to lookup for the configuration file - def get_contents_to_archive(self): Implementation method to read contents to archive """ DEFAULT_CONFIG = { 'batch_max_size': ('int', 1500), 'asynchronous': ('bool', True), 'dbconn': ('str', 'dbname=softwareheritage-archiver-dev user=guest') } # Destined to be overridden by subclass ADDITIONAL_CONFIG = {} # We use the same configuration file as the worker CONFIG_BASE_FILENAME = 'archiver/worker' # The worker's task queue name to use TASK_NAME = None def __init__(self): """ Constructor of the archiver director. Args: db_conn_archiver: Either a libpq connection string, or a psycopg2 connection for the archiver db. config: optionnal additional configuration. Keys in the dict will override the one parsed from the configuration file. """ super().__init__() self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) self.archiver_storage = ArchiverStorage(self.config['dbconn']) self.task = get_task(self.TASK_NAME) def run(self): """ Run the archiver director. The archiver director will check all the contents of the archiver database and do the required backup jobs. """ if self.config['asynchronous']: run_fn = self.run_async_worker else: run_fn = self.run_sync_worker for batch in self.read_batch_contents(): run_fn(batch) def run_async_worker(self, batch): """Produce a worker that will be added to the task queue. """ self.task.delay(batch=batch) def run_sync_worker(self, batch): """Run synchronously a worker on the given batch. """ self.task(batch=batch) def read_batch_contents(self): """ Create batch of contents that needs to be archived Yields: batch of sha1 that corresponds to contents that needs more archive copies. 
""" contents = [] for content in self.get_contents_to_archive(): contents.append(content) if len(contents) > self.config['batch_max_size']: yield contents contents = [] if len(contents) > 0: yield contents @abc.abstractmethod def get_contents_to_archive(self): """Retrieve generator of sha1 to archive Yields: sha1 to archive """ pass class ArchiverWithRetentionPolicyDirector(ArchiverDirectorBase): """Process the files in order to know which one is needed as backup. The archiver director processes the files in the local storage in order to know which one needs archival and it delegates this task to archiver workers. """ ADDITIONAL_CONFIG = { 'retention_policy': ('int', 2), } TASK_NAME = 'swh.storage.archiver.tasks.SWHArchiverWithRetentionPolicyTask' def get_contents_to_archive(self): """Create batch of contents that needs to be archived Yields: Datas about a content as a tuple (content_id, present_copies, ongoing_copies) where ongoing_copies is a dict mapping copy to mtime. """ last_content = None while True: archiver_contents = list( self.archiver_storage.content_archive_get_unarchived_copies( last_content=last_content, retention_policy=self.config['retention_policy'])) if not archiver_contents: return for content_id, _, _ in archiver_contents: last_content = content_id yield content_id def read_sha1_from_stdin(): """Read sha1 from stdin. """ for sha1 in sys.stdin: yield {'content_id': hashutil.hex_to_hash(sha1.rstrip())} class ArchiverStdinToBackendDirector(ArchiverDirectorBase): """A cloud archiver director in charge of reading contents and send them in batch in the cloud. The archiver director, in order: - Reads sha1 to send to a specific backend. - Checks if those sha1 are known in the archiver. If they are not, add them - if the sha1 are missing, they are sent for the worker to archive If the flag force_copy is set, this will force the copy to be sent for archive even though it has already been done. """ ADDITIONAL_CONFIG = { 'destination': ('str', 'azure'), 'force_copy': ('bool', False), 'source': ('str', 'uffizi'), 'storages': ('list[dict]', [ {'host': 'uffizi', 'cls': 'pathslicing', 'args': {'root': '/tmp/softwareheritage/objects', 'slicing': '0:2/2:4/4:6'}}, {'host': 'banco', 'cls': 'remote', 'args': {'base_url': 'http://banco:5003/'}} ]) } CONFIG_BASE_FILENAME = 'archiver/worker-to-backend' TASK_NAME = 'swh.storage.archiver.tasks.SWHArchiverToBackendTask' def __init__(self): super().__init__() self.destination = self.config['destination'] self.force_copy = self.config['force_copy'] self.objstorages = { storage['host']: get_objstorage(storage['cls'], storage['args']) for storage in self.config.get('storages', []) } # Fallback objstorage self.source = self.config['source'] # Where the content is missing - self.sources_missing = list( - set(self.objstorages.keys()) - set(self.source)) + self.sources_missing = set(self.objstorages.keys()) - set([self.source]) # noqa def _add_unknown_content_ids(self, content_ids, source_objstorage): """Check whether some content_id are unknown. If they are, add them to the archiver db. 
Args: content_ids: List of dict with one key content_id source_objstorage (ObjStorage): objstorage to check if content_id is there """ - unknowns = self.archiver_storage.content_archive_get_unknown( - content_ids) self.archiver_storage.content_archive_add( - [u_id for u_id in unknowns if u_id in self.source], + (h['content_id'] + for h in content_ids + if h['content_id'] in source_objstorage), sources_present=[self.source], sources_missing=self.sources_missing) def get_contents_to_archive(self): gen_content_ids = ( ids for ids in utils.grouper(read_sha1_from_stdin(), self.config['batch_max_size'])) source_objstorage = self.objstorages[self.source] if self.force_copy: for content_ids in gen_content_ids: content_ids = list(content_ids) if not content_ids: continue # Add missing entries in archiver table self._add_unknown_content_ids(content_ids, source_objstorage) print('Send %s contents to archive' % len(content_ids)) for content in content_ids: content_id = content['content_id'] # force its status to missing self.archiver_storage.content_archive_update( content_id, self.destination, 'missing') yield content_id else: for content_ids in gen_content_ids: content_ids = list(content_ids) # Add missing entries in archiver table self._add_unknown_content_ids(content_ids, source_objstorage) # Filter already copied data content_ids = list( self.archiver_storage.content_archive_get_missing( content_ids=content_ids, backend_name=self.destination)) if not content_ids: continue print('Send %s contents to archive' % len(content_ids)) for content in content_ids: yield content def run_async_worker(self, batch): """Produce a worker that will be added to the task queue. """ self.task.delay(destination=self.destination, batch=batch) def run_sync_worker(self, batch): """Run synchronously a worker on the given batch. """ self.task(destination=self.destination, batch=batch) @click.command() @click.option('--direct', is_flag=True, help="""The archiver sends content for backup to one storage.""") def launch(direct): if direct: archiver = ArchiverStdinToBackendDirector() else: archiver = ArchiverWithRetentionPolicyDirector() archiver.run() if __name__ == '__main__': launch() diff --git a/swh/storage/archiver/storage.py b/swh/storage/archiver/storage.py index a63301a..eb02bc5 100644 --- a/swh/storage/archiver/storage.py +++ b/swh/storage/archiver/storage.py @@ -1,199 +1,200 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import psycopg2 import time from .db import ArchiverDb from swh.storage.common import db_transaction_generator, db_transaction from swh.storage.exc import StorageDBError class ArchiverStorage(): """SWH Archiver storage proxy, encompassing DB """ def __init__(self, db_conn): """ Args: db_conn: either a libpq connection string, or a psycopg2 connection """ try: if isinstance(db_conn, psycopg2.extensions.connection): self.db = ArchiverDb(db_conn) else: self.db = ArchiverDb.connect(db_conn) except psycopg2.OperationalError as e: raise StorageDBError(e) @db_transaction_generator def archive_ls(self, cur=None): """ Get all the archives registered on the server. Yields: a tuple (server_id, server_url) for each archive server. """ yield from self.db.archive_ls(cur) @db_transaction def content_archive_get(self, content_id, cur=None): """ Get the archival status of a content. 
        Retrieve from the database the archival status of the given content.

        Args:
            content_id: the sha1 of the content

        Yields:
            A tuple (content_id, present_copies, ongoing_copies), where
            ongoing_copies is a dict mapping copy to mtime.

        """
        return self.db.content_archive_get(content_id, cur)

    @db_transaction_generator
    def content_archive_get_copies(self, last_content=None, limit=1000,
                                   cur=None):
        """Get the list of copies for `limit` contents starting after
           `last_content`.

        Args:
            last_content: sha1 of the last content retrieved. May be None
                to start at the beginning.
            limit: number of contents to retrieve. Can be None to retrieve all
                objects (will be slow).

        Yields:
            A tuple (content_id, present_copies, ongoing_copies), where
            ongoing_copies is a dict mapping copy to mtime.

        """
        yield from self.db.content_archive_get_copies(last_content, limit,
                                                      cur)

    @db_transaction_generator
    def content_archive_get_unarchived_copies(
            self, retention_policy, last_content=None, limit=1000, cur=None):
        """Get the list of copies for `limit` contents starting after
           `last_content`. Yields only copies with a number of present
           copies smaller than `retention_policy`.

        Args:
            last_content: sha1 of the last content retrieved. May be None
                to start at the beginning.
            retention_policy: number of required present copies
            limit: number of contents to retrieve. Can be None to retrieve all
                objects (will be slow).

        Yields:
            A tuple (content_id, present_copies, ongoing_copies), where
            ongoing_copies is a dict mapping copy to mtime.

        """
        yield from self.db.content_archive_get_unarchived_copies(
            retention_policy, last_content, limit, cur)

    @db_transaction_generator
    def content_archive_get_missing(self, content_ids, backend_name,
                                    cur=None):
        """Retrieve missing sha1s from backend_name.

        Args:
            content_ids ([sha1s]): list of sha1s to test
            backend_name (str): name of the backend to check for content

        Yields:
            missing sha1s from backend_name

        """
        db = self.db

        db.mktemp_content_archive()
        db.copy_to(content_ids, 'tmp_content_archive',
                   ['content_id'], cur)
        for content_id in db.content_archive_get_missing(backend_name, cur):
            yield content_id[0]

    @db_transaction_generator
    def content_archive_get_unknown(self, content_ids, cur=None):
        """Retrieve unknown sha1s from content_archive.

        Args:
            content_ids ([sha1s]): list of sha1s to test

        Yields:
            Unknown sha1s from content_archive

        """
        db = self.db

        db.mktemp_content_archive()
        db.copy_to(content_ids, 'tmp_content_archive',
                   ['content_id'], cur)
        for content_id in db.content_archive_get_unknown(cur):
            yield content_id[0]

    @db_transaction
    def content_archive_update(self, content_id, archive_id,
                               new_status=None, cur=None):
        """Update the status of an archived content for the given archive and
        set its mtime to the current time.

        Args:
            content_id (str): content sha1
            archive_id (str): name of the archive
            new_status (str): one of 'missing', 'present' or 'ongoing'.
                This status will replace the previous one. If not given,
                the function only changes the mtime of the content for the
                given archive.
        """
        self.db.content_archive_update(content_id, archive_id, new_status,
                                       cur)
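    # Illustration of the filtering flow above (the DSN and sha1 are
    # placeholders; 'banco' is one of the backends configured in this patch):
    #
    #     storage = ArchiverStorage('dbname=softwareheritage-archiver-dev')
    #     ids = [{'content_id': b'\x00' * 20}]
    #     unknown = list(storage.content_archive_get_unknown(ids))
    #     missing = list(storage.content_archive_get_missing(ids, 'banco'))
    #     for content_id in missing:
    #         storage.content_archive_update(content_id, 'banco', 'ongoing')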
    @db_transaction
    def content_archive_add(
            self, content_ids, sources_present, sources_missing, cur=None):
        """Insert new entries in db for the given content_ids.

        Args:
            content_ids ([bytes|str]): content identifiers
            sources_present ([str]): List of source names where
                contents are present
            sources_missing ([str]): List of source names where
                contents are missing

        """
+        db = self.db
+
+        # Prepare copies dictionary
        copies = {}
-        num_present = 0
        for source in sources_present:
            copies[source] = {
                "status": "present",
                "mtime": int(time.time()),
            }
-            num_present += 1

        for source in sources_missing:
            copies[source] = {
                "status": "missing",
            }

        copies = json.dumps(copies)
        num_present = len(sources_present)

-        db = self.db
-        db.mktemp_content_archive(cur)
-        db.copy_to(({'content_id': id,
-                     'copies': copies,
-                     'num_present': num_present}
-                    for id in content_ids),
-                   'tmp_content_archive',
-                   ['content_id', 'copies', 'num_present'],
-                   cur)
+        db.mktemp('content_archive')
+        db.copy_to(
+            ({'content_id': id,
+              'copies': copies,
+              'num_present': num_present}
+             for id in content_ids),
+            'tmp_content_archive',
+            ['content_id', 'copies', 'num_present'],
+            cur)
        db.content_archive_add_from_temp(cur)
diff --git a/swh/storage/db.py b/swh/storage/db.py
index 73c2214..6bb4c8c 100644
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -1,966 +1,966 @@
# Copyright (C) 2015-2016  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import binascii
import datetime
import functools
import json
import psycopg2
import psycopg2.extras
import select
import tempfile

from contextlib import contextmanager

from swh.core import hashutil


TMP_CONTENT_TABLE = 'tmp_content'


psycopg2.extras.register_uuid()


def stored_procedure(stored_proc):
    """Decorator to execute a remote stored procedure, specified as argument.

    Generally, the body of the decorated function should be empty. If it is
    not, the stored procedure will be executed first; the function body then.

    """
    def wrap(meth):
        @functools.wraps(meth)
        def _meth(self, *args, **kwargs):
            cur = kwargs.get('cur', None)
            self._cursor(cur).execute('SELECT %s()' % stored_proc)
            meth(self, *args, **kwargs)
        return _meth
    return wrap


def jsonize(value):
    """Convert a value to a psycopg2 JSON object if necessary"""
    if isinstance(value, dict):
        return psycopg2.extras.Json(value)

    return value


def entry_to_bytes(entry):
    """Convert an entry coming from the database to bytes"""
    if isinstance(entry, memoryview):
        return entry.tobytes()
    if isinstance(entry, list):
        return [entry_to_bytes(value) for value in entry]
    return entry


def line_to_bytes(line):
    """Convert a line coming from the database to bytes"""
    if not line:
        return line
    if isinstance(line, dict):
        return {k: entry_to_bytes(v) for k, v in line.items()}
    return line.__class__(entry_to_bytes(entry) for entry in line)


def cursor_to_bytes(cursor):
    """Yield all the data from a cursor as bytes"""
    yield from (line_to_bytes(line) for line in cursor)


class BaseDb:
    """Base class for swh.storage.*Db.

    cf. swh.storage.db.Db, swh.storage.archiver.db.ArchiverDb

    """

    @classmethod
    def connect(cls, *args, **kwargs):
        """factory method to create a DB proxy

        Accepts all arguments of psycopg2.connect; only some specific
        possibilities are reported below.
Args: connstring: libpq2 connection string """ conn = psycopg2.connect(*args, **kwargs) return cls(conn) def _cursor(self, cur_arg): """get a cursor: from cur_arg if given, or a fresh one otherwise meant to avoid boilerplate if/then/else in methods that proxy stored procedures """ if cur_arg is not None: return cur_arg # elif self.cur is not None: # return self.cur else: return self.conn.cursor() def __init__(self, conn): """create a DB proxy Args: conn: psycopg2 connection to the SWH DB """ self.conn = conn @contextmanager def transaction(self): """context manager to execute within a DB transaction Yields: a psycopg2 cursor """ with self.conn.cursor() as cur: try: yield cur self.conn.commit() except: if not self.conn.closed: self.conn.rollback() raise def copy_to(self, items, tblname, columns, cur=None, item_cb=None): """Copy items' entries to table tblname with columns information. Args: items (dict): dictionary of data to copy over tblname tblname (str): Destination table's name columns ([str]): keys to access data in items and also the column names in the destination table. item_cb (fn): optional function to apply to items's entry """ def escape(data): if data is None: return '' if isinstance(data, bytes): return '\\x%s' % binascii.hexlify(data).decode('ascii') elif isinstance(data, str): return '"%s"' % data.replace('"', '""') elif isinstance(data, datetime.datetime): # We escape twice to make sure the string generated by # isoformat gets escaped return escape(data.isoformat()) elif isinstance(data, dict): return escape(json.dumps(data)) elif isinstance(data, list): return escape("{%s}" % ','.join(escape(d) for d in data)) elif isinstance(data, psycopg2.extras.Range): # We escape twice here too, so that we make sure # everything gets passed to copy properly return escape( '%s%s,%s%s' % ( '[' if data.lower_inc else '(', '-infinity' if data.lower_inf else escape(data.lower), 'infinity' if data.upper_inf else escape(data.upper), ']' if data.upper_inc else ')', ) ) else: # We don't escape here to make sure we pass literals properly return str(data) with tempfile.TemporaryFile('w+') as f: for d in items: if item_cb is not None: item_cb(d) line = [escape(d.get(k)) for k in columns] f.write(','.join(line)) f.write('\n') f.seek(0) self._cursor(cur).copy_expert('COPY %s (%s) FROM STDIN CSV' % ( tblname, ', '.join(columns)), f) + def mktemp(self, tblname, cur=None): + self._cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,)) + class Db(BaseDb): """Proxy to the SWH DB, with wrappers around stored procedures """ - def mktemp(self, tblname, cur=None): - self._cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,)) - def mktemp_dir_entry(self, entry_type, cur=None): self._cursor(cur).execute('SELECT swh_mktemp_dir_entry(%s)', (('directory_entry_%s' % entry_type),)) @stored_procedure('swh_mktemp_revision') def mktemp_revision(self, cur=None): pass @stored_procedure('swh_mktemp_release') def mktemp_release(self, cur=None): pass @stored_procedure('swh_mktemp_occurrence_history') def mktemp_occurrence_history(self, cur=None): pass @stored_procedure('swh_mktemp_entity_lister') def mktemp_entity_lister(self, cur=None): pass @stored_procedure('swh_mktemp_entity_history') def mktemp_entity_history(self, cur=None): pass @stored_procedure('swh_mktemp_bytea') def mktemp_bytea(self, cur=None): pass @stored_procedure('swh_mktemp_content_ctags') def mktemp_content_ctags(self, cur=None): pass @stored_procedure('swh_mktemp_content_ctags_missing') def mktemp_content_ctags_missing(self, cur=None): pass def 
register_listener(self, notify_queue, cur=None): """Register a listener for NOTIFY queue `notify_queue`""" self._cursor(cur).execute("LISTEN %s" % notify_queue) def listen_notifies(self, timeout): """Listen to notifications for `timeout` seconds""" if select.select([self.conn], [], [], timeout) == ([], [], []): return else: self.conn.poll() while self.conn.notifies: yield self.conn.notifies.pop(0) @stored_procedure('swh_content_add') def content_add_from_temp(self, cur=None): pass @stored_procedure('swh_directory_add') def directory_add_from_temp(self, cur=None): pass @stored_procedure('swh_skipped_content_add') def skipped_content_add_from_temp(self, cur=None): pass @stored_procedure('swh_revision_add') def revision_add_from_temp(self, cur=None): pass @stored_procedure('swh_release_add') def release_add_from_temp(self, cur=None): pass @stored_procedure('swh_occurrence_history_add') def occurrence_history_add_from_temp(self, cur=None): pass @stored_procedure('swh_entity_history_add') def entity_history_add_from_temp(self, cur=None): pass @stored_procedure('swh_cache_content_revision_add') def cache_content_revision_add(self, cur=None): pass def store_tmp_bytea(self, ids, cur=None): """Store the given identifiers in a new tmp_bytea table""" cur = self._cursor(cur) self.mktemp_bytea(cur) self.copy_to(({'id': elem} for elem in ids), 'tmp_bytea', ['id'], cur) content_get_metadata_keys = ['sha1', 'sha1_git', 'sha256', 'length', 'status'] def content_get_metadata_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("""select t.id as sha1, %s from tmp_bytea t left join content on t.id = content.sha1 """ % ', '.join(self.content_get_metadata_keys[1:])) yield from cursor_to_bytes(cur) def content_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("""SELECT sha1, sha1_git, sha256 FROM swh_content_missing()""") yield from cursor_to_bytes(cur) def content_missing_per_sha1_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("""SELECT * FROM swh_content_missing_per_sha1()""") yield from cursor_to_bytes(cur) def skipped_content_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("""SELECT sha1, sha1_git, sha256 FROM swh_skipped_content_missing()""") yield from cursor_to_bytes(cur) def occurrence_get(self, origin_id, cur=None): """Retrieve latest occurrence's information by origin_id. """ cur = self._cursor(cur) cur.execute("""SELECT origin, branch, target, target_type, (select max(date) from origin_visit where origin=%s) as date FROM occurrence WHERE origin=%s """, (origin_id, origin_id)) yield from cursor_to_bytes(cur) def content_find(self, sha1=None, sha1_git=None, sha256=None, cur=None): """Find the content optionally on a combination of the following checksums sha1, sha1_git or sha256. Args: sha1: sha1 content git_sha1: the sha1 computed `a la git` sha1 of the content sha256: sha256 content Returns: The triplet (sha1, sha1_git, sha256) if found or None. 
""" cur = self._cursor(cur) cur.execute("""SELECT sha1, sha1_git, sha256, length, ctime, status FROM swh_content_find(%s, %s, %s) LIMIT 1""", (sha1, sha1_git, sha256)) content = line_to_bytes(cur.fetchone()) if set(content) == {None}: return None else: return content provenance_cols = ['content', 'revision', 'origin', 'visit', 'path'] def content_find_provenance(self, sha1_git, cur=None): """Find content's provenance information Args: sha1: sha1_git content cur: cursor to use Returns: Provenance information on such content """ cur = self._cursor(cur) cur.execute("""SELECT content, revision, origin, visit, path FROM swh_content_find_provenance(%s)""", (sha1_git, )) yield from cursor_to_bytes(cur) def directory_get_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute('''SELECT id, file_entries, dir_entries, rev_entries FROM swh_directory_get()''') yield from cursor_to_bytes(cur) def directory_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute('SELECT * FROM swh_directory_missing()') yield from cursor_to_bytes(cur) directory_ls_cols = ['dir_id', 'type', 'target', 'name', 'perms', 'status', 'sha1', 'sha1_git', 'sha256'] def directory_walk_one(self, directory, cur=None): cur = self._cursor(cur) cur.execute('SELECT * FROM swh_directory_walk_one(%s)', (directory,)) yield from cursor_to_bytes(cur) def directory_walk(self, directory, cur=None): cur = self._cursor(cur) cur.execute('SELECT * FROM swh_directory_walk(%s)', (directory,)) yield from cursor_to_bytes(cur) def revision_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute('SELECT id FROM swh_revision_missing() as r(id)') yield from cursor_to_bytes(cur) revision_add_cols = [ 'id', 'date', 'date_offset', 'date_neg_utc_offset', 'committer_date', 'committer_date_offset', 'committer_date_neg_utc_offset', 'type', 'directory', 'message', 'author_fullname', 'author_name', 'author_email', 'committer_fullname', 'committer_name', 'committer_email', 'metadata', 'synthetic', ] revision_get_cols = revision_add_cols + [ 'author_id', 'committer_id', 'parents'] def origin_visit_add(self, origin, ts, cur=None): """Add a new origin_visit for origin origin at timestamp ts with status 'ongoing'. Args: origin: origin concerned by the visit ts: the date of the visit Returns: The new visit index step for that origin """ cur = self._cursor(cur) self._cursor(cur).execute('SELECT swh_origin_visit_add(%s, %s)', (origin, ts)) return cur.fetchone()[0] def origin_visit_update(self, origin, visit_id, status, metadata, cur=None): """Update origin_visit's status.""" cur = self._cursor(cur) update = """UPDATE origin_visit SET status=%s, metadata=%s WHERE origin=%s AND visit=%s""" cur.execute(update, (status, jsonize(metadata), origin, visit_id)) origin_visit_get_cols = ['origin', 'visit', 'date', 'status', 'metadata'] def origin_visit_get_all(self, origin_id, last_visit=None, limit=None, cur=None): """Retrieve all visits for origin with id origin_id. Args: origin_id: The occurrence's origin Yields: The occurrence's history visits """ cur = self._cursor(cur) query_suffix = '' if last_visit: query_suffix += ' AND %s < visit' % last_visit if limit: query_suffix += ' LIMIT %s' % limit query = """\ SELECT %s FROM origin_visit WHERE origin=%%s %s""" % ( ', '.join(self.origin_visit_get_cols), query_suffix) cur.execute(query, (origin_id, )) yield from cursor_to_bytes(cur) def origin_visit_get(self, origin_id, visit_id, cur=None): """Retrieve information on visit visit_id of origin origin_id. 
Args: origin_id: the origin concerned visit_id: The visit step for that origin Returns: The origin_visit information """ cur = self._cursor(cur) query = """\ SELECT %s FROM origin_visit WHERE origin = %%s AND visit = %%s """ % (', '.join(self.origin_visit_get_cols)) cur.execute(query, (origin_id, visit_id)) r = cur.fetchall() if not r: return None return line_to_bytes(r[0]) occurrence_cols = ['origin', 'branch', 'target', 'target_type'] def occurrence_by_origin_visit(self, origin_id, visit_id, cur=None): """Retrieve all occurrences for a particular origin_visit. Args: origin_id: the origin concerned visit_id: The visit step for that origin Yields: The occurrence's history visits """ cur = self._cursor(cur) query = """\ SELECT %s FROM swh_occurrence_by_origin_visit(%%s, %%s) """ % (', '.join(self.occurrence_cols)) cur.execute(query, (origin_id, visit_id)) yield from cursor_to_bytes(cur) def revision_get_from_temp(self, cur=None): cur = self._cursor(cur) query = 'SELECT %s FROM swh_revision_get()' % ( ', '.join(self.revision_get_cols)) cur.execute(query) yield from cursor_to_bytes(cur) def revision_log(self, root_revisions, limit=None, cur=None): cur = self._cursor(cur) query = """SELECT %s FROM swh_revision_log(%%s, %%s) """ % ', '.join(self.revision_get_cols) cur.execute(query, (root_revisions, limit)) yield from cursor_to_bytes(cur) revision_shortlog_cols = ['id', 'parents'] def revision_shortlog(self, root_revisions, limit=None, cur=None): cur = self._cursor(cur) query = """SELECT %s FROM swh_revision_list(%%s, %%s) """ % ', '.join(self.revision_shortlog_cols) cur.execute(query, (root_revisions, limit)) yield from cursor_to_bytes(cur) cache_content_get_cols = [ 'sha1', 'sha1_git', 'sha256', 'revision_paths'] def cache_content_get_all(self, cur=None): """Retrieve cache contents' sha1, sha256, sha1_git """ cur = self._cursor(cur) cur.execute('SELECT * FROM swh_cache_content_get_all()') yield from cursor_to_bytes(cur) def cache_content_get(self, sha1_git, cur=None): """Retrieve cache content information sh. 
""" cur = self._cursor(cur) cur.execute('SELECT * FROM swh_cache_content_get(%s)', (sha1_git, )) data = cur.fetchone() if data: return line_to_bytes(data) return None def cache_revision_origin_add(self, origin, visit, cur=None): """Populate the content provenance information cache for the given (origin, visit) couple.""" cur = self._cursor(cur) cur.execute('SELECT * FROM swh_cache_revision_origin_add(%s, %s)', (origin, visit)) yield from cursor_to_bytes(cur) def release_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute('SELECT id FROM swh_release_missing() as r(id)') yield from cursor_to_bytes(cur) object_find_by_sha1_git_cols = ['sha1_git', 'type', 'id', 'object_id'] def object_find_by_sha1_git(self, ids, cur=None): cur = self._cursor(cur) self.store_tmp_bytea(ids, cur) query = 'select %s from swh_object_find_by_sha1_git()' % ( ', '.join(self.object_find_by_sha1_git_cols) ) cur.execute(query) yield from cursor_to_bytes(cur) def stat_counters(self, cur=None): cur = self._cursor(cur) cur.execute('SELECT * FROM swh_stat_counters()') yield from cur fetch_history_cols = ['origin', 'date', 'status', 'result', 'stdout', 'stderr', 'duration'] def create_fetch_history(self, fetch_history, cur=None): """Create a fetch_history entry with the data in fetch_history""" cur = self._cursor(cur) query = '''INSERT INTO fetch_history (%s) VALUES (%s) RETURNING id''' % ( ','.join(self.fetch_history_cols), ','.join(['%s'] * len(self.fetch_history_cols)) ) cur.execute(query, [fetch_history.get(col) for col in self.fetch_history_cols]) return cur.fetchone()[0] def get_fetch_history(self, fetch_history_id, cur=None): """Get a fetch_history entry with the given id""" cur = self._cursor(cur) query = '''SELECT %s FROM fetch_history WHERE id=%%s''' % ( ', '.join(self.fetch_history_cols), ) cur.execute(query, (fetch_history_id,)) data = cur.fetchone() if not data: return None ret = {'id': fetch_history_id} for i, col in enumerate(self.fetch_history_cols): ret[col] = data[i] return ret def update_fetch_history(self, fetch_history, cur=None): """Update the fetch_history entry from the data in fetch_history""" cur = self._cursor(cur) query = '''UPDATE fetch_history SET %s WHERE id=%%s''' % ( ','.join('%s=%%s' % col for col in self.fetch_history_cols) ) cur.execute(query, [jsonize(fetch_history.get(col)) for col in self.fetch_history_cols + ['id']]) base_entity_cols = ['uuid', 'parent', 'name', 'type', 'description', 'homepage', 'active', 'generated', 'lister_metadata', 'metadata'] entity_cols = base_entity_cols + ['last_seen', 'last_id'] entity_history_cols = base_entity_cols + ['id', 'validity'] def origin_add(self, type, url, cur=None): """Insert a new origin and return the new identifier.""" insert = """INSERT INTO origin (type, url) values (%s, %s) RETURNING id""" cur.execute(insert, (type, url)) return cur.fetchone()[0] def origin_get_with(self, type, url, cur=None): """Retrieve the origin id from its type and url if found.""" cur = self._cursor(cur) query = """SELECT id, type, url, lister, project FROM origin WHERE type=%s AND url=%s""" cur.execute(query, (type, url)) data = cur.fetchone() if data: return line_to_bytes(data) return None def origin_get(self, id, cur=None): """Retrieve the origin per its identifier. 
""" cur = self._cursor(cur) query = "SELECT id, type, url, lister, project FROM origin WHERE id=%s" cur.execute(query, (id,)) data = cur.fetchone() if data: return line_to_bytes(data) return None person_cols = ['fullname', 'name', 'email'] person_get_cols = person_cols + ['id'] def person_add(self, person, cur=None): """Add a person identified by its name and email. Returns: The new person's id """ cur = self._cursor(cur) query_new_person = '''\ INSERT INTO person(%s) VALUES (%s) RETURNING id''' % ( ', '.join(self.person_cols), ', '.join('%s' for i in range(len(self.person_cols))) ) cur.execute(query_new_person, [person[col] for col in self.person_cols]) return cur.fetchone()[0] def person_get(self, ids, cur=None): """Retrieve the persons identified by the list of ids. """ cur = self._cursor(cur) query = """SELECT %s FROM person WHERE id IN %%s""" % ', '.join(self.person_get_cols) cur.execute(query, (tuple(ids),)) yield from cursor_to_bytes(cur) release_add_cols = [ 'id', 'target', 'target_type', 'date', 'date_offset', 'date_neg_utc_offset', 'name', 'comment', 'synthetic', 'author_fullname', 'author_name', 'author_email', ] release_get_cols = release_add_cols + ['author_id'] def release_get_from_temp(self, cur=None): cur = self._cursor(cur) query = ''' SELECT %s FROM swh_release_get() ''' % ', '.join(self.release_get_cols) cur.execute(query) yield from cursor_to_bytes(cur) def release_get_by(self, origin_id, limit=None, cur=None): """Retrieve a release by occurrence criterion (only origin right now) Args: - origin_id: The origin to look for. """ cur = self._cursor(cur) query = """ SELECT %s FROM swh_release_get_by(%%s) LIMIT %%s """ % ', '.join(self.release_get_cols) cur.execute(query, (origin_id, limit)) yield from cursor_to_bytes(cur) def revision_get_by(self, origin_id, branch_name, datetime, limit=None, cur=None): """Retrieve a revision by occurrence criterion. Args: - origin_id: The origin to look for - branch_name: the branch name to look for - datetime: the lower bound of timerange to look for. - limit: limit number of results to return The upper bound being now. """ cur = self._cursor(cur) if branch_name and isinstance(branch_name, str): branch_name = branch_name.encode('utf-8') query = ''' SELECT %s FROM swh_revision_get_by(%%s, %%s, %%s) LIMIT %%s ''' % ', '.join(self.revision_get_cols) cur.execute(query, (origin_id, branch_name, datetime, limit)) yield from cursor_to_bytes(cur) def directory_entry_get_by_path(self, directory, paths, cur=None): """Retrieve a directory entry by path. """ cur = self._cursor(cur) cur.execute("""SELECT dir_id, type, target, name, perms, status, sha1, sha1_git, sha256 FROM swh_find_directory_entry_by_path(%s, %s)""", (directory, paths)) data = cur.fetchone() if set(data) == {None}: return None return line_to_bytes(data) def entity_get(self, uuid, cur=None): """Retrieve the entity and its parent hierarchy chain per uuid. """ cur = self._cursor(cur) cur.execute("""SELECT %s FROM swh_entity_get(%%s)""" % ( ', '.join(self.entity_cols)), (uuid, )) yield from cursor_to_bytes(cur) def entity_get_one(self, uuid, cur=None): """Retrieve a single entity given its uuid. 
""" cur = self._cursor(cur) cur.execute("""SELECT %s FROM entity WHERE uuid = %%s""" % ( ', '.join(self.entity_cols)), (uuid, )) data = cur.fetchone() if not data: return None return line_to_bytes(data) content_mimetype_cols = ['id', 'mimetype', 'encoding', 'tool_name', 'tool_version'] @stored_procedure('swh_mktemp_content_mimetype_missing') def mktemp_content_mimetype_missing(self, cur=None): pass def content_mimetype_missing_from_temp(self, cur=None): """List missing mimetypes. """ cur = self._cursor(cur) cur.execute("SELECT * FROM swh_content_mimetype_missing()") yield from cursor_to_bytes(cur) @stored_procedure('swh_mktemp_content_mimetype') def mktemp_content_mimetype(self, cur=None): pass def content_mimetype_add_from_temp(self, conflict_update, cur=None): self._cursor(cur).execute("SELECT swh_content_mimetype_add(%s)", (conflict_update, )) content_language_cols = ['id', 'lang', 'tool_name', 'tool_version'] @stored_procedure('swh_mktemp_content_language') def mktemp_content_language(self, cur=None): pass def content_mimetype_get_from_temp(self, cur=None): cur = self._cursor(cur) query = "SELECT %s FROM swh_content_mimetype_get()" % ( ','.join(self.content_mimetype_cols)) cur.execute(query) yield from cursor_to_bytes(cur) @stored_procedure('swh_mktemp_content_language_missing') def mktemp_content_language_missing(self, cur=None): pass def content_language_missing_from_temp(self, cur=None): """List missing languages. """ cur = self._cursor(cur) cur.execute("SELECT * FROM swh_content_language_missing()") yield from cursor_to_bytes(cur) def content_language_add_from_temp(self, conflict_update, cur=None): self._cursor(cur).execute("SELECT swh_content_language_add(%s)", (conflict_update, )) def content_language_get_from_temp(self, cur=None): cur = self._cursor(cur) query = "SELECT %s FROM swh_content_language_get()" % ( ','.join(self.content_language_cols)) cur.execute(query) yield from cursor_to_bytes(cur) def content_ctags_missing_from_temp(self, cur=None): """List missing ctags. """ cur = self._cursor(cur) cur.execute("SELECT * FROM swh_content_ctags_missing()") yield from cursor_to_bytes(cur) def content_ctags_add_from_temp(self, conflict_update, cur=None): self._cursor(cur).execute("SELECT swh_content_ctags_add(%s)", (conflict_update, )) content_ctags_cols = ['id', 'name', 'kind', 'line', 'lang', 'tool_name', 'tool_version'] def content_ctags_get_from_temp(self, cur=None): cur = self._cursor(cur) query = "SELECT %s FROM swh_content_ctags_get()" % ( ','.join(self.content_ctags_cols)) cur.execute(query) yield from cursor_to_bytes(cur) def content_ctags_search(self, expression, last_sha1, limit, cur=None): cur = self._cursor(cur) if not last_sha1: query = """SELECT %s FROM swh_content_ctags_search(%%s, %%s)""" % ( ','.join(self.content_ctags_cols)) cur.execute(query, (expression, limit)) else: if last_sha1 and isinstance(last_sha1, bytes): last_sha1 = '\\x%s' % hashutil.hash_to_hex(last_sha1) elif last_sha1: last_sha1 = '\\x%s' % last_sha1 query = """SELECT %s FROM swh_content_ctags_search(%%s, %%s, %%s)""" % ( ','.join(self.content_ctags_cols)) cur.execute(query, (expression, limit, last_sha1)) yield from cursor_to_bytes(cur) content_fossology_license_cols = ['id', 'tool_name', 'tool_version', 'licenses'] @stored_procedure('swh_mktemp_content_fossology_license_missing') def mktemp_content_fossology_license_missing(self, cur=None): pass def content_fossology_license_missing_from_temp(self, cur=None): """List missing licenses. 
""" cur = self._cursor(cur) cur.execute("SELECT * FROM swh_content_fossology_license_missing()") yield from cursor_to_bytes(cur) @stored_procedure('swh_mktemp_content_fossology_license') def mktemp_content_fossology_license(self, cur=None): pass @stored_procedure('swh_mktemp_content_fossology_license_unknown') def mktemp_content_fossology_license_unknown(self, cur=None): pass def content_fossology_license_add_from_temp(self, conflict_update, cur=None): """Add new licenses per content. """ self._cursor(cur).execute( "SELECT swh_content_fossology_license_add(%s)", (conflict_update, )) def content_fossology_license_get_from_temp(self, cur=None): """Retrieve licenses per content. """ cur = self._cursor(cur) query = "SELECT %s FROM swh_content_fossology_license_get()" % ( ','.join(self.content_fossology_license_cols)) cur.execute(query) yield from cursor_to_bytes(cur) def content_fossology_license_unknown(self, cur=None): """Returns the unknown licenses from tmp_content_fossology_license_unknown. """ cur = self._cursor(cur) cur.execute("SELECT * FROM swh_content_fossology_license_unknown()") yield from cursor_to_bytes(cur)