Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/updater/backend.py
Show First 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | # 'cache_read_limit': ('int', 1000), | ||||
cache_put_keys = ['url', 'cnt', 'last_seen', 'origin_type'] | cache_put_keys = ['url', 'cnt', 'last_seen', 'origin_type'] | ||||
@db_transaction() | @db_transaction() | ||||
def cache_put(self, events, timestamp=None, db=None, cur=None): | def cache_put(self, events, timestamp=None, db=None, cur=None): | ||||
"""Write new events in the backend. | """Write new events in the backend. | ||||
""" | """ | ||||
if timestamp is None: | |||||
timestamp = utcnow() | |||||
def prepare_events(events): | |||||
for e in events: | |||||
event = e.get() | |||||
seen = event['last_seen'] | |||||
if seen is None: | |||||
event['last_seen'] = timestamp | |||||
yield event | |||||
cur.execute('select swh_mktemp_cache()') | cur.execute('select swh_mktemp_cache()') | ||||
db.copy_to(prepare_events(events, timestamp), | db.copy_to(prepare_events(events, timestamp), | ||||
'tmp_cache', self.cache_put_keys, cur=cur) | 'tmp_cache', self.cache_put_keys, cur=cur) | ||||
cur.execute('select swh_cache_put()') | cur.execute('select swh_cache_put()') | ||||
cache_read_keys = ['id', 'url', 'origin_type', 'cnt', 'first_seen', | cache_read_keys = ['id', 'url', 'origin_type', 'cnt', 'first_seen', | ||||
'last_seen'] | 'last_seen'] | ||||
@db_transaction_generator() | @db_transaction_generator() | ||||
def cache_read(self, timestamp=None, limit=None, db=None, cur=None): | def cache_read(self, timestamp=None, limit=None, db=None, cur=None): | ||||
"""Read events from the cache prior to timestamp. | """Read events from the cache prior to timestamp. | ||||
ardumont: `not mean` | |||||
Note that limit=None does not mean 'no limit' but use the default | |||||
limit (see cache_read_limit constructor argument). | |||||
""" | """ | ||||
if not timestamp: | if not timestamp: | ||||
timestamp = utcnow() | timestamp = utcnow() | ||||
if not limit: | if not limit: | ||||
limit = self.limit | limit = self.limit | ||||
q = format_query('select {keys} from swh_cache_read(%s, %s)', | q = format_query('select {keys} from swh_cache_read(%s, %s)', | ||||
self.cache_read_keys) | self.cache_read_keys) | ||||
cur.execute(q, (timestamp, limit)) | cur.execute(q, (timestamp, limit)) | ||||
for r in cur.fetchall(): | for r in cur.fetchall(): | ||||
r['id'] = r['id'].tobytes() | r['id'] = r['id'].tobytes() | ||||
yield r | yield r | ||||
@db_transaction() | @db_transaction() | ||||
def cache_remove(self, entries, db=None, cur=None): | def cache_remove(self, entries, db=None, cur=None): | ||||
"""Clean events from the cache | """Clean events from the cache | ||||
""" | """ | ||||
q = 'delete from cache where url in (%s)' % ( | q = 'delete from cache where url in (%s)' % ( | ||||
', '.join(("'%s'" % e for e in entries)), ) | ', '.join(("'%s'" % e for e in entries)), ) | ||||
cur.execute(q) | cur.execute(q) | ||||
def prepare_events(events, timestamp=None): | |||||
if timestamp is None: | |||||
timestamp = utcnow() | |||||
outevents = [] | |||||
urls = [] | |||||
for e in events: | |||||
event = e.get() | |||||
url = event['url'].strip() | |||||
if event['last_seen'] is None: | |||||
event['last_seen'] = timestamp | |||||
event['url'] = url | |||||
if url in urls: | |||||
idx = urls.index(url) | |||||
urls.append(urls.pop(idx)) | |||||
pevent = outevents.pop(idx) | |||||
event['cnt'] += pevent['cnt'] | |||||
event['last_seen'] = max( | |||||
event['last_seen'], pevent['last_seen']) | |||||
else: | |||||
urls.append(url) | |||||
outevents.append(event) | |||||
return outevents |
not mean