Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/svn/loader.py
# Copyright (C) 2015-2021 The Software Heritage developers | # Copyright (C) 2015-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
"""Loader in charge of injecting either new or existing svn mirrors to | """Loader in charge of injecting either new or existing svn mirrors to | ||||
swh-storage. | swh-storage. | ||||
""" | """ | ||||
from mmap import ACCESS_WRITE, mmap | from mmap import ACCESS_WRITE, mmap | ||||
import os | import os | ||||
import pty | import pty | ||||
import re | import re | ||||
import shutil | import shutil | ||||
from subprocess import Popen | from subprocess import Popen | ||||
import tempfile | import tempfile | ||||
from typing import Any, Dict, Iterator, List, Optional, Tuple | from typing import Dict, Iterator, List, Optional, Tuple | ||||
import iso8601 | |||||
from subvertpy import SubversionException | from subvertpy import SubversionException | ||||
from swh.core.config import merge_configs | |||||
from swh.loader.core.loader import BaseLoader | from swh.loader.core.loader import BaseLoader | ||||
from swh.loader.core.utils import clean_dangling_folders | from swh.loader.core.utils import clean_dangling_folders | ||||
from swh.loader.exception import NotFound | from swh.loader.exception import NotFound | ||||
from swh.loader.svn.svn import SvnRepo | from swh.loader.svn.svn import SvnRepo | ||||
from swh.model import from_disk, hashutil | from swh.model import from_disk, hashutil | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Content, | Content, | ||||
Directory, | Directory, | ||||
Origin, | Origin, | ||||
Revision, | Revision, | ||||
SkippedContent, | SkippedContent, | ||||
Snapshot, | Snapshot, | ||||
SnapshotBranch, | SnapshotBranch, | ||||
TargetType, | TargetType, | ||||
) | ) | ||||
from swh.storage.algos.snapshot import snapshot_get_latest | from swh.storage.algos.snapshot import snapshot_get_latest | ||||
from swh.storage.interface import StorageInterface | |||||
from . import converters | from . import converters | ||||
from .exception import SvnLoaderHistoryAltered, SvnLoaderUneventful | from .exception import SvnLoaderHistoryAltered, SvnLoaderUneventful | ||||
from .utils import ( | from .utils import ( | ||||
OutputStream, | OutputStream, | ||||
init_svn_repo_from_archive_dump, | init_svn_repo_from_archive_dump, | ||||
init_svn_repo_from_dump, | init_svn_repo_from_dump, | ||||
) | ) | ||||
DEFAULT_BRANCH = b"HEAD" | DEFAULT_BRANCH = b"HEAD" | ||||
TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.svn." | TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.svn." | ||||
DEFAULT_CONFIG: Dict[str, Any] = { | |||||
"temp_directory": "/tmp", | |||||
"debug": False, # NOT FOR PRODUCTION: False for production | |||||
"check_revision": { | |||||
"status": False, # True: check the revision, False: don't check | |||||
"limit": 1000, # Periodicity check | |||||
}, | |||||
} | |||||
class SvnLoader(BaseLoader): | class SvnLoader(BaseLoader): | ||||
"""Swh svn loader. | """Swh svn loader. | ||||
The repository is either remote or local. The loader deals with | The repository is either remote or local. The loader deals with | ||||
update on an already previously loaded repository. | update on an already previously loaded repository. | ||||
""" | """ | ||||
visit_type = "svn" | visit_type = "svn" | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
url, | storage: StorageInterface, | ||||
origin_url=None, | url: str, | ||||
visit_date=None, | origin_url: Optional[str] = None, | ||||
destination_path=None, | visit_date: Optional[str] = None, | ||||
swh_revision=None, | destination_path: Optional[str] = None, | ||||
start_from_scratch=False, | swh_revision: Optional[str] = None, | ||||
start_from_scratch: bool = False, | |||||
temp_directory: str = "/tmp", | |||||
debug: bool = False, | |||||
anlambert: would be simpler to use an int with a default value of 0 here
The test `if self. | |||||
Done Inline ActionsI should default to 1000 as before to avoid changing behavior. Nonetheless, ack on your suggestion. It's not clear why i used Optional here... ardumont: I should default to 1000 as before to avoid changing behavior.
Nonetheless, ack on your… | |||||
Done Inline Actionsapparently i misremembered, it's not checking by default ¯\_(ツ)_/¯... (also checked swh-site). ardumont: apparently i misremembered, it's not checking by default ¯\_(ツ)_/¯... (also checked swh-site). | |||||
Not Done Inline ActionsI think you should set it to 0 as default value as revision checks was not enabled by default previously. anlambert: I think you should set it to 0 as default value as revision checks was not enabled by default… | |||||
Done Inline Actionsyes, agreed ardumont: yes, agreed | |||||
check_revision: int = 0, | |||||
max_content_size: Optional[int] = None, | |||||
): | ): | ||||
super().__init__(logging_class="swh.loader.svn.SvnLoader") | super().__init__( | ||||
self.config = merge_configs(DEFAULT_CONFIG, self.config) | storage=storage, | ||||
logging_class="swh.loader.svn.SvnLoader", | |||||
max_content_size=max_content_size, | |||||
) | |||||
# technical svn uri to act on svn repository | # technical svn uri to act on svn repository | ||||
self.svn_url = url | self.svn_url = url | ||||
# origin url as unique identifier for origin in swh archive | # origin url as unique identifier for origin in swh archive | ||||
self.origin_url = origin_url if origin_url else self.svn_url | self.origin_url = origin_url if origin_url else self.svn_url | ||||
self.debug = self.config["debug"] | self.debug = debug | ||||
self.temp_directory = self.config["temp_directory"] | self.temp_directory = temp_directory | ||||
self.done = False | self.done = False | ||||
self.svnrepo = None | self.svnrepo = None | ||||
# Revision check is configurable | # Revision check is configurable | ||||
check_revision = self.config["check_revision"] | self.check_revision = check_revision | ||||
Not Done Inline Actionsself.check_revision = check_revision if you remove Optional. anlambert: `self.check_revision = check_revision` if you remove Optional. | |||||
if check_revision["status"]: | |||||
self.check_revision = check_revision["limit"] | |||||
else: | |||||
self.check_revision = None | |||||
# internal state used to store swh objects | # internal state used to store swh objects | ||||
self._contents = [] | self._contents: List[Content] = [] | ||||
self._skipped_contents = [] | self._skipped_contents: List[SkippedContent] = [] | ||||
self._directories = [] | self._directories: List[Directory] = [] | ||||
self._revisions = [] | self._revisions: List[Revision] = [] | ||||
self._snapshot: Optional[Snapshot] = None | self._snapshot: Optional[Snapshot] = None | ||||
# internal state, current visit | # internal state, current visit | ||||
self._last_revision = None | self._last_revision = None | ||||
self._visit_status = "full" | self._visit_status = "full" | ||||
self._load_status = "uneventful" | self._load_status = "uneventful" | ||||
self.visit_date = visit_date | if visit_date: | ||||
self.visit_date = iso8601.parse_date(visit_date) | |||||
else: | |||||
self.visit_date = None | |||||
self.destination_path = destination_path | self.destination_path = destination_path | ||||
self.start_from_scratch = start_from_scratch | self.start_from_scratch = start_from_scratch | ||||
self.max_content_length = self.config["max_content_size"] | |||||
self.snapshot = None | self.snapshot = None | ||||
# state from previous visit | # state from previous visit | ||||
self.latest_snapshot = None | self.latest_snapshot = None | ||||
self.latest_revision = None | self.latest_revision = None | ||||
def pre_cleanup(self): | def pre_cleanup(self): | ||||
"""Cleanup potential dangling files from prior runs (e.g. OOM killed | """Cleanup potential dangling files from prior runs (e.g. OOM killed | ||||
tasks) | tasks) | ||||
▲ Show 20 Lines • Show All 123 Lines • ▼ Show 20 Lines | ) -> Tuple[int, int, Dict[int, Tuple[bytes, ...]]]: | ||||
Raises: | Raises: | ||||
SvnLoaderHistoryAltered: When a hash divergence has been | SvnLoaderHistoryAltered: When a hash divergence has been | ||||
detected (should not happen) | detected (should not happen) | ||||
SvnLoaderUneventful: Nothing changed since last visit | SvnLoaderUneventful: Nothing changed since last visit | ||||
""" | """ | ||||
assert self.svnrepo is not None, "svnrepo initialized in the `prepare` method" | |||||
revision_head = self.svnrepo.head_revision() | revision_head = self.svnrepo.head_revision() | ||||
if revision_head == 0: # empty repository case | if revision_head == 0: # empty repository case | ||||
revision_start = 0 | revision_start = 0 | ||||
revision_end = 0 | revision_end = 0 | ||||
else: # default configuration | else: # default configuration | ||||
revision_start = self.svnrepo.initial_revision() | revision_start = self.svnrepo.initial_revision() | ||||
revision_end = revision_head | revision_end = revision_head | ||||
▲ Show 20 Lines • Show All 53 Lines • ▼ Show 20 Lines | def _check_revision_divergence(self, count, rev, dir_id): | ||||
Returns: | Returns: | ||||
False if no hash divergence detected | False if no hash divergence detected | ||||
Raises | Raises | ||||
ValueError if a hash divergence is detected | ValueError if a hash divergence is detected | ||||
""" | """ | ||||
if (count % self.check_revision) == 0: # hash computation check | # hash computation check | ||||
if (self.check_revision != 0 and count % self.check_revision) == 0: | |||||
self.log.debug("Checking hash computations on revision %s..." % rev) | self.log.debug("Checking hash computations on revision %s..." % rev) | ||||
checked_dir_id = self.swh_revision_hash_tree_at_svn_revision(rev) | checked_dir_id = self.swh_revision_hash_tree_at_svn_revision(rev) | ||||
if checked_dir_id != dir_id: | if checked_dir_id != dir_id: | ||||
err = ( | err = ( | ||||
"Hash tree computation divergence detected " | "Hash tree computation divergence detected " | ||||
"(%s != %s), stopping!" | "(%s != %s), stopping!" | ||||
% ( | % ( | ||||
hashutil.hash_to_hex(dir_id), | hashutil.hash_to_hex(dir_id), | ||||
▲ Show 20 Lines • Show All 51 Lines • ▼ Show 20 Lines | ]: | ||||
if self.check_revision: | if self.check_revision: | ||||
self._check_revision_divergence(count, rev, dir_id) | self._check_revision_divergence(count, rev, dir_id) | ||||
if nextrev: | if nextrev: | ||||
revision_parents[nextrev] = [swh_revision.id] | revision_parents[nextrev] = [swh_revision.id] | ||||
yield _contents, _skipped_contents, _directories, swh_revision | yield _contents, _skipped_contents, _directories, swh_revision | ||||
def prepare_origin_visit(self, *args, **kwargs): | def prepare_origin_visit(self): | ||||
self.origin = Origin(url=self.origin_url if self.origin_url else self.svn_url) | self.origin = Origin(url=self.origin_url if self.origin_url else self.svn_url) | ||||
def prepare(self, *args, **kwargs): | def prepare(self): | ||||
latest_snapshot_revision = self._latest_snapshot_revision(self.origin_url) | latest_snapshot_revision = self._latest_snapshot_revision(self.origin_url) | ||||
if latest_snapshot_revision: | if latest_snapshot_revision: | ||||
self.latest_snapshot, self.latest_revision = latest_snapshot_revision | self.latest_snapshot, self.latest_revision = latest_snapshot_revision | ||||
if self.destination_path: | if self.destination_path: | ||||
local_dirname = self.destination_path | local_dirname = self.destination_path | ||||
else: | else: | ||||
local_dirname = tempfile.mkdtemp( | local_dirname = tempfile.mkdtemp( | ||||
suffix="-%s" % os.getpid(), | suffix="-%s" % os.getpid(), | ||||
prefix=TEMPORARY_DIR_PREFIX_PATTERN, | prefix=TEMPORARY_DIR_PREFIX_PATTERN, | ||||
dir=self.temp_directory, | dir=self.temp_directory, | ||||
) | ) | ||||
try: | try: | ||||
self.svnrepo = SvnRepo( | self.svnrepo = SvnRepo( | ||||
self.svn_url, self.origin_url, local_dirname, self.max_content_length | self.svn_url, self.origin_url, local_dirname, self.max_content_size | ||||
) | ) | ||||
except SubversionException as e: | except SubversionException as e: | ||||
error_msgs = [ | error_msgs = [ | ||||
"Unable to connect to a repository at URL", | "Unable to connect to a repository at URL", | ||||
"Unknown URL type", | "Unknown URL type", | ||||
] | ] | ||||
for msg in error_msgs: | for msg in error_msgs: | ||||
if msg in e.args[0]: | if msg in e.args[0]: | ||||
▲ Show 20 Lines • Show All 128 Lines • ▼ Show 20 Lines | |||||
class SvnLoaderFromDumpArchive(SvnLoader): | class SvnLoaderFromDumpArchive(SvnLoader): | ||||
"""Uncompress an archive containing an svn dump, mount the svn dump as | """Uncompress an archive containing an svn dump, mount the svn dump as | ||||
an svn repository and load said repository. | an svn repository and load said repository. | ||||
""" | """ | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
url, | storage: StorageInterface, | ||||
archive_path, | url: str, | ||||
origin_url=None, | archive_path: str, | ||||
destination_path=None, | origin_url: Optional[str] = None, | ||||
swh_revision=None, | destination_path: Optional[str] = None, | ||||
start_from_scratch=None, | swh_revision: Optional[str] = None, | ||||
visit_date=None, | start_from_scratch: bool = False, | ||||
visit_date: Optional[str] = None, | |||||
temp_directory: str = "/tmp", | |||||
debug: bool = False, | |||||
check_revision: int = 0, | |||||
max_content_size: Optional[int] = None, | |||||
): | ): | ||||
super().__init__( | super().__init__( | ||||
url, | storage=storage, | ||||
url=url, | |||||
origin_url=origin_url, | origin_url=origin_url, | ||||
destination_path=destination_path, | destination_path=destination_path, | ||||
swh_revision=swh_revision, | swh_revision=swh_revision, | ||||
start_from_scratch=start_from_scratch, | start_from_scratch=start_from_scratch, | ||||
visit_date=visit_date, | visit_date=visit_date, | ||||
temp_directory=temp_directory, | |||||
debug=debug, | |||||
check_revision=check_revision, | |||||
max_content_size=max_content_size, | |||||
) | ) | ||||
self.archive_path = archive_path | self.archive_path = archive_path | ||||
self.temp_dir = None | self.temp_dir = None | ||||
self.repo_path = None | self.repo_path = None | ||||
def prepare(self, *args, **kwargs): | def prepare(self): | ||||
self.log.info("Archive to mount and load %s" % self.archive_path) | self.log.info("Archive to mount and load %s" % self.archive_path) | ||||
self.temp_dir, self.repo_path = init_svn_repo_from_archive_dump( | self.temp_dir, self.repo_path = init_svn_repo_from_archive_dump( | ||||
self.archive_path, | self.archive_path, | ||||
prefix=TEMPORARY_DIR_PREFIX_PATTERN, | prefix=TEMPORARY_DIR_PREFIX_PATTERN, | ||||
suffix="-%s" % os.getpid(), | suffix="-%s" % os.getpid(), | ||||
root_dir=self.temp_directory, | root_dir=self.temp_directory, | ||||
) | ) | ||||
super().prepare(*args, **kwargs) | super().prepare() | ||||
def cleanup(self): | def cleanup(self): | ||||
super().cleanup() | super().cleanup() | ||||
if self.temp_dir and os.path.exists(self.temp_dir): | if self.temp_dir and os.path.exists(self.temp_dir): | ||||
msg = "Clean up temporary directory dump %s for project %s" % ( | msg = "Clean up temporary directory dump %s for project %s" % ( | ||||
self.temp_dir, | self.temp_dir, | ||||
os.path.basename(self.repo_path), | os.path.basename(self.repo_path), | ||||
) | ) | ||||
self.log.debug(msg) | self.log.debug(msg) | ||||
shutil.rmtree(self.temp_dir) | shutil.rmtree(self.temp_dir) | ||||
class SvnLoaderFromRemoteDump(SvnLoader): | class SvnLoaderFromRemoteDump(SvnLoader): | ||||
""" | """ | ||||
Create a subversion repository dump using the svnrdump utility, | Create a subversion repository dump using the svnrdump utility, | ||||
mount it locally and load the repository from it. | mount it locally and load the repository from it. | ||||
""" | """ | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
url, | storage: StorageInterface, | ||||
origin_url=None, | url: str, | ||||
destination_path=None, | origin_url: Optional[str] = None, | ||||
swh_revision=None, | destination_path: Optional[str] = None, | ||||
start_from_scratch=False, | swh_revision: Optional[str] = None, | ||||
visit_date=None, | start_from_scratch: bool = False, | ||||
visit_date: Optional[str] = None, | |||||
temp_directory: str = "/tmp", | |||||
debug: bool = False, | |||||
check_revision: int = 0, | |||||
max_content_size: Optional[int] = None, | |||||
): | ): | ||||
super().__init__( | super().__init__( | ||||
url, | storage=storage, | ||||
url=url, | |||||
origin_url=origin_url, | origin_url=origin_url, | ||||
destination_path=destination_path, | destination_path=destination_path, | ||||
swh_revision=swh_revision, | swh_revision=swh_revision, | ||||
start_from_scratch=start_from_scratch, | start_from_scratch=start_from_scratch, | ||||
visit_date=visit_date, | visit_date=visit_date, | ||||
temp_directory=temp_directory, | |||||
debug=debug, | |||||
check_revision=check_revision, | |||||
max_content_size=max_content_size, | |||||
) | ) | ||||
self.temp_dir = tempfile.mkdtemp(dir=self.temp_directory) | self.temp_dir = tempfile.mkdtemp(dir=self.temp_directory) | ||||
self.repo_path = None | self.repo_path = None | ||||
self.truncated_dump = False | self.truncated_dump = False | ||||
def get_last_loaded_svn_rev(self, svn_url: str) -> int: | def get_last_loaded_svn_rev(self, svn_url: str) -> int: | ||||
"""Check if the svn repository has already been visited and return the last | """Check if the svn repository has already been visited and return the last | ||||
loaded svn revision number or -1 otherwise. | loaded svn revision number or -1 otherwise. | ||||
▲ Show 20 Lines • Show All 101 Lines • ▼ Show 20 Lines | def dump_svn_revisions(self, svn_url, last_loaded_svn_rev=-1): | ||||
% (last_dumped_rev, last_loaded_svn_rev) | % (last_dumped_rev, last_loaded_svn_rev) | ||||
) | ) | ||||
raise Exception( | raise Exception( | ||||
"An error occurred when running svnrdump and " | "An error occurred when running svnrdump and " | ||||
"no exploitable dump file has been generated." | "no exploitable dump file has been generated." | ||||
) | ) | ||||
def prepare(self, *args, **kwargs): | def prepare(self): | ||||
# First, check if previous revisions have been loaded for the | # First, check if previous revisions have been loaded for the | ||||
# subversion origin and get the number of the last one | # subversion origin and get the number of the last one | ||||
last_loaded_svn_rev = self.get_last_loaded_svn_rev(self.svn_url) | last_loaded_svn_rev = self.get_last_loaded_svn_rev(self.svn_url) | ||||
# Then try to generate a dump file containing relevant svn revisions | # Then try to generate a dump file containing relevant svn revisions | ||||
# to load, an exception will be thrown if something wrong happened | # to load, an exception will be thrown if something wrong happened | ||||
dump_path = self.dump_svn_revisions(self.svn_url, last_loaded_svn_rev) | dump_path = self.dump_svn_revisions(self.svn_url, last_loaded_svn_rev) | ||||
# Finally, mount the dump and load the repository | # Finally, mount the dump and load the repository | ||||
self.log.debug('Mounting dump file with "svnadmin load".') | self.log.debug('Mounting dump file with "svnadmin load".') | ||||
_, self.repo_path = init_svn_repo_from_dump( | _, self.repo_path = init_svn_repo_from_dump( | ||||
dump_path, | dump_path, | ||||
prefix=TEMPORARY_DIR_PREFIX_PATTERN, | prefix=TEMPORARY_DIR_PREFIX_PATTERN, | ||||
suffix="-%s" % os.getpid(), | suffix="-%s" % os.getpid(), | ||||
root_dir=self.temp_dir, | root_dir=self.temp_dir, | ||||
) | ) | ||||
self.svn_url = "file://%s" % self.repo_path | self.svn_url = "file://%s" % self.repo_path | ||||
super().prepare(*args, **kwargs) | super().prepare() | ||||
def cleanup(self): | def cleanup(self): | ||||
super().cleanup() | super().cleanup() | ||||
if self.temp_dir and os.path.exists(self.temp_dir): | if self.temp_dir and os.path.exists(self.temp_dir): | ||||
shutil.rmtree(self.temp_dir) | shutil.rmtree(self.temp_dir) | ||||
def visit_status(self): | def visit_status(self): | ||||
if self.truncated_dump: | if self.truncated_dump: | ||||
return "partial" | return "partial" | ||||
else: | else: | ||||
return super().visit_status() | return super().visit_status() |
would be simpler to use an int with a default value of 0 here
The test if self.check_revision will still behave the same.