diff --git a/swh/scheduler/tests/updater/conftest.py b/swh/scheduler/tests/updater/conftest.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/updater/conftest.py
@@ -0,0 +1,33 @@
+import pytest
+import glob
+import os
+
+from swh.core.utils import numfile_sortkey as sortkey
+from swh.scheduler.updater.backend import SchedulerUpdaterBackend
+from swh.scheduler.tests import SQL_DIR
+import swh.scheduler.tests.conftest  # noqa
+
+
+DUMP_FILES = os.path.join(SQL_DIR, 'updater', '*.sql')
+
+
+@pytest.fixture
+def swh_scheduler_updater(request, postgresql_proc, postgresql):
+    config = {
+        'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format(
+            host=postgresql_proc.host,
+            port=postgresql_proc.port,
+            user='postgres',
+            dbname='tests')
+    }
+
+    all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey)
+
+    cursor = postgresql.cursor()
+    for fname in all_dump_files:
+        with open(fname) as fobj:
+            cursor.execute(fobj.read())
+    postgresql.commit()
+
+    backend = SchedulerUpdaterBackend(**config)
+    return backend
diff --git a/swh/scheduler/tests/updater/test_backend.py b/swh/scheduler/tests/updater/test_backend.py
--- a/swh/scheduler/tests/updater/test_backend.py
+++ b/swh/scheduler/tests/updater/test_backend.py
@@ -3,60 +3,34 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import os
-import unittest
-
-from arrow import utcnow
 from hypothesis import given
 from hypothesis.strategies import sets
-import pytest
 
-from swh.core.tests.db_testing import SingleDbTestFixture
-from swh.scheduler.tests import SQL_DIR
-from swh.scheduler.updater.backend import SchedulerUpdaterBackend
 from swh.scheduler.updater.events import SWHEvent
 
 from . import from_regex
 
 
-@pytest.mark.db
-class SchedulerUpdaterBackendTest(SingleDbTestFixture, unittest.TestCase):
-    TEST_DB_NAME = 'softwareheritage-scheduler-updater-test'
-    TEST_DB_DUMP = os.path.join(SQL_DIR, 'updater', '*.sql')
-
-    def setUp(self):
-        super().setUp()
-        config = {
-            'db': 'dbname=' + self.TEST_DB_NAME,
-            'cache_read_limit': 1000,
-        }
-        self.backend = SchedulerUpdaterBackend(**config)
-
-    def _empty_tables(self):
-        self.cursor.execute(
-            """SELECT table_name FROM information_schema.tables
-               WHERE table_schema = %s""", ('public', ))
-        tables = set(table for (table,) in self.cursor.fetchall())
-        for table in tables:
-            self.cursor.execute('truncate table %s cascade' % table)
-        self.conn.commit()
-
-    def tearDown(self):
-        self._empty_tables()
-        super().tearDown()
-
-    @given(sets(
-        from_regex(
-            r'^https://somewhere[.]org/[a-z0-9]{5,7}/[a-z0-9]{3,10}$'),
-        min_size=10, max_size=15))
-    def test_cache_read(self, urls):
-        def gen_events(urls):
-            for url in urls:
-                yield SWHEvent({
-                    'url': url,
-                    'type': 'create',
-                    'origin_type': 'git',
-                })
-        self.backend.cache_put(gen_events(urls))
-        r = self.backend.cache_read(timestamp=utcnow())
-        self.assertNotEqual(r, [])
+@given(urls=sets(
+    from_regex(
+        r'^https://somewhere[.]org/[a-z0-9]{5,7}/[a-z0-9]{3,10}$'),
+    min_size=10, max_size=15))
+def test_cache_read(urls, swh_scheduler_updater):
+    # Beware that the fixture is only called once for all the tests
+    # generated by hypothesis, so the db is not cleared between calls;
+    # see the end of
+    # https://hypothesis.works/articles/hypothesis-pytest-fixtures/
+    def gen_events(urls):
+        for url in urls:
+            yield SWHEvent({
+                'url': url,
+                'type': 'create',
+                'origin_type': 'git',
+            })
+    known_urls = set(e['url'] for e in
+                     swh_scheduler_updater.cache_read(limit=1000000))
+    swh_scheduler_updater.cache_put(gen_events(urls))
+    new_urls = {u.strip() for u in urls} - known_urls
+    all_urls = set(e['url'] for e in
+                   swh_scheduler_updater.cache_read(limit=1000000))
+    assert (all_urls - known_urls) == new_urls
diff --git a/swh/scheduler/updater/backend.py b/swh/scheduler/updater/backend.py
--- a/swh/scheduler/updater/backend.py
+++ b/swh/scheduler/updater/backend.py
@@ -47,16 +47,6 @@
         """Write new events in the backend.
 
         """
-        if timestamp is None:
-            timestamp = utcnow()
-
-        def prepare_events(events):
-            for e in events:
-                event = e.get()
-                seen = event['last_seen']
-                if seen is None:
-                    event['last_seen'] = timestamp
-                yield event
         cur.execute('select swh_mktemp_cache()')
         db.copy_to(prepare_events(events, timestamp),
                    'tmp_cache', self.cache_put_keys, cur=cur)
@@ -69,6 +59,9 @@
     def cache_read(self, timestamp=None, limit=None, db=None, cur=None):
         """Read events from the cache prior to timestamp.
 
+        Note that limit=None does not mean 'no limit' but to use the
+        default limit.
+
        """
         if not timestamp:
             timestamp = utcnow()
@@ -91,3 +84,34 @@
             q = 'delete from cache where url in (%s)' % (
                 ', '.join(("'%s'" % e for e in entries)), )
             cur.execute(q)
+
+
+def prepare_events(events, timestamp=None):
+    """Prepare events for insertion in the cache: strip urls, fill in a
+    default last_seen timestamp, and merge events sharing the same url
+    (summing their cnt and keeping the most recent last_seen).
+
+    """
+    if timestamp is None:
+        timestamp = utcnow()
+    outevents = []
+    urls = []
+    for e in events:
+        event = e.get()
+        url = event['url'].strip()
+        if event['last_seen'] is None:
+            event['last_seen'] = timestamp
+        event['url'] = url
+
+        if url in urls:
+            # this url was already seen: merge both events
+            idx = urls.index(url)
+            urls.append(urls.pop(idx))
+            pevent = outevents.pop(idx)
+            event['cnt'] += pevent['cnt']
+            event['last_seen'] = max(
+                event['last_seen'], pevent['last_seen'])
+        else:
+            urls.append(url)
+        outevents.append(event)
+    return outevents
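Note on the refactored prepare_events helper above: unlike the generator it replaces, it also deduplicates events by url before the copy into tmp_cache. What follows is a minimal, self-contained sketch of that merge behavior, runnable outside the patched tree; FakeEvent is a hypothetical stand-in for SWHEvent (only its get() accessor is mimicked), and stdlib datetime replaces arrow's utcnow().

    from datetime import datetime, timezone


    class FakeEvent:
        # Hypothetical stand-in for SWHEvent: only exposes get(),
        # returning a dict with the keys prepare_events touches.
        def __init__(self, url, cnt=1, last_seen=None):
            self._event = {'url': url, 'cnt': cnt, 'last_seen': last_seen}

        def get(self):
            return dict(self._event)


    def prepare_events(events, timestamp=None):
        # Same merge logic as the patch, with datetime instead of arrow.
        if timestamp is None:
            timestamp = datetime.now(timezone.utc)
        outevents, urls = [], []
        for e in events:
            event = e.get()
            url = event['url'].strip()
            if event['last_seen'] is None:
                event['last_seen'] = timestamp
            event['url'] = url
            if url in urls:
                # duplicate url: sum counts, keep the latest last_seen
                idx = urls.index(url)
                urls.append(urls.pop(idx))
                pevent = outevents.pop(idx)
                event['cnt'] += pevent['cnt']
                event['last_seen'] = max(event['last_seen'],
                                         pevent['last_seen'])
            else:
                urls.append(url)
            outevents.append(event)
        return outevents


    events = [FakeEvent(' https://example.org/repo '),  # note the spaces
              FakeEvent('https://example.org/repo'),
              FakeEvent('https://example.org/other')]
    out = prepare_events(events)
    assert len(out) == 2                                # duplicates merged
    assert out[0]['url'] == 'https://example.org/repo'  # urls stripped
    assert out[0]['cnt'] == 2                           # counts summed

Keeping urls and outevents index-aligned is what makes the pop/append pairs in the duplicate branch safe: both lists move the matching entry to their tail in the same iteration.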