Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/mercurial/loader.py
# Copyright (C) 2020-2021 The Software Heritage developers | # Copyright (C) 2020-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
"""Loaders for ingesting Mercurial repositories either local from disk, or remote, see | """Loaders for ingesting Mercurial repositories either local from disk, or remote, see | ||||
:class:`swh.loader.mercurial.loader.HgLoader` or from an archive, see | :class:`swh.loader.mercurial.loader.HgLoader` or from an archive, see | ||||
:class:`swh.loader.mercurial.from_disk.HgArchiveLoader`. | :class:`swh.loader.mercurial.from_disk.HgArchiveLoader`. | ||||
""" | """ | ||||
from collections import deque | from collections import deque | ||||
from datetime import datetime | from datetime import datetime | ||||
import os | import os | ||||
from shutil import rmtree | from shutil import rmtree | ||||
from tempfile import mkdtemp | from tempfile import mkdtemp | ||||
from typing import Deque, Dict, Iterator, List, Optional, Set, Tuple, TypeVar, Union | from typing import ( | ||||
Any, | |||||
Deque, | |||||
Dict, | |||||
Iterator, | |||||
List, | |||||
Optional, | |||||
Set, | |||||
Tuple, | |||||
TypeVar, | |||||
Union, | |||||
) | |||||
from swh.core.utils import grouper | from swh.core.utils import grouper | ||||
from swh.loader.core.loader import BaseLoader | from swh.loader.core.loader import BaseLoader | ||||
from swh.loader.core.utils import clean_dangling_folders | from swh.loader.core.utils import clean_dangling_folders | ||||
from swh.loader.mercurial.utils import get_minimum_env | from swh.loader.mercurial.utils import get_minimum_env | ||||
from swh.model import swhids | from swh.model import swhids | ||||
from swh.model.from_disk import Content, DentryPerms, Directory | from swh.model.from_disk import Content, DentryPerms, Directory | ||||
from swh.model.hashutil import hash_to_bytehex | from swh.model.hashutil import hash_to_bytehex | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
ExtID, | ExtID, | ||||
ObjectType, | ObjectType, | ||||
Origin, | |||||
Person, | Person, | ||||
Release, | Release, | ||||
Revision, | Revision, | ||||
RevisionType, | RevisionType, | ||||
Sha1Git, | Sha1Git, | ||||
Snapshot, | Snapshot, | ||||
SnapshotBranch, | SnapshotBranch, | ||||
TargetType, | TargetType, | ||||
▲ Show 20 Lines • Show All 102 Lines • ▼ Show 20 Lines | class HgLoader(BaseLoader): | ||||
visit_type = "hg" | visit_type = "hg" | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
storage: StorageInterface, | storage: StorageInterface, | ||||
url: str, | url: str, | ||||
directory: Optional[str] = None, | directory: Optional[str] = None, | ||||
logging_class: str = "swh.loader.mercurial.loader.HgLoader", | |||||
visit_date: Optional[datetime] = None, | visit_date: Optional[datetime] = None, | ||||
temp_directory: str = "/tmp", | temp_directory: str = "/tmp", | ||||
clone_timeout_seconds: int = 7200, | clone_timeout_seconds: int = 7200, | ||||
content_cache_size: int = 10_000, | content_cache_size: int = 10_000, | ||||
max_content_size: Optional[int] = None, | **kwargs: Any, | ||||
): | ): | ||||
"""Initialize the loader. | """Initialize the loader. | ||||
Args: | Args: | ||||
url: url of the repository. | url: url of the repository. | ||||
directory: directory of the local repository. | directory: directory of the local repository. | ||||
logging_class: class of the loader logger. | logging_class: class of the loader logger. | ||||
visit_date: visit date of the repository | visit_date: visit date of the repository | ||||
config: loader configuration | config: loader configuration | ||||
""" | """ | ||||
super().__init__( | super().__init__(storage=storage, origin_url=url, **kwargs) | ||||
storage=storage, | |||||
logging_class=logging_class, | |||||
max_content_size=max_content_size, | |||||
) | |||||
self._temp_directory = temp_directory | self._temp_directory = temp_directory | ||||
self._clone_timeout = clone_timeout_seconds | self._clone_timeout = clone_timeout_seconds | ||||
self.origin_url = url | self.visit_date = visit_date or self.visit_date | ||||
self.visit_date = visit_date | |||||
self.directory = directory | self.directory = directory | ||||
self._repo: Optional[hgutil.Repository] = None | self._repo: Optional[hgutil.Repository] = None | ||||
self._revision_nodeid_to_sha1git: Dict[HgNodeId, Sha1Git] = {} | self._revision_nodeid_to_sha1git: Dict[HgNodeId, Sha1Git] = {} | ||||
self._repo_directory: Optional[str] = None | self._repo_directory: Optional[str] = None | ||||
# keeps the last processed hg nodeid | # keeps the last processed hg nodeid | ||||
# it is used for differential tree update by store_directories | # it is used for differential tree update by store_directories | ||||
▲ Show 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | def cleanup(self) -> None: | ||||
os.environ.update(self.old_environ) | os.environ.update(self.old_environ) | ||||
# Don't cleanup if loading from a local directory | # Don't cleanup if loading from a local directory | ||||
was_remote = self.directory is None | was_remote = self.directory is None | ||||
if was_remote and self._repo_directory and os.path.exists(self._repo_directory): | if was_remote and self._repo_directory and os.path.exists(self._repo_directory): | ||||
self.log.debug(f"Cleanup up repository {self._repo_directory}") | self.log.debug(f"Cleanup up repository {self._repo_directory}") | ||||
rmtree(self._repo_directory) | rmtree(self._repo_directory) | ||||
def prepare_origin_visit(self) -> None: | |||||
"""First step executed by the loader to prepare origin and visit | |||||
references. Set/update self.origin, and | |||||
optionally self.origin_url, self.visit_date. | |||||
""" | |||||
self.origin = Origin(url=self.origin_url) | |||||
def prepare(self) -> None: | def prepare(self) -> None: | ||||
"""Second step executed by the loader to prepare some state needed by | """Second step executed by the loader to prepare some state needed by | ||||
the loader. | the loader. | ||||
""" | """ | ||||
# Set here to allow multiple calls to load on the same loader instance | # Set here to allow multiple calls to load on the same loader instance | ||||
self._latest_heads = [] | self._latest_heads = [] | ||||
latest_snapshot = snapshot_get_latest(self.storage, self.origin_url) | latest_snapshot = snapshot_get_latest(self.storage, self.origin.url) | ||||
if latest_snapshot: | if latest_snapshot: | ||||
self._set_recorded_state(latest_snapshot) | self._set_recorded_state(latest_snapshot) | ||||
def _set_recorded_state(self, latest_snapshot: Snapshot) -> None: | def _set_recorded_state(self, latest_snapshot: Snapshot) -> None: | ||||
""" | """ | ||||
Looks up the nodeid for all revisions in the snapshot via extid_get_from_target, | Looks up the nodeid for all revisions in the snapshot via extid_get_from_target, | ||||
and adds them to `self._latest_heads`. | and adds them to `self._latest_heads`. | ||||
Also looks up the currently saved releases ("tags" in Mercurial speak). | Also looks up the currently saved releases ("tags" in Mercurial speak). | ||||
▲ Show 20 Lines • Show All 80 Lines • ▼ Show 20 Lines | def fetch_data(self) -> bool: | ||||
""" | """ | ||||
if not self.directory: # no local repository | if not self.directory: # no local repository | ||||
self._repo_directory = mkdtemp( | self._repo_directory = mkdtemp( | ||||
prefix=TEMPORARY_DIR_PREFIX_PATTERN, | prefix=TEMPORARY_DIR_PREFIX_PATTERN, | ||||
suffix=f"-{os.getpid()}", | suffix=f"-{os.getpid()}", | ||||
dir=self._temp_directory, | dir=self._temp_directory, | ||||
) | ) | ||||
self.log.debug( | self.log.debug( | ||||
f"Cloning {self.origin_url} to {self._repo_directory} " | f"Cloning {self.origin.url} to {self._repo_directory} " | ||||
f"with timeout {self._clone_timeout} seconds" | f"with timeout {self._clone_timeout} seconds" | ||||
) | ) | ||||
hgutil.clone(self.origin_url, self._repo_directory, self._clone_timeout) | hgutil.clone(self.origin.url, self._repo_directory, self._clone_timeout) | ||||
else: # existing local repository | else: # existing local repository | ||||
# Allow to load on disk repository without cloning | # Allow to load on disk repository without cloning | ||||
# for testing purpose. | # for testing purpose. | ||||
self._repo_directory = self.directory | self._repo_directory = self.directory | ||||
self._repo = hgutil.repository(self._repo_directory) | self._repo = hgutil.repository(self._repo_directory) | ||||
return False | return False | ||||
▲ Show 20 Lines • Show All 455 Lines • ▼ Show 20 Lines | class HgArchiveLoader(HgLoader): | ||||
def prepare(self): | def prepare(self): | ||||
"""Extract the archive instead of cloning.""" | """Extract the archive instead of cloning.""" | ||||
self.archive_extract_temp_dir = tmp_extract( | self.archive_extract_temp_dir = tmp_extract( | ||||
archive=self.archive_path, | archive=self.archive_path, | ||||
dir=self._temp_directory, | dir=self._temp_directory, | ||||
prefix=TEMPORARY_DIR_PREFIX_PATTERN, | prefix=TEMPORARY_DIR_PREFIX_PATTERN, | ||||
suffix=f".dump-{os.getpid()}", | suffix=f".dump-{os.getpid()}", | ||||
log=self.log, | log=self.log, | ||||
source=self.origin_url, | source=self.origin.url, | ||||
) | ) | ||||
repo_name = os.listdir(self.temp_dir)[0] | repo_name = os.listdir(self.temp_dir)[0] | ||||
self.directory = os.path.join(self.archive_extract_temp_dir, repo_name) | self.directory = os.path.join(self.archive_extract_temp_dir, repo_name) | ||||
super().prepare() | super().prepare() |