Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/backfill.py
Show All 14 Lines | |||||
""" | """ | ||||
import logging | import logging | ||||
from swh.core.db import BaseDb | from swh.core.db import BaseDb | ||||
from swh.journal.writer.kafka import KafkaJournalWriter | from swh.journal.writer.kafka import KafkaJournalWriter | ||||
from swh.storage.converters import db_to_release, db_to_revision | from swh.storage.converters import ( | ||||
db_to_raw_extrinsic_metadata, | |||||
db_to_release, | |||||
db_to_revision, | |||||
) | |||||
from swh.storage.replay import object_converter_fn | from swh.storage.replay import object_converter_fn | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
# Column expression(s) used as the partition key of each object type.
# NOTE(review): presumably interpolated verbatim into the SQL range filter;
# the query builder is not visible here -- confirm.
#
# Object types that have an entry in JOINS use a table-qualified key, because
# a joined table may expose a column with the same name.
PARTITION_KEY = {
    "content": "sha1",
    "skipped_content": "sha1",
    "directory": "id",
    "metadata_authority": "type, url",
    "metadata_fetcher": "name, version",
    # Qualified: the join conditions in JOINS reference metadata_authority.id
    # and metadata_fetcher.id, so a bare "id" would be ambiguous once those
    # tables are joined.
    "raw_extrinsic_metadata": "raw_extrinsic_metadata.id",
    "revision": "revision.id",
    "release": "release.id",
    "snapshot": "id",
    "origin": "id",
    "origin_visit": "origin_visit.origin",
    "origin_visit_status": "origin_visit_status.origin",
}
Show All 13 Lines | "skipped_content": [ | ||||
"sha256", | "sha256", | ||||
"blake2s256", | "blake2s256", | ||||
"length", | "length", | ||||
"ctime", | "ctime", | ||||
"status", | "status", | ||||
"reason", | "reason", | ||||
], | ], | ||||
"directory": ["id", "dir_entries", "file_entries", "rev_entries"], | "directory": ["id", "dir_entries", "file_entries", "rev_entries"], | ||||
"metadata_authority": ["type", "url", "metadata",], | |||||
"metadata_fetcher": ["name", "version", "metadata",], | |||||
"raw_extrinsic_metadata": [ | |||||
"raw_extrinsic_metadata.type", | |||||
"raw_extrinsic_metadata.id", | |||||
"metadata_authority.type", | |||||
"metadata_authority.url", | |||||
"metadata_fetcher.name", | |||||
"metadata_fetcher.version", | |||||
"discovery_date", | |||||
"format", | |||||
"raw_extrinsic_metadata.metadata", | |||||
"origin", | |||||
"visit", | |||||
"snapshot", | |||||
"release", | |||||
"revision", | |||||
"path", | |||||
"directory", | |||||
], | |||||
"revision": [ | "revision": [ | ||||
("revision.id", "id"), | ("revision.id", "id"), | ||||
"date", | "date", | ||||
"date_offset", | "date_offset", | ||||
"date_neg_utc_offset", | "date_neg_utc_offset", | ||||
"committer_date", | "committer_date", | ||||
"committer_date_offset", | "committer_date_offset", | ||||
"committer_date_neg_utc_offset", | "committer_date_neg_utc_offset", | ||||
▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | |||||
# Join specifications per object type. Each entry has the form
# "<table> [alias] on <condition>"; object types without an entry here
# have no joins declared.
JOINS = {
    # author of the release
    "release": ["person a on release.author=a.id"],
    # author and committer of the revision
    "revision": [
        "person a on revision.author=a.id",
        "person c on revision.committer=c.id",
    ],
    # visits and visit statuses both join the origin table
    "origin_visit": ["origin on origin_visit.origin=origin.id"],
    "origin_visit_status": ["origin on origin_visit_status.origin=origin.id"],
    # authority and fetcher referenced by the metadata row
    "raw_extrinsic_metadata": [
        "metadata_authority on raw_extrinsic_metadata.authority_id"
        "=metadata_authority.id",
        "metadata_fetcher on raw_extrinsic_metadata.fetcher_id"
        "=metadata_fetcher.id",
    ],
}
def directory_converter(db, directory): | def directory_converter(db, directory): | ||||
"""Convert directory from the flat representation to swh model | """Convert directory from the flat representation to swh model | ||||
compatible objects. | compatible objects. | ||||
""" | """ | ||||
Show All 21 Lines | with db.cursor() as cur: | ||||
entry = dict(zip(columns, row)) | entry = dict(zip(columns, row)) | ||||
entry["type"] = type | entry["type"] = type | ||||
entries.append(entry) | entries.append(entry) | ||||
directory["entries"] = entries | directory["entries"] = entries | ||||
return directory | return directory | ||||
def raw_extrinsic_metadata_converter(db, metadata):
    """Convert raw extrinsic metadata from the flat representation to swh
    model compatible objects.

    Args:
        db: not used by this converter; accepted so that it has the same
            ``(db, obj)`` signature as the other converters.
        metadata: the flat representation of one raw extrinsic metadata
            entry, passed as is to ``db_to_raw_extrinsic_metadata``.

    Returns:
        The result of calling ``to_dict()`` on the object built by
        ``db_to_raw_extrinsic_metadata``.

    """
    return db_to_raw_extrinsic_metadata(metadata).to_dict()
def revision_converter(db, revision):
    """Turn a flat revision representation into its swh model compatible
    form.

    ``db`` is not used here; the conversion is entirely delegated to
    ``db_to_revision``.

    """
    converted = db_to_revision(revision)
    return converted
Show All 29 Lines | with db.cursor() as cur: | ||||
branches[name] = branch | branches[name] = branch | ||||
snapshot["branches"] = branches | snapshot["branches"] = branches | ||||
return snapshot | return snapshot | ||||
# Per-object-type conversion functions. The converters visible in this module
# take ``(db, obj)`` and return the swh model compatible form of ``obj``.
# NOTE(review): object types without an entry here presumably need no
# conversion -- confirm against the code that looks this table up.
CONVERTERS = {
    "directory": directory_converter,
    "raw_extrinsic_metadata": raw_extrinsic_metadata_converter,
    "revision": revision_converter,
    "release": release_converter,
    "snapshot": snapshot_converter,
}
def object_to_offset(object_id, numbits): | def object_to_offset(object_id, numbits): | ||||
"""Compute the index of the range containing object id, when dividing | """Compute the index of the range containing object id, when dividing | ||||
▲ Show 20 Lines • Show All 265 Lines • Show Last 20 Lines |