D1745: Make the replayer adapt from older formats
D1745.id5876.diff
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -3,9 +3,10 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from kafka import KafkaConsumer
import logging
+from kafka import KafkaConsumer
+
from .serializers import kafka_to_key, kafka_to_value
from swh.journal import DEFAULT_PREFIX
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -35,12 +35,26 @@
method = getattr(storage, object_type + '_add')
method(objects)
elif object_type == 'origin_visit':
- storage.origin_visit_upsert([
- {
- **obj,
- 'origin': storage.origin_add_one(obj['origin'])
- }
- for obj in objects])
+ for visit in objects:
+ if isinstance(visit['origin'], str):
+ # old format; note that it will crash with the pg and
+ # in-mem storages if the origin is not already known,
+ # but there is no other choice because we can't add an
+ # origin without knowing its type. Non-pg storages
+                # don't use a numeric FK internally.
+ visit['origin'] = {'url': visit['origin']}
+ else:
+ storage.origin_add_one(visit['origin'])
+ if 'type' not in visit:
+ # old format
+ visit['type'] = visit['origin']['type']
+
+ # filter out buggy objects produced by the backfiller
+ # see https://forge.softwareheritage.org/D1742
+ objects = [visit for visit in objects
+ if 'visit' in visit]
+
+ storage.origin_visit_upsert(objects)
else:
logger.warning('Received a series of %s, this should not happen',
object_type)
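
The branch above has to reconcile two generations of origin_visit messages. As a reference point (this sketch is not part of the patch; the variable names are illustrative and the values mirror the tests added below), the two shapes it normalizes before calling origin_visit_upsert are:

import datetime

now = datetime.datetime.now()

# Legacy message: 'origin' is a bare URL string and 'type' may be absent.
# The replayer can only wrap the URL in a dict here, since the origin's
# type is unknown and the origin cannot be created on the fly.
legacy_visit = {
    'visit': 1,
    'origin': 'http://example.com/',
    'date': now,
}

# Current message: 'origin' is a dict carrying url and type, which the
# replayer passes to storage.origin_add_one before the upsert.
current_visit = {
    'visit': 1,
    'origin': {'url': 'http://example.com/', 'type': 'git'},
    'date': now,
    'type': 'git',
}
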
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -19,6 +19,7 @@
from swh.journal.replay import process_replay_objects
from .conftest import OBJECT_TYPE_KEYS
+from .utils import MockedJournalClient, MockedKafkaWriter
def test_storage_play(
@@ -100,3 +101,101 @@
[cont['sha1'] for cont in OBJECT_TYPE_KEYS['content'][1]]))
assert None not in contents
assert contents == OBJECT_TYPE_KEYS['content'][1]
+
+
+def test_write_replay_legacy_origin_visit1():
+ """Test origin_visit when the 'origin' is just a string."""
+ queue = []
+ replayer = MockedJournalClient(queue)
+ writer = MockedKafkaWriter(queue)
+
+ # Note that flipping the order of these two insertions will crash
+    # the test, because the legacy origin format does not allow creating
+    # the origin on demand (its type is missing).
+ now = datetime.datetime.now()
+ writer.send('origin', 'foo', {
+ 'url': 'http://example.com/',
+ 'type': 'git',
+ })
+ writer.send('origin_visit', 'foo', {
+ 'visit': 1,
+ 'origin': 'http://example.com/',
+ 'date': now,
+ })
+
+ queue_size = sum(len(partition)
+ for batch in queue
+ for partition in batch.values())
+
+ storage = get_storage('memory', {})
+ worker_fn = functools.partial(process_replay_objects, storage=storage)
+ nb_messages = 0
+ while nb_messages < queue_size:
+ nb_messages += replayer.process(worker_fn)
+
+ visits = list(storage.origin_visit_get('http://example.com/'))
+
+ assert visits in [
+ # old format (with origin ids)
+ [{
+ 'visit': 1,
+ 'origin': 1,
+ 'date': now,
+ }],
+ # new format (with origin urls)
+ [{
+ 'visit': 1,
+ 'origin': {'url': 'http://example.com/'},
+ 'date': now,
+ }]
+ ]
+
+
+def test_write_replay_legacy_origin_visit2():
+ """Test origin_visit when 'type' is missing."""
+ queue = []
+ replayer = MockedJournalClient(queue)
+ writer = MockedKafkaWriter(queue)
+
+ now = datetime.datetime.now()
+ writer.send('origin', 'foo', {
+ 'url': 'http://example.com/',
+ 'type': 'git',
+ })
+ writer.send('origin_visit', 'foo', {
+ 'visit': 1,
+ 'origin': {
+ 'url': 'http://example.com/',
+ 'type': 'git',
+ },
+ 'date': now,
+ })
+
+ queue_size = sum(len(partition)
+ for batch in queue
+ for partition in batch.values())
+
+ storage = get_storage('memory', {})
+ worker_fn = functools.partial(process_replay_objects, storage=storage)
+ nb_messages = 0
+ while nb_messages < queue_size:
+ nb_messages += replayer.process(worker_fn)
+
+ visits = list(storage.origin_visit_get('http://example.com/'))
+
+ assert visits in [
+ # old format (with origin ids)
+ [{
+ 'visit': 1,
+ 'origin': 1,
+ 'date': now,
+ 'type': 'git',
+ }],
+ # new format (with origin urls)
+ [{
+ 'visit': 1,
+ 'origin': {'url': 'http://example.com/'},
+ 'date': now,
+ 'type': 'git',
+ }]
+ ]
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -3,7 +3,6 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from collections import namedtuple
import functools
from hypothesis import given, settings, HealthCheck
@@ -13,51 +12,10 @@
from swh.storage.in_memory import Storage
from swh.storage import HashCollision
-from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
-from swh.journal.direct_writer import DirectKafkaWriter
from swh.journal.replay import process_replay_objects
from swh.journal.replay import process_replay_objects_content
-from swh.journal.serializers import (
- key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)
-
-FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'key value')
-FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic')
-
-
-class MockedKafkaWriter(DirectKafkaWriter):
- def __init__(self, queue):
- self._prefix = 'prefix'
- self.queue = queue
-
- def send(self, topic, key, value):
- key = kafka_to_key(key_to_kafka(key))
- value = kafka_to_value(value_to_kafka(value))
- partition = FakeKafkaPartition(topic)
- msg = FakeKafkaMessage(key=key, value=value)
- if self.queue and {partition} == set(self.queue[-1]):
- # The last message is of the same object type, groupping them
- self.queue[-1][partition].append(msg)
- else:
- self.queue.append({partition: [msg]})
-
-
-class MockedKafkaConsumer:
- def __init__(self, queue):
- self.queue = queue
- self.committed = False
-
- def poll(self):
- return self.queue.pop(0)
-
- def commit(self):
- if self.queue == []:
- self.committed = True
-
-class MockedJournalClient(JournalClient):
- def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
- self._object_types = object_types
- self.consumer = MockedKafkaConsumer(queue)
+from .utils import MockedJournalClient, MockedKafkaWriter
@given(lists(object_dicts(), min_size=1))
diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/tests/utils.py
@@ -0,0 +1,45 @@
+from collections import namedtuple
+
+from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
+from swh.journal.direct_writer import DirectKafkaWriter
+from swh.journal.serializers import (
+ key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)
+
+FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'key value')
+FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic')
+
+
+class MockedKafkaWriter(DirectKafkaWriter):
+ def __init__(self, queue):
+ self._prefix = 'prefix'
+ self.queue = queue
+
+ def send(self, topic, key, value):
+ key = kafka_to_key(key_to_kafka(key))
+ value = kafka_to_value(value_to_kafka(value))
+ partition = FakeKafkaPartition(topic)
+ msg = FakeKafkaMessage(key=key, value=value)
+ if self.queue and {partition} == set(self.queue[-1]):
+            # The last message is of the same object type, so group them together
+ self.queue[-1][partition].append(msg)
+ else:
+ self.queue.append({partition: [msg]})
+
+
+class MockedKafkaConsumer:
+ def __init__(self, queue):
+ self.queue = queue
+ self.committed = False
+
+ def poll(self):
+ return self.queue.pop(0)
+
+ def commit(self):
+ if self.queue == []:
+ self.committed = True
+
+
+class MockedJournalClient(JournalClient):
+ def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
+ self._object_types = object_types
+ self.consumer = MockedKafkaConsumer(queue)
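
The new tests in test_replay.py above show how these doubles are wired together. As a hedged sketch (not part of the patch; the absolute import path for the new utils module and the get_storage import are assumed from the test context), a minimal replay loop using them could look like:

import datetime
import functools

from swh.journal.replay import process_replay_objects
from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter
from swh.storage import get_storage

queue = []
writer = MockedKafkaWriter(queue)
replayer = MockedJournalClient(queue)

# Consecutive sends to the same topic are grouped into a single batch by
# MockedKafkaWriter.send, so the queue ends up holding two batches here:
# one with both 'origin' messages, one with the 'origin_visit' message.
writer.send('origin', 'foo', {'url': 'http://example.com/', 'type': 'git'})
writer.send('origin', 'bar', {'url': 'http://example.org/', 'type': 'git'})
writer.send('origin_visit', 'foo', {
    'visit': 1,
    'origin': {'url': 'http://example.com/', 'type': 'git'},
    'date': datetime.datetime.now(),
    'type': 'git',
})

storage = get_storage('memory', {})
worker_fn = functools.partial(process_replay_objects, storage=storage)

# Drain the queue through the replayer, exactly as the new tests do;
# process() returns the number of messages handled on each call.
queue_size = sum(len(partition)
                 for batch in queue
                 for partition in batch.values())
nb_messages = 0
while nb_messages < queue_size:
    nb_messages += replayer.process(worker_fn)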