Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/backfill.py
- This file was moved from swh/journal/checker.py.
# Copyright (C) 2017 The Software Heritage developers | # Copyright (C) 2017 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
"""Module defining journal checker classes. | """Module defining journal checker classes. | ||||
Those checker goal is to send back all, or missing objects from the | Those checker goal is to send back all, or missing objects from the | ||||
anlambert: I do not really understand what the journal backfiller is supposed to do by reading that first… | |||||
journal queues. | journal queues. | ||||
At the moment, a first naive implementation is the | At the moment, a first naive implementation is the | ||||
SimpleCheckerProducer. It simply reads the objects from the | JournalBackfiller. It simply reads the objects from the | ||||
storage and sends every object identifier back to the journal. | storage and sends every object identifier back to the journal. | ||||
""" | """ | ||||
import psycopg2 | import psycopg2 | ||||
from kafka import KafkaProducer | from kafka import KafkaProducer | ||||
Show All 20 Lines | if isinstance(entry, tuple): | ||||
return [entry_to_bytes(value) for value in entry] | return [entry_to_bytes(value) for value in entry] | ||||
return entry | return entry | ||||
def fetch(db_conn, obj_type): | def fetch(db_conn, obj_type): | ||||
"""Fetch all obj_type's identifiers from db. | """Fetch all obj_type's identifiers from db. | ||||
This opens one connection, stream objects and when done, close | This opens one connection, stream objects and when done, close | ||||
the connection. | the connection. | ||||
Not Done Inline ActionsI think it should simply be "id" here. The defined alias is the same as the column name. anlambert: I think it should simply be `"id"` here. The defined alias is the same as the column name. | |||||
Done Inline ActionsWe used those to disambiguate the sql query we generate (when a join is implied). For example, release, revision joins on person which also has columns name and id. So we need those. ardumont: We used those to disambiguate the sql query we generate (when a join is implied).
For example… | |||||
Not Done Inline ActionsOh I see, that was not straightforward to understand. anlambert: Oh I see, that was not straightforward to understand. | |||||
Done Inline ActionsIndeed! ardumont: Indeed! | |||||
Raises: | Raises: | ||||
ValueError if obj_type is not supported | ValueError if obj_type is not supported | ||||
Yields: | Yields: | ||||
Identifiers for the specific object_type | Identifiers for the specific object_type | ||||
""" | """ | ||||
primary_key = TYPE_TO_PRIMARY_KEY.get(obj_type) | primary_key = TYPE_TO_PRIMARY_KEY.get(obj_type) | ||||
if not primary_key: | if not primary_key: | ||||
raise ValueError('The object type %s is not supported. ' | raise ValueError('The object type %s is not supported. ' | ||||
'Only possible values are %s' % ( | 'Only possible values are %s' % ( | ||||
obj_type, TYPE_TO_PRIMARY_KEY.keys())) | obj_type, TYPE_TO_PRIMARY_KEY.keys())) | ||||
primary_key_str = ','.join(primary_key) | primary_key_str = ','.join(primary_key) | ||||
query = 'select %s from %s order by %s' % ( | query = 'select %s from %s order by %s' % ( | ||||
primary_key_str, obj_type, primary_key_str) | primary_key_str, obj_type, primary_key_str) | ||||
server_side_cursor_name = 'swh.journal.%s' % obj_type | server_side_cursor_name = 'swh.journal.%s' % obj_type | ||||
with psycopg2.connect(db_conn) as db: | with psycopg2.connect(db_conn) as db: | ||||
cursor = db.cursor(name=server_side_cursor_name) | cursor = db.cursor(name=server_side_cursor_name) | ||||
cursor.execute(query) | cursor.execute(query) | ||||
for o in cursor: | for o in cursor: | ||||
yield dict(zip(primary_key, entry_to_bytes(o))) | yield dict(zip(primary_key, entry_to_bytes(o))) | ||||
Not Done Inline Actionssame here anlambert: same here | |||||
class SimpleCheckerProducer(SWHConfig): | class JournalBackfiller(SWHConfig): | ||||
"""Class in charge of reading the storage's objects and sends those | """Class in charge of reading the storage's objects and sends those | ||||
back to the publisher queue. | back to the publisher queue. | ||||
Not Done Inline Actionssame here anlambert: same here | |||||
This is designed to be run periodically. | This is designed to be run periodically. | ||||
""" | """ | ||||
DEFAULT_CONFIG = { | DEFAULT_CONFIG = { | ||||
'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), | 'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), | ||||
'temporary_prefix': ('str', 'swh.tmp_journal.new'), | 'temporary_prefix': ('str', 'swh.tmp_journal.new'), | ||||
'publisher_id': ('str', 'swh.journal.publisher.test'), | 'publisher_id': ('str', 'swh.journal.publisher.test'), | ||||
▲ Show 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | def run(self): | ||||
""" | """ | ||||
for obj_type, obj in self._read_storage(): | for obj_type, obj in self._read_storage(): | ||||
topic = '%s.%s' % (self.config['temporary_prefix'], obj_type) | topic = '%s.%s' % (self.config['temporary_prefix'], obj_type) | ||||
self.producer.send(topic, value=obj) | self.producer.send(topic, value=obj) | ||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
SimpleCheckerProducer().run() | JournalBackfiller().run() | ||||
Done Inline ActionsThis could be moved in a dedicated _compute_shift_bits function as it is also used in the byte_ranges method below. anlambert: This could be moved in a dedicated `_compute_shift_bits` function as it is also used in the… | |||||
Done Inline ActionsIndeed! ardumont: Indeed!
I will adapt. | |||||
Not Done Inline Actionss/Reads/Read as imperative form is used elsewhere anlambert: s/Reads/Read as imperative form is used elsewhere | |||||
Done Inline Actions@faux here :) ardumont: @faux here :) |
I do not really understand what the journal backfiller is supposed to do by reading that first sentence.