Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/updater/conftest.py
import pytest | import pytest | ||||
import glob | import glob | ||||
import os | import os | ||||
from arrow import utcnow # XXX | |||||
from swh.core.utils import numfile_sortkey as sortkey | from swh.core.utils import numfile_sortkey as sortkey | ||||
from swh.scheduler.updater.backend import SchedulerUpdaterBackend | from swh.scheduler.updater.backend import SchedulerUpdaterBackend | ||||
from swh.scheduler.tests import SQL_DIR | from swh.scheduler.tests import SQL_DIR | ||||
import swh.scheduler.tests.conftest # noqa | import swh.scheduler.tests.conftest # noqa | ||||
DUMP_FILES = os.path.join(SQL_DIR, 'updater', '*.sql') | DUMP_FILES = os.path.join(SQL_DIR, 'updater', '*.sql') | ||||
@pytest.fixture | @pytest.fixture | ||||
def swh_scheduler_updater(request, postgresql_proc, postgresql): | def swh_scheduler_updater(postgresql): | ||||
config = { | config = { | ||||
'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format( | 'db': postgresql.dsn, | ||||
host=postgresql_proc.host, | |||||
port=postgresql_proc.port, | |||||
user='postgres', | |||||
dbname='tests') | |||||
} | } | ||||
all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) | all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) | ||||
cursor = postgresql.cursor() | cursor = postgresql.cursor() | ||||
for fname in all_dump_files: | for fname in all_dump_files: | ||||
with open(fname) as fobj: | with open(fname) as fobj: | ||||
cursor.execute(fobj.read()) | cursor.execute(fobj.read()) | ||||
postgresql.commit() | postgresql.commit() | ||||
backend = SchedulerUpdaterBackend(**config) | backend = SchedulerUpdaterBackend(**config) | ||||
return backend | return backend | ||||
def make_event(event_type, name, origin_type): | |||||
return { | |||||
'type': event_type, | |||||
'repo': { | |||||
'name': name, | |||||
}, | |||||
'created_at': utcnow(), | |||||
'origin_type': origin_type, | |||||
} | |||||
def make_simple_event(event_type, name, origin_type): | |||||
return { | |||||
'type': event_type, | |||||
'url': 'https://fakeurl/%s' % name, | |||||
'origin_type': origin_type, | |||||
'created_at': utcnow(), | |||||
} | |||||
def make_events(events): | |||||
for event_type, repo_name, origin_type in events: | |||||
yield make_event(event_type, repo_name, origin_type) | |||||
def make_incomplete_event(event_type, name, origin_type, | |||||
missing_data_key): | |||||
event = make_event(event_type, name, origin_type) | |||||
del event[missing_data_key] | |||||
return event | |||||
def make_incomplete_events(events): | |||||
for event_type, repo_name, origin_type, missing_data_key in events: | |||||
yield make_incomplete_event(event_type, repo_name, | |||||
origin_type, missing_data_key) |