diff --git a/swh/storage/backfill.py b/swh/storage/backfill.py
--- a/swh/storage/backfill.py
+++ b/swh/storage/backfill.py
@@ -16,10 +16,21 @@
 """
 
 import logging
-
+from typing import Any, Callable, Dict
 
 from swh.core.db import BaseDb
 from swh.journal.writer.kafka import KafkaJournalWriter
+from swh.model.model import (
+    BaseModel,
+    Directory,
+    DirectoryEntry,
+    RawExtrinsicMetadata,
+    Release,
+    Revision,
+    Snapshot,
+    SnapshotBranch,
+    TargetType,
+)
 from swh.storage.converters import (
     db_to_raw_extrinsic_metadata,
     db_to_release,
@@ -158,7 +169,7 @@
 }
 
 
-def directory_converter(db, directory):
+def directory_converter(db: BaseDb, directory_d: Dict[str, Any]) -> Directory:
     """Convert directory from the flat representation to swh model
     compatible objects.
 
@@ -175,7 +186,7 @@
     entries = []
     with db.cursor() as cur:
         for type in types:
-            ids = directory.pop("%s_entries" % type)
+            ids = directory_d.pop("%s_entries" % type)
             if not ids:
                 continue
             query = query_template % {
@@ -184,39 +195,49 @@
             }
             cur.execute(query, (tuple(ids),))
             for row in cur:
-                entry = dict(zip(columns, row))
-                entry["type"] = type
+                entry_d = dict(zip(columns, row))
+                entry = DirectoryEntry(
+                    name=entry_d["name"],
+                    type=type,
+                    target=entry_d["target"],
+                    perms=entry_d["perms"],
+                )
                 entries.append(entry)
 
-    directory["entries"] = entries
-    return directory
+    return Directory(id=directory_d["id"], entries=tuple(entries),)
 
 
-def raw_extrinsic_metadata_converter(db, metadata):
+def raw_extrinsic_metadata_converter(
+    db: BaseDb, metadata: Dict[str, Any]
+) -> RawExtrinsicMetadata:
     """Convert revision from the flat representation to swh model
     compatible objects.
 
     """
-    return db_to_raw_extrinsic_metadata(metadata).to_dict()
+    return db_to_raw_extrinsic_metadata(metadata)
 
 
-def revision_converter(db, revision):
+def revision_converter(db: BaseDb, revision_d: Dict[str, Any]) -> Revision:
     """Convert revision from the flat representation to swh model
     compatible objects.
 
     """
-    return db_to_revision(revision).to_dict()
+    revision = db_to_revision(revision_d)
+    assert revision is not None, revision_d["id"]
+    return revision
 
 
-def release_converter(db, release):
+def release_converter(db: BaseDb, release_d: Dict[str, Any]) -> Release:
     """Convert release from the flat representation to swh model
     compatible objects.
 
     """
-    return db_to_release(release).to_dict()
+    release = db_to_release(release_d)
+    assert release is not None, release_d["id"]
+    return release
 
 
-def snapshot_converter(db, snapshot):
+def snapshot_converter(db: BaseDb, snapshot_d: Dict[str, Any]) -> Snapshot:
     """Convert snapshot from the flat representation to swh model
     compatible objects.
 
@@ -231,19 +252,23 @@
         columns
     )
     with db.cursor() as cur:
-        cur.execute(query, (snapshot.pop("object_id"),))
+        cur.execute(query, (snapshot_d["object_id"],))
         branches = {}
         for name, *row in cur:
-            branch = dict(zip(columns[1:], row))
-            if not branch["target"] and not branch["target_type"]:
+            branch_d = dict(zip(columns[1:], row))
+            if not branch_d["target"] and not branch_d["target_type"]:
                 branch = None
+            else:
+                branch = SnapshotBranch(
+                    target=branch_d["target"],
+                    target_type=TargetType(branch_d["target_type"]),
+                )
             branches[name] = branch
 
-    snapshot["branches"] = branches
-    return snapshot
+    return Snapshot(id=snapshot_d["id"], branches=branches,)
 
 
-CONVERTERS = {
+CONVERTERS: Dict[str, Callable[[BaseDb, Dict[str, Any]], BaseModel]] = {
     "directory": directory_converter,
     "raw_extrinsic_metadata": raw_extrinsic_metadata_converter,
     "revision": revision_converter,
@@ -414,6 +439,8 @@
             record = dict(zip(column_aliases, row))
             if converter:
                 record = converter(db, record)
+            else:
+                record = object_converter_fn[obj_type](record)
             logger.debug("record: %s" % record)
             yield record
 
@@ -508,8 +535,7 @@
                 _format_range_bound(range_end),
             )
 
-            for obj_d in fetch(db, object_type, start=range_start, end=range_end,):
-                obj = object_converter_fn[object_type](obj_d)
+            for obj in fetch(db, object_type, start=range_start, end=range_end,):
                 if dry_run:
                     continue
                 writer.write_addition(object_type=object_type, object_=obj)