Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/cli.py
Show First 20 Lines • Show All 71 Lines • ▼ Show 20 Lines | |||||
@click.option( | @click.option( | ||||
"-P", | "-P", | ||||
"--profile", | "--profile", | ||||
default=None, | default=None, | ||||
type=click.Path(exists=False, dir_okay=False, path_type=str), | type=click.Path(exists=False, dir_okay=False, path_type=str), | ||||
help="""Enable profiling to specified file.""", | help="""Enable profiling to specified file.""", | ||||
) | ) | ||||
@click.pass_context | @click.pass_context | ||||
def cli(ctx, config_file: Optional[str], profile: str) -> None: | def cli(ctx: click.core.Context, config_file: Optional[str], profile: str) -> None: | ||||
if config_file is None and config.config_exists(DEFAULT_PATH): | if config_file is None and config.config_exists(DEFAULT_PATH): | ||||
config_file = DEFAULT_PATH | config_file = DEFAULT_PATH | ||||
if config_file is None: | if config_file is None: | ||||
conf = DEFAULT_CONFIG | conf = DEFAULT_CONFIG | ||||
else: | else: | ||||
# read_raw_config does not fail on ENOENT | # read_raw_config does not fail on ENOENT | ||||
if not config.config_exists(config_file): | if not config.config_exists(config_file): | ||||
Show All 22 Lines | |||||
@cli.command(name="iter-revisions") | @cli.command(name="iter-revisions") | ||||
@click.argument("filename") | @click.argument("filename") | ||||
@click.option("-a", "--track-all", default=True, type=bool) | @click.option("-a", "--track-all", default=True, type=bool) | ||||
@click.option("-l", "--limit", type=int) | @click.option("-l", "--limit", type=int) | ||||
@click.option("-m", "--min-depth", default=1, type=int) | @click.option("-m", "--min-depth", default=1, type=int) | ||||
@click.option("-r", "--reuse", default=True, type=bool) | @click.option("-r", "--reuse", default=True, type=bool) | ||||
@click.pass_context | @click.pass_context | ||||
def iter_revisions( | def iter_revisions( | ||||
ctx, | ctx: click.core.Context, | ||||
filename: str, | filename: str, | ||||
track_all: bool, | track_all: bool, | ||||
limit: Optional[int], | limit: Optional[int], | ||||
min_depth: int, | min_depth: int, | ||||
reuse: bool, | reuse: bool, | ||||
) -> None: | ) -> None: | ||||
# TODO: add file size filtering | # TODO: add file size filtering | ||||
"""Process a provided list of revisions.""" | """Process a provided list of revisions.""" | ||||
Show All 28 Lines | for line in open(filename, "r"): | ||||
hash_to_bytes(root), | hash_to_bytes(root), | ||||
) | ) | ||||
@cli.command(name="iter-origins")
@click.argument("filename")
@click.option("-l", "--limit", type=int)
@click.pass_context
def iter_origins(
    ctx: click.core.Context, filename: str, limit: Optional[int]
) -> None:
    """Process a provided list of origins."""
    # Imported lazily, inside the command, as in the other commands here.
    from . import get_archive, get_provenance
    from .origin import CSVOriginIterator, origin_add

    # Both backends are built from their own section of the loaded config.
    settings = ctx.obj["config"]
    archive = get_archive(**settings["archive"])
    provenance = get_provenance(**settings["provenance"])

    # (url, snapshot) pairs are read from ``filename``; ``limit`` is handed
    # to the iterator unchanged (None when the option is not given).
    pairs = generate_origin_tuples(filename)
    for origin in CSVOriginIterator(pairs, limit=limit):
        # Origins are submitted one at a time, each as a single-item batch.
        origin_add(provenance, archive, [origin])
def generate_origin_tuples(filename: str) -> Generator[Tuple[str, bytes], None, None]:
    """Yield ``(url, snapshot)`` pairs read from a comma-separated text file.

    Blank (whitespace-only) lines are skipped. Every other line must contain
    exactly two comma-separated fields: the origin URL and the snapshot
    identifier, the latter in whatever textual form ``hash_to_bytes`` accepts.

    Args:
        filename: path of the text file to read.

    Yields:
        Tuples of the URL string and the snapshot identifier converted by
        ``hash_to_bytes``.

    Raises:
        ValueError: if a non-blank line does not split into exactly two
            fields on ``","``.
    """
    # The context manager guarantees the file is closed once the generator is
    # exhausted, closed early, or aborted by an exception, instead of leaving
    # the handle to be reclaimed by the garbage collector.
    with open(filename, "r") as csv_file:
        for line in csv_file:
            stripped = line.strip()
            if stripped:
                url, snapshot = stripped.split(",")
                yield (url, hash_to_bytes(snapshot))
@cli.command(name="find-first")
@click.argument("swhid")
@click.pass_context
def find_first(ctx: click.core.Context, swhid: str) -> None:
    """Find first occurrence of the requested blob."""
    from . import get_provenance

    provenance = get_provenance(**ctx.obj["config"]["provenance"])
    # NOTE(review): ``swhid`` goes straight into hash_to_bytes, so it is
    # presumably a bare hash rather than a full "swh:1:cnt:..." identifier;
    # confirm against callers.
    # TODO: return a dictionary with proper keys for each field
    occur = provenance.content_find_first(hash_to_bytes(swhid))

    # Nothing recorded for this content: report it and stop.
    if occur is None:
        print(f"Cannot find a content with the id {swhid}")
        return

    # One comma-separated line: content, revision, date, origin, path.
    report = (
        f"swh:1:cnt:{hash_to_hex(occur.content)}, "
        f"swh:1:rev:{hash_to_hex(occur.revision)}, "
        f"{occur.date}, "
        f"{occur.origin}, "
        f"{os.fsdecode(occur.path)}"
    )
    print(report)
@cli.command(name="find-all")
@click.argument("swhid")
@click.option("-l", "--limit", type=int)
@click.pass_context
def find_all(ctx: click.core.Context, swhid: str, limit: Optional[int]) -> None:
    """Find all occurrences of the requested blob."""
    from . import get_provenance

    provenance = get_provenance(**ctx.obj["config"]["provenance"])
    # ``limit`` is forwarded unchanged (None when the option is not given).
    # TODO: return a dictionary with proper keys for each field
    occurrences = provenance.content_find_all(hash_to_bytes(swhid), limit=limit)

    # Each occurrence is printed as soon as it is produced, one
    # comma-separated line per hit: content, revision, date, origin, path.
    for occur in occurrences:
        row = (
            f"swh:1:cnt:{hash_to_hex(occur.content)}, "
            f"swh:1:rev:{hash_to_hex(occur.revision)}, "
            f"{occur.date}, "
            f"{occur.origin}, "
            f"{os.fsdecode(occur.path)}"
        )
        print(row)