Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7122810
D8833.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
4 KB
Subscribers
None
D8833.diff
View Options
diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/inmemory.py
--- a/swh/journal/writer/inmemory.py
+++ b/swh/journal/writer/inmemory.py
@@ -5,16 +5,16 @@
import logging
from multiprocessing import Manager
-from typing import Any, Callable, Dict, Iterable, List, Tuple
+from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
-from .interface import ValueProtocol
+from .interface import KeyType, ValueProtocol
logger = logging.getLogger(__name__)
class InMemoryJournalWriter:
- objects: List[Tuple[str, ValueProtocol]]
- privileged_objects: List[Tuple[str, ValueProtocol]]
+ objects: List[Tuple[str, KeyType, Optional[ValueProtocol]]]
+ privileged_objects: List[Tuple[str, KeyType, Optional[ValueProtocol]]]
def __init__(
self,
@@ -28,15 +28,15 @@
self.anonymize = anonymize
def write_addition(self, object_type: str, object_: ValueProtocol) -> None:
- object_.unique_key() # Check this does not error, to mimic the kafka writer
+ key = object_.unique_key()
anon_object_ = None
if self.anonymize:
anon_object_ = object_.anonymize()
if anon_object_ is not None:
- self.privileged_objects.append((object_type, object_))
- self.objects.append((object_type, anon_object_))
+ self.privileged_objects.append((object_type, key, object_))
+ self.objects.append((object_type, key, anon_object_))
else:
- self.objects.append((object_type, object_))
+ self.objects.append((object_type, key, object_))
def write_additions(
self, object_type: str, objects: Iterable[ValueProtocol]
@@ -46,3 +46,7 @@
def flush(self) -> None:
pass
+
+ def delete(self, object_type: str, object_keys: Iterable[KeyType]) -> None:
+ for key in object_keys:
+ self.objects.append((object_type, key, None))
diff --git a/swh/journal/writer/interface.py b/swh/journal/writer/interface.py
--- a/swh/journal/writer/interface.py
+++ b/swh/journal/writer/interface.py
@@ -38,3 +38,7 @@
def flush(self) -> None:
"""Flush the pending object additions in the backend, if any."""
...
+
+ def delete(self, object_type: str, object_keys: Iterable[KeyType]) -> None:
+ """Delete the objects of type `object_type` whose keys are in `object_keys`."""
+ ...
diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py
--- a/swh/journal/writer/kafka.py
+++ b/swh/journal/writer/kafka.py
@@ -160,7 +160,7 @@
)
)
- def send(self, topic: str, key: KeyType, value):
+ def reliable_produce(self, topic: str, key: KeyType, kafka_value: Optional[bytes]):
kafka_key = key_to_kafka(key)
max_attempts = 5
last_exception: Optional[Exception] = None
@@ -169,7 +169,7 @@
self.producer.produce(
topic=topic,
key=kafka_key,
- value=value_to_kafka(value),
+ value=kafka_value,
)
except BufferError as e:
last_exception = e
@@ -194,6 +194,10 @@
)
)
+ def send(self, topic: str, key: KeyType, value):
+ kafka_value = value_to_kafka(value)
+ return self.reliable_produce(topic, key, kafka_value)
+
def delivery_error(self, message) -> KafkaDeliveryError:
"""Get all failed deliveries, and clear them"""
ret = self.delivery_failures
@@ -266,3 +270,8 @@
if self.auto_flush:
self.flush()
+
+ def delete(self, object_type: str, object_keys: Iterable[KeyType]) -> None:
+ topic = f"{self._prefix}.{object_type}"
+ for key in object_keys:
+ self.reliable_produce(topic, key, None)
diff --git a/swh/journal/writer/stream.py b/swh/journal/writer/stream.py
--- a/swh/journal/writer/stream.py
+++ b/swh/journal/writer/stream.py
@@ -8,7 +8,7 @@
from swh.journal.serializers import value_to_kafka
-from .interface import ValueProtocol
+from .interface import KeyType, ValueProtocol
logger = logging.getLogger(__name__)
@@ -39,5 +39,8 @@
for object_ in objects:
self.write_addition(object_type, object_)
+ def delete(self, object_type: str, object_keys: Iterable[KeyType]) -> None:
+ pass
+
def flush(self) -> None:
self.output.flush()
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Dec 17 2024, 2:41 AM (13 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3216876
Attached To
D8833: Add base functionality to support object deletion
Event Timeline
Log In to Comment