Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/mercurial/from_disk.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import os | |||||
from collections import deque | from collections import deque | ||||
from datetime import datetime, timezone | from datetime import datetime | ||||
import os | |||||
from shutil import rmtree | from shutil import rmtree | ||||
from tempfile import mkdtemp | from tempfile import mkdtemp | ||||
from typing import Any, Deque, Dict, Optional, Tuple, TypeVar, Union | from typing import Deque, Dict, Optional, Tuple, TypeVar, Union | ||||
import dateutil | |||||
from swh.core.config import merge_configs | |||||
from swh.loader.core.loader import BaseLoader | from swh.loader.core.loader import BaseLoader | ||||
from swh.loader.core.utils import clean_dangling_folders | from swh.loader.core.utils import clean_dangling_folders | ||||
from swh.loader.mercurial.utils import parse_visit_date | |||||
from swh.model.from_disk import Content, DentryPerms, Directory | from swh.model.from_disk import Content, DentryPerms, Directory | ||||
from swh.model.hashutil import MultiHash, hash_to_bytehex | from swh.model.hashutil import MultiHash, hash_to_bytehex | ||||
from swh.model.model import Content as ModelContent | |||||
from swh.model.model import ( | from swh.model.model import ( | ||||
ObjectType, | ObjectType, | ||||
Origin, | Origin, | ||||
Person, | Person, | ||||
Release, | Release, | ||||
Revision, | Revision, | ||||
RevisionType, | RevisionType, | ||||
Sha1Git, | Sha1Git, | ||||
Snapshot, | Snapshot, | ||||
SnapshotBranch, | SnapshotBranch, | ||||
TargetType, | TargetType, | ||||
TimestampWithTimezone, | TimestampWithTimezone, | ||||
) | ) | ||||
from swh.model.model import Content as ModelContent | |||||
from swh.storage.interface import StorageInterface | |||||
from . import hgutil | from . import hgutil | ||||
from .archive_extract import tmp_extract | from .archive_extract import tmp_extract | ||||
from .hgutil import HgNodeId | from .hgutil import HgNodeId | ||||
# Mapping from Mercurial manifest flags to Software Heritage directory
# entry permissions.  Mercurial only distinguishes symlinks (b"l"),
# executables (b"x") and regular files (empty flag).
FLAG_PERMS: Dict[bytes, DentryPerms] = {
    b"l": DentryPerms.symlink,
    b"x": DentryPerms.executable_content,
    b"": DentryPerms.content,
}
DEFAULT_CONFIG: Dict[str, Any] = { | |||||
"temp_directory": "/tmp", | |||||
"clone_timeout_seconds": 7200, | |||||
"content_cache_size": 10_000, | |||||
} | |||||
TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.mercurial.from_disk" | TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.mercurial.from_disk" | ||||
T = TypeVar("T") | T = TypeVar("T") | ||||
def parse_visit_date(visit_date: Optional[Union[datetime, str]]) -> Optional[datetime]:
    """Convert visit date from Optional[Union[str, datetime]] to Optional[datetime].

    `HgLoaderFromDisk` accepts `str` and `datetime` as visit date
    while `BaseLoader` only deals with `datetime`.

    Args:
        visit_date: either None, an aware/naive datetime, the literal
            string "now", or any date string `dateutil` can parse.

    Returns:
        The corresponding datetime, or None when no date was given.

    Raises:
        ValueError: if `visit_date` is neither None, a datetime, nor a str.
    """
    if visit_date is None:
        return None
    if isinstance(visit_date, datetime):
        return visit_date
    if visit_date == "now":
        return datetime.now(tz=timezone.utc)
    if isinstance(visit_date, str):
        return dateutil.parser.parse(visit_date)
    # Bug fix: the original *returned* the exception instead of raising it,
    # silently handing callers a ValueError instance as if it were a date.
    raise ValueError(f"invalid visit date {visit_date!r}")
class HgDirectory(Directory): | class HgDirectory(Directory): | ||||
"""A more practical directory. | """A more practical directory. | ||||
- creates missing parent directories | - creates missing parent directories | ||||
- removes empty directories | - removes empty directories | ||||
""" | """ | ||||
def __setitem__(self, path: bytes, value: Union[Content, "HgDirectory"]) -> None: | def __setitem__(self, path: bytes, value: Union[Content, "HgDirectory"]) -> None: | ||||
Show All 33 Lines | class HgLoaderFromDisk(BaseLoader): | ||||
"""Load a mercurial repository from a local repository.""" | """Load a mercurial repository from a local repository.""" | ||||
CONFIG_BASE_FILENAME = "loader/mercurial" | CONFIG_BASE_FILENAME = "loader/mercurial" | ||||
visit_type = "hg" | visit_type = "hg" | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
storage: StorageInterface, | |||||
url: str, | url: str, | ||||
directory: Optional[str] = None, | directory: Optional[str] = None, | ||||
logging_class: str = "swh.loader.mercurial.LoaderFromDisk", | logging_class: str = "swh.loader.mercurial.LoaderFromDisk", | ||||
visit_date: Optional[Union[datetime, str]] = None, | visit_date: Optional[datetime] = None, | ||||
config: Optional[Dict[str, Any]] = None, | temp_directory: str = "/tmp", | ||||
clone_timeout_seconds: int = 7200, | |||||
content_cache_size: int = 10_000, | |||||
max_content_size: Optional[int] = None, | |||||
): | ): | ||||
"""Initialize the loader. | """Initialize the loader. | ||||
Args: | Args: | ||||
url: url of the repository. | url: url of the repository. | ||||
directory: directory of the local repository. | directory: directory of the local repository. | ||||
logging_class: class of the loader logger. | logging_class: class of the loader logger. | ||||
visit_date: visit date of the repository | visit_date: visit date of the repository | ||||
config: loader configuration | config: loader configuration | ||||
""" | """ | ||||
super().__init__(logging_class=logging_class, config=config or {}) | super().__init__( | ||||
storage=storage, | |||||
logging_class=logging_class, | |||||
max_content_size=max_content_size, | |||||
) | |||||
self.config = merge_configs(DEFAULT_CONFIG, self.config) | self._temp_directory = temp_directory | ||||
self._temp_directory = self.config["temp_directory"] | self._clone_timeout = clone_timeout_seconds | ||||
self._clone_timeout = self.config["clone_timeout_seconds"] | |||||
self.origin_url = url | self.origin_url = url | ||||
self.visit_date = parse_visit_date(visit_date) | self.visit_date = visit_date | ||||
self.directory = directory | self.directory = directory | ||||
self._repo: Optional[hgutil.Repository] = None | self._repo: Optional[hgutil.Repository] = None | ||||
self._revision_nodeid_to_swhid: Dict[HgNodeId, Sha1Git] = {} | self._revision_nodeid_to_swhid: Dict[HgNodeId, Sha1Git] = {} | ||||
self._repo_directory: Optional[str] = None | self._repo_directory: Optional[str] = None | ||||
# keeps the last processed hg nodeid | # keeps the last processed hg nodeid | ||||
# it is used for differential tree update by store_directories | # it is used for differential tree update by store_directories | ||||
# NULLID is the parent of the first revision | # NULLID is the parent of the first revision | ||||
self._last_hg_nodeid = hgutil.NULLID | self._last_hg_nodeid = hgutil.NULLID | ||||
# keeps the last revision tree | # keeps the last revision tree | ||||
# it is used for differential tree update by store_directories | # it is used for differential tree update by store_directories | ||||
self._last_root = HgDirectory() | self._last_root = HgDirectory() | ||||
# Cache the content hash across revisions to avoid recalculation. | # Cache the content hash across revisions to avoid recalculation. | ||||
self._content_hash_cache: hgutil.LRUCacheDict = hgutil.LRUCacheDict( | self._content_hash_cache: hgutil.LRUCacheDict = hgutil.LRUCacheDict( | ||||
self.config["content_cache_size"], | content_cache_size, | ||||
) | ) | ||||
def pre_cleanup(self) -> None: | def pre_cleanup(self) -> None: | ||||
"""As a first step, will try and check for dangling data to cleanup. | """As a first step, will try and check for dangling data to cleanup. | ||||
This should do its best to avoid raising issues. | This should do its best to avoid raising issues. | ||||
""" | """ | ||||
clean_dangling_folders( | clean_dangling_folders( | ||||
self._temp_directory, | self._temp_directory, | ||||
pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, | pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, | ||||
log=self.log, | log=self.log, | ||||
) | ) | ||||
def cleanup(self) -> None: | def cleanup(self) -> None: | ||||
"""Last step executed by the loader.""" | """Last step executed by the loader.""" | ||||
if self._repo_directory and os.path.exists(self._repo_directory): | if self._repo_directory and os.path.exists(self._repo_directory): | ||||
self.log.debug(f"Cleanup up repository {self._repo_directory}") | self.log.debug(f"Cleanup up repository {self._repo_directory}") | ||||
rmtree(self._repo_directory) | rmtree(self._repo_directory) | ||||
def prepare_origin_visit(self, *args, **kwargs) -> None: | def prepare_origin_visit(self) -> None: | ||||
"""First step executed by the loader to prepare origin and visit | """First step executed by the loader to prepare origin and visit | ||||
references. Set/update self.origin, and | references. Set/update self.origin, and | ||||
optionally self.origin_url, self.visit_date. | optionally self.origin_url, self.visit_date. | ||||
""" | """ | ||||
self.origin = Origin(url=self.origin_url) | self.origin = Origin(url=self.origin_url) | ||||
def prepare(self, *args, **kwargs) -> None: | def prepare(self) -> None: | ||||
"""Second step executed by the loader to prepare some state needed by | """Second step executed by the loader to prepare some state needed by | ||||
the loader. | the loader. | ||||
""" | """ | ||||
def fetch_data(self) -> bool: | def fetch_data(self) -> bool: | ||||
"""Fetch the data from the source the loader is currently loading | """Fetch the data from the source the loader is currently loading | ||||
▲ Show 20 Lines • Show All 271 Lines • ▼ Show 20 Lines | def store_directories(self, rev_ctx: hgutil.BaseContext) -> Sha1Git: | ||||
return self._last_root.hash | return self._last_root.hash | ||||
class HgArchiveLoaderFromDisk(HgLoaderFromDisk):
    """Mercurial loader for repository wrapped within tarballs."""

    def __init__(
        self,
        storage: StorageInterface,
        url: str,
        visit_date: Optional[datetime] = None,
        archive_path: str = None,
        temp_directory: str = "/tmp",
        max_content_size: Optional[int] = None,
    ):
        """Initialize the archive loader.

        Args:
            storage: storage instance the loaded objects are written to.
            url: origin url of the archived repository.
            visit_date: visit date of the repository.
            archive_path: path to the tarball wrapping the repository.
            temp_directory: directory used to extract the archive into.
            max_content_size: contents larger than this are not stored.
        """
        super().__init__(
            storage=storage,
            url=url,
            visit_date=visit_date,
            logging_class="swh.loader.mercurial.ArchiveLoaderFromDisk",
            temp_directory=temp_directory,
            max_content_size=max_content_size,
        )
        # Directory the archive gets extracted to; set by prepare().
        self.archive_extract_temp_dir = None
        self.archive_path = archive_path

    def prepare(self):
        """Extract the archive instead of cloning."""
        self.archive_extract_temp_dir = tmp_extract(
            archive=self.archive_path,
            dir=self._temp_directory,
            prefix=TEMPORARY_DIR_PREFIX_PATTERN,
            suffix=f".dump-{os.getpid()}",
            log=self.log,
            source=self.origin_url,
        )

        # Bug fix: this listdir previously read the stale `self.temp_dir`
        # attribute, which no longer exists after the rename to
        # `archive_extract_temp_dir` (would raise AttributeError).
        repo_name = os.listdir(self.archive_extract_temp_dir)[0]
        self.directory = os.path.join(self.archive_extract_temp_dir, repo_name)
        super().prepare()

    def cleanup(self) -> None:
        """Remove the extracted archive instead of the cloned repository.

        The base class only removes its own clone directory, so without this
        override the extraction directory would be leaked.
        """
        if self.archive_extract_temp_dir and os.path.exists(
            self.archive_extract_temp_dir
        ):
            rmtree(self.archive_extract_temp_dir)
        super().cleanup()
# Allow direct usage of the loader from the command line with
# `python -m swh.loader.mercurial.from_disk $ORIGIN_URL`
if __name__ == "__main__":
    import logging

    import click

    logging.basicConfig(
        level=logging.DEBUG, format="%(asctime)s %(process)d %(message)s"
    )

    @click.command()
    @click.option("--origin-url", help="origin url")
    @click.option("--hg-directory", help="Path to mercurial repository to load")
    @click.option("--visit-date", default=None, help="Visit date")
    def main(origin_url, hg_directory, visit_date):
        """Load a local Mercurial repository into an in-memory storage."""
        # Imported lazily: only needed for this CLI entry point.
        from swh.storage import get_storage

        storage = get_storage(cls="memory")
        return HgLoaderFromDisk(
            storage,
            origin_url,
            directory=hg_directory,
            visit_date=parse_visit_date(visit_date),
        ).load()

    main()