Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/mercurial/loader.py
Show All 22 Lines | |||||
import random | import random | ||||
import re | import re | ||||
from shutil import rmtree | from shutil import rmtree | ||||
from tempfile import mkdtemp | from tempfile import mkdtemp | ||||
import time | import time | ||||
from typing import Any, Dict, Iterable, List, Optional | from typing import Any, Dict, Iterable, List, Optional | ||||
import billiard | import billiard | ||||
from dateutil import parser | |||||
import hglib | import hglib | ||||
from hglib.error import CommandError | from hglib.error import CommandError | ||||
from swh.core.config import merge_configs | |||||
from swh.loader.core.loader import DVCSLoader | from swh.loader.core.loader import DVCSLoader | ||||
from swh.loader.core.utils import clean_dangling_folders | from swh.loader.core.utils import clean_dangling_folders | ||||
from swh.loader.exception import NotFound | from swh.loader.exception import NotFound | ||||
from swh.model import identifiers | from swh.model import identifiers | ||||
from swh.model.hashutil import ( | from swh.model.hashutil import ( | ||||
DEFAULT_ALGORITHMS, | DEFAULT_ALGORITHMS, | ||||
MultiHash, | MultiHash, | ||||
hash_to_bytehex, | hash_to_bytehex, | ||||
Show All 13 Lines | from swh.model.model import ( | ||||
Sha1Git, | Sha1Git, | ||||
SkippedContent, | SkippedContent, | ||||
Snapshot, | Snapshot, | ||||
SnapshotBranch, | SnapshotBranch, | ||||
TargetType, | TargetType, | ||||
TimestampWithTimezone, | TimestampWithTimezone, | ||||
) | ) | ||||
from swh.storage.algos.origin import origin_get_latest_visit_status | from swh.storage.algos.origin import origin_get_latest_visit_status | ||||
from swh.storage.interface import StorageInterface | |||||
from . import converters | from . import converters | ||||
from .archive_extract import tmp_extract | from .archive_extract import tmp_extract | ||||
from .bundle20_reader import Bundle20Reader | from .bundle20_reader import Bundle20Reader | ||||
from .converters import PRIMARY_ALGO as ALGO | from .converters import PRIMARY_ALGO as ALGO | ||||
from .objects import SelectiveCache, SimpleTree | from .objects import SelectiveCache, SimpleTree | ||||
TAG_PATTERN = re.compile("[0-9A-Fa-f]{40}") | TAG_PATTERN = re.compile("[0-9A-Fa-f]{40}") | ||||
class CommandErrorWrapper(Exception):
    """Exception carrying the raw error output of a failed command.

    The payload (presumably the stderr bytes of a Mercurial command that
    failed -- confirm against the call sites) is kept on ``err`` so callers
    can inspect or log it.
    """

    def __init__(self, err: Optional[bytes]):
        self.err = err
class CloneTimeoutError(Exception):
    """Raised when cloning a remote repository takes longer than allowed."""
DEFAULT_CONFIG: Dict[str, Any] = { | |||||
"bundle_filename": "HG20_none_bundle", | |||||
"reduce_effort": False, | |||||
"temp_directory": "/tmp", | |||||
"cache1_size": 800 * 1024 * 1024, | |||||
"cache2_size": 800 * 1024 * 1024, | |||||
"clone_timeout_seconds": 7200, | |||||
} | |||||
class HgBundle20Loader(DVCSLoader): | class HgBundle20Loader(DVCSLoader): | ||||
"""Mercurial loader able to deal with remote or local repository. | """Mercurial loader able to deal with remote or local repository. | ||||
""" | """ | ||||
visit_type = "hg" | visit_type = "hg" | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
url, | storage: StorageInterface, | ||||
visit_date=None, | url: str, | ||||
directory=None, | visit_date: Optional[datetime.datetime] = None, | ||||
directory: Optional[str] = None, | |||||
logging_class="swh.loader.mercurial.Bundle20Loader", | logging_class="swh.loader.mercurial.Bundle20Loader", | ||||
bundle_filename: Optional[str] = "HG20_none_bundle", | |||||
reduce_effort: bool = False, | |||||
temp_directory: str = "/tmp", | |||||
cache1_size: int = 800 * 1024 * 1024, | |||||
cache2_size: int = 800 * 1024 * 1024, | |||||
clone_timeout_seconds: int = 7200, | |||||
save_data_path: Optional[str] = None, | |||||
max_content_size: Optional[int] = None, | |||||
): | ): | ||||
super().__init__(logging_class=logging_class) | super().__init__( | ||||
self.config = merge_configs(DEFAULT_CONFIG, self.config) | storage=storage, | ||||
logging_class=logging_class, | |||||
save_data_path=save_data_path, | |||||
max_content_size=max_content_size, | |||||
) | |||||
self.origin_url = url | self.origin_url = url | ||||
self.visit_date = visit_date | self.visit_date = visit_date | ||||
self.directory = directory | self.directory = directory | ||||
self.bundle_filename = self.config["bundle_filename"] | self.bundle_filename = bundle_filename | ||||
self.reduce_effort_flag = self.config["reduce_effort"] | self.reduce_effort_flag = reduce_effort | ||||
self.empty_repository = None | self.empty_repository = None | ||||
self.temp_directory = self.config["temp_directory"] | self.temp_directory = temp_directory | ||||
self.cache1_size = self.config["cache1_size"] | self.cache1_size = cache1_size | ||||
self.cache2_size = self.config["cache2_size"] | self.cache2_size = cache2_size | ||||
self.clone_timeout = self.config["clone_timeout_seconds"] | self.clone_timeout = clone_timeout_seconds | ||||
self.working_directory = None | self.working_directory = None | ||||
self.bundle_path = None | self.bundle_path = None | ||||
self.heads = {} | self.heads: Dict[bytes, Any] = {} | ||||
self.releases = {} | self.releases: Dict[bytes, Any] = {} | ||||
self.last_snapshot_id: Optional[bytes] = None | self.last_snapshot_id: Optional[bytes] = None | ||||
def pre_cleanup(self): | def pre_cleanup(self): | ||||
"""Cleanup potential dangling files from prior runs (e.g. OOM killed | """Cleanup potential dangling files from prior runs (e.g. OOM killed | ||||
tasks) | tasks) | ||||
""" | """ | ||||
clean_dangling_folders( | clean_dangling_folders( | ||||
Show All 30 Lines | def get_heads(self, repo): | ||||
bookmarks = repo.bookmarks() | bookmarks = repo.bookmarks() | ||||
if bookmarks and bookmarks[0]: | if bookmarks and bookmarks[0]: | ||||
for bookmark_name, _, target_short in bookmarks[0]: | for bookmark_name, _, target_short in bookmarks[0]: | ||||
target = repo[target_short].node() | target = repo[target_short].node() | ||||
b[bookmark_name] = (None, hash_to_bytes(target.decode())) | b[bookmark_name] = (None, hash_to_bytes(target.decode())) | ||||
return b | return b | ||||
def prepare_origin_visit(self, *args, **kwargs) -> None: | def prepare_origin_visit(self) -> None: | ||||
self.origin = Origin(url=self.origin_url) | self.origin = Origin(url=self.origin_url) | ||||
visit_date = self.visit_date | |||||
if isinstance(visit_date, str): # visit_date can be string or datetime | |||||
visit_date = parser.parse(visit_date) | |||||
self.visit_date = visit_date | |||||
visit_status = origin_get_latest_visit_status( | visit_status = origin_get_latest_visit_status( | ||||
self.storage, self.origin_url, require_snapshot=True | self.storage, self.origin_url, require_snapshot=True | ||||
) | ) | ||||
self.last_snapshot_id = None if visit_status is None else visit_status.snapshot | self.last_snapshot_id = None if visit_status is None else visit_status.snapshot | ||||
@staticmethod | @staticmethod | ||||
def clone_with_timeout(log, origin, destination, timeout): | def clone_with_timeout(log, origin, destination, timeout): | ||||
queue = billiard.Queue() | queue = billiard.Queue() | ||||
Show All 30 Lines | def clone_with_timeout(log, origin, destination, timeout): | ||||
process.join() | process.join() | ||||
if isinstance(result, Exception): | if isinstance(result, Exception): | ||||
raise result from None | raise result from None | ||||
return result | return result | ||||
def prepare(self, *args, **kwargs): | def prepare(self): | ||||
"""Prepare the necessary steps to load an actual remote or local | """Prepare the necessary steps to load an actual remote or local | ||||
repository. | repository. | ||||
To load a local repository, pass the optional directory | To load a local repository, pass the optional directory | ||||
parameter as filled with a path to a real local folder. | parameter as filled with a path to a real local folder. | ||||
To load a remote repository, pass the optional directory | To load a remote repository, pass the optional directory | ||||
parameter as None. | parameter as None. | ||||
▲ Show 20 Lines • Show All 390 Lines • ▼ Show 20 Lines | def load_status(self): | ||||
} | } | ||||
class HgArchiveBundle20Loader(HgBundle20Loader): | class HgArchiveBundle20Loader(HgBundle20Loader): | ||||
"""Mercurial loader for repository wrapped within archives. | """Mercurial loader for repository wrapped within archives. | ||||
""" | """ | ||||
def __init__(self, url, visit_date=None, archive_path=None): | def __init__( | ||||
self, | |||||
storage: StorageInterface, | |||||
url: str, | |||||
visit_date: Optional[datetime.datetime] = None, | |||||
archive_path=None, | |||||
temp_directory: str = "/tmp", | |||||
max_content_size: Optional[int] = None, | |||||
): | |||||
super().__init__( | super().__init__( | ||||
url, | storage=storage, | ||||
url=url, | |||||
visit_date=visit_date, | visit_date=visit_date, | ||||
logging_class="swh.loader.mercurial.HgArchiveBundle20Loader", | logging_class="swh.loader.mercurial.HgArchiveBundle20Loader", | ||||
temp_directory=temp_directory, | |||||
max_content_size=max_content_size, | |||||
) | ) | ||||
self.temp_dir = None | self.archive_extract_temp_dir = None | ||||
self.archive_path = archive_path | self.archive_path = archive_path | ||||
def prepare(self, *args, **kwargs): | def prepare(self): | ||||
self.temp_dir = tmp_extract( | self.archive_extract_temp_dir = tmp_extract( | ||||
archive=self.archive_path, | archive=self.archive_path, | ||||
dir=self.temp_directory, | dir=self.temp_directory, | ||||
prefix=TEMPORARY_DIR_PREFIX_PATTERN, | prefix=TEMPORARY_DIR_PREFIX_PATTERN, | ||||
suffix=".dump-%s" % os.getpid(), | suffix=".dump-%s" % os.getpid(), | ||||
log=self.log, | log=self.log, | ||||
source=self.origin_url, | source=self.origin_url, | ||||
) | ) | ||||
repo_name = os.listdir(self.temp_dir)[0] | repo_name = os.listdir(self.archive_extract_temp_dir)[0] | ||||
self.directory = os.path.join(self.temp_dir, repo_name) | self.directory = os.path.join(self.archive_extract_temp_dir, repo_name) | ||||
super().prepare(*args, **kwargs) | super().prepare() | ||||
def cleanup(self): | |||||
if self.temp_dir and os.path.exists(self.temp_dir): | |||||
rmtree(self.temp_dir) | |||||
super().cleanup() |