D2812.id10034.diff
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -6,7 +6,9 @@
import copy
import logging
from time import time
-from typing import Any, Callable, Dict, Iterable, List, Optional
+from typing import (
+ Any, Callable, Dict, Iterable, List, Optional
+)
from sentry_sdk import capture_exception, push_scope
try:
@@ -22,7 +24,11 @@
from swh.core.statsd import statsd
from swh.model.identifiers import normalize_timestamp
from swh.model.hashutil import hash_to_hex
-from swh.model.model import BaseContent, SkippedContent, SHA1_SIZE
+
+from swh.model.model import (
+ BaseContent, BaseModel, Content, Directory, Origin, Revision,
+ SHA1_SIZE, SkippedContent, Snapshot, Release
+)
from swh.objstorage.objstorage import (
ID_HASH_ALGO, ObjNotFoundError, ObjStorage,
)
@@ -38,6 +44,17 @@
CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds"
+object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = {
+ 'origin': Origin.from_dict,
+ 'snapshot': Snapshot.from_dict,
+ 'revision': Revision.from_dict,
+ 'release': Release.from_dict,
+ 'directory': Directory.from_dict,
+ 'content': Content.from_dict,
+ 'skipped_content': SkippedContent.from_dict,
+}
+
+
def process_replay_objects(all_objects, *, storage):
for (object_type, objects) in all_objects.items():
logger.debug("Inserting %s %s objects", len(objects), object_type)
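The dispatch table introduced above maps each journal topic name to the `from_dict` constructor of the matching swh.model class. As an illustrative note (not part of the patch; the URL is a made-up placeholder), converting a deserialized 'origin' message would look like:

    # Illustrative only: resolve the converter for a topic and build a model
    # object from a journal dict. The URL is a placeholder.
    from swh.journal.replay import object_converter_fn
    from swh.model.model import Origin

    origin_dict = {'url': 'https://example.org/repo.git'}
    origin = object_converter_fn['origin'](origin_dict)  # calls Origin.from_dict
    assert origin == Origin(url='https://example.org/repo.git')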
@@ -101,46 +118,8 @@
return _check_date(rev['date']) and _check_date(rev['committer_date'])
-def _fix_revisions(revisions):
- good_revisions = []
- for rev in revisions:
- rev = _fix_revision_pypi_empty_string(rev)
- rev = _fix_revision_transplant_source(rev)
- if not _check_revision_date(rev):
- logging.warning('Excluding revision (invalid date): %r', rev)
- continue
- if rev not in good_revisions:
- good_revisions.append(rev)
- return good_revisions
-
-
-def _fix_origin_visits(visits):
- good_visits = []
- for visit in visits:
- visit = visit.copy()
- if 'type' not in visit:
- if isinstance(visit['origin'], dict) and 'type' in visit['origin']:
- # Very old version of the schema: visits did not have a type,
- # but their 'origin' field was a dict with a 'type' key.
- visit['type'] = visit['origin']['type']
- else:
- # Very very old version of the schema: 'type' is missing,
- # so there is nothing we can do to fix it.
- raise ValueError('Got an origin_visit too old to be replayed.')
- if isinstance(visit['origin'], dict):
- # Old version of the schema: visit['origin'] was a dict.
- visit['origin'] = visit['origin']['url']
- good_visits.append(visit)
- return good_visits
-
-
-def fix_objects(object_type, objects):
- """Converts a possibly old object from the journal to its current
- expected format.
-
- List of conversions:
-
- Empty author name/email in PyPI releases:
+def _fix_revisions(revisions: List[Dict]) -> List[Dict]:
+ """Adapt revisions into a list of (current) storage compatible dicts.
>>> from pprint import pprint
>>> date = {
@@ -150,7 +129,7 @@
... },
... 'offset': 0,
... }
- >>> pprint(fix_objects('revision', [{
+ >>> pprint(_fix_revisions([{
... 'author': {'email': '', 'fullname': b'', 'name': ''},
... 'committer': {'email': '', 'fullname': b'', 'name': ''},
... 'date': date,
@@ -165,7 +144,7 @@
Fix type of 'transplant_source' extra headers:
- >>> revs = fix_objects('revision', [{
+ >>> revs = _fix_revisions([{
... 'author': {'email': '', 'fullname': b'', 'name': ''},
... 'committer': {'email': '', 'fullname': b'', 'name': ''},
... 'date': date,
@@ -173,7 +152,7 @@
... 'metadata': {
... 'extra_headers': [
... ['time_offset_seconds', b'-3600'],
- ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e']
+ ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e'] # noqa
... ]}
... }])
>>> pprint(revs[0]['metadata']['extra_headers'])
@@ -185,7 +164,7 @@
>>> from copy import deepcopy
>>> invalid_date1 = deepcopy(date)
>>> invalid_date1['timestamp']['microseconds'] = 1000000000 # > 10^6
- >>> fix_objects('revision', [{
+ >>> _fix_revisions([{
... 'author': {'email': '', 'fullname': b'', 'name': b''},
... 'committer': {'email': '', 'fullname': b'', 'name': b''},
... 'date': invalid_date1,
@@ -195,7 +174,7 @@
>>> invalid_date2 = deepcopy(date)
>>> invalid_date2['timestamp']['seconds'] = 2**70 # > 10^63
- >>> fix_objects('revision', [{
+ >>> _fix_revisions([{
... 'author': {'email': '', 'fullname': b'', 'name': b''},
... 'committer': {'email': '', 'fullname': b'', 'name': b''},
... 'date': invalid_date2,
@@ -205,7 +184,7 @@
>>> invalid_date3 = deepcopy(date)
>>> invalid_date3['offset'] = 2**20 # > 10^15
- >>> fix_objects('revision', [{
+ >>> _fix_revisions([{
... 'author': {'email': '', 'fullname': b'', 'name': b''},
... 'committer': {'email': '', 'fullname': b'', 'name': b''},
... 'date': date,
@@ -213,28 +192,58 @@
... }])
[]
+ """
+ good_revisions: List = []
+ for rev in revisions:
+ rev = _fix_revision_pypi_empty_string(rev)
+ rev = _fix_revision_transplant_source(rev)
+ if not _check_revision_date(rev):
+ logging.warning('Excluding revision (invalid date): %r', rev)
+ continue
+ if rev not in good_revisions:
+ good_revisions.append(rev)
+ return good_revisions
+
+
+def _fix_origin_visits(visits: List[Dict]) -> List[Dict]:
+ """Adapt origin visits into a list of current storage compatible dicts.
`visit['origin']` is a dict instead of an URL:
- >>> pprint(fix_objects('origin_visit', [{
+ >>> from pprint import pprint
+ >>> pprint(_fix_origin_visits([{
... 'origin': {'url': 'http://foo'},
... 'type': 'git',
... }]))
- [{'origin': 'http://foo', 'type': 'git'}]
+ [{'metadata': None, 'origin': 'http://foo', 'type': 'git'}]
`visit['type']` is missing , but `origin['visit']['type']` exists:
- >>> pprint(fix_objects('origin_visit', [
+ >>> pprint(_fix_origin_visits([
... {'origin': {'type': 'hg', 'url': 'http://foo'}
... }]))
- [{'origin': 'http://foo', 'type': 'hg'}]
- """ # noqa
+ [{'metadata': None, 'origin': 'http://foo', 'type': 'hg'}]
- if object_type == 'revision':
- objects = _fix_revisions(objects)
- elif object_type == 'origin_visit':
- objects = _fix_origin_visits(objects)
- return objects
+ """
+ good_visits = []
+ for visit in visits:
+ visit = visit.copy()
+ if 'type' not in visit:
+ if isinstance(visit['origin'], dict) and 'type' in visit['origin']:
+ # Very old version of the schema: visits did not have a type,
+ # but their 'origin' field was a dict with a 'type' key.
+ visit['type'] = visit['origin']['type']
+ else:
+ # Very very old version of the schema: 'type' is missing,
+ # so there is nothing we can do to fix it.
+ raise ValueError('Got an origin_visit too old to be replayed.')
+ if isinstance(visit['origin'], dict):
+ # Old version of the schema: visit['origin'] was a dict.
+ visit['origin'] = visit['origin']['url']
+ if 'metadata' not in visit:
+ visit['metadata'] = None
+ good_visits.append(visit)
+ return good_visits
def collision_aware_content_add(
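Besides flattening the legacy dict-valued 'origin' and recovering the old visit 'type', `_fix_origin_visits` now also fills a missing 'metadata' key with None (previously done in `_insert_objects`). A minimal sketch of the normalization, mirroring the doctests above with a made-up URL:

    # Illustrative only; matches the doctest output shown in the docstring above.
    from swh.journal.replay import _fix_origin_visits

    legacy_visit = {'origin': {'type': 'hg', 'url': 'https://example.org/repo'}}
    assert _fix_origin_visits([legacy_visit]) == [{
        'metadata': None,
        'origin': 'https://example.org/repo',
        'type': 'hg',
    }]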
@@ -253,7 +262,7 @@
colliding_content_hashes: Dict[str, List[Dict[str, bytes]]] = {}
while True:
try:
- content_add_fn(c.to_dict() for c in contents)
+ content_add_fn(contents)
except HashCollision as e:
algo, hash_id, colliding_hashes = e.args
hash_id = hash_to_hex(hash_id)
@@ -270,10 +279,13 @@
})
-def _insert_objects(object_type, objects, storage):
- objects = fix_objects(object_type, objects)
+def _insert_objects(object_type: str, objects: List[Dict], storage) -> None:
+ """Insert objects of type object_type in the storage.
+
+ """
if object_type == 'content':
- contents, skipped_contents = [], []
+ contents: List[BaseContent] = []
+ skipped_contents: List[BaseContent] = []
for content in objects:
c = BaseContent.from_dict(content)
if isinstance(c, SkippedContent):
@@ -285,19 +297,18 @@
storage.skipped_content_add, skipped_contents)
collision_aware_content_add(
storage.content_add_metadata, contents)
-
- elif object_type in ('directory', 'revision', 'release',
- 'snapshot', 'origin'):
- # TODO: split batches that are too large for the storage
- # to handle?
- method = getattr(storage, object_type + '_add')
- method(objects)
+ elif object_type == 'revision':
+ storage.revision_add(
+ Revision.from_dict(r) for r in _fix_revisions(objects)
+ )
elif object_type == 'origin_visit':
- for visit in objects:
- storage.origin_add_one({'url': visit['origin']})
- if 'metadata' not in visit:
- visit['metadata'] = None
- storage.origin_visit_upsert(objects)
+ storage.origin_add(Origin(url=o['origin']) for o in objects)
+ # FIXME: Should be List[OriginVisit], working on fixing
+ # swh.storage.origin_visit_upsert (D2813)
+ storage.origin_visit_upsert(_fix_origin_visits(objects))
+ elif object_type in ('directory', 'release', 'snapshot', 'origin'):
+ method = getattr(storage, object_type + '_add')
+ method(object_converter_fn[object_type](o) for o in objects)
else:
logger.warning('Received a series of %s, this should not happen',
object_type)
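For the remaining object types, `_insert_objects` now builds swh.model objects through `object_converter_fn` and hands them straight to the type-specific storage method. A hedged sketch of that generic branch (the in-memory storage and the empty snapshot payload are illustrative, not taken from the patch):

    # Illustrative only: a 'snapshot' journal dict is converted to a
    # swh.model Snapshot and passed to storage.snapshot_add, as the generic
    # directory/release/snapshot/origin branch above does.
    from swh.journal.replay import object_converter_fn
    from swh.storage import get_storage

    storage = get_storage(cls='memory')
    object_type = 'snapshot'
    objects = [{'id': b'\x00' * 20, 'branches': {}}]  # made-up journal payload

    method = getattr(storage, object_type + '_add')
    method(object_converter_fn[object_type](o) for o in objects)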
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -51,7 +51,6 @@
storage_config = {
'cls': 'pipeline',
'steps': [
- {'cls': 'validate'},
{'cls': 'memory'},
]
}
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2018-2019 The Software Heritage developers
+# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -8,14 +8,17 @@
from confluent_kafka import Consumer, KafkaException
from subprocess import Popen
-from typing import Tuple
+from typing import List, Tuple
from swh.storage import get_storage
-from swh.journal.writer.kafka import KafkaJournalWriter
+from swh.journal.replay import object_converter_fn
from swh.journal.serializers import (
kafka_to_key, kafka_to_value
)
+from swh.journal.writer.kafka import KafkaJournalWriter
+
+from swh.model.model import Content, Origin, BaseModel
from .conftest import OBJECT_TYPE_KEYS
@@ -116,7 +119,6 @@
storage_config = {
'cls': 'pipeline',
'steps': [
- {'cls': 'validate'},
{'cls': 'memory', 'journal_writer': writer_config},
]
}
@@ -129,15 +131,25 @@
method = getattr(storage, object_type + '_add')
if object_type in ('content', 'directory', 'revision', 'release',
'snapshot', 'origin'):
+ objects_: List[BaseModel]
if object_type == 'content':
- objects = [{**obj, 'data': b''} for obj in objects]
- method(objects)
+ objects_ = [
+ Content.from_dict({
+ **obj, 'data': b''})
+ for obj in objects
+ ]
+ else:
+ objects_ = [
+ object_converter_fn[object_type](obj)
+ for obj in objects
+ ]
+ method(objects_)
expected_messages += len(objects)
elif object_type in ('origin_visit',):
for object_ in objects:
object_ = object_.copy()
origin_url = object_.pop('origin')
- storage.origin_add_one({'url': origin_url})
+ storage.origin_add_one(Origin(url=origin_url))
visit = method(origin=origin_url, date=object_.pop('date'),
type=object_.pop('type'))
expected_messages += 1
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -30,7 +30,6 @@
storage_config = {
'cls': 'pipeline',
'steps': [
- {'cls': 'validate'},
{'cls': 'memory'},
]
}
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -11,10 +11,12 @@
from hypothesis.strategies import lists
from swh.model.hypothesis_strategies import object_dicts, present_contents
+from swh.model.model import Origin
from swh.storage import get_storage, HashCollision
-from swh.journal.replay import process_replay_objects
-from swh.journal.replay import process_replay_objects_content
+from swh.journal.replay import (
+ process_replay_objects, process_replay_objects_content, object_converter_fn
+)
from .utils import MockedJournalClient, MockedKafkaWriter
@@ -22,7 +24,6 @@
storage_config = {
'cls': 'pipeline',
'steps': [
- {'cls': 'validate'},
{'cls': 'memory', 'journal_writer': {'cls': 'memory'}},
]
}
@@ -64,17 +65,18 @@
return_value=MockedKafkaWriter(queue)):
storage1 = get_storage(**storage_config)
+ # Write objects to storage1
for (obj_type, obj) in objects:
obj = obj.copy()
if obj_type == 'origin_visit':
- storage1.origin_add_one({'url': obj['origin']})
+ storage1.origin_add_one(Origin(url=obj['origin']))
storage1.origin_visit_upsert([obj])
else:
if obj_type == 'content' and obj.get('status') == 'absent':
obj_type = 'skipped_content'
method = getattr(storage1, obj_type + '_add')
try:
- method([obj])
+ method([object_converter_fn[obj_type](obj)])
except HashCollision:
pass
@@ -128,7 +130,7 @@
contents = []
for obj in objects:
- obj = obj.to_dict()
+ obj = obj
storage1.content_add([obj])
contents.append(obj)
@@ -152,7 +154,7 @@
# only content with status visible will be copied in storage2
expected_objstorage_state = {
- c['sha1']: c['data'] for c in contents if c['status'] == 'visible'
+ c.sha1: c.data for c in contents if c.status == 'visible'
}
assert expected_objstorage_state == objstorage2.state
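The final hunk switches the objstorage comparison from dict lookups to attribute access, since the `present_contents` strategy yields swh.model `Content` objects rather than dicts. A small illustrative sketch of that access pattern (all field values below are placeholders):

    # Illustrative only: Content model objects expose their fields as
    # attributes, so c.sha1 / c.status / c.data replace the former
    # c['sha1'] / c['status'] / c['data'] dict lookups.
    from swh.model.model import Content

    c = Content.from_dict({
        'sha1': b'\x01' * 20, 'sha1_git': b'\x02' * 20,
        'sha256': b'\x03' * 32, 'blake2s256': b'\x04' * 32,
        'length': 4, 'status': 'visible', 'data': b'blob',
    })
    assert (c.sha1, c.status, c.data) == (b'\x01' * 20, 'visible', b'blob')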
Attached to: D2812: journal: Use swh-model objects instead of dicts in replay and writer