diff --git a/requirements-swh.txt b/requirements-swh.txt index fa413d8..968d512 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,4 +1,4 @@ swh.model >= 0.4.0 swh.storage >= 0.22.0 swh.scheduler >= 0.0.39 -swh.loader.core >= 0.17.0 +swh.loader.core >= 0.18.0 diff --git a/swh/loader/mercurial/cli.py b/swh/loader/mercurial/cli.py index dfba9ed..63c13c1 100644 --- a/swh/loader/mercurial/cli.py +++ b/swh/loader/mercurial/cli.py @@ -1,59 +1,61 @@ -# Copyright (C) 2018 The Software Heritage developers +# Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from itertools import chain import logging import click LOGLEVELS = list( chain.from_iterable( (logging._levelToName[lvl], logging._levelToName[lvl].lower()) for lvl in sorted(logging._levelToName.keys()) ) ) @click.command() @click.argument("origin-url") @click.option( "--hg-directory", "-d", help=( "Path to the hg (local) directory to load from. " "If unset, the hg repo will be cloned from the " "given (origin) url." ), ) @click.option("--hg-archive", "-a", help=("Path to the hg archive file to load from.")) @click.option("--visit-date", "-D", help="Visit date (defaults to now).") @click.option("--log-level", "-l", type=click.Choice(LOGLEVELS), help="Log level.") def main( origin_url, hg_directory=None, hg_archive=None, visit_date=None, log_level=None ): + from swh.storage import get_storage logging.basicConfig( level=(log_level or "DEBUG").upper(), format="%(asctime)s %(process)d %(message)s", ) if not visit_date: visit_date = datetime.datetime.now(tz=datetime.timezone.utc) kwargs = {"visit_date": visit_date, "origin_url": origin_url} if hg_archive: from .loader import HgArchiveBundle20Loader as HgLoader kwargs["archive_path"] = hg_archive else: from .loader import HgBundle20Loader as HgLoader kwargs["directory"] = hg_directory - return HgLoader().load(**kwargs) + storage = get_storage(cls="memory") + return HgLoader(storage, **kwargs).load() if __name__ == "__main__": main() diff --git a/swh/loader/mercurial/from_disk.py b/swh/loader/mercurial/from_disk.py index e73e8a0..78de435 100644 --- a/swh/loader/mercurial/from_disk.py +++ b/swh/loader/mercurial/from_disk.py @@ -1,533 +1,523 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import os from collections import deque -from datetime import datetime, timezone +from datetime import datetime +import os from shutil import rmtree from tempfile import mkdtemp -from typing import Any, Deque, Dict, Optional, Tuple, TypeVar, Union - -import dateutil +from typing import Deque, Dict, Optional, Tuple, TypeVar, Union -from swh.core.config import merge_configs from swh.loader.core.loader import BaseLoader from swh.loader.core.utils import clean_dangling_folders +from swh.loader.mercurial.utils import parse_visit_date from swh.model.from_disk import Content, DentryPerms, Directory from swh.model.hashutil import MultiHash, hash_to_bytehex -from swh.model.model import Content as ModelContent from swh.model.model import ( ObjectType, Origin, Person, Release, Revision, RevisionType, Sha1Git, Snapshot, SnapshotBranch, TargetType, TimestampWithTimezone, ) +from swh.model.model import Content as ModelContent +from swh.storage.interface import StorageInterface from . import hgutil from .archive_extract import tmp_extract from .hgutil import HgNodeId FLAG_PERMS = { b"l": DentryPerms.symlink, b"x": DentryPerms.executable_content, b"": DentryPerms.content, } # type: Dict[bytes, DentryPerms] -DEFAULT_CONFIG: Dict[str, Any] = { - "temp_directory": "/tmp", - "clone_timeout_seconds": 7200, - "content_cache_size": 10_000, -} + TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.mercurial.from_disk" T = TypeVar("T") -def parse_visit_date(visit_date: Optional[Union[datetime, str]]) -> Optional[datetime]: - """Convert visit date from Optional[Union[str, datetime]] to Optional[datetime]. - - `HgLoaderFromDisk` accepts `str` and `datetime` as visit date - while `BaseLoader` only deals with `datetime`. - """ - if visit_date is None: - return None - - if isinstance(visit_date, datetime): - return visit_date - - if visit_date == "now": - return datetime.now(tz=timezone.utc) - - if isinstance(visit_date, str): - return dateutil.parser.parse(visit_date) - - return ValueError(f"invalid visit date {visit_date!r}") - - class HgDirectory(Directory): """A more practical directory. - creates missing parent directories - removes empty directories """ def __setitem__(self, path: bytes, value: Union[Content, "HgDirectory"]) -> None: if b"/" in path: head, tail = path.split(b"/", 1) directory = self.get(head) if directory is None or isinstance(directory, Content): directory = HgDirectory() self[head] = directory directory[tail] = value else: super().__setitem__(path, value) def __delitem__(self, path: bytes) -> None: super().__delitem__(path) while b"/" in path: # remove empty parent directories path = path.rsplit(b"/", 1)[0] if len(self[path]) == 0: super().__delitem__(path) else: break def get( self, path: bytes, default: Optional[T] = None ) -> Optional[Union[Content, "HgDirectory", T]]: # TODO move to swh.model.from_disk.Directory try: return self[path] except KeyError: return default class HgLoaderFromDisk(BaseLoader): """Load a mercurial repository from a local repository.""" CONFIG_BASE_FILENAME = "loader/mercurial" visit_type = "hg" def __init__( self, + storage: StorageInterface, url: str, directory: Optional[str] = None, logging_class: str = "swh.loader.mercurial.LoaderFromDisk", - visit_date: Optional[Union[datetime, str]] = None, - config: Optional[Dict[str, Any]] = None, + visit_date: Optional[datetime] = None, + temp_directory: str = "/tmp", + clone_timeout_seconds: int = 7200, + content_cache_size: int = 10_000, + max_content_size: Optional[int] = None, ): """Initialize the loader. Args: url: url of the repository. directory: directory of the local repository. logging_class: class of the loader logger. visit_date: visit date of the repository config: loader configuration """ - super().__init__(logging_class=logging_class, config=config or {}) + super().__init__( + storage=storage, + logging_class=logging_class, + max_content_size=max_content_size, + ) - self.config = merge_configs(DEFAULT_CONFIG, self.config) - self._temp_directory = self.config["temp_directory"] - self._clone_timeout = self.config["clone_timeout_seconds"] + self._temp_directory = temp_directory + self._clone_timeout = clone_timeout_seconds self.origin_url = url - self.visit_date = parse_visit_date(visit_date) + self.visit_date = visit_date self.directory = directory self._repo: Optional[hgutil.Repository] = None self._revision_nodeid_to_swhid: Dict[HgNodeId, Sha1Git] = {} self._repo_directory: Optional[str] = None # keeps the last processed hg nodeid # it is used for differential tree update by store_directories # NULLID is the parent of the first revision self._last_hg_nodeid = hgutil.NULLID # keeps the last revision tree # it is used for differential tree update by store_directories self._last_root = HgDirectory() # Cache the content hash across revisions to avoid recalculation. self._content_hash_cache: hgutil.LRUCacheDict = hgutil.LRUCacheDict( - self.config["content_cache_size"], + content_cache_size, ) def pre_cleanup(self) -> None: """As a first step, will try and check for dangling data to cleanup. This should do its best to avoid raising issues. """ clean_dangling_folders( self._temp_directory, pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, log=self.log, ) def cleanup(self) -> None: """Last step executed by the loader.""" if self._repo_directory and os.path.exists(self._repo_directory): self.log.debug(f"Cleanup up repository {self._repo_directory}") rmtree(self._repo_directory) - def prepare_origin_visit(self, *args, **kwargs) -> None: + def prepare_origin_visit(self) -> None: """First step executed by the loader to prepare origin and visit references. Set/update self.origin, and optionally self.origin_url, self.visit_date. """ self.origin = Origin(url=self.origin_url) - def prepare(self, *args, **kwargs) -> None: + def prepare(self) -> None: """Second step executed by the loader to prepare some state needed by the loader. """ def fetch_data(self) -> bool: """Fetch the data from the source the loader is currently loading Returns: a value that is interpreted as a boolean. If True, fetch_data needs to be called again to complete loading. """ if not self.directory: # no local repository self._repo_directory = mkdtemp( prefix=TEMPORARY_DIR_PREFIX_PATTERN, suffix=f"-{os.getpid()}", dir=self._temp_directory, ) self.log.debug( f"Cloning {self.origin_url} to {self.directory} " f"with timeout {self._clone_timeout} seconds" ) hgutil.clone(self.origin_url, self._repo_directory, self._clone_timeout) else: # existing local repository # Allow to load on disk repository without cloning # for testing purpose. self._repo_directory = self.directory self._repo = hgutil.repository(self._repo_directory) return False def store_data(self): """Store fetched data in the database.""" for rev in self._repo: self.store_revision(self._repo[rev]) branch_by_hg_nodeid: Dict[HgNodeId, bytes] = { hg_nodeid: name for name, hg_nodeid in hgutil.branches(self._repo).items() } tags_by_name: Dict[bytes, HgNodeId] = self._repo.tags() tags_by_hg_nodeid: Dict[HgNodeId, bytes] = { hg_nodeid: name for name, hg_nodeid in tags_by_name.items() } snapshot_branches: Dict[bytes, SnapshotBranch] = {} for hg_nodeid, revision_swhid in self._revision_nodeid_to_swhid.items(): tag_name = tags_by_hg_nodeid.get(hg_nodeid) # tip is listed in the tags by the mercurial api # but its not a tag defined by the user in `.hgtags` if tag_name and tag_name != b"tip": snapshot_branches[tag_name] = SnapshotBranch( target=self.store_release(tag_name, revision_swhid), target_type=TargetType.RELEASE, ) if hg_nodeid in branch_by_hg_nodeid: name = branch_by_hg_nodeid[hg_nodeid] snapshot_branches[name] = SnapshotBranch( target=revision_swhid, target_type=TargetType.REVISION, ) # The tip is mapped to `HEAD` to match # the historical implementation if hg_nodeid == tags_by_name[b"tip"]: snapshot_branches[b"HEAD"] = SnapshotBranch( target=name, target_type=TargetType.ALIAS, ) snapshot = Snapshot(branches=snapshot_branches) self.storage.snapshot_add([snapshot]) self.flush() self.loaded_snapshot_id = snapshot.id def get_revision_id_from_hg_nodeid(self, hg_nodeid: HgNodeId) -> Sha1Git: """Return the swhid of a revision given its hg nodeid. Args: hg_nodeid: the hg nodeid of the revision. Returns: the swhid of the revision. """ return self._revision_nodeid_to_swhid[hg_nodeid] def get_revision_parents(self, rev_ctx: hgutil.BaseContext) -> Tuple[Sha1Git, ...]: """Return the swhids of the parent revisions. Args: hg_nodeid: the hg nodeid of the revision. Returns: the swhids of the parent revisions. """ parents = [] for parent_ctx in rev_ctx.parents(): parent_hg_nodeid = parent_ctx.node() # nullid is the value of a parent that does not exist if parent_hg_nodeid == hgutil.NULLID: continue parents.append(self.get_revision_id_from_hg_nodeid(parent_hg_nodeid)) return tuple(parents) def store_revision(self, rev_ctx: hgutil.BaseContext) -> None: """Store a revision given its hg nodeid. Args: rev_ctx: the he revision context. Returns: the swhid of the stored revision. """ hg_nodeid = rev_ctx.node() root_swhid = self.store_directories(rev_ctx) # `Person.from_fullname` is compatible with mercurial's freeform author # as fullname is what is used in revision hash when available. author = Person.from_fullname(rev_ctx.user()) (timestamp, offset) = rev_ctx.date() # TimestampWithTimezone.from_dict will change name # as it accept more than just dicts rev_date = TimestampWithTimezone.from_dict(int(timestamp)) extra_headers = [ (b"time_offset_seconds", str(offset).encode(),), ] for key, value in rev_ctx.extra().items(): # The default branch is skipped to match # the historical implementation if key == b"branch" and value == b"default": continue # transplant_source is converted to match # the historical implementation if key == b"transplant_source": value = hash_to_bytehex(value) extra_headers.append((key, value)) revision = Revision( author=author, date=rev_date, committer=author, committer_date=rev_date, type=RevisionType.MERCURIAL, directory=root_swhid, message=rev_ctx.description(), metadata={"node": hg_nodeid.hex()}, extra_headers=tuple(extra_headers), synthetic=False, parents=self.get_revision_parents(rev_ctx), ) self._revision_nodeid_to_swhid[hg_nodeid] = revision.id self.storage.revision_add([revision]) def store_release(self, name: bytes, target=Sha1Git) -> Sha1Git: """Store a release given its name and its target. A release correspond to a user defined tag in mercurial. The mercurial api as a `tip` tag that must be ignored. Args: name: name of the release. target: swhid of the target revision. Returns: the swhid of the stored release. """ release = Release( name=name, target=target, target_type=ObjectType.REVISION, message=None, metadata=None, synthetic=False, author=Person(name=None, email=None, fullname=b""), date=None, ) self.storage.release_add([release]) return release.id def store_content(self, rev_ctx: hgutil.BaseContext, file_path: bytes) -> Content: """Store a revision content hg nodeid and file path. Content is a mix of file content at a given revision and its permissions found in the changeset's manifest. Args: rev_ctx: the he revision context. file_path: the hg path of the content. Returns: the swhid of the top level directory. """ hg_nodeid = rev_ctx.node() file_ctx = rev_ctx[file_path] file_nodeid = file_ctx.filenode() perms = FLAG_PERMS[file_ctx.flags()] # Key is file_nodeid + perms because permissions does not participate # in content hash in hg while it is the case in swh. cache_key = (file_nodeid, perms) sha1_git = self._content_hash_cache.get(cache_key) if sha1_git is not None: return Content({"sha1_git": sha1_git, "perms": perms}) data = file_ctx.data() content_data = MultiHash.from_data(data).digest() content_data["length"] = len(data) content_data["perms"] = perms content_data["data"] = data content_data["status"] = "visible" content = Content(content_data) model = content.to_model() if isinstance(model, ModelContent): self.storage.content_add([model]) else: raise ValueError( f"{file_path!r} at rev {hg_nodeid.hex()!r} " "produced {type(model)!r} instead of {ModelContent!r}" ) self._content_hash_cache[cache_key] = content.hash # Here we make sure to return only necessary data. return Content({"sha1_git": content.hash, "perms": perms}) def store_directories(self, rev_ctx: hgutil.BaseContext) -> Sha1Git: """Store a revision directories given its hg nodeid. Mercurial as no directory as in git. A Git like tree must be build from file paths to obtain each directory hash. Args: rev_ctx: the he revision context. Returns: the swhid of the top level directory. """ repo: hgutil.Repository = self._repo # mypy can't infer that repo is not None prev_ctx = repo[self._last_hg_nodeid] # TODO maybe do diff on parents status = prev_ctx.status(rev_ctx) for file_path in status.removed: del self._last_root[file_path] for file_path in status.added: content = self.store_content(rev_ctx, file_path) self._last_root[file_path] = content for file_path in status.modified: content = self.store_content(rev_ctx, file_path) self._last_root[file_path] = content self._last_hg_nodeid = rev_ctx.node() directories: Deque[Directory] = deque([self._last_root]) while directories: directory = directories.pop() self.storage.directory_add([directory.to_model()]) directories.extend( [item for item in directory.values() if isinstance(item, Directory)] ) return self._last_root.hash class HgArchiveLoaderFromDisk(HgLoaderFromDisk): """Mercurial loader for repository wrapped within tarballs.""" def __init__( - self, url: str, visit_date: Optional[datetime] = None, archive_path: str = None + self, + storage: StorageInterface, + url: str, + visit_date: Optional[datetime] = None, + archive_path: str = None, + temp_directory: str = "/tmp", + max_content_size: Optional[int] = None, ): super().__init__( - url, + storage=storage, + url=url, visit_date=visit_date, logging_class="swh.loader.mercurial.ArchiveLoaderFromDisk", + temp_directory=temp_directory, + max_content_size=max_content_size, ) - self.temp_dir = None + self.archive_extract_temp_dir = None self.archive_path = archive_path - def prepare(self, *args, **kwargs): + def prepare(self): """Extract the archive instead of cloning.""" - self._temp_directory = tmp_extract( + self.archive_extract_temp_dir = tmp_extract( archive=self.archive_path, dir=self._temp_directory, prefix=TEMPORARY_DIR_PREFIX_PATTERN, suffix=f".dump-{os.getpid()}", log=self.log, source=self.origin_url, ) repo_name = os.listdir(self.temp_dir)[0] - self.directory = os.path.join(self.temp_dir, repo_name) - super().prepare(*args, **kwargs) - - def cleanup(self) -> None: - """Remove the extracted archive instead of the cloned repository.""" - if self.temp_dir and os.path.exists(self.temp_dir): - rmtree(self.temp_dir) - super().cleanup() + self.directory = os.path.join(self.archive_extract_temp_dir, repo_name) + super().prepare() # Allow direct usage of the loader from the command line with # `python -m swh.loader.mercurial.from_disk $ORIGIN_URL` if __name__ == "__main__": import logging import click logging.basicConfig( level=logging.DEBUG, format="%(asctime)s %(process)d %(message)s" ) @click.command() @click.option("--origin-url", help="origin url") @click.option("--hg-directory", help="Path to mercurial repository to load") @click.option("--visit-date", default=None, help="Visit date") def main(origin_url, hg_directory, visit_date): + from swh.storage import get_storage + + storage = get_storage(cls="memory") return HgLoaderFromDisk( - origin_url, directory=hg_directory, visit_date=visit_date + storage, + origin_url, + directory=hg_directory, + visit_date=parse_visit_date(visit_date), ).load() main() diff --git a/swh/loader/mercurial/loader.py b/swh/loader/mercurial/loader.py index da6eb55..0d5eb09 100644 --- a/swh/loader/mercurial/loader.py +++ b/swh/loader/mercurial/loader.py @@ -1,667 +1,671 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """This document contains a SWH loader for ingesting repository data from Mercurial version 2 bundle files. """ # NOTE: The code here does expensive work twice in places because of the # intermediate need to check for what is missing before sending to the database # and the desire to not juggle very large amounts of data. # TODO: Decide whether to also serialize to disk and read back more quickly # from there. Maybe only for very large repos and fast drives. # - Avi import datetime import os from queue import Empty import random import re from shutil import rmtree from tempfile import mkdtemp import time from typing import Any, Dict, Iterable, List, Optional import billiard -from dateutil import parser import hglib from hglib.error import CommandError -from swh.core.config import merge_configs from swh.loader.core.loader import DVCSLoader from swh.loader.core.utils import clean_dangling_folders from swh.loader.exception import NotFound from swh.model import identifiers from swh.model.hashutil import ( DEFAULT_ALGORITHMS, MultiHash, hash_to_bytehex, hash_to_bytes, hash_to_hex, ) from swh.model.model import ( BaseContent, Content, Directory, ObjectType, Origin, Person, Release, Revision, RevisionType, Sha1Git, SkippedContent, Snapshot, SnapshotBranch, TargetType, TimestampWithTimezone, ) from swh.storage.algos.origin import origin_get_latest_visit_status +from swh.storage.interface import StorageInterface from . import converters from .archive_extract import tmp_extract from .bundle20_reader import Bundle20Reader from .converters import PRIMARY_ALGO as ALGO from .objects import SelectiveCache, SimpleTree TAG_PATTERN = re.compile("[0-9A-Fa-f]{40}") TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.mercurial." HEAD_POINTER_NAME = b"tip" class CommandErrorWrapper(Exception): """This exception is raised in place of a 'CommandError' exception (raised by the underlying hglib library) This is needed because billiard.Queue is serializing the queued object and as CommandError doesn't have a constructor without parameters, the deserialization is failing """ def __init__(self, err: Optional[bytes]): self.err = err class CloneTimeoutError(Exception): pass -DEFAULT_CONFIG: Dict[str, Any] = { - "bundle_filename": "HG20_none_bundle", - "reduce_effort": False, - "temp_directory": "/tmp", - "cache1_size": 800 * 1024 * 1024, - "cache2_size": 800 * 1024 * 1024, - "clone_timeout_seconds": 7200, -} - - class HgBundle20Loader(DVCSLoader): """Mercurial loader able to deal with remote or local repository. """ visit_type = "hg" def __init__( self, - url, - visit_date=None, - directory=None, + storage: StorageInterface, + url: str, + visit_date: Optional[datetime.datetime] = None, + directory: Optional[str] = None, logging_class="swh.loader.mercurial.Bundle20Loader", + bundle_filename: Optional[str] = "HG20_none_bundle", + reduce_effort: bool = False, + temp_directory: str = "/tmp", + cache1_size: int = 800 * 1024 * 1024, + cache2_size: int = 800 * 1024 * 1024, + clone_timeout_seconds: int = 7200, + save_data_path: Optional[str] = None, + max_content_size: Optional[int] = None, ): - super().__init__(logging_class=logging_class) - self.config = merge_configs(DEFAULT_CONFIG, self.config) + super().__init__( + storage=storage, + logging_class=logging_class, + save_data_path=save_data_path, + max_content_size=max_content_size, + ) self.origin_url = url self.visit_date = visit_date self.directory = directory - self.bundle_filename = self.config["bundle_filename"] - self.reduce_effort_flag = self.config["reduce_effort"] + self.bundle_filename = bundle_filename + self.reduce_effort_flag = reduce_effort self.empty_repository = None - self.temp_directory = self.config["temp_directory"] - self.cache1_size = self.config["cache1_size"] - self.cache2_size = self.config["cache2_size"] - self.clone_timeout = self.config["clone_timeout_seconds"] + self.temp_directory = temp_directory + self.cache1_size = cache1_size + self.cache2_size = cache2_size + self.clone_timeout = clone_timeout_seconds self.working_directory = None self.bundle_path = None - self.heads = {} - self.releases = {} + self.heads: Dict[bytes, Any] = {} + self.releases: Dict[bytes, Any] = {} self.last_snapshot_id: Optional[bytes] = None def pre_cleanup(self): """Cleanup potential dangling files from prior runs (e.g. OOM killed tasks) """ clean_dangling_folders( self.temp_directory, pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, log=self.log, ) def cleanup(self): """Clean temporary working directory """ if self.bundle_path and os.path.exists(self.bundle_path): self.log.debug("Cleanup up working bundle %s" % self.bundle_path) os.unlink(self.bundle_path) if self.working_directory and os.path.exists(self.working_directory): self.log.debug( "Cleanup up working directory %s" % (self.working_directory,) ) rmtree(self.working_directory) def get_heads(self, repo): """Read the closed branches heads (branch, bookmarks) and returns a dict with key the branch_name (bytes) and values the tuple (pointer nature (bytes), mercurial's node id (bytes)). Those needs conversion to swh-ids. This is taken care of in get_revisions. """ b = {} for _, node_hash_id, pointer_nature, branch_name, *_ in repo.heads(): b[branch_name] = (pointer_nature, hash_to_bytes(node_hash_id.decode())) bookmarks = repo.bookmarks() if bookmarks and bookmarks[0]: for bookmark_name, _, target_short in bookmarks[0]: target = repo[target_short].node() b[bookmark_name] = (None, hash_to_bytes(target.decode())) return b - def prepare_origin_visit(self, *args, **kwargs) -> None: + def prepare_origin_visit(self) -> None: self.origin = Origin(url=self.origin_url) - visit_date = self.visit_date - if isinstance(visit_date, str): # visit_date can be string or datetime - visit_date = parser.parse(visit_date) - self.visit_date = visit_date visit_status = origin_get_latest_visit_status( self.storage, self.origin_url, require_snapshot=True ) self.last_snapshot_id = None if visit_status is None else visit_status.snapshot @staticmethod def clone_with_timeout(log, origin, destination, timeout): queue = billiard.Queue() start = time.monotonic() def do_clone(queue, origin, destination): try: result = hglib.clone(source=origin, dest=destination) except CommandError as e: # the queued object need an empty constructor to be deserialized later queue.put(CommandErrorWrapper(e.err)) except BaseException as e: queue.put(e) else: queue.put(result) process = billiard.Process(target=do_clone, args=(queue, origin, destination)) process.start() while True: try: result = queue.get(timeout=0.1) break except Empty: duration = time.monotonic() - start if timeout and duration > timeout: log.warning( "Timeout cloning `%s` within %s seconds", origin, timeout ) process.terminate() process.join() raise CloneTimeoutError(origin, timeout) continue process.join() if isinstance(result, Exception): raise result from None return result - def prepare(self, *args, **kwargs): + def prepare(self): """Prepare the necessary steps to load an actual remote or local repository. To load a local repository, pass the optional directory parameter as filled with a path to a real local folder. To load a remote repository, pass the optional directory parameter as None. Args: origin_url (str): Origin url to load visit_date (str/datetime): Date of the visit directory (str/None): The local directory to load """ self.branches = {} self.tags = [] self.releases = {} self.node_2_rev = {} self.heads = {} directory = self.directory if not directory: # remote repository self.working_directory = mkdtemp( prefix=TEMPORARY_DIR_PREFIX_PATTERN, suffix="-%s" % os.getpid(), dir=self.temp_directory, ) os.makedirs(self.working_directory, exist_ok=True) self.hgdir = self.working_directory self.log.debug( "Cloning %s to %s with timeout %s seconds", self.origin_url, self.hgdir, self.clone_timeout, ) try: self.clone_with_timeout( self.log, self.origin_url, self.hgdir, self.clone_timeout ) except CommandErrorWrapper as e: for msg in [ b"does not appear to be an hg repository", b"404: not found", b"name or service not known", ]: if msg in e.err.lower(): raise NotFound(e.args[0]) from None raise e else: # local repository self.working_directory = None self.hgdir = directory self.bundle_path = os.path.join(self.hgdir, self.bundle_filename) self.log.debug("Bundling at %s" % self.bundle_path) with hglib.open(self.hgdir) as repo: self.heads = self.get_heads(repo) repo.bundle(bytes(self.bundle_path, "utf-8"), all=True, type=b"none-v2") self.cache_filename1 = os.path.join( self.hgdir, "swh-cache-1-%s" % (hex(random.randint(0, 0xFFFFFF))[2:],) ) self.cache_filename2 = os.path.join( self.hgdir, "swh-cache-2-%s" % (hex(random.randint(0, 0xFFFFFF))[2:],) ) try: self.br = Bundle20Reader( bundlefile=self.bundle_path, cache_filename=self.cache_filename1, cache_size=self.cache1_size, ) except FileNotFoundError: # Empty repository! Still a successful visit targeting an # empty snapshot self.log.warn("%s is an empty repository!" % self.hgdir) self.empty_repository = True else: self.reduce_effort = set() if self.reduce_effort_flag: now = datetime.datetime.now(tz=datetime.timezone.utc) if (now - self.visit_date).days > 1: # Assuming that self.visit_date would be today for # a new visit, treat older visit dates as # indication of wanting to skip some processing # effort. for header, commit in self.br.yield_all_changesets(): ts = commit["time"].timestamp() if ts < self.visit_date.timestamp(): self.reduce_effort.add(header["node"]) def has_contents(self): return not self.empty_repository def has_directories(self): return not self.empty_repository def has_revisions(self): return not self.empty_repository def has_releases(self): return not self.empty_repository def fetch_data(self): """Fetch the data from the data source.""" pass def get_contents(self) -> Iterable[BaseContent]: """Get the contents that need to be loaded.""" # NOTE: This method generates blobs twice to reduce memory usage # without generating disk writes. self.file_node_to_hash = {} hash_to_info = {} self.num_contents = 0 contents: Dict[bytes, BaseContent] = {} missing_contents = set() for blob, node_info in self.br.yield_all_blobs(): self.num_contents += 1 file_name = node_info[0] header = node_info[2] length = len(blob) if header["linknode"] in self.reduce_effort: algorithms = set([ALGO]) else: algorithms = DEFAULT_ALGORITHMS h = MultiHash.from_data(blob, hash_names=algorithms) content = h.digest() content["length"] = length blob_hash = content[ALGO] self.file_node_to_hash[header["node"]] = blob_hash if header["linknode"] in self.reduce_effort: continue hash_to_info[blob_hash] = node_info if self.max_content_size is not None and length >= self.max_content_size: contents[blob_hash] = SkippedContent( status="absent", reason="Content too large", **content ) else: contents[blob_hash] = Content(data=blob, status="visible", **content) if file_name == b".hgtags": # https://www.mercurial-scm.org/wiki/GitConcepts#Tag_model # overwrite until the last one self.tags = (t for t in blob.split(b"\n") if t != b"") if contents: missing_contents = set( self.storage.content_missing( [c.to_dict() for c in contents.values()], key_hash=ALGO ) ) # Clusters needed blobs by file offset and then only fetches the # groups at the needed offsets. focs: Dict[int, Dict[bytes, bytes]] = {} # "file/offset/contents" for blob_hash in missing_contents: _, file_offset, header = hash_to_info[blob_hash] focs.setdefault(file_offset, {}) focs[file_offset][header["node"]] = blob_hash for offset, node_hashes in sorted(focs.items()): for header, data, *_ in self.br.yield_group_objects(group_offset=offset): node = header["node"] if node in node_hashes: blob, meta = self.br.extract_meta_from_blob(data) content = contents.pop(node_hashes[node], None) if content: if ( self.max_content_size is not None and len(blob) >= self.max_content_size ): yield SkippedContent.from_data( blob, reason="Content too large" ) else: yield Content.from_data(blob) def load_directories(self): """This is where the work is done to convert manifest deltas from the repository bundle into SWH directories. """ self.mnode_to_tree_id = {} cache_hints = self.br.build_manifest_hints() def tree_size(t): return t.size() self.trees = SelectiveCache( cache_hints=cache_hints, size_function=tree_size, filename=self.cache_filename2, max_size=self.cache2_size, ) tree = SimpleTree() for header, added, removed in self.br.yield_all_manifest_deltas(cache_hints): node = header["node"] basenode = header["basenode"] tree = self.trees.fetch(basenode) or tree # working tree for path in removed.keys(): tree = tree.remove_tree_node_for_path(path) for path, info in added.items(): file_node, is_symlink, perms_code = info tree = tree.add_blob( path, self.file_node_to_hash[file_node], is_symlink, perms_code ) if header["linknode"] in self.reduce_effort: self.trees.store(node, tree) else: new_dirs = [] self.mnode_to_tree_id[node] = tree.hash_changed(new_dirs) self.trees.store(node, tree) yield header, tree, new_dirs def get_directories(self) -> Iterable[Directory]: """Compute directories to load """ dirs: Dict[Sha1Git, Directory] = {} self.num_directories = 0 for _, _, new_dirs in self.load_directories(): for d in new_dirs: self.num_directories += 1 dirs[d["id"]] = Directory.from_dict(d) missing_dirs: List[Sha1Git] = list(dirs.keys()) if missing_dirs: missing_dirs = list(self.storage.directory_missing(missing_dirs)) for _id in missing_dirs: yield dirs[_id] def get_revisions(self) -> Iterable[Revision]: """Compute revisions to load """ revisions = {} self.num_revisions = 0 for header, commit in self.br.yield_all_changesets(): if header["node"] in self.reduce_effort: continue self.num_revisions += 1 date_dict = identifiers.normalize_timestamp(int(commit["time"].timestamp())) author_dict = converters.parse_author(commit["user"]) if commit["manifest"] == Bundle20Reader.NAUGHT_NODE: directory_id = SimpleTree().hash_changed() else: directory_id = self.mnode_to_tree_id[commit["manifest"]] extra_headers = [ ( b"time_offset_seconds", str(commit["time_offset_seconds"]).encode("utf-8"), ) ] extra = commit.get("extra") if extra: for e in extra.split(b"\x00"): k, v = e.split(b":", 1) # transplant_source stores binary reference to a changeset # prefer to dump hexadecimal one in the revision metadata if k == b"transplant_source": v = hash_to_bytehex(v) extra_headers.append((k, v)) parents = [] p1 = self.node_2_rev.get(header["p1"]) p2 = self.node_2_rev.get(header["p2"]) if p1: parents.append(p1) if p2: parents.append(p2) revision = Revision( author=Person.from_dict(author_dict), date=TimestampWithTimezone.from_dict(date_dict), committer=Person.from_dict(author_dict), committer_date=TimestampWithTimezone.from_dict(date_dict), type=RevisionType.MERCURIAL, directory=directory_id, message=commit["message"], metadata={"node": hash_to_hex(header["node"]),}, extra_headers=tuple(extra_headers), synthetic=False, parents=tuple(parents), ) self.node_2_rev[header["node"]] = revision.id revisions[revision.id] = revision # Converts heads to use swh ids self.heads = { branch_name: (pointer_nature, self.node_2_rev[node_id]) for branch_name, (pointer_nature, node_id) in self.heads.items() } missing_revs = set(revisions.keys()) if missing_revs: missing_revs = set(self.storage.revision_missing(list(missing_revs))) for rev in missing_revs: yield revisions[rev] self.mnode_to_tree_id = None def _read_tag(self, tag, split_byte=b" "): node, *name = tag.split(split_byte) name = split_byte.join(name) return node, name def get_releases(self) -> Iterable[Release]: """Get the releases that need to be loaded.""" self.num_releases = 0 releases = {} missing_releases = set() for t in self.tags: self.num_releases += 1 node, name = self._read_tag(t) node = node.decode() node_bytes = hash_to_bytes(node) if not TAG_PATTERN.match(node): self.log.warn("Wrong pattern (%s) found in tags. Skipping" % (node,)) continue if node_bytes not in self.node_2_rev: self.log.warn( "No matching revision for tag %s " "(hg changeset: %s). Skipping" % (name.decode(), node) ) continue tgt_rev = self.node_2_rev[node_bytes] release = Release( name=name, target=tgt_rev, target_type=ObjectType.REVISION, message=None, metadata=None, synthetic=False, author=Person(name=None, email=None, fullname=b""), date=None, ) missing_releases.add(release.id) releases[release.id] = release self.releases[name] = release.id if missing_releases: missing_releases = set(self.storage.release_missing(list(missing_releases))) for _id in missing_releases: yield releases[_id] def get_snapshot(self) -> Snapshot: """Get the snapshot that need to be loaded.""" branches: Dict[bytes, Optional[SnapshotBranch]] = {} for name, (pointer_nature, target) in self.heads.items(): branches[name] = SnapshotBranch( target=target, target_type=TargetType.REVISION ) if pointer_nature == HEAD_POINTER_NAME: branches[b"HEAD"] = SnapshotBranch( target=name, target_type=TargetType.ALIAS ) for name, target in self.releases.items(): branches[name] = SnapshotBranch( target=target, target_type=TargetType.RELEASE ) self.snapshot = Snapshot(branches=branches) return self.snapshot def get_fetch_history_result(self): """Return the data to store in fetch_history.""" return { "contents": self.num_contents, "directories": self.num_directories, "revisions": self.num_revisions, "releases": self.num_releases, } def load_status(self): snapshot = self.get_snapshot() load_status = "eventful" if self.last_snapshot_id is not None and self.last_snapshot_id == snapshot.id: load_status = "uneventful" return { "status": load_status, } class HgArchiveBundle20Loader(HgBundle20Loader): """Mercurial loader for repository wrapped within archives. """ - def __init__(self, url, visit_date=None, archive_path=None): + def __init__( + self, + storage: StorageInterface, + url: str, + visit_date: Optional[datetime.datetime] = None, + archive_path=None, + temp_directory: str = "/tmp", + max_content_size: Optional[int] = None, + ): super().__init__( - url, + storage=storage, + url=url, visit_date=visit_date, logging_class="swh.loader.mercurial.HgArchiveBundle20Loader", + temp_directory=temp_directory, + max_content_size=max_content_size, ) - self.temp_dir = None + self.archive_extract_temp_dir = None self.archive_path = archive_path - def prepare(self, *args, **kwargs): - self.temp_dir = tmp_extract( + def prepare(self): + self.archive_extract_temp_dir = tmp_extract( archive=self.archive_path, dir=self.temp_directory, prefix=TEMPORARY_DIR_PREFIX_PATTERN, suffix=".dump-%s" % os.getpid(), log=self.log, source=self.origin_url, ) - repo_name = os.listdir(self.temp_dir)[0] - self.directory = os.path.join(self.temp_dir, repo_name) - super().prepare(*args, **kwargs) - - def cleanup(self): - if self.temp_dir and os.path.exists(self.temp_dir): - rmtree(self.temp_dir) - super().cleanup() + repo_name = os.listdir(self.archive_extract_temp_dir)[0] + self.directory = os.path.join(self.archive_extract_temp_dir, repo_name) + super().prepare() diff --git a/swh/loader/mercurial/tasks.py b/swh/loader/mercurial/tasks.py index 96a35f1..71fb259 100644 --- a/swh/loader/mercurial/tasks.py +++ b/swh/loader/mercurial/tasks.py @@ -1,33 +1,44 @@ -# Copyright (C) 2017-2019 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from typing import Optional + from celery import shared_task +from swh.loader.mercurial.utils import parse_visit_date + from .loader import HgArchiveBundle20Loader, HgBundle20Loader @shared_task(name=__name__ + ".LoadMercurial") -def load_hg(*, url, directory=None, visit_date=None): +def load_hg( + *, url: str, directory: Optional[str] = None, visit_date: Optional[str] = None +): """Mercurial repository loading Import a mercurial tarball into swh. - Args: see :func:`DepositLoader.load`. + Args: see :func:`HgBundle20Loader.load`. """ - loader = HgBundle20Loader(url, directory=directory, visit_date=visit_date) + + loader = HgBundle20Loader.from_configfile( + url=url, directory=directory, visit_date=parse_visit_date(visit_date) + ) return loader.load() @shared_task(name=__name__ + ".LoadArchiveMercurial") -def load_hg_from_archive(*, url, archive_path=None, visit_date=None): +def load_hg_from_archive( + *, url: str, archive_path: Optional[str] = None, visit_date: Optional[str] = None +): """Import a mercurial tarball into swh. - Args: see :func:`DepositLoader.load`. + Args: see :func:`HgArchiveBundle20Loader.load`. """ - loader = HgArchiveBundle20Loader( - url, archive_path=archive_path, visit_date=visit_date + loader = HgArchiveBundle20Loader.from_configfile( + url=url, archive_path=archive_path, visit_date=parse_visit_date(visit_date) ) return loader.load() diff --git a/swh/loader/mercurial/tasks_from_disk.py b/swh/loader/mercurial/tasks_from_disk.py index 56c01eb..07ff8e1 100644 --- a/swh/loader/mercurial/tasks_from_disk.py +++ b/swh/loader/mercurial/tasks_from_disk.py @@ -1,33 +1,43 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from typing import Optional + from celery import shared_task +from swh.loader.mercurial.utils import parse_visit_date + from .from_disk import HgArchiveLoaderFromDisk, HgLoaderFromDisk @shared_task(name=__name__ + ".LoadMercurialFromDisk") -def load_hg(*, url, directory=None, visit_date=None): +def load_hg( + *, url: str, directory: Optional[str] = None, visit_date: Optional[str] = None +): """Mercurial repository loading Import a mercurial tarball into swh. - Args: see :func:`DepositLoader.load`. + Args: see :func:`HgLoaderFromDisk` constructor. """ - loader = HgLoaderFromDisk(url, directory=directory, visit_date=visit_date) + loader = HgLoaderFromDisk.from_configfile( + url=url, directory=directory, visit_date=parse_visit_date(visit_date) + ) return loader.load() @shared_task(name=__name__ + ".LoadArchiveMercurialFromDisk") -def load_hg_from_archive(*, url, archive_path=None, visit_date=None): +def load_hg_from_archive( + *, url: str, archive_path: Optional[str] = None, visit_date: Optional[str] = None +): """Import a mercurial tarball into swh. - Args: see :func:`DepositLoader.load`. + Args: see :func:`HgArchiveLoaderFromDisk` constructor. """ - loader = HgArchiveLoaderFromDisk( - url, archive_path=archive_path, visit_date=visit_date + loader = HgArchiveLoaderFromDisk.from_configfile( + url=url, archive_path=archive_path, visit_date=parse_visit_date(visit_date) ) return loader.load() diff --git a/swh/loader/mercurial/tests/conftest.py b/swh/loader/mercurial/tests/conftest.py index 8198e13..ee9c3f8 100644 --- a/swh/loader/mercurial/tests/conftest.py +++ b/swh/loader/mercurial/tests/conftest.py @@ -1,41 +1,39 @@ -# Copyright (C) 2019-2020 The Software Heritage developers +# Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict import pytest @pytest.fixture -def swh_loader_config(swh_storage_backend_config, tmp_path) -> Dict[str, Any]: - swh_storage_backend_config["journal_writer"] = {} +def swh_storage_backend_config(swh_storage_backend_config): + """Basic pg storage configuration with no journal collaborator + (to avoid pulling optional dependency on clients of this fixture) + + """ return { + "cls": "filter", "storage": { - "cls": "pipeline", - "steps": [ - {"cls": "filter"}, - { - "cls": "buffer", - "min_batch_size": { - "content": 10000, - "content_bytes": 1073741824, - "directory": 2500, - "revision": 10, - "release": 100, - }, - }, - swh_storage_backend_config, - ], + "cls": "buffer", + "min_batch_size": { + "content": 10, + "content_bytes": 100 * 1024 * 1024, + "directory": 10, + "revision": 10, + "release": 10, + }, + "storage": swh_storage_backend_config, }, - "bundle_filename": "HG20_none_bundle", - "cache1_size": 838860800, - "cache2_size": 838860800, - "clone_timeout_seconds": 2 * 3600, - "reduce_effort": False, - "save_data": False, - "save_data_path": "", + } + + +@pytest.fixture +def swh_loader_config(swh_storage_backend_config, tmp_path) -> Dict[str, Any]: + return { + "storage": swh_storage_backend_config, "max_content_size": 104857600, "temp_directory": str(tmp_path), } diff --git a/swh/loader/mercurial/tests/test_from_disk.py b/swh/loader/mercurial/tests/test_from_disk.py index 340bac4..d38eb39 100644 --- a/swh/loader/mercurial/tests/test_from_disk.py +++ b/swh/loader/mercurial/tests/test_from_disk.py @@ -1,243 +1,248 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import os from datetime import datetime from hashlib import sha1 +import os +from swh.loader.mercurial.utils import parse_visit_date from swh.loader.tests import ( assert_last_visit_matches, check_snapshot, get_stats, prepare_repository_from_archive, ) from swh.model.from_disk import Content, DentryPerms from swh.model.hashutil import hash_to_bytes from swh.model.model import RevisionType, Snapshot, SnapshotBranch, TargetType from swh.storage.algos.snapshot import snapshot_get_latest from ..from_disk import HgDirectory, HgLoaderFromDisk from .loader_checker import ExpectedSwhids, LoaderChecker +VISIT_DATE = parse_visit_date("2016-05-03 15:16:32+00") +assert VISIT_DATE is not None + def random_content() -> Content: """Create minimal content object.""" data = str(datetime.now()).encode() return Content({"sha1_git": sha1(data).digest(), "perms": DentryPerms.content}) def test_hg_directory_creates_missing_directories(): directory = HgDirectory() directory[b"path/to/some/content"] = random_content() def test_hg_directory_get(): content = random_content() directory = HgDirectory() assert directory.get(b"path/to/content") is None assert directory.get(b"path/to/content", content) == content directory[b"path/to/content"] = content assert directory.get(b"path/to/content") == content def test_hg_directory_deletes_empty_directories(): directory = HgDirectory() content = random_content() directory[b"path/to/content"] = content directory[b"path/to/some/deep/content"] = random_content() del directory[b"path/to/some/deep/content"] assert directory.get(b"path/to/some/deep") is None assert directory.get(b"path/to/some") is None assert directory.get(b"path/to/content") == content def test_hg_directory_when_directory_replaces_file(): directory = HgDirectory() directory[b"path/to/some"] = random_content() directory[b"path/to/some/content"] = random_content() # Those tests assert expectations on repository loading # by reading expected values from associated json files # produced by the `swh-hg-identify` command line utility. # # It has more granularity than historical tests. # Assertions will tell if the error comes from the directories # revisions or release rather than only checking the snapshot. # # With more work it should event be possible to know which part # of an object is faulty. -def test_examples(swh_config, datadir, tmp_path): +def test_examples(swh_storage, datadir, tmp_path): for archive_name in ("hello", "transplant", "the-sandbox", "example"): archive_path = os.path.join(datadir, f"{archive_name}.tgz") json_path = os.path.join(datadir, f"{archive_name}.json") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) LoaderChecker( - loader=HgLoaderFromDisk(repo_url), expected=ExpectedSwhids.load(json_path), + loader=HgLoaderFromDisk(swh_storage, repo_url), + expected=ExpectedSwhids.load(json_path), ).check() # This test has as been adapted from the historical `HgBundle20Loader` tests # to ensure compatibility of `HgLoaderFromDisk`. # Hashes as been produced by copy pasting the result of the implementation # to prevent regressions. -def test_loader_hg_new_visit_no_release(swh_config, datadir, tmp_path): +def test_loader_hg_new_visit_no_release(swh_storage, datadir, tmp_path): """Eventful visit should yield 1 snapshot""" archive_name = "the-sandbox" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgLoaderFromDisk(url=repo_url) + loader = HgLoaderFromDisk(swh_storage, url=repo_url) assert loader.load() == {"status": "eventful"} tip_revision_develop = "a9c4534552df370f43f0ef97146f393ef2f2a08c" tip_revision_default = "70e750bb046101fdced06f428e73fee471509c56" expected_snapshot = Snapshot( id=hash_to_bytes("3b8fe58e467deb7597b12a5fd3b2c096b8c02028"), branches={ b"develop": SnapshotBranch( target=hash_to_bytes(tip_revision_develop), target_type=TargetType.REVISION, ), b"default": SnapshotBranch( target=hash_to_bytes(tip_revision_default), target_type=TargetType.REVISION, ), b"HEAD": SnapshotBranch(target=b"develop", target_type=TargetType.ALIAS,), }, ) assert_last_visit_matches( loader.storage, repo_url, status="full", type="hg", snapshot=expected_snapshot.id, ) check_snapshot(expected_snapshot, loader.storage) stats = get_stats(loader.storage) assert stats == { "content": 2, "directory": 3, "origin": 1, "origin_visit": 1, "release": 0, "revision": 58, "skipped_content": 0, "snapshot": 1, } # This test has as been adapted from the historical `HgBundle20Loader` tests # to ensure compatibility of `HgLoaderFromDisk`. # Hashes as been produced by copy pasting the result of the implementation # to prevent regressions. -def test_loader_hg_new_visit_with_release(swh_config, datadir, tmp_path): +def test_loader_hg_new_visit_with_release(swh_storage, datadir, tmp_path): """Eventful visit with release should yield 1 snapshot""" archive_name = "hello" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgLoaderFromDisk(url=repo_url, visit_date="2016-05-03 15:16:32+00") + loader = HgLoaderFromDisk(swh_storage, url=repo_url, visit_date=VISIT_DATE,) actual_load_status = loader.load() assert actual_load_status == {"status": "eventful"} # then stats = get_stats(loader.storage) assert stats == { "content": 3, "directory": 3, "origin": 1, "origin_visit": 1, "release": 1, "revision": 3, "skipped_content": 0, "snapshot": 1, } # cf. test_loader.org for explaining from where those hashes tip_release = hash_to_bytes("515c4d72e089404356d0f4b39d60f948b8999140") release = loader.storage.release_get([tip_release])[0] assert release is not None tip_revision_default = hash_to_bytes("c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27") revision = loader.storage.revision_get([tip_revision_default])[0] assert revision is not None expected_snapshot = Snapshot( id=hash_to_bytes("d35668e02e2ba4321dc951cd308cf883786f918a"), branches={ b"default": SnapshotBranch( target=tip_revision_default, target_type=TargetType.REVISION, ), b"0.1": SnapshotBranch(target=tip_release, target_type=TargetType.RELEASE,), b"HEAD": SnapshotBranch(target=b"default", target_type=TargetType.ALIAS,), }, ) check_snapshot(expected_snapshot, loader.storage) assert_last_visit_matches( loader.storage, repo_url, type=RevisionType.MERCURIAL.value, status="full", snapshot=expected_snapshot.id, ) # This test has as been adapted from the historical `HgBundle20Loader` tests # to ensure compatibility of `HgLoaderFromDisk`. # Hashes as been produced by copy pasting the result of the implementation # to prevent regressions. -def test_visit_repository_with_transplant_operations(swh_config, datadir, tmp_path): +def test_visit_repository_with_transplant_operations(swh_storage, datadir, tmp_path): """Visit a mercurial repository visit transplant operations within should yield a snapshot as well. """ archive_name = "transplant" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgLoaderFromDisk(url=repo_url, visit_date="2016-05-03 15:16:32+00") + loader = HgLoaderFromDisk(swh_storage, url=repo_url, visit_date=VISIT_DATE,) # load hg repository actual_load_status = loader.load() assert actual_load_status == {"status": "eventful"} # collect swh revisions assert_last_visit_matches( loader.storage, repo_url, type=RevisionType.MERCURIAL.value, status="full" ) revisions = [] snapshot = snapshot_get_latest(loader.storage, repo_url) for branch in snapshot.branches.values(): if branch.target_type.value != "revision": continue revisions.append(branch.target) # extract original changesets info and the transplant sources hg_changesets = set() transplant_sources = set() for rev in loader.storage.revision_log(revisions): hg_changesets.add(rev["metadata"]["node"]) for k, v in rev["extra_headers"]: if k == b"transplant_source": transplant_sources.add(v.decode("ascii")) # check extracted data are valid assert len(hg_changesets) > 0 assert len(transplant_sources) > 0 assert transplant_sources.issubset(hg_changesets) diff --git a/swh/loader/mercurial/tests/test_loader.py b/swh/loader/mercurial/tests/test_loader.py index bbfbdb7..9dd127a 100644 --- a/swh/loader/mercurial/tests/test_loader.py +++ b/swh/loader/mercurial/tests/test_loader.py @@ -1,369 +1,369 @@ -# Copyright (C) 2018-2020 The Software Heritage developers +# Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import logging import os import time import hglib from hglib.error import CommandError import pytest +from swh.loader.mercurial.utils import parse_visit_date from swh.loader.tests import ( assert_last_visit_matches, check_snapshot, get_stats, prepare_repository_from_archive, ) from swh.model.hashutil import hash_to_bytes from swh.model.model import RevisionType, Snapshot, SnapshotBranch, TargetType from swh.storage.algos.snapshot import snapshot_get_latest from ..loader import CloneTimeoutError, HgArchiveBundle20Loader, HgBundle20Loader +VISIT_DATE = parse_visit_date("2016-05-03 15:16:32+00") +assert VISIT_DATE is not None -def test_loader_hg_new_visit_no_release(swh_config, datadir, tmp_path): + +def test_loader_hg_new_visit_no_release(swh_storage, datadir, tmp_path): """Eventful visit should yield 1 snapshot""" archive_name = "the-sandbox" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgBundle20Loader(repo_url) + loader = HgBundle20Loader(swh_storage, repo_url) assert loader.load() == {"status": "eventful"} tip_revision_develop = "a9c4534552df370f43f0ef97146f393ef2f2a08c" tip_revision_default = "70e750bb046101fdced06f428e73fee471509c56" expected_snapshot = Snapshot( id=hash_to_bytes("3b8fe58e467deb7597b12a5fd3b2c096b8c02028"), branches={ b"develop": SnapshotBranch( target=hash_to_bytes(tip_revision_develop), target_type=TargetType.REVISION, ), b"default": SnapshotBranch( target=hash_to_bytes(tip_revision_default), target_type=TargetType.REVISION, ), b"HEAD": SnapshotBranch(target=b"develop", target_type=TargetType.ALIAS,), }, ) assert_last_visit_matches( - loader.storage, + swh_storage, repo_url, status="full", type="hg", snapshot=expected_snapshot.id, ) - check_snapshot(expected_snapshot, loader.storage) + check_snapshot(expected_snapshot, swh_storage) - stats = get_stats(loader.storage) + stats = get_stats(swh_storage) assert stats == { "content": 2, "directory": 3, "origin": 1, "origin_visit": 1, "release": 0, "revision": 58, "skipped_content": 0, "snapshot": 1, } # Ensure archive loader yields the same snapshot loader2 = HgArchiveBundle20Loader( - url=archive_path, - archive_path=archive_path, - visit_date="2016-05-03 15:16:32+00", + swh_storage, url=archive_path, archive_path=archive_path, visit_date=VISIT_DATE, ) actual_load_status = loader2.load() assert actual_load_status == {"status": "eventful"} stats2 = get_stats(loader2.storage) expected_stats = copy.deepcopy(stats) expected_stats["origin"] += 1 expected_stats["origin_visit"] += 1 assert stats2 == expected_stats # That visit yields the same snapshot assert_last_visit_matches( loader2.storage, archive_path, status="full", type="hg", snapshot=expected_snapshot.id, ) -def test_loader_hg_new_visit_with_release(swh_config, datadir, tmp_path): +def test_loader_hg_new_visit_with_release(swh_storage, datadir, tmp_path): """Eventful visit with release should yield 1 snapshot""" archive_name = "hello" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgBundle20Loader(url=repo_url, visit_date="2016-05-03 15:16:32+00",) + loader = HgBundle20Loader(swh_storage, url=repo_url, visit_date=VISIT_DATE,) actual_load_status = loader.load() assert actual_load_status == {"status": "eventful"} # then - stats = get_stats(loader.storage) + stats = get_stats(swh_storage) assert stats == { "content": 3, "directory": 3, "origin": 1, "origin_visit": 1, "release": 1, "revision": 3, "skipped_content": 0, "snapshot": 1, } # cf. test_loader.org for explaining from where those hashes tip_release = hash_to_bytes("515c4d72e089404356d0f4b39d60f948b8999140") - release = loader.storage.release_get([tip_release])[0] + release = swh_storage.release_get([tip_release])[0] assert release is not None tip_revision_default = hash_to_bytes("c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27") - revision = loader.storage.revision_get([tip_revision_default])[0] + revision = swh_storage.revision_get([tip_revision_default])[0] assert revision is not None expected_snapshot = Snapshot( id=hash_to_bytes("d35668e02e2ba4321dc951cd308cf883786f918a"), branches={ b"default": SnapshotBranch( target=tip_revision_default, target_type=TargetType.REVISION, ), b"0.1": SnapshotBranch(target=tip_release, target_type=TargetType.RELEASE,), b"HEAD": SnapshotBranch(target=b"default", target_type=TargetType.ALIAS,), }, ) - check_snapshot(expected_snapshot, loader.storage) + check_snapshot(expected_snapshot, swh_storage) assert_last_visit_matches( - loader.storage, + swh_storage, repo_url, type=RevisionType.MERCURIAL.value, status="full", snapshot=expected_snapshot.id, ) # Ensure archive loader yields the same snapshot loader2 = HgArchiveBundle20Loader( - url=archive_path, - archive_path=archive_path, - visit_date="2016-05-03 15:16:32+00", + swh_storage, url=archive_path, archive_path=archive_path, visit_date=VISIT_DATE, ) actual_load_status = loader2.load() assert actual_load_status == {"status": "eventful"} stats2 = get_stats(loader2.storage) expected_stats = copy.deepcopy(stats) expected_stats["origin"] += 1 expected_stats["origin_visit"] += 1 assert stats2 == expected_stats # That visit yields the same snapshot assert_last_visit_matches( loader2.storage, archive_path, status="full", type="hg", snapshot=expected_snapshot.id, ) -def test_visit_with_archive_decompression_failure(swh_config, mocker, datadir): +def test_visit_with_archive_decompression_failure(swh_storage, mocker, datadir): """Failure to decompress should fail early, no data is ingested""" mock_patoo = mocker.patch("swh.loader.mercurial.archive_extract.patoolib") mock_patoo.side_effect = ValueError archive_name = "hello" archive_path = os.path.join(datadir, f"{archive_name}.tgz") loader = HgArchiveBundle20Loader( - url=archive_path, visit_date="2016-05-03 15:16:32+00", + swh_storage, url=archive_path, visit_date=VISIT_DATE, ) actual_load_status = loader.load() assert actual_load_status == {"status": "failed"} - stats = get_stats(loader.storage) + stats = get_stats(swh_storage) assert stats == { "content": 0, "directory": 0, "origin": 1, "origin_visit": 1, "release": 0, "revision": 0, "skipped_content": 0, "snapshot": 0, } # That visit yields the same snapshot assert_last_visit_matches( - loader.storage, archive_path, status="failed", type="hg", snapshot=None + swh_storage, archive_path, status="failed", type="hg", snapshot=None ) -def test_visit_error_with_snapshot_partial(swh_config, datadir, tmp_path, mocker): +def test_visit_error_with_snapshot_partial(swh_storage, datadir, tmp_path, mocker): """Incomplete ingestion leads to a 'partial' ingestion status""" mock = mocker.patch("swh.loader.mercurial.loader.HgBundle20Loader.store_metadata") mock.side_effect = ValueError archive_name = "the-sandbox" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgBundle20Loader(repo_url) + loader = HgBundle20Loader(swh_storage, repo_url) assert loader.load() == {"status": "failed"} assert_last_visit_matches( - loader.storage, + swh_storage, repo_url, status="partial", type="hg", snapshot=hash_to_bytes("3b8fe58e467deb7597b12a5fd3b2c096b8c02028"), ) @pytest.mark.parametrize( "error_msg", [ b"does not appear to be an HG repository", b"404: Not Found", b"404: NOT FOUND", b"Name or service not known", ], ) def test_visit_error_with_status_not_found( - swh_config, datadir, tmp_path, mocker, error_msg + swh_storage, datadir, tmp_path, mocker, error_msg ): """Not reaching the repo leads to a 'not_found' ingestion status""" mock = mocker.patch("hglib.clone") mock.side_effect = CommandError((), 255, b"", error_msg) archive_name = "the-sandbox" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgBundle20Loader(repo_url) + loader = HgBundle20Loader(swh_storage, repo_url) assert loader.load() == {"status": "uneventful"} assert_last_visit_matches( - loader.storage, repo_url, status="not_found", type="hg", snapshot=None, + swh_storage, repo_url, status="not_found", type="hg", snapshot=None, ) -def test_visit_error_with_clone_error(swh_config, datadir, tmp_path, mocker): +def test_visit_error_with_clone_error(swh_storage, datadir, tmp_path, mocker): """Testing failures other than 'not_found'""" mock = mocker.patch("hglib.clone") mock.side_effect = CommandError((), 255, b"", b"out of disk space") archive_name = "the-sandbox" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgBundle20Loader(repo_url) + loader = HgBundle20Loader(swh_storage, repo_url) assert loader.load() == {"status": "failed"} assert_last_visit_matches( - loader.storage, repo_url, status="failed", type="hg", snapshot=None, + swh_storage, repo_url, status="failed", type="hg", snapshot=None, ) -def test_visit_repository_with_transplant_operations(swh_config, datadir, tmp_path): +def test_visit_repository_with_transplant_operations(swh_storage, datadir, tmp_path): """Visit a mercurial repository visit transplant operations within should yield a snapshot as well. """ archive_name = "transplant" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgBundle20Loader(url=repo_url, visit_date="2019-05-23 12:06:00+00",) + loader = HgBundle20Loader(swh_storage, url=repo_url, visit_date=VISIT_DATE,) # load hg repository actual_load_status = loader.load() assert actual_load_status == {"status": "eventful"} # collect swh revisions assert_last_visit_matches( - loader.storage, repo_url, type=RevisionType.MERCURIAL.value, status="full" + swh_storage, repo_url, type=RevisionType.MERCURIAL.value, status="full" ) revisions = [] - snapshot = snapshot_get_latest(loader.storage, repo_url) + snapshot = snapshot_get_latest(swh_storage, repo_url) for branch in snapshot.branches.values(): if branch.target_type.value != "revision": continue revisions.append(branch.target) # extract original changesets info and the transplant sources hg_changesets = set() transplant_sources = set() - for rev in loader.storage.revision_log(revisions): + for rev in swh_storage.revision_log(revisions): hg_changesets.add(rev["metadata"]["node"]) for k, v in rev["extra_headers"]: if k == b"transplant_source": transplant_sources.add(v.decode("ascii")) # check extracted data are valid assert len(hg_changesets) > 0 assert len(transplant_sources) > 0 assert transplant_sources.issubset(hg_changesets) def test_clone_with_timeout_timeout(caplog, tmp_path, monkeypatch): log = logging.getLogger("test_clone_with_timeout") def clone_timeout(source, dest): time.sleep(60) monkeypatch.setattr(hglib, "clone", clone_timeout) with pytest.raises(CloneTimeoutError): HgBundle20Loader.clone_with_timeout( log, "https://www.mercurial-scm.org/repo/hello", tmp_path, 1 ) for record in caplog.records: assert record.levelname == "WARNING" assert "https://www.mercurial-scm.org/repo/hello" in record.getMessage() assert record.args == ("https://www.mercurial-scm.org/repo/hello", 1) def test_clone_with_timeout_returns(caplog, tmp_path, monkeypatch): log = logging.getLogger("test_clone_with_timeout") def clone_return(source, dest): return (source, dest) monkeypatch.setattr(hglib, "clone", clone_return) assert HgBundle20Loader.clone_with_timeout( log, "https://www.mercurial-scm.org/repo/hello", tmp_path, 1 ) == ("https://www.mercurial-scm.org/repo/hello", tmp_path) def test_clone_with_timeout_exception(caplog, tmp_path, monkeypatch): log = logging.getLogger("test_clone_with_timeout") def clone_return(source, dest): raise ValueError("Test exception") monkeypatch.setattr(hglib, "clone", clone_return) with pytest.raises(ValueError) as excinfo: HgBundle20Loader.clone_with_timeout( log, "https://www.mercurial-scm.org/repo/hello", tmp_path, 1 ) assert "Test exception" in excinfo.value.args[0] diff --git a/swh/loader/mercurial/tests/test_utils.py b/swh/loader/mercurial/tests/test_utils.py new file mode 100644 index 0000000..e780b20 --- /dev/null +++ b/swh/loader/mercurial/tests/test_utils.py @@ -0,0 +1,31 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from datetime import datetime + +import pytest + +from swh.loader.mercurial.utils import parse_visit_date + +VISIT_DATE_STR = "2021-02-17 15:50:04.518963" +VISIT_DATE = datetime(2021, 2, 17, 15, 50, 4, 518963) + + +@pytest.mark.parametrize( + "input_visit_date,expected_date", + [(None, None), (VISIT_DATE, VISIT_DATE), (VISIT_DATE_STR, VISIT_DATE),], +) +def test_utils_parse_visit_date(input_visit_date, expected_date): + assert parse_visit_date(input_visit_date) == expected_date + + +def test_utils_parse_visit_date_now(): + actual_date = parse_visit_date("now") + assert isinstance(actual_date, datetime) + + +def test_utils_parse_visit_date_fails(): + with pytest.raises(ValueError, match="invalid"): + parse_visit_date(10) # not a string nor a date diff --git a/swh/loader/mercurial/utils.py b/swh/loader/mercurial/utils.py new file mode 100644 index 0000000..9c9ee5d --- /dev/null +++ b/swh/loader/mercurial/utils.py @@ -0,0 +1,29 @@ +# Copyright (C) 2020-2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from datetime import datetime, timezone +from typing import Optional, Union + +import dateutil + + +def parse_visit_date(visit_date: Optional[Union[datetime, str]]) -> Optional[datetime]: + """Convert visit date from either None, a string or a datetime to either None or + datetime. + + """ + if visit_date is None: + return None + + if isinstance(visit_date, datetime): + return visit_date + + if visit_date == "now": + return datetime.now(tz=timezone.utc) + + if isinstance(visit_date, str): + return dateutil.parser.parse(visit_date) + + raise ValueError(f"invalid visit date {visit_date!r}")