D1745.id5876.diff

diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -3,9 +3,10 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from kafka import KafkaConsumer
import logging
+from kafka import KafkaConsumer
+
from .serializers import kafka_to_key, kafka_to_value
from swh.journal import DEFAULT_PREFIX
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -35,12 +35,26 @@
method = getattr(storage, object_type + '_add')
method(objects)
elif object_type == 'origin_visit':
- storage.origin_visit_upsert([
- {
- **obj,
- 'origin': storage.origin_add_one(obj['origin'])
- }
- for obj in objects])
+ for visit in objects:
+ if isinstance(visit['origin'], str):
+ # old format; note that this will crash with the pg and
+ # in-mem storages if the origin is not already known,
+ # but there is no other choice because we can't add an
+ # origin without knowing its type. Non-pg storages
+ # don't use a numeric FK internally.
+ visit['origin'] = {'url': visit['origin']}
+ else:
+ storage.origin_add_one(visit['origin'])
+ if 'type' not in visit:
+ # old format
+ visit['type'] = visit['origin']['type']
+
+ # filter out buggy objects produced by the backfiller
+ # see https://forge.softwareheritage.org/D1742
+ objects = [visit for visit in objects
+ if 'visit' in visit]
+
+ storage.origin_visit_upsert(objects)
else:
logger.warning('Received a series of %s, this should not happen',
object_type)
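
For context, a minimal sketch (not part of the patch; the URL and date are illustrative) of the two origin_visit message shapes the replayer may now receive, and what the loop above turns the legacy shape into:

import datetime

# Old format: 'origin' is a bare URL string and 'type' may be missing.
legacy_visit = {
    'visit': 1,
    'origin': 'http://example.com/',
    'date': datetime.datetime.now(),
}

# New format: 'origin' is a dict and the visit carries its own 'type'.
current_visit = {
    'visit': 1,
    'origin': {'url': 'http://example.com/', 'type': 'git'},
    'type': 'git',
    'date': datetime.datetime.now(),
}

# The loop above rewrites the legacy 'origin' field into a dict:
legacy_visit['origin'] = {'url': legacy_visit['origin']}
assert legacy_visit['origin'] == {'url': 'http://example.com/'}
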
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -19,6 +19,7 @@
from swh.journal.replay import process_replay_objects
from .conftest import OBJECT_TYPE_KEYS
+from .utils import MockedJournalClient, MockedKafkaWriter
def test_storage_play(
@@ -100,3 +101,101 @@
[cont['sha1'] for cont in OBJECT_TYPE_KEYS['content'][1]]))
assert None not in contents
assert contents == OBJECT_TYPE_KEYS['content'][1]
+
+
+def test_write_replay_legacy_origin_visit1():
+ """Test origin_visit when the 'origin' is just a string."""
+ queue = []
+ replayer = MockedJournalClient(queue)
+ writer = MockedKafkaWriter(queue)
+
+ # Note that flipping the order of these two insertions will crash
+ # the test, because the legacy origin_visit format does not allow
+ # creating the origin when needed (its type is missing).
+ now = datetime.datetime.now()
+ writer.send('origin', 'foo', {
+ 'url': 'http://example.com/',
+ 'type': 'git',
+ })
+ writer.send('origin_visit', 'foo', {
+ 'visit': 1,
+ 'origin': 'http://example.com/',
+ 'date': now,
+ })
+
+ queue_size = sum(len(messages)
+ for batch in queue
+ for messages in batch.values())
+
+ storage = get_storage('memory', {})
+ worker_fn = functools.partial(process_replay_objects, storage=storage)
+ nb_messages = 0
+ while nb_messages < queue_size:
+ nb_messages += replayer.process(worker_fn)
+
+ visits = list(storage.origin_visit_get('http://example.com/'))
+
+ assert visits in [
+ # old format (with origin ids)
+ [{
+ 'visit': 1,
+ 'origin': 1,
+ 'date': now,
+ }],
+ # new format (with origin urls)
+ [{
+ 'visit': 1,
+ 'origin': {'url': 'http://example.com/'},
+ 'date': now,
+ }]
+ ]
+
+
+def test_write_replay_legacy_origin_visit2():
+ """Test origin_visit when 'type' is missing."""
+ queue = []
+ replayer = MockedJournalClient(queue)
+ writer = MockedKafkaWriter(queue)
+
+ now = datetime.datetime.now()
+ writer.send('origin', 'foo', {
+ 'url': 'http://example.com/',
+ 'type': 'git',
+ })
+ writer.send('origin_visit', 'foo', {
+ 'visit': 1,
+ 'origin': {
+ 'url': 'http://example.com/',
+ 'type': 'git',
+ },
+ 'date': now,
+ })
+
+ queue_size = sum(len(messages)
+ for batch in queue
+ for messages in batch.values())
+
+ storage = get_storage('memory', {})
+ worker_fn = functools.partial(process_replay_objects, storage=storage)
+ nb_messages = 0
+ while nb_messages < queue_size:
+ nb_messages += replayer.process(worker_fn)
+
+ visits = list(storage.origin_visit_get('http://example.com/'))
+
+ assert visits in [
+ # old format (with origin ids)
+ [{
+ 'visit': 1,
+ 'origin': 1,
+ 'date': now,
+ 'type': 'git',
+ }],
+ # new format (with origin urls)
+ [{
+ 'visit': 1,
+ 'origin': {'url': 'http://example.com/'},
+ 'date': now,
+ 'type': 'git',
+ }]
+ ]
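
The queue_size computation in both tests relies on the batch layout produced by MockedKafkaWriter (defined in swh/journal/tests/utils.py below); here is a minimal, illustrative sketch of that layout:

from swh.journal.tests.utils import MockedKafkaWriter

queue = []
writer = MockedKafkaWriter(queue)
writer.send('origin', 'foo', {'url': 'http://example.com/', 'type': 'git'})
writer.send('origin_visit', 'foo', {'visit': 1, 'origin': 'http://example.com/'})

# Each element of `queue` is one batch: a dict mapping a FakeKafkaPartition
# to the list of FakeKafkaMessage objects sent on that topic. Here that
# gives two batches of one message each, so the sum below is 2.
assert sum(len(messages)
           for batch in queue
           for messages in batch.values()) == 2
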
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -3,7 +3,6 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from collections import namedtuple
import functools
from hypothesis import given, settings, HealthCheck
@@ -13,51 +12,10 @@
from swh.storage.in_memory import Storage
from swh.storage import HashCollision
-from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
-from swh.journal.direct_writer import DirectKafkaWriter
from swh.journal.replay import process_replay_objects
from swh.journal.replay import process_replay_objects_content
-from swh.journal.serializers import (
- key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)
-
-FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'key value')
-FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic')
-
-
-class MockedKafkaWriter(DirectKafkaWriter):
- def __init__(self, queue):
- self._prefix = 'prefix'
- self.queue = queue
-
- def send(self, topic, key, value):
- key = kafka_to_key(key_to_kafka(key))
- value = kafka_to_value(value_to_kafka(value))
- partition = FakeKafkaPartition(topic)
- msg = FakeKafkaMessage(key=key, value=value)
- if self.queue and {partition} == set(self.queue[-1]):
- # The last message is of the same object type, groupping them
- self.queue[-1][partition].append(msg)
- else:
- self.queue.append({partition: [msg]})
-
-
-class MockedKafkaConsumer:
- def __init__(self, queue):
- self.queue = queue
- self.committed = False
-
- def poll(self):
- return self.queue.pop(0)
-
- def commit(self):
- if self.queue == []:
- self.committed = True
-
-class MockedJournalClient(JournalClient):
- def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
- self._object_types = object_types
- self.consumer = MockedKafkaConsumer(queue)
+from .utils import MockedJournalClient, MockedKafkaWriter
@given(lists(object_dicts(), min_size=1))
diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/tests/utils.py
@@ -0,0 +1,45 @@
+from collections import namedtuple
+
+from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
+from swh.journal.direct_writer import DirectKafkaWriter
+from swh.journal.serializers import (
+ key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)
+
+FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'key value')
+FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic')
+
+
+class MockedKafkaWriter(DirectKafkaWriter):
+ def __init__(self, queue):
+ self._prefix = 'prefix'
+ self.queue = queue
+
+ def send(self, topic, key, value):
+ key = kafka_to_key(key_to_kafka(key))
+ value = kafka_to_value(value_to_kafka(value))
+ partition = FakeKafkaPartition(topic)
+ msg = FakeKafkaMessage(key=key, value=value)
+ if self.queue and {partition} == set(self.queue[-1]):
+ # The last batch is for the same object type; group the messages together
+ self.queue[-1][partition].append(msg)
+ else:
+ self.queue.append({partition: [msg]})
+
+
+class MockedKafkaConsumer:
+ def __init__(self, queue):
+ self.queue = queue
+ self.committed = False
+
+ def poll(self):
+ return self.queue.pop(0)
+
+ def commit(self):
+ if self.queue == []:
+ self.committed = True
+
+
+class MockedJournalClient(JournalClient):
+ def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
+ self._object_types = object_types
+ self.consumer = MockedKafkaConsumer(queue)
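
A rough usage sketch of how these mocks are meant to be wired together (topic, key and value are illustrative; the new tests above show the full pattern): the writer and the client share the same in-memory queue, each poll() hands one batch to the worker function, and process() reports how many messages it handled.

from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter

queue = []
writer = MockedKafkaWriter(queue)
client = MockedJournalClient(queue)

writer.send('origin', 'foo', {'url': 'http://example.com/', 'type': 'git'})

def worker_fn(all_objects):
    # all_objects maps an object type to the list of values polled for it,
    # e.g. {'origin': [{'url': 'http://example.com/', 'type': 'git'}]}
    assert 'origin' in all_objects

# One poll consumes the single batch queued above and reports one message.
assert client.process(worker_fn) == 1
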
