diff --git a/sql/swh-schema.sql b/sql/swh-schema.sql index 861f318b..fe9dd64e 100644 --- a/sql/swh-schema.sql +++ b/sql/swh-schema.sql @@ -1,90 +1,97 @@ --- --- Software Heritage - SWH Deposit Data Model --- ----------- -- schema ----------- create table dbversion( version int primary key, release timestamptz, description text ); create type deposit_status as enum ( - 'partially-received', -- the deposit is partial since it can be done in multiple requests - 'received', -- deposit is fully deposited and can be injected - 'injecting', -- injection is ongoing on swh's side - 'injected', -- injection is successfully done - 'failed' -- injection failed due to some error + 'partial', -- the deposit is new or partially received since it + -- can be done in multiple requests + 'expired', -- deposit has been there too long and is now deemed + -- ready to be garbage collected + 'ready', -- deposit is fully received and ready for injection + 'injecting', -- injection is ongoing on swh's side + 'success', -- injection successful + 'failure' -- injection failure ); comment on type deposit_status is 'Deposit''s life cycle'; create table client( id bigserial primary key, - name text not null + name text not null, + credential bytea ); comment on table client is 'Deposit''s Client references'; comment on column client.id is 'Short identifier for the client'; comment on column client.name is 'Human readable name for the client e.g hal, arXiv, etc...'; +comment on column client.credential is 'Associated credential for the clients...'; create table deposit_type( id serial primary key, name text not null ); comment on table deposit_type is 'Deposit type'; comment on column deposit_type.id is 'Short identifier for the deposit type'; comment on column deposit_type.name is 'Human readable name for the deposit type e.g HAL, arXiv, etc...'; create table deposit( id bigserial primary key, reception_date timestamptz not null, complete_date timestamptz not null, type serial not null references deposit_type(id), external_id text not null, status deposit_status not null, - client_id bigint not null + client_id bigint not null, + swh_id text ); comment on table deposit is 'Deposit reception table of archive to load in swh'; comment on column deposit.id is 'Deposit receipt id'; comment on column deposit.reception_date is 'First deposit reception date'; comment on column deposit.complete_date is 'Date when the deposit is deemed complete and ready for injection'; comment on column deposit.type is 'Deposit reception source type'; comment on column deposit.external_id is 'Deposit''s unique external identifier'; comment on column deposit.status is 'Deposit''s status regarding injection'; comment on column deposit.client_id is 'Deposit client identifier'; +comment on column deposit.swh_id is 'SWH''s result identifier'; ---create unique index deposit_pkey on deposit(client_id, external_id); +-- create unique index deposit_pkey on deposit(client_id, external_id); -- We may need another table for the multiple requests -- When reading for injection, we'd need to aggregate information create table deposit_request( id bigserial primary key, deposit_id bigint not null references deposit(id), metadata jsonb not null ); comment on table deposit_request is 'Deposit request made by the client'; comment on column deposit_request.id is 'Deposit request id'; comment on column deposit_request.deposit_id is 'Deposit concerned by the request'; comment on column deposit_request.metadata is 'Deposit request information on the data to inject'; -- path to the archive + "raw" metadata from the source (e.g hal) ----------- -- data ----------- insert into dbversion(version, release, description) values(1, now(), 'Work In Progress'); insert into client(name) values ('hal'); diff --git a/swh/deposit/backend.py b/swh/deposit/backend.py new file mode 100644 index 00000000..8647642c --- /dev/null +++ b/swh/deposit/backend.py @@ -0,0 +1,163 @@ +# Copyright (C) 2017 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from functools import wraps + +import psycopg2 +import psycopg2.extras + +from swh.core.config import SWHConfig + + +psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) + + +def autocommit(fn): + @wraps(fn) + def wrapped(self, *args, **kwargs): + autocommit = False + if 'cursor' not in kwargs or not kwargs['cursor']: + autocommit = True + kwargs['cursor'] = self.cursor() + + try: + ret = fn(self, *args, **kwargs) + except: + if autocommit: + self.rollback() + raise + + if autocommit: + self.commit() + + return ret + + return wrapped + + +class DepositBackend(SWHConfig): + """Backend for the Software Heritage deposit database. + + """ + + CONFIG_BASE_FILENAME = 'deposit/backend' + + DEFAULT_CONFIG = { + 'deposit_db': ('str', 'dbname=swh-deposit'), + } + + def __init__(self, **override_config): + self.config = self.parse_config_file(global_config=False) + self.config.update(override_config) + + self.db = None + + self.reconnect() + + def reconnect(self): + if not self.db or self.db.closed: + self.db = psycopg2.connect( + dsn=self.config['deposit_db'], + cursor_factory=psycopg2.extras.RealDictCursor, + ) + + def cursor(self): + """Return a fresh cursor on the database, with auto-reconnection in + case of failure + + """ + cur = None + + # Get a fresh cursor and reconnect at most three times + tries = 0 + while True: + tries += 1 + try: + cur = self.db.cursor() + cur.execute('select 1') + break + except psycopg2.OperationalError: + if tries < 3: + self.reconnect() + else: + raise + + return cur + + def commit(self): + """Commit a transaction""" + self.db.commit() + + def rollback(self): + """Rollback a transaction""" + self.db.rollback() + + deposit_keys = [ + 'reception_date', 'complete_date', 'type', 'external_id', + 'status', 'client_id', + ] + + def _format_query(self, query, keys): + """Format a query with the given keys""" + + query_keys = ', '.join(keys) + placeholders = ', '.join(['%s'] * len(keys)) + + return query.format(keys=query_keys, placeholders=placeholders) + + @autocommit + def deposit_add(self, deposit, cursor=None): + """Create a new deposit. + + A deposit is a dictionary with the following keys: + type (str): an identifier for the deposit type + reception_date (date): deposit's reception date + complete_date (date): deposit's date when the deposit is + deemed complete + external_id (str): the external identifier in the client's + information system + status (str): deposit status + client_id (integer): client's identifier + + """ + query = self._format_query( + """insert into deposit ({keys}) values ({placeholders})""", + self.deposit_keys, + ) + cursor.execute(query, [deposit[key] for key in self.deposi_keys]) + + @autocommit + def deposit_get(self, id, cursor=None): + """Retrieve the task type with id + + """ + query = self._format_query( + "select {keys} from deposit where type=%s", + self.deposit_keys, + ) + cursor.execute(query, (id,)) + ret = cursor.fetchone() + return ret + + @autocommit + def request_add(self, request, cursor=None): + pass + + @autocommit + def request_get(self, deposit_id, cursor=None): + pass + + @autocommit + def client_list(self, cursor=None): + cursor.execute('select id, name from client') + + return {row['name']: row['id'] for row in cursor.fetchall()} + + @autocommit + def client_get(self, id, cursor=None): + cursor.execute('select id, name, credential from client where id=%s', + (id, )) + + return cursor.fetchone() diff --git a/swh/deposit/server.py b/swh/deposit/server.py index e5a4d65d..f358aecc 100644 --- a/swh/deposit/server.py +++ b/swh/deposit/server.py @@ -1,107 +1,119 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import aiohttp.web import click import jinja2 +import json from swh.core import config from swh.core.config import SWHConfig from swh.core.api_async import SWHRemoteAPI - +from swh.deposit.backend import DepositBackend DEFAULT_CONFIG_SERVER = { 'host': ('str', '0.0.0.0'), 'port': ('int', 5012), } def encode_data(data, template_name=None, **kwargs): return aiohttp.web.Response( body=data, headers={'Content-Type': 'application/xml'}, **kwargs ) class DepositWebServer(SWHConfig): """Base class to define endpoints route. """ CONFIG_BASE_FILENAME = 'deposit/server' DEFAULT_CONFIG = { 'max_upload_size': ('int', 200 * 1024 * 1024), + 'deposit_db': ('str', 'dbname=deposit-db'), } def __init__(self, config=None): if config: self.config = config else: self.config = self.parse_config_file() template_loader = jinja2.FileSystemLoader( searchpath=["swh/deposit/templates"]) self.template_env = jinja2.Environment(loader=template_loader) + self.backend = DepositBackend() @asyncio.coroutine def index(self, request): return aiohttp.web.Response(text='SWH Deposit Server') @asyncio.coroutine def service_document(self, request): tpl = self.template_env.get_template('service_document.xml') output = tpl.render( - noop=True, verbose=False, max_upload_size=200*1024*1024) + noop=True, verbose=False, + max_upload_size=self.config['max_upload_size']) return encode_data(data=output) @asyncio.coroutine def create_document(self, request): pass @asyncio.coroutine def update_document(self, request): pass @asyncio.coroutine def status_operation(self, request): pass @asyncio.coroutine def delete_document(self, request): raise ValueError('Not implemented') + @asyncio.coroutine + def client_get(self, request): + clients = self.backend.client_list() + return aiohttp.web.Response( + body=json.dumps(clients), + headers={'Content-Type': 'application/json'}) + def make_app(config, **kwargs): app = SWHRemoteAPI(**kwargs) server = DepositWebServer() - app.router.add_route('GET', '/', server.index) + app.router.add_route('GET', '/', server.index) app.router.add_route('GET', '/api/1/deposit/', server.service_document) app.router.add_route('GET', '/api/1/status/', server.status_operation) app.router.add_route('POST', '/api/1/deposit/', server.create_document) app.router.add_route('PUT', '/api/1/deposit/', server.update_document) app.router.add_route('DELETE', '/api/1/deposit/', server.delete_document) + app.router.add_route('GET', '/api/1/client/', server.client_get) app.update(config) return app @click.command() @click.argument('config-path', required=1) @click.option('--host', default='0.0.0.0', help="Host to run the server") @click.option('--port', default=5012, type=click.INT, help="Binding port of the server") @click.option('--debug/--nodebug', default=True, help="Indicates if the server should run in debug mode") def launch(config_path, host, port, debug): cfg = config.read(config_path, DEFAULT_CONFIG_SERVER) port = port if port else cfg['port'] host = host if host else cfg['host'] app = make_app(cfg, debug=bool(debug)) aiohttp.web.run_app(app, host=host, port=port) if __name__ == '__main__': launch()