diff --git a/swh/journal/backfill.py b/swh/journal/backfill.py deleted file mode 100644 index 902b897..0000000 --- a/swh/journal/backfill.py +++ /dev/null @@ -1,489 +0,0 @@ -# Copyright (C) 2017-2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -"""Module defining journal backfiller classes. - -Those backfiller goal is to produce back part or all of the objects -from the storage to the journal topics - -At the moment, a first naive implementation is the -JournalBackfiller. It simply reads the objects from the -storage and sends every object identifier back to the journal. - -""" - -import logging - -from .writer.kafka import KafkaJournalWriter - -from swh.core.db import BaseDb -from swh.storage.converters import db_to_release, db_to_revision - - -logger = logging.getLogger(__name__) - -PARTITION_KEY = { - "content": "sha1", - "skipped_content": None, # unused - "directory": "id", - "revision": "revision.id", - "release": "release.id", - "snapshot": "id", - "origin": "id", - "origin_visit": "origin_visit.origin", -} - -COLUMNS = { - "content": [ - "sha1", - "sha1_git", - "sha256", - "blake2s256", - "length", - "status", - "ctime", - ], - "skipped_content": [ - "sha1", - "sha1_git", - "sha256", - "blake2s256", - "length", - "ctime", - "status", - "reason", - ], - "directory": ["id", "dir_entries", "file_entries", "rev_entries"], - "revision": [ - ("revision.id", "id"), - "date", - "date_offset", - "committer_date", - "committer_date_offset", - "type", - "directory", - "message", - "synthetic", - "metadata", - "date_neg_utc_offset", - "committer_date_neg_utc_offset", - ( - "array(select parent_id::bytea from revision_history rh " - "where rh.id = revision.id order by rh.parent_rank asc)", - "parents", - ), - ("a.id", "author_id"), - ("a.name", "author_name"), - ("a.email", "author_email"), - ("a.fullname", "author_fullname"), - ("c.id", "committer_id"), - ("c.name", "committer_name"), - ("c.email", "committer_email"), - ("c.fullname", "committer_fullname"), - ], - "release": [ - ("release.id", "id"), - "date", - "date_offset", - "comment", - ("release.name", "name"), - "synthetic", - "date_neg_utc_offset", - "target", - "target_type", - ("a.id", "author_id"), - ("a.name", "author_name"), - ("a.email", "author_email"), - ("a.fullname", "author_fullname"), - ], - "snapshot": ["id", "object_id"], - "origin": ["type", "url"], - "origin_visit": [ - "visit", - "origin.type", - "origin_visit.type", - "url", - "date", - "snapshot", - "status", - "metadata", - ], -} - - -JOINS = { - "release": ["person a on release.author=a.id"], - "revision": [ - "person a on revision.author=a.id", - "person c on revision.committer=c.id", - ], - "origin_visit": ["origin on origin_visit.origin=origin.id"], -} - - -def directory_converter(db, directory): - """Convert directory from the flat representation to swh model - compatible objects. 
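    For illustration, a flat row whose *_entries columns hold ids of rows in
    the corresponding directory_entry_* tables comes back with a single
    "entries" list (db is assumed to be an open BaseDb connection; the
    values below are hypothetical):

        flat = {"id": b"...", "file_entries": [3],
                "dir_entries": None, "rev_entries": None}
        converted = directory_converter(db, flat)
        # converted["entries"] == [
        #     {"target": b"...", "name": b"README", "perms": 0o100644,
        #      "type": "file"},
        # ]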
- - """ - columns = ["target", "name", "perms"] - query_template = """ - select %(columns)s - from directory_entry_%(type)s - where id in %%s - """ - - types = ["file", "dir", "rev"] - - entries = [] - with db.cursor() as cur: - for type in types: - ids = directory.pop("%s_entries" % type) - if not ids: - continue - query = query_template % { - "columns": ",".join(columns), - "type": type, - } - cur.execute(query, (tuple(ids),)) - for row in cur: - entry = dict(zip(columns, row)) - entry["type"] = type - entries.append(entry) - - directory["entries"] = entries - return directory - - -def revision_converter(db, revision): - """Convert revision from the flat representation to swh model - compatible objects. - - """ - return db_to_revision(revision) - - -def release_converter(db, release): - """Convert release from the flat representation to swh model - compatible objects. - - """ - release = db_to_release(release) - if "author" in release and release["author"]: - del release["author"]["id"] - return release - - -def snapshot_converter(db, snapshot): - """Convert snapshot from the flat representation to swh model - compatible objects. - - """ - columns = ["name", "target", "target_type"] - query = """ - select %s - from snapshot_branches sbs - inner join snapshot_branch sb on sb.object_id=sbs.branch_id - where sbs.snapshot_id=%%s - """ % ", ".join( - columns - ) - with db.cursor() as cur: - cur.execute(query, (snapshot.pop("object_id"),)) - branches = {} - for name, *row in cur: - branch = dict(zip(columns[1:], row)) - if not branch["target"] and not branch["target_type"]: - branch = None - branches[name] = branch - - snapshot["branches"] = branches - return snapshot - - -def origin_visit_converter(db, origin_visit): - origin = { - "type": origin_visit.pop("origin.type"), - "url": origin_visit.pop("url"), - } - origin_visit["origin"] = origin - origin_visit["type"] = origin_visit.pop("origin_visit.type") - return origin_visit - - -CONVERTERS = { - "directory": directory_converter, - "revision": revision_converter, - "release": release_converter, - "snapshot": snapshot_converter, - "origin_visit": origin_visit_converter, -} - - -def object_to_offset(object_id, numbits): - """Compute the index of the range containing object id, when dividing - space into 2^numbits. - - Args: - object_id (str): The hex representation of object_id - numbits (int): Number of bits in which we divide input space - - Returns: - The index of the range containing object id - - """ - q, r = divmod(numbits, 8) - length = q + (r != 0) - shift_bits = 8 - r if r else 0 - - truncated_id = object_id[: length * 2] - if len(truncated_id) < length * 2: - truncated_id += "0" * (length * 2 - len(truncated_id)) - - truncated_id_bytes = bytes.fromhex(truncated_id) - return int.from_bytes(truncated_id_bytes, byteorder="big") >> shift_bits - - -def byte_ranges(numbits, start_object=None, end_object=None): - """Generate start/end pairs of bytes spanning numbits bits and - constrained by optional start_object and end_object. 
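    For example, byte_ranges(4) with no bounds yields 16 half-open ranges,
    each bounded by a single byte:

        (None, b"\x10"), (b"\x10", b"\x20"), ..., (b"\xf0", None)

    The first and last bounds are None so that the query built from them
    puts no lower (resp. upper) limit on the partition key.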
- - Args: - numbits (int): Number of bits in which we divide input space - start_object (str): Hex object id contained in the first range - returned - end_object (str): Hex object id contained in the last range - returned - - Yields: - 2^numbits pairs of bytes - - """ - q, r = divmod(numbits, 8) - length = q + (r != 0) - shift_bits = 8 - r if r else 0 - - def to_bytes(i): - return int.to_bytes(i << shift_bits, length=length, byteorder="big") - - start_offset = 0 - end_offset = 1 << numbits - - if start_object is not None: - start_offset = object_to_offset(start_object, numbits) - if end_object is not None: - end_offset = object_to_offset(end_object, numbits) + 1 - - for start in range(start_offset, end_offset): - end = start + 1 - - if start == 0: - yield None, to_bytes(end) - elif end == 1 << numbits: - yield to_bytes(start), None - else: - yield to_bytes(start), to_bytes(end) - - -def integer_ranges(start, end, block_size=1000): - for start in range(start, end, block_size): - if start == 0: - yield None, block_size - elif start + block_size > end: - yield start, end - else: - yield start, start + block_size - - -RANGE_GENERATORS = { - "content": lambda start, end: byte_ranges(24, start, end), - "skipped_content": lambda start, end: [(None, None)], - "directory": lambda start, end: byte_ranges(24, start, end), - "revision": lambda start, end: byte_ranges(24, start, end), - "release": lambda start, end: byte_ranges(16, start, end), - "snapshot": lambda start, end: byte_ranges(16, start, end), - "origin": integer_ranges, - "origin_visit": integer_ranges, -} - - -def compute_query(obj_type, start, end): - columns = COLUMNS.get(obj_type) - join_specs = JOINS.get(obj_type, []) - join_clause = "\n".join("left join %s" % clause for clause in join_specs) - - where = [] - where_args = [] - if start: - where.append("%(keys)s >= %%s") - where_args.append(start) - if end: - where.append("%(keys)s < %%s") - where_args.append(end) - - where_clause = "" - if where: - where_clause = ("where " + " and ".join(where)) % { - "keys": "(%s)" % PARTITION_KEY[obj_type] - } - - column_specs = [] - column_aliases = [] - for column in columns: - if isinstance(column, str): - column_specs.append(column) - column_aliases.append(column) - else: - column_specs.append("%s as %s" % column) - column_aliases.append(column[1]) - - query = """ -select %(columns)s -from %(table)s -%(join)s -%(where)s - """ % { - "columns": ",".join(column_specs), - "table": obj_type, - "join": join_clause, - "where": where_clause, - } - - return query, where_args, column_aliases - - -def fetch(db, obj_type, start, end): - """Fetch all obj_type's identifiers from db. - - This opens one connection, stream objects and when done, close - the connection. 
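    A minimal sketch of how this is meant to be driven, mirroring
    JournalBackfiller.run below (handle is a hypothetical callback and the
    dbconn string is a placeholder):

        db = BaseDb.connect("service=swh-dev")
        for range_start, range_end in RANGE_GENERATORS["revision"](None, None):
            for obj in fetch(db, "revision", start=range_start, end=range_end):
                handle(obj)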
- - Args: - db (BaseDb): Db connection object - obj_type (str): Object type - start (Union[bytes|Tuple]): Range start identifier - end (Union[bytes|Tuple]): Range end identifier - - Raises: - ValueError if obj_type is not supported - - Yields: - Objects in the given range - - """ - query, where_args, column_aliases = compute_query(obj_type, start, end) - converter = CONVERTERS.get(obj_type) - with db.cursor() as cursor: - logger.debug("Fetching data for table %s", obj_type) - logger.debug("query: %s %s", query, where_args) - cursor.execute(query, where_args) - for row in cursor: - record = dict(zip(column_aliases, row)) - if converter: - record = converter(db, record) - - logger.debug("record: %s" % record) - yield record - - -def _format_range_bound(bound): - if isinstance(bound, bytes): - return bound.hex() - else: - return str(bound) - - -MANDATORY_KEYS = ["brokers", "storage_dbconn", "prefix", "client_id"] - - -class JournalBackfiller: - """Class in charge of reading the storage's objects and sends those - back to the journal's topics. - - This is designed to be run periodically. - - """ - - def __init__(self, config=None): - self.config = config - self.check_config(config) - - def check_config(self, config): - missing_keys = [] - for key in MANDATORY_KEYS: - if not config.get(key): - missing_keys.append(key) - - if missing_keys: - raise ValueError( - "Configuration error: The following keys must be" - " provided: %s" % (",".join(missing_keys),) - ) - - def parse_arguments(self, object_type, start_object, end_object): - """Parse arguments - - Raises: - ValueError for unsupported object type - ValueError if object ids are not parseable - - Returns: - Parsed start and end object ids - - """ - if object_type not in COLUMNS: - raise ValueError( - "Object type %s is not supported. " - "The only possible values are %s" - % (object_type, ", ".join(COLUMNS.keys())) - ) - - if object_type in ["origin", "origin_visit"]: - if start_object: - start_object = int(start_object) - else: - start_object = 0 - if end_object: - end_object = int(end_object) - else: - end_object = 100 * 1000 * 1000 # hard-coded limit - - return start_object, end_object - - def run(self, object_type, start_object, end_object, dry_run=False): - """Reads storage's subscribed object types and send them to the - journal's reading topic. 
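    A usage sketch with placeholder configuration values (the keys are the
    ones required by check_config above):

        config = {
            "brokers": ["localhost"],
            "storage_dbconn": "service=swh-dev",
            "prefix": "swh.tmp_journal.new",
            "client_id": "swh.journal.client.test",
        }
        JournalBackfiller(config).run(
            object_type="release", start_object=None, end_object=None,
            dry_run=True,
        )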
- - """ - start_object, end_object = self.parse_arguments( - object_type, start_object, end_object - ) - - db = BaseDb.connect(self.config["storage_dbconn"]) - writer = KafkaJournalWriter( - brokers=self.config["brokers"], - prefix=self.config["prefix"], - client_id=self.config["client_id"], - ) - for range_start, range_end in RANGE_GENERATORS[object_type]( - start_object, end_object - ): - logger.info( - "Processing %s range %s to %s", - object_type, - _format_range_bound(range_start), - _format_range_bound(range_end), - ) - - for obj in fetch(db, object_type, start=range_start, end=range_end,): - if dry_run: - continue - writer.write_addition(object_type=object_type, object_=obj) - - writer.producer.flush() - - -if __name__ == "__main__": - print('Please use the "swh-journal backfiller run" command') diff --git a/swh/journal/cli.py b/swh/journal/cli.py index d88bfa8..d1281a6 100644 --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -1,265 +1,213 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import logging import mmap import os import warnings import click try: from systemd.daemon import notify except ImportError: notify = None from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.model.model import SHA1_SIZE -from swh.storage import get_storage from swh.objstorage import get_objstorage from swh.journal.client import get_journal_client as get_client from swh.journal.replay import is_hash_in_bytearray -from swh.journal.replay import process_replay_objects from swh.journal.replay import process_replay_objects_content -from swh.journal.backfill import JournalBackfiller @click.group(name="journal", context_settings=CONTEXT_SETTINGS) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.pass_context def cli(ctx, config_file): """Software Heritage Journal tools. The journal is a persistent logger of changes to the archive, with publish-subscribe support. """ if not config_file: config_file = os.environ.get("SWH_CONFIG_FILENAME") if config_file: if not os.path.exists(config_file): raise ValueError("%s does not exist" % config_file) conf = config.read(config_file) else: conf = {} ctx.ensure_object(dict) ctx.obj["config"] = conf def get_journal_client(ctx, **kwargs): conf = ctx.obj["config"].copy() if "journal" in conf: warnings.warn( "Journal client configuration should now be under the " "`journal_client` field and have a `cls` argument.", DeprecationWarning, ) conf["journal_client"] = {"cls": "kafka", **conf.pop("journal")} client_conf = conf.get("journal_client").copy() client_conf.update(kwargs) try: return get_client(**client_conf) except ValueError as exc: ctx.fail(exc) @cli.command() @click.option( "--stop-after-objects", "-n", default=None, type=int, help="Stop after processing this many objects. Default is to " "run forever.", ) @click.pass_context def replay(ctx, stop_after_objects): - """Fill a Storage by reading a Journal. + """DEPRECATED: use `swh storage replay` instead. - There can be several 'replayers' filling a Storage as long as they use - the same `group-id`. + Requires swh.storage >= 0.0.188. 
""" - conf = ctx.obj["config"] - try: - storage = get_storage(**conf.pop("storage")) - except KeyError: - ctx.fail("You must have a storage configured in your config file.") - - client = get_journal_client(ctx, stop_after_objects=stop_after_objects) - worker_fn = functools.partial(process_replay_objects, storage=storage) - - if notify: - notify("READY=1") - - try: - client.process(worker_fn) - except KeyboardInterrupt: - ctx.exit(0) - else: - print("Done.") - finally: - if notify: - notify("STOPPING=1") - client.close() + ctx.fail("DEPRECATED") @cli.command() @click.argument("object_type") @click.option("--start-object", default=None) @click.option("--end-object", default=None) @click.option("--dry-run", is_flag=True, default=False) @click.pass_context def backfiller(ctx, object_type, start_object, end_object, dry_run): - """Run the backfiller - - The backfiller list objects from a Storage and produce journal entries from - there. - - Typically used to rebuild a journal or compensate for missing objects in a - journal (eg. due to a downtime of this later). + """DEPRECATED: use `swh storage backfill` instead. - The configuration file requires the following entries: - - brokers: a list of kafka endpoints (the journal) in which entries will be - added. - - storage_dbconn: URL to connect to the storage DB. - - prefix: the prefix of the topics (topics will be .). - - client_id: the kafka client ID. + Requires swh.storage >= 0.0.188. """ - conf = ctx.obj["config"] - backfiller = JournalBackfiller(conf) - - if notify: - notify("READY=1") - - try: - backfiller.run( - object_type=object_type, - start_object=start_object, - end_object=end_object, - dry_run=dry_run, - ) - except KeyboardInterrupt: - if notify: - notify("STOPPING=1") - ctx.exit(0) + ctx.fail("DEPRECATED") @cli.command("content-replay") @click.option( "--stop-after-objects", "-n", default=None, type=int, help="Stop after processing this many objects. Default is to " "run forever.", ) @click.option( "--exclude-sha1-file", default=None, type=click.File("rb"), help="File containing a sorted array of hashes to be excluded.", ) @click.option( "--check-dst/--no-check-dst", default=True, help="Check whether the destination contains the object before " "copying.", ) @click.pass_context def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst): """Fill a destination Object Storage (typically a mirror) by reading a Journal and retrieving objects from an existing source ObjStorage. There can be several 'replayers' filling a given ObjStorage as long as they use the same `group-id`. You can use the `KAFKA_GROUP_INSTANCE_ID` environment variable to use KIP-345 static group membership. This service retrieves object ids to copy from the 'content' topic. It will only copy object's content if the object's description in the kafka nmessage has the status:visible set. `--exclude-sha1-file` may be used to exclude some hashes to speed-up the replay in case many of the contents are already in the destination objstorage. It must contain a concatenation of all (sha1) hashes, and it must be sorted. This file will not be fully loaded into memory at any given time, so it can be arbitrarily large. `--check-dst` sets whether the replayer should check in the destination ObjStorage before copying an object. You can turn that off if you know you're copying to an empty ObjStorage. 
""" conf = ctx.obj["config"] try: objstorage_src = get_objstorage(**conf.pop("objstorage_src")) except KeyError: ctx.fail("You must have a source objstorage configured in " "your config file.") try: objstorage_dst = get_objstorage(**conf.pop("objstorage_dst")) except KeyError: ctx.fail( "You must have a destination objstorage configured " "in your config file." ) if exclude_sha1_file: map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ) if map_.size() % SHA1_SIZE != 0: ctx.fail( "--exclude-sha1 must link to a file whose size is an " "exact multiple of %d bytes." % SHA1_SIZE ) nb_excluded_hashes = int(map_.size() / SHA1_SIZE) def exclude_fn(obj): return is_hash_in_bytearray(obj["sha1"], map_, nb_excluded_hashes) else: exclude_fn = None client = get_journal_client( ctx, stop_after_objects=stop_after_objects, object_types=("content",) ) worker_fn = functools.partial( process_replay_objects_content, src=objstorage_src, dst=objstorage_dst, exclude_fn=exclude_fn, check_dst=check_dst, ) if notify: notify("READY=1") try: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: if notify: notify("STOPPING=1") client.close() def main(): logging.basicConfig() return cli(auto_envvar_prefix="SWH_JOURNAL") if __name__ == "__main__": main() diff --git a/swh/journal/direct_writer.py b/swh/journal/direct_writer.py deleted file mode 100644 index 2f6904c..0000000 --- a/swh/journal/direct_writer.py +++ /dev/null @@ -1,7 +0,0 @@ -# Copyright (C) 2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -# for BW compat -from .writer.kafka import KafkaJournalWriter as DirectKafkaWriter # noqa diff --git a/swh/journal/fixer.py b/swh/journal/fixer.py deleted file mode 100644 index 7322b35..0000000 --- a/swh/journal/fixer.py +++ /dev/null @@ -1,293 +0,0 @@ -import copy -import logging -from typing import Any, Dict, List, Optional -from swh.model.identifiers import normalize_timestamp - -logger = logging.getLogger(__name__) - - -def _fix_content(content: Dict[str, Any]) -> Dict[str, Any]: - """Filters-out invalid 'perms' key that leaked from swh.model.from_disk - to the journal. 
- - >>> _fix_content({'perms': 0o100644, 'sha1_git': b'foo'}) - {'sha1_git': b'foo'} - - >>> _fix_content({'sha1_git': b'bar'}) - {'sha1_git': b'bar'} - - """ - content = content.copy() - content.pop("perms", None) - return content - - -def _fix_revision_pypi_empty_string(rev): - """PyPI loader failed to encode empty strings as bytes, see: - swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 - or https://forge.softwareheritage.org/D1772 - """ - rev = { - **rev, - "author": rev["author"].copy(), - "committer": rev["committer"].copy(), - } - if rev["author"].get("email") == "": - rev["author"]["email"] = b"" - if rev["author"].get("name") == "": - rev["author"]["name"] = b"" - if rev["committer"].get("email") == "": - rev["committer"]["email"] = b"" - if rev["committer"].get("name") == "": - rev["committer"]["name"] = b"" - return rev - - -def _fix_revision_transplant_source(rev): - if rev.get("metadata") and rev["metadata"].get("extra_headers"): - rev = copy.deepcopy(rev) - rev["metadata"]["extra_headers"] = [ - [key, value.encode("ascii")] - if key == "transplant_source" and isinstance(value, str) - else [key, value] - for (key, value) in rev["metadata"]["extra_headers"] - ] - return rev - - -def _check_date(date): - """Returns whether the date can be represented in backends with sane - limits on timestamps and timezones (resp. signed 64-bits and - signed 16 bits), and that microseconds is valid (ie. between 0 and 10^6). - """ - if date is None: - return True - date = normalize_timestamp(date) - return ( - (-(2 ** 63) <= date["timestamp"]["seconds"] < 2 ** 63) - and (0 <= date["timestamp"]["microseconds"] < 10 ** 6) - and (-(2 ** 15) <= date["offset"] < 2 ** 15) - ) - - -def _check_revision_date(rev): - """Exclude revisions with invalid dates. - See https://forge.softwareheritage.org/T1339""" - return _check_date(rev["date"]) and _check_date(rev["committer_date"]) - - -def _fix_revision(revision: Dict[str, Any]) -> Optional[Dict]: - """Fix various legacy revision issues. - - Fix author/committer person: - - >>> from pprint import pprint - >>> date = { - ... 'timestamp': { - ... 'seconds': 1565096932, - ... 'microseconds': 0, - ... }, - ... 'offset': 0, - ... } - >>> rev0 = _fix_revision({ - ... 'id': b'rev-id', - ... 'author': {'fullname': b'', 'name': '', 'email': ''}, - ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, - ... 'date': date, - ... 'committer_date': date, - ... 'type': 'git', - ... 'message': '', - ... 'directory': b'dir-id', - ... 'synthetic': False, - ... }) - >>> rev0['author'] - {'fullname': b'', 'name': b'', 'email': b''} - >>> rev0['committer'] - {'fullname': b'', 'name': b'', 'email': b''} - - Fix type of 'transplant_source' extra headers: - - >>> rev1 = _fix_revision({ - ... 'id': b'rev-id', - ... 'author': {'fullname': b'', 'name': '', 'email': ''}, - ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, - ... 'date': date, - ... 'committer_date': date, - ... 'metadata': { - ... 'extra_headers': [ - ... ['time_offset_seconds', b'-3600'], - ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e'] - ... ]}, - ... 'type': 'git', - ... 'message': '', - ... 'directory': b'dir-id', - ... 'synthetic': False, - ... 
}) - >>> pprint(rev1['metadata']['extra_headers']) - [['time_offset_seconds', b'-3600'], - ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']] - - Revision with invalid date are filtered: - - >>> from copy import deepcopy - >>> invalid_date1 = deepcopy(date) - >>> invalid_date1['timestamp']['microseconds'] = 1000000000 # > 10^6 - >>> rev = _fix_revision({ - ... 'author': {'fullname': b'', 'name': '', 'email': ''}, - ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, - ... 'date': invalid_date1, - ... 'committer_date': date, - ... }) - >>> rev is None - True - - >>> invalid_date2 = deepcopy(date) - >>> invalid_date2['timestamp']['seconds'] = 2**70 # > 10^63 - >>> rev = _fix_revision({ - ... 'author': {'fullname': b'', 'name': '', 'email': ''}, - ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, - ... 'date': invalid_date2, - ... 'committer_date': date, - ... }) - >>> rev is None - True - - >>> invalid_date3 = deepcopy(date) - >>> invalid_date3['offset'] = 2**20 # > 10^15 - >>> rev = _fix_revision({ - ... 'author': {'fullname': b'', 'name': '', 'email': ''}, - ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, - ... 'date': date, - ... 'committer_date': invalid_date3, - ... }) - >>> rev is None - True - - """ # noqa - rev = _fix_revision_pypi_empty_string(revision) - rev = _fix_revision_transplant_source(rev) - if not _check_revision_date(rev): - logger.warning( - "Invalid revision date detected: %(revision)s", {"revision": rev} - ) - return None - return rev - - -def _fix_origin(origin: Dict) -> Dict: - """Fix legacy origin with type which is no longer part of the model. - - >>> from pprint import pprint - >>> pprint(_fix_origin({ - ... 'url': 'http://foo', - ... })) - {'url': 'http://foo'} - >>> pprint(_fix_origin({ - ... 'url': 'http://bar', - ... 'type': 'foo', - ... })) - {'url': 'http://bar'} - - """ - o = origin.copy() - o.pop("type", None) - return o - - -def _fix_origin_visit(visit: Dict) -> Dict: - """Fix various legacy origin visit issues. - - `visit['origin']` is a dict instead of an URL: - - >>> from datetime import datetime, timezone - >>> from pprint import pprint - >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc) - >>> pprint(_fix_origin_visit({ - ... 'origin': {'url': 'http://foo'}, - ... 'date': date, - ... 'type': 'git', - ... 'status': 'ongoing', - ... 'snapshot': None, - ... })) - {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), - 'metadata': None, - 'origin': 'http://foo', - 'snapshot': None, - 'status': 'ongoing', - 'type': 'git'} - - `visit['type']` is missing , but `origin['visit']['type']` exists: - - >>> pprint(_fix_origin_visit( - ... {'origin': {'type': 'hg', 'url': 'http://foo'}, - ... 'date': date, - ... 'status': 'ongoing', - ... 'snapshot': None, - ... })) - {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), - 'metadata': None, - 'origin': 'http://foo', - 'snapshot': None, - 'status': 'ongoing', - 'type': 'hg'} - - Old visit format (origin_visit with no type) raises: - - >>> _fix_origin_visit({ - ... 'origin': {'url': 'http://foo'}, - ... 'date': date, - ... 'status': 'ongoing', - ... 'snapshot': None - ... }) - Traceback (most recent call last): - ... - ValueError: Old origin visit format detected... - - >>> _fix_origin_visit({ - ... 'origin': 'http://foo', - ... 'date': date, - ... 'status': 'ongoing', - ... 'snapshot': None - ... }) - Traceback (most recent call last): - ... - ValueError: Old origin visit format detected... 
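    The module-level entry point fix_objects applies these fixes per object
    type, for instance:

    >>> fix_objects('origin', [{'url': 'http://foo', 'type': 'git'}])
    [{'url': 'http://foo'}]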
- - """ # noqa - visit = visit.copy() - if "type" not in visit: - if isinstance(visit["origin"], dict) and "type" in visit["origin"]: - # Very old version of the schema: visits did not have a type, - # but their 'origin' field was a dict with a 'type' key. - visit["type"] = visit["origin"]["type"] - else: - # Very old schema version: 'type' is missing, stop early - - # We expect the journal's origin_visit topic to no longer reference - # such visits. If it does, the replayer must crash so we can fix - # the journal's topic. - raise ValueError(f"Old origin visit format detected: {visit}") - if isinstance(visit["origin"], dict): - # Old version of the schema: visit['origin'] was a dict. - visit["origin"] = visit["origin"]["url"] - if "metadata" not in visit: - visit["metadata"] = None - return visit - - -def fix_objects(object_type: str, objects: List[Dict]) -> List[Dict]: - """ - Fix legacy objects from the journal to bring them up to date with the - latest storage schema. - """ - if object_type == "content": - return [_fix_content(v) for v in objects] - elif object_type == "revision": - revisions = [_fix_revision(v) for v in objects] - return [rev for rev in revisions if rev is not None] - elif object_type == "origin": - return [_fix_origin(v) for v in objects] - elif object_type == "origin_visit": - return [_fix_origin_visit(v) for v in objects] - else: - return objects diff --git a/swh/journal/replay.py b/swh/journal/replay.py index dfc6fab..afb2c46 100644 --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -1,413 +1,303 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from time import time -from typing import Any, Callable, Dict, Iterable, List, Optional +from typing import Callable, Dict, List, Optional from sentry_sdk import capture_exception, push_scope try: from systemd.daemon import notify except ImportError: notify = None from tenacity import ( retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential, ) from swh.core.statsd import statsd -from swh.journal.fixer import fix_objects from swh.model.hashutil import hash_to_hex +from swh.model.model import SHA1_SIZE -from swh.model.model import ( - BaseContent, - BaseModel, - Content, - Directory, - Origin, - OriginVisit, - Revision, - SHA1_SIZE, - SkippedContent, - Snapshot, - Release, -) from swh.objstorage.objstorage import ( ID_HASH_ALGO, ObjNotFoundError, ObjStorage, ) -from swh.storage.exc import HashCollision logger = logging.getLogger(__name__) -GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total" -GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds" CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total" CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total" CONTENT_BYTES_METRIC = "swh_content_replayer_bytes" CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds" -object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = { - "origin": Origin.from_dict, - "origin_visit": OriginVisit.from_dict, - "snapshot": Snapshot.from_dict, - "revision": Revision.from_dict, - "release": Release.from_dict, - "directory": Directory.from_dict, - "content": Content.from_dict, - "skipped_content": SkippedContent.from_dict, -} - - -def process_replay_objects(all_objects, *, storage): - for (object_type, objects) in all_objects.items(): - 
logger.debug("Inserting %s %s objects", len(objects), object_type) - with statsd.timed(GRAPH_DURATION_METRIC, tags={"object_type": object_type}): - _insert_objects(object_type, objects, storage) - statsd.increment( - GRAPH_OPERATIONS_METRIC, len(objects), tags={"object_type": object_type} - ) - if notify: - notify("WATCHDOG=1") - - -def collision_aware_content_add( - content_add_fn: Callable[[Iterable[Any]], None], contents: List[BaseContent] -) -> None: - """Add contents to storage. If a hash collision is detected, an error is - logged. Then this adds the other non colliding contents to the storage. - - Args: - content_add_fn: Storage content callable - contents: List of contents or skipped contents to add to storage - - """ - if not contents: - return - colliding_content_hashes: List[Dict[str, Any]] = [] - while True: - try: - content_add_fn(contents) - except HashCollision as e: - colliding_content_hashes.append( - { - "algo": e.algo, - "hash": e.hash_id, # hex hash id - "objects": e.colliding_contents, # hex hashes - } - ) - colliding_hashes = e.colliding_content_hashes() - # Drop the colliding contents from the transaction - contents = [c for c in contents if c.hashes() not in colliding_hashes] - else: - # Successfully added contents, we are done - break - if colliding_content_hashes: - for collision in colliding_content_hashes: - logger.error("Collision detected: %(collision)s", {"collision": collision}) - - -def _insert_objects(object_type: str, objects: List[Dict], storage) -> None: - """Insert objects of type object_type in the storage. - - """ - objects = fix_objects(object_type, objects) - - if object_type == "content": - contents: List[BaseContent] = [] - skipped_contents: List[BaseContent] = [] - for content in objects: - c = BaseContent.from_dict(content) - if isinstance(c, SkippedContent): - skipped_contents.append(c) - else: - contents.append(c) - - collision_aware_content_add(storage.skipped_content_add, skipped_contents) - collision_aware_content_add(storage.content_add_metadata, contents) - elif object_type == "origin_visit": - visits: List[OriginVisit] = [] - origins: List[Origin] = [] - for obj in objects: - visit = OriginVisit.from_dict(obj) - visits.append(visit) - origins.append(Origin(url=visit.origin)) - storage.origin_add(origins) - storage.origin_visit_upsert(visits) - elif object_type in ("directory", "revision", "release", "snapshot", "origin"): - method = getattr(storage, object_type + "_add") - method(object_converter_fn[object_type](o) for o in objects) - else: - logger.warning("Received a series of %s, this should not happen", object_type) - - def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE): """ Checks if the given hash is in the provided `array`. The array must be a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes (so its size must by `nb_hashes*hash_size` bytes). Args: hash_ (bytes): the hash to look for array (bytes): a sorted concatenated array of hashes (may be of any type supporting slice indexing, eg. 
:class:`mmap.mmap`) nb_hashes (int): number of hashes in the array hash_size (int): size of a hash (defaults to 20, for SHA1) Example: >>> import os >>> hash1 = os.urandom(20) >>> hash2 = os.urandom(20) >>> hash3 = os.urandom(20) >>> array = b''.join(sorted([hash1, hash2])) >>> is_hash_in_bytearray(hash1, array, 2) True >>> is_hash_in_bytearray(hash2, array, 2) True >>> is_hash_in_bytearray(hash3, array, 2) False """ if len(hash_) != hash_size: raise ValueError("hash_ does not match the provided hash_size.") def get_hash(position): return array[position * hash_size : (position + 1) * hash_size] # Regular dichotomy: left = 0 right = nb_hashes while left < right - 1: middle = int((right + left) / 2) pivot = get_hash(middle) if pivot == hash_: return True elif pivot < hash_: left = middle else: right = middle return get_hash(left) == hash_ class ReplayError(Exception): """An error occurred during the replay of an object""" def __init__(self, operation, *, obj_id, exc): self.operation = operation self.obj_id = hash_to_hex(obj_id) self.exc = exc def __str__(self): return "ReplayError(doing %s, %s, %s)" % (self.operation, self.obj_id, self.exc) def log_replay_retry(retry_obj, sleep, last_result): """Log a retry of the content replayer""" exc = last_result.exception() logger.debug( "Retry operation %(operation)s on %(obj_id)s: %(exc)s", {"operation": exc.operation, "obj_id": exc.obj_id, "exc": str(exc.exc)}, ) statsd.increment( CONTENT_RETRY_METRIC, tags={ "operation": exc.operation, "attempt": str(retry_obj.statistics["attempt_number"]), }, ) def log_replay_error(last_attempt): """Log a replay error to sentry""" exc = last_attempt.exception() with push_scope() as scope: scope.set_tag("operation", exc.operation) scope.set_extra("obj_id", exc.obj_id) capture_exception(exc.exc) logger.error( "Failed operation %(operation)s on %(obj_id)s after %(retries)s" " retries: %(exc)s", { "obj_id": exc.obj_id, "operation": exc.operation, "exc": str(exc.exc), "retries": last_attempt.attempt_number, }, ) return None CONTENT_REPLAY_RETRIES = 3 content_replay_retry = retry( retry=retry_if_exception_type(ReplayError), stop=stop_after_attempt(CONTENT_REPLAY_RETRIES), wait=wait_random_exponential(multiplier=1, max=60), before_sleep=log_replay_retry, retry_error_callback=log_replay_error, ) @content_replay_retry def copy_object(obj_id, src, dst): hex_obj_id = hash_to_hex(obj_id) obj = "" try: with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "get"}): obj = src.get(obj_id) logger.debug("retrieved %(obj_id)s", {"obj_id": hex_obj_id}) with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}): dst.add(obj, obj_id=obj_id, check_presence=False) logger.debug("copied %(obj_id)s", {"obj_id": hex_obj_id}) statsd.increment(CONTENT_BYTES_METRIC, len(obj)) except ObjNotFoundError: logger.error( "Failed to copy %(obj_id)s: object not found", {"obj_id": hex_obj_id} ) raise except Exception as exc: raise ReplayError("copy", obj_id=obj_id, exc=exc) from None return len(obj) @content_replay_retry def obj_in_objstorage(obj_id, dst): """Check if an object is already in an objstorage, tenaciously""" try: return obj_id in dst except Exception as exc: raise ReplayError("in_dst", obj_id=obj_id, exc=exc) from None def process_replay_objects_content( all_objects: Dict[str, List[dict]], *, src: ObjStorage, dst: ObjStorage, exclude_fn: Optional[Callable[[dict], bool]] = None, check_dst: bool = True, ): """ Takes a list of records from Kafka (see :py:func:`swh.journal.client.JournalClient.process`) and copies them from the 
`src` objstorage to the `dst` objstorage, if: * `obj['status']` is `'visible'` * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided) * `obj['sha1'] not in dst` (if `check_dst` is True) Args: all_objects: Objects passed by the Kafka client. Most importantly, `all_objects['content'][*]['sha1']` is the sha1 hash of each content. src: An object storage (see :py:func:`swh.objstorage.get_objstorage`) dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`) exclude_fn: Determines whether an object should be copied. check_dst: Determines whether we should check the destination objstorage before copying. Example: >>> from swh.objstorage import get_objstorage >>> src = get_objstorage('memory', {}) >>> dst = get_objstorage('memory', {}) >>> id1 = src.add(b'foo bar') >>> id2 = src.add(b'baz qux') >>> kafka_partitions = { ... 'content': [ ... { ... 'sha1': id1, ... 'status': 'visible', ... }, ... { ... 'sha1': id2, ... 'status': 'visible', ... }, ... ] ... } >>> process_replay_objects_content( ... kafka_partitions, src=src, dst=dst, ... exclude_fn=lambda obj: obj['sha1'] == id1) >>> id1 in dst False >>> id2 in dst True """ vol = [] nb_skipped = 0 nb_failures = 0 t0 = time() for (object_type, objects) in all_objects.items(): if object_type != "content": logger.warning( "Received a series of %s, this should not happen", object_type ) continue for obj in objects: obj_id = obj[ID_HASH_ALGO] if obj["status"] != "visible": nb_skipped += 1 logger.debug( "skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"] ) statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "skipped", "status": obj["status"]}, ) elif exclude_fn and exclude_fn(obj): nb_skipped += 1 logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id)) statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"} ) elif check_dst and obj_in_objstorage(obj_id, dst): nb_skipped += 1 logger.debug("skipped %s (in dst)", hash_to_hex(obj_id)) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"}) else: try: copied = copy_object(obj_id, src, dst) except ObjNotFoundError: nb_skipped += 1 statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"} ) else: if copied is None: nb_failures += 1 statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"} ) else: vol.append(copied) statsd.increment( CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"} ) dt = time() - t0 logger.info( "processed %s content objects in %.1fsec " "(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped", len(vol), dt, len(vol) / dt, sum(vol) / 1024 / 1024 / dt, nb_failures, nb_skipped, ) if notify: notify("WATCHDOG=1") diff --git a/swh/journal/tests/test_backfill.py b/swh/journal/tests/test_backfill.py deleted file mode 100644 index 07194b0..0000000 --- a/swh/journal/tests/test_backfill.py +++ /dev/null @@ -1,160 +0,0 @@ -# Copyright (C) 2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import pytest - -from swh.journal.backfill import JournalBackfiller, compute_query, PARTITION_KEY - - -TEST_CONFIG = { - "brokers": ["localhost"], - "prefix": "swh.tmp_journal.new", - "client_id": "swh.journal.client.test", - "storage_dbconn": "service=swh-dev", -} - - -def test_config_ko_missing_mandatory_key(): - """Missing configuration key will make the initialization fail - - """ - for key in TEST_CONFIG.keys(): - 
config = TEST_CONFIG.copy() - config.pop(key) - - with pytest.raises(ValueError) as e: - JournalBackfiller(config) - - error = "Configuration error: The following keys must be" " provided: %s" % ( - ",".join([key]), - ) - assert e.value.args[0] == error - - -def test_config_ko_unknown_object_type(): - """Parse arguments will fail if the object type is unknown - - """ - backfiller = JournalBackfiller(TEST_CONFIG) - with pytest.raises(ValueError) as e: - backfiller.parse_arguments("unknown-object-type", 1, 2) - - error = ( - "Object type unknown-object-type is not supported. " - "The only possible values are %s" % (", ".join(PARTITION_KEY)) - ) - assert e.value.args[0] == error - - -def test_compute_query_content(): - query, where_args, column_aliases = compute_query("content", "\x000000", "\x000001") - - assert where_args == ["\x000000", "\x000001"] - - assert column_aliases == [ - "sha1", - "sha1_git", - "sha256", - "blake2s256", - "length", - "status", - "ctime", - ] - - assert ( - query - == """ -select sha1,sha1_git,sha256,blake2s256,length,status,ctime -from content - -where (sha1) >= %s and (sha1) < %s - """ - ) - - -def test_compute_query_skipped_content(): - query, where_args, column_aliases = compute_query("skipped_content", None, None) - - assert where_args == [] - - assert column_aliases == [ - "sha1", - "sha1_git", - "sha256", - "blake2s256", - "length", - "ctime", - "status", - "reason", - ] - - assert ( - query - == """ -select sha1,sha1_git,sha256,blake2s256,length,ctime,status,reason -from skipped_content - - - """ - ) - - -def test_compute_query_origin_visit(): - query, where_args, column_aliases = compute_query("origin_visit", 1, 10) - - assert where_args == [1, 10] - - assert column_aliases == [ - "visit", - "origin.type", - "origin_visit.type", - "url", - "date", - "snapshot", - "status", - "metadata", - ] - - assert ( - query - == """ -select visit,origin.type,origin_visit.type,url,date,snapshot,status,metadata -from origin_visit -left join origin on origin_visit.origin=origin.id -where (origin_visit.origin) >= %s and (origin_visit.origin) < %s - """ - ) - - -def test_compute_query_release(): - query, where_args, column_aliases = compute_query("release", "\x000002", "\x000003") - - assert where_args == ["\x000002", "\x000003"] - - assert column_aliases == [ - "id", - "date", - "date_offset", - "comment", - "name", - "synthetic", - "date_neg_utc_offset", - "target", - "target_type", - "author_id", - "author_name", - "author_email", - "author_fullname", - ] - - assert ( - query - == """ -select release.id as id,date,date_offset,comment,release.name as name,synthetic,date_neg_utc_offset,target,target_type,a.id as author_id,a.name as author_name,a.email as author_email,a.fullname as author_fullname -from release -left join person a on release.author=a.id -where (release.id) >= %s and (release.id) < %s - """ # noqa - ) diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py index b1924d8..c100461 100644 --- a/swh/journal/tests/test_cli.py +++ b/swh/journal/tests/test_cli.py @@ -1,655 +1,610 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import Counter import copy import functools import logging import re import tempfile -from typing import Any, Dict from unittest.mock import patch, MagicMock from click.testing import CliRunner from 
confluent_kafka import Producer import pytest import yaml from swh.model.hashutil import hash_to_hex from swh.objstorage.backends.in_memory import InMemoryObjStorage from swh.storage import get_storage from swh.journal.cli import cli, get_journal_client from swh.journal.replay import CONTENT_REPLAY_RETRIES -from swh.journal.serializers import key_to_kafka, value_to_kafka +from swh.journal.serializers import key_to_kafka logger = logging.getLogger(__name__) CLI_CONFIG = { "storage": {"cls": "memory",}, "objstorage_src": {"cls": "mocked", "name": "src",}, "objstorage_dst": {"cls": "mocked", "name": "dst",}, } @pytest.fixture def storage(): """An swh-storage object that gets injected into the CLI functions.""" storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]} storage = get_storage(**storage_config) - with patch("swh.journal.cli.get_storage") as get_storage_mock: + with patch("swh.storage.get_storage") as get_storage_mock: get_storage_mock.return_value = storage yield storage @pytest.fixture def monkeypatch_retry_sleep(monkeypatch): from swh.journal.replay import copy_object, obj_in_objstorage monkeypatch.setattr(copy_object.retry, "sleep", lambda x: None) monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None) def invoke(*args, env=None, journal_config=None): config = copy.deepcopy(CLI_CONFIG) if journal_config: config["journal_client"] = journal_config.copy() config["journal_client"]["cls"] = "kafka" runner = CliRunner() with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: yaml.dump(config, config_fd) config_fd.seek(0) args = ["-C" + config_fd.name] + list(args) return runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=env,) def test_get_journal_client_config_bwcompat(kafka_server): cfg = { "journal": { "brokers": [kafka_server], "group_id": "toto", "prefix": "xiferp", "object_types": ["content"], "batch_size": 50, } } ctx = MagicMock(obj={"config": cfg}) with pytest.deprecated_call(): client = get_journal_client(ctx, stop_after_objects=10, prefix="prefix") assert client.subscription == ["prefix.content"] assert client.stop_after_objects == 10 assert client.batch_size == 50 def test_get_journal_client_config(kafka_server): cfg = { "journal_client": { "cls": "kafka", "brokers": [kafka_server], "group_id": "toto", "prefix": "xiferp", "object_types": ["content"], "batch_size": 50, } } ctx = MagicMock(obj={"config": cfg}) client = get_journal_client(ctx, stop_after_objects=10, prefix="prefix") assert client.subscription == ["prefix.content"] assert client.stop_after_objects == 10 assert client.batch_size == 50 -def test_replay( - storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, -): - kafka_prefix += ".swh.journal.objects" - - producer = Producer( - { - "bootstrap.servers": kafka_server, - "client.id": "test-producer", - "acks": "all", - } - ) - - snapshot = { - "id": b"foo", - "branches": {b"HEAD": {"target_type": "revision", "target": b"\x01" * 20,}}, - } # type: Dict[str, Any] - producer.produce( - topic=kafka_prefix + ".snapshot", - key=key_to_kafka(snapshot["id"]), - value=value_to_kafka(snapshot), - ) - producer.flush() - - logger.debug("Flushed producer") - - result = invoke( - "replay", - "--stop-after-objects", - "1", - journal_config={ - "brokers": [kafka_server], - "group_id": kafka_consumer_group, - "prefix": kafka_prefix, - }, - ) - - expected = r"Done.\n" - assert result.exit_code == 0, result.output - assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - - assert 
storage.snapshot_get(snapshot["id"]) == {**snapshot, "next_branch": None} - - def _patch_objstorages(names): objstorages = {name: InMemoryObjStorage() for name in names} def get_mock_objstorage(cls, **args): assert cls == "mocked", cls return objstorages[args["name"]] def decorator(f): @functools.wraps(f) @patch("swh.journal.cli.get_objstorage") def newf(get_objstorage_mock, *args, **kwargs): get_objstorage_mock.side_effect = get_mock_objstorage f(*args, objstorages=objstorages, **kwargs) return newf return decorator NUM_CONTENTS = 10 def _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages): producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test-producer", "acks": "all", } ) contents = {} for i in range(NUM_CONTENTS): content = b"\x00" * 19 + bytes([i]) sha1 = objstorages["src"].add(content) contents[sha1] = content producer.produce( topic=kafka_prefix + ".content", key=key_to_kafka(sha1), value=key_to_kafka({"sha1": sha1, "status": "visible",}), ) producer.flush() return contents @_patch_objstorages(["src", "dst"]) def test_replay_content( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, ): kafka_prefix += ".swh.journal.objects" contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), journal_config={ "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @_patch_objstorages(["src", "dst"]) def test_replay_content_structured_log( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, ): kafka_prefix += ".swh.journal.objects" contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) caplog.set_level(logging.DEBUG, "swh.journal.replay") expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents) result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), journal_config={ "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = set() for record in caplog.records: logtext = record.getMessage() if "copied" in logtext: copied.add(record.args["obj_id"]) assert ( copied == expected_obj_ids ), "Mismatched logging; see captured log output for details." 
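# Note: the structured-log assertion above relies on copy_object() in
# swh.journal.replay logging with a mapping argument, i.e.
#     logger.debug("copied %(obj_id)s", {"obj_id": hex_obj_id})
# so each captured record exposes the hex id as record.args["obj_id"]. A
# condensed equivalent of the collection loop above would be:
#     copied = {r.args["obj_id"] for r in caplog.records
#               if "copied" in r.getMessage()}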
@_patch_objstorages(["src", "dst"]) def test_replay_content_static_group_id( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, ): kafka_prefix += ".swh.journal.objects" contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) # Setup log capture to fish the consumer settings out of the log messages caplog.set_level(logging.DEBUG, "swh.journal.client") result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), env={"KAFKA_GROUP_INSTANCE_ID": "static-group-instance-id"}, journal_config={ "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output consumer_settings = None for record in caplog.records: if "Consumer settings" in record.message: consumer_settings = record.args break assert consumer_settings is not None, ( "Failed to get consumer settings out of the consumer log. " "See log capture for details." ) assert consumer_settings["group.instance.id"] == "static-group-instance-id" assert consumer_settings["session.timeout.ms"] == 60 * 10 * 1000 assert consumer_settings["max.poll.interval.ms"] == 90 * 10 * 1000 for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @_patch_objstorages(["src", "dst"]) def test_replay_content_exclude( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, ): kafka_prefix += ".swh.journal.objects" contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) excluded_contents = list(contents)[0::2] # picking half of them with tempfile.NamedTemporaryFile(mode="w+b") as fd: fd.write(b"".join(sorted(excluded_contents))) fd.seek(0) result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), "--exclude-sha1-file", fd.name, journal_config={ "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): if sha1 in excluded_contents: assert sha1 not in objstorages["dst"], sha1 else: assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content NUM_CONTENTS_DST = 5 @_patch_objstorages(["src", "dst"]) @pytest.mark.parametrize( "check_dst,expected_copied,expected_in_dst", [ (True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST), (False, NUM_CONTENTS, 0), ], ) def test_replay_content_check_dst( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, check_dst: bool, expected_copied: int, expected_in_dst: int, caplog, ): kafka_prefix += ".swh.journal.objects" contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) for i, (sha1, content) in enumerate(contents.items()): if i >= NUM_CONTENTS_DST: break objstorages["dst"].add(content, obj_id=sha1) caplog.set_level(logging.DEBUG, "swh.journal.replay") result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), "--check-dst" if check_dst else "--no-check-dst", journal_config={ "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = 0 in_dst = 0 for record in 
caplog.records: logtext = record.getMessage() if "copied" in logtext: copied += 1 elif "in dst" in logtext: in_dst += 1 assert ( copied == expected_copied and in_dst == expected_in_dst ), "Unexpected amount of objects copied, see the captured log for details" for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content class FlakyObjStorage(InMemoryObjStorage): def __init__(self, *args, **kwargs): state = kwargs.pop("state") self.failures_left = Counter(kwargs.pop("failures")) super().__init__(*args, **kwargs) if state: self.state = state def flaky_operation(self, op, obj_id): if self.failures_left[op, obj_id] > 0: self.failures_left[op, obj_id] -= 1 raise RuntimeError("Failed %s on %s" % (op, hash_to_hex(obj_id))) def get(self, obj_id): self.flaky_operation("get", obj_id) return super().get(obj_id) def add(self, data, obj_id=None, check_presence=True): self.flaky_operation("add", obj_id) return super().add(data, obj_id=obj_id, check_presence=check_presence) def __contains__(self, obj_id): self.flaky_operation("in", obj_id) return super().__contains__(obj_id) @_patch_objstorages(["src", "dst"]) def test_replay_content_check_dst_retry( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, monkeypatch_retry_sleep, ): kafka_prefix += ".swh.journal.objects" contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) failures = {} for i, (sha1, content) in enumerate(contents.items()): if i >= NUM_CONTENTS_DST: break objstorages["dst"].add(content, obj_id=sha1) failures["in", sha1] = 1 orig_dst = objstorages["dst"] objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures) result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), "--check-dst", journal_config={ "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @_patch_objstorages(["src", "dst"]) def test_replay_content_failed_copy_retry( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, monkeypatch_retry_sleep, ): kafka_prefix += ".swh.journal.objects" contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) add_failures = {} get_failures = {} definitely_failed = set() # We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES times. # We generate failures for 2 different operations, get and add. num_retry_contents = 2 * CONTENT_REPLAY_RETRIES assert ( num_retry_contents < NUM_CONTENTS ), "Need to generate more test contents to properly test retry behavior" for i, sha1 in enumerate(contents): if i >= num_retry_contents: break # This generates a number of failures, up to CONTENT_REPLAY_RETRIES num_failures = (i % CONTENT_REPLAY_RETRIES) + 1 # This generates failures of add for the first CONTENT_REPLAY_RETRIES # objects, then failures of get. 
        if i < CONTENT_REPLAY_RETRIES:
            add_failures["add", sha1] = num_failures
        else:
            get_failures["get", sha1] = num_failures

        # Only contents that have CONTENT_REPLAY_RETRIES or more are
        # definitely failing
        if num_failures >= CONTENT_REPLAY_RETRIES:
            definitely_failed.add(hash_to_hex(sha1))

    objstorages["dst"] = FlakyObjStorage(
        state=objstorages["dst"].state, failures=add_failures,
    )
    objstorages["src"] = FlakyObjStorage(
        state=objstorages["src"].state, failures=get_failures,
    )

    caplog.set_level(logging.DEBUG, "swh.journal.replay")

    result = invoke(
        "content-replay",
        "--stop-after-objects",
        str(NUM_CONTENTS),
        journal_config={
            "brokers": [kafka_server],
            "group_id": kafka_consumer_group,
            "prefix": kafka_prefix,
        },
    )
    expected = r"Done.\n"
    assert result.exit_code == 0, result.output
    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

    copied = 0
    actually_failed = set()
    for record in caplog.records:
        logtext = record.getMessage()
        if "copied" in logtext:
            copied += 1
        elif "Failed operation" in logtext:
            assert record.levelno == logging.ERROR
            assert record.args["retries"] == CONTENT_REPLAY_RETRIES
            actually_failed.add(record.args["obj_id"])

    assert (
        actually_failed == definitely_failed
    ), "Unexpected object copy failures; see captured log for details"

    for (sha1, content) in contents.items():
        if hash_to_hex(sha1) in definitely_failed:
            assert sha1 not in objstorages["dst"]
            continue

        assert sha1 in objstorages["dst"], sha1
        assert objstorages["dst"].get(sha1) == content


@_patch_objstorages(["src", "dst"])
def test_replay_content_objnotfound(
    objstorages,
    storage,
    kafka_prefix: str,
    kafka_consumer_group: str,
    kafka_server: str,
    caplog,
):
    kafka_prefix += ".swh.journal.objects"
    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)

    num_contents_deleted = 5
    contents_deleted = set()

    for i, sha1 in enumerate(contents):
        if i >= num_contents_deleted:
            break
        del objstorages["src"].state[sha1]
        contents_deleted.add(hash_to_hex(sha1))

    caplog.set_level(logging.DEBUG, "swh.journal.replay")

    result = invoke(
        "content-replay",
        "--stop-after-objects",
        str(NUM_CONTENTS),
        journal_config={
            "brokers": [kafka_server],
            "group_id": kafka_consumer_group,
            "prefix": kafka_prefix,
        },
    )
    expected = r"Done.\n"
    assert result.exit_code == 0, result.output
    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

    copied = 0
    not_in_src = set()
    for record in caplog.records:
        logtext = record.getMessage()
        if "copied" in logtext:
            copied += 1
        elif "object not found" in logtext:
            # Check that the object id can be recovered from logs
            assert record.levelno == logging.ERROR
            not_in_src.add(record.args["obj_id"])

    assert (
        copied == NUM_CONTENTS - num_contents_deleted
    ), "Unexpected number of contents copied"
    assert (
        not_in_src == contents_deleted
    ), "Mismatch between deleted contents and not_in_src logs"

    for (sha1, content) in contents.items():
        if sha1 not in objstorages["src"]:
            continue
        assert sha1 in objstorages["dst"], sha1
        assert objstorages["dst"].get(sha1) == content
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
index fa5e7d3..80ebd55 100644
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -1,151 +1,100 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import pytest
from confluent_kafka import Consumer,
Producer -from swh.storage import get_storage - -from swh.model.model import Directory, Origin, OriginVisit +from swh.model.model import Directory from swh.journal.tests.journal_data import TEST_OBJECTS from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer): kafka_prefix += ".swh.journal.objects" writer = KafkaJournalWriter( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, ) expected_messages = 0 for object_type, objects in TEST_OBJECTS.items(): writer.write_additions(object_type, objects) expected_messages += len(objects) consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) assert_all_objects_consumed(consumed_messages) -def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer): - kafka_prefix += ".swh.journal.objects" - - writer_config = { - "cls": "kafka", - "brokers": [kafka_server], - "client_id": "kafka_writer", - "prefix": kafka_prefix, - } - storage_config = { - "cls": "pipeline", - "steps": [{"cls": "memory", "journal_writer": writer_config},], - } - - storage = get_storage(**storage_config) - - expected_messages = 0 - - for object_type, objects in TEST_OBJECTS.items(): - method = getattr(storage, object_type + "_add") - if object_type in ( - "content", - "directory", - "revision", - "release", - "snapshot", - "origin", - ): - method(objects) - expected_messages += len(objects) - elif object_type in ("origin_visit",): - for obj in objects: - assert isinstance(obj, OriginVisit) - storage.origin_add_one(Origin(url=obj.origin)) - visit = method(obj.origin, date=obj.date, type=obj.type) - expected_messages += 1 - - obj_d = obj.to_dict() - for k in ("visit", "origin", "date", "type"): - del obj_d[k] - storage.origin_visit_update(obj.origin, visit.visit, **obj_d) - expected_messages += 1 - else: - assert False, object_type - - consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) - assert_all_objects_consumed(consumed_messages) - - def test_write_delivery_failure( kafka_prefix: str, kafka_server: str, consumer: Consumer ): class MockKafkaError: """A mocked kafka error""" def str(self): return "Mocked Kafka Error" def name(self): return "SWH_MOCK_ERROR" class KafkaJournalWriterFailDelivery(KafkaJournalWriter): """A journal writer which always fails delivering messages""" def _on_delivery(self, error, message): """Replace the inbound error with a fake delivery error""" super()._on_delivery(MockKafkaError(), message) kafka_prefix += ".swh.journal.objects" writer = KafkaJournalWriterFailDelivery( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, ) empty_dir = Directory(entries=[]) with pytest.raises(KafkaDeliveryError) as exc: writer.write_addition("directory", empty_dir) assert "Failed deliveries" in exc.value.message assert len(exc.value.delivery_failures) == 1 delivery_failure = exc.value.delivery_failures[0] assert delivery_failure.key == empty_dir.id assert delivery_failure.code == "SWH_MOCK_ERROR" def test_write_delivery_timeout( kafka_prefix: str, kafka_server: str, consumer: Consumer ): produced = [] class MockProducer(Producer): """A kafka producer which pretends to produce messages, but never sends any delivery acknowledgements""" def produce(self, **kwargs): produced.append(kwargs) kafka_prefix += ".swh.journal.objects" writer = KafkaJournalWriter( brokers=[kafka_server], 
client_id="kafka_writer", prefix=kafka_prefix, flush_timeout=1, producer_class=MockProducer, ) empty_dir = Directory(entries=[]) with pytest.raises(KafkaDeliveryError) as exc: writer.write_addition("directory", empty_dir) assert len(produced) == 1 assert "timeout" in exc.value.message assert len(exc.value.delivery_failures) == 1 delivery_failure = exc.value.delivery_failures[0] assert delivery_failure.key == empty_dir.id assert delivery_failure.code == "SWH_FLUSH_TIMEOUT" diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py index 796236d..c038691 100644 --- a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -1,405 +1,33 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import datetime -import functools -import logging -import random -from typing import Dict, List -import dateutil -import pytest - -from confluent_kafka import Producer from hypothesis import strategies, given, settings -from swh.storage import get_storage - -from swh.journal.client import JournalClient -from swh.journal.serializers import key_to_kafka, value_to_kafka -from swh.journal.replay import process_replay_objects, is_hash_in_bytearray -from swh.model.hashutil import hash_to_hex -from swh.model.model import Content - -from .conftest import TEST_OBJECT_DICTS, DUPLICATE_CONTENTS -from .utils import MockedJournalClient, MockedKafkaWriter +from swh.journal.replay import is_hash_in_bytearray storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]} def make_topic(kafka_prefix: str, object_type: str) -> str: return kafka_prefix + "." + object_type -def test_storage_play( - kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, -): - """Optimal replayer scenario. 
- - This: - - writes objects to the topic - - replayer consumes objects from the topic and replay them - - """ - kafka_prefix += ".swh.journal.objects" - - storage = get_storage(**storage_config) - - producer = Producer( - { - "bootstrap.servers": kafka_server, - "client.id": "test producer", - "acks": "all", - } - ) - - now = datetime.datetime.now(tz=datetime.timezone.utc) - - # Fill Kafka - nb_sent = 0 - nb_visits = 0 - for object_type, objects in TEST_OBJECT_DICTS.items(): - topic = make_topic(kafka_prefix, object_type) - for object_ in objects: - key = bytes(random.randint(0, 255) for _ in range(40)) - object_ = object_.copy() - if object_type == "content": - object_["ctime"] = now - elif object_type == "origin_visit": - nb_visits += 1 - object_["visit"] = nb_visits - producer.produce( - topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), - ) - nb_sent += 1 - - producer.flush() - - caplog.set_level(logging.ERROR, "swh.journal.replay") - # Fill the storage from Kafka - replayer = JournalClient( - brokers=kafka_server, - group_id=kafka_consumer_group, - prefix=kafka_prefix, - stop_after_objects=nb_sent, - ) - worker_fn = functools.partial(process_replay_objects, storage=storage) - nb_inserted = 0 - while nb_inserted < nb_sent: - nb_inserted += replayer.process(worker_fn) - assert nb_sent == nb_inserted - - # Check the objects were actually inserted in the storage - assert TEST_OBJECT_DICTS["revision"] == list( - storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]]) - ) - assert TEST_OBJECT_DICTS["release"] == list( - storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]]) - ) - - origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]])) - assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins] - for origin in origins: - origin_url = origin["url"] - expected_visits = [ - { - **visit, - "origin": origin_url, - "date": dateutil.parser.parse(visit["date"]), - } - for visit in TEST_OBJECT_DICTS["origin_visit"] - if visit["origin"] == origin["url"] - ] - actual_visits = list(storage.origin_visit_get(origin_url)) - for visit in actual_visits: - del visit["visit"] # opaque identifier - assert expected_visits == actual_visits - - input_contents = TEST_OBJECT_DICTS["content"] - contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents]) - assert len(contents) == len(input_contents) - assert contents == {cont["sha1"]: [cont] for cont in input_contents} - - collision = 0 - for record in caplog.records: - logtext = record.getMessage() - if "Colliding contents:" in logtext: - collision += 1 - - assert collision == 0, "No collision should be detected" - - -def test_storage_play_with_collision( - kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, -): - """Another replayer scenario with collisions. 
- - This: - - writes objects to the topic, including colliding contents - - replayer consumes objects from the topic and replay them - - This drops the colliding contents from the replay when detected - - """ - kafka_prefix += ".swh.journal.objects" - - storage = get_storage(**storage_config) - - producer = Producer( - { - "bootstrap.servers": kafka_server, - "client.id": "test producer", - "enable.idempotence": "true", - } - ) - - now = datetime.datetime.now(tz=datetime.timezone.utc) - - # Fill Kafka - nb_sent = 0 - nb_visits = 0 - for object_type, objects in TEST_OBJECT_DICTS.items(): - topic = make_topic(kafka_prefix, object_type) - for object_ in objects: - key = bytes(random.randint(0, 255) for _ in range(40)) - object_ = object_.copy() - if object_type == "content": - object_["ctime"] = now - elif object_type == "origin_visit": - nb_visits += 1 - object_["visit"] = nb_visits - producer.produce( - topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), - ) - nb_sent += 1 - - # Create collision in input data - # They are not written in the destination - for content in DUPLICATE_CONTENTS: - topic = make_topic(kafka_prefix, "content") - producer.produce( - topic=topic, key=key_to_kafka(key), value=value_to_kafka(content), - ) - - nb_sent += 1 - - producer.flush() - - caplog.set_level(logging.ERROR, "swh.journal.replay") - # Fill the storage from Kafka - replayer = JournalClient( - brokers=kafka_server, - group_id=kafka_consumer_group, - prefix=kafka_prefix, - stop_after_objects=nb_sent, - ) - worker_fn = functools.partial(process_replay_objects, storage=storage) - nb_inserted = 0 - while nb_inserted < nb_sent: - nb_inserted += replayer.process(worker_fn) - assert nb_sent == nb_inserted - - # Check the objects were actually inserted in the storage - assert TEST_OBJECT_DICTS["revision"] == list( - storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]]) - ) - assert TEST_OBJECT_DICTS["release"] == list( - storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]]) - ) - - origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]])) - assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins] - for origin in origins: - origin_url = origin["url"] - expected_visits = [ - { - **visit, - "origin": origin_url, - "date": dateutil.parser.parse(visit["date"]), - } - for visit in TEST_OBJECT_DICTS["origin_visit"] - if visit["origin"] == origin["url"] - ] - actual_visits = list(storage.origin_visit_get(origin_url)) - for visit in actual_visits: - del visit["visit"] # opaque identifier - assert expected_visits == actual_visits - - input_contents = TEST_OBJECT_DICTS["content"] - contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents]) - assert len(contents) == len(input_contents) - assert contents == {cont["sha1"]: [cont] for cont in input_contents} - - nb_collisions = 0 - - actual_collision: Dict - for record in caplog.records: - logtext = record.getMessage() - if "Collision detected:" in logtext: - nb_collisions += 1 - actual_collision = record.args["collision"] - - assert nb_collisions == 1, "1 collision should be detected" - - algo = "sha1" - assert actual_collision["algo"] == algo - expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo]) - assert actual_collision["hash"] == expected_colliding_hash - - actual_colliding_hashes = actual_collision["objects"] - assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS) - for content in DUPLICATE_CONTENTS: - 
expected_content_hashes = { - k: hash_to_hex(v) for k, v in Content.from_dict(content).hashes().items() - } - assert expected_content_hashes in actual_colliding_hashes - - -def _test_write_replay_origin_visit(visits: List[Dict]): - """Helper function to write tests for origin_visit. - - Each visit (a dict) given in the 'visits' argument will be sent to - a (mocked) kafka queue, which a in-memory-storage backed replayer is - listening to. - - Check that corresponding origin visits entities are present in the storage - and have correct values if they are not skipped. - - """ - queue: List = [] - replayer = MockedJournalClient(queue) - writer = MockedKafkaWriter(queue) - - # Note that flipping the order of these two insertions will crash - # the test, because the legacy origin_format does not allow to create - # the origin when needed (type is missing) - writer.send( - "origin", - "foo", - { - "url": "http://example.com/", - "type": "git", # test the legacy origin format is accepted - }, - ) - for visit in visits: - writer.send("origin_visit", "foo", visit) - - queue_size = len(queue) - assert replayer.stop_after_objects is None - replayer.stop_after_objects = queue_size - - storage = get_storage(**storage_config) - worker_fn = functools.partial(process_replay_objects, storage=storage) - - replayer.process(worker_fn) - - actual_visits = list(storage.origin_visit_get("http://example.com/")) - - assert len(actual_visits) == len(visits), actual_visits - - for vin, vout in zip(visits, actual_visits): - vin = vin.copy() - vout = vout.copy() - assert vout.pop("origin") == "http://example.com/" - vin.pop("origin") - vin.setdefault("type", "git") - vin.setdefault("metadata", None) - assert vin == vout - - -def test_write_replay_origin_visit(): - """Test origin_visit when the 'origin' is just a string.""" - now = datetime.datetime.now() - visits = [ - { - "visit": 1, - "origin": "http://example.com/", - "date": now, - "type": "git", - "status": "partial", - "snapshot": None, - } - ] - _test_write_replay_origin_visit(visits) - - -def test_write_replay_legacy_origin_visit1(): - """Origin_visit with no types should make the replayer crash - - We expect the journal's origin_visit topic to no longer reference such - visits. If it does, the replayer must crash so we can fix the journal's - topic. 
- - """ - now = datetime.datetime.now() - visit = { - "visit": 1, - "origin": "http://example.com/", - "date": now, - "status": "partial", - "snapshot": None, - } - now2 = datetime.datetime.now() - visit2 = { - "visit": 2, - "origin": {"url": "http://example.com/"}, - "date": now2, - "status": "partial", - "snapshot": None, - } - - for origin_visit in [visit, visit2]: - with pytest.raises(ValueError, match="Old origin visit format"): - _test_write_replay_origin_visit([origin_visit]) - - -def test_write_replay_legacy_origin_visit2(): - """Test origin_visit when 'type' is missing from the visit, but not - from the origin.""" - now = datetime.datetime.now() - visits = [ - { - "visit": 1, - "origin": {"url": "http://example.com/", "type": "git",}, - "date": now, - "type": "git", - "status": "partial", - "snapshot": None, - } - ] - _test_write_replay_origin_visit(visits) - - -def test_write_replay_legacy_origin_visit3(): - """Test origin_visit when the origin is a dict""" - now = datetime.datetime.now() - visits = [ - { - "visit": 1, - "origin": {"url": "http://example.com/",}, - "date": now, - "type": "git", - "status": "partial", - "snapshot": None, - } - ] - _test_write_replay_origin_visit(visits) - - hash_strategy = strategies.binary(min_size=20, max_size=20) @settings(max_examples=500) @given( strategies.sets(hash_strategy, min_size=0, max_size=500), strategies.sets(hash_strategy, min_size=10), ) def test_is_hash_in_bytearray(haystack, needles): array = b"".join(sorted(haystack)) needles |= haystack # Exhaustively test for all objects in the array for needle in needles: assert is_hash_in_bytearray(needle, array, len(haystack)) == ( needle in haystack ) diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py index 3c4ffe6..24ead74 100644 --- a/swh/journal/tests/test_write_replay.py +++ b/swh/journal/tests/test_write_replay.py @@ -1,165 +1,54 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools -from unittest.mock import patch -import attr from hypothesis import given, settings, HealthCheck from hypothesis.strategies import lists from swh.model.hypothesis_strategies import present_contents -from swh.model.model import Origin -from swh.storage import get_storage -from swh.storage.exc import HashCollision +from swh.objstorage import get_objstorage -from swh.journal.replay import ( - process_replay_objects, - process_replay_objects_content, - object_converter_fn, -) +from swh.journal.replay import process_replay_objects_content from .utils import MockedJournalClient, MockedKafkaWriter -from .conftest import objects_d - -storage_config = { - "cls": "memory", - "journal_writer": {"cls": "memory"}, -} - - -def empty_person_name_email(rev_or_rel): - """Empties the 'name' and 'email' fields of the author/committer fields - of a revision or release; leaving only the fullname.""" - if getattr(rev_or_rel, "author", None): - rev_or_rel = attr.evolve( - rev_or_rel, author=attr.evolve(rev_or_rel.author, name=b"", email=b"",) - ) - - if getattr(rev_or_rel, "committer", None): - rev_or_rel = attr.evolve( - rev_or_rel, - committer=attr.evolve(rev_or_rel.committer, name=b"", email=b"",), - ) - - return rev_or_rel - - -@given(lists(objects_d(), min_size=1)) -@settings(suppress_health_check=[HealthCheck.too_slow]) -def 
test_write_replay_same_order_batches(objects): - queue = [] - replayer = MockedJournalClient(queue) - - with patch( - "swh.journal.writer.inmemory.InMemoryJournalWriter", - return_value=MockedKafkaWriter(queue), - ): - storage1 = get_storage(**storage_config) - - # Write objects to storage1 - for (obj_type, obj) in objects: - if obj_type == "content" and obj.get("status") == "absent": - obj_type = "skipped_content" - - obj = object_converter_fn[obj_type](obj) - - if obj_type == "origin_visit": - storage1.origin_add_one(Origin(url=obj.origin)) - storage1.origin_visit_upsert([obj]) - else: - method = getattr(storage1, obj_type + "_add") - try: - method([obj]) - except HashCollision: - pass - - # Bail out early if we didn't insert any relevant objects... - queue_size = len(queue) - assert queue_size != 0, "No test objects found; hypothesis strategy bug?" - - assert replayer.stop_after_objects is None - replayer.stop_after_objects = queue_size - - storage2 = get_storage(**storage_config) - worker_fn = functools.partial(process_replay_objects, storage=storage2) - - replayer.process(worker_fn) - - assert replayer.consumer.committed - - for attr_name in ( - "_contents", - "_directories", - "_snapshots", - "_origin_visits", - "_origins", - ): - assert getattr(storage1, attr_name) == getattr(storage2, attr_name), attr_name - - # When hypothesis generates a revision and a release with same - # author (or committer) fullname but different name or email, then - # the storage will use the first name/email it sees. - # This first one will be either the one from the revision or the release, - # and since there is no order guarantees, storage2 has 1/2 chance of - # not seeing the same order as storage1, therefore we need to strip - # them out before comparing. - for attr_name in ("_revisions", "_releases"): - items1 = { - k: empty_person_name_email(v) - for (k, v) in getattr(storage1, attr_name).items() - } - items2 = { - k: empty_person_name_email(v) - for (k, v) in getattr(storage2, attr_name).items() - } - assert items1 == items2, attr_name - - -# TODO: add test for hash collision @given(lists(present_contents(), min_size=1)) @settings(suppress_health_check=[HealthCheck.too_slow]) def test_write_replay_content(objects): queue = [] replayer = MockedJournalClient(queue) + writer = MockedKafkaWriter(queue) - with patch( - "swh.journal.writer.inmemory.InMemoryJournalWriter", - return_value=MockedKafkaWriter(queue), - ): - storage1 = get_storage(**storage_config) + objstorage1 = get_objstorage(cls="memory", args={}) + objstorage2 = get_objstorage(cls="memory", args={}) contents = [] for obj in objects: - storage1.content_add([obj]) + objstorage1.add(obj.data) contents.append(obj) + writer.write_addition("content", obj) # Bail out early if we didn't insert any relevant objects... queue_size = len(queue) assert queue_size != 0, "No test objects found; hypothesis strategy bug?" assert replayer.stop_after_objects is None replayer.stop_after_objects = queue_size - storage2 = get_storage(**storage_config) - - objstorage1 = storage1.objstorage.objstorage - objstorage2 = storage2.objstorage.objstorage - worker_fn = functools.partial( process_replay_objects_content, src=objstorage1, dst=objstorage2 ) replayer.process(worker_fn) # only content with status visible will be copied in storage2 expected_objstorage_state = { c.sha1: c.data for c in contents if c.status == "visible" } assert expected_objstorage_state == objstorage2.state
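
For reference, a minimal sketch of the byte layout written for --exclude-sha1-file in test_replay_content_exclude, checked here with is_hash_in_bytearray the same way the property-based test above does; that the CLI probes the exclusion file with this helper is an assumption, not something shown in this diff.

import hashlib

from swh.journal.replay import is_hash_in_bytearray

contents = [b"foo", b"bar", b"baz"]
sha1s = sorted(hashlib.sha1(data).digest() for data in contents)

# Same layout as the temporary exclusion file: fixed-size 20-byte sha1
# digests, concatenated in sorted order, with no separators.
array = b"".join(sha1s)

# Every hash present in the array is found; a hash never added is not.
for data in contents:
    assert is_hash_in_bytearray(hashlib.sha1(data).digest(), array, len(sha1s))
assert not is_hash_in_bytearray(hashlib.sha1(b"other").digest(), array, len(sha1s))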