diff --git a/swh/journal/backfill.py b/swh/journal/backfill.py deleted file mode 100644 --- a/swh/journal/backfill.py +++ /dev/null @@ -1,489 +0,0 @@ -# Copyright (C) 2017-2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -"""Module defining journal backfiller classes. - -Those backfiller goal is to produce back part or all of the objects -from the storage to the journal topics - -At the moment, a first naive implementation is the -JournalBackfiller. It simply reads the objects from the -storage and sends every object identifier back to the journal. - -""" - -import logging - -from .writer.kafka import KafkaJournalWriter - -from swh.core.db import BaseDb -from swh.storage.converters import db_to_release, db_to_revision - - -logger = logging.getLogger(__name__) - -PARTITION_KEY = { - "content": "sha1", - "skipped_content": None, # unused - "directory": "id", - "revision": "revision.id", - "release": "release.id", - "snapshot": "id", - "origin": "id", - "origin_visit": "origin_visit.origin", -} - -COLUMNS = { - "content": [ - "sha1", - "sha1_git", - "sha256", - "blake2s256", - "length", - "status", - "ctime", - ], - "skipped_content": [ - "sha1", - "sha1_git", - "sha256", - "blake2s256", - "length", - "ctime", - "status", - "reason", - ], - "directory": ["id", "dir_entries", "file_entries", "rev_entries"], - "revision": [ - ("revision.id", "id"), - "date", - "date_offset", - "committer_date", - "committer_date_offset", - "type", - "directory", - "message", - "synthetic", - "metadata", - "date_neg_utc_offset", - "committer_date_neg_utc_offset", - ( - "array(select parent_id::bytea from revision_history rh " - "where rh.id = revision.id order by rh.parent_rank asc)", - "parents", - ), - ("a.id", "author_id"), - ("a.name", "author_name"), - ("a.email", "author_email"), - ("a.fullname", "author_fullname"), - ("c.id", "committer_id"), - ("c.name", "committer_name"), - ("c.email", "committer_email"), - ("c.fullname", "committer_fullname"), - ], - "release": [ - ("release.id", "id"), - "date", - "date_offset", - "comment", - ("release.name", "name"), - "synthetic", - "date_neg_utc_offset", - "target", - "target_type", - ("a.id", "author_id"), - ("a.name", "author_name"), - ("a.email", "author_email"), - ("a.fullname", "author_fullname"), - ], - "snapshot": ["id", "object_id"], - "origin": ["type", "url"], - "origin_visit": [ - "visit", - "origin.type", - "origin_visit.type", - "url", - "date", - "snapshot", - "status", - "metadata", - ], -} - - -JOINS = { - "release": ["person a on release.author=a.id"], - "revision": [ - "person a on revision.author=a.id", - "person c on revision.committer=c.id", - ], - "origin_visit": ["origin on origin_visit.origin=origin.id"], -} - - -def directory_converter(db, directory): - """Convert directory from the flat representation to swh model - compatible objects. 
- - """ - columns = ["target", "name", "perms"] - query_template = """ - select %(columns)s - from directory_entry_%(type)s - where id in %%s - """ - - types = ["file", "dir", "rev"] - - entries = [] - with db.cursor() as cur: - for type in types: - ids = directory.pop("%s_entries" % type) - if not ids: - continue - query = query_template % { - "columns": ",".join(columns), - "type": type, - } - cur.execute(query, (tuple(ids),)) - for row in cur: - entry = dict(zip(columns, row)) - entry["type"] = type - entries.append(entry) - - directory["entries"] = entries - return directory - - -def revision_converter(db, revision): - """Convert revision from the flat representation to swh model - compatible objects. - - """ - return db_to_revision(revision) - - -def release_converter(db, release): - """Convert release from the flat representation to swh model - compatible objects. - - """ - release = db_to_release(release) - if "author" in release and release["author"]: - del release["author"]["id"] - return release - - -def snapshot_converter(db, snapshot): - """Convert snapshot from the flat representation to swh model - compatible objects. - - """ - columns = ["name", "target", "target_type"] - query = """ - select %s - from snapshot_branches sbs - inner join snapshot_branch sb on sb.object_id=sbs.branch_id - where sbs.snapshot_id=%%s - """ % ", ".join( - columns - ) - with db.cursor() as cur: - cur.execute(query, (snapshot.pop("object_id"),)) - branches = {} - for name, *row in cur: - branch = dict(zip(columns[1:], row)) - if not branch["target"] and not branch["target_type"]: - branch = None - branches[name] = branch - - snapshot["branches"] = branches - return snapshot - - -def origin_visit_converter(db, origin_visit): - origin = { - "type": origin_visit.pop("origin.type"), - "url": origin_visit.pop("url"), - } - origin_visit["origin"] = origin - origin_visit["type"] = origin_visit.pop("origin_visit.type") - return origin_visit - - -CONVERTERS = { - "directory": directory_converter, - "revision": revision_converter, - "release": release_converter, - "snapshot": snapshot_converter, - "origin_visit": origin_visit_converter, -} - - -def object_to_offset(object_id, numbits): - """Compute the index of the range containing object id, when dividing - space into 2^numbits. - - Args: - object_id (str): The hex representation of object_id - numbits (int): Number of bits in which we divide input space - - Returns: - The index of the range containing object id - - """ - q, r = divmod(numbits, 8) - length = q + (r != 0) - shift_bits = 8 - r if r else 0 - - truncated_id = object_id[: length * 2] - if len(truncated_id) < length * 2: - truncated_id += "0" * (length * 2 - len(truncated_id)) - - truncated_id_bytes = bytes.fromhex(truncated_id) - return int.from_bytes(truncated_id_bytes, byteorder="big") >> shift_bits - - -def byte_ranges(numbits, start_object=None, end_object=None): - """Generate start/end pairs of bytes spanning numbits bits and - constrained by optional start_object and end_object. 
- - Args: - numbits (int): Number of bits in which we divide input space - start_object (str): Hex object id contained in the first range - returned - end_object (str): Hex object id contained in the last range - returned - - Yields: - 2^numbits pairs of bytes - - """ - q, r = divmod(numbits, 8) - length = q + (r != 0) - shift_bits = 8 - r if r else 0 - - def to_bytes(i): - return int.to_bytes(i << shift_bits, length=length, byteorder="big") - - start_offset = 0 - end_offset = 1 << numbits - - if start_object is not None: - start_offset = object_to_offset(start_object, numbits) - if end_object is not None: - end_offset = object_to_offset(end_object, numbits) + 1 - - for start in range(start_offset, end_offset): - end = start + 1 - - if start == 0: - yield None, to_bytes(end) - elif end == 1 << numbits: - yield to_bytes(start), None - else: - yield to_bytes(start), to_bytes(end) - - -def integer_ranges(start, end, block_size=1000): - for start in range(start, end, block_size): - if start == 0: - yield None, block_size - elif start + block_size > end: - yield start, end - else: - yield start, start + block_size - - -RANGE_GENERATORS = { - "content": lambda start, end: byte_ranges(24, start, end), - "skipped_content": lambda start, end: [(None, None)], - "directory": lambda start, end: byte_ranges(24, start, end), - "revision": lambda start, end: byte_ranges(24, start, end), - "release": lambda start, end: byte_ranges(16, start, end), - "snapshot": lambda start, end: byte_ranges(16, start, end), - "origin": integer_ranges, - "origin_visit": integer_ranges, -} - - -def compute_query(obj_type, start, end): - columns = COLUMNS.get(obj_type) - join_specs = JOINS.get(obj_type, []) - join_clause = "\n".join("left join %s" % clause for clause in join_specs) - - where = [] - where_args = [] - if start: - where.append("%(keys)s >= %%s") - where_args.append(start) - if end: - where.append("%(keys)s < %%s") - where_args.append(end) - - where_clause = "" - if where: - where_clause = ("where " + " and ".join(where)) % { - "keys": "(%s)" % PARTITION_KEY[obj_type] - } - - column_specs = [] - column_aliases = [] - for column in columns: - if isinstance(column, str): - column_specs.append(column) - column_aliases.append(column) - else: - column_specs.append("%s as %s" % column) - column_aliases.append(column[1]) - - query = """ -select %(columns)s -from %(table)s -%(join)s -%(where)s - """ % { - "columns": ",".join(column_specs), - "table": obj_type, - "join": join_clause, - "where": where_clause, - } - - return query, where_args, column_aliases - - -def fetch(db, obj_type, start, end): - """Fetch all obj_type's identifiers from db. - - This opens one connection, stream objects and when done, close - the connection. 
- - Args: - db (BaseDb): Db connection object - obj_type (str): Object type - start (Union[bytes|Tuple]): Range start identifier - end (Union[bytes|Tuple]): Range end identifier - - Raises: - ValueError if obj_type is not supported - - Yields: - Objects in the given range - - """ - query, where_args, column_aliases = compute_query(obj_type, start, end) - converter = CONVERTERS.get(obj_type) - with db.cursor() as cursor: - logger.debug("Fetching data for table %s", obj_type) - logger.debug("query: %s %s", query, where_args) - cursor.execute(query, where_args) - for row in cursor: - record = dict(zip(column_aliases, row)) - if converter: - record = converter(db, record) - - logger.debug("record: %s" % record) - yield record - - -def _format_range_bound(bound): - if isinstance(bound, bytes): - return bound.hex() - else: - return str(bound) - - -MANDATORY_KEYS = ["brokers", "storage_dbconn", "prefix", "client_id"] - - -class JournalBackfiller: - """Class in charge of reading the storage's objects and sends those - back to the journal's topics. - - This is designed to be run periodically. - - """ - - def __init__(self, config=None): - self.config = config - self.check_config(config) - - def check_config(self, config): - missing_keys = [] - for key in MANDATORY_KEYS: - if not config.get(key): - missing_keys.append(key) - - if missing_keys: - raise ValueError( - "Configuration error: The following keys must be" - " provided: %s" % (",".join(missing_keys),) - ) - - def parse_arguments(self, object_type, start_object, end_object): - """Parse arguments - - Raises: - ValueError for unsupported object type - ValueError if object ids are not parseable - - Returns: - Parsed start and end object ids - - """ - if object_type not in COLUMNS: - raise ValueError( - "Object type %s is not supported. " - "The only possible values are %s" - % (object_type, ", ".join(COLUMNS.keys())) - ) - - if object_type in ["origin", "origin_visit"]: - if start_object: - start_object = int(start_object) - else: - start_object = 0 - if end_object: - end_object = int(end_object) - else: - end_object = 100 * 1000 * 1000 # hard-coded limit - - return start_object, end_object - - def run(self, object_type, start_object, end_object, dry_run=False): - """Reads storage's subscribed object types and send them to the - journal's reading topic. 
- - """ - start_object, end_object = self.parse_arguments( - object_type, start_object, end_object - ) - - db = BaseDb.connect(self.config["storage_dbconn"]) - writer = KafkaJournalWriter( - brokers=self.config["brokers"], - prefix=self.config["prefix"], - client_id=self.config["client_id"], - ) - for range_start, range_end in RANGE_GENERATORS[object_type]( - start_object, end_object - ): - logger.info( - "Processing %s range %s to %s", - object_type, - _format_range_bound(range_start), - _format_range_bound(range_end), - ) - - for obj in fetch(db, object_type, start=range_start, end=range_end,): - if dry_run: - continue - writer.write_addition(object_type=object_type, object_=obj) - - writer.producer.flush() - - -if __name__ == "__main__": - print('Please use the "swh-journal backfiller run" command') diff --git a/swh/journal/cli.py b/swh/journal/cli.py --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -19,14 +19,11 @@ from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.model.model import SHA1_SIZE -from swh.storage import get_storage from swh.objstorage import get_objstorage from swh.journal.client import get_journal_client as get_client from swh.journal.replay import is_hash_in_bytearray -from swh.journal.replay import process_replay_objects from swh.journal.replay import process_replay_objects_content -from swh.journal.backfill import JournalBackfiller @click.group(name="journal", context_settings=CONTEXT_SETTINGS) @@ -89,33 +86,11 @@ ) @click.pass_context def replay(ctx, stop_after_objects): - """Fill a Storage by reading a Journal. + """DEPRECATED: use `swh storage replay` instead. - There can be several 'replayers' filling a Storage as long as they use - the same `group-id`. + Requires swh.storage >= 0.0.188. """ - conf = ctx.obj["config"] - try: - storage = get_storage(**conf.pop("storage")) - except KeyError: - ctx.fail("You must have a storage configured in your config file.") - - client = get_journal_client(ctx, stop_after_objects=stop_after_objects) - worker_fn = functools.partial(process_replay_objects, storage=storage) - - if notify: - notify("READY=1") - - try: - client.process(worker_fn) - except KeyboardInterrupt: - ctx.exit(0) - else: - print("Done.") - finally: - if notify: - notify("STOPPING=1") - client.close() + ctx.fail("DEPRECATED") @cli.command() @@ -125,39 +100,12 @@ @click.option("--dry-run", is_flag=True, default=False) @click.pass_context def backfiller(ctx, object_type, start_object, end_object, dry_run): - """Run the backfiller - - The backfiller list objects from a Storage and produce journal entries from - there. - - Typically used to rebuild a journal or compensate for missing objects in a - journal (eg. due to a downtime of this later). + """DEPRECATED: use `swh storage backfill` instead. - The configuration file requires the following entries: - - brokers: a list of kafka endpoints (the journal) in which entries will be - added. - - storage_dbconn: URL to connect to the storage DB. - - prefix: the prefix of the topics (topics will be .). - - client_id: the kafka client ID. + Requires swh.storage >= 0.0.188. 
""" - conf = ctx.obj["config"] - backfiller = JournalBackfiller(conf) - - if notify: - notify("READY=1") - - try: - backfiller.run( - object_type=object_type, - start_object=start_object, - end_object=end_object, - dry_run=dry_run, - ) - except KeyboardInterrupt: - if notify: - notify("STOPPING=1") - ctx.exit(0) + ctx.fail("DEPRECATED") @cli.command("content-replay") diff --git a/swh/journal/direct_writer.py b/swh/journal/direct_writer.py deleted file mode 100644 --- a/swh/journal/direct_writer.py +++ /dev/null @@ -1,7 +0,0 @@ -# Copyright (C) 2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -# for BW compat -from .writer.kafka import KafkaJournalWriter as DirectKafkaWriter # noqa diff --git a/swh/journal/fixer.py b/swh/journal/fixer.py deleted file mode 100644 --- a/swh/journal/fixer.py +++ /dev/null @@ -1,293 +0,0 @@ -import copy -import logging -from typing import Any, Dict, List, Optional -from swh.model.identifiers import normalize_timestamp - -logger = logging.getLogger(__name__) - - -def _fix_content(content: Dict[str, Any]) -> Dict[str, Any]: - """Filters-out invalid 'perms' key that leaked from swh.model.from_disk - to the journal. - - >>> _fix_content({'perms': 0o100644, 'sha1_git': b'foo'}) - {'sha1_git': b'foo'} - - >>> _fix_content({'sha1_git': b'bar'}) - {'sha1_git': b'bar'} - - """ - content = content.copy() - content.pop("perms", None) - return content - - -def _fix_revision_pypi_empty_string(rev): - """PyPI loader failed to encode empty strings as bytes, see: - swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 - or https://forge.softwareheritage.org/D1772 - """ - rev = { - **rev, - "author": rev["author"].copy(), - "committer": rev["committer"].copy(), - } - if rev["author"].get("email") == "": - rev["author"]["email"] = b"" - if rev["author"].get("name") == "": - rev["author"]["name"] = b"" - if rev["committer"].get("email") == "": - rev["committer"]["email"] = b"" - if rev["committer"].get("name") == "": - rev["committer"]["name"] = b"" - return rev - - -def _fix_revision_transplant_source(rev): - if rev.get("metadata") and rev["metadata"].get("extra_headers"): - rev = copy.deepcopy(rev) - rev["metadata"]["extra_headers"] = [ - [key, value.encode("ascii")] - if key == "transplant_source" and isinstance(value, str) - else [key, value] - for (key, value) in rev["metadata"]["extra_headers"] - ] - return rev - - -def _check_date(date): - """Returns whether the date can be represented in backends with sane - limits on timestamps and timezones (resp. signed 64-bits and - signed 16 bits), and that microseconds is valid (ie. between 0 and 10^6). - """ - if date is None: - return True - date = normalize_timestamp(date) - return ( - (-(2 ** 63) <= date["timestamp"]["seconds"] < 2 ** 63) - and (0 <= date["timestamp"]["microseconds"] < 10 ** 6) - and (-(2 ** 15) <= date["offset"] < 2 ** 15) - ) - - -def _check_revision_date(rev): - """Exclude revisions with invalid dates. - See https://forge.softwareheritage.org/T1339""" - return _check_date(rev["date"]) and _check_date(rev["committer_date"]) - - -def _fix_revision(revision: Dict[str, Any]) -> Optional[Dict]: - """Fix various legacy revision issues. - - Fix author/committer person: - - >>> from pprint import pprint - >>> date = { - ... 'timestamp': { - ... 'seconds': 1565096932, - ... 'microseconds': 0, - ... }, - ... 'offset': 0, - ... 
} - >>> rev0 = _fix_revision({ - ... 'id': b'rev-id', - ... 'author': {'fullname': b'', 'name': '', 'email': ''}, - ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, - ... 'date': date, - ... 'committer_date': date, - ... 'type': 'git', - ... 'message': '', - ... 'directory': b'dir-id', - ... 'synthetic': False, - ... }) - >>> rev0['author'] - {'fullname': b'', 'name': b'', 'email': b''} - >>> rev0['committer'] - {'fullname': b'', 'name': b'', 'email': b''} - - Fix type of 'transplant_source' extra headers: - - >>> rev1 = _fix_revision({ - ... 'id': b'rev-id', - ... 'author': {'fullname': b'', 'name': '', 'email': ''}, - ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, - ... 'date': date, - ... 'committer_date': date, - ... 'metadata': { - ... 'extra_headers': [ - ... ['time_offset_seconds', b'-3600'], - ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e'] - ... ]}, - ... 'type': 'git', - ... 'message': '', - ... 'directory': b'dir-id', - ... 'synthetic': False, - ... }) - >>> pprint(rev1['metadata']['extra_headers']) - [['time_offset_seconds', b'-3600'], - ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']] - - Revision with invalid date are filtered: - - >>> from copy import deepcopy - >>> invalid_date1 = deepcopy(date) - >>> invalid_date1['timestamp']['microseconds'] = 1000000000 # > 10^6 - >>> rev = _fix_revision({ - ... 'author': {'fullname': b'', 'name': '', 'email': ''}, - ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, - ... 'date': invalid_date1, - ... 'committer_date': date, - ... }) - >>> rev is None - True - - >>> invalid_date2 = deepcopy(date) - >>> invalid_date2['timestamp']['seconds'] = 2**70 # > 10^63 - >>> rev = _fix_revision({ - ... 'author': {'fullname': b'', 'name': '', 'email': ''}, - ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, - ... 'date': invalid_date2, - ... 'committer_date': date, - ... }) - >>> rev is None - True - - >>> invalid_date3 = deepcopy(date) - >>> invalid_date3['offset'] = 2**20 # > 10^15 - >>> rev = _fix_revision({ - ... 'author': {'fullname': b'', 'name': '', 'email': ''}, - ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, - ... 'date': date, - ... 'committer_date': invalid_date3, - ... }) - >>> rev is None - True - - """ # noqa - rev = _fix_revision_pypi_empty_string(revision) - rev = _fix_revision_transplant_source(rev) - if not _check_revision_date(rev): - logger.warning( - "Invalid revision date detected: %(revision)s", {"revision": rev} - ) - return None - return rev - - -def _fix_origin(origin: Dict) -> Dict: - """Fix legacy origin with type which is no longer part of the model. - - >>> from pprint import pprint - >>> pprint(_fix_origin({ - ... 'url': 'http://foo', - ... })) - {'url': 'http://foo'} - >>> pprint(_fix_origin({ - ... 'url': 'http://bar', - ... 'type': 'foo', - ... })) - {'url': 'http://bar'} - - """ - o = origin.copy() - o.pop("type", None) - return o - - -def _fix_origin_visit(visit: Dict) -> Dict: - """Fix various legacy origin visit issues. - - `visit['origin']` is a dict instead of an URL: - - >>> from datetime import datetime, timezone - >>> from pprint import pprint - >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc) - >>> pprint(_fix_origin_visit({ - ... 'origin': {'url': 'http://foo'}, - ... 'date': date, - ... 'type': 'git', - ... 'status': 'ongoing', - ... 'snapshot': None, - ... 
})) - {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), - 'metadata': None, - 'origin': 'http://foo', - 'snapshot': None, - 'status': 'ongoing', - 'type': 'git'} - - `visit['type']` is missing , but `origin['visit']['type']` exists: - - >>> pprint(_fix_origin_visit( - ... {'origin': {'type': 'hg', 'url': 'http://foo'}, - ... 'date': date, - ... 'status': 'ongoing', - ... 'snapshot': None, - ... })) - {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), - 'metadata': None, - 'origin': 'http://foo', - 'snapshot': None, - 'status': 'ongoing', - 'type': 'hg'} - - Old visit format (origin_visit with no type) raises: - - >>> _fix_origin_visit({ - ... 'origin': {'url': 'http://foo'}, - ... 'date': date, - ... 'status': 'ongoing', - ... 'snapshot': None - ... }) - Traceback (most recent call last): - ... - ValueError: Old origin visit format detected... - - >>> _fix_origin_visit({ - ... 'origin': 'http://foo', - ... 'date': date, - ... 'status': 'ongoing', - ... 'snapshot': None - ... }) - Traceback (most recent call last): - ... - ValueError: Old origin visit format detected... - - """ # noqa - visit = visit.copy() - if "type" not in visit: - if isinstance(visit["origin"], dict) and "type" in visit["origin"]: - # Very old version of the schema: visits did not have a type, - # but their 'origin' field was a dict with a 'type' key. - visit["type"] = visit["origin"]["type"] - else: - # Very old schema version: 'type' is missing, stop early - - # We expect the journal's origin_visit topic to no longer reference - # such visits. If it does, the replayer must crash so we can fix - # the journal's topic. - raise ValueError(f"Old origin visit format detected: {visit}") - if isinstance(visit["origin"], dict): - # Old version of the schema: visit['origin'] was a dict. - visit["origin"] = visit["origin"]["url"] - if "metadata" not in visit: - visit["metadata"] = None - return visit - - -def fix_objects(object_type: str, objects: List[Dict]) -> List[Dict]: - """ - Fix legacy objects from the journal to bring them up to date with the - latest storage schema. 
- """ - if object_type == "content": - return [_fix_content(v) for v in objects] - elif object_type == "revision": - revisions = [_fix_revision(v) for v in objects] - return [rev for rev in revisions if rev is not None] - elif object_type == "origin": - return [_fix_origin(v) for v in objects] - elif object_type == "origin_visit": - return [_fix_origin_visit(v) for v in objects] - else: - return objects diff --git a/swh/journal/replay.py b/swh/journal/replay.py --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -5,7 +5,7 @@ import logging from time import time -from typing import Any, Callable, Dict, Iterable, List, Optional +from typing import Callable, Dict, List, Optional from sentry_sdk import capture_exception, push_scope @@ -22,133 +22,23 @@ ) from swh.core.statsd import statsd -from swh.journal.fixer import fix_objects from swh.model.hashutil import hash_to_hex +from swh.model.model import SHA1_SIZE -from swh.model.model import ( - BaseContent, - BaseModel, - Content, - Directory, - Origin, - OriginVisit, - Revision, - SHA1_SIZE, - SkippedContent, - Snapshot, - Release, -) from swh.objstorage.objstorage import ( ID_HASH_ALGO, ObjNotFoundError, ObjStorage, ) -from swh.storage.exc import HashCollision logger = logging.getLogger(__name__) -GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total" -GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds" CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total" CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total" CONTENT_BYTES_METRIC = "swh_content_replayer_bytes" CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds" -object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = { - "origin": Origin.from_dict, - "origin_visit": OriginVisit.from_dict, - "snapshot": Snapshot.from_dict, - "revision": Revision.from_dict, - "release": Release.from_dict, - "directory": Directory.from_dict, - "content": Content.from_dict, - "skipped_content": SkippedContent.from_dict, -} - - -def process_replay_objects(all_objects, *, storage): - for (object_type, objects) in all_objects.items(): - logger.debug("Inserting %s %s objects", len(objects), object_type) - with statsd.timed(GRAPH_DURATION_METRIC, tags={"object_type": object_type}): - _insert_objects(object_type, objects, storage) - statsd.increment( - GRAPH_OPERATIONS_METRIC, len(objects), tags={"object_type": object_type} - ) - if notify: - notify("WATCHDOG=1") - - -def collision_aware_content_add( - content_add_fn: Callable[[Iterable[Any]], None], contents: List[BaseContent] -) -> None: - """Add contents to storage. If a hash collision is detected, an error is - logged. Then this adds the other non colliding contents to the storage. 
- - Args: - content_add_fn: Storage content callable - contents: List of contents or skipped contents to add to storage - - """ - if not contents: - return - colliding_content_hashes: List[Dict[str, Any]] = [] - while True: - try: - content_add_fn(contents) - except HashCollision as e: - colliding_content_hashes.append( - { - "algo": e.algo, - "hash": e.hash_id, # hex hash id - "objects": e.colliding_contents, # hex hashes - } - ) - colliding_hashes = e.colliding_content_hashes() - # Drop the colliding contents from the transaction - contents = [c for c in contents if c.hashes() not in colliding_hashes] - else: - # Successfully added contents, we are done - break - if colliding_content_hashes: - for collision in colliding_content_hashes: - logger.error("Collision detected: %(collision)s", {"collision": collision}) - - -def _insert_objects(object_type: str, objects: List[Dict], storage) -> None: - """Insert objects of type object_type in the storage. - - """ - objects = fix_objects(object_type, objects) - - if object_type == "content": - contents: List[BaseContent] = [] - skipped_contents: List[BaseContent] = [] - for content in objects: - c = BaseContent.from_dict(content) - if isinstance(c, SkippedContent): - skipped_contents.append(c) - else: - contents.append(c) - - collision_aware_content_add(storage.skipped_content_add, skipped_contents) - collision_aware_content_add(storage.content_add_metadata, contents) - elif object_type == "origin_visit": - visits: List[OriginVisit] = [] - origins: List[Origin] = [] - for obj in objects: - visit = OriginVisit.from_dict(obj) - visits.append(visit) - origins.append(Origin(url=visit.origin)) - storage.origin_add(origins) - storage.origin_visit_upsert(visits) - elif object_type in ("directory", "revision", "release", "snapshot", "origin"): - method = getattr(storage, object_type + "_add") - method(object_converter_fn[object_type](o) for o in objects) - else: - logger.warning("Received a series of %s, this should not happen", object_type) - - def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE): """ Checks if the given hash is in the provided `array`. 
The array must be diff --git a/swh/journal/tests/test_backfill.py b/swh/journal/tests/test_backfill.py deleted file mode 100644 --- a/swh/journal/tests/test_backfill.py +++ /dev/null @@ -1,160 +0,0 @@ -# Copyright (C) 2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import pytest - -from swh.journal.backfill import JournalBackfiller, compute_query, PARTITION_KEY - - -TEST_CONFIG = { - "brokers": ["localhost"], - "prefix": "swh.tmp_journal.new", - "client_id": "swh.journal.client.test", - "storage_dbconn": "service=swh-dev", -} - - -def test_config_ko_missing_mandatory_key(): - """Missing configuration key will make the initialization fail - - """ - for key in TEST_CONFIG.keys(): - config = TEST_CONFIG.copy() - config.pop(key) - - with pytest.raises(ValueError) as e: - JournalBackfiller(config) - - error = "Configuration error: The following keys must be" " provided: %s" % ( - ",".join([key]), - ) - assert e.value.args[0] == error - - -def test_config_ko_unknown_object_type(): - """Parse arguments will fail if the object type is unknown - - """ - backfiller = JournalBackfiller(TEST_CONFIG) - with pytest.raises(ValueError) as e: - backfiller.parse_arguments("unknown-object-type", 1, 2) - - error = ( - "Object type unknown-object-type is not supported. " - "The only possible values are %s" % (", ".join(PARTITION_KEY)) - ) - assert e.value.args[0] == error - - -def test_compute_query_content(): - query, where_args, column_aliases = compute_query("content", "\x000000", "\x000001") - - assert where_args == ["\x000000", "\x000001"] - - assert column_aliases == [ - "sha1", - "sha1_git", - "sha256", - "blake2s256", - "length", - "status", - "ctime", - ] - - assert ( - query - == """ -select sha1,sha1_git,sha256,blake2s256,length,status,ctime -from content - -where (sha1) >= %s and (sha1) < %s - """ - ) - - -def test_compute_query_skipped_content(): - query, where_args, column_aliases = compute_query("skipped_content", None, None) - - assert where_args == [] - - assert column_aliases == [ - "sha1", - "sha1_git", - "sha256", - "blake2s256", - "length", - "ctime", - "status", - "reason", - ] - - assert ( - query - == """ -select sha1,sha1_git,sha256,blake2s256,length,ctime,status,reason -from skipped_content - - - """ - ) - - -def test_compute_query_origin_visit(): - query, where_args, column_aliases = compute_query("origin_visit", 1, 10) - - assert where_args == [1, 10] - - assert column_aliases == [ - "visit", - "origin.type", - "origin_visit.type", - "url", - "date", - "snapshot", - "status", - "metadata", - ] - - assert ( - query - == """ -select visit,origin.type,origin_visit.type,url,date,snapshot,status,metadata -from origin_visit -left join origin on origin_visit.origin=origin.id -where (origin_visit.origin) >= %s and (origin_visit.origin) < %s - """ - ) - - -def test_compute_query_release(): - query, where_args, column_aliases = compute_query("release", "\x000002", "\x000003") - - assert where_args == ["\x000002", "\x000003"] - - assert column_aliases == [ - "id", - "date", - "date_offset", - "comment", - "name", - "synthetic", - "date_neg_utc_offset", - "target", - "target_type", - "author_id", - "author_name", - "author_email", - "author_fullname", - ] - - assert ( - query - == """ -select release.id as id,date,date_offset,comment,release.name as name,synthetic,date_neg_utc_offset,target,target_type,a.id as 
author_id,a.name as author_name,a.email as author_email,a.fullname as author_fullname -from release -left join person a on release.author=a.id -where (release.id) >= %s and (release.id) < %s - """ # noqa - ) diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py --- a/swh/journal/tests/test_cli.py +++ b/swh/journal/tests/test_cli.py @@ -9,7 +9,6 @@ import logging import re import tempfile -from typing import Any, Dict from unittest.mock import patch, MagicMock from click.testing import CliRunner @@ -23,7 +22,7 @@ from swh.journal.cli import cli, get_journal_client from swh.journal.replay import CONTENT_REPLAY_RETRIES -from swh.journal.serializers import key_to_kafka, value_to_kafka +from swh.journal.serializers import key_to_kafka logger = logging.getLogger(__name__) @@ -41,7 +40,7 @@ """An swh-storage object that gets injected into the CLI functions.""" storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]} storage = get_storage(**storage_config) - with patch("swh.journal.cli.get_storage") as get_storage_mock: + with patch("swh.storage.get_storage") as get_storage_mock: get_storage_mock.return_value = storage yield storage @@ -104,50 +103,6 @@ assert client.batch_size == 50 -def test_replay( - storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, -): - kafka_prefix += ".swh.journal.objects" - - producer = Producer( - { - "bootstrap.servers": kafka_server, - "client.id": "test-producer", - "acks": "all", - } - ) - - snapshot = { - "id": b"foo", - "branches": {b"HEAD": {"target_type": "revision", "target": b"\x01" * 20,}}, - } # type: Dict[str, Any] - producer.produce( - topic=kafka_prefix + ".snapshot", - key=key_to_kafka(snapshot["id"]), - value=value_to_kafka(snapshot), - ) - producer.flush() - - logger.debug("Flushed producer") - - result = invoke( - "replay", - "--stop-after-objects", - "1", - journal_config={ - "brokers": [kafka_server], - "group_id": kafka_consumer_group, - "prefix": kafka_prefix, - }, - ) - - expected = r"Done.\n" - assert result.exit_code == 0, result.output - assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - - assert storage.snapshot_get(snapshot["id"]) == {**snapshot, "next_branch": None} - - def _patch_objstorages(names): objstorages = {name: InMemoryObjStorage() for name in names} diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -6,9 +6,7 @@ import pytest from confluent_kafka import Consumer, Producer -from swh.storage import get_storage - -from swh.model.model import Directory, Origin, OriginVisit +from swh.model.model import Directory from swh.journal.tests.journal_data import TEST_OBJECTS from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed @@ -32,55 +30,6 @@ assert_all_objects_consumed(consumed_messages) -def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer): - kafka_prefix += ".swh.journal.objects" - - writer_config = { - "cls": "kafka", - "brokers": [kafka_server], - "client_id": "kafka_writer", - "prefix": kafka_prefix, - } - storage_config = { - "cls": "pipeline", - "steps": [{"cls": "memory", "journal_writer": writer_config},], - } - - storage = get_storage(**storage_config) - - expected_messages = 0 - - for object_type, objects in TEST_OBJECTS.items(): - method = getattr(storage, object_type + "_add") - if object_type in ( - "content", - "directory", - "revision", - "release", - 
"snapshot", - "origin", - ): - method(objects) - expected_messages += len(objects) - elif object_type in ("origin_visit",): - for obj in objects: - assert isinstance(obj, OriginVisit) - storage.origin_add_one(Origin(url=obj.origin)) - visit = method(obj.origin, date=obj.date, type=obj.type) - expected_messages += 1 - - obj_d = obj.to_dict() - for k in ("visit", "origin", "date", "type"): - del obj_d[k] - storage.origin_visit_update(obj.origin, visit.visit, **obj_d) - expected_messages += 1 - else: - assert False, object_type - - consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) - assert_all_objects_consumed(consumed_messages) - - def test_write_delivery_failure( kafka_prefix: str, kafka_server: str, consumer: Consumer ): diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py --- a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -3,28 +3,10 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import datetime -import functools -import logging -import random -from typing import Dict, List -import dateutil -import pytest - -from confluent_kafka import Producer from hypothesis import strategies, given, settings -from swh.storage import get_storage - -from swh.journal.client import JournalClient -from swh.journal.serializers import key_to_kafka, value_to_kafka -from swh.journal.replay import process_replay_objects, is_hash_in_bytearray -from swh.model.hashutil import hash_to_hex -from swh.model.model import Content - -from .conftest import TEST_OBJECT_DICTS, DUPLICATE_CONTENTS -from .utils import MockedJournalClient, MockedKafkaWriter +from swh.journal.replay import is_hash_in_bytearray storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]} @@ -34,360 +16,6 @@ return kafka_prefix + "." + object_type -def test_storage_play( - kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, -): - """Optimal replayer scenario. 
- - This: - - writes objects to the topic - - replayer consumes objects from the topic and replay them - - """ - kafka_prefix += ".swh.journal.objects" - - storage = get_storage(**storage_config) - - producer = Producer( - { - "bootstrap.servers": kafka_server, - "client.id": "test producer", - "acks": "all", - } - ) - - now = datetime.datetime.now(tz=datetime.timezone.utc) - - # Fill Kafka - nb_sent = 0 - nb_visits = 0 - for object_type, objects in TEST_OBJECT_DICTS.items(): - topic = make_topic(kafka_prefix, object_type) - for object_ in objects: - key = bytes(random.randint(0, 255) for _ in range(40)) - object_ = object_.copy() - if object_type == "content": - object_["ctime"] = now - elif object_type == "origin_visit": - nb_visits += 1 - object_["visit"] = nb_visits - producer.produce( - topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), - ) - nb_sent += 1 - - producer.flush() - - caplog.set_level(logging.ERROR, "swh.journal.replay") - # Fill the storage from Kafka - replayer = JournalClient( - brokers=kafka_server, - group_id=kafka_consumer_group, - prefix=kafka_prefix, - stop_after_objects=nb_sent, - ) - worker_fn = functools.partial(process_replay_objects, storage=storage) - nb_inserted = 0 - while nb_inserted < nb_sent: - nb_inserted += replayer.process(worker_fn) - assert nb_sent == nb_inserted - - # Check the objects were actually inserted in the storage - assert TEST_OBJECT_DICTS["revision"] == list( - storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]]) - ) - assert TEST_OBJECT_DICTS["release"] == list( - storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]]) - ) - - origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]])) - assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins] - for origin in origins: - origin_url = origin["url"] - expected_visits = [ - { - **visit, - "origin": origin_url, - "date": dateutil.parser.parse(visit["date"]), - } - for visit in TEST_OBJECT_DICTS["origin_visit"] - if visit["origin"] == origin["url"] - ] - actual_visits = list(storage.origin_visit_get(origin_url)) - for visit in actual_visits: - del visit["visit"] # opaque identifier - assert expected_visits == actual_visits - - input_contents = TEST_OBJECT_DICTS["content"] - contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents]) - assert len(contents) == len(input_contents) - assert contents == {cont["sha1"]: [cont] for cont in input_contents} - - collision = 0 - for record in caplog.records: - logtext = record.getMessage() - if "Colliding contents:" in logtext: - collision += 1 - - assert collision == 0, "No collision should be detected" - - -def test_storage_play_with_collision( - kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, -): - """Another replayer scenario with collisions. 
- - This: - - writes objects to the topic, including colliding contents - - replayer consumes objects from the topic and replay them - - This drops the colliding contents from the replay when detected - - """ - kafka_prefix += ".swh.journal.objects" - - storage = get_storage(**storage_config) - - producer = Producer( - { - "bootstrap.servers": kafka_server, - "client.id": "test producer", - "enable.idempotence": "true", - } - ) - - now = datetime.datetime.now(tz=datetime.timezone.utc) - - # Fill Kafka - nb_sent = 0 - nb_visits = 0 - for object_type, objects in TEST_OBJECT_DICTS.items(): - topic = make_topic(kafka_prefix, object_type) - for object_ in objects: - key = bytes(random.randint(0, 255) for _ in range(40)) - object_ = object_.copy() - if object_type == "content": - object_["ctime"] = now - elif object_type == "origin_visit": - nb_visits += 1 - object_["visit"] = nb_visits - producer.produce( - topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), - ) - nb_sent += 1 - - # Create collision in input data - # They are not written in the destination - for content in DUPLICATE_CONTENTS: - topic = make_topic(kafka_prefix, "content") - producer.produce( - topic=topic, key=key_to_kafka(key), value=value_to_kafka(content), - ) - - nb_sent += 1 - - producer.flush() - - caplog.set_level(logging.ERROR, "swh.journal.replay") - # Fill the storage from Kafka - replayer = JournalClient( - brokers=kafka_server, - group_id=kafka_consumer_group, - prefix=kafka_prefix, - stop_after_objects=nb_sent, - ) - worker_fn = functools.partial(process_replay_objects, storage=storage) - nb_inserted = 0 - while nb_inserted < nb_sent: - nb_inserted += replayer.process(worker_fn) - assert nb_sent == nb_inserted - - # Check the objects were actually inserted in the storage - assert TEST_OBJECT_DICTS["revision"] == list( - storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]]) - ) - assert TEST_OBJECT_DICTS["release"] == list( - storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]]) - ) - - origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]])) - assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins] - for origin in origins: - origin_url = origin["url"] - expected_visits = [ - { - **visit, - "origin": origin_url, - "date": dateutil.parser.parse(visit["date"]), - } - for visit in TEST_OBJECT_DICTS["origin_visit"] - if visit["origin"] == origin["url"] - ] - actual_visits = list(storage.origin_visit_get(origin_url)) - for visit in actual_visits: - del visit["visit"] # opaque identifier - assert expected_visits == actual_visits - - input_contents = TEST_OBJECT_DICTS["content"] - contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents]) - assert len(contents) == len(input_contents) - assert contents == {cont["sha1"]: [cont] for cont in input_contents} - - nb_collisions = 0 - - actual_collision: Dict - for record in caplog.records: - logtext = record.getMessage() - if "Collision detected:" in logtext: - nb_collisions += 1 - actual_collision = record.args["collision"] - - assert nb_collisions == 1, "1 collision should be detected" - - algo = "sha1" - assert actual_collision["algo"] == algo - expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo]) - assert actual_collision["hash"] == expected_colliding_hash - - actual_colliding_hashes = actual_collision["objects"] - assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS) - for content in DUPLICATE_CONTENTS: - 
expected_content_hashes = { - k: hash_to_hex(v) for k, v in Content.from_dict(content).hashes().items() - } - assert expected_content_hashes in actual_colliding_hashes - - -def _test_write_replay_origin_visit(visits: List[Dict]): - """Helper function to write tests for origin_visit. - - Each visit (a dict) given in the 'visits' argument will be sent to - a (mocked) kafka queue, which a in-memory-storage backed replayer is - listening to. - - Check that corresponding origin visits entities are present in the storage - and have correct values if they are not skipped. - - """ - queue: List = [] - replayer = MockedJournalClient(queue) - writer = MockedKafkaWriter(queue) - - # Note that flipping the order of these two insertions will crash - # the test, because the legacy origin_format does not allow to create - # the origin when needed (type is missing) - writer.send( - "origin", - "foo", - { - "url": "http://example.com/", - "type": "git", # test the legacy origin format is accepted - }, - ) - for visit in visits: - writer.send("origin_visit", "foo", visit) - - queue_size = len(queue) - assert replayer.stop_after_objects is None - replayer.stop_after_objects = queue_size - - storage = get_storage(**storage_config) - worker_fn = functools.partial(process_replay_objects, storage=storage) - - replayer.process(worker_fn) - - actual_visits = list(storage.origin_visit_get("http://example.com/")) - - assert len(actual_visits) == len(visits), actual_visits - - for vin, vout in zip(visits, actual_visits): - vin = vin.copy() - vout = vout.copy() - assert vout.pop("origin") == "http://example.com/" - vin.pop("origin") - vin.setdefault("type", "git") - vin.setdefault("metadata", None) - assert vin == vout - - -def test_write_replay_origin_visit(): - """Test origin_visit when the 'origin' is just a string.""" - now = datetime.datetime.now() - visits = [ - { - "visit": 1, - "origin": "http://example.com/", - "date": now, - "type": "git", - "status": "partial", - "snapshot": None, - } - ] - _test_write_replay_origin_visit(visits) - - -def test_write_replay_legacy_origin_visit1(): - """Origin_visit with no types should make the replayer crash - - We expect the journal's origin_visit topic to no longer reference such - visits. If it does, the replayer must crash so we can fix the journal's - topic. 
- - """ - now = datetime.datetime.now() - visit = { - "visit": 1, - "origin": "http://example.com/", - "date": now, - "status": "partial", - "snapshot": None, - } - now2 = datetime.datetime.now() - visit2 = { - "visit": 2, - "origin": {"url": "http://example.com/"}, - "date": now2, - "status": "partial", - "snapshot": None, - } - - for origin_visit in [visit, visit2]: - with pytest.raises(ValueError, match="Old origin visit format"): - _test_write_replay_origin_visit([origin_visit]) - - -def test_write_replay_legacy_origin_visit2(): - """Test origin_visit when 'type' is missing from the visit, but not - from the origin.""" - now = datetime.datetime.now() - visits = [ - { - "visit": 1, - "origin": {"url": "http://example.com/", "type": "git",}, - "date": now, - "type": "git", - "status": "partial", - "snapshot": None, - } - ] - _test_write_replay_origin_visit(visits) - - -def test_write_replay_legacy_origin_visit3(): - """Test origin_visit when the origin is a dict""" - now = datetime.datetime.now() - visits = [ - { - "visit": 1, - "origin": {"url": "http://example.com/",}, - "date": now, - "type": "git", - "status": "partial", - "snapshot": None, - } - ] - _test_write_replay_origin_visit(visits) - - hash_strategy = strategies.binary(min_size=20, max_size=20) diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py --- a/swh/journal/tests/test_write_replay.py +++ b/swh/journal/tests/test_write_replay.py @@ -4,121 +4,16 @@ # See top-level LICENSE file for more information import functools -from unittest.mock import patch -import attr from hypothesis import given, settings, HealthCheck from hypothesis.strategies import lists from swh.model.hypothesis_strategies import present_contents -from swh.model.model import Origin -from swh.storage import get_storage -from swh.storage.exc import HashCollision +from swh.objstorage import get_objstorage -from swh.journal.replay import ( - process_replay_objects, - process_replay_objects_content, - object_converter_fn, -) +from swh.journal.replay import process_replay_objects_content from .utils import MockedJournalClient, MockedKafkaWriter -from .conftest import objects_d - -storage_config = { - "cls": "memory", - "journal_writer": {"cls": "memory"}, -} - - -def empty_person_name_email(rev_or_rel): - """Empties the 'name' and 'email' fields of the author/committer fields - of a revision or release; leaving only the fullname.""" - if getattr(rev_or_rel, "author", None): - rev_or_rel = attr.evolve( - rev_or_rel, author=attr.evolve(rev_or_rel.author, name=b"", email=b"",) - ) - - if getattr(rev_or_rel, "committer", None): - rev_or_rel = attr.evolve( - rev_or_rel, - committer=attr.evolve(rev_or_rel.committer, name=b"", email=b"",), - ) - - return rev_or_rel - - -@given(lists(objects_d(), min_size=1)) -@settings(suppress_health_check=[HealthCheck.too_slow]) -def test_write_replay_same_order_batches(objects): - queue = [] - replayer = MockedJournalClient(queue) - - with patch( - "swh.journal.writer.inmemory.InMemoryJournalWriter", - return_value=MockedKafkaWriter(queue), - ): - storage1 = get_storage(**storage_config) - - # Write objects to storage1 - for (obj_type, obj) in objects: - if obj_type == "content" and obj.get("status") == "absent": - obj_type = "skipped_content" - - obj = object_converter_fn[obj_type](obj) - - if obj_type == "origin_visit": - storage1.origin_add_one(Origin(url=obj.origin)) - storage1.origin_visit_upsert([obj]) - else: - method = getattr(storage1, obj_type + "_add") - try: - method([obj]) - except 
HashCollision: - pass - - # Bail out early if we didn't insert any relevant objects... - queue_size = len(queue) - assert queue_size != 0, "No test objects found; hypothesis strategy bug?" - - assert replayer.stop_after_objects is None - replayer.stop_after_objects = queue_size - - storage2 = get_storage(**storage_config) - worker_fn = functools.partial(process_replay_objects, storage=storage2) - - replayer.process(worker_fn) - - assert replayer.consumer.committed - - for attr_name in ( - "_contents", - "_directories", - "_snapshots", - "_origin_visits", - "_origins", - ): - assert getattr(storage1, attr_name) == getattr(storage2, attr_name), attr_name - - # When hypothesis generates a revision and a release with same - # author (or committer) fullname but different name or email, then - # the storage will use the first name/email it sees. - # This first one will be either the one from the revision or the release, - # and since there is no order guarantees, storage2 has 1/2 chance of - # not seeing the same order as storage1, therefore we need to strip - # them out before comparing. - for attr_name in ("_revisions", "_releases"): - items1 = { - k: empty_person_name_email(v) - for (k, v) in getattr(storage1, attr_name).items() - } - items2 = { - k: empty_person_name_email(v) - for (k, v) in getattr(storage2, attr_name).items() - } - assert items1 == items2, attr_name - - -# TODO: add test for hash collision @given(lists(present_contents(), min_size=1)) @@ -127,17 +22,16 @@ queue = [] replayer = MockedJournalClient(queue) + writer = MockedKafkaWriter(queue) - with patch( - "swh.journal.writer.inmemory.InMemoryJournalWriter", - return_value=MockedKafkaWriter(queue), - ): - storage1 = get_storage(**storage_config) + objstorage1 = get_objstorage(cls="memory", args={}) + objstorage2 = get_objstorage(cls="memory", args={}) contents = [] for obj in objects: - storage1.content_add([obj]) + objstorage1.add(obj.data) contents.append(obj) + writer.write_addition("content", obj) # Bail out early if we didn't insert any relevant objects... queue_size = len(queue) @@ -146,11 +40,6 @@ assert replayer.stop_after_objects is None replayer.stop_after_objects = queue_size - storage2 = get_storage(**storage_config) - - objstorage1 = storage1.objstorage.objstorage - objstorage2 = storage2.objstorage.objstorage - worker_fn = functools.partial( process_replay_objects_content, src=objstorage1, dst=objstorage2 )
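
Usage note: `is_hash_in_bytearray` is one of the few helpers this change keeps in swh.journal.replay, and it is still imported by the `content-replay` CLI command. A minimal usage sketch, assuming the packed array of sha1 hashes is kept sorted; the hash values below are made up for illustration:

    from swh.journal.replay import is_hash_in_bytearray
    from swh.model.model import SHA1_SIZE

    # Three fake 20-byte sha1 values, packed into one sorted byte string
    # (one hash every SHA1_SIZE bytes), e.g. an exclusion list fed to the
    # content replayer.
    hashes = sorted(bytes([i]) * SHA1_SIZE for i in (1, 5, 9))
    packed = b"".join(hashes)

    assert is_hash_in_bytearray(bytes([5]) * SHA1_SIZE, packed, nb_hashes=3)
    assert not is_hash_in_bytearray(bytes([7]) * SHA1_SIZE, packed, nb_hashes=3)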