Page Menu | Home | Software Heritage

D1992.diff
No One | Temporary

D1992.diff

diff --git a/swh/journal/direct_writer.py b/swh/journal/direct_writer.py
--- a/swh/journal/direct_writer.py
+++ b/swh/journal/direct_writer.py
@@ -3,92 +3,5 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import logging
-
-from confluent_kafka import Producer
-
-from swh.model.hashutil import DEFAULT_ALGORITHMS
-from swh.model.model import BaseModel
-
-from .serializers import key_to_kafka, value_to_kafka
-
-logger = logging.getLogger(__name__)
-
-
-class DirectKafkaWriter:
- """This class is instantiated and used by swh-storage to write incoming
- new objects to Kafka before adding them to the storage backend
- (eg. postgresql) itself."""
- def __init__(self, brokers, prefix, client_id):
- self._prefix = prefix
-
- self.producer = Producer({
- 'bootstrap.servers': brokers,
- 'client.id': client_id,
- 'enable.idempotence': 'true',
- })
-
- def send(self, topic, key, value):
- self.producer.produce(
- topic=topic,
- key=key_to_kafka(key),
- value=value_to_kafka(value),
- )
-
- def flush(self):
- self.producer.flush()
- # Need to service the callbacks regularly by calling poll
- self.producer.poll(0)
-
- def _get_key(self, object_type, object_):
- if object_type in ('revision', 'release', 'directory', 'snapshot'):
- return object_['id']
- elif object_type == 'content':
- return object_['sha1'] # TODO: use a dict of hashes
- elif object_type == 'skipped_content':
- return {
- hash: object_[hash]
- for hash in DEFAULT_ALGORITHMS
- }
- elif object_type == 'origin':
- return {'url': object_['url'], 'type': object_['type']}
- elif object_type == 'origin_visit':
- return {
- 'origin': object_['origin'],
- 'date': str(object_['date']),
- }
- else:
- raise ValueError('Unknown object type: %s.' % object_type)
-
- def _sanitize_object(self, object_type, object_):
- if object_type == 'origin_visit':
- return {
- **object_,
- 'date': str(object_['date']),
- }
- elif object_type == 'origin':
- assert 'id' not in object_
- return object_
-
- def write_addition(self, object_type, object_, flush=True):
- """Write a single object to the journal"""
- if isinstance(object_, BaseModel):
- object_ = object_.to_dict()
- topic = '%s.%s' % (self._prefix, object_type)
- key = self._get_key(object_type, object_)
- object_ = self._sanitize_object(object_type, object_)
- logger.debug('topic: %s, key: %s, value: %s', topic, key, object_)
- self.send(topic, key=key, value=object_)
-
- if flush:
- self.flush()
-
- write_update = write_addition
-
- def write_additions(self, object_type, objects, flush=True):
- """Write a set of objects to the journal"""
- for object_ in objects:
- self.write_addition(object_type, object_, flush=False)
-
- if flush:
- self.flush()
+# Backward compatibility: DirectKafkaWriter moved to swh.journal.writer.direct
+from .writer.direct import DirectKafkaWriter # noqa
diff --git a/swh/journal/writer/__init__.py b/swh/journal/writer/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/writer/__init__.py
@@ -0,0 +1,36 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+def get_journal_writer(cls, args=None):
+    """Instantiate the journal writer implementation named by ``cls``.
+
+    Args:
+        cls: implementation name; ``'inmemory'`` selects
+            ``InMemoryJournalWriter``, ``'kafka'`` selects
+            ``DirectKafkaWriter``.
+        args: optional mapping of keyword arguments passed to the
+            constructor of the selected class; ``None`` (the default)
+            means no arguments.
+
+    Returns:
+        A new instance of the selected journal writer class.
+
+    Raises:
+        ValueError: if ``cls`` is not one of the names above.
+    """
+    # Each implementation is imported only when selected, so choosing one
+    # does not import the other's module.
+    if cls == 'inmemory':
+        from .inmemory import InMemoryJournalWriter as JournalWriter
+    elif cls == 'kafka':
+        from .direct import DirectKafkaWriter as JournalWriter
+    else:
+        raise ValueError('Unknown journal writer class `%s`' % cls)
+
+    # None rather than a shared mutable ``{}`` default.
+    if args is None:
+        args = {}
+    return JournalWriter(**args)
diff --git a/swh/journal/direct_writer.py b/swh/journal/writer/direct.py
copy from swh/journal/direct_writer.py
copy to swh/journal/writer/direct.py
--- a/swh/journal/direct_writer.py
+++ b/swh/journal/writer/direct.py
@@ -10,7 +10,7 @@
from swh.model.hashutil import DEFAULT_ALGORITHMS
from swh.model.model import BaseModel
-from .serializers import key_to_kafka, value_to_kafka
+from swh.journal.serializers import key_to_kafka, value_to_kafka
logger = logging.getLogger(__name__)
diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/inmemory.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/writer/inmemory.py
@@ -0,0 +1,52 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+import copy
+from multiprocessing import Manager
+
+from swh.model.model import BaseModel
+
+logger = logging.getLogger(__name__)
+
+
+class InMemoryJournalWriter:
+    """Journal writer that keeps the written objects in a list.
+
+    Each write appends an ``(object_type, object)`` pair to the
+    ``objects`` attribute.
+    """
+
+    def __init__(self):
+        # Share the list of objects across processes, for RemoteAPI tests.
+        self.manager = Manager()
+        self.objects = self.manager.list()
+
+    def write_addition(self, object_type, object_, flush=True):
+        """Record a single object.
+
+        Args:
+            object_type: name of the type of the object.
+            object_: the object to record; a ``BaseModel`` instance is
+                converted with ``to_dict()`` first.
+            flush: ignored; accepted so that calls written for
+                ``DirectKafkaWriter.write_addition`` also work here.
+        """
+        if isinstance(object_, BaseModel):
+            object_ = object_.to_dict()
+        # Deep-copy before storing, presumably so the recorded value is
+        # independent of the caller's object.
+        self.objects.append((object_type, copy.deepcopy(object_)))
+
+    write_update = write_addition
+
+    def write_additions(self, object_type, objects, flush=True):
+        """Record every object in ``objects`` under ``object_type``.
+
+        ``flush`` is ignored; it is accepted so that calls written for
+        ``DirectKafkaWriter.write_additions`` also work here.
+        """
+        for object_ in objects:
+            self.write_addition(object_type, object_)

File Metadata

Mime Type
text/plain
Expires
Tue, Dec 17, 4:22 AM (2 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3226902

Event Timeline