D2812.id10034.diff

diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -6,7 +6,9 @@
import copy
import logging
from time import time
-from typing import Any, Callable, Dict, Iterable, List, Optional
+from typing import (
+    Any, Callable, Dict, Iterable, List, Optional
+)
from sentry_sdk import capture_exception, push_scope
try:
@@ -22,7 +24,11 @@
from swh.core.statsd import statsd
from swh.model.identifiers import normalize_timestamp
from swh.model.hashutil import hash_to_hex
-from swh.model.model import BaseContent, SkippedContent, SHA1_SIZE
+
+from swh.model.model import (
+    BaseContent, BaseModel, Content, Directory, Origin, Revision,
+    SHA1_SIZE, SkippedContent, Snapshot, Release
+)
from swh.objstorage.objstorage import (
    ID_HASH_ALGO, ObjNotFoundError, ObjStorage,
)
@@ -38,6 +44,17 @@
CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds"
+object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = {
+    'origin': Origin.from_dict,
+    'snapshot': Snapshot.from_dict,
+    'revision': Revision.from_dict,
+    'release': Release.from_dict,
+    'directory': Directory.from_dict,
+    'content': Content.from_dict,
+    'skipped_content': SkippedContent.from_dict,
+}
+
+
def process_replay_objects(all_objects, *, storage):
    for (object_type, objects) in all_objects.items():
        logger.debug("Inserting %s %s objects", len(objects), object_type)
@@ -101,46 +118,8 @@
    return _check_date(rev['date']) and _check_date(rev['committer_date'])
-def _fix_revisions(revisions):
-    good_revisions = []
-    for rev in revisions:
-        rev = _fix_revision_pypi_empty_string(rev)
-        rev = _fix_revision_transplant_source(rev)
-        if not _check_revision_date(rev):
-            logging.warning('Excluding revision (invalid date): %r', rev)
-            continue
-        if rev not in good_revisions:
-            good_revisions.append(rev)
-    return good_revisions
-
-
-def _fix_origin_visits(visits):
-    good_visits = []
-    for visit in visits:
-        visit = visit.copy()
-        if 'type' not in visit:
-            if isinstance(visit['origin'], dict) and 'type' in visit['origin']:
-                # Very old version of the schema: visits did not have a type,
-                # but their 'origin' field was a dict with a 'type' key.
-                visit['type'] = visit['origin']['type']
-            else:
-                # Very very old version of the schema: 'type' is missing,
-                # so there is nothing we can do to fix it.
-                raise ValueError('Got an origin_visit too old to be replayed.')
-        if isinstance(visit['origin'], dict):
-            # Old version of the schema: visit['origin'] was a dict.
-            visit['origin'] = visit['origin']['url']
-        good_visits.append(visit)
-    return good_visits
-
-
-def fix_objects(object_type, objects):
-    """Converts a possibly old object from the journal to its current
-    expected format.
-
-    List of conversions:
-
-    Empty author name/email in PyPI releases:
+def _fix_revisions(revisions: List[Dict]) -> List[Dict]:
+    """Adapt revisions into a list of (current) storage-compatible dicts.
>>> from pprint import pprint
>>> date = {
@@ -150,7 +129,7 @@
... },
... 'offset': 0,
... }
- >>> pprint(fix_objects('revision', [{
+ >>> pprint(_fix_revisions([{
... 'author': {'email': '', 'fullname': b'', 'name': ''},
... 'committer': {'email': '', 'fullname': b'', 'name': ''},
... 'date': date,
@@ -165,7 +144,7 @@
Fix type of 'transplant_source' extra headers:
- >>> revs = fix_objects('revision', [{
+ >>> revs = _fix_revisions([{
... 'author': {'email': '', 'fullname': b'', 'name': ''},
... 'committer': {'email': '', 'fullname': b'', 'name': ''},
... 'date': date,
@@ -173,7 +152,7 @@
... 'metadata': {
... 'extra_headers': [
... ['time_offset_seconds', b'-3600'],
- ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e']
+ ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e'] # noqa
... ]}
... }])
>>> pprint(revs[0]['metadata']['extra_headers'])
@@ -185,7 +164,7 @@
>>> from copy import deepcopy
>>> invalid_date1 = deepcopy(date)
>>> invalid_date1['timestamp']['microseconds'] = 1000000000 # > 10^6
- >>> fix_objects('revision', [{
+ >>> _fix_revisions([{
... 'author': {'email': '', 'fullname': b'', 'name': b''},
... 'committer': {'email': '', 'fullname': b'', 'name': b''},
... 'date': invalid_date1,
@@ -195,7 +174,7 @@
>>> invalid_date2 = deepcopy(date)
>>> invalid_date2['timestamp']['seconds'] = 2**70 # > 2^63
- >>> fix_objects('revision', [{
+ >>> _fix_revisions([{
... 'author': {'email': '', 'fullname': b'', 'name': b''},
... 'committer': {'email': '', 'fullname': b'', 'name': b''},
... 'date': invalid_date2,
@@ -205,7 +184,7 @@
>>> invalid_date3 = deepcopy(date)
>>> invalid_date3['offset'] = 2**20 # > 2^15
- >>> fix_objects('revision', [{
+ >>> _fix_revisions([{
... 'author': {'email': '', 'fullname': b'', 'name': b''},
... 'committer': {'email': '', 'fullname': b'', 'name': b''},
... 'date': date,
@@ -213,28 +192,58 @@
... }])
[]
+ """
+ good_revisions: List = []
+ for rev in revisions:
+ rev = _fix_revision_pypi_empty_string(rev)
+ rev = _fix_revision_transplant_source(rev)
+ if not _check_revision_date(rev):
+ logging.warning('Excluding revision (invalid date): %r', rev)
+ continue
+ if rev not in good_revisions:
+ good_revisions.append(rev)
+ return good_revisions
+
+
+def _fix_origin_visits(visits: List[Dict]) -> List[Dict]:
+    """Adapt origin visits into a list of current storage-compatible dicts.
`visit['origin']` is a dict instead of a URL:
- >>> pprint(fix_objects('origin_visit', [{
+ >>> from pprint import pprint
+ >>> pprint(_fix_origin_visits([{
... 'origin': {'url': 'http://foo'},
... 'type': 'git',
... }]))
- [{'origin': 'http://foo', 'type': 'git'}]
+ [{'metadata': None, 'origin': 'http://foo', 'type': 'git'}]
`visit['type']` is missing, but `visit['origin']['type']` exists:
- >>> pprint(fix_objects('origin_visit', [
+ >>> pprint(_fix_origin_visits([
... {'origin': {'type': 'hg', 'url': 'http://foo'}
... }]))
- [{'origin': 'http://foo', 'type': 'hg'}]
- """ # noqa
+ [{'metadata': None, 'origin': 'http://foo', 'type': 'hg'}]
-    if object_type == 'revision':
-        objects = _fix_revisions(objects)
-    elif object_type == 'origin_visit':
-        objects = _fix_origin_visits(objects)
-    return objects
+    """
+    good_visits = []
+    for visit in visits:
+        visit = visit.copy()
+        if 'type' not in visit:
+            if isinstance(visit['origin'], dict) and 'type' in visit['origin']:
+                # Very old version of the schema: visits did not have a type,
+                # but their 'origin' field was a dict with a 'type' key.
+                visit['type'] = visit['origin']['type']
+            else:
+                # Very very old version of the schema: 'type' is missing,
+                # so there is nothing we can do to fix it.
+                raise ValueError('Got an origin_visit too old to be replayed.')
+        if isinstance(visit['origin'], dict):
+            # Old version of the schema: visit['origin'] was a dict.
+            visit['origin'] = visit['origin']['url']
+        if 'metadata' not in visit:
+            visit['metadata'] = None
+        good_visits.append(visit)
+    return good_visits
def collision_aware_content_add(
@@ -253,7 +262,7 @@
    colliding_content_hashes: Dict[str, List[Dict[str, bytes]]] = {}
    while True:
        try:
-            content_add_fn(c.to_dict() for c in contents)
+            content_add_fn(contents)
        except HashCollision as e:
            algo, hash_id, colliding_hashes = e.args
            hash_id = hash_to_hex(hash_id)
@@ -270,10 +279,13 @@
})
-def _insert_objects(object_type, objects, storage):
-    objects = fix_objects(object_type, objects)
+def _insert_objects(object_type: str, objects: List[Dict], storage) -> None:
+    """Insert objects of type object_type in the storage.
+
+    """
    if object_type == 'content':
-        contents, skipped_contents = [], []
+        contents: List[BaseContent] = []
+        skipped_contents: List[BaseContent] = []
        for content in objects:
            c = BaseContent.from_dict(content)
            if isinstance(c, SkippedContent):
@@ -285,19 +297,18 @@
            storage.skipped_content_add, skipped_contents)
        collision_aware_content_add(
            storage.content_add_metadata, contents)
-
-    elif object_type in ('directory', 'revision', 'release',
-                         'snapshot', 'origin'):
-        # TODO: split batches that are too large for the storage
-        # to handle?
-        method = getattr(storage, object_type + '_add')
-        method(objects)
+    elif object_type == 'revision':
+        storage.revision_add(
+            Revision.from_dict(r) for r in _fix_revisions(objects)
+        )
    elif object_type == 'origin_visit':
-        for visit in objects:
-            storage.origin_add_one({'url': visit['origin']})
-            if 'metadata' not in visit:
-                visit['metadata'] = None
-        storage.origin_visit_upsert(objects)
+        storage.origin_add(Origin(url=o['origin']) for o in objects)
+        # FIXME: Should be List[OriginVisit], working on fixing
+        # swh.storage.origin_visit_upsert (D2813)
+        storage.origin_visit_upsert(_fix_origin_visits(objects))
+    elif object_type in ('directory', 'release', 'snapshot', 'origin'):
+        method = getattr(storage, object_type + '_add')
+        method(object_converter_fn[object_type](o) for o in objects)
    else:
        logger.warning('Received a series of %s, this should not happen',
                       object_type)
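
For context on the hunks above: the new `object_converter_fn` map gives the replayer (and the tests below) a single lookup for turning journal dicts into swh.model objects before they reach the storage API. A minimal sketch of that dispatch, using a made-up origin payload rather than anything from the patch:

    # Sketch only: convert a batch of journal dicts via object_converter_fn.
    from swh.journal.replay import object_converter_fn

    batch = {'origin': [{'url': 'https://example.com/repo.git'}]}
    for object_type, dicts in batch.items():
        # Each result is a swh.model.model object (here, Origin instances),
        # ready for the matching storage *_add method.
        models = [object_converter_fn[object_type](d) for d in dicts]
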
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -51,7 +51,6 @@
storage_config = {
    'cls': 'pipeline',
    'steps': [
-        {'cls': 'validate'},
        {'cls': 'memory'},
    ]
}
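
The `{'cls': 'validate'}` step removed here (and in the other test configs below) was a storage proxy validating dicts on the way in; with the replayer now building model objects itself, that step is presumably redundant. A sketch of the resulting test pipeline, mirroring this hunk:

    # Sketch: the slimmed-down in-memory test storage pipeline.
    from swh.storage import get_storage

    storage = get_storage(**{
        'cls': 'pipeline',
        'steps': [
            {'cls': 'memory'},
        ],
    })
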
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2018-2019 The Software Heritage developers
+# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -8,14 +8,17 @@
from confluent_kafka import Consumer, KafkaException
from subprocess import Popen
-from typing import Tuple
+from typing import List, Tuple
from swh.storage import get_storage
-from swh.journal.writer.kafka import KafkaJournalWriter
+from swh.journal.replay import object_converter_fn
from swh.journal.serializers import (
kafka_to_key, kafka_to_value
)
+from swh.journal.writer.kafka import KafkaJournalWriter
+
+from swh.model.model import Content, Origin, BaseModel
from .conftest import OBJECT_TYPE_KEYS
@@ -116,7 +119,6 @@
storage_config = {
    'cls': 'pipeline',
    'steps': [
-        {'cls': 'validate'},
        {'cls': 'memory', 'journal_writer': writer_config},
    ]
}
@@ -129,15 +131,25 @@
        method = getattr(storage, object_type + '_add')
        if object_type in ('content', 'directory', 'revision', 'release',
                           'snapshot', 'origin'):
+            objects_: List[BaseModel]
            if object_type == 'content':
-                objects = [{**obj, 'data': b''} for obj in objects]
-                method(objects)
+                objects_ = [
+                    Content.from_dict({
+                        **obj, 'data': b''})
+                    for obj in objects
+                ]
+            else:
+                objects_ = [
+                    object_converter_fn[object_type](obj)
+                    for obj in objects
+                ]
+            method(objects_)
            expected_messages += len(objects)
        elif object_type in ('origin_visit',):
            for object_ in objects:
                object_ = object_.copy()
                origin_url = object_.pop('origin')
-                storage.origin_add_one({'url': origin_url})
+                storage.origin_add_one(Origin(url=origin_url))
                visit = method(origin=origin_url, date=object_.pop('date'),
                               type=object_.pop('type'))
                expected_messages += 1
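
The content branch above injects an empty `data` payload before building the model object. A self-contained sketch of such a storage-ready Content (computing the hashes with MultiHash is an illustration, not taken from the patch):

    # Sketch: build a Content model object with empty data, as the test does.
    from swh.model.hashutil import MultiHash
    from swh.model.model import Content

    data = b''
    content = Content.from_dict({
        **MultiHash.from_data(data).digest(),  # sha1, sha1_git, sha256, ...
        'data': data,
        'length': len(data),
        'status': 'visible',
    })
    # Fields are attributes (content.sha1, content.data), not dict keys.
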
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -30,7 +30,6 @@
storage_config = {
    'cls': 'pipeline',
    'steps': [
-        {'cls': 'validate'},
        {'cls': 'memory'},
    ]
}
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -11,10 +11,12 @@
from hypothesis.strategies import lists
from swh.model.hypothesis_strategies import object_dicts, present_contents
+from swh.model.model import Origin
from swh.storage import get_storage, HashCollision
-from swh.journal.replay import process_replay_objects
-from swh.journal.replay import process_replay_objects_content
+from swh.journal.replay import (
+    process_replay_objects, process_replay_objects_content, object_converter_fn
+)
from .utils import MockedJournalClient, MockedKafkaWriter
@@ -22,7 +24,6 @@
storage_config = {
    'cls': 'pipeline',
    'steps': [
-        {'cls': 'validate'},
        {'cls': 'memory', 'journal_writer': {'cls': 'memory'}},
    ]
}
@@ -64,17 +65,18 @@
return_value=MockedKafkaWriter(queue)):
        storage1 = get_storage(**storage_config)
+    # Write objects to storage1
    for (obj_type, obj) in objects:
        obj = obj.copy()
        if obj_type == 'origin_visit':
-            storage1.origin_add_one({'url': obj['origin']})
+            storage1.origin_add_one(Origin(url=obj['origin']))
            storage1.origin_visit_upsert([obj])
        else:
            if obj_type == 'content' and obj.get('status') == 'absent':
                obj_type = 'skipped_content'
            method = getattr(storage1, obj_type + '_add')
            try:
-                method([obj])
+                method([object_converter_fn[obj_type](obj)])
            except HashCollision:
                pass
@@ -128,7 +130,7 @@
    contents = []
    for obj in objects:
-        obj = obj.to_dict()
        storage1.content_add([obj])
        contents.append(obj)
@@ -152,7 +154,7 @@
    # only content with status visible will be copied in storage2
    expected_objstorage_state = {
-        c['sha1']: c['data'] for c in contents if c['status'] == 'visible'
+        c.sha1: c.data for c in contents if c.status == 'visible'
    }
    assert expected_objstorage_state == objstorage2.state
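
Taken together, the patch moves dict-to-model conversion into the replayer itself. An end-to-end sketch of the entry point these tests exercise, assuming an in-memory backend that accepts model objects and an illustrative origin dict:

    # Sketch: replay one batch of journal objects into an in-memory storage.
    from swh.journal.replay import process_replay_objects
    from swh.storage import get_storage

    storage = get_storage(**{'cls': 'pipeline', 'steps': [{'cls': 'memory'}]})
    process_replay_objects(
        {'origin': [{'url': 'https://example.com/repo.git'}]},
        storage=storage,
    )
    # The dict is converted via object_converter_fn['origin'] and stored.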
