Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7122822
D1992.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
5 KB
Subscribers
None
D1992.diff
View Options
diff --git a/swh/journal/direct_writer.py b/swh/journal/direct_writer.py
--- a/swh/journal/direct_writer.py
+++ b/swh/journal/direct_writer.py
@@ -3,92 +3,5 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import logging
-
-from confluent_kafka import Producer
-
-from swh.model.hashutil import DEFAULT_ALGORITHMS
-from swh.model.model import BaseModel
-
-from .serializers import key_to_kafka, value_to_kafka
-
-logger = logging.getLogger(__name__)
-
-
-class DirectKafkaWriter:
- """This class is instantiated and used by swh-storage to write incoming
- new objects to Kafka before adding them to the storage backend
- (eg. postgresql) itself."""
- def __init__(self, brokers, prefix, client_id):
- self._prefix = prefix
-
- self.producer = Producer({
- 'bootstrap.servers': brokers,
- 'client.id': client_id,
- 'enable.idempotence': 'true',
- })
-
- def send(self, topic, key, value):
- self.producer.produce(
- topic=topic,
- key=key_to_kafka(key),
- value=value_to_kafka(value),
- )
-
- def flush(self):
- self.producer.flush()
- # Need to service the callbacks regularly by calling poll
- self.producer.poll(0)
-
- def _get_key(self, object_type, object_):
- if object_type in ('revision', 'release', 'directory', 'snapshot'):
- return object_['id']
- elif object_type == 'content':
- return object_['sha1'] # TODO: use a dict of hashes
- elif object_type == 'skipped_content':
- return {
- hash: object_[hash]
- for hash in DEFAULT_ALGORITHMS
- }
- elif object_type == 'origin':
- return {'url': object_['url'], 'type': object_['type']}
- elif object_type == 'origin_visit':
- return {
- 'origin': object_['origin'],
- 'date': str(object_['date']),
- }
- else:
- raise ValueError('Unknown object type: %s.' % object_type)
-
- def _sanitize_object(self, object_type, object_):
- if object_type == 'origin_visit':
- return {
- **object_,
- 'date': str(object_['date']),
- }
- elif object_type == 'origin':
- assert 'id' not in object_
- return object_
-
- def write_addition(self, object_type, object_, flush=True):
- """Write a single object to the journal"""
- if isinstance(object_, BaseModel):
- object_ = object_.to_dict()
- topic = '%s.%s' % (self._prefix, object_type)
- key = self._get_key(object_type, object_)
- object_ = self._sanitize_object(object_type, object_)
- logger.debug('topic: %s, key: %s, value: %s', topic, key, object_)
- self.send(topic, key=key, value=object_)
-
- if flush:
- self.flush()
-
- write_update = write_addition
-
- def write_additions(self, object_type, objects, flush=True):
- """Write a set of objects to the journal"""
- for object_ in objects:
- self.write_addition(object_type, object_, flush=False)
-
- if flush:
- self.flush()
+# for BW compat
+from .writer.direct import DirectKafkaWriter # noqa
diff --git a/swh/journal/writer/__init__.py b/swh/journal/writer/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/writer/__init__.py
@@ -0,0 +1,15 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+def get_journal_writer(cls, args={}):
+ if cls == 'inmemory':
+ from .inmemory import InMemoryJournalWriter as JournalWriter
+ elif cls == 'kafka':
+ from .direct import DirectKafkaWriter as JournalWriter
+ else:
+ raise ValueError('Unknown storage class `%s`' % cls)
+
+ return JournalWriter(**args)
diff --git a/swh/journal/direct_writer.py b/swh/journal/writer/direct.py
copy from swh/journal/direct_writer.py
copy to swh/journal/writer/direct.py
--- a/swh/journal/direct_writer.py
+++ b/swh/journal/writer/direct.py
@@ -10,7 +10,7 @@
from swh.model.hashutil import DEFAULT_ALGORITHMS
from swh.model.model import BaseModel
-from .serializers import key_to_kafka, value_to_kafka
+from swh.journal.serializers import key_to_kafka, value_to_kafka
logger = logging.getLogger(__name__)
diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/inmemory.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/writer/inmemory.py
@@ -0,0 +1,30 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+import copy
+from multiprocessing import Manager
+
+from swh.model.model import BaseModel
+
+logger = logging.getLogger(__name__)
+
+
+class InMemoryJournalWriter:
+ def __init__(self):
+ # Share the list of objects across processes, for RemoteAPI tests.
+ self.manager = Manager()
+ self.objects = self.manager.list()
+
+ def write_addition(self, object_type, object_):
+ if isinstance(object_, BaseModel):
+ object_ = object_.to_dict()
+ self.objects.append((object_type, copy.deepcopy(object_)))
+
+ write_update = write_addition
+
+ def write_additions(self, object_type, objects):
+ for object_ in objects:
+ self.write_addition(object_type, object_)
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, Dec 17, 4:22 AM (2 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3226902
Attached To
D1992: writer: move the InMemoryJournalWriter from swh.storage to swh.journal
Event Timeline
Log In to Comment