Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/backfill.py
Show All 29 Lines | from swh.model.model import ( | ||||
SnapshotBranch, | SnapshotBranch, | ||||
TargetType, | TargetType, | ||||
) | ) | ||||
from swh.storage.postgresql.converters import ( | from swh.storage.postgresql.converters import ( | ||||
db_to_raw_extrinsic_metadata, | db_to_raw_extrinsic_metadata, | ||||
db_to_release, | db_to_release, | ||||
db_to_revision, | db_to_revision, | ||||
) | ) | ||||
from swh.storage.postgresql.db import register_swhid_type | |||||
from swh.storage.replay import object_converter_fn | from swh.storage.replay import object_converter_fn | ||||
from swh.storage.writer import JournalWriter | from swh.storage.writer import JournalWriter | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
PARTITION_KEY = { | PARTITION_KEY = { | ||||
"content": "sha1", | "content": "sha1", | ||||
"skipped_content": "sha1", | "skipped_content": "sha1", | ||||
Show All 28 Lines | "skipped_content": [ | ||||
"ctime", | "ctime", | ||||
"status", | "status", | ||||
"reason", | "reason", | ||||
], | ], | ||||
"directory": ["id", "dir_entries", "file_entries", "rev_entries"], | "directory": ["id", "dir_entries", "file_entries", "rev_entries"], | ||||
"metadata_authority": ["type", "url", "metadata",], | "metadata_authority": ["type", "url", "metadata",], | ||||
"metadata_fetcher": ["name", "version", "metadata",], | "metadata_fetcher": ["name", "version", "metadata",], | ||||
"raw_extrinsic_metadata": [ | "raw_extrinsic_metadata": [ | ||||
"raw_extrinsic_metadata.type", | |||||
"raw_extrinsic_metadata.target", | "raw_extrinsic_metadata.target", | ||||
"metadata_authority.type", | "metadata_authority.type", | ||||
"metadata_authority.url", | "metadata_authority.url", | ||||
"metadata_fetcher.name", | "metadata_fetcher.name", | ||||
"metadata_fetcher.version", | "metadata_fetcher.version", | ||||
"discovery_date", | "discovery_date", | ||||
"format", | "format", | ||||
"raw_extrinsic_metadata.metadata", | "raw_extrinsic_metadata.metadata", | ||||
▲ Show 20 Lines • Show All 437 Lines • ▼ Show 20 Lines | def run(self, object_type, start_object, end_object, dry_run=False): | ||||
journal's reading topic. | journal's reading topic. | ||||
""" | """ | ||||
start_object, end_object = self.parse_arguments( | start_object, end_object = self.parse_arguments( | ||||
object_type, start_object, end_object | object_type, start_object, end_object | ||||
) | ) | ||||
db = BaseDb.connect(self.config["storage"]["db"]) | db = BaseDb.connect(self.config["storage"]["db"]) | ||||
register_swhid_type(db.conn) | |||||
writer = JournalWriter({"cls": "kafka", **self.config["journal_writer"]}) | writer = JournalWriter({"cls": "kafka", **self.config["journal_writer"]}) | ||||
assert writer.journal is not None | assert writer.journal is not None | ||||
for range_start, range_end in RANGE_GENERATORS[object_type]( | for range_start, range_end in RANGE_GENERATORS[object_type]( | ||||
start_object, end_object | start_object, end_object | ||||
): | ): | ||||
logger.info( | logger.info( | ||||
"Processing %s range %s to %s", | "Processing %s range %s to %s", | ||||
Show All 18 Lines |