Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/debian/control b/debian/control
index 00fd2c8..fbd1d59 100644
--- a/debian/control
+++ b/debian/control
@@ -1,20 +1,19 @@
Source: swh-journal
Maintainer: Software Heritage developers <swh-devel@inria.fr>
Section: python
Priority: optional
Build-Depends: debhelper (>= 9),
dh-python,
python3-all,
python3-nose,
python3-setuptools,
python3-swh.core,
- python3-swh.model,
python3-swh.storage,
python3-vcversioner
Standards-Version: 3.9.6
Homepage: https://forge.softwareheritage.org/diffusion/DJNL/
Package: python3-swh.journal
Architecture: all
Depends: ${misc:Depends}, ${python3:Depends}
Description: Software Heritage Journal utilities
diff --git a/requirements-swh.txt b/requirements-swh.txt
index cf1e1fd..e69de29 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1 +0,0 @@
-swh.model >= 0.0.14
diff --git a/swh/journal/checker.py b/swh/journal/checker.py
index cbe4482..a52dbe8 100644
--- a/swh/journal/checker.py
+++ b/swh/journal/checker.py
@@ -1,158 +1,82 @@
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-"""Module defining a class in charge of computing the missing objects
-from the journal queues and sending those back to the queues.
+"""Module defining journal checker classes.
+
+Those checker goal is to send back all, or missing objects from the
+journal queues.
+
+At the moment, a first naive implementation is the
+SWHSimpleCheckerProducer. It simply reads the objects from the
+storage and sends every object identifier back to the journal.
"""
-from collections import defaultdict
-from kafka import KafkaProducer, KafkaConsumer
+from kafka import KafkaProducer
from swh.core.config import SWHConfig
from .backend import Backend
-from .serializers import kafka_to_value, value_to_kafka
-
-
-# Dict from object to its identifier
-OBJECT_TO_ID_FN = {
- 'content': lambda c: c[b'sha1'],
- 'origin': lambda o: o[b'id'],
- 'revision': lambda r: r[b'id'],
- 'release': lambda r: r[b'id'],
-}
+from .serializers import value_to_kafka
-class SWHJournalChecker(SWHConfig):
- """Class in charge of computing a diff against list of objects and the
- actual swh-storage's objects. The missing objects resulting
- from that diff are queued back in the publisher's queues.
+class SWHJournalSimpleCheckerProducer(SWHConfig):
+ """Class in charge of reading the storage's objects and sends those
+ back to the publisher queue.
This is designed to be run periodically.
"""
DEFAULT_CONFIG = {
'brokers': ('list[str]', ['getty.internal.softwareheritage.org']),
-
- 'reading_prefix': ('str', 'swh.journal.objects'),
'writing_prefix': ('str', 'swh.journal.objects'),
-
- 'consumer_id': ('str', 'swh.journal.publisher.test'),
'publisher_id': ('str', 'swh.journal.publisher.test'),
-
'object_types': ('list[str]', ['content', 'revision', 'release']),
- 'diff_journal': ('bool', False),
-
'storage_dbconn': ('str', 'service=swh-dev'),
}
CONFIG_BASE_FILENAME = 'journal/checker'
def __init__(self, extra_configuration=None):
self.config = config = self.parse_config_file()
if extra_configuration:
config.update(extra_configuration)
self.storage_backend = Backend(self.config['storage_dbconn'])
- self.consumer = KafkaConsumer(
- bootstrap_servers=config['brokers'],
- value_deserializer=kafka_to_value,
- auto_offset_reset='earliest',
- enable_auto_commit=False,
- group_id=config['consumer_id'],
- )
-
self.producer = KafkaProducer(
bootstrap_servers=config['brokers'],
value_serializer=value_to_kafka,
client_id=config['publisher_id'],
)
- self.diff_journal = self.config['diff_journal']
-
self.object_read_fn = {
'content': self.storage_backend.content_get_ids,
'origin': self.storage_backend.origin_get_ids,
'revision': self.storage_backend.revision_get_ids,
'release': self.storage_backend.release_get_ids,
}
- if self.diff_journal:
- self.consumer.subscribe(
- topics=['%s.%s' % (config['reading_prefix'], obj_type)
- for obj_type in config['object_types']],
- )
-
- def _read_journal(self):
- """Read all the journal objects and returns as a dict of obj_type,
- set of identifiers.
-
- """
- journal_objs = defaultdict(set)
- for message in self.consumer:
- obj_type = message.topic.split('.')[-1]
- obj_id = OBJECT_TO_ID_FN[obj_type](message.value)
- journal_objs[obj_type].add(obj_id)
-
- return journal_objs
-
def _read_storage(self):
"""Read all the storage's objects and returns as dict of object_types,
set of identifiers.
"""
- storage_objs = {}
- for obj_type in self.config['object_types']:
- storage_objs[obj_type] = set(self.object_read_fn[obj_type]())
-
- return storage_objs
-
- def _compute_diff(self, storage_objs, journal_objs):
- """Compute the difference between storage_objects and journal_objects.
-
- Args:
- storage_objects (dict): objects from storage, key is the
- type, value is the set of ids for
- that type.
- journal_objects (dict): objects from journal, key is the
- type, value is the set of ids for
- that type.
-
- Returns:
- dict of difference for each object_type
-
- """
- objects = {}
for obj_type in self.config['object_types']:
- objects[obj_type] = storage_objs[obj_type] - journal_objs[obj_type]
-
- return objects
+ for obj_id in self.object_read_fn[obj_type]():
+ yield obj_type, obj_id
def run(self):
"""Reads storage's subscribed object types and send them all back to
the publisher queue.
- Optionally, reads journal's objects and compute the
- difference. The missing objects present in the storage and
- missing from the journal are sent back to the journal.
-
"""
- storage_objs = self._read_storage()
-
- if self.diff_journal:
- journal_objs = self._read_journal()
- objects = self._compute_diffs(storage_objs, journal_objs)
- else:
- objects = storage_objs
- for obj_type, objs in objects.items():
+ for obj_type, obj_id in self._read_storage():
topic = '%s.%s' % (self.config['writing_prefix'], obj_type)
- for obj_id in objs:
- self.producer.send(topic, value=obj_id)
+ self.producer.send(topic, value=obj_id)
if __name__ == '__main__':
- SWHJournalChecker().run()
+ SWHJournalSimpleCheckerProducer().run()

File Metadata

Mime Type
text/x-diff
Expires
Sat, Jun 21, 6:19 PM (1 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3236672

Event Timeline