Changeset View
Standalone View
swh/scheduler/updater/backend.py
# Copyright (C) 2018 The Software Heritage developers | # Copyright (C) 2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from arrow import utcnow | from arrow import utcnow | ||||
import psycopg2.pool | import psycopg2.pool | ||||
import psycopg2.extras | import psycopg2.extras | ||||
from swh.scheduler.backend import DbBackend | |||||
from swh.core.db import BaseDb | |||||
from swh.core.db.common import db_transaction, db_transaction_generator | from swh.core.db.common import db_transaction, db_transaction_generator | ||||
from swh.scheduler.backend import format_query | |||||
class SchedulerUpdaterBackend:
    """Postgres-backed cache of scheduler-updater events.

    Stores per-origin event rows (url, count, last seen, origin type) in a
    ``cache`` table through the ``swh_cache_*`` stored procedures, so that
    they can later be read back and turned into scheduled visits.
    """
    CONFIG_BASE_FILENAME = 'backend/scheduler-updater'
    # 'cache_read_limit': ('int', 1000),

    def __init__(self, db, cache_read_limit=1000,
                 min_pool_conns=1, max_pool_conns=10):
        """
        Args:
            db: either a libpq connection string, or a psycopg2 connection
            cache_read_limit: default maximum number of rows returned by
                :meth:`cache_read` when no explicit limit is given
            min_pool_conns: minimum number of pooled connections
            max_pool_conns: maximum number of pooled connections
        """
        if isinstance(db, psycopg2.extensions.connection):
            # Caller supplied (and manages) the connection: no pool needed.
            self._pool = None
            self._db = BaseDb(db)
        else:
            self._pool = psycopg2.pool.ThreadedConnectionPool(
                min_pool_conns, max_pool_conns, db,
                cursor_factory=psycopg2.extras.RealDictCursor,
            )
            self._db = None
        self.limit = cache_read_limit

    def get_db(self):
        """Return a BaseDb: the fixed connection if one was supplied,
        otherwise a connection borrowed from the pool."""
        if self._db:
            return self._db
        return BaseDb.from_pool(self._pool)

    cache_put_keys = ['url', 'cnt', 'last_seen', 'origin_type']

    @db_transaction()
    def cache_put(self, events, timestamp=None, db=None, cur=None):
        """Write new events in the backend.

        Args:
            events: iterable of event objects whose ``get()`` returns a
                dict containing at least the keys in ``cache_put_keys``
            timestamp: value used for ``last_seen`` on events that lack
                one; defaults to now
        """
        if timestamp is None:
            timestamp = utcnow()

        # NOTE: takes timestamp explicitly; the previous version called
        # prepare_events(events, timestamp) with a one-argument function,
        # which raised TypeError.
        def prepare_events(events, timestamp):
            # Fill in a missing 'last_seen' before the bulk copy.
            for e in events:
                event = e.get()
                if event['last_seen'] is None:
                    event['last_seen'] = timestamp
                yield event

        cur.execute('select swh_mktemp_cache()')
        db.copy_to(prepare_events(events, timestamp),
                   'tmp_cache', self.cache_put_keys, cur=cur)
        cur.execute('select swh_cache_put()')

    cache_read_keys = ['id', 'url', 'origin_type', 'cnt', 'first_seen',
                       'last_seen']

    @db_transaction_generator()
    def cache_read(self, timestamp=None, limit=None, db=None, cur=None):
        """Read events from the cache prior to timestamp.

        Args:
            timestamp: upper bound on the events' last_seen value;
                defaults to now
            limit: maximum number of rows; defaults to ``self.limit``

        Yields:
            dicts with the keys in ``cache_read_keys``; 'id' is converted
            to bytes.
        """
        if not timestamp:
            timestamp = utcnow()
        if not limit:
            limit = self.limit
        q = format_query('select {keys} from swh_cache_read(%s, %s)',
                         self.cache_read_keys)
        cur.execute(q, (timestamp, limit))
        for r in cur.fetchall():
            r['id'] = r['id'].tobytes()
            yield r

    @db_transaction()
    def cache_remove(self, entries, db=None, cur=None):
        """Clean events from the cache.

        Args:
            entries: iterable of origin urls whose rows to delete
        """
        entries = list(entries)
        if not entries:
            # 'in ()' is invalid SQL; nothing to delete anyway.
            return
        # Parameterized placeholders instead of interpolating raw urls,
        # which was vulnerable to SQL injection and broke on quotes.
        q = 'delete from cache where url in (%s)' % (
            ', '.join(['%s'] * len(entries)), )
        cur.execute(q, entries)