diff --git a/PKG-INFO b/PKG-INFO index 226048f..8e5fddd 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,71 +1,72 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.0.31 +Version: 0.0.32 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal +Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-journal/ Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kakfa installation path. The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/requirements-swh.txt b/requirements-swh.txt index ba52a6d..8f55b09 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,3 +1,2 @@ swh.core[db,http] >= 0.0.60 swh.model >= 0.0.61 -swh.storage >= 0.0.181 diff --git a/requirements-test.txt b/requirements-test.txt index c727717..9b9e5f9 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,3 +1,2 @@ pytest -swh.model >= 0.0.34 hypothesis diff --git a/setup.py b/setup.py index c4dab38..309309c 100755 --- a/setup.py +++ b/setup.py @@ -1,74 +1,71 @@ #!/usr/bin/env python3 # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from setuptools import setup, find_packages from os import path from io import open here = path.abspath(path.dirname(__file__)) # Get the long description from the README file with open(path.join(here, "README.md"), encoding="utf-8") as f: long_description = f.read() def parse_requirements(name=None): if name: reqf = "requirements-%s.txt" % name else: reqf = "requirements.txt" requirements = [] if not path.exists(reqf): return requirements with open(reqf) as f: for line in f.readlines(): line = line.strip() if not line or line.startswith("#"): continue requirements.append(line) return requirements setup( name="swh.journal", description="Software Heritage Journal utilities", long_description=long_description, long_description_content_type="text/markdown", python_requires=">=3.7", author="Software Heritage 
developers", author_email="swh-devel@inria.fr", url="https://forge.softwareheritage.org/diffusion/DJNL/", packages=find_packages(), scripts=[], entry_points=""" - [console_scripts] - swh-journal=swh.journal.cli:main - [swh.cli.subcommands] - journal=swh.journal.cli:cli [pytest11] pytest_swh_journal = swh.journal.pytest_plugin """, install_requires=parse_requirements() + parse_requirements("swh"), setup_requires=["vcversioner"], extras_require={"testing": parse_requirements("test")}, vcversioner={}, include_package_data=True, classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 5 - Production/Stable", ], project_urls={ "Bug Reports": "https://forge.softwareheritage.org/maniphest", "Funding": "https://www.softwareheritage.org/donate", "Source": "https://forge.softwareheritage.org/source/swh-journal", + "Documentation": "https://docs.softwareheritage.org/devel/swh-journal/", }, ) diff --git a/swh.journal.egg-info/PKG-INFO b/swh.journal.egg-info/PKG-INFO index 226048f..8e5fddd 100644 --- a/swh.journal.egg-info/PKG-INFO +++ b/swh.journal.egg-info/PKG-INFO @@ -1,71 +1,72 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.0.31 +Version: 0.0.32 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal +Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-journal/ Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kakfa installation path. 
The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.journal.egg-info/SOURCES.txt b/swh.journal.egg-info/SOURCES.txt index 219568a..1225d0e 100644 --- a/swh.journal.egg-info/SOURCES.txt +++ b/swh.journal.egg-info/SOURCES.txt @@ -1,42 +1,34 @@ MANIFEST.in Makefile README.md pyproject.toml requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py version.txt swh/__init__.py swh.journal.egg-info/PKG-INFO swh.journal.egg-info/SOURCES.txt swh.journal.egg-info/dependency_links.txt swh.journal.egg-info/entry_points.txt swh.journal.egg-info/requires.txt swh.journal.egg-info/top_level.txt swh/journal/__init__.py -swh/journal/backfill.py swh/journal/cli.py swh/journal/client.py -swh/journal/direct_writer.py -swh/journal/fixer.py swh/journal/py.typed swh/journal/pytest_plugin.py -swh/journal/replay.py swh/journal/serializers.py swh/journal/tests/__init__.py swh/journal/tests/conftest.py swh/journal/tests/journal_data.py swh/journal/tests/log4j.properties -swh/journal/tests/test_backfill.py -swh/journal/tests/test_cli.py swh/journal/tests/test_client.py swh/journal/tests/test_kafka_writer.py -swh/journal/tests/test_replay.py swh/journal/tests/test_serializers.py -swh/journal/tests/test_write_replay.py swh/journal/tests/utils.py swh/journal/writer/__init__.py swh/journal/writer/inmemory.py swh/journal/writer/kafka.py \ No newline at end of file diff --git a/swh.journal.egg-info/entry_points.txt b/swh.journal.egg-info/entry_points.txt index e00a011..467e9f5 100644 --- a/swh.journal.egg-info/entry_points.txt +++ b/swh.journal.egg-info/entry_points.txt @@ -1,8 +1,4 @@ - [console_scripts] - swh-journal=swh.journal.cli:main - [swh.cli.subcommands] - journal=swh.journal.cli:cli [pytest11] pytest_swh_journal = swh.journal.pytest_plugin \ No newline at end of file diff --git a/swh.journal.egg-info/requires.txt b/swh.journal.egg-info/requires.txt index 276c16f..7e018c5 100644 --- a/swh.journal.egg-info/requires.txt +++ b/swh.journal.egg-info/requires.txt @@ -1,12 +1,10 @@ confluent-kafka msgpack tenacity vcversioner swh.core[db,http]>=0.0.60 swh.model>=0.0.61 -swh.storage>=0.0.181 [testing] pytest -swh.model>=0.0.34 hypothesis diff --git a/swh/journal/backfill.py b/swh/journal/backfill.py deleted file mode 100644 index 902b897..0000000 --- a/swh/journal/backfill.py +++ /dev/null @@ -1,489 +0,0 @@ -# Copyright (C) 2017-2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -"""Module defining journal backfiller classes. 
- -Those backfiller goal is to produce back part or all of the objects -from the storage to the journal topics - -At the moment, a first naive implementation is the -JournalBackfiller. It simply reads the objects from the -storage and sends every object identifier back to the journal. - -""" - -import logging - -from .writer.kafka import KafkaJournalWriter - -from swh.core.db import BaseDb -from swh.storage.converters import db_to_release, db_to_revision - - -logger = logging.getLogger(__name__) - -PARTITION_KEY = { - "content": "sha1", - "skipped_content": None, # unused - "directory": "id", - "revision": "revision.id", - "release": "release.id", - "snapshot": "id", - "origin": "id", - "origin_visit": "origin_visit.origin", -} - -COLUMNS = { - "content": [ - "sha1", - "sha1_git", - "sha256", - "blake2s256", - "length", - "status", - "ctime", - ], - "skipped_content": [ - "sha1", - "sha1_git", - "sha256", - "blake2s256", - "length", - "ctime", - "status", - "reason", - ], - "directory": ["id", "dir_entries", "file_entries", "rev_entries"], - "revision": [ - ("revision.id", "id"), - "date", - "date_offset", - "committer_date", - "committer_date_offset", - "type", - "directory", - "message", - "synthetic", - "metadata", - "date_neg_utc_offset", - "committer_date_neg_utc_offset", - ( - "array(select parent_id::bytea from revision_history rh " - "where rh.id = revision.id order by rh.parent_rank asc)", - "parents", - ), - ("a.id", "author_id"), - ("a.name", "author_name"), - ("a.email", "author_email"), - ("a.fullname", "author_fullname"), - ("c.id", "committer_id"), - ("c.name", "committer_name"), - ("c.email", "committer_email"), - ("c.fullname", "committer_fullname"), - ], - "release": [ - ("release.id", "id"), - "date", - "date_offset", - "comment", - ("release.name", "name"), - "synthetic", - "date_neg_utc_offset", - "target", - "target_type", - ("a.id", "author_id"), - ("a.name", "author_name"), - ("a.email", "author_email"), - ("a.fullname", "author_fullname"), - ], - "snapshot": ["id", "object_id"], - "origin": ["type", "url"], - "origin_visit": [ - "visit", - "origin.type", - "origin_visit.type", - "url", - "date", - "snapshot", - "status", - "metadata", - ], -} - - -JOINS = { - "release": ["person a on release.author=a.id"], - "revision": [ - "person a on revision.author=a.id", - "person c on revision.committer=c.id", - ], - "origin_visit": ["origin on origin_visit.origin=origin.id"], -} - - -def directory_converter(db, directory): - """Convert directory from the flat representation to swh model - compatible objects. - - """ - columns = ["target", "name", "perms"] - query_template = """ - select %(columns)s - from directory_entry_%(type)s - where id in %%s - """ - - types = ["file", "dir", "rev"] - - entries = [] - with db.cursor() as cur: - for type in types: - ids = directory.pop("%s_entries" % type) - if not ids: - continue - query = query_template % { - "columns": ",".join(columns), - "type": type, - } - cur.execute(query, (tuple(ids),)) - for row in cur: - entry = dict(zip(columns, row)) - entry["type"] = type - entries.append(entry) - - directory["entries"] = entries - return directory - - -def revision_converter(db, revision): - """Convert revision from the flat representation to swh model - compatible objects. - - """ - return db_to_revision(revision) - - -def release_converter(db, release): - """Convert release from the flat representation to swh model - compatible objects. 
- - """ - release = db_to_release(release) - if "author" in release and release["author"]: - del release["author"]["id"] - return release - - -def snapshot_converter(db, snapshot): - """Convert snapshot from the flat representation to swh model - compatible objects. - - """ - columns = ["name", "target", "target_type"] - query = """ - select %s - from snapshot_branches sbs - inner join snapshot_branch sb on sb.object_id=sbs.branch_id - where sbs.snapshot_id=%%s - """ % ", ".join( - columns - ) - with db.cursor() as cur: - cur.execute(query, (snapshot.pop("object_id"),)) - branches = {} - for name, *row in cur: - branch = dict(zip(columns[1:], row)) - if not branch["target"] and not branch["target_type"]: - branch = None - branches[name] = branch - - snapshot["branches"] = branches - return snapshot - - -def origin_visit_converter(db, origin_visit): - origin = { - "type": origin_visit.pop("origin.type"), - "url": origin_visit.pop("url"), - } - origin_visit["origin"] = origin - origin_visit["type"] = origin_visit.pop("origin_visit.type") - return origin_visit - - -CONVERTERS = { - "directory": directory_converter, - "revision": revision_converter, - "release": release_converter, - "snapshot": snapshot_converter, - "origin_visit": origin_visit_converter, -} - - -def object_to_offset(object_id, numbits): - """Compute the index of the range containing object id, when dividing - space into 2^numbits. - - Args: - object_id (str): The hex representation of object_id - numbits (int): Number of bits in which we divide input space - - Returns: - The index of the range containing object id - - """ - q, r = divmod(numbits, 8) - length = q + (r != 0) - shift_bits = 8 - r if r else 0 - - truncated_id = object_id[: length * 2] - if len(truncated_id) < length * 2: - truncated_id += "0" * (length * 2 - len(truncated_id)) - - truncated_id_bytes = bytes.fromhex(truncated_id) - return int.from_bytes(truncated_id_bytes, byteorder="big") >> shift_bits - - -def byte_ranges(numbits, start_object=None, end_object=None): - """Generate start/end pairs of bytes spanning numbits bits and - constrained by optional start_object and end_object. 
- - Args: - numbits (int): Number of bits in which we divide input space - start_object (str): Hex object id contained in the first range - returned - end_object (str): Hex object id contained in the last range - returned - - Yields: - 2^numbits pairs of bytes - - """ - q, r = divmod(numbits, 8) - length = q + (r != 0) - shift_bits = 8 - r if r else 0 - - def to_bytes(i): - return int.to_bytes(i << shift_bits, length=length, byteorder="big") - - start_offset = 0 - end_offset = 1 << numbits - - if start_object is not None: - start_offset = object_to_offset(start_object, numbits) - if end_object is not None: - end_offset = object_to_offset(end_object, numbits) + 1 - - for start in range(start_offset, end_offset): - end = start + 1 - - if start == 0: - yield None, to_bytes(end) - elif end == 1 << numbits: - yield to_bytes(start), None - else: - yield to_bytes(start), to_bytes(end) - - -def integer_ranges(start, end, block_size=1000): - for start in range(start, end, block_size): - if start == 0: - yield None, block_size - elif start + block_size > end: - yield start, end - else: - yield start, start + block_size - - -RANGE_GENERATORS = { - "content": lambda start, end: byte_ranges(24, start, end), - "skipped_content": lambda start, end: [(None, None)], - "directory": lambda start, end: byte_ranges(24, start, end), - "revision": lambda start, end: byte_ranges(24, start, end), - "release": lambda start, end: byte_ranges(16, start, end), - "snapshot": lambda start, end: byte_ranges(16, start, end), - "origin": integer_ranges, - "origin_visit": integer_ranges, -} - - -def compute_query(obj_type, start, end): - columns = COLUMNS.get(obj_type) - join_specs = JOINS.get(obj_type, []) - join_clause = "\n".join("left join %s" % clause for clause in join_specs) - - where = [] - where_args = [] - if start: - where.append("%(keys)s >= %%s") - where_args.append(start) - if end: - where.append("%(keys)s < %%s") - where_args.append(end) - - where_clause = "" - if where: - where_clause = ("where " + " and ".join(where)) % { - "keys": "(%s)" % PARTITION_KEY[obj_type] - } - - column_specs = [] - column_aliases = [] - for column in columns: - if isinstance(column, str): - column_specs.append(column) - column_aliases.append(column) - else: - column_specs.append("%s as %s" % column) - column_aliases.append(column[1]) - - query = """ -select %(columns)s -from %(table)s -%(join)s -%(where)s - """ % { - "columns": ",".join(column_specs), - "table": obj_type, - "join": join_clause, - "where": where_clause, - } - - return query, where_args, column_aliases - - -def fetch(db, obj_type, start, end): - """Fetch all obj_type's identifiers from db. - - This opens one connection, stream objects and when done, close - the connection. 
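(Editorial aside, not part of the patch.) The removed backfiller's range helpers are easier to follow with a concrete case. A minimal sketch, assuming the `object_to_offset` and `byte_ranges` definitions quoted above from the deleted `swh/journal/backfill.py`:

```python
# Illustration of the removed range helpers: byte_ranges(8) splits the
# object-id space into 2**8 buckets keyed on the first byte, with open-ended
# first and last buckets; object_to_offset maps a hex id to its bucket index.
ranges = list(byte_ranges(8))
assert len(ranges) == 256
assert ranges[0] == (None, b"\x01")
assert ranges[-1] == (b"\xff", None)

# A 40-character sha1 whose first two bytes are 0x83 0xa7 lands in bucket
# 0x83a7 when the space is divided into 2**16 ranges:
assert object_to_offset("83a7" + "00" * 18, 16) == 0x83A7
```

These bucket bounds are what `compute_query` turns into `>= start` / `< end` clauses on the table's partition key.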
- - Args: - db (BaseDb): Db connection object - obj_type (str): Object type - start (Union[bytes|Tuple]): Range start identifier - end (Union[bytes|Tuple]): Range end identifier - - Raises: - ValueError if obj_type is not supported - - Yields: - Objects in the given range - - """ - query, where_args, column_aliases = compute_query(obj_type, start, end) - converter = CONVERTERS.get(obj_type) - with db.cursor() as cursor: - logger.debug("Fetching data for table %s", obj_type) - logger.debug("query: %s %s", query, where_args) - cursor.execute(query, where_args) - for row in cursor: - record = dict(zip(column_aliases, row)) - if converter: - record = converter(db, record) - - logger.debug("record: %s" % record) - yield record - - -def _format_range_bound(bound): - if isinstance(bound, bytes): - return bound.hex() - else: - return str(bound) - - -MANDATORY_KEYS = ["brokers", "storage_dbconn", "prefix", "client_id"] - - -class JournalBackfiller: - """Class in charge of reading the storage's objects and sends those - back to the journal's topics. - - This is designed to be run periodically. - - """ - - def __init__(self, config=None): - self.config = config - self.check_config(config) - - def check_config(self, config): - missing_keys = [] - for key in MANDATORY_KEYS: - if not config.get(key): - missing_keys.append(key) - - if missing_keys: - raise ValueError( - "Configuration error: The following keys must be" - " provided: %s" % (",".join(missing_keys),) - ) - - def parse_arguments(self, object_type, start_object, end_object): - """Parse arguments - - Raises: - ValueError for unsupported object type - ValueError if object ids are not parseable - - Returns: - Parsed start and end object ids - - """ - if object_type not in COLUMNS: - raise ValueError( - "Object type %s is not supported. " - "The only possible values are %s" - % (object_type, ", ".join(COLUMNS.keys())) - ) - - if object_type in ["origin", "origin_visit"]: - if start_object: - start_object = int(start_object) - else: - start_object = 0 - if end_object: - end_object = int(end_object) - else: - end_object = 100 * 1000 * 1000 # hard-coded limit - - return start_object, end_object - - def run(self, object_type, start_object, end_object, dry_run=False): - """Reads storage's subscribed object types and send them to the - journal's reading topic. 
- - """ - start_object, end_object = self.parse_arguments( - object_type, start_object, end_object - ) - - db = BaseDb.connect(self.config["storage_dbconn"]) - writer = KafkaJournalWriter( - brokers=self.config["brokers"], - prefix=self.config["prefix"], - client_id=self.config["client_id"], - ) - for range_start, range_end in RANGE_GENERATORS[object_type]( - start_object, end_object - ): - logger.info( - "Processing %s range %s to %s", - object_type, - _format_range_bound(range_start), - _format_range_bound(range_end), - ) - - for obj in fetch(db, object_type, start=range_start, end=range_end,): - if dry_run: - continue - writer.write_addition(object_type=object_type, object_=obj) - - writer.producer.flush() - - -if __name__ == "__main__": - print('Please use the "swh-journal backfiller run" command') diff --git a/swh/journal/cli.py b/swh/journal/cli.py index d88bfa8..dcbc8a8 100644 --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -1,265 +1,93 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import functools import logging -import mmap -import os -import warnings import click -try: - from systemd.daemon import notify -except ImportError: - notify = None - -from swh.core import config from swh.core.cli import CONTEXT_SETTINGS -from swh.model.model import SHA1_SIZE -from swh.storage import get_storage -from swh.objstorage import get_objstorage - -from swh.journal.client import get_journal_client as get_client -from swh.journal.replay import is_hash_in_bytearray -from swh.journal.replay import process_replay_objects -from swh.journal.replay import process_replay_objects_content -from swh.journal.backfill import JournalBackfiller @click.group(name="journal", context_settings=CONTEXT_SETTINGS) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.pass_context def cli(ctx, config_file): - """Software Heritage Journal tools. - - The journal is a persistent logger of changes to the archive, with - publish-subscribe support. - + """DEPRECATED Software Heritage Journal tools. """ - if not config_file: - config_file = os.environ.get("SWH_CONFIG_FILENAME") - - if config_file: - if not os.path.exists(config_file): - raise ValueError("%s does not exist" % config_file) - conf = config.read(config_file) - else: - conf = {} - - ctx.ensure_object(dict) - - ctx.obj["config"] = conf - - -def get_journal_client(ctx, **kwargs): - conf = ctx.obj["config"].copy() - if "journal" in conf: - warnings.warn( - "Journal client configuration should now be under the " - "`journal_client` field and have a `cls` argument.", - DeprecationWarning, - ) - conf["journal_client"] = {"cls": "kafka", **conf.pop("journal")} - - client_conf = conf.get("journal_client").copy() - client_conf.update(kwargs) - - try: - return get_client(**client_conf) - except ValueError as exc: - ctx.fail(exc) + pass @cli.command() @click.option( "--stop-after-objects", "-n", default=None, type=int, help="Stop after processing this many objects. Default is to " "run forever.", ) @click.pass_context def replay(ctx, stop_after_objects): - """Fill a Storage by reading a Journal. + """DEPRECATED: use `swh storage replay` instead. - There can be several 'replayers' filling a Storage as long as they use - the same `group-id`. + Requires swh.storage >= 0.0.188. 
""" - conf = ctx.obj["config"] - try: - storage = get_storage(**conf.pop("storage")) - except KeyError: - ctx.fail("You must have a storage configured in your config file.") - - client = get_journal_client(ctx, stop_after_objects=stop_after_objects) - worker_fn = functools.partial(process_replay_objects, storage=storage) - - if notify: - notify("READY=1") - - try: - client.process(worker_fn) - except KeyboardInterrupt: - ctx.exit(0) - else: - print("Done.") - finally: - if notify: - notify("STOPPING=1") - client.close() + ctx.fail("DEPRECATED") @cli.command() @click.argument("object_type") @click.option("--start-object", default=None) @click.option("--end-object", default=None) @click.option("--dry-run", is_flag=True, default=False) @click.pass_context def backfiller(ctx, object_type, start_object, end_object, dry_run): - """Run the backfiller + """DEPRECATED: use `swh storage backfill` instead. - The backfiller list objects from a Storage and produce journal entries from - there. - - Typically used to rebuild a journal or compensate for missing objects in a - journal (eg. due to a downtime of this later). - - The configuration file requires the following entries: - - brokers: a list of kafka endpoints (the journal) in which entries will be - added. - - storage_dbconn: URL to connect to the storage DB. - - prefix: the prefix of the topics (topics will be .). - - client_id: the kafka client ID. + Requires swh.storage >= 0.0.188. """ - conf = ctx.obj["config"] - backfiller = JournalBackfiller(conf) - - if notify: - notify("READY=1") - - try: - backfiller.run( - object_type=object_type, - start_object=start_object, - end_object=end_object, - dry_run=dry_run, - ) - except KeyboardInterrupt: - if notify: - notify("STOPPING=1") - ctx.exit(0) + ctx.fail("DEPRECATED") @cli.command("content-replay") @click.option( "--stop-after-objects", "-n", default=None, type=int, help="Stop after processing this many objects. Default is to " "run forever.", ) @click.option( "--exclude-sha1-file", default=None, type=click.File("rb"), help="File containing a sorted array of hashes to be excluded.", ) @click.option( "--check-dst/--no-check-dst", default=True, help="Check whether the destination contains the object before " "copying.", ) @click.pass_context def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst): - """Fill a destination Object Storage (typically a mirror) by reading a Journal - and retrieving objects from an existing source ObjStorage. - - There can be several 'replayers' filling a given ObjStorage as long as they - use the same `group-id`. You can use the `KAFKA_GROUP_INSTANCE_ID` - environment variable to use KIP-345 static group membership. - - This service retrieves object ids to copy from the 'content' topic. It will - only copy object's content if the object's description in the kafka - nmessage has the status:visible set. - - `--exclude-sha1-file` may be used to exclude some hashes to speed-up the - replay in case many of the contents are already in the destination - objstorage. It must contain a concatenation of all (sha1) hashes, - and it must be sorted. - This file will not be fully loaded into memory at any given time, - so it can be arbitrarily large. - - `--check-dst` sets whether the replayer should check in the destination - ObjStorage before copying an object. You can turn that off if you know - you're copying to an empty ObjStorage. 
- """ - conf = ctx.obj["config"] - try: - objstorage_src = get_objstorage(**conf.pop("objstorage_src")) - except KeyError: - ctx.fail("You must have a source objstorage configured in " "your config file.") - try: - objstorage_dst = get_objstorage(**conf.pop("objstorage_dst")) - except KeyError: - ctx.fail( - "You must have a destination objstorage configured " "in your config file." - ) - - if exclude_sha1_file: - map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ) - if map_.size() % SHA1_SIZE != 0: - ctx.fail( - "--exclude-sha1 must link to a file whose size is an " - "exact multiple of %d bytes." % SHA1_SIZE - ) - nb_excluded_hashes = int(map_.size() / SHA1_SIZE) - - def exclude_fn(obj): - return is_hash_in_bytearray(obj["sha1"], map_, nb_excluded_hashes) - - else: - exclude_fn = None - - client = get_journal_client( - ctx, stop_after_objects=stop_after_objects, object_types=("content",) - ) - worker_fn = functools.partial( - process_replay_objects_content, - src=objstorage_src, - dst=objstorage_dst, - exclude_fn=exclude_fn, - check_dst=check_dst, - ) - - if notify: - notify("READY=1") + """DEPRECATED: use `swh objstorage replay` instead. - try: - client.process(worker_fn) - except KeyboardInterrupt: - ctx.exit(0) - else: - print("Done.") - finally: - if notify: - notify("STOPPING=1") - client.close() + This needs the swh.objstorage.replayer package.""" + ctx.fail("DEPRECATED") def main(): logging.basicConfig() return cli(auto_envvar_prefix="SWH_JOURNAL") if __name__ == "__main__": main() diff --git a/swh/journal/direct_writer.py b/swh/journal/direct_writer.py deleted file mode 100644 index 2f6904c..0000000 --- a/swh/journal/direct_writer.py +++ /dev/null @@ -1,7 +0,0 @@ -# Copyright (C) 2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -# for BW compat -from .writer.kafka import KafkaJournalWriter as DirectKafkaWriter # noqa diff --git a/swh/journal/fixer.py b/swh/journal/fixer.py deleted file mode 100644 index 7322b35..0000000 --- a/swh/journal/fixer.py +++ /dev/null @@ -1,293 +0,0 @@ -import copy -import logging -from typing import Any, Dict, List, Optional -from swh.model.identifiers import normalize_timestamp - -logger = logging.getLogger(__name__) - - -def _fix_content(content: Dict[str, Any]) -> Dict[str, Any]: - """Filters-out invalid 'perms' key that leaked from swh.model.from_disk - to the journal. 
- - >>> _fix_content({'perms': 0o100644, 'sha1_git': b'foo'}) - {'sha1_git': b'foo'} - - >>> _fix_content({'sha1_git': b'bar'}) - {'sha1_git': b'bar'} - - """ - content = content.copy() - content.pop("perms", None) - return content - - -def _fix_revision_pypi_empty_string(rev): - """PyPI loader failed to encode empty strings as bytes, see: - swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 - or https://forge.softwareheritage.org/D1772 - """ - rev = { - **rev, - "author": rev["author"].copy(), - "committer": rev["committer"].copy(), - } - if rev["author"].get("email") == "": - rev["author"]["email"] = b"" - if rev["author"].get("name") == "": - rev["author"]["name"] = b"" - if rev["committer"].get("email") == "": - rev["committer"]["email"] = b"" - if rev["committer"].get("name") == "": - rev["committer"]["name"] = b"" - return rev - - -def _fix_revision_transplant_source(rev): - if rev.get("metadata") and rev["metadata"].get("extra_headers"): - rev = copy.deepcopy(rev) - rev["metadata"]["extra_headers"] = [ - [key, value.encode("ascii")] - if key == "transplant_source" and isinstance(value, str) - else [key, value] - for (key, value) in rev["metadata"]["extra_headers"] - ] - return rev - - -def _check_date(date): - """Returns whether the date can be represented in backends with sane - limits on timestamps and timezones (resp. signed 64-bits and - signed 16 bits), and that microseconds is valid (ie. between 0 and 10^6). - """ - if date is None: - return True - date = normalize_timestamp(date) - return ( - (-(2 ** 63) <= date["timestamp"]["seconds"] < 2 ** 63) - and (0 <= date["timestamp"]["microseconds"] < 10 ** 6) - and (-(2 ** 15) <= date["offset"] < 2 ** 15) - ) - - -def _check_revision_date(rev): - """Exclude revisions with invalid dates. - See https://forge.softwareheritage.org/T1339""" - return _check_date(rev["date"]) and _check_date(rev["committer_date"]) - - -def _fix_revision(revision: Dict[str, Any]) -> Optional[Dict]: - """Fix various legacy revision issues. - - Fix author/committer person: - - >>> from pprint import pprint - >>> date = { - ... 'timestamp': { - ... 'seconds': 1565096932, - ... 'microseconds': 0, - ... }, - ... 'offset': 0, - ... } - >>> rev0 = _fix_revision({ - ... 'id': b'rev-id', - ... 'author': {'fullname': b'', 'name': '', 'email': ''}, - ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, - ... 'date': date, - ... 'committer_date': date, - ... 'type': 'git', - ... 'message': '', - ... 'directory': b'dir-id', - ... 'synthetic': False, - ... }) - >>> rev0['author'] - {'fullname': b'', 'name': b'', 'email': b''} - >>> rev0['committer'] - {'fullname': b'', 'name': b'', 'email': b''} - - Fix type of 'transplant_source' extra headers: - - >>> rev1 = _fix_revision({ - ... 'id': b'rev-id', - ... 'author': {'fullname': b'', 'name': '', 'email': ''}, - ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, - ... 'date': date, - ... 'committer_date': date, - ... 'metadata': { - ... 'extra_headers': [ - ... ['time_offset_seconds', b'-3600'], - ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e'] - ... ]}, - ... 'type': 'git', - ... 'message': '', - ... 'directory': b'dir-id', - ... 'synthetic': False, - ... 
}) - >>> pprint(rev1['metadata']['extra_headers']) - [['time_offset_seconds', b'-3600'], - ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']] - - Revision with invalid date are filtered: - - >>> from copy import deepcopy - >>> invalid_date1 = deepcopy(date) - >>> invalid_date1['timestamp']['microseconds'] = 1000000000 # > 10^6 - >>> rev = _fix_revision({ - ... 'author': {'fullname': b'', 'name': '', 'email': ''}, - ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, - ... 'date': invalid_date1, - ... 'committer_date': date, - ... }) - >>> rev is None - True - - >>> invalid_date2 = deepcopy(date) - >>> invalid_date2['timestamp']['seconds'] = 2**70 # > 10^63 - >>> rev = _fix_revision({ - ... 'author': {'fullname': b'', 'name': '', 'email': ''}, - ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, - ... 'date': invalid_date2, - ... 'committer_date': date, - ... }) - >>> rev is None - True - - >>> invalid_date3 = deepcopy(date) - >>> invalid_date3['offset'] = 2**20 # > 10^15 - >>> rev = _fix_revision({ - ... 'author': {'fullname': b'', 'name': '', 'email': ''}, - ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, - ... 'date': date, - ... 'committer_date': invalid_date3, - ... }) - >>> rev is None - True - - """ # noqa - rev = _fix_revision_pypi_empty_string(revision) - rev = _fix_revision_transplant_source(rev) - if not _check_revision_date(rev): - logger.warning( - "Invalid revision date detected: %(revision)s", {"revision": rev} - ) - return None - return rev - - -def _fix_origin(origin: Dict) -> Dict: - """Fix legacy origin with type which is no longer part of the model. - - >>> from pprint import pprint - >>> pprint(_fix_origin({ - ... 'url': 'http://foo', - ... })) - {'url': 'http://foo'} - >>> pprint(_fix_origin({ - ... 'url': 'http://bar', - ... 'type': 'foo', - ... })) - {'url': 'http://bar'} - - """ - o = origin.copy() - o.pop("type", None) - return o - - -def _fix_origin_visit(visit: Dict) -> Dict: - """Fix various legacy origin visit issues. - - `visit['origin']` is a dict instead of an URL: - - >>> from datetime import datetime, timezone - >>> from pprint import pprint - >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc) - >>> pprint(_fix_origin_visit({ - ... 'origin': {'url': 'http://foo'}, - ... 'date': date, - ... 'type': 'git', - ... 'status': 'ongoing', - ... 'snapshot': None, - ... })) - {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), - 'metadata': None, - 'origin': 'http://foo', - 'snapshot': None, - 'status': 'ongoing', - 'type': 'git'} - - `visit['type']` is missing , but `origin['visit']['type']` exists: - - >>> pprint(_fix_origin_visit( - ... {'origin': {'type': 'hg', 'url': 'http://foo'}, - ... 'date': date, - ... 'status': 'ongoing', - ... 'snapshot': None, - ... })) - {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), - 'metadata': None, - 'origin': 'http://foo', - 'snapshot': None, - 'status': 'ongoing', - 'type': 'hg'} - - Old visit format (origin_visit with no type) raises: - - >>> _fix_origin_visit({ - ... 'origin': {'url': 'http://foo'}, - ... 'date': date, - ... 'status': 'ongoing', - ... 'snapshot': None - ... }) - Traceback (most recent call last): - ... - ValueError: Old origin visit format detected... - - >>> _fix_origin_visit({ - ... 'origin': 'http://foo', - ... 'date': date, - ... 'status': 'ongoing', - ... 'snapshot': None - ... }) - Traceback (most recent call last): - ... - ValueError: Old origin visit format detected... 
- - """ # noqa - visit = visit.copy() - if "type" not in visit: - if isinstance(visit["origin"], dict) and "type" in visit["origin"]: - # Very old version of the schema: visits did not have a type, - # but their 'origin' field was a dict with a 'type' key. - visit["type"] = visit["origin"]["type"] - else: - # Very old schema version: 'type' is missing, stop early - - # We expect the journal's origin_visit topic to no longer reference - # such visits. If it does, the replayer must crash so we can fix - # the journal's topic. - raise ValueError(f"Old origin visit format detected: {visit}") - if isinstance(visit["origin"], dict): - # Old version of the schema: visit['origin'] was a dict. - visit["origin"] = visit["origin"]["url"] - if "metadata" not in visit: - visit["metadata"] = None - return visit - - -def fix_objects(object_type: str, objects: List[Dict]) -> List[Dict]: - """ - Fix legacy objects from the journal to bring them up to date with the - latest storage schema. - """ - if object_type == "content": - return [_fix_content(v) for v in objects] - elif object_type == "revision": - revisions = [_fix_revision(v) for v in objects] - return [rev for rev in revisions if rev is not None] - elif object_type == "origin": - return [_fix_origin(v) for v in objects] - elif object_type == "origin_visit": - return [_fix_origin_visit(v) for v in objects] - else: - return objects diff --git a/swh/journal/replay.py b/swh/journal/replay.py deleted file mode 100644 index dfc6fab..0000000 --- a/swh/journal/replay.py +++ /dev/null @@ -1,413 +0,0 @@ -# Copyright (C) 2019-2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import logging -from time import time -from typing import Any, Callable, Dict, Iterable, List, Optional - -from sentry_sdk import capture_exception, push_scope - -try: - from systemd.daemon import notify -except ImportError: - notify = None - -from tenacity import ( - retry, - retry_if_exception_type, - stop_after_attempt, - wait_random_exponential, -) - -from swh.core.statsd import statsd -from swh.journal.fixer import fix_objects -from swh.model.hashutil import hash_to_hex - -from swh.model.model import ( - BaseContent, - BaseModel, - Content, - Directory, - Origin, - OriginVisit, - Revision, - SHA1_SIZE, - SkippedContent, - Snapshot, - Release, -) -from swh.objstorage.objstorage import ( - ID_HASH_ALGO, - ObjNotFoundError, - ObjStorage, -) -from swh.storage.exc import HashCollision - -logger = logging.getLogger(__name__) - -GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total" -GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds" -CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total" -CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total" -CONTENT_BYTES_METRIC = "swh_content_replayer_bytes" -CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds" - - -object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = { - "origin": Origin.from_dict, - "origin_visit": OriginVisit.from_dict, - "snapshot": Snapshot.from_dict, - "revision": Revision.from_dict, - "release": Release.from_dict, - "directory": Directory.from_dict, - "content": Content.from_dict, - "skipped_content": SkippedContent.from_dict, -} - - -def process_replay_objects(all_objects, *, storage): - for (object_type, objects) in all_objects.items(): - logger.debug("Inserting %s %s objects", 
len(objects), object_type) - with statsd.timed(GRAPH_DURATION_METRIC, tags={"object_type": object_type}): - _insert_objects(object_type, objects, storage) - statsd.increment( - GRAPH_OPERATIONS_METRIC, len(objects), tags={"object_type": object_type} - ) - if notify: - notify("WATCHDOG=1") - - -def collision_aware_content_add( - content_add_fn: Callable[[Iterable[Any]], None], contents: List[BaseContent] -) -> None: - """Add contents to storage. If a hash collision is detected, an error is - logged. Then this adds the other non colliding contents to the storage. - - Args: - content_add_fn: Storage content callable - contents: List of contents or skipped contents to add to storage - - """ - if not contents: - return - colliding_content_hashes: List[Dict[str, Any]] = [] - while True: - try: - content_add_fn(contents) - except HashCollision as e: - colliding_content_hashes.append( - { - "algo": e.algo, - "hash": e.hash_id, # hex hash id - "objects": e.colliding_contents, # hex hashes - } - ) - colliding_hashes = e.colliding_content_hashes() - # Drop the colliding contents from the transaction - contents = [c for c in contents if c.hashes() not in colliding_hashes] - else: - # Successfully added contents, we are done - break - if colliding_content_hashes: - for collision in colliding_content_hashes: - logger.error("Collision detected: %(collision)s", {"collision": collision}) - - -def _insert_objects(object_type: str, objects: List[Dict], storage) -> None: - """Insert objects of type object_type in the storage. - - """ - objects = fix_objects(object_type, objects) - - if object_type == "content": - contents: List[BaseContent] = [] - skipped_contents: List[BaseContent] = [] - for content in objects: - c = BaseContent.from_dict(content) - if isinstance(c, SkippedContent): - skipped_contents.append(c) - else: - contents.append(c) - - collision_aware_content_add(storage.skipped_content_add, skipped_contents) - collision_aware_content_add(storage.content_add_metadata, contents) - elif object_type == "origin_visit": - visits: List[OriginVisit] = [] - origins: List[Origin] = [] - for obj in objects: - visit = OriginVisit.from_dict(obj) - visits.append(visit) - origins.append(Origin(url=visit.origin)) - storage.origin_add(origins) - storage.origin_visit_upsert(visits) - elif object_type in ("directory", "revision", "release", "snapshot", "origin"): - method = getattr(storage, object_type + "_add") - method(object_converter_fn[object_type](o) for o in objects) - else: - logger.warning("Received a series of %s, this should not happen", object_type) - - -def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE): - """ - Checks if the given hash is in the provided `array`. The array must be - a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes - (so its size must by `nb_hashes*hash_size` bytes). - - Args: - hash_ (bytes): the hash to look for - array (bytes): a sorted concatenated array of hashes (may be of - any type supporting slice indexing, eg. 
:class:`mmap.mmap`) - nb_hashes (int): number of hashes in the array - hash_size (int): size of a hash (defaults to 20, for SHA1) - - Example: - - >>> import os - >>> hash1 = os.urandom(20) - >>> hash2 = os.urandom(20) - >>> hash3 = os.urandom(20) - >>> array = b''.join(sorted([hash1, hash2])) - >>> is_hash_in_bytearray(hash1, array, 2) - True - >>> is_hash_in_bytearray(hash2, array, 2) - True - >>> is_hash_in_bytearray(hash3, array, 2) - False - """ - if len(hash_) != hash_size: - raise ValueError("hash_ does not match the provided hash_size.") - - def get_hash(position): - return array[position * hash_size : (position + 1) * hash_size] - - # Regular dichotomy: - left = 0 - right = nb_hashes - while left < right - 1: - middle = int((right + left) / 2) - pivot = get_hash(middle) - if pivot == hash_: - return True - elif pivot < hash_: - left = middle - else: - right = middle - return get_hash(left) == hash_ - - -class ReplayError(Exception): - """An error occurred during the replay of an object""" - - def __init__(self, operation, *, obj_id, exc): - self.operation = operation - self.obj_id = hash_to_hex(obj_id) - self.exc = exc - - def __str__(self): - return "ReplayError(doing %s, %s, %s)" % (self.operation, self.obj_id, self.exc) - - -def log_replay_retry(retry_obj, sleep, last_result): - """Log a retry of the content replayer""" - exc = last_result.exception() - logger.debug( - "Retry operation %(operation)s on %(obj_id)s: %(exc)s", - {"operation": exc.operation, "obj_id": exc.obj_id, "exc": str(exc.exc)}, - ) - - statsd.increment( - CONTENT_RETRY_METRIC, - tags={ - "operation": exc.operation, - "attempt": str(retry_obj.statistics["attempt_number"]), - }, - ) - - -def log_replay_error(last_attempt): - """Log a replay error to sentry""" - exc = last_attempt.exception() - with push_scope() as scope: - scope.set_tag("operation", exc.operation) - scope.set_extra("obj_id", exc.obj_id) - capture_exception(exc.exc) - - logger.error( - "Failed operation %(operation)s on %(obj_id)s after %(retries)s" - " retries: %(exc)s", - { - "obj_id": exc.obj_id, - "operation": exc.operation, - "exc": str(exc.exc), - "retries": last_attempt.attempt_number, - }, - ) - - return None - - -CONTENT_REPLAY_RETRIES = 3 - -content_replay_retry = retry( - retry=retry_if_exception_type(ReplayError), - stop=stop_after_attempt(CONTENT_REPLAY_RETRIES), - wait=wait_random_exponential(multiplier=1, max=60), - before_sleep=log_replay_retry, - retry_error_callback=log_replay_error, -) - - -@content_replay_retry -def copy_object(obj_id, src, dst): - hex_obj_id = hash_to_hex(obj_id) - obj = "" - try: - with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "get"}): - obj = src.get(obj_id) - logger.debug("retrieved %(obj_id)s", {"obj_id": hex_obj_id}) - - with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}): - dst.add(obj, obj_id=obj_id, check_presence=False) - logger.debug("copied %(obj_id)s", {"obj_id": hex_obj_id}) - statsd.increment(CONTENT_BYTES_METRIC, len(obj)) - except ObjNotFoundError: - logger.error( - "Failed to copy %(obj_id)s: object not found", {"obj_id": hex_obj_id} - ) - raise - except Exception as exc: - raise ReplayError("copy", obj_id=obj_id, exc=exc) from None - return len(obj) - - -@content_replay_retry -def obj_in_objstorage(obj_id, dst): - """Check if an object is already in an objstorage, tenaciously""" - try: - return obj_id in dst - except Exception as exc: - raise ReplayError("in_dst", obj_id=obj_id, exc=exc) from None - - -def process_replay_objects_content( - all_objects: 
Dict[str, List[dict]], - *, - src: ObjStorage, - dst: ObjStorage, - exclude_fn: Optional[Callable[[dict], bool]] = None, - check_dst: bool = True, -): - """ - Takes a list of records from Kafka (see - :py:func:`swh.journal.client.JournalClient.process`) and copies them - from the `src` objstorage to the `dst` objstorage, if: - - * `obj['status']` is `'visible'` - * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided) - * `obj['sha1'] not in dst` (if `check_dst` is True) - - Args: - all_objects: Objects passed by the Kafka client. Most importantly, - `all_objects['content'][*]['sha1']` is the sha1 hash of each - content. - src: An object storage (see :py:func:`swh.objstorage.get_objstorage`) - dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`) - exclude_fn: Determines whether an object should be copied. - check_dst: Determines whether we should check the destination - objstorage before copying. - - Example: - - >>> from swh.objstorage import get_objstorage - >>> src = get_objstorage('memory', {}) - >>> dst = get_objstorage('memory', {}) - >>> id1 = src.add(b'foo bar') - >>> id2 = src.add(b'baz qux') - >>> kafka_partitions = { - ... 'content': [ - ... { - ... 'sha1': id1, - ... 'status': 'visible', - ... }, - ... { - ... 'sha1': id2, - ... 'status': 'visible', - ... }, - ... ] - ... } - >>> process_replay_objects_content( - ... kafka_partitions, src=src, dst=dst, - ... exclude_fn=lambda obj: obj['sha1'] == id1) - >>> id1 in dst - False - >>> id2 in dst - True - """ - vol = [] - nb_skipped = 0 - nb_failures = 0 - t0 = time() - - for (object_type, objects) in all_objects.items(): - if object_type != "content": - logger.warning( - "Received a series of %s, this should not happen", object_type - ) - continue - for obj in objects: - obj_id = obj[ID_HASH_ALGO] - if obj["status"] != "visible": - nb_skipped += 1 - logger.debug( - "skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"] - ) - statsd.increment( - CONTENT_OPERATIONS_METRIC, - tags={"decision": "skipped", "status": obj["status"]}, - ) - elif exclude_fn and exclude_fn(obj): - nb_skipped += 1 - logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id)) - statsd.increment( - CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"} - ) - elif check_dst and obj_in_objstorage(obj_id, dst): - nb_skipped += 1 - logger.debug("skipped %s (in dst)", hash_to_hex(obj_id)) - statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"}) - else: - try: - copied = copy_object(obj_id, src, dst) - except ObjNotFoundError: - nb_skipped += 1 - statsd.increment( - CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"} - ) - else: - if copied is None: - nb_failures += 1 - statsd.increment( - CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"} - ) - else: - vol.append(copied) - statsd.increment( - CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"} - ) - - dt = time() - t0 - logger.info( - "processed %s content objects in %.1fsec " - "(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped", - len(vol), - dt, - len(vol) / dt, - sum(vol) / 1024 / 1024 / dt, - nb_failures, - nb_skipped, - ) - - if notify: - notify("WATCHDOG=1") diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py index 293151a..986eeb3 100644 --- a/swh/journal/serializers.py +++ b/swh/journal/serializers.py @@ -1,112 +1,112 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # 
See top-level LICENSE file for more information from typing import Any, Dict, Union, overload import msgpack from swh.core.api.serializers import msgpack_dumps, msgpack_loads from swh.model.hashutil import DEFAULT_ALGORITHMS from swh.model.model import ( Content, Directory, Origin, OriginVisit, Release, Revision, SkippedContent, Snapshot, ) ModelObject = Union[ Content, Directory, Origin, OriginVisit, Release, Revision, SkippedContent, Snapshot ] KeyType = Union[Dict[str, str], Dict[str, bytes], bytes] # these @overload'ed versions of the object_key method aim at helping mypy figuring # the correct type-ing. @overload def object_key( object_type: str, object_: Union[Content, Directory, Revision, Release, Snapshot] ) -> bytes: ... @overload def object_key( object_type: str, object_: Union[Origin, SkippedContent] ) -> Dict[str, bytes]: ... @overload def object_key(object_type: str, object_: OriginVisit) -> Dict[str, str]: ... def object_key(object_type: str, object_) -> KeyType: if object_type in ("revision", "release", "directory", "snapshot"): return object_.id elif object_type == "content": return object_.sha1 # TODO: use a dict of hashes elif object_type == "skipped_content": return {hash: getattr(object_, hash) for hash in DEFAULT_ALGORITHMS} elif object_type == "origin": return {"url": object_.url} elif object_type == "origin_visit": return { "origin": object_.origin, "date": str(object_.date), } else: raise ValueError("Unknown object type: %s." % object_type) def stringify_key_item(k: str, v: Union[str, bytes]) -> str: """Turn the item of a dict key into a string""" if isinstance(v, str): return v if k == "url": return v.decode("utf-8") return v.hex() def pprint_key(key: KeyType) -> str: """Pretty-print a kafka key""" if isinstance(key, dict): return "{%s}" % ", ".join( f"{k}: {stringify_key_item(k, v)}" for k, v in key.items() ) elif isinstance(key, bytes): return key.hex() else: return key def key_to_kafka(key: KeyType) -> bytes: """Serialize a key, possibly a dict, in a predictable way""" p = msgpack.Packer(use_bin_type=True) if isinstance(key, dict): return p.pack_map_pairs(sorted(key.items())) else: return p.pack(key) def kafka_to_key(kafka_key: bytes) -> KeyType: """Deserialize a key""" - return msgpack.loads(kafka_key, raw=False, strict_map_key=False) + return msgpack.loads(kafka_key, raw=False) def value_to_kafka(value: Any) -> bytes: """Serialize some data for storage in kafka""" return msgpack_dumps(value) def kafka_to_value(kafka_value: bytes) -> Any: """Deserialize some data stored in kafka""" return msgpack_loads(kafka_value) diff --git a/swh/journal/tests/test_backfill.py b/swh/journal/tests/test_backfill.py deleted file mode 100644 index 07194b0..0000000 --- a/swh/journal/tests/test_backfill.py +++ /dev/null @@ -1,160 +0,0 @@ -# Copyright (C) 2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import pytest - -from swh.journal.backfill import JournalBackfiller, compute_query, PARTITION_KEY - - -TEST_CONFIG = { - "brokers": ["localhost"], - "prefix": "swh.tmp_journal.new", - "client_id": "swh.journal.client.test", - "storage_dbconn": "service=swh-dev", -} - - -def test_config_ko_missing_mandatory_key(): - """Missing configuration key will make the initialization fail - - """ - for key in TEST_CONFIG.keys(): - config = TEST_CONFIG.copy() - config.pop(key) - - with pytest.raises(ValueError) as 
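(Editorial aside, not part of the patch.) The only functional change to `swh/journal/serializers.py` above is that `kafka_to_key` no longer passes `strict_map_key=False`; that looks safe because every key produced by `object_key` is either raw `bytes` or a `str`-keyed dict. A small sketch of the round-trip behaviour, assuming the serializers shown above:

```python
# Key serialization round-trips for both key shapes produced by object_key;
# dict keys are packed in sorted order, so equal keys serialize identically
# regardless of insertion order.
from swh.journal.serializers import kafka_to_key, key_to_kafka

assert kafka_to_key(key_to_kafka(b"\x01" * 20)) == b"\x01" * 20
assert kafka_to_key(key_to_kafka({"url": "http://foo"})) == {"url": "http://foo"}
assert key_to_kafka({"a": "1", "b": "2"}) == key_to_kafka({"b": "2", "a": "1"})
```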
e: - JournalBackfiller(config) - - error = "Configuration error: The following keys must be" " provided: %s" % ( - ",".join([key]), - ) - assert e.value.args[0] == error - - -def test_config_ko_unknown_object_type(): - """Parse arguments will fail if the object type is unknown - - """ - backfiller = JournalBackfiller(TEST_CONFIG) - with pytest.raises(ValueError) as e: - backfiller.parse_arguments("unknown-object-type", 1, 2) - - error = ( - "Object type unknown-object-type is not supported. " - "The only possible values are %s" % (", ".join(PARTITION_KEY)) - ) - assert e.value.args[0] == error - - -def test_compute_query_content(): - query, where_args, column_aliases = compute_query("content", "\x000000", "\x000001") - - assert where_args == ["\x000000", "\x000001"] - - assert column_aliases == [ - "sha1", - "sha1_git", - "sha256", - "blake2s256", - "length", - "status", - "ctime", - ] - - assert ( - query - == """ -select sha1,sha1_git,sha256,blake2s256,length,status,ctime -from content - -where (sha1) >= %s and (sha1) < %s - """ - ) - - -def test_compute_query_skipped_content(): - query, where_args, column_aliases = compute_query("skipped_content", None, None) - - assert where_args == [] - - assert column_aliases == [ - "sha1", - "sha1_git", - "sha256", - "blake2s256", - "length", - "ctime", - "status", - "reason", - ] - - assert ( - query - == """ -select sha1,sha1_git,sha256,blake2s256,length,ctime,status,reason -from skipped_content - - - """ - ) - - -def test_compute_query_origin_visit(): - query, where_args, column_aliases = compute_query("origin_visit", 1, 10) - - assert where_args == [1, 10] - - assert column_aliases == [ - "visit", - "origin.type", - "origin_visit.type", - "url", - "date", - "snapshot", - "status", - "metadata", - ] - - assert ( - query - == """ -select visit,origin.type,origin_visit.type,url,date,snapshot,status,metadata -from origin_visit -left join origin on origin_visit.origin=origin.id -where (origin_visit.origin) >= %s and (origin_visit.origin) < %s - """ - ) - - -def test_compute_query_release(): - query, where_args, column_aliases = compute_query("release", "\x000002", "\x000003") - - assert where_args == ["\x000002", "\x000003"] - - assert column_aliases == [ - "id", - "date", - "date_offset", - "comment", - "name", - "synthetic", - "date_neg_utc_offset", - "target", - "target_type", - "author_id", - "author_name", - "author_email", - "author_fullname", - ] - - assert ( - query - == """ -select release.id as id,date,date_offset,comment,release.name as name,synthetic,date_neg_utc_offset,target,target_type,a.id as author_id,a.name as author_name,a.email as author_email,a.fullname as author_fullname -from release -left join person a on release.author=a.id -where (release.id) >= %s and (release.id) < %s - """ # noqa - ) diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py deleted file mode 100644 index b1924d8..0000000 --- a/swh/journal/tests/test_cli.py +++ /dev/null @@ -1,655 +0,0 @@ -# Copyright (C) 2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from collections import Counter -import copy -import functools -import logging -import re -import tempfile -from typing import Any, Dict -from unittest.mock import patch, MagicMock - -from click.testing import CliRunner -from confluent_kafka import Producer -import pytest -import yaml - -from 
swh.model.hashutil import hash_to_hex -from swh.objstorage.backends.in_memory import InMemoryObjStorage -from swh.storage import get_storage - -from swh.journal.cli import cli, get_journal_client -from swh.journal.replay import CONTENT_REPLAY_RETRIES -from swh.journal.serializers import key_to_kafka, value_to_kafka - - -logger = logging.getLogger(__name__) - - -CLI_CONFIG = { - "storage": {"cls": "memory",}, - "objstorage_src": {"cls": "mocked", "name": "src",}, - "objstorage_dst": {"cls": "mocked", "name": "dst",}, -} - - -@pytest.fixture -def storage(): - """An swh-storage object that gets injected into the CLI functions.""" - storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]} - storage = get_storage(**storage_config) - with patch("swh.journal.cli.get_storage") as get_storage_mock: - get_storage_mock.return_value = storage - yield storage - - -@pytest.fixture -def monkeypatch_retry_sleep(monkeypatch): - from swh.journal.replay import copy_object, obj_in_objstorage - - monkeypatch.setattr(copy_object.retry, "sleep", lambda x: None) - monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None) - - -def invoke(*args, env=None, journal_config=None): - config = copy.deepcopy(CLI_CONFIG) - if journal_config: - config["journal_client"] = journal_config.copy() - config["journal_client"]["cls"] = "kafka" - - runner = CliRunner() - with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: - yaml.dump(config, config_fd) - config_fd.seek(0) - args = ["-C" + config_fd.name] + list(args) - return runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=env,) - - -def test_get_journal_client_config_bwcompat(kafka_server): - cfg = { - "journal": { - "brokers": [kafka_server], - "group_id": "toto", - "prefix": "xiferp", - "object_types": ["content"], - "batch_size": 50, - } - } - ctx = MagicMock(obj={"config": cfg}) - with pytest.deprecated_call(): - client = get_journal_client(ctx, stop_after_objects=10, prefix="prefix") - assert client.subscription == ["prefix.content"] - assert client.stop_after_objects == 10 - assert client.batch_size == 50 - - -def test_get_journal_client_config(kafka_server): - cfg = { - "journal_client": { - "cls": "kafka", - "brokers": [kafka_server], - "group_id": "toto", - "prefix": "xiferp", - "object_types": ["content"], - "batch_size": 50, - } - } - ctx = MagicMock(obj={"config": cfg}) - client = get_journal_client(ctx, stop_after_objects=10, prefix="prefix") - assert client.subscription == ["prefix.content"] - assert client.stop_after_objects == 10 - assert client.batch_size == 50 - - -def test_replay( - storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, -): - kafka_prefix += ".swh.journal.objects" - - producer = Producer( - { - "bootstrap.servers": kafka_server, - "client.id": "test-producer", - "acks": "all", - } - ) - - snapshot = { - "id": b"foo", - "branches": {b"HEAD": {"target_type": "revision", "target": b"\x01" * 20,}}, - } # type: Dict[str, Any] - producer.produce( - topic=kafka_prefix + ".snapshot", - key=key_to_kafka(snapshot["id"]), - value=value_to_kafka(snapshot), - ) - producer.flush() - - logger.debug("Flushed producer") - - result = invoke( - "replay", - "--stop-after-objects", - "1", - journal_config={ - "brokers": [kafka_server], - "group_id": kafka_consumer_group, - "prefix": kafka_prefix, - }, - ) - - expected = r"Done.\n" - assert result.exit_code == 0, result.output - assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - - assert 
storage.snapshot_get(snapshot["id"]) == {**snapshot, "next_branch": None} - - -def _patch_objstorages(names): - objstorages = {name: InMemoryObjStorage() for name in names} - - def get_mock_objstorage(cls, **args): - assert cls == "mocked", cls - return objstorages[args["name"]] - - def decorator(f): - @functools.wraps(f) - @patch("swh.journal.cli.get_objstorage") - def newf(get_objstorage_mock, *args, **kwargs): - get_objstorage_mock.side_effect = get_mock_objstorage - f(*args, objstorages=objstorages, **kwargs) - - return newf - - return decorator - - -NUM_CONTENTS = 10 - - -def _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages): - producer = Producer( - { - "bootstrap.servers": kafka_server, - "client.id": "test-producer", - "acks": "all", - } - ) - - contents = {} - for i in range(NUM_CONTENTS): - content = b"\x00" * 19 + bytes([i]) - sha1 = objstorages["src"].add(content) - contents[sha1] = content - producer.produce( - topic=kafka_prefix + ".content", - key=key_to_kafka(sha1), - value=key_to_kafka({"sha1": sha1, "status": "visible",}), - ) - - producer.flush() - - return contents - - -@_patch_objstorages(["src", "dst"]) -def test_replay_content( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: str, -): - kafka_prefix += ".swh.journal.objects" - - contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) - - result = invoke( - "content-replay", - "--stop-after-objects", - str(NUM_CONTENTS), - journal_config={ - "brokers": [kafka_server], - "group_id": kafka_consumer_group, - "prefix": kafka_prefix, - }, - ) - - expected = r"Done.\n" - assert result.exit_code == 0, result.output - assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - - for (sha1, content) in contents.items(): - assert sha1 in objstorages["dst"], sha1 - assert objstorages["dst"].get(sha1) == content - - -@_patch_objstorages(["src", "dst"]) -def test_replay_content_structured_log( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: str, - caplog, -): - kafka_prefix += ".swh.journal.objects" - - contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) - - caplog.set_level(logging.DEBUG, "swh.journal.replay") - - expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents) - - result = invoke( - "content-replay", - "--stop-after-objects", - str(NUM_CONTENTS), - journal_config={ - "brokers": [kafka_server], - "group_id": kafka_consumer_group, - "prefix": kafka_prefix, - }, - ) - expected = r"Done.\n" - assert result.exit_code == 0, result.output - assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - - copied = set() - for record in caplog.records: - logtext = record.getMessage() - if "copied" in logtext: - copied.add(record.args["obj_id"]) - - assert ( - copied == expected_obj_ids - ), "Mismatched logging; see captured log output for details." 
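The structured-log assertions in these removed CLI tests pass extra fields as the logging call's argument mapping and read them back from `record.args` via pytest's `caplog`, rather than parsing message strings. A minimal standalone sketch of that pattern follows; the logger name, message text and field names are illustrative and not taken from swh.journal:

```python
import logging

logger = logging.getLogger("example.replay")


def copy_one(obj_id: str) -> None:
    # Passing a single dict as the logging argument makes it available to
    # handlers (and to pytest's caplog) as record.args.
    logger.debug("copied %(obj_id)s", {"obj_id": obj_id})


def test_copied_ids_are_logged(caplog):
    caplog.set_level(logging.DEBUG, "example.replay")
    copy_one("deadbeef")
    copied = {
        rec.args["obj_id"]
        for rec in caplog.records
        if "copied" in rec.getMessage()
    }
    assert copied == {"deadbeef"}
```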
- - -@_patch_objstorages(["src", "dst"]) -def test_replay_content_static_group_id( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: str, - caplog, -): - kafka_prefix += ".swh.journal.objects" - - contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) - - # Setup log capture to fish the consumer settings out of the log messages - caplog.set_level(logging.DEBUG, "swh.journal.client") - - result = invoke( - "content-replay", - "--stop-after-objects", - str(NUM_CONTENTS), - env={"KAFKA_GROUP_INSTANCE_ID": "static-group-instance-id"}, - journal_config={ - "brokers": [kafka_server], - "group_id": kafka_consumer_group, - "prefix": kafka_prefix, - }, - ) - expected = r"Done.\n" - assert result.exit_code == 0, result.output - assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - - consumer_settings = None - for record in caplog.records: - if "Consumer settings" in record.message: - consumer_settings = record.args - break - - assert consumer_settings is not None, ( - "Failed to get consumer settings out of the consumer log. " - "See log capture for details." - ) - assert consumer_settings["group.instance.id"] == "static-group-instance-id" - assert consumer_settings["session.timeout.ms"] == 60 * 10 * 1000 - assert consumer_settings["max.poll.interval.ms"] == 90 * 10 * 1000 - - for (sha1, content) in contents.items(): - assert sha1 in objstorages["dst"], sha1 - assert objstorages["dst"].get(sha1) == content - - -@_patch_objstorages(["src", "dst"]) -def test_replay_content_exclude( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: str, -): - kafka_prefix += ".swh.journal.objects" - - contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) - - excluded_contents = list(contents)[0::2] # picking half of them - with tempfile.NamedTemporaryFile(mode="w+b") as fd: - fd.write(b"".join(sorted(excluded_contents))) - - fd.seek(0) - - result = invoke( - "content-replay", - "--stop-after-objects", - str(NUM_CONTENTS), - "--exclude-sha1-file", - fd.name, - journal_config={ - "brokers": [kafka_server], - "group_id": kafka_consumer_group, - "prefix": kafka_prefix, - }, - ) - expected = r"Done.\n" - assert result.exit_code == 0, result.output - assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - - for (sha1, content) in contents.items(): - if sha1 in excluded_contents: - assert sha1 not in objstorages["dst"], sha1 - else: - assert sha1 in objstorages["dst"], sha1 - assert objstorages["dst"].get(sha1) == content - - -NUM_CONTENTS_DST = 5 - - -@_patch_objstorages(["src", "dst"]) -@pytest.mark.parametrize( - "check_dst,expected_copied,expected_in_dst", - [ - (True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST), - (False, NUM_CONTENTS, 0), - ], -) -def test_replay_content_check_dst( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: str, - check_dst: bool, - expected_copied: int, - expected_in_dst: int, - caplog, -): - kafka_prefix += ".swh.journal.objects" - - contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) - - for i, (sha1, content) in enumerate(contents.items()): - if i >= NUM_CONTENTS_DST: - break - - objstorages["dst"].add(content, obj_id=sha1) - - caplog.set_level(logging.DEBUG, "swh.journal.replay") - - result = invoke( - "content-replay", - "--stop-after-objects", - str(NUM_CONTENTS), - "--check-dst" if check_dst else "--no-check-dst", - journal_config={ - 
"brokers": [kafka_server], - "group_id": kafka_consumer_group, - "prefix": kafka_prefix, - }, - ) - expected = r"Done.\n" - assert result.exit_code == 0, result.output - assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - - copied = 0 - in_dst = 0 - for record in caplog.records: - logtext = record.getMessage() - if "copied" in logtext: - copied += 1 - elif "in dst" in logtext: - in_dst += 1 - - assert ( - copied == expected_copied and in_dst == expected_in_dst - ), "Unexpected amount of objects copied, see the captured log for details" - - for (sha1, content) in contents.items(): - assert sha1 in objstorages["dst"], sha1 - assert objstorages["dst"].get(sha1) == content - - -class FlakyObjStorage(InMemoryObjStorage): - def __init__(self, *args, **kwargs): - state = kwargs.pop("state") - self.failures_left = Counter(kwargs.pop("failures")) - super().__init__(*args, **kwargs) - if state: - self.state = state - - def flaky_operation(self, op, obj_id): - if self.failures_left[op, obj_id] > 0: - self.failures_left[op, obj_id] -= 1 - raise RuntimeError("Failed %s on %s" % (op, hash_to_hex(obj_id))) - - def get(self, obj_id): - self.flaky_operation("get", obj_id) - return super().get(obj_id) - - def add(self, data, obj_id=None, check_presence=True): - self.flaky_operation("add", obj_id) - return super().add(data, obj_id=obj_id, check_presence=check_presence) - - def __contains__(self, obj_id): - self.flaky_operation("in", obj_id) - return super().__contains__(obj_id) - - -@_patch_objstorages(["src", "dst"]) -def test_replay_content_check_dst_retry( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: str, - monkeypatch_retry_sleep, -): - kafka_prefix += ".swh.journal.objects" - - contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) - - failures = {} - for i, (sha1, content) in enumerate(contents.items()): - if i >= NUM_CONTENTS_DST: - break - - objstorages["dst"].add(content, obj_id=sha1) - failures["in", sha1] = 1 - - orig_dst = objstorages["dst"] - objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures) - - result = invoke( - "content-replay", - "--stop-after-objects", - str(NUM_CONTENTS), - "--check-dst", - journal_config={ - "brokers": [kafka_server], - "group_id": kafka_consumer_group, - "prefix": kafka_prefix, - }, - ) - expected = r"Done.\n" - assert result.exit_code == 0, result.output - assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - - for (sha1, content) in contents.items(): - assert sha1 in objstorages["dst"], sha1 - assert objstorages["dst"].get(sha1) == content - - -@_patch_objstorages(["src", "dst"]) -def test_replay_content_failed_copy_retry( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: str, - caplog, - monkeypatch_retry_sleep, -): - kafka_prefix += ".swh.journal.objects" - - contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) - - add_failures = {} - get_failures = {} - definitely_failed = set() - - # We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES times. - # We generate failures for 2 different operations, get and add. 
- num_retry_contents = 2 * CONTENT_REPLAY_RETRIES - - assert ( - num_retry_contents < NUM_CONTENTS - ), "Need to generate more test contents to properly test retry behavior" - - for i, sha1 in enumerate(contents): - if i >= num_retry_contents: - break - - # This generates a number of failures, up to CONTENT_REPLAY_RETRIES - num_failures = (i % CONTENT_REPLAY_RETRIES) + 1 - - # This generates failures of add for the first CONTENT_REPLAY_RETRIES - # objects, then failures of get. - if i < CONTENT_REPLAY_RETRIES: - add_failures["add", sha1] = num_failures - else: - get_failures["get", sha1] = num_failures - - # Only contents that have CONTENT_REPLAY_RETRIES or more are - # definitely failing - if num_failures >= CONTENT_REPLAY_RETRIES: - definitely_failed.add(hash_to_hex(sha1)) - - objstorages["dst"] = FlakyObjStorage( - state=objstorages["dst"].state, failures=add_failures, - ) - objstorages["src"] = FlakyObjStorage( - state=objstorages["src"].state, failures=get_failures, - ) - - caplog.set_level(logging.DEBUG, "swh.journal.replay") - - result = invoke( - "content-replay", - "--stop-after-objects", - str(NUM_CONTENTS), - journal_config={ - "brokers": [kafka_server], - "group_id": kafka_consumer_group, - "prefix": kafka_prefix, - }, - ) - expected = r"Done.\n" - assert result.exit_code == 0, result.output - assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - - copied = 0 - actually_failed = set() - for record in caplog.records: - logtext = record.getMessage() - if "copied" in logtext: - copied += 1 - elif "Failed operation" in logtext: - assert record.levelno == logging.ERROR - assert record.args["retries"] == CONTENT_REPLAY_RETRIES - actually_failed.add(record.args["obj_id"]) - - assert ( - actually_failed == definitely_failed - ), "Unexpected object copy failures; see captured log for details" - - for (sha1, content) in contents.items(): - if hash_to_hex(sha1) in definitely_failed: - assert sha1 not in objstorages["dst"] - continue - - assert sha1 in objstorages["dst"], sha1 - assert objstorages["dst"].get(sha1) == content - - -@_patch_objstorages(["src", "dst"]) -def test_replay_content_objnotfound( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: str, - caplog, -): - kafka_prefix += ".swh.journal.objects" - - contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) - - num_contents_deleted = 5 - contents_deleted = set() - - for i, sha1 in enumerate(contents): - if i >= num_contents_deleted: - break - - del objstorages["src"].state[sha1] - contents_deleted.add(hash_to_hex(sha1)) - - caplog.set_level(logging.DEBUG, "swh.journal.replay") - - result = invoke( - "content-replay", - "--stop-after-objects", - str(NUM_CONTENTS), - journal_config={ - "brokers": [kafka_server], - "group_id": kafka_consumer_group, - "prefix": kafka_prefix, - }, - ) - expected = r"Done.\n" - assert result.exit_code == 0, result.output - assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - - copied = 0 - not_in_src = set() - for record in caplog.records: - logtext = record.getMessage() - if "copied" in logtext: - copied += 1 - elif "object not found" in logtext: - # Check that the object id can be recovered from logs - assert record.levelno == logging.ERROR - not_in_src.add(record.args["obj_id"]) - - assert ( - copied == NUM_CONTENTS - num_contents_deleted - ), "Unexpected number of contents copied" - - assert ( - not_in_src == contents_deleted - ), "Mismatch between deleted contents and not_in_src logs" - 
- for (sha1, content) in contents.items(): - if sha1 not in objstorages["src"]: - continue - assert sha1 in objstorages["dst"], sha1 - assert objstorages["dst"].get(sha1) == content diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py index fa5e7d3..80ebd55 100644 --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -1,151 +1,100 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from confluent_kafka import Consumer, Producer -from swh.storage import get_storage - -from swh.model.model import Directory, Origin, OriginVisit +from swh.model.model import Directory from swh.journal.tests.journal_data import TEST_OBJECTS from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer): kafka_prefix += ".swh.journal.objects" writer = KafkaJournalWriter( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, ) expected_messages = 0 for object_type, objects in TEST_OBJECTS.items(): writer.write_additions(object_type, objects) expected_messages += len(objects) consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) assert_all_objects_consumed(consumed_messages) -def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer): - kafka_prefix += ".swh.journal.objects" - - writer_config = { - "cls": "kafka", - "brokers": [kafka_server], - "client_id": "kafka_writer", - "prefix": kafka_prefix, - } - storage_config = { - "cls": "pipeline", - "steps": [{"cls": "memory", "journal_writer": writer_config},], - } - - storage = get_storage(**storage_config) - - expected_messages = 0 - - for object_type, objects in TEST_OBJECTS.items(): - method = getattr(storage, object_type + "_add") - if object_type in ( - "content", - "directory", - "revision", - "release", - "snapshot", - "origin", - ): - method(objects) - expected_messages += len(objects) - elif object_type in ("origin_visit",): - for obj in objects: - assert isinstance(obj, OriginVisit) - storage.origin_add_one(Origin(url=obj.origin)) - visit = method(obj.origin, date=obj.date, type=obj.type) - expected_messages += 1 - - obj_d = obj.to_dict() - for k in ("visit", "origin", "date", "type"): - del obj_d[k] - storage.origin_visit_update(obj.origin, visit.visit, **obj_d) - expected_messages += 1 - else: - assert False, object_type - - consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) - assert_all_objects_consumed(consumed_messages) - - def test_write_delivery_failure( kafka_prefix: str, kafka_server: str, consumer: Consumer ): class MockKafkaError: """A mocked kafka error""" def str(self): return "Mocked Kafka Error" def name(self): return "SWH_MOCK_ERROR" class KafkaJournalWriterFailDelivery(KafkaJournalWriter): """A journal writer which always fails delivering messages""" def _on_delivery(self, error, message): """Replace the inbound error with a fake delivery error""" super()._on_delivery(MockKafkaError(), message) kafka_prefix += ".swh.journal.objects" writer = KafkaJournalWriterFailDelivery( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, ) empty_dir = Directory(entries=[]) with 
pytest.raises(KafkaDeliveryError) as exc: writer.write_addition("directory", empty_dir) assert "Failed deliveries" in exc.value.message assert len(exc.value.delivery_failures) == 1 delivery_failure = exc.value.delivery_failures[0] assert delivery_failure.key == empty_dir.id assert delivery_failure.code == "SWH_MOCK_ERROR" def test_write_delivery_timeout( kafka_prefix: str, kafka_server: str, consumer: Consumer ): produced = [] class MockProducer(Producer): """A kafka producer which pretends to produce messages, but never sends any delivery acknowledgements""" def produce(self, **kwargs): produced.append(kwargs) kafka_prefix += ".swh.journal.objects" writer = KafkaJournalWriter( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, flush_timeout=1, producer_class=MockProducer, ) empty_dir = Directory(entries=[]) with pytest.raises(KafkaDeliveryError) as exc: writer.write_addition("directory", empty_dir) assert len(produced) == 1 assert "timeout" in exc.value.message assert len(exc.value.delivery_failures) == 1 delivery_failure = exc.value.delivery_failures[0] assert delivery_failure.key == empty_dir.id assert delivery_failure.code == "SWH_FLUSH_TIMEOUT" diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py deleted file mode 100644 index 796236d..0000000 --- a/swh/journal/tests/test_replay.py +++ /dev/null @@ -1,405 +0,0 @@ -# Copyright (C) 2019-2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import datetime -import functools -import logging -import random -from typing import Dict, List - -import dateutil -import pytest - -from confluent_kafka import Producer -from hypothesis import strategies, given, settings - -from swh.storage import get_storage - -from swh.journal.client import JournalClient -from swh.journal.serializers import key_to_kafka, value_to_kafka -from swh.journal.replay import process_replay_objects, is_hash_in_bytearray -from swh.model.hashutil import hash_to_hex -from swh.model.model import Content - -from .conftest import TEST_OBJECT_DICTS, DUPLICATE_CONTENTS -from .utils import MockedJournalClient, MockedKafkaWriter - - -storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]} - - -def make_topic(kafka_prefix: str, object_type: str) -> str: - return kafka_prefix + "." + object_type - - -def test_storage_play( - kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, -): - """Optimal replayer scenario. 
- - This: - - writes objects to the topic - - replayer consumes objects from the topic and replay them - - """ - kafka_prefix += ".swh.journal.objects" - - storage = get_storage(**storage_config) - - producer = Producer( - { - "bootstrap.servers": kafka_server, - "client.id": "test producer", - "acks": "all", - } - ) - - now = datetime.datetime.now(tz=datetime.timezone.utc) - - # Fill Kafka - nb_sent = 0 - nb_visits = 0 - for object_type, objects in TEST_OBJECT_DICTS.items(): - topic = make_topic(kafka_prefix, object_type) - for object_ in objects: - key = bytes(random.randint(0, 255) for _ in range(40)) - object_ = object_.copy() - if object_type == "content": - object_["ctime"] = now - elif object_type == "origin_visit": - nb_visits += 1 - object_["visit"] = nb_visits - producer.produce( - topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), - ) - nb_sent += 1 - - producer.flush() - - caplog.set_level(logging.ERROR, "swh.journal.replay") - # Fill the storage from Kafka - replayer = JournalClient( - brokers=kafka_server, - group_id=kafka_consumer_group, - prefix=kafka_prefix, - stop_after_objects=nb_sent, - ) - worker_fn = functools.partial(process_replay_objects, storage=storage) - nb_inserted = 0 - while nb_inserted < nb_sent: - nb_inserted += replayer.process(worker_fn) - assert nb_sent == nb_inserted - - # Check the objects were actually inserted in the storage - assert TEST_OBJECT_DICTS["revision"] == list( - storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]]) - ) - assert TEST_OBJECT_DICTS["release"] == list( - storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]]) - ) - - origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]])) - assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins] - for origin in origins: - origin_url = origin["url"] - expected_visits = [ - { - **visit, - "origin": origin_url, - "date": dateutil.parser.parse(visit["date"]), - } - for visit in TEST_OBJECT_DICTS["origin_visit"] - if visit["origin"] == origin["url"] - ] - actual_visits = list(storage.origin_visit_get(origin_url)) - for visit in actual_visits: - del visit["visit"] # opaque identifier - assert expected_visits == actual_visits - - input_contents = TEST_OBJECT_DICTS["content"] - contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents]) - assert len(contents) == len(input_contents) - assert contents == {cont["sha1"]: [cont] for cont in input_contents} - - collision = 0 - for record in caplog.records: - logtext = record.getMessage() - if "Colliding contents:" in logtext: - collision += 1 - - assert collision == 0, "No collision should be detected" - - -def test_storage_play_with_collision( - kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, -): - """Another replayer scenario with collisions. 
- - This: - - writes objects to the topic, including colliding contents - - replayer consumes objects from the topic and replay them - - This drops the colliding contents from the replay when detected - - """ - kafka_prefix += ".swh.journal.objects" - - storage = get_storage(**storage_config) - - producer = Producer( - { - "bootstrap.servers": kafka_server, - "client.id": "test producer", - "enable.idempotence": "true", - } - ) - - now = datetime.datetime.now(tz=datetime.timezone.utc) - - # Fill Kafka - nb_sent = 0 - nb_visits = 0 - for object_type, objects in TEST_OBJECT_DICTS.items(): - topic = make_topic(kafka_prefix, object_type) - for object_ in objects: - key = bytes(random.randint(0, 255) for _ in range(40)) - object_ = object_.copy() - if object_type == "content": - object_["ctime"] = now - elif object_type == "origin_visit": - nb_visits += 1 - object_["visit"] = nb_visits - producer.produce( - topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), - ) - nb_sent += 1 - - # Create collision in input data - # They are not written in the destination - for content in DUPLICATE_CONTENTS: - topic = make_topic(kafka_prefix, "content") - producer.produce( - topic=topic, key=key_to_kafka(key), value=value_to_kafka(content), - ) - - nb_sent += 1 - - producer.flush() - - caplog.set_level(logging.ERROR, "swh.journal.replay") - # Fill the storage from Kafka - replayer = JournalClient( - brokers=kafka_server, - group_id=kafka_consumer_group, - prefix=kafka_prefix, - stop_after_objects=nb_sent, - ) - worker_fn = functools.partial(process_replay_objects, storage=storage) - nb_inserted = 0 - while nb_inserted < nb_sent: - nb_inserted += replayer.process(worker_fn) - assert nb_sent == nb_inserted - - # Check the objects were actually inserted in the storage - assert TEST_OBJECT_DICTS["revision"] == list( - storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]]) - ) - assert TEST_OBJECT_DICTS["release"] == list( - storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]]) - ) - - origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]])) - assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins] - for origin in origins: - origin_url = origin["url"] - expected_visits = [ - { - **visit, - "origin": origin_url, - "date": dateutil.parser.parse(visit["date"]), - } - for visit in TEST_OBJECT_DICTS["origin_visit"] - if visit["origin"] == origin["url"] - ] - actual_visits = list(storage.origin_visit_get(origin_url)) - for visit in actual_visits: - del visit["visit"] # opaque identifier - assert expected_visits == actual_visits - - input_contents = TEST_OBJECT_DICTS["content"] - contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents]) - assert len(contents) == len(input_contents) - assert contents == {cont["sha1"]: [cont] for cont in input_contents} - - nb_collisions = 0 - - actual_collision: Dict - for record in caplog.records: - logtext = record.getMessage() - if "Collision detected:" in logtext: - nb_collisions += 1 - actual_collision = record.args["collision"] - - assert nb_collisions == 1, "1 collision should be detected" - - algo = "sha1" - assert actual_collision["algo"] == algo - expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo]) - assert actual_collision["hash"] == expected_colliding_hash - - actual_colliding_hashes = actual_collision["objects"] - assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS) - for content in DUPLICATE_CONTENTS: - 
expected_content_hashes = { - k: hash_to_hex(v) for k, v in Content.from_dict(content).hashes().items() - } - assert expected_content_hashes in actual_colliding_hashes - - -def _test_write_replay_origin_visit(visits: List[Dict]): - """Helper function to write tests for origin_visit. - - Each visit (a dict) given in the 'visits' argument will be sent to - a (mocked) kafka queue, which a in-memory-storage backed replayer is - listening to. - - Check that corresponding origin visits entities are present in the storage - and have correct values if they are not skipped. - - """ - queue: List = [] - replayer = MockedJournalClient(queue) - writer = MockedKafkaWriter(queue) - - # Note that flipping the order of these two insertions will crash - # the test, because the legacy origin_format does not allow to create - # the origin when needed (type is missing) - writer.send( - "origin", - "foo", - { - "url": "http://example.com/", - "type": "git", # test the legacy origin format is accepted - }, - ) - for visit in visits: - writer.send("origin_visit", "foo", visit) - - queue_size = len(queue) - assert replayer.stop_after_objects is None - replayer.stop_after_objects = queue_size - - storage = get_storage(**storage_config) - worker_fn = functools.partial(process_replay_objects, storage=storage) - - replayer.process(worker_fn) - - actual_visits = list(storage.origin_visit_get("http://example.com/")) - - assert len(actual_visits) == len(visits), actual_visits - - for vin, vout in zip(visits, actual_visits): - vin = vin.copy() - vout = vout.copy() - assert vout.pop("origin") == "http://example.com/" - vin.pop("origin") - vin.setdefault("type", "git") - vin.setdefault("metadata", None) - assert vin == vout - - -def test_write_replay_origin_visit(): - """Test origin_visit when the 'origin' is just a string.""" - now = datetime.datetime.now() - visits = [ - { - "visit": 1, - "origin": "http://example.com/", - "date": now, - "type": "git", - "status": "partial", - "snapshot": None, - } - ] - _test_write_replay_origin_visit(visits) - - -def test_write_replay_legacy_origin_visit1(): - """Origin_visit with no types should make the replayer crash - - We expect the journal's origin_visit topic to no longer reference such - visits. If it does, the replayer must crash so we can fix the journal's - topic. 
- - """ - now = datetime.datetime.now() - visit = { - "visit": 1, - "origin": "http://example.com/", - "date": now, - "status": "partial", - "snapshot": None, - } - now2 = datetime.datetime.now() - visit2 = { - "visit": 2, - "origin": {"url": "http://example.com/"}, - "date": now2, - "status": "partial", - "snapshot": None, - } - - for origin_visit in [visit, visit2]: - with pytest.raises(ValueError, match="Old origin visit format"): - _test_write_replay_origin_visit([origin_visit]) - - -def test_write_replay_legacy_origin_visit2(): - """Test origin_visit when 'type' is missing from the visit, but not - from the origin.""" - now = datetime.datetime.now() - visits = [ - { - "visit": 1, - "origin": {"url": "http://example.com/", "type": "git",}, - "date": now, - "type": "git", - "status": "partial", - "snapshot": None, - } - ] - _test_write_replay_origin_visit(visits) - - -def test_write_replay_legacy_origin_visit3(): - """Test origin_visit when the origin is a dict""" - now = datetime.datetime.now() - visits = [ - { - "visit": 1, - "origin": {"url": "http://example.com/",}, - "date": now, - "type": "git", - "status": "partial", - "snapshot": None, - } - ] - _test_write_replay_origin_visit(visits) - - -hash_strategy = strategies.binary(min_size=20, max_size=20) - - -@settings(max_examples=500) -@given( - strategies.sets(hash_strategy, min_size=0, max_size=500), - strategies.sets(hash_strategy, min_size=10), -) -def test_is_hash_in_bytearray(haystack, needles): - array = b"".join(sorted(haystack)) - needles |= haystack # Exhaustively test for all objects in the array - for needle in needles: - assert is_hash_in_bytearray(needle, array, len(haystack)) == ( - needle in haystack - ) diff --git a/swh/journal/tests/test_serializers.py b/swh/journal/tests/test_serializers.py index e816c40..1a1fdd9 100644 --- a/swh/journal/tests/test_serializers.py +++ b/swh/journal/tests/test_serializers.py @@ -1,53 +1,77 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from collections import OrderedDict import itertools +from collections import OrderedDict +from typing import Iterable + from swh.journal import serializers from .conftest import TEST_OBJECTS def test_key_to_kafka_repeatable(): """Check the kafka key encoding is repeatable""" base_dict = { "a": "foo", "b": "bar", "c": "baz", } key = serializers.key_to_kafka(base_dict) for dict_keys in itertools.permutations(base_dict): d = OrderedDict() for k in dict_keys: d[k] = base_dict[k] assert key == serializers.key_to_kafka(d) def test_get_key(): """Test whether get_key works on all our objects""" for object_type, objects in TEST_OBJECTS.items(): for obj in objects: assert serializers.object_key(object_type, obj) is not None def test_pprint_key(): """Test whether get_key works on all our objects""" for object_type, objects in TEST_OBJECTS.items(): for obj in objects: key = serializers.object_key(object_type, obj) pprinted_key = serializers.pprint_key(key) assert isinstance(pprinted_key, str) if isinstance(key, dict): assert pprinted_key[0], pprinted_key[-1] == "{}" for dict_key in key.keys(): assert f"{dict_key}:" in pprinted_key if isinstance(key, bytes): assert pprinted_key == key.hex() + + +def test_kafka_to_key(): + """Standard back and forth serialization with keys + + """ + # All KeyType(s) + 
keys: Iterable[serializers.KeyType] = [ + {"a": "foo", "b": "bar", "c": "baz",}, + {"a": b"foobarbaz",}, + b"foo", + ] + for object_type, objects in TEST_OBJECTS.items(): + for obj in objects: + key = serializers.object_key(object_type, obj) + keys.append(key) + + for key in keys: + ktk = serializers.key_to_kafka(key) + v = serializers.kafka_to_key(ktk) + + assert v == key diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py deleted file mode 100644 index 3c4ffe6..0000000 --- a/swh/journal/tests/test_write_replay.py +++ /dev/null @@ -1,165 +0,0 @@ -# Copyright (C) 2019-2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import functools -from unittest.mock import patch - -import attr -from hypothesis import given, settings, HealthCheck -from hypothesis.strategies import lists - -from swh.model.hypothesis_strategies import present_contents -from swh.model.model import Origin -from swh.storage import get_storage -from swh.storage.exc import HashCollision - -from swh.journal.replay import ( - process_replay_objects, - process_replay_objects_content, - object_converter_fn, -) - -from .utils import MockedJournalClient, MockedKafkaWriter -from .conftest import objects_d - -storage_config = { - "cls": "memory", - "journal_writer": {"cls": "memory"}, -} - - -def empty_person_name_email(rev_or_rel): - """Empties the 'name' and 'email' fields of the author/committer fields - of a revision or release; leaving only the fullname.""" - if getattr(rev_or_rel, "author", None): - rev_or_rel = attr.evolve( - rev_or_rel, author=attr.evolve(rev_or_rel.author, name=b"", email=b"",) - ) - - if getattr(rev_or_rel, "committer", None): - rev_or_rel = attr.evolve( - rev_or_rel, - committer=attr.evolve(rev_or_rel.committer, name=b"", email=b"",), - ) - - return rev_or_rel - - -@given(lists(objects_d(), min_size=1)) -@settings(suppress_health_check=[HealthCheck.too_slow]) -def test_write_replay_same_order_batches(objects): - queue = [] - replayer = MockedJournalClient(queue) - - with patch( - "swh.journal.writer.inmemory.InMemoryJournalWriter", - return_value=MockedKafkaWriter(queue), - ): - storage1 = get_storage(**storage_config) - - # Write objects to storage1 - for (obj_type, obj) in objects: - if obj_type == "content" and obj.get("status") == "absent": - obj_type = "skipped_content" - - obj = object_converter_fn[obj_type](obj) - - if obj_type == "origin_visit": - storage1.origin_add_one(Origin(url=obj.origin)) - storage1.origin_visit_upsert([obj]) - else: - method = getattr(storage1, obj_type + "_add") - try: - method([obj]) - except HashCollision: - pass - - # Bail out early if we didn't insert any relevant objects... - queue_size = len(queue) - assert queue_size != 0, "No test objects found; hypothesis strategy bug?" 
- - assert replayer.stop_after_objects is None - replayer.stop_after_objects = queue_size - - storage2 = get_storage(**storage_config) - worker_fn = functools.partial(process_replay_objects, storage=storage2) - - replayer.process(worker_fn) - - assert replayer.consumer.committed - - for attr_name in ( - "_contents", - "_directories", - "_snapshots", - "_origin_visits", - "_origins", - ): - assert getattr(storage1, attr_name) == getattr(storage2, attr_name), attr_name - - # When hypothesis generates a revision and a release with same - # author (or committer) fullname but different name or email, then - # the storage will use the first name/email it sees. - # This first one will be either the one from the revision or the release, - # and since there is no order guarantees, storage2 has 1/2 chance of - # not seeing the same order as storage1, therefore we need to strip - # them out before comparing. - for attr_name in ("_revisions", "_releases"): - items1 = { - k: empty_person_name_email(v) - for (k, v) in getattr(storage1, attr_name).items() - } - items2 = { - k: empty_person_name_email(v) - for (k, v) in getattr(storage2, attr_name).items() - } - assert items1 == items2, attr_name - - -# TODO: add test for hash collision - - -@given(lists(present_contents(), min_size=1)) -@settings(suppress_health_check=[HealthCheck.too_slow]) -def test_write_replay_content(objects): - - queue = [] - replayer = MockedJournalClient(queue) - - with patch( - "swh.journal.writer.inmemory.InMemoryJournalWriter", - return_value=MockedKafkaWriter(queue), - ): - storage1 = get_storage(**storage_config) - - contents = [] - for obj in objects: - storage1.content_add([obj]) - contents.append(obj) - - # Bail out early if we didn't insert any relevant objects... - queue_size = len(queue) - assert queue_size != 0, "No test objects found; hypothesis strategy bug?" - - assert replayer.stop_after_objects is None - replayer.stop_after_objects = queue_size - - storage2 = get_storage(**storage_config) - - objstorage1 = storage1.objstorage.objstorage - objstorage2 = storage2.objstorage.objstorage - - worker_fn = functools.partial( - process_replay_objects_content, src=objstorage1, dst=objstorage2 - ) - - replayer.process(worker_fn) - - # only content with status visible will be copied in storage2 - expected_objstorage_state = { - c.sha1: c.data for c in contents if c.status == "visible" - } - - assert expected_objstorage_state == objstorage2.state diff --git a/version.txt b/version.txt index d290898..7ee69ae 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.31-0-gdc96335 \ No newline at end of file +v0.0.32-0-gfa9ab16 \ No newline at end of file
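As a reference for the serializer change above, the key round trip exercised by the new `test_kafka_to_key` test can be reproduced with plain msgpack. The two helpers below are a minimal sketch re-stating the `key_to_kafka`/`kafka_to_key` pair from the serializers diff; only the msgpack calls are real API:

```python
import msgpack


def key_to_kafka(key):
    # Pack dict keys as sorted pairs so the encoding is repeatable
    # regardless of the dict's insertion order.
    p = msgpack.Packer(use_bin_type=True)
    if isinstance(key, dict):
        return p.pack_map_pairs(sorted(key.items()))
    return p.pack(key)


def kafka_to_key(kafka_key):
    # raw=False decodes msgpack str as Python str and bin as bytes;
    # strict_map_key is left at its default, which is fine here because
    # every map key produced above is a str.
    return msgpack.loads(kafka_key, raw=False)


for key in [{"a": "foo", "b": "bar"}, {"a": b"foobarbaz"}, b"foo"]:
    assert kafka_to_key(key_to_kafka(key)) == key
```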