Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/svn/loader.py
Show All 9 Lines | |||||
from datetime import datetime | from datetime import datetime | ||||
from mmap import ACCESS_WRITE, mmap | from mmap import ACCESS_WRITE, mmap | ||||
import os | import os | ||||
import pty | import pty | ||||
import re | import re | ||||
import shutil | import shutil | ||||
from subprocess import Popen | from subprocess import Popen | ||||
import tempfile | import tempfile | ||||
from typing import Dict, Iterator, List, Optional, Tuple | from typing import Dict, Iterator, List, Optional, Sequence, Tuple | ||||
from subvertpy import SubversionException | from subvertpy import SubversionException | ||||
from swh.loader.core.loader import BaseLoader | from swh.loader.core.loader import BaseLoader | ||||
from swh.loader.core.utils import clean_dangling_folders | from swh.loader.core.utils import clean_dangling_folders | ||||
from swh.loader.exception import NotFound | from swh.loader.exception import NotFound | ||||
from swh.loader.svn.svn import SvnRepo | from swh.loader.svn.svn import SvnRepo | ||||
from swh.model import from_disk, hashutil | from swh.model import from_disk, hashutil | ||||
Show All 20 Lines | |||||
DEFAULT_BRANCH = b"HEAD" | DEFAULT_BRANCH = b"HEAD" | ||||
TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.svn." | TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.svn." | ||||
SUBVERSION_ERROR = re.compile(r".*(E[0-9]{6}):.*") | SUBVERSION_ERROR = re.compile(r".*(E[0-9]{6}):.*") | ||||
SUBVERSION_NOT_FOUND = "E170013" | SUBVERSION_NOT_FOUND = "E170013" | ||||
class SvnLoader(BaseLoader): | class SvnLoader(BaseLoader): | ||||
"""Swh svn loader. | """SVN loader. The repository is either remote or local. The loader deals with | ||||
The repository is either remote or local. The loader deals with | |||||
update on an already previously loaded repository. | update on an already previously loaded repository. | ||||
""" | """ | ||||
visit_type = "svn" | visit_type = "svn" | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
storage: StorageInterface, | storage: StorageInterface, | ||||
url: str, | url: str, | ||||
origin_url: Optional[str] = None, | origin_url: Optional[str] = None, | ||||
visit_date: Optional[datetime] = None, | visit_date: Optional[datetime] = None, | ||||
destination_path: Optional[str] = None, | |||||
swh_revision: Optional[str] = None, | |||||
incremental: bool = True, | incremental: bool = True, | ||||
temp_directory: str = "/tmp", | temp_directory: str = "/tmp", | ||||
debug: bool = False, | debug: bool = False, | ||||
check_revision: int = 0, | check_revision: int = 0, | ||||
max_content_size: Optional[int] = None, | max_content_size: Optional[int] = None, | ||||
): | ): | ||||
"""Load an svn repository. | """Load a svn repository (either remote or local). | ||||
Args: | Args: | ||||
... | url: The default origin url | ||||
origin_url: Optional original url override to use as origin reference in the | |||||
archive. If not provided, "url" is used as origin. | |||||
visit_date: Optional date to override the visit date | |||||
incremental: If True, the default, starts from the last snapshot (if any). | incremental: If True, the default, starts from the last snapshot (if any). | ||||
Otherwise, starts from the initial commit of the repository. | Otherwise, starts from the initial commit of the repository. | ||||
temp_directory: The temporary directory to use as root directory for working | |||||
directory computations | |||||
debug: If true, run the loader in debug mode. At the end of the loading, the | |||||
temporary working directory is not cleaned up to ease inspection. | |||||
Defaults to false. | |||||
check_revision: The number of svn commits between checks for hash divergence | |||||
max_content_size: Default max content size allowed | |||||
""" | """ | ||||
super().__init__( | super().__init__( | ||||
storage=storage, | storage=storage, | ||||
logging_class="swh.loader.svn.SvnLoader", | logging_class="swh.loader.svn.SvnLoader", | ||||
max_content_size=max_content_size, | max_content_size=max_content_size, | ||||
) | ) | ||||
# technical svn uri to act on svn repository | # technical svn uri to act on svn repository | ||||
Show All 12 Lines | ): | ||||
self._directories: List[Directory] = [] | self._directories: List[Directory] = [] | ||||
self._revisions: List[Revision] = [] | self._revisions: List[Revision] = [] | ||||
self._snapshot: Optional[Snapshot] = None | self._snapshot: Optional[Snapshot] = None | ||||
# internal state, current visit | # internal state, current visit | ||||
self._last_revision = None | self._last_revision = None | ||||
self._visit_status = "full" | self._visit_status = "full" | ||||
self._load_status = "uneventful" | self._load_status = "uneventful" | ||||
self.visit_date = visit_date | self.visit_date = visit_date | ||||
self.destination_path = destination_path | |||||
self.incremental = incremental | self.incremental = incremental | ||||
self.snapshot: Optional[Snapshot] = None | self.snapshot: Optional[Snapshot] = None | ||||
# state from previous visit | # state from previous visit | ||||
self.latest_snapshot = None | self.latest_snapshot = None | ||||
self.latest_revision = None | self.latest_revision = None | ||||
def pre_cleanup(self): | def pre_cleanup(self): | ||||
"""Cleanup potential dangling files from prior runs (e.g. OOM killed | """Cleanup potential dangling files from prior runs (e.g. OOM killed | ||||
Show All 16 Lines | def cleanup(self): | ||||
self.log.error( | self.log.error( | ||||
"""NOT FOR PRODUCTION - debug flag activated | """NOT FOR PRODUCTION - debug flag activated | ||||
Local repository not cleaned up for investigation: %s""", | Local repository not cleaned up for investigation: %s""", | ||||
self.svnrepo.local_url.decode("utf-8"), | self.svnrepo.local_url.decode("utf-8"), | ||||
) | ) | ||||
return | return | ||||
self.svnrepo.clean_fs() | self.svnrepo.clean_fs() | ||||
def swh_revision_hash_tree_at_svn_revision(self, revision): | def swh_revision_hash_tree_at_svn_revision(self, revision: int) -> bytes: | ||||
"""Compute and return the hash tree at a given svn revision. | """Compute and return the hash tree at a given svn revision. | ||||
Args: | Args: | ||||
rev (int): the svn revision we want to check | rev: the svn revision we want to check | ||||
Returns: | Returns: | ||||
The hash tree directory as bytes. | The hash tree directory as bytes. | ||||
""" | """ | ||||
assert self.svnrepo is not None | |||||
local_dirname, local_url = self.svnrepo.export_temporary(revision) | local_dirname, local_url = self.svnrepo.export_temporary(revision) | ||||
h = from_disk.Directory.from_disk(path=local_url).hash | h = from_disk.Directory.from_disk(path=local_url).hash | ||||
self.svnrepo.clean_fs(local_dirname) | self.svnrepo.clean_fs(local_dirname) | ||||
return h | return h | ||||
def _latest_snapshot_revision( | def _latest_snapshot_revision( | ||||
self, origin_url: str, | self, origin_url: str, | ||||
) -> Optional[Tuple[Snapshot, Revision]]: | ) -> Optional[Tuple[Snapshot, Revision]]: | ||||
Show All 23 Lines | ) -> Optional[Tuple[Snapshot, Revision]]: | ||||
return None | return None | ||||
swh_id = branch.target | swh_id = branch.target | ||||
revision = storage.revision_get([swh_id])[0] | revision = storage.revision_get([swh_id])[0] | ||||
if not revision: | if not revision: | ||||
return None | return None | ||||
return latest_snapshot, revision | return latest_snapshot, revision | ||||
def build_swh_revision(self, rev, commit, dir_id, parents): | def build_swh_revision( | ||||
self, rev: int, commit: Dict, dir_id: bytes, parents: Sequence[bytes] | |||||
) -> Revision: | |||||
"""Build the swh revision dictionary. | """Build the swh revision dictionary. | ||||
This adds: | This adds: | ||||
- the `'synthetic`' flag to true | - the `'synthetic`' flag to true | ||||
- the '`extra_headers`' containing the repository's uuid and the | - the '`extra_headers`' containing the repository's uuid and the | ||||
svn revision number. | svn revision number. | ||||
Args: | Args: | ||||
rev (int): the svn revision number | rev: the svn revision number | ||||
commit (dict): the commit data: revision id, date, author, and message | commit: the commit data: revision id, date, author, and message | ||||
dir_id (bytes): the upper tree's hash identifier | dir_id: the upper tree's hash identifier | ||||
parents ([bytes]): the parents' identifiers | parents: the parents' identifiers | ||||
Returns: | Returns: | ||||
The swh revision corresponding to the svn revision. | The swh revision corresponding to the svn revision. | ||||
""" | """ | ||||
assert self.svnrepo is not None | |||||
return converters.build_swh_revision( | return converters.build_swh_revision( | ||||
rev, commit, self.svnrepo.uuid, dir_id, parents | rev, commit, self.svnrepo.uuid, dir_id, parents | ||||
) | ) | ||||
def check_history_not_altered( | def check_history_not_altered( | ||||
self, svnrepo, revision_start: int, swh_rev: Revision | self, svnrepo: SvnRepo, revision_start: int, swh_rev: Revision | ||||
) -> bool: | ) -> bool: | ||||
"""Given a svn repository, check if the history was modified in between visits. | """Given a svn repository, check if the history was modified in between visits. | ||||
""" | """ | ||||
revision_id = swh_rev.id | revision_id = swh_rev.id | ||||
parents = swh_rev.parents | parents = swh_rev.parents | ||||
hash_data_per_revs = svnrepo.swh_hash_data_at_revision(revision_start) | hash_data_per_revs = svnrepo.swh_hash_data_at_revision(revision_start) | ||||
▲ Show 20 Lines • Show All 70 Lines • ▼ Show 20 Lines | def start_from(self) -> Tuple[int, int, Dict[int, Tuple[bytes, ...]]]: | ||||
"Processing revisions [%s-%s] for %s", | "Processing revisions [%s-%s] for %s", | ||||
revision_start, | revision_start, | ||||
revision_end, | revision_end, | ||||
self.svnrepo, | self.svnrepo, | ||||
) | ) | ||||
return revision_start, revision_end, revision_parents | return revision_start, revision_end, revision_parents | ||||
def _check_revision_divergence(self, count, rev, dir_id): | def _check_revision_divergence(self, rev: int, dir_id: bytes) -> None: | ||||
"""Check for hash revision computation divergence. | """Check for hash revision computation divergence. | ||||
The Rationale behind this is that svn can trigger unknown | The Rationale behind this is that svn can trigger unknown edge cases (mixed | ||||
edge cases (mixed CRLF, svn properties, etc...). Those are | CRLF, svn properties, etc...). Those are not always easy to spot. Adding a | ||||
not always easy to spot. Adding a check will help in | regular check will help spotting potential missing edge cases. | ||||
spotting missing edge cases. | |||||
Args: | Args: | ||||
count (int): The number of revisions done so far | rev: The actual revision we are computing from | ||||
rev (dict): The actual revision we are computing from | dir_id: The actual directory for the given revision | ||||
dir_id (bytes): The actual directory for the given revision | |||||
Returns: | |||||
False if no hash divergence detected | |||||
Raises | Raises | ||||
ValueError if a hash divergence is detected | ValueError if a hash divergence is detected | ||||
""" | """ | ||||
# hash computation check | |||||
if (self.check_revision != 0 and count % self.check_revision) == 0: | |||||
self.log.debug("Checking hash computations on revision %s...", rev) | self.log.debug("Checking hash computations on revision %s...", rev) | ||||
checked_dir_id = self.swh_revision_hash_tree_at_svn_revision(rev) | checked_dir_id = self.swh_revision_hash_tree_at_svn_revision(rev) | ||||
if checked_dir_id != dir_id: | if checked_dir_id != dir_id: | ||||
err = ( | err = ( | ||||
"Hash tree computation divergence detected " | "Hash tree computation divergence detected " | ||||
"(%s != %s), stopping!" | "(%s != %s), stopping!" | ||||
% ( | % (hashutil.hash_to_hex(dir_id), hashutil.hash_to_hex(checked_dir_id),) | ||||
hashutil.hash_to_hex(dir_id), | |||||
hashutil.hash_to_hex(checked_dir_id), | |||||
) | |||||
) | ) | ||||
raise ValueError(err) | raise ValueError(err) | ||||
def process_svn_revisions( | def process_svn_revisions( | ||||
self, svnrepo, revision_start, revision_end, revision_parents | self, svnrepo, revision_start, revision_end, revision_parents | ||||
) -> Iterator[ | ) -> Iterator[ | ||||
Tuple[List[Content], List[SkippedContent], List[Directory], Revision] | Tuple[List[Content], List[SkippedContent], List[Directory], Revision] | ||||
]: | ]: | ||||
"""Process svn revisions from revision_start to revision_end. | """Process svn revisions from revision_start to revision_end. | ||||
Show All 29 Lines | ]: | ||||
self.log.debug( | self.log.debug( | ||||
"rev: %s, swhrev: %s, dir: %s", | "rev: %s, swhrev: %s, dir: %s", | ||||
rev, | rev, | ||||
hashutil.hash_to_hex(swh_revision.id), | hashutil.hash_to_hex(swh_revision.id), | ||||
hashutil.hash_to_hex(dir_id), | hashutil.hash_to_hex(dir_id), | ||||
) | ) | ||||
if self.check_revision: | if ( | ||||
self._check_revision_divergence(count, rev, dir_id) | self.check_revision | ||||
and self.check_revision != 0 | |||||
and count % self.check_revision == 0 | |||||
): | |||||
ardumont: Now that the post_load actually execute that check as the last step, i'm wondering whether we… | |||||
Done Inline Actions@vlorentz regarding your previous question, here is the check if you want to follow through from there ;) ardumont: @vlorentz regarding your previous question, here is the check if you want to follow through… | |||||
self._check_revision_divergence(rev, dir_id) | |||||
if nextrev: | if nextrev: | ||||
revision_parents[nextrev] = [swh_revision.id] | revision_parents[nextrev] = [swh_revision.id] | ||||
yield _contents, _skipped_contents, _directories, swh_revision | yield _contents, _skipped_contents, _directories, swh_revision | ||||
def prepare_origin_visit(self): | def prepare_origin_visit(self): | ||||
self.origin = Origin(url=self.origin_url if self.origin_url else self.svn_url) | self.origin = Origin(url=self.origin_url if self.origin_url else self.svn_url) | ||||
def prepare(self): | def prepare(self): | ||||
latest_snapshot_revision = self._latest_snapshot_revision(self.origin_url) | latest_snapshot_revision = self._latest_snapshot_revision(self.origin_url) | ||||
if latest_snapshot_revision: | if latest_snapshot_revision: | ||||
self.latest_snapshot, self.latest_revision = latest_snapshot_revision | self.latest_snapshot, self.latest_revision = latest_snapshot_revision | ||||
if self.destination_path: | |||||
local_dirname = self.destination_path | |||||
else: | |||||
local_dirname = tempfile.mkdtemp( | local_dirname = tempfile.mkdtemp( | ||||
suffix="-%s" % os.getpid(), | suffix="-%s" % os.getpid(), | ||||
prefix=TEMPORARY_DIR_PREFIX_PATTERN, | prefix=TEMPORARY_DIR_PREFIX_PATTERN, | ||||
dir=self.temp_directory, | dir=self.temp_directory, | ||||
) | ) | ||||
try: | try: | ||||
self.svnrepo = SvnRepo( | self.svnrepo = SvnRepo( | ||||
self.svn_url, self.origin_url, local_dirname, self.max_content_size | self.svn_url, self.origin_url, local_dirname, self.max_content_size | ||||
) | ) | ||||
except SubversionException as e: | except SubversionException as e: | ||||
error_msgs = [ | error_msgs = [ | ||||
"Unable to connect to a repository at URL", | "Unable to connect to a repository at URL", | ||||
▲ Show 20 Lines • Show All 123 Lines • ▼ Show 20 Lines | def load_status(self): | ||||
"status": self._load_status, | "status": self._load_status, | ||||
} | } | ||||
def visit_status(self): | def visit_status(self): | ||||
return self._visit_status | return self._visit_status | ||||
def post_load(self, success: bool = True) -> None: | def post_load(self, success: bool = True) -> None: | ||||
if success and self._last_revision is not None: | if success and self._last_revision is not None: | ||||
# force revision divergence check | |||||
self.check_revision = 1 | |||||
# check if the reconstructed filesystem for the last loaded revision is | # check if the reconstructed filesystem for the last loaded revision is | ||||
# consistent with the one obtained with a svn export operation, if it is | # consistent with the one obtained with a svn export operation. If it is not | ||||
# not an exception will be raised to report the issue and mark the visit | # the case, an exception will be raised to report the issue and mark the | ||||
# as partial | # visit as partial | ||||
self._check_revision_divergence( | self._check_revision_divergence( | ||||
self.check_revision, | |||||
int(dict(self._last_revision.extra_headers)[b"svn_revision"]), | int(dict(self._last_revision.extra_headers)[b"svn_revision"]), | ||||
self._last_revision.directory, | self._last_revision.directory, | ||||
) | ) | ||||
class SvnLoaderFromDumpArchive(SvnLoader): | class SvnLoaderFromDumpArchive(SvnLoader): | ||||
"""Uncompress an archive containing an svn dump, mount the svn dump as | """Uncompress an archive containing an svn dump, mount the svn dump as a local svn | ||||
an svn repository and load said repository. | repository and load that repository. | ||||
""" | """ | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
storage: StorageInterface, | storage: StorageInterface, | ||||
url: str, | url: str, | ||||
archive_path: str, | archive_path: str, | ||||
origin_url: Optional[str] = None, | origin_url: Optional[str] = None, | ||||
destination_path: Optional[str] = None, | |||||
swh_revision: Optional[str] = None, | |||||
incremental: bool = False, | incremental: bool = False, | ||||
visit_date: Optional[datetime] = None, | visit_date: Optional[datetime] = None, | ||||
temp_directory: str = "/tmp", | temp_directory: str = "/tmp", | ||||
debug: bool = False, | debug: bool = False, | ||||
check_revision: int = 0, | check_revision: int = 0, | ||||
max_content_size: Optional[int] = None, | max_content_size: Optional[int] = None, | ||||
): | ): | ||||
super().__init__( | super().__init__( | ||||
storage=storage, | storage=storage, | ||||
url=url, | url=url, | ||||
origin_url=origin_url, | origin_url=origin_url, | ||||
destination_path=destination_path, | |||||
swh_revision=swh_revision, | |||||
incremental=incremental, | incremental=incremental, | ||||
visit_date=visit_date, | visit_date=visit_date, | ||||
temp_directory=temp_directory, | temp_directory=temp_directory, | ||||
debug=debug, | debug=debug, | ||||
check_revision=check_revision, | check_revision=check_revision, | ||||
max_content_size=max_content_size, | max_content_size=max_content_size, | ||||
) | ) | ||||
self.archive_path = archive_path | self.archive_path = archive_path | ||||
Show All 19 Lines | def cleanup(self): | ||||
"Clean up temporary directory dump %s for project %s", | "Clean up temporary directory dump %s for project %s", | ||||
self.temp_dir, | self.temp_dir, | ||||
os.path.basename(self.repo_path), | os.path.basename(self.repo_path), | ||||
) | ) | ||||
shutil.rmtree(self.temp_dir) | shutil.rmtree(self.temp_dir) | ||||
class SvnLoaderFromRemoteDump(SvnLoader): | class SvnLoaderFromRemoteDump(SvnLoader): | ||||
""" | """Create a subversion repository dump out of a remote svn repository (using the | ||||
Create a subversion repository dump using the svnrdump utility, | svnrdump utility). Then, mount the repository locally and load that repository. | ||||
mount it locally and load the repository from it. | |||||
""" | """ | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
storage: StorageInterface, | storage: StorageInterface, | ||||
url: str, | url: str, | ||||
origin_url: Optional[str] = None, | origin_url: Optional[str] = None, | ||||
destination_path: Optional[str] = None, | |||||
swh_revision: Optional[str] = None, | |||||
incremental: bool = True, | incremental: bool = True, | ||||
visit_date: Optional[datetime] = None, | visit_date: Optional[datetime] = None, | ||||
temp_directory: str = "/tmp", | temp_directory: str = "/tmp", | ||||
debug: bool = False, | debug: bool = False, | ||||
check_revision: int = 0, | check_revision: int = 0, | ||||
max_content_size: Optional[int] = None, | max_content_size: Optional[int] = None, | ||||
): | ): | ||||
super().__init__( | super().__init__( | ||||
storage=storage, | storage=storage, | ||||
url=url, | url=url, | ||||
origin_url=origin_url, | origin_url=origin_url, | ||||
destination_path=destination_path, | |||||
swh_revision=swh_revision, | |||||
incremental=incremental, | incremental=incremental, | ||||
visit_date=visit_date, | visit_date=visit_date, | ||||
temp_directory=temp_directory, | temp_directory=temp_directory, | ||||
debug=debug, | debug=debug, | ||||
check_revision=check_revision, | check_revision=check_revision, | ||||
max_content_size=max_content_size, | max_content_size=max_content_size, | ||||
) | ) | ||||
self.temp_dir = tempfile.mkdtemp(dir=self.temp_directory) | self.temp_dir = tempfile.mkdtemp(dir=self.temp_directory) | ||||
Show All 14 Lines | def get_last_loaded_svn_rev(self, svn_url: str) -> int: | ||||
if latest_snapshot_revision: | if latest_snapshot_revision: | ||||
_, latest_revision = latest_snapshot_revision | _, latest_revision = latest_snapshot_revision | ||||
latest_revision_headers = dict(latest_revision.extra_headers) | latest_revision_headers = dict(latest_revision.extra_headers) | ||||
svn_revision = int(latest_revision_headers[b"svn_revision"]) | svn_revision = int(latest_revision_headers[b"svn_revision"]) | ||||
except Exception: | except Exception: | ||||
pass | pass | ||||
return svn_revision | return svn_revision | ||||
def dump_svn_revisions(self, svn_url, last_loaded_svn_rev=-1): | def dump_svn_revisions(self, svn_url: str, last_loaded_svn_rev: int = -1) -> str: | ||||
""" | """Generate a subversion dump file using the svnrdump tool. If the svnrdump | ||||
Generate a subversion dump file using the svnrdump tool. | command failed somehow, the produced dump file is analyzed to determine if a | ||||
If the svnrdump command failed somehow, | partial loading is still feasible. | ||||
the produced dump file is analyzed to determine if a partial | |||||
loading is still feasible. | |||||
Raises: | Raises: | ||||
NotFound when the repository is no longer found at url | NotFound when the repository is no longer found at url | ||||
Returns: | |||||
The dump_path of the repository mounted | |||||
""" | """ | ||||
# Build the svnrdump command line | # Build the svnrdump command line | ||||
svnrdump_cmd = ["svnrdump", "dump", svn_url] | svnrdump_cmd = ["svnrdump", "dump", svn_url] | ||||
# Launch the svnrdump command while capturing stderr as | # Launch the svnrdump command while capturing stderr as | ||||
# successfully dumped revision numbers are printed to it | # successfully dumped revision numbers are printed to it | ||||
dump_temp_dir = tempfile.mkdtemp(dir=self.temp_dir) | dump_temp_dir = tempfile.mkdtemp(dir=self.temp_dir) | ||||
dump_name = "".join(c for c in svn_url if c.isalnum()) | dump_name = "".join(c for c in svn_url if c.isalnum()) | ||||
▲ Show 20 Lines • Show All 121 Lines • Show Last 20 Lines |
Now that the post_load actually execute that check as the last step, i'm wondering whether we want that check to stay at all.
@anlambert btw ^ (just worth mentioning, if we decide to drop it, it will be in another diff)