diff --git a/swh/loader/mercurial/identify.py b/swh/loader/mercurial/identify.py index 289f7d3..c7d0581 100644 --- a/swh/loader/mercurial/identify.py +++ b/swh/loader/mercurial/identify.py @@ -1,554 +1,545 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from codecs import escape_decode # type: ignore import json -import os from pathlib import Path import re import subprocess from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Union # WARNING: do not import unnecessary things here to keep cli startup time under # control import click +from swh.loader.mercurial.utils import get_minimum_env from swh.model.cli import identify_object from swh.model.hashutil import hash_to_bytehex from swh.model.identifiers import normalize_timestamp, swhid from swh.model.model import RevisionType TAG_PATTERN = re.compile(b"([0-9A-Fa-f]{40}) +(.+)") class HgAuthor(NamedTuple): """Represent a Mercurial revision author.""" fullname: bytes """full name of the author""" name: Optional[bytes] """name of the author""" email: Optional[bytes] """email of the author""" @staticmethod def from_bytes(data: bytes) -> "HgAuthor": """Convert bytes to an HgAuthor named tuple. Expected format: "name " """ from swh.loader.mercurial.converters import parse_author result = parse_author(data) return HgAuthor( fullname=result["fullname"], name=result["name"], email=result["email"] ) def to_dict(self) -> Dict[str, Optional[bytes]]: return {"fullname": self.fullname, "name": self.name, "email": self.email} HG_REVISION_TEMPLATE = "\n".join( [ "node_id:{node}", "author:{author}", "timestamp_offset:{date|json}", "p1:{p1.node}", "p2:{p2.node}", "extras:{join(extras, '\nextras:')}", ] ) # Log template for HgRevision.from_bytes NULL_NODE_ID = b"0" * 40 # Value used when no parent class HgRevision(NamedTuple): """Represent a Mercurial revision.""" node_id: bytes """raw bytes of the revision hash""" author: HgAuthor """author of the revision""" timestamp: bytes """timestamp of the revision""" offset: bytes """offset of the revision""" parents: List[bytes] """hex bytes of the revision's parents""" extras: Dict[bytes, bytes] """metadata of the revision""" description: bytes """description of the revision""" @staticmethod def from_bytes(data: bytes, description: bytes) -> "HgRevision": """Convert bytes to an HgRevision named tuple. Expected data format: ''' node_id:{node} author:{author} timestamp_offset:[{timestamp}, {offset}] p1:{p1} p2:{p2} extras:{key1}={value1} ... extras:{keyn}={value} ''' """ lines = data.split(b"\n") tuples = [line.split(b":", 1) for line in lines] fields: Dict[str, Any] = { "parents": [], "extras": {}, "description": description, } for key, value in tuples: if key == b"timestamp_offset": timestamp, offset = json.loads(value) fields["timestamp"] = timestamp fields["offset"] = offset elif key in (b"p1", b"p2"): if value != NULL_NODE_ID: fields["parents"].append(value) elif key == b"extras": extra_key, extra_value = value.split(b"=", 1) fields["extras"][extra_key] = extra_value elif key == b"author": fields["author"] = HgAuthor.from_bytes(value) else: fields[key.decode()] = value return HgRevision(**fields) def branch(self) -> bytes: return self.extras.get(b"branch", b"default") def to_dict(self) -> Dict: """Convert a HgRevision to a dict for SWHID computation""" date = normalize_timestamp(int(self.timestamp)) extra_headers = [ (b"time_offset_seconds", str(self.offset).encode("utf-8")), ] for key, value in self.extras.items(): if key == b"branch" and value == b"default": # branch default is skipped to match historical implementation continue if key == b"transplant_source": # transplant_source is converted to hex # to match historical implementation value = hash_to_bytehex(escape_decode(value)[0]) extra_headers.append((key, value)) author = self.author.to_dict() return { "author": author, "date": date, "committer": author, "committer_date": date, "type": RevisionType.MERCURIAL.value, "message": self.description, "metadata": {"node": self.node_id}, "extra_headers": tuple(extra_headers), "synthetic": False, "parents": self.parents, } class HgBranch(NamedTuple): """Represent a Mercurial branch.""" name: bytes """name of the branch""" node_id: bytes """row bytes of the target revision hash""" class HgTag(NamedTuple): """Represent a Mercurial tag.""" name: bytes """name of the tag""" node_id: bytes """hex bytes of the target revision""" class Hg: """Provide methods to extract data from a Mercurial repository.""" def __init__(self, repository_root: Path) -> None: self._root = repository_root def _output(self, *args) -> bytes: """Return the outpout of a `hg` call.""" return subprocess.check_output( - ["hg", *args], cwd=self._root, env=self._get_env() + ["hg", *args], cwd=self._root, env=get_minimum_env() ) def _call(self, *args) -> None: """Perform a `hg` call.""" subprocess.check_call( ["hg", *args], cwd=self._root, stderr=subprocess.PIPE, stdout=subprocess.PIPE, - env=self._get_env(), + env=get_minimum_env(), ) - def _get_env(self) -> Dict[str, str]: - """Return the smallest viable environment for `hg` suprocesses""" - env = { - "PATH": os.environ["PATH"], - "HGPLAIN": "", # Tells Mercurial to disable output customization - "HGRCPATH": "", # Tells Mercurial to ignore config files - } - return env - def root(self) -> Path: """Return the root of the Mercurial repository.""" return self._root def log(self, rev: Optional[Union[bytes, str]] = None) -> List[HgRevision]: """Return the specified revisions of the Mercurial repository. Mercurial revsets are supported. (See `hg help revsets`) If no revision range is specified, return all revisions". """ if rev: node_ids = self._output("log", "-r", rev, "-T", "{node}\n").splitlines() else: node_ids = self._output("log", "-T", "{node}\n").splitlines() revisions = [self._revision(node_id) for node_id in reversed(node_ids)] return revisions def _revision(self, revision: bytes) -> HgRevision: data = self._output("log", "-r", revision, "-T", HG_REVISION_TEMPLATE) # hg log strips the description so the raw description has to be taken # from debugdata # The description follows some metadata and is separated from them # by an empty line _, desc = self._output("debugdata", "-c", revision).split(b"\n\n", 1) return HgRevision.from_bytes(data, desc) def up(self, rev: bytes) -> None: """Update the repository working directory to the specified revision.""" self._call("up", rev) def branches(self) -> List[HgBranch]: """List the repository named branches.""" output = self._output("branches", "-T", "{branch}\n{node}\n\n").strip() branches = [] for block in output.split(b"\n\n"): name, node_id = block.splitlines() branches.append(HgBranch(name=name, node_id=node_id)) return branches def tip(self) -> HgRevision: """Return the `tip` node-id.""" return self.log("tip")[0] def tags(self) -> List[HgTag]: """Return the repository's tags as defined in the `.hgtags` file. `.hgtags` being like any other repository's tracked file, its content can vary from revision to revision. The returned value therefore depends on the current revision of the repository. """ hgtags = self._root / ".hgtags" tags = {} if hgtags.is_file(): for line in hgtags.read_bytes().splitlines(): match = TAG_PATTERN.match(line) if match is None: continue node_id, name = match.groups() tags[node_id] = name return [HgTag(name=name, node_id=node_id) for node_id, name in tags.items()] @click.group() @click.option( "--directory", "-d", help=("Path to the Mercurial repository. If unset, the current directory is used"), ) @click.pass_context def main(ctx, directory=None): """Compute the Software Heritage persistent identifier (SWHID) for the given source code object(s). For more details about SWHIDs see: https://docs.softwareheritage.org/devel/swh-model/persistent-identifiers.html """ # ensure that ctx.obj exists and is a dict (in case `cli()` is called # by means other than the `if` block below) ctx.ensure_object(dict) root = Path(directory) if directory else Path() if not root.exists(): raise IOError(f"{root!r} does not exists") ctx.obj["HG_ROOT"] = root def identify_directory(path: Path) -> str: """Return the SWHID of the given path.""" uri = identify_object( "directory", follow_symlinks=True, exclude_patterns=[".hg"], obj=str(path) )[1] return uri.split(":")[-1] class RevisionIdentity(NamedTuple): """Represent a swh revision identity.""" swhid: bytes """SWHID raw bytes""" node_id: bytes """node_id hex bytes""" directory_swhid: bytes def dir_uri(self) -> str: """Return the SWHID uri of the revision's directory.""" return f"swh:1:dir:{self.directory_swhid.hex()}\t{self.node_id.decode()}" def __str__(self) -> str: """Return the string representation of a RevisionIdentity.""" uri = swhid("revision", self.swhid.hex()) return f"{uri}\t{self.node_id.decode()}" def identify_revision( hg: Hg, rev: Optional[bytes] = None, node_id_2_swhid: Optional[Dict[bytes, bytes]] = None, ) -> Iterator[RevisionIdentity]: """Return the repository revision identities. hg: A `Hg` repository instance rev: An optional revision or Mercurial revsets (See `hg help revsets`) If not provided all the repository revisions will be computed. node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs It will be updated in place with new mappings. """ from swh.model.hashutil import hash_to_bytes from swh.model.model import Revision if node_id_2_swhid is None: node_id_2_swhid = {} for revision in hg.log(rev): data = revision.to_dict() hg.up(revision.node_id) directory_swhid = hash_to_bytes(identify_directory(hg.root())) data["directory"] = directory_swhid parents = [] for parent in data["parents"]: if parent not in node_id_2_swhid: parent_revision = next(identify_revision(hg, parent, node_id_2_swhid)) node_id_2_swhid[parent] = parent_revision.swhid parents.append(node_id_2_swhid[parent]) data["parents"] = parents revision_swhid = hash_to_bytes(Revision.from_dict(data).id) node_id_2_swhid[revision.node_id] = revision_swhid yield RevisionIdentity( swhid=revision_swhid, node_id=revision.node_id, directory_swhid=directory_swhid, ) class ReleaseIdentity(NamedTuple): """Represent a swh release identity.""" swhid: str """SWHID hex string""" node_id: bytes """node_id hex bytes""" name: bytes """name of the release""" def __str__(self) -> str: """Return the string representation of a ReleaseIdentity.""" uri = swhid("release", self.swhid) return f"{uri}\t{self.name.decode()}" def identify_release( hg: Hg, node_id_2_swhid: Optional[Dict[bytes, bytes]] = None, ) -> Iterator[ReleaseIdentity]: """Return the repository's release identities. hg: A `Hg` repository instance node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs If not provided it will be computed using `identify_revision`. """ from swh.model.model import ObjectType, Release if node_id_2_swhid is None: node_id_2_swhid = { revision.node_id: revision.swhid for revision in identify_revision(hg) } for tag in hg.tags(): data = { "name": tag.name, "target": node_id_2_swhid[tag.node_id], "target_type": ObjectType.REVISION.value, "message": None, "metadata": None, "synthetic": False, "author": {"name": None, "email": None, "fullname": b""}, "date": None, } release_swhid = Release.from_dict(data).id yield ReleaseIdentity( swhid=release_swhid, node_id=tag.node_id, name=tag.name, ) def identify_snapshot( hg: Hg, node_id_2_swhid: Optional[Dict[bytes, bytes]] = None, releases: Optional[List[ReleaseIdentity]] = None, ) -> str: """Return the repository snapshot identity. hg: A `Hg` repository instance node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs If not provided it will be computed using `identify_revision`. release: an optional list of `ReleaseIdentity`. If not provided it will be computed using `identify_release`. """ from swh.model.model import Snapshot, TargetType if node_id_2_swhid is None: node_id_2_swhid = { revision.node_id: revision.swhid for revision in identify_revision(hg) } if releases is None: releases = [release for release in identify_release(hg, node_id_2_swhid)] branches = {} tip = hg.tip() branches[b"HEAD"] = { "target": tip.branch(), "target_type": TargetType.ALIAS.value, } for branch in hg.branches(): branches[branch.name] = { "target": node_id_2_swhid[branch.node_id], "target_type": TargetType.REVISION.value, } for release in releases: branches[release.name] = { "target": release.swhid, "target_type": TargetType.RELEASE.value, } return Snapshot.from_dict({"branches": branches}).id @main.command() @click.argument("rev", required=False) @click.pass_context def revision(ctx, rev): """Compute the SWHID of a given revision. If specified REV allow to select a single or multiple revisions (using the Mercurial revsets language: `hg help revsets`) """ hg = Hg(ctx.obj["HG_ROOT"]) for identity in identify_revision(hg, rev): click.echo(identity) @main.command() @click.pass_context def snapshot(ctx): """Compute the SWHID of the snapshot.""" root = ctx.obj["HG_ROOT"] hg = Hg(root) snapshot_swhid = identify_snapshot(hg) uri = swhid("snapshot", snapshot_swhid) click.echo(f"{uri}\t{root}") @main.command() @click.pass_context def all(ctx): """Compute the SWHID of all the repository objects.""" root = ctx.obj["HG_ROOT"] hg = Hg(root) dir_uris = [] rev_uris = [] rel_uris = [] node_id_2_swhid = {} for revision in identify_revision(hg): dir_uris.append(revision.dir_uri()) rev_uris.append(str(revision)) node_id_2_swhid[revision.node_id] = revision.swhid releases = [] for release in identify_release(hg, node_id_2_swhid): rel_uris.append(str(release)) releases.append(release) snapshot_swhid = identify_snapshot(hg, node_id_2_swhid, releases) for uri in dir_uris + rev_uris + rel_uris: click.echo(uri) uri = swhid("snapshot", snapshot_swhid) click.echo(f"{uri}\t{root}") if __name__ == "__main__": main() diff --git a/swh/loader/mercurial/loader.py b/swh/loader/mercurial/loader.py index 45f971d..194d982 100644 --- a/swh/loader/mercurial/loader.py +++ b/swh/loader/mercurial/loader.py @@ -1,671 +1,677 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """This document contains a SWH loader for ingesting repository data from Mercurial version 2 bundle files. """ # NOTE: The code here does expensive work twice in places because of the # intermediate need to check for what is missing before sending to the database # and the desire to not juggle very large amounts of data. # TODO: Decide whether to also serialize to disk and read back more quickly # from there. Maybe only for very large repos and fast drives. # - Avi import datetime import os from queue import Empty import random import re from shutil import rmtree from tempfile import mkdtemp import time from typing import Any, Dict, Iterable, List, Optional import billiard import hglib from hglib.error import CommandError from swh.loader.core.loader import DVCSLoader from swh.loader.core.utils import clean_dangling_folders from swh.loader.exception import NotFound +from swh.loader.mercurial.utils import get_minimum_env from swh.model import identifiers from swh.model.hashutil import ( DEFAULT_ALGORITHMS, MultiHash, hash_to_bytehex, hash_to_bytes, hash_to_hex, ) from swh.model.model import ( BaseContent, Content, Directory, ObjectType, Origin, Person, Release, Revision, RevisionType, Sha1Git, SkippedContent, Snapshot, SnapshotBranch, TargetType, TimestampWithTimezone, ) from swh.storage.algos.origin import origin_get_latest_visit_status from swh.storage.interface import StorageInterface from . import converters from .archive_extract import tmp_extract from .bundle20_reader import Bundle20Reader from .converters import PRIMARY_ALGO as ALGO from .objects import SelectiveCache, SimpleTree TAG_PATTERN = re.compile("[0-9A-Fa-f]{40}") TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.mercurial." HEAD_POINTER_NAME = b"tip" class CommandErrorWrapper(Exception): """This exception is raised in place of a 'CommandError' exception (raised by the underlying hglib library) This is needed because billiard.Queue is serializing the queued object and as CommandError doesn't have a constructor without parameters, the deserialization is failing """ def __init__(self, err: Optional[bytes]): self.err = err class CloneTimeoutError(Exception): pass class HgBundle20Loader(DVCSLoader): """Mercurial loader able to deal with remote or local repository. """ visit_type = "hg" def __init__( self, storage: StorageInterface, url: str, visit_date: Optional[datetime.datetime] = None, directory: Optional[str] = None, logging_class="swh.loader.mercurial.Bundle20Loader", bundle_filename: Optional[str] = "HG20_none_bundle", reduce_effort: bool = False, temp_directory: str = "/tmp", cache1_size: int = 800 * 1024 * 1024, cache2_size: int = 800 * 1024 * 1024, clone_timeout_seconds: int = 7200, save_data_path: Optional[str] = None, max_content_size: Optional[int] = None, ): super().__init__( storage=storage, logging_class=logging_class, save_data_path=save_data_path, max_content_size=max_content_size, ) self.origin_url = url self.visit_date = visit_date self.directory = directory self.bundle_filename = bundle_filename self.reduce_effort_flag = reduce_effort self.empty_repository = None self.temp_directory = temp_directory self.cache1_size = cache1_size self.cache2_size = cache2_size self.clone_timeout = clone_timeout_seconds self.working_directory = None self.bundle_path = None self.heads: Dict[bytes, Any] = {} self.releases: Dict[bytes, Any] = {} self.last_snapshot_id: Optional[bytes] = None + self.old_environ = os.environ.copy() + os.environ.clear() + os.environ.update(get_minimum_env()) def pre_cleanup(self): """Cleanup potential dangling files from prior runs (e.g. OOM killed tasks) """ clean_dangling_folders( self.temp_directory, pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, log=self.log, ) def cleanup(self): """Clean temporary working directory """ + os.environ.clear() + os.environ.update(self.old_environ) if self.bundle_path and os.path.exists(self.bundle_path): self.log.debug("Cleanup up working bundle %s" % self.bundle_path) os.unlink(self.bundle_path) if self.working_directory and os.path.exists(self.working_directory): self.log.debug( "Cleanup up working directory %s" % (self.working_directory,) ) rmtree(self.working_directory) def get_heads(self, repo): """Read the closed branches heads (branch, bookmarks) and returns a dict with key the branch_name (bytes) and values the tuple (pointer nature (bytes), mercurial's node id (bytes)). Those needs conversion to swh-ids. This is taken care of in get_revisions. """ b = {} for _, node_hash_id, pointer_nature, branch_name, *_ in repo.heads(): b[branch_name] = (pointer_nature, hash_to_bytes(node_hash_id.decode())) bookmarks = repo.bookmarks() if bookmarks and bookmarks[0]: for bookmark_name, _, target_short in bookmarks[0]: target = repo[target_short].node() b[bookmark_name] = (None, hash_to_bytes(target.decode())) return b def prepare_origin_visit(self) -> None: self.origin = Origin(url=self.origin_url) visit_status = origin_get_latest_visit_status( self.storage, self.origin_url, require_snapshot=True ) self.last_snapshot_id = None if visit_status is None else visit_status.snapshot @staticmethod def clone_with_timeout(log, origin, destination, timeout): queue = billiard.Queue() start = time.monotonic() def do_clone(queue, origin, destination): try: result = hglib.clone(source=origin, dest=destination, noupdate=True) except CommandError as e: # the queued object need an empty constructor to be deserialized later queue.put(CommandErrorWrapper(e.err)) except BaseException as e: queue.put(e) else: queue.put(result) process = billiard.Process(target=do_clone, args=(queue, origin, destination)) process.start() while True: try: result = queue.get(timeout=0.1) break except Empty: duration = time.monotonic() - start if timeout and duration > timeout: log.warning( "Timeout cloning `%s` within %s seconds", origin, timeout ) process.terminate() process.join() raise CloneTimeoutError(origin, timeout) continue process.join() if isinstance(result, Exception): raise result from None return result def prepare(self): """Prepare the necessary steps to load an actual remote or local repository. To load a local repository, pass the optional directory parameter as filled with a path to a real local folder. To load a remote repository, pass the optional directory parameter as None. Args: origin_url (str): Origin url to load visit_date (str/datetime): Date of the visit directory (str/None): The local directory to load """ self.branches = {} self.tags = [] self.releases = {} self.node_2_rev = {} self.heads = {} directory = self.directory if not directory: # remote repository self.working_directory = mkdtemp( prefix=TEMPORARY_DIR_PREFIX_PATTERN, suffix="-%s" % os.getpid(), dir=self.temp_directory, ) os.makedirs(self.working_directory, exist_ok=True) self.hgdir = self.working_directory self.log.debug( "Cloning %s to %s with timeout %s seconds", self.origin_url, self.hgdir, self.clone_timeout, ) try: self.clone_with_timeout( self.log, self.origin_url, self.hgdir, self.clone_timeout ) except CommandErrorWrapper as e: for msg in [ b"does not appear to be an hg repository", b"404: not found", b"name or service not known", ]: if msg in e.err.lower(): raise NotFound(e.args[0]) from None raise e else: # local repository self.working_directory = None self.hgdir = directory self.bundle_path = os.path.join(self.hgdir, self.bundle_filename) self.log.debug("Bundling at %s" % self.bundle_path) with hglib.open(self.hgdir) as repo: self.heads = self.get_heads(repo) repo.bundle(bytes(self.bundle_path, "utf-8"), all=True, type=b"none-v2") self.cache_filename1 = os.path.join( self.hgdir, "swh-cache-1-%s" % (hex(random.randint(0, 0xFFFFFF))[2:],) ) self.cache_filename2 = os.path.join( self.hgdir, "swh-cache-2-%s" % (hex(random.randint(0, 0xFFFFFF))[2:],) ) try: self.br = Bundle20Reader( bundlefile=self.bundle_path, cache_filename=self.cache_filename1, cache_size=self.cache1_size, ) except FileNotFoundError: # Empty repository! Still a successful visit targeting an # empty snapshot self.log.warn("%s is an empty repository!" % self.hgdir) self.empty_repository = True else: self.reduce_effort = set() if self.reduce_effort_flag: now = datetime.datetime.now(tz=datetime.timezone.utc) if (now - self.visit_date).days > 1: # Assuming that self.visit_date would be today for # a new visit, treat older visit dates as # indication of wanting to skip some processing # effort. for header, commit in self.br.yield_all_changesets(): ts = commit["time"].timestamp() if ts < self.visit_date.timestamp(): self.reduce_effort.add(header["node"]) def has_contents(self): return not self.empty_repository def has_directories(self): return not self.empty_repository def has_revisions(self): return not self.empty_repository def has_releases(self): return not self.empty_repository def fetch_data(self): """Fetch the data from the data source.""" pass def get_contents(self) -> Iterable[BaseContent]: """Get the contents that need to be loaded.""" # NOTE: This method generates blobs twice to reduce memory usage # without generating disk writes. self.file_node_to_hash = {} hash_to_info = {} self.num_contents = 0 contents: Dict[bytes, BaseContent] = {} missing_contents = set() for blob, node_info in self.br.yield_all_blobs(): self.num_contents += 1 file_name = node_info[0] header = node_info[2] length = len(blob) if header["linknode"] in self.reduce_effort: algorithms = set([ALGO]) else: algorithms = DEFAULT_ALGORITHMS h = MultiHash.from_data(blob, hash_names=algorithms) content = h.digest() content["length"] = length blob_hash = content[ALGO] self.file_node_to_hash[header["node"]] = blob_hash if header["linknode"] in self.reduce_effort: continue hash_to_info[blob_hash] = node_info if self.max_content_size is not None and length >= self.max_content_size: contents[blob_hash] = SkippedContent( status="absent", reason="Content too large", **content ) else: contents[blob_hash] = Content(data=blob, status="visible", **content) if file_name == b".hgtags": # https://www.mercurial-scm.org/wiki/GitConcepts#Tag_model # overwrite until the last one self.tags = (t for t in blob.split(b"\n") if t != b"") if contents: missing_contents = set( self.storage.content_missing( [c.to_dict() for c in contents.values()], key_hash=ALGO ) ) # Clusters needed blobs by file offset and then only fetches the # groups at the needed offsets. focs: Dict[int, Dict[bytes, bytes]] = {} # "file/offset/contents" for blob_hash in missing_contents: _, file_offset, header = hash_to_info[blob_hash] focs.setdefault(file_offset, {}) focs[file_offset][header["node"]] = blob_hash for offset, node_hashes in sorted(focs.items()): for header, data, *_ in self.br.yield_group_objects(group_offset=offset): node = header["node"] if node in node_hashes: blob, meta = self.br.extract_meta_from_blob(data) content = contents.pop(node_hashes[node], None) if content: if ( self.max_content_size is not None and len(blob) >= self.max_content_size ): yield SkippedContent.from_data( blob, reason="Content too large" ) else: yield Content.from_data(blob) def load_directories(self): """This is where the work is done to convert manifest deltas from the repository bundle into SWH directories. """ self.mnode_to_tree_id = {} cache_hints = self.br.build_manifest_hints() def tree_size(t): return t.size() self.trees = SelectiveCache( cache_hints=cache_hints, size_function=tree_size, filename=self.cache_filename2, max_size=self.cache2_size, ) tree = SimpleTree() for header, added, removed in self.br.yield_all_manifest_deltas(cache_hints): node = header["node"] basenode = header["basenode"] tree = self.trees.fetch(basenode) or tree # working tree for path in removed.keys(): tree = tree.remove_tree_node_for_path(path) for path, info in added.items(): file_node, is_symlink, perms_code = info tree = tree.add_blob( path, self.file_node_to_hash[file_node], is_symlink, perms_code ) if header["linknode"] in self.reduce_effort: self.trees.store(node, tree) else: new_dirs = [] self.mnode_to_tree_id[node] = tree.hash_changed(new_dirs) self.trees.store(node, tree) yield header, tree, new_dirs def get_directories(self) -> Iterable[Directory]: """Compute directories to load """ dirs: Dict[Sha1Git, Directory] = {} self.num_directories = 0 for _, _, new_dirs in self.load_directories(): for d in new_dirs: self.num_directories += 1 dirs[d["id"]] = Directory.from_dict(d) missing_dirs: List[Sha1Git] = list(dirs.keys()) if missing_dirs: missing_dirs = list(self.storage.directory_missing(missing_dirs)) for _id in missing_dirs: yield dirs[_id] def get_revisions(self) -> Iterable[Revision]: """Compute revisions to load """ revisions = {} self.num_revisions = 0 for header, commit in self.br.yield_all_changesets(): if header["node"] in self.reduce_effort: continue self.num_revisions += 1 date_dict = identifiers.normalize_timestamp(int(commit["time"].timestamp())) author_dict = converters.parse_author(commit["user"]) if commit["manifest"] == Bundle20Reader.NAUGHT_NODE: directory_id = SimpleTree().hash_changed() else: directory_id = self.mnode_to_tree_id[commit["manifest"]] extra_headers = [ ( b"time_offset_seconds", str(commit["time_offset_seconds"]).encode("utf-8"), ) ] extra = commit.get("extra") if extra: for e in extra.split(b"\x00"): k, v = e.split(b":", 1) # transplant_source stores binary reference to a changeset # prefer to dump hexadecimal one in the revision metadata if k == b"transplant_source": v = hash_to_bytehex(v) extra_headers.append((k, v)) parents = [] p1 = self.node_2_rev.get(header["p1"]) p2 = self.node_2_rev.get(header["p2"]) if p1: parents.append(p1) if p2: parents.append(p2) revision = Revision( author=Person.from_dict(author_dict), date=TimestampWithTimezone.from_dict(date_dict), committer=Person.from_dict(author_dict), committer_date=TimestampWithTimezone.from_dict(date_dict), type=RevisionType.MERCURIAL, directory=directory_id, message=commit["message"], metadata={"node": hash_to_hex(header["node"]),}, extra_headers=tuple(extra_headers), synthetic=False, parents=tuple(parents), ) self.node_2_rev[header["node"]] = revision.id revisions[revision.id] = revision # Converts heads to use swh ids self.heads = { branch_name: (pointer_nature, self.node_2_rev[node_id]) for branch_name, (pointer_nature, node_id) in self.heads.items() } missing_revs = set(revisions.keys()) if missing_revs: missing_revs = set(self.storage.revision_missing(list(missing_revs))) for rev in missing_revs: yield revisions[rev] self.mnode_to_tree_id = None def _read_tag(self, tag, split_byte=b" "): node, *name = tag.split(split_byte) name = split_byte.join(name) return node, name def get_releases(self) -> Iterable[Release]: """Get the releases that need to be loaded.""" self.num_releases = 0 releases = {} missing_releases = set() for t in self.tags: self.num_releases += 1 node, name = self._read_tag(t) node = node.decode() node_bytes = hash_to_bytes(node) if not TAG_PATTERN.match(node): self.log.warn("Wrong pattern (%s) found in tags. Skipping" % (node,)) continue if node_bytes not in self.node_2_rev: self.log.warn( "No matching revision for tag %s " "(hg changeset: %s). Skipping" % (name.decode(), node) ) continue tgt_rev = self.node_2_rev[node_bytes] release = Release( name=name, target=tgt_rev, target_type=ObjectType.REVISION, message=None, metadata=None, synthetic=False, author=Person(name=None, email=None, fullname=b""), date=None, ) missing_releases.add(release.id) releases[release.id] = release self.releases[name] = release.id if missing_releases: missing_releases = set(self.storage.release_missing(list(missing_releases))) for _id in missing_releases: yield releases[_id] def get_snapshot(self) -> Snapshot: """Get the snapshot that need to be loaded.""" branches: Dict[bytes, Optional[SnapshotBranch]] = {} for name, (pointer_nature, target) in self.heads.items(): branches[name] = SnapshotBranch( target=target, target_type=TargetType.REVISION ) if pointer_nature == HEAD_POINTER_NAME: branches[b"HEAD"] = SnapshotBranch( target=name, target_type=TargetType.ALIAS ) for name, target in self.releases.items(): branches[name] = SnapshotBranch( target=target, target_type=TargetType.RELEASE ) self.snapshot = Snapshot(branches=branches) return self.snapshot def get_fetch_history_result(self): """Return the data to store in fetch_history.""" return { "contents": self.num_contents, "directories": self.num_directories, "revisions": self.num_revisions, "releases": self.num_releases, } def load_status(self): snapshot = self.get_snapshot() load_status = "eventful" if self.last_snapshot_id is not None and self.last_snapshot_id == snapshot.id: load_status = "uneventful" return { "status": load_status, } class HgArchiveBundle20Loader(HgBundle20Loader): """Mercurial loader for repository wrapped within archives. """ def __init__( self, storage: StorageInterface, url: str, visit_date: Optional[datetime.datetime] = None, archive_path=None, temp_directory: str = "/tmp", max_content_size: Optional[int] = None, ): super().__init__( storage=storage, url=url, visit_date=visit_date, logging_class="swh.loader.mercurial.HgArchiveBundle20Loader", temp_directory=temp_directory, max_content_size=max_content_size, ) self.archive_extract_temp_dir = None self.archive_path = archive_path def prepare(self): self.archive_extract_temp_dir = tmp_extract( archive=self.archive_path, dir=self.temp_directory, prefix=TEMPORARY_DIR_PREFIX_PATTERN, suffix=".dump-%s" % os.getpid(), log=self.log, source=self.origin_url, ) repo_name = os.listdir(self.archive_extract_temp_dir)[0] self.directory = os.path.join(self.archive_extract_temp_dir, repo_name) super().prepare() diff --git a/swh/loader/mercurial/utils.py b/swh/loader/mercurial/utils.py index 319b4f4..d0a52c0 100644 --- a/swh/loader/mercurial/utils.py +++ b/swh/loader/mercurial/utils.py @@ -1,29 +1,44 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone -from typing import Optional, Union +import os +from typing import Dict, Optional, Union from dateutil.parser import parse def parse_visit_date(visit_date: Optional[Union[datetime, str]]) -> Optional[datetime]: """Convert visit date from either None, a string or a datetime to either None or datetime. """ if visit_date is None: return None if isinstance(visit_date, datetime): return visit_date if visit_date == "now": return datetime.now(tz=timezone.utc) if isinstance(visit_date, str): return parse(visit_date) raise ValueError(f"invalid visit date {visit_date!r}") + + +def get_minimum_env() -> Dict[str, str]: + """Return the smallest viable environment for `hg` suprocesses""" + env = { + "HGPLAIN": "", # Tells Mercurial to disable output customization + "HGRCPATH": "", # Tells Mercurial to ignore config files + } + path = os.environ.get("PATH") + if path: + # Sometimes (in tests for example), there is no PATH. An empty PATH could be + # interpreted differently than a lack of PATH by some programs. + env["PATH"] = path + return env