diff --git a/sql/updater/sql/swh-func.sql b/sql/updater/sql/swh-func.sql
index 435ce34..cbde2ed 100644
--- a/sql/updater/sql/swh-func.sql
+++ b/sql/updater/sql/swh-func.sql
@@ -1,34 +1,51 @@
 -- Postgresql index helper function
 create or replace function hash_sha1(text)
     returns sha1
 as $$
     select public.digest($1, 'sha1') :: sha1
 $$ language sql strict immutable;
 
 comment on function hash_sha1(text) is 'Compute sha1 hash as text';
 
 -- create a temporary table for cache tmp_cache,
 create or replace function swh_mktemp_cache()
     returns void
     language sql
 as $$
   create temporary table tmp_cache (
     like cache including defaults
   ) on commit drop;
   alter table tmp_cache drop column id;
 $$;
 
 create or replace function swh_cache_put()
     returns void
     language plpgsql
 as $$
 begin
-    insert into cache (id, url, rate, last_seen, origin_type)
-    select hash_sha1(url), url, rate, last_seen, origin_type
+    insert into cache (id, url, origin_type, rate, last_seen)
+    select hash_sha1(url), url, origin_type, rate, last_seen
     from tmp_cache t
     on conflict(id)
     do update set rate = (select rate from cache where id=excluded.id) + excluded.rate,
                   last_seen = excluded.last_seen;
     return;
 end
 $$;
+
+comment on function swh_cache_put() is 'Write to cache temporary events';
+
+-- read at most `lim` cache entries last seen on or before `ts`.
+-- order by makes the limited read deterministic (oldest events first)
+create or replace function swh_cache_read(ts timestamptz, lim integer)
+    returns setof cache
+    language sql stable
+as $$
+    select id, url, origin_type, rate, first_seen, last_seen
+    from cache
+    where last_seen <= ts
+    order by last_seen
+    limit lim;
+$$;
+
+comment on function swh_cache_read(timestamptz, integer) is 'Read cache entries';
diff --git a/sql/updater/sql/swh-schema.sql b/sql/updater/sql/swh-schema.sql
index 965a4bf..2f6736f 100644
--- a/sql/updater/sql/swh-schema.sql
+++ b/sql/updater/sql/swh-schema.sql
@@ -1,28 +1,29 @@
 create table dbversion (
   version int primary key,
   release timestamptz not null,
   description text not null
 );
 
 comment on table dbversion is 'Schema update tracking';
 
 -- a SHA1 checksum (not necessarily originating from Git)
 create domain sha1 as bytea check (length(value) = 20);
 
 insert into dbversion (version, release, description)
        values (1, now(), 'Work In Progress');
 
 create type origin_type as enum ('git', 'svn', 'hg', 'deb');
 
 comment on type origin_type is 'Url''s repository type';
 
 create table cache (
   id sha1 primary key,
   url text not null,
+  origin_type origin_type not null,
   rate int default 1,
-  last_seen timestamptz not null,
-  origin_type origin_type not null
-);
+  first_seen timestamptz not null default now(),
+  last_seen timestamptz not null
+);
 
 create index on cache(url);
 create index on cache(last_seen);
diff --git a/swh/scheduler/updater/backend.py b/swh/scheduler/updater/backend.py
index e4b6958..1b898e7 100644
--- a/swh/scheduler/updater/backend.py
+++ b/swh/scheduler/updater/backend.py
@@ -1,80 +1,82 @@
 # Copyright (C) 2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from arrow import utcnow
 
 from swh.core.config import SWHConfig
 from swh.scheduler.backend import DbBackend, autocommit
 
 
 class SchedulerUpdaterBackend(SWHConfig, DbBackend):
     CONFIG_BASE_FILENAME = 'scheduler-updater'
     DEFAULT_CONFIG = {
         'scheduling_updater_db': (
             'str', 'dbname=softwareheritage-scheduler-updater-dev'),
         'cache_read_limit': ('int', 1000),
     }
 
     def __init__(self, **override_config):
         super().__init__()
         if override_config:
             self.config = override_config
         else:
             self.config = self.parse_config_file(global_config=False)
         self.db = None
         self.db_conn_dsn = self.config['scheduling_updater_db']
         self.limit = self.config['cache_read_limit']
         self.reconnect()
 
     cache_put_keys = ['url', 'rate', 'last_seen', 'origin_type']
 
     @autocommit
     def cache_put(self, events, timestamp=None, cursor=None):
         """Write new events in the backend.
 
         """
         if timestamp is None:
             timestamp = utcnow()
         def prepare_events(events):
             for e in events:
                 event = e.get()
                 seen = event['last_seen']
                 if seen is None:
                     event['last_seen'] = timestamp
                 yield event
 
         cursor.execute('select swh_mktemp_cache()')
         self.copy_to(prepare_events(events),
                      'tmp_cache', self.cache_put_keys, cursor)
         cursor.execute('select swh_cache_put()')
 
-    cache_read_keys = ['id', 'url', 'rate', 'origin_type']
+    cache_read_keys = ['id', 'url', 'origin_type', 'rate', 'first_seen',
+                       'last_seen']
 
     @autocommit
     def cache_read(self, timestamp, limit=None, cursor=None):
         """Read events from the cache prior to timestamp.
 
         """
         if not limit:
             limit = self.limit
-        q = self._format_query("""select {keys}
-            from cache
-            where last_seen <= %s
-            limit %s
-        """, self.cache_read_keys)
+        q = self._format_query('select {keys} from swh_cache_read(%s, %s)',
+                               self.cache_read_keys)
         cursor.execute(q, (timestamp, limit))
         for r in cursor.fetchall():
             r['id'] = r['id'].tobytes()
             yield r
 
     @autocommit
     def cache_remove(self, entries, cursor=None):
         """Clean events from the cache
 
         """
-        q = 'delete from cache where url in (%s)' % (
-            ', '.join(("'%s'" % e for e in entries)),
-        )
-        cursor.execute(q)
+        # urls come from the outside world: bind them as parameters
+        # instead of splicing them into the query string (sql injection)
+        entries = tuple(entries)
+        if not entries:
+            return
+        q = 'delete from cache where url in (%s)' % (
+            ', '.join(['%s'] * len(entries)))
+        cursor.execute(q, entries)