F7124224: D3010.id10679.diff
Attached To: D3010: Copy the graph replayer component from swh-journal
Size: 27 KB
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -14,6 +14,9 @@
[mypy-cassandra.*]
ignore_missing_imports = True
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
# only shipped indirectly via hypothesis
[mypy-django.*]
ignore_missing_imports = True
@@ -27,6 +30,9 @@
[mypy-pytest.*]
ignore_missing_imports = True
+[mypy-pytest_kafka.*]
+ignore_missing_imports = True
+
[mypy-systemd.daemon.*]
ignore_missing_imports = True
diff --git a/swh/storage/cli.py b/swh/storage/cli.py
--- a/swh/storage/cli.py
+++ b/swh/storage/cli.py
@@ -3,13 +3,21 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import functools
import logging
import click
from swh.core.cli import CONTEXT_SETTINGS
+from swh.journal.cli import get_journal_client
+from swh.storage import get_storage
from swh.storage.api.server import load_and_check_config, app
+try:
+ from systemd.daemon import notify
+except ImportError:
+ notify = None
+
@click.group(name="storage", context_settings=CONTEXT_SETTINGS)
@click.pass_context
@@ -103,6 +111,47 @@
ctx.exit(0)
+@storage.command()
+@click.option(
+ "--stop-after-objects",
+ "-n",
+ default=None,
+ type=int,
+ help="Stop after processing this many objects. Default is to " "run forever.",
+)
+@click.pass_context
+def replay(ctx, stop_after_objects):
+ """Fill a Storage by reading a Journal.
+
+ There can be several 'replayers' filling a Storage as long as they use
+ the same `group-id`.
+ """
+ from swh.storage.replay import process_replay_objects
+
+ conf = ctx.obj["config"]
+ try:
+ storage = get_storage(**conf.pop("storage"))
+ except KeyError:
+ ctx.fail("You must have a storage configured in your config file.")
+
+ client = get_journal_client(ctx, stop_after_objects=stop_after_objects)
+ worker_fn = functools.partial(process_replay_objects, storage=storage)
+
+ if notify:
+ notify("READY=1")
+
+ try:
+ client.process(worker_fn)
+ except KeyboardInterrupt:
+ ctx.exit(0)
+ else:
+ print("Done.")
+ finally:
+ if notify:
+ notify("STOPPING=1")
+ client.close()
+
+
def main():
logging.basicConfig()
return serve(auto_envvar_prefix="SWH_STORAGE")
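
The new `replay` subcommand wires three things together: a storage instance built from the mandatory "storage" section of the CLI configuration, a journal client obtained through `get_journal_client`, and `process_replay_objects` as the worker function. The sketch below shows that wiring outside of click; it assumes an in-memory storage, and the "journal_client" section name and its fields are illustrative assumptions only, since the exact configuration consumed by `get_journal_client` is not part of this diff.

import functools

from swh.storage import get_storage
from swh.storage.replay import process_replay_objects

# Hypothetical configuration: the "storage" key is required by the command
# above; the journal client section (its name and fields) is an assumption.
config = {
    "storage": {"cls": "memory"},
    "journal_client": {
        "brokers": ["kafka1.example.org:9092"],
        "group_id": "swh-storage-replayer",
        "prefix": "swh.journal.objects",
    },
}

storage = get_storage(**config["storage"])
worker_fn = functools.partial(process_replay_objects, storage=storage)
# A journal client built from config["journal_client"] would then run
# client.process(worker_fn), which is what `swh storage replay` does.
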
diff --git a/swh/storage/fixer.py b/swh/storage/fixer.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/fixer.py
@@ -0,0 +1,293 @@
+import copy
+import logging
+from typing import Any, Dict, List, Optional
+from swh.model.identifiers import normalize_timestamp
+
+logger = logging.getLogger(__name__)
+
+
+def _fix_content(content: Dict[str, Any]) -> Dict[str, Any]:
+ """Filters-out invalid 'perms' key that leaked from swh.model.from_disk
+ to the journal.
+
+ >>> _fix_content({'perms': 0o100644, 'sha1_git': b'foo'})
+ {'sha1_git': b'foo'}
+
+ >>> _fix_content({'sha1_git': b'bar'})
+ {'sha1_git': b'bar'}
+
+ """
+ content = content.copy()
+ content.pop("perms", None)
+ return content
+
+
+def _fix_revision_pypi_empty_string(rev):
+ """PyPI loader failed to encode empty strings as bytes, see:
+ swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9
+ or https://forge.softwareheritage.org/D1772
+ """
+ rev = {
+ **rev,
+ "author": rev["author"].copy(),
+ "committer": rev["committer"].copy(),
+ }
+ if rev["author"].get("email") == "":
+ rev["author"]["email"] = b""
+ if rev["author"].get("name") == "":
+ rev["author"]["name"] = b""
+ if rev["committer"].get("email") == "":
+ rev["committer"]["email"] = b""
+ if rev["committer"].get("name") == "":
+ rev["committer"]["name"] = b""
+ return rev
+
+
+def _fix_revision_transplant_source(rev):
+ if rev.get("metadata") and rev["metadata"].get("extra_headers"):
+ rev = copy.deepcopy(rev)
+ rev["metadata"]["extra_headers"] = [
+ [key, value.encode("ascii")]
+ if key == "transplant_source" and isinstance(value, str)
+ else [key, value]
+ for (key, value) in rev["metadata"]["extra_headers"]
+ ]
+ return rev
+
+
+def _check_date(date):
+ """Returns whether the date can be represented in backends with sane
+ limits on timestamps and timezones (resp. signed 64-bits and
+ signed 16 bits), and that microseconds is valid (ie. between 0 and 10^6).
+ """
+ if date is None:
+ return True
+ date = normalize_timestamp(date)
+ return (
+ (-(2 ** 63) <= date["timestamp"]["seconds"] < 2 ** 63)
+ and (0 <= date["timestamp"]["microseconds"] < 10 ** 6)
+ and (-(2 ** 15) <= date["offset"] < 2 ** 15)
+ )
+
+
+def _check_revision_date(rev):
+ """Exclude revisions with invalid dates.
+ See https://forge.softwareheritage.org/T1339"""
+ return _check_date(rev["date"]) and _check_date(rev["committer_date"])
+
+
+def _fix_revision(revision: Dict[str, Any]) -> Optional[Dict]:
+ """Fix various legacy revision issues.
+
+ Fix author/committer person:
+
+ >>> from pprint import pprint
+ >>> date = {
+ ... 'timestamp': {
+ ... 'seconds': 1565096932,
+ ... 'microseconds': 0,
+ ... },
+ ... 'offset': 0,
+ ... }
+ >>> rev0 = _fix_revision({
+ ... 'id': b'rev-id',
+ ... 'author': {'fullname': b'', 'name': '', 'email': ''},
+ ... 'committer': {'fullname': b'', 'name': '', 'email': ''},
+ ... 'date': date,
+ ... 'committer_date': date,
+ ... 'type': 'git',
+ ... 'message': '',
+ ... 'directory': b'dir-id',
+ ... 'synthetic': False,
+ ... })
+ >>> rev0['author']
+ {'fullname': b'', 'name': b'', 'email': b''}
+ >>> rev0['committer']
+ {'fullname': b'', 'name': b'', 'email': b''}
+
+ Fix type of 'transplant_source' extra headers:
+
+ >>> rev1 = _fix_revision({
+ ... 'id': b'rev-id',
+ ... 'author': {'fullname': b'', 'name': '', 'email': ''},
+ ... 'committer': {'fullname': b'', 'name': '', 'email': ''},
+ ... 'date': date,
+ ... 'committer_date': date,
+ ... 'metadata': {
+ ... 'extra_headers': [
+ ... ['time_offset_seconds', b'-3600'],
+ ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e']
+ ... ]},
+ ... 'type': 'git',
+ ... 'message': '',
+ ... 'directory': b'dir-id',
+ ... 'synthetic': False,
+ ... })
+ >>> pprint(rev1['metadata']['extra_headers'])
+ [['time_offset_seconds', b'-3600'],
+ ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']]
+
+ Revisions with invalid dates are filtered out:
+
+ >>> from copy import deepcopy
+ >>> invalid_date1 = deepcopy(date)
+ >>> invalid_date1['timestamp']['microseconds'] = 1000000000 # > 10^6
+ >>> rev = _fix_revision({
+ ... 'author': {'fullname': b'', 'name': '', 'email': ''},
+ ... 'committer': {'fullname': b'', 'name': '', 'email': ''},
+ ... 'date': invalid_date1,
+ ... 'committer_date': date,
+ ... })
+ >>> rev is None
+ True
+
+ >>> invalid_date2 = deepcopy(date)
+ >>> invalid_date2['timestamp']['seconds'] = 2**70 # > 2^63
+ >>> rev = _fix_revision({
+ ... 'author': {'fullname': b'', 'name': '', 'email': ''},
+ ... 'committer': {'fullname': b'', 'name': '', 'email': ''},
+ ... 'date': invalid_date2,
+ ... 'committer_date': date,
+ ... })
+ >>> rev is None
+ True
+
+ >>> invalid_date3 = deepcopy(date)
+ >>> invalid_date3['offset'] = 2**20 # > 2^15
+ >>> rev = _fix_revision({
+ ... 'author': {'fullname': b'', 'name': '', 'email': ''},
+ ... 'committer': {'fullname': b'', 'name': '', 'email': ''},
+ ... 'date': date,
+ ... 'committer_date': invalid_date3,
+ ... })
+ >>> rev is None
+ True
+
+ """ # noqa
+ rev = _fix_revision_pypi_empty_string(revision)
+ rev = _fix_revision_transplant_source(rev)
+ if not _check_revision_date(rev):
+ logger.warning(
+ "Invalid revision date detected: %(revision)s", {"revision": rev}
+ )
+ return None
+ return rev
+
+
+def _fix_origin(origin: Dict) -> Dict:
+ """Fix legacy origin with type which is no longer part of the model.
+
+ >>> from pprint import pprint
+ >>> pprint(_fix_origin({
+ ... 'url': 'http://foo',
+ ... }))
+ {'url': 'http://foo'}
+ >>> pprint(_fix_origin({
+ ... 'url': 'http://bar',
+ ... 'type': 'foo',
+ ... }))
+ {'url': 'http://bar'}
+
+ """
+ o = origin.copy()
+ o.pop("type", None)
+ return o
+
+
+def _fix_origin_visit(visit: Dict) -> Dict:
+ """Fix various legacy origin visit issues.
+
+ `visit['origin']` is a dict instead of a URL:
+
+ >>> from datetime import datetime, timezone
+ >>> from pprint import pprint
+ >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc)
+ >>> pprint(_fix_origin_visit({
+ ... 'origin': {'url': 'http://foo'},
+ ... 'date': date,
+ ... 'type': 'git',
+ ... 'status': 'ongoing',
+ ... 'snapshot': None,
+ ... }))
+ {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc),
+ 'metadata': None,
+ 'origin': 'http://foo',
+ 'snapshot': None,
+ 'status': 'ongoing',
+ 'type': 'git'}
+
+ `visit['type']` is missing, but `visit['origin']['type']` exists:
+
+ >>> pprint(_fix_origin_visit(
+ ... {'origin': {'type': 'hg', 'url': 'http://foo'},
+ ... 'date': date,
+ ... 'status': 'ongoing',
+ ... 'snapshot': None,
+ ... }))
+ {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc),
+ 'metadata': None,
+ 'origin': 'http://foo',
+ 'snapshot': None,
+ 'status': 'ongoing',
+ 'type': 'hg'}
+
+ Old visit format (origin_visit with no type) raises:
+
+ >>> _fix_origin_visit({
+ ... 'origin': {'url': 'http://foo'},
+ ... 'date': date,
+ ... 'status': 'ongoing',
+ ... 'snapshot': None
+ ... })
+ Traceback (most recent call last):
+ ...
+ ValueError: Old origin visit format detected...
+
+ >>> _fix_origin_visit({
+ ... 'origin': 'http://foo',
+ ... 'date': date,
+ ... 'status': 'ongoing',
+ ... 'snapshot': None
+ ... })
+ Traceback (most recent call last):
+ ...
+ ValueError: Old origin visit format detected...
+
+ """ # noqa
+ visit = visit.copy()
+ if "type" not in visit:
+ if isinstance(visit["origin"], dict) and "type" in visit["origin"]:
+ # Very old version of the schema: visits did not have a type,
+ # but their 'origin' field was a dict with a 'type' key.
+ visit["type"] = visit["origin"]["type"]
+ else:
+ # Very old schema version: 'type' is missing, stop early
+
+ # We expect the journal's origin_visit topic to no longer reference
+ # such visits. If it does, the replayer must crash so we can fix
+ # the journal's topic.
+ raise ValueError(f"Old origin visit format detected: {visit}")
+ if isinstance(visit["origin"], dict):
+ # Old version of the schema: visit['origin'] was a dict.
+ visit["origin"] = visit["origin"]["url"]
+ if "metadata" not in visit:
+ visit["metadata"] = None
+ return visit
+
+
+def fix_objects(object_type: str, objects: List[Dict]) -> List[Dict]:
+ """
+ Fix legacy objects from the journal to bring them up to date with the
+ latest storage schema.
+ """
+ if object_type == "content":
+ return [_fix_content(v) for v in objects]
+ elif object_type == "revision":
+ revisions = [_fix_revision(v) for v in objects]
+ return [rev for rev in revisions if rev is not None]
+ elif object_type == "origin":
+ return [_fix_origin(v) for v in objects]
+ elif object_type == "origin_visit":
+ return [_fix_origin_visit(v) for v in objects]
+ else:
+ return objects
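
`fix_objects` is the single entry point used by the replayer: it dispatches on the object type, applies the per-type fixers above, and returns unknown types unchanged. A small usage sketch, derived from the doctests above (the example values are illustrative):

from datetime import datetime, timezone

from swh.storage.fixer import fix_objects

# A legacy origin_visit whose 'origin' is still a dict carrying the obsolete
# 'type' field; fix_objects flattens the origin to its URL, recovers the
# visit type and fills in the missing 'metadata' key.
legacy_visit = {
    "origin": {"url": "http://example.com/", "type": "hg"},
    "date": datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc),
    "status": "ongoing",
    "snapshot": None,
}
(fixed,) = fix_objects("origin_visit", [legacy_visit])
assert fixed["origin"] == "http://example.com/"
assert fixed["type"] == "hg"
assert fixed["metadata"] is None

# Legacy origins simply lose their obsolete 'type' field:
assert fix_objects("origin", [{"url": "http://foo", "type": "git"}]) == [
    {"url": "http://foo"}
]
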
diff --git a/swh/storage/replay.py b/swh/storage/replay.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/replay.py
@@ -0,0 +1,128 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+from typing import Any, Callable, Dict, Iterable, List
+
+try:
+ from systemd.daemon import notify
+except ImportError:
+ notify = None
+
+from swh.core.statsd import statsd
+from swh.storage.fixer import fix_objects
+
+from swh.model.model import (
+ BaseContent,
+ BaseModel,
+ Content,
+ Directory,
+ Origin,
+ OriginVisit,
+ Revision,
+ SkippedContent,
+ Snapshot,
+ Release,
+)
+from swh.storage.exc import HashCollision
+
+logger = logging.getLogger(__name__)
+
+GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total"
+GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds"
+
+
+object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = {
+ "origin": Origin.from_dict,
+ "origin_visit": OriginVisit.from_dict,
+ "snapshot": Snapshot.from_dict,
+ "revision": Revision.from_dict,
+ "release": Release.from_dict,
+ "directory": Directory.from_dict,
+ "content": Content.from_dict,
+ "skipped_content": SkippedContent.from_dict,
+}
+
+
+def process_replay_objects(all_objects, *, storage):
+ for (object_type, objects) in all_objects.items():
+ logger.debug("Inserting %s %s objects", len(objects), object_type)
+ with statsd.timed(GRAPH_DURATION_METRIC, tags={"object_type": object_type}):
+ _insert_objects(object_type, objects, storage)
+ statsd.increment(
+ GRAPH_OPERATIONS_METRIC, len(objects), tags={"object_type": object_type}
+ )
+ if notify:
+ notify("WATCHDOG=1")
+
+
+def collision_aware_content_add(
+ content_add_fn: Callable[[Iterable[Any]], None], contents: List[BaseContent]
+) -> None:
+ """Add contents to storage. If a hash collision is detected, an error is
+ logged. Then this adds the other non colliding contents to the storage.
+
+ Args:
+ content_add_fn: Storage content callable
+ contents: List of contents or skipped contents to add to storage
+
+ """
+ if not contents:
+ return
+ colliding_content_hashes: List[Dict[str, Any]] = []
+ while True:
+ try:
+ content_add_fn(contents)
+ except HashCollision as e:
+ colliding_content_hashes.append(
+ {
+ "algo": e.algo,
+ "hash": e.hash_id, # hex hash id
+ "objects": e.colliding_contents, # hex hashes
+ }
+ )
+ colliding_hashes = e.colliding_content_hashes()
+ # Drop the colliding contents from the transaction
+ contents = [c for c in contents if c.hashes() not in colliding_hashes]
+ else:
+ # Successfully added contents, we are done
+ break
+ if colliding_content_hashes:
+ for collision in colliding_content_hashes:
+ logger.error("Collision detected: %(collision)s", {"collision": collision})
+
+
+def _insert_objects(object_type: str, objects: List[Dict], storage) -> None:
+ """Insert objects of type object_type in the storage.
+
+ """
+ objects = fix_objects(object_type, objects)
+
+ if object_type == "content":
+ contents: List[BaseContent] = []
+ skipped_contents: List[BaseContent] = []
+ for content in objects:
+ c = BaseContent.from_dict(content)
+ if isinstance(c, SkippedContent):
+ skipped_contents.append(c)
+ else:
+ contents.append(c)
+
+ collision_aware_content_add(storage.skipped_content_add, skipped_contents)
+ collision_aware_content_add(storage.content_add_metadata, contents)
+ elif object_type == "origin_visit":
+ visits: List[OriginVisit] = []
+ origins: List[Origin] = []
+ for obj in objects:
+ visit = OriginVisit.from_dict(obj)
+ visits.append(visit)
+ origins.append(Origin(url=visit.origin))
+ storage.origin_add(origins)
+ storage.origin_visit_upsert(visits)
+ elif object_type in ("directory", "revision", "release", "snapshot", "origin"):
+ method = getattr(storage, object_type + "_add")
+ method(object_converter_fn[object_type](o) for o in objects)
+ else:
+ logger.warning("Received a series of %s, this should not happen", object_type)
diff --git a/swh/storage/tests/conftest.py b/swh/storage/tests/conftest.py
--- a/swh/storage/tests/conftest.py
+++ b/swh/storage/tests/conftest.py
@@ -19,7 +19,6 @@
import swh.storage
from swh.core.utils import numfile_sortkey as sortkey
-
from swh.model.tests.generate_testdata import gen_contents, gen_origins
from swh.model.model import (
Content,
@@ -45,7 +44,6 @@
"snapshot": Snapshot.from_dict,
}
-
SQL_DIR = path.join(path.dirname(swh.storage.__file__), "sql")
environ["LC_ALL"] = "C.UTF-8"
diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_replay.py
@@ -0,0 +1,155 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+import functools
+
+from typing import Dict, List
+
+import pytest
+
+from swh.storage import get_storage
+
+from swh.storage.replay import process_replay_objects
+
+from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter
+
+
+storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]}
+
+
+def make_topic(kafka_prefix: str, object_type: str) -> str:
+ return kafka_prefix + "." + object_type
+
+
+def _test_write_replay_origin_visit(visits: List[Dict]):
+ """Helper function to write tests for origin_visit.
+
+ Each visit (a dict) given in the 'visits' argument will be sent to
+ a (mocked) Kafka queue, which an in-memory-storage-backed replayer is
+ listening to.
+
+ Check that the corresponding origin visit entities are present in the
+ storage and have correct values if they are not skipped.
+
+ """
+ queue: List = []
+ replayer = MockedJournalClient(queue)
+ writer = MockedKafkaWriter(queue)
+
+ # Note that flipping the order of these two insertions will crash
+ # the test, because the legacy origin format does not allow creating
+ # the origin when needed (its type is missing)
+ writer.send(
+ "origin",
+ "foo",
+ {
+ "url": "http://example.com/",
+ "type": "git", # test the legacy origin format is accepted
+ },
+ )
+ for visit in visits:
+ writer.send("origin_visit", "foo", visit)
+
+ queue_size = len(queue)
+ assert replayer.stop_after_objects is None
+ replayer.stop_after_objects = queue_size
+
+ storage = get_storage(**storage_config)
+ worker_fn = functools.partial(process_replay_objects, storage=storage)
+
+ replayer.process(worker_fn)
+
+ actual_visits = list(storage.origin_visit_get("http://example.com/"))
+
+ assert len(actual_visits) == len(visits), actual_visits
+
+ for vin, vout in zip(visits, actual_visits):
+ vin = vin.copy()
+ vout = vout.copy()
+ assert vout.pop("origin") == "http://example.com/"
+ vin.pop("origin")
+ vin.setdefault("type", "git")
+ vin.setdefault("metadata", None)
+ assert vin == vout
+
+
+def test_write_replay_origin_visit():
+ """Test origin_visit when the 'origin' is just a string."""
+ now = datetime.datetime.now()
+ visits = [
+ {
+ "visit": 1,
+ "origin": "http://example.com/",
+ "date": now,
+ "type": "git",
+ "status": "partial",
+ "snapshot": None,
+ }
+ ]
+ _test_write_replay_origin_visit(visits)
+
+
+def test_write_replay_legacy_origin_visit1():
+ """Origin_visit with no types should make the replayer crash
+
+ We expect the journal's origin_visit topic to no longer reference such
+ visits. If it does, the replayer must crash so we can fix the journal's
+ topic.
+
+ """
+ now = datetime.datetime.now()
+ visit = {
+ "visit": 1,
+ "origin": "http://example.com/",
+ "date": now,
+ "status": "partial",
+ "snapshot": None,
+ }
+ now2 = datetime.datetime.now()
+ visit2 = {
+ "visit": 2,
+ "origin": {"url": "http://example.com/"},
+ "date": now2,
+ "status": "partial",
+ "snapshot": None,
+ }
+
+ for origin_visit in [visit, visit2]:
+ with pytest.raises(ValueError, match="Old origin visit format"):
+ _test_write_replay_origin_visit([origin_visit])
+
+
+def test_write_replay_legacy_origin_visit2():
+ """Test origin_visit when 'type' is missing from the visit, but not
+ from the origin."""
+ now = datetime.datetime.now()
+ visits = [
+ {
+ "visit": 1,
+ "origin": {"url": "http://example.com/", "type": "git",},
+ "date": now,
+ "type": "git",
+ "status": "partial",
+ "snapshot": None,
+ }
+ ]
+ _test_write_replay_origin_visit(visits)
+
+
+def test_write_replay_legacy_origin_visit3():
+ """Test origin_visit when the origin is a dict"""
+ now = datetime.datetime.now()
+ visits = [
+ {
+ "visit": 1,
+ "origin": {"url": "http://example.com/",},
+ "date": now,
+ "type": "git",
+ "status": "partial",
+ "snapshot": None,
+ }
+ ]
+ _test_write_replay_origin_visit(visits)
diff --git a/swh/storage/tests/test_write_replay.py b/swh/storage/tests/test_write_replay.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_write_replay.py
@@ -0,0 +1,166 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import functools
+from unittest.mock import patch
+
+import attr
+from hypothesis import given, settings, HealthCheck
+from hypothesis.strategies import lists
+
+from swh.model.hypothesis_strategies import present_contents
+from swh.model.hypothesis_strategies import object_dicts
+from swh.model.model import Origin
+from swh.storage import get_storage
+from swh.storage.exc import HashCollision
+
+from swh.journal.replay import (
+ process_replay_objects,
+ process_replay_objects_content,
+ object_converter_fn,
+)
+
+from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter
+
+
+storage_config = {
+ "cls": "memory",
+ "journal_writer": {"cls": "memory"},
+}
+
+
+def empty_person_name_email(rev_or_rel):
+ """Empties the 'name' and 'email' fields of the author/committer fields
+ of a revision or release; leaving only the fullname."""
+ if getattr(rev_or_rel, "author", None):
+ rev_or_rel = attr.evolve(
+ rev_or_rel, author=attr.evolve(rev_or_rel.author, name=b"", email=b"",)
+ )
+
+ if getattr(rev_or_rel, "committer", None):
+ rev_or_rel = attr.evolve(
+ rev_or_rel,
+ committer=attr.evolve(rev_or_rel.committer, name=b"", email=b"",),
+ )
+
+ return rev_or_rel
+
+
+@given(lists(object_dicts(), min_size=1))
+@settings(suppress_health_check=[HealthCheck.too_slow])
+def test_write_replay_same_order_batches(objects):
+ queue = []
+ replayer = MockedJournalClient(queue)
+
+ with patch(
+ "swh.journal.writer.inmemory.InMemoryJournalWriter",
+ return_value=MockedKafkaWriter(queue),
+ ):
+ storage1 = get_storage(**storage_config)
+
+ # Write objects to storage1
+ for (obj_type, obj) in objects:
+ if obj_type == "content" and obj.get("status") == "absent":
+ obj_type = "skipped_content"
+
+ obj = object_converter_fn[obj_type](obj)
+
+ if obj_type == "origin_visit":
+ storage1.origin_add_one(Origin(url=obj.origin))
+ storage1.origin_visit_upsert([obj])
+ else:
+ method = getattr(storage1, obj_type + "_add")
+ try:
+ method([obj])
+ except HashCollision:
+ pass
+
+ # Bail out early if we didn't insert any relevant objects...
+ queue_size = len(queue)
+ assert queue_size != 0, "No test objects found; hypothesis strategy bug?"
+
+ assert replayer.stop_after_objects is None
+ replayer.stop_after_objects = queue_size
+
+ storage2 = get_storage(**storage_config)
+ worker_fn = functools.partial(process_replay_objects, storage=storage2)
+
+ replayer.process(worker_fn)
+
+ assert replayer.consumer.committed
+
+ for attr_name in (
+ "_contents",
+ "_directories",
+ "_snapshots",
+ "_origin_visits",
+ "_origins",
+ ):
+ assert getattr(storage1, attr_name) == getattr(storage2, attr_name), attr_name
+
+ # When hypothesis generates a revision and a release with the same
+ # author (or committer) fullname but different name or email, the
+ # storage keeps the first name/email it sees.
+ # That first one comes from either the revision or the release, and
+ # since there are no ordering guarantees, storage2 has a 1/2 chance of
+ # seeing a different order than storage1; therefore we strip the
+ # name/email out before comparing.
+ for attr_name in ("_revisions", "_releases"):
+ items1 = {
+ k: empty_person_name_email(v)
+ for (k, v) in getattr(storage1, attr_name).items()
+ }
+ items2 = {
+ k: empty_person_name_email(v)
+ for (k, v) in getattr(storage2, attr_name).items()
+ }
+ assert items1 == items2, attr_name
+
+
+# TODO: add test for hash collision
+
+
+@given(lists(present_contents(), min_size=1))
+@settings(suppress_health_check=[HealthCheck.too_slow])
+def test_write_replay_content(objects):
+
+ queue = []
+ replayer = MockedJournalClient(queue)
+
+ with patch(
+ "swh.journal.writer.inmemory.InMemoryJournalWriter",
+ return_value=MockedKafkaWriter(queue),
+ ):
+ storage1 = get_storage(**storage_config)
+
+ contents = []
+ for obj in objects:
+ storage1.content_add([obj])
+ contents.append(obj)
+
+ # Bail out early if we didn't insert any relevant objects...
+ queue_size = len(queue)
+ assert queue_size != 0, "No test objects found; hypothesis strategy bug?"
+
+ assert replayer.stop_after_objects is None
+ replayer.stop_after_objects = queue_size
+
+ storage2 = get_storage(**storage_config)
+
+ objstorage1 = storage1.objstorage.objstorage
+ objstorage2 = storage2.objstorage.objstorage
+
+ worker_fn = functools.partial(
+ process_replay_objects_content, src=objstorage1, dst=objstorage2
+ )
+
+ replayer.process(worker_fn)
+
+ # only contents with status "visible" will be copied to storage2
+ expected_objstorage_state = {
+ c.sha1: c.data for c in contents if c.status == "visible"
+ }
+
+ assert expected_objstorage_state == objstorage2.state