Page Menu · Home · Software Heritage

D8833.diff
No One · Temporary

D8833.diff

diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/inmemory.py
--- a/swh/journal/writer/inmemory.py
+++ b/swh/journal/writer/inmemory.py
@@ -5,16 +5,16 @@
import logging
from multiprocessing import Manager
-from typing import Any, Callable, Dict, Iterable, List, Tuple
+from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
-from .interface import ValueProtocol
+from .interface import KeyType, ValueProtocol
logger = logging.getLogger(__name__)
class InMemoryJournalWriter:
- objects: List[Tuple[str, ValueProtocol]]
- privileged_objects: List[Tuple[str, ValueProtocol]]
+ objects: List[Tuple[str, KeyType, Optional[ValueProtocol]]]
+ privileged_objects: List[Tuple[str, KeyType, Optional[ValueProtocol]]]
def __init__(
self,
@@ -28,15 +28,15 @@
self.anonymize = anonymize
def write_addition(self, object_type: str, object_: ValueProtocol) -> None:
- object_.unique_key() # Check this does not error, to mimic the kafka writer
+ key = object_.unique_key()
anon_object_ = None
if self.anonymize:
anon_object_ = object_.anonymize()
if anon_object_ is not None:
- self.privileged_objects.append((object_type, object_))
- self.objects.append((object_type, anon_object_))
+ self.privileged_objects.append((object_type, key, object_))
+ self.objects.append((object_type, key, anon_object_))
else:
- self.objects.append((object_type, object_))
+ self.objects.append((object_type, key, object_))
def write_additions(
self, object_type: str, objects: Iterable[ValueProtocol]
@@ -46,3 +46,12 @@
 
     def flush(self) -> None:
         pass
+
+    def delete(self, object_type: str, object_keys: Iterable[KeyType]) -> None:
+        """Append a ``(object_type, key, None)`` deletion marker to ``objects``
+        for each key in ``object_keys``.
+
+        ``privileged_objects`` is not modified.
+        """
+        for key in object_keys:
+            self.objects.append((object_type, key, None))
diff --git a/swh/journal/writer/interface.py b/swh/journal/writer/interface.py
--- a/swh/journal/writer/interface.py
+++ b/swh/journal/writer/interface.py
@@ -38,3 +38,7 @@
def flush(self) -> None:
"""Flush the pending object additions in the backend, if any."""
...
+
+ def delete(self, object_type: str, object_keys: Iterable[KeyType]) -> None:
+ """Delete the object with keys `object_keys`."""
+ ...
diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py
--- a/swh/journal/writer/kafka.py
+++ b/swh/journal/writer/kafka.py
@@ -160,7 +160,7 @@
)
)
- def send(self, topic: str, key: KeyType, value):
+ def reliable_produce(self, topic: str, key: KeyType, kafka_value: Optional[bytes]):
kafka_key = key_to_kafka(key)
max_attempts = 5
last_exception: Optional[Exception] = None
@@ -169,7 +169,7 @@
self.producer.produce(
topic=topic,
key=kafka_key,
- value=value_to_kafka(value),
+ value=kafka_value,
)
except BufferError as e:
last_exception = e
@@ -194,6 +194,10 @@
)
)
 
+    def send(self, topic: str, key: KeyType, value):
+        kafka_value = value_to_kafka(value)  # delete() bypasses this to send None
+        return self.reliable_produce(topic, key, kafka_value)
+
def delivery_error(self, message) -> KafkaDeliveryError:
"""Get all failed deliveries, and clear them"""
ret = self.delivery_failures
@@ -266,3 +270,19 @@
 
         if self.auto_flush:
             self.flush()
+
+    def delete(self, object_type: str, object_keys: Iterable[KeyType]) -> None:
+        """Produce a ``None``-valued message (Kafka tombstone) for each key.
+
+        Messages go to the ``{prefix}.{object_type}`` topic; when
+        ``auto_flush`` is set, the producer is flushed before returning.
+
+        NOTE(review): only this one topic receives tombstones; if the writer
+        also publishes to privileged topics, confirm whether they need them.
+        """
+        topic = f"{self._prefix}.{object_type}"
+        for key in object_keys:
+            self.reliable_produce(topic, key, None)
+
+        if self.auto_flush:
+            self.flush()
diff --git a/swh/journal/writer/stream.py b/swh/journal/writer/stream.py
--- a/swh/journal/writer/stream.py
+++ b/swh/journal/writer/stream.py
@@ -8,7 +8,7 @@
from swh.journal.serializers import value_to_kafka
-from .interface import ValueProtocol
+from .interface import KeyType, ValueProtocol
logger = logging.getLogger(__name__)
@@ -39,5 +39,11 @@
         for object_ in objects:
             self.write_addition(object_type, object_)
 
+    def delete(self, object_type: str, object_keys: Iterable[KeyType]) -> None:
+        """Ignore deletions: this writer records nothing for ``object_keys``.
+
+        NOTE(review): deletions are silently dropped; confirm this is intended.
+        """
+
     def flush(self) -> None:
         self.output.flush()

File Metadata

Mime Type
text/plain
Expires
Dec 17 2024, 2:41 AM (13 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3216876

Event Timeline