Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/backfill.py
Show All 15 Lines | |||||
""" | """ | ||||
import logging | import logging | ||||
from swh.core.db import BaseDb | from swh.core.db import BaseDb | ||||
from swh.journal.writer.kafka import KafkaJournalWriter | from swh.journal.writer.kafka import KafkaJournalWriter | ||||
from swh.storage.converters import db_to_release, db_to_revision | from swh.storage.converters import db_to_release, db_to_revision | ||||
from swh.storage.replay import object_converter_fn | |||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
PARTITION_KEY = { | PARTITION_KEY = { | ||||
"content": "sha1", | "content": "sha1", | ||||
"skipped_content": "sha1", | "skipped_content": "sha1", | ||||
"directory": "id", | "directory": "id", | ||||
"revision": "revision.id", | "revision": "revision.id", | ||||
"release": "release.id", | "release": "release.id", | ||||
"snapshot": "id", | "snapshot": "id", | ||||
"origin": "id", | "origin": "id", | ||||
"origin_visit": "origin_visit.origin", | "origin_visit": "origin_visit.origin", | ||||
"origin_visit_status": "origin_visit_status.origin", | |||||
} | } | ||||
COLUMNS = { | COLUMNS = { | ||||
"content": [ | "content": [ | ||||
"sha1", | "sha1", | ||||
"sha1_git", | "sha1_git", | ||||
"sha256", | "sha256", | ||||
"blake2s256", | "blake2s256", | ||||
Show All 11 Lines | "skipped_content": [ | ||||
"status", | "status", | ||||
"reason", | "reason", | ||||
], | ], | ||||
"directory": ["id", "dir_entries", "file_entries", "rev_entries"], | "directory": ["id", "dir_entries", "file_entries", "rev_entries"], | ||||
"revision": [ | "revision": [ | ||||
("revision.id", "id"), | ("revision.id", "id"), | ||||
"date", | "date", | ||||
"date_offset", | "date_offset", | ||||
"date_neg_utc_offset", | |||||
"committer_date", | "committer_date", | ||||
"committer_date_offset", | "committer_date_offset", | ||||
"committer_date_neg_utc_offset", | |||||
"type", | "type", | ||||
"directory", | "directory", | ||||
"message", | "message", | ||||
"synthetic", | "synthetic", | ||||
"metadata", | "metadata", | ||||
"date_neg_utc_offset", | |||||
"committer_date_neg_utc_offset", | |||||
( | ( | ||||
"array(select parent_id::bytea from revision_history rh " | "array(select parent_id::bytea from revision_history rh " | ||||
"where rh.id = revision.id order by rh.parent_rank asc)", | "where rh.id = revision.id order by rh.parent_rank asc)", | ||||
"parents", | "parents", | ||||
), | ), | ||||
("a.id", "author_id"), | ("a.id", "author_id"), | ||||
("a.name", "author_name"), | ("a.name", "author_name"), | ||||
("a.email", "author_email"), | ("a.email", "author_email"), | ||||
("a.fullname", "author_fullname"), | ("a.fullname", "author_fullname"), | ||||
("c.id", "committer_id"), | ("c.id", "committer_id"), | ||||
("c.name", "committer_name"), | ("c.name", "committer_name"), | ||||
("c.email", "committer_email"), | ("c.email", "committer_email"), | ||||
("c.fullname", "committer_fullname"), | ("c.fullname", "committer_fullname"), | ||||
], | ], | ||||
"release": [ | "release": [ | ||||
("release.id", "id"), | ("release.id", "id"), | ||||
"date", | "date", | ||||
"date_offset", | "date_offset", | ||||
"date_neg_utc_offset", | |||||
"comment", | "comment", | ||||
("release.name", "name"), | ("release.name", "name"), | ||||
"synthetic", | "synthetic", | ||||
"date_neg_utc_offset", | |||||
"target", | "target", | ||||
"target_type", | "target_type", | ||||
("a.id", "author_id"), | ("a.id", "author_id"), | ||||
("a.name", "author_name"), | ("a.name", "author_name"), | ||||
("a.email", "author_email"), | ("a.email", "author_email"), | ||||
("a.fullname", "author_fullname"), | ("a.fullname", "author_fullname"), | ||||
], | ], | ||||
"snapshot": ["id", "object_id"], | "snapshot": ["id", "object_id"], | ||||
"origin": ["type", "url"], | "origin": ["url"], | ||||
"origin_visit": [ | "origin_visit": [ | ||||
"visit", | "visit", | ||||
"origin.type", | "type", | ||||
"origin_visit.type", | ("origin.url", "origin"), | ||||
"url", | "date", | ||||
"snapshot", | |||||
"status", | |||||
"metadata", | |||||
], | |||||
"origin_visit_status": [ | |||||
"visit", | |||||
("origin.url", "origin"), | |||||
"date", | "date", | ||||
"snapshot", | "snapshot", | ||||
"status", | "status", | ||||
"metadata", | "metadata", | ||||
], | ], | ||||
} | } | ||||
JOINS = { | JOINS = { | ||||
"release": ["person a on release.author=a.id"], | "release": ["person a on release.author=a.id"], | ||||
"revision": [ | "revision": [ | ||||
"person a on revision.author=a.id", | "person a on revision.author=a.id", | ||||
"person c on revision.committer=c.id", | "person c on revision.committer=c.id", | ||||
], | ], | ||||
"origin_visit": ["origin on origin_visit.origin=origin.id"], | "origin_visit": ["origin on origin_visit.origin=origin.id"], | ||||
"origin_visit_status": ["origin on origin_visit_status.origin=origin.id"], | |||||
} | } | ||||
def directory_converter(db, directory): | def directory_converter(db, directory): | ||||
"""Convert directory from the flat representation to swh model | """Convert directory from the flat representation to swh model | ||||
compatible objects. | compatible objects. | ||||
""" | """ | ||||
Show All 34 Lines | def revision_converter(db, revision): | ||||
return db_to_revision(revision) | return db_to_revision(revision) | ||||
def release_converter(db, release): | def release_converter(db, release): | ||||
"""Convert release from the flat representation to swh model | """Convert release from the flat representation to swh model | ||||
compatible objects. | compatible objects. | ||||
""" | """ | ||||
release = db_to_release(release) | return db_to_release(release) | ||||
if "author" in release and release["author"]: | |||||
del release["author"]["id"] | |||||
return release | |||||
def snapshot_converter(db, snapshot): | def snapshot_converter(db, snapshot): | ||||
"""Convert snapshot from the flat representation to swh model | """Convert snapshot from the flat representation to swh model | ||||
compatible objects. | compatible objects. | ||||
""" | """ | ||||
columns = ["name", "target", "target_type"] | columns = ["name", "target", "target_type"] | ||||
Show All 13 Lines | with db.cursor() as cur: | ||||
if not branch["target"] and not branch["target_type"]: | if not branch["target"] and not branch["target_type"]: | ||||
branch = None | branch = None | ||||
branches[name] = branch | branches[name] = branch | ||||
snapshot["branches"] = branches | snapshot["branches"] = branches | ||||
return snapshot | return snapshot | ||||
def origin_visit_converter(db, origin_visit): | |||||
origin = { | |||||
"type": origin_visit.pop("origin.type"), | |||||
"url": origin_visit.pop("url"), | |||||
} | |||||
origin_visit["origin"] = origin | |||||
origin_visit["type"] = origin_visit.pop("origin_visit.type") | |||||
return origin_visit | |||||
CONVERTERS = { | CONVERTERS = { | ||||
"directory": directory_converter, | "directory": directory_converter, | ||||
"revision": revision_converter, | "revision": revision_converter, | ||||
"release": release_converter, | "release": release_converter, | ||||
"snapshot": snapshot_converter, | "snapshot": snapshot_converter, | ||||
"origin_visit": origin_visit_converter, | |||||
} | } | ||||
def object_to_offset(object_id, numbits): | def object_to_offset(object_id, numbits): | ||||
"""Compute the index of the range containing object id, when dividing | """Compute the index of the range containing object id, when dividing | ||||
space into 2^numbits. | space into 2^numbits. | ||||
Args: | Args: | ||||
▲ Show 20 Lines • Show All 71 Lines • ▼ Show 20 Lines | RANGE_GENERATORS = { | ||||
"content": lambda start, end: byte_ranges(24, start, end), | "content": lambda start, end: byte_ranges(24, start, end), | ||||
"skipped_content": lambda start, end: [(None, None)], | "skipped_content": lambda start, end: [(None, None)], | ||||
"directory": lambda start, end: byte_ranges(24, start, end), | "directory": lambda start, end: byte_ranges(24, start, end), | ||||
"revision": lambda start, end: byte_ranges(24, start, end), | "revision": lambda start, end: byte_ranges(24, start, end), | ||||
"release": lambda start, end: byte_ranges(16, start, end), | "release": lambda start, end: byte_ranges(16, start, end), | ||||
"snapshot": lambda start, end: byte_ranges(16, start, end), | "snapshot": lambda start, end: byte_ranges(16, start, end), | ||||
"origin": integer_ranges, | "origin": integer_ranges, | ||||
"origin_visit": integer_ranges, | "origin_visit": integer_ranges, | ||||
"origin_visit_status": integer_ranges, | |||||
} | } | ||||
def compute_query(obj_type, start, end): | def compute_query(obj_type, start, end): | ||||
columns = COLUMNS.get(obj_type) | columns = COLUMNS.get(obj_type) | ||||
join_specs = JOINS.get(obj_type, []) | join_specs = JOINS.get(obj_type, []) | ||||
join_clause = "\n".join("left join %s" % clause for clause in join_specs) | join_clause = "\n".join("left join %s" % clause for clause in join_specs) | ||||
▲ Show 20 Lines • Show All 155 Lines • ▼ Show 20 Lines | def run(self, object_type, start_object, end_object, dry_run=False): | ||||
): | ): | ||||
logger.info( | logger.info( | ||||
"Processing %s range %s to %s", | "Processing %s range %s to %s", | ||||
object_type, | object_type, | ||||
_format_range_bound(range_start), | _format_range_bound(range_start), | ||||
_format_range_bound(range_end), | _format_range_bound(range_end), | ||||
) | ) | ||||
for obj in fetch(db, object_type, start=range_start, end=range_end,): | for obj_d in fetch(db, object_type, start=range_start, end=range_end,): | ||||
obj = object_converter_fn[object_type](obj_d) | |||||
if dry_run: | if dry_run: | ||||
continue | continue | ||||
writer.write_addition(object_type=object_type, object_=obj) | writer.write_addition(object_type=object_type, object_=obj) | ||||
writer.producer.flush() | writer.producer.flush() | ||||
if __name__ == "__main__": | if __name__ == "__main__": | ||||
print('Please use the "swh-journal backfiller run" command') | print('Please use the "swh-journal backfiller run" command') |