diff --git a/swh/storage/backfill.py b/swh/storage/backfill.py index 38f4a2b4..423d289c 100644 --- a/swh/storage/backfill.py +++ b/swh/storage/backfill.py @@ -1,632 +1,632 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Storage backfiller. The backfiller goal is to produce back part or all of the objects from a storage to the journal topics Current implementation consists in the JournalBackfiller class. It simply reads the objects from the storage and sends every object identifier back to the journal. """ import logging from typing import Any, Callable, Dict, Optional from swh.core.db import BaseDb from swh.model.identifiers import ExtendedObjectType from swh.model.model import ( BaseModel, Directory, DirectoryEntry, RawExtrinsicMetadata, Release, Revision, Snapshot, SnapshotBranch, TargetType, ) from swh.storage.postgresql.converters import ( db_to_raw_extrinsic_metadata, db_to_release, db_to_revision, ) from swh.storage.replay import object_converter_fn from swh.storage.writer import JournalWriter logger = logging.getLogger(__name__) PARTITION_KEY = { "content": "sha1", "skipped_content": "sha1", "directory": "id", "metadata_authority": "type, url", "metadata_fetcher": "name, version", "raw_extrinsic_metadata": "target", "revision": "revision.id", "release": "release.id", "snapshot": "id", "origin": "id", "origin_visit": "origin_visit.origin", "origin_visit_status": "origin_visit_status.origin", } COLUMNS = { "content": [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "status", "ctime", ], "skipped_content": [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "ctime", "status", "reason", ], "directory": ["id", "dir_entries", "file_entries", "rev_entries"], "metadata_authority": ["type", "url", "metadata",], "metadata_fetcher": ["name", "version", "metadata",], "raw_extrinsic_metadata": [ "raw_extrinsic_metadata.type", "raw_extrinsic_metadata.target", "metadata_authority.type", "metadata_authority.url", "metadata_fetcher.name", "metadata_fetcher.version", "discovery_date", "format", "raw_extrinsic_metadata.metadata", "origin", "visit", "snapshot", "release", "revision", "path", "directory", ], "revision": [ ("revision.id", "id"), "date", "date_offset", "date_neg_utc_offset", "committer_date", "committer_date_offset", "committer_date_neg_utc_offset", "type", "directory", "message", "synthetic", "metadata", "extra_headers", ( "array(select parent_id::bytea from revision_history rh " "where rh.id = revision.id order by rh.parent_rank asc)", "parents", ), ("a.id", "author_id"), ("a.name", "author_name"), ("a.email", "author_email"), ("a.fullname", "author_fullname"), ("c.id", "committer_id"), ("c.name", "committer_name"), ("c.email", "committer_email"), ("c.fullname", "committer_fullname"), ], "release": [ ("release.id", "id"), "date", "date_offset", "date_neg_utc_offset", "comment", ("release.name", "name"), "synthetic", "target", "target_type", ("a.id", "author_id"), ("a.name", "author_name"), ("a.email", "author_email"), ("a.fullname", "author_fullname"), ], "snapshot": ["id", "object_id"], "origin": ["url"], "origin_visit": ["visit", "type", ("origin.url", "origin"), "date",], "origin_visit_status": [ ("origin_visit_status.visit", "visit"), ("origin.url", "origin"), ("origin_visit_status.date", "date"), "type", "snapshot", "status", "metadata", ], } JOINS = { "release": ["person a on release.author=a.id"], "revision": [ "person a on revision.author=a.id", "person c on revision.committer=c.id", ], "origin_visit": ["origin on origin_visit.origin=origin.id"], "origin_visit_status": ["origin on origin_visit_status.origin=origin.id",], "raw_extrinsic_metadata": [ "metadata_authority on " "raw_extrinsic_metadata.authority_id=metadata_authority.id", "metadata_fetcher on raw_extrinsic_metadata.fetcher_id=metadata_fetcher.id", ], } def directory_converter(db: BaseDb, directory_d: Dict[str, Any]) -> Directory: """Convert directory from the flat representation to swh model compatible objects. """ columns = ["target", "name", "perms"] query_template = """ select %(columns)s from directory_entry_%(type)s where id in %%s """ types = ["file", "dir", "rev"] entries = [] with db.cursor() as cur: for type in types: ids = directory_d.pop("%s_entries" % type) if not ids: continue query = query_template % { "columns": ",".join(columns), "type": type, } cur.execute(query, (tuple(ids),)) for row in cur: entry_d = dict(zip(columns, row)) entry = DirectoryEntry( name=entry_d["name"], type=type, target=entry_d["target"], perms=entry_d["perms"], ) entries.append(entry) return Directory(id=directory_d["id"], entries=tuple(entries),) def raw_extrinsic_metadata_converter( db: BaseDb, metadata: Dict[str, Any] ) -> RawExtrinsicMetadata: """Convert revision from the flat representation to swh model compatible objects. """ return db_to_raw_extrinsic_metadata(metadata) def revision_converter(db: BaseDb, revision_d: Dict[str, Any]) -> Revision: """Convert revision from the flat representation to swh model compatible objects. """ revision = db_to_revision(revision_d) assert revision is not None, revision_d["id"] return revision def release_converter(db: BaseDb, release_d: Dict[str, Any]) -> Release: """Convert release from the flat representation to swh model compatible objects. """ release = db_to_release(release_d) assert release is not None, release_d["id"] return release def snapshot_converter(db: BaseDb, snapshot_d: Dict[str, Any]) -> Snapshot: """Convert snapshot from the flat representation to swh model compatible objects. """ columns = ["name", "target", "target_type"] query = """ select %s from snapshot_branches sbs inner join snapshot_branch sb on sb.object_id=sbs.branch_id where sbs.snapshot_id=%%s """ % ", ".join( columns ) with db.cursor() as cur: cur.execute(query, (snapshot_d["object_id"],)) branches = {} for name, *row in cur: branch_d = dict(zip(columns[1:], row)) if branch_d["target"] is not None and branch_d["target_type"] is not None: branch: Optional[SnapshotBranch] = SnapshotBranch( target=branch_d["target"], target_type=TargetType(branch_d["target_type"]), ) else: branch = None branches[name] = branch return Snapshot(id=snapshot_d["id"], branches=branches,) CONVERTERS: Dict[str, Callable[[BaseDb, Dict[str, Any]], BaseModel]] = { "directory": directory_converter, "raw_extrinsic_metadata": raw_extrinsic_metadata_converter, "revision": revision_converter, "release": release_converter, "snapshot": snapshot_converter, } def object_to_offset(object_id, numbits): """Compute the index of the range containing object id, when dividing space into 2^numbits. Args: object_id (str): The hex representation of object_id numbits (int): Number of bits in which we divide input space Returns: The index of the range containing object id """ q, r = divmod(numbits, 8) length = q + (r != 0) shift_bits = 8 - r if r else 0 truncated_id = object_id[: length * 2] if len(truncated_id) < length * 2: truncated_id += "0" * (length * 2 - len(truncated_id)) truncated_id_bytes = bytes.fromhex(truncated_id) return int.from_bytes(truncated_id_bytes, byteorder="big") >> shift_bits def byte_ranges(numbits, start_object=None, end_object=None): """Generate start/end pairs of bytes spanning numbits bits and constrained by optional start_object and end_object. Args: numbits (int): Number of bits in which we divide input space start_object (str): Hex object id contained in the first range returned end_object (str): Hex object id contained in the last range returned Yields: 2^numbits pairs of bytes """ q, r = divmod(numbits, 8) length = q + (r != 0) shift_bits = 8 - r if r else 0 def to_bytes(i): return int.to_bytes(i << shift_bits, length=length, byteorder="big") start_offset = 0 end_offset = 1 << numbits if start_object is not None: start_offset = object_to_offset(start_object, numbits) if end_object is not None: end_offset = object_to_offset(end_object, numbits) + 1 for start in range(start_offset, end_offset): end = start + 1 if start == 0: yield None, to_bytes(end) elif end == 1 << numbits: yield to_bytes(start), None else: yield to_bytes(start), to_bytes(end) def raw_extrinsic_metadata_target_ranges(start_object=None, end_object=None): """Generate ranges of values for the `target` attribute of `raw_extrinsic_metadata` objects. This generates one range for all values before the first SWHID (which would correspond to raw origin URLs), then a number of hex-based ranges for each known type of SWHID (2**12 ranges for directories, 2**8 ranges for all other types). Finally, it generates one extra range for values above all possible SWHIDs. """ if start_object is None: start_object = "" swhid_target_types = sorted(type.value for type in ExtendedObjectType) first_swhid = f"swh:1:{swhid_target_types[0]}:" # Generate a range for url targets, if the starting object is before SWHIDs if start_object < first_swhid: yield start_object, ( first_swhid if end_object is None or end_object >= first_swhid else end_object ) if end_object is not None and end_object <= first_swhid: return # Prime the following loop, which uses the upper bound of the previous range # as lower bound, to account for potential targets between two valid types # of SWHIDs (even though they would eventually be rejected by the # RawExtrinsicMetadata parser, they /might/ exist...) end_swhid = first_swhid # Generate ranges for swhid targets for target_type in swhid_target_types: finished = False base_swhid = f"swh:1:{target_type}:" last_swhid = base_swhid + ("f" * 40) if start_object > last_swhid: continue # Generate 2**8 or 2**12 ranges for _, end in byte_ranges(12 if target_type == "dir" else 8): # Reuse previous uppper bound start_swhid = end_swhid # Use last_swhid for this object type if on the last byte range end_swhid = (base_swhid + end.hex()) if end is not None else last_swhid # Ignore out of bounds ranges if start_object >= end_swhid: continue # Potentially clamp start of range to the first object requested start_swhid = max(start_swhid, start_object) # Handle ending the loop early if the last requested object id is in # the current range if end_object is not None and end_swhid >= end_object: end_swhid = end_object finished = True yield start_swhid, end_swhid if finished: return # Generate one final range for potential raw origin URLs after the last # valid SWHID start_swhid = max(start_object, end_swhid) yield start_swhid, end_object def integer_ranges(start, end, block_size=1000): for start in range(start, end, block_size): if start == 0: yield None, block_size elif start + block_size > end: yield start, end else: yield start, start + block_size RANGE_GENERATORS = { "content": lambda start, end: byte_ranges(24, start, end), "skipped_content": lambda start, end: [(None, None)], "directory": lambda start, end: byte_ranges(24, start, end), "revision": lambda start, end: byte_ranges(24, start, end), "release": lambda start, end: byte_ranges(16, start, end), "raw_extrinsic_metadata": raw_extrinsic_metadata_target_ranges, "snapshot": lambda start, end: byte_ranges(16, start, end), "origin": integer_ranges, "origin_visit": integer_ranges, "origin_visit_status": integer_ranges, } def compute_query(obj_type, start, end): columns = COLUMNS.get(obj_type) join_specs = JOINS.get(obj_type, []) join_clause = "\n".join("left join %s" % clause for clause in join_specs) where = [] where_args = [] if start: where.append("%(keys)s >= %%s") where_args.append(start) if end: where.append("%(keys)s < %%s") where_args.append(end) where_clause = "" if where: where_clause = ("where " + " and ".join(where)) % { "keys": "(%s)" % PARTITION_KEY[obj_type] } column_specs = [] column_aliases = [] for column in columns: if isinstance(column, str): column_specs.append(column) column_aliases.append(column) else: column_specs.append("%s as %s" % column) column_aliases.append(column[1]) query = """ select %(columns)s from %(table)s %(join)s %(where)s """ % { "columns": ",".join(column_specs), "table": obj_type, "join": join_clause, "where": where_clause, } return query, where_args, column_aliases def fetch(db, obj_type, start, end): """Fetch all obj_type's identifiers from db. This opens one connection, stream objects and when done, close the connection. Args: db (BaseDb): Db connection object obj_type (str): Object type start (Union[bytes|Tuple]): Range start identifier end (Union[bytes|Tuple]): Range end identifier Raises: ValueError if obj_type is not supported Yields: Objects in the given range """ query, where_args, column_aliases = compute_query(obj_type, start, end) converter = CONVERTERS.get(obj_type) with db.cursor() as cursor: logger.debug("Fetching data for table %s", obj_type) logger.debug("query: %s %s", query, where_args) cursor.execute(query, where_args) for row in cursor: record = dict(zip(column_aliases, row)) if converter: record = converter(db, record) else: record = object_converter_fn[obj_type](record) logger.debug("record: %s", record) yield record def _format_range_bound(bound): if isinstance(bound, bytes): return bound.hex() else: return str(bound) MANDATORY_KEYS = ["storage", "journal_writer"] class JournalBackfiller: """Class in charge of reading the storage's objects and sends those back to the journal's topics. This is designed to be run periodically. """ def __init__(self, config=None): self.config = config self.check_config(config) def check_config(self, config): missing_keys = [] for key in MANDATORY_KEYS: if not config.get(key): missing_keys.append(key) if missing_keys: raise ValueError( "Configuration error: The following keys must be" " provided: %s" % (",".join(missing_keys),) ) if "cls" not in config["storage"] or config["storage"]["cls"] != "local": raise ValueError( "swh storage backfiller must be configured to use a local" " (PostgreSQL) storage" ) def parse_arguments(self, object_type, start_object, end_object): """Parse arguments Raises: ValueError for unsupported object type ValueError if object ids are not parseable Returns: Parsed start and end object ids """ if object_type not in COLUMNS: raise ValueError( "Object type %s is not supported. " "The only possible values are %s" - % (object_type, ", ".join(COLUMNS.keys())) + % (object_type, ", ".join(sorted(COLUMNS.keys()))) ) if object_type in ["origin", "origin_visit", "origin_visit_status"]: if start_object: start_object = int(start_object) else: start_object = 0 if end_object: end_object = int(end_object) else: end_object = 100 * 1000 * 1000 # hard-coded limit return start_object, end_object def run(self, object_type, start_object, end_object, dry_run=False): """Reads storage's subscribed object types and send them to the journal's reading topic. """ start_object, end_object = self.parse_arguments( object_type, start_object, end_object ) db = BaseDb.connect(self.config["storage"]["db"]) writer = JournalWriter({"cls": "kafka", **self.config["journal_writer"]}) assert writer.journal is not None for range_start, range_end in RANGE_GENERATORS[object_type]( start_object, end_object ): logger.info( "Processing %s range %s to %s", object_type, _format_range_bound(range_start), _format_range_bound(range_end), ) objects = fetch(db, object_type, start=range_start, end=range_end) if not dry_run: writer.write_additions(object_type, objects) else: # only consume the objects iterator to check for any potential # decoding/encoding errors for obj in objects: pass if __name__ == "__main__": print('Please use the "swh-journal backfiller run" command') diff --git a/swh/storage/tests/test_backfill.py b/swh/storage/tests/test_backfill.py index fc915876..e64ef680 100644 --- a/swh/storage/tests/test_backfill.py +++ b/swh/storage/tests/test_backfill.py @@ -1,294 +1,294 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import logging from unittest.mock import patch import pytest from swh.journal.client import JournalClient from swh.journal.tests.journal_data import TEST_OBJECTS from swh.storage import get_storage from swh.storage.backfill import ( PARTITION_KEY, JournalBackfiller, byte_ranges, compute_query, raw_extrinsic_metadata_target_ranges, ) from swh.storage.replay import process_replay_objects from swh.storage.tests.test_replay import check_replayed TEST_CONFIG = { "journal_writer": { "brokers": ["localhost"], "prefix": "swh.tmp_journal.new", "client_id": "swh.journal.client.test", }, "storage": {"cls": "local", "db": "service=swh-dev"}, } def test_config_ko_missing_mandatory_key(): """Missing configuration key will make the initialization fail """ for key in TEST_CONFIG.keys(): config = TEST_CONFIG.copy() config.pop(key) with pytest.raises(ValueError) as e: JournalBackfiller(config) error = "Configuration error: The following keys must be provided: %s" % ( ",".join([key]), ) assert e.value.args[0] == error def test_config_ko_unknown_object_type(): """Parse arguments will fail if the object type is unknown """ backfiller = JournalBackfiller(TEST_CONFIG) with pytest.raises(ValueError) as e: backfiller.parse_arguments("unknown-object-type", 1, 2) error = ( "Object type unknown-object-type is not supported. " - "The only possible values are %s" % (", ".join(PARTITION_KEY)) + "The only possible values are %s" % (", ".join(sorted(PARTITION_KEY))) ) assert e.value.args[0] == error def test_compute_query_content(): query, where_args, column_aliases = compute_query("content", "\x000000", "\x000001") assert where_args == ["\x000000", "\x000001"] assert column_aliases == [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "status", "ctime", ] assert ( query == """ select sha1,sha1_git,sha256,blake2s256,length,status,ctime from content where (sha1) >= %s and (sha1) < %s """ ) def test_compute_query_skipped_content(): query, where_args, column_aliases = compute_query("skipped_content", None, None) assert where_args == [] assert column_aliases == [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "ctime", "status", "reason", ] assert ( query == """ select sha1,sha1_git,sha256,blake2s256,length,ctime,status,reason from skipped_content """ ) def test_compute_query_origin_visit(): query, where_args, column_aliases = compute_query("origin_visit", 1, 10) assert where_args == [1, 10] assert column_aliases == [ "visit", "type", "origin", "date", ] assert ( query == """ select visit,type,origin.url as origin,date from origin_visit left join origin on origin_visit.origin=origin.id where (origin_visit.origin) >= %s and (origin_visit.origin) < %s """ ) def test_compute_query_release(): query, where_args, column_aliases = compute_query("release", "\x000002", "\x000003") assert where_args == ["\x000002", "\x000003"] assert column_aliases == [ "id", "date", "date_offset", "date_neg_utc_offset", "comment", "name", "synthetic", "target", "target_type", "author_id", "author_name", "author_email", "author_fullname", ] assert ( query == """ select release.id as id,date,date_offset,date_neg_utc_offset,comment,release.name as name,synthetic,target,target_type,a.id as author_id,a.name as author_name,a.email as author_email,a.fullname as author_fullname from release left join person a on release.author=a.id where (release.id) >= %s and (release.id) < %s """ # noqa ) @pytest.mark.parametrize("numbits", [2, 3, 8, 16]) def test_byte_ranges(numbits): ranges = list(byte_ranges(numbits)) assert len(ranges) == 2 ** numbits assert ranges[0][0] is None assert ranges[-1][1] is None bounds = [] for i, (left, right) in enumerate(zip(ranges[:-1], ranges[1:])): assert left[1] == right[0], f"Mismatched bounds in {i}th range" bounds.append(left[1]) assert bounds == sorted(bounds) def test_raw_extrinsic_metadata_target_ranges(): ranges = list(raw_extrinsic_metadata_target_ranges()) assert ranges[0][0] == "" assert ranges[-1][1] is None bounds = [] for i, (left, right) in enumerate(zip(ranges[:-1], ranges[1:])): assert left[1] == right[0], f"Mismatched bounds in {i}th range" bounds.append(left[1]) assert bounds == sorted(bounds) RANGE_GENERATORS = { "content": lambda start, end: [(None, None)], "skipped_content": lambda start, end: [(None, None)], "directory": lambda start, end: [(None, None)], "metadata_authority": lambda start, end: [(None, None)], "metadata_fetcher": lambda start, end: [(None, None)], "revision": lambda start, end: [(None, None)], "release": lambda start, end: [(None, None)], "snapshot": lambda start, end: [(None, None)], "origin": lambda start, end: [(None, 10000)], "origin_visit": lambda start, end: [(None, 10000)], "origin_visit_status": lambda start, end: [(None, 10000)], "raw_extrinsic_metadata": lambda start, end: [(None, None)], } @patch("swh.storage.backfill.RANGE_GENERATORS", RANGE_GENERATORS) def test_backfiller( swh_storage_backend_config, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, ): prefix1 = f"{kafka_prefix}-1" prefix2 = f"{kafka_prefix}-2" journal1 = { "cls": "kafka", "brokers": [kafka_server], "client_id": "kafka_writer-1", "prefix": prefix1, } swh_storage_backend_config["journal_writer"] = journal1 storage = get_storage(**swh_storage_backend_config) # fill the storage and the journal (under prefix1) for object_type, objects in TEST_OBJECTS.items(): method = getattr(storage, object_type + "_add") method(objects) # now apply the backfiller on the storage to fill the journal under prefix2 backfiller_config = { "journal_writer": { "brokers": [kafka_server], "client_id": "kafka_writer-2", "prefix": prefix2, }, "storage": swh_storage_backend_config, } # Backfilling backfiller = JournalBackfiller(backfiller_config) for object_type in TEST_OBJECTS: backfiller.run(object_type, None, None) # Trace log messages for unhandled object types in the replayer caplog.set_level(logging.DEBUG, "swh.storage.replay") # now check journal content are the same under both topics # use the replayer scaffolding to fill storages to make is a bit easier # Replaying #1 sto1 = get_storage(cls="memory") replayer1 = JournalClient( brokers=kafka_server, group_id=f"{kafka_consumer_group}-1", prefix=prefix1, stop_on_eof=True, ) worker_fn1 = functools.partial(process_replay_objects, storage=sto1) replayer1.process(worker_fn1) # Replaying #2 sto2 = get_storage(cls="memory") replayer2 = JournalClient( brokers=kafka_server, group_id=f"{kafka_consumer_group}-2", prefix=prefix2, stop_on_eof=True, ) worker_fn2 = functools.partial(process_replay_objects, storage=sto2) replayer2.process(worker_fn2) # Compare storages check_replayed(sto1, sto2) for record in caplog.records: assert ( "this should not happen" not in record.message ), "Replayer ignored some message types, see captured logging"