Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cli.py
Show First 20 Lines • Show All 195 Lines • ▼ Show 20 Lines | def replay(ctx, stop_after_objects): | ||||
else: | else: | ||||
print("Done.") | print("Done.") | ||||
finally: | finally: | ||||
if notify: | if notify: | ||||
notify("STOPPING=1") | notify("STOPPING=1") | ||||
client.close() | client.close() | ||||
@storage.command()
@click.pass_context
def migrate_168(ctx):
    """Fill the 'id' column of raw_extrinsic_metadata in a postgresql backend.

    Uses the journal as input (instead of a SELECT) so it is parallelizable
    and stoppable/resumable.

    Fails with a ClickException unless the configured storage is the local
    (postgresql) backend, since the UPDATE is issued directly on its
    connection.
    """
    import psycopg2.extras

    from swh.journal.client import get_journal_client
    from swh.model.hashutil import hash_to_bytes
    from swh.model.identifiers import raw_extrinsic_metadata_identifier
    from swh.storage import get_storage
    from swh.storage.postgresql.storage import Storage as PostgresqlStorage

    ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "write")

    conf = ctx.obj["config"]
    storage = get_storage(**conf.pop("storage"))
    if not isinstance(storage, PostgresqlStorage):
        # click.Error does not exist; ClickException is the class that
        # aborts the command with a message.
        raise click.ClickException("storage must have 'cls: local'.")
    conn = storage.db.conn

    client_cfg = conf.pop("journal_client")
    try:
        client = get_journal_client(**client_cfg)
    except ValueError as exc:
        # ctx.fail expects a message string, not an exception object.
        ctx.fail(str(exc))

    # psycopg2 uses %s placeholders (never '?').  The WHERE clause must
    # restrict on every component of the object's unique key — target,
    # authority, fetcher and discovery_date — in the same order as the
    # tuples built by worker_fn below (7 placeholders for 7 values).
    # NOTE(review): the FROM aliases assume tables named 'authority' and
    # 'fetcher' — confirm against the schema (they may be
    # metadata_authority / metadata_fetcher).
    query = """
        UPDATE raw_extrinsic_metadata
        SET id=%s
        FROM authority, fetcher
        WHERE
            raw_extrinsic_metadata.target=%s
            AND authority.id=raw_extrinsic_metadata.authority_id
            AND fetcher.id=raw_extrinsic_metadata.fetcher_id
            AND authority.type=%s
            AND authority.url=%s
            AND discovery_date=%s
            AND fetcher.name=%s
            AND fetcher.version=%s
    """

    def worker_fn(all_objects):
        """Journal-client callback: computes and writes the intrinsic id
        for one batch of raw_extrinsic_metadata journal objects."""
        assert set(all_objects) == {"raw_extrinsic_metadata"}
        rows = []
        for obj in all_objects["raw_extrinsic_metadata"]:
            rows.append(
                (
                    # The new 'id' value: intrinsic hash of the object.
                    hash_to_bytes(raw_extrinsic_metadata_identifier(obj)),
                    # Unique-key components, matching the query's WHERE order.
                    obj["target"],
                    obj["authority"]["type"],
                    obj["authority"]["url"],
                    obj["discovery_date"],
                    obj["fetcher"]["name"],
                    obj["fetcher"]["version"],
                )
            )

        with conn.cursor() as cur:
            # Batched round-trips: one UPDATE per row, few network flushes.
            psycopg2.extras.execute_batch(cur, query, rows)
        conn.commit()

    if notify:
        notify("READY=1")

    try:
        client.process(worker_fn)
    except KeyboardInterrupt:
        ctx.exit(0)
    else:
        print("Done.")
    finally:
        if notify:
            notify("STOPPING=1")
        client.close()
def ensure_check_config(storage_cfg: Dict, check_config: Optional[str], default: str): | def ensure_check_config(storage_cfg: Dict, check_config: Optional[str], default: str): | ||||
"""Helper function to inject the setting of check_config option in the storage config | """Helper function to inject the setting of check_config option in the storage config | ||||
dict according to the expected default value (default value depends on the command, | dict according to the expected default value (default value depends on the command, | ||||
eg. backfill can be read-only). | eg. backfill can be read-only). | ||||
""" | """ | ||||
if check_config is not None: | if check_config is not None: | ||||
if check_config == "no": | if check_config == "no": | ||||
Show All 15 Lines |