diff --git a/swh/loader/svn/replay.py b/swh/loader/svn/replay.py index 81885ef..8075c92 100644 --- a/swh/loader/svn/replay.py +++ b/swh/loader/svn/replay.py @@ -1,882 +1,885 @@ # Copyright (C) 2016-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Remote Access client to svn server. """ from __future__ import annotations import codecs from collections import defaultdict from dataclasses import dataclass, field from distutils.dir_util import copy_tree from itertools import chain import logging import os import shutil import tempfile from typing import ( TYPE_CHECKING, Any, BinaryIO, Callable, Dict, List, Optional, Set, Tuple, Union, cast, ) import click from subvertpy import SubversionException, properties from subvertpy.ra import Auth, RemoteAccess, get_username_provider from swh.model import from_disk, hashutil from swh.model.from_disk import DiskBackedContent from swh.model.model import Content, Directory, SkippedContent if TYPE_CHECKING: from swh.loader.svn.svn import SvnRepo from swh.loader.svn.utils import ( is_recursive_external, parse_external_definition, svn_urljoin, ) logger = logging.getLogger(__name__) def _ra_codecs_error_handler(e: UnicodeError) -> Tuple[Union[str, bytes], int]: """Subvertpy may fail to decode to utf-8 the user svn properties. As they are not used by the loader, return an empty string instead of the decoded content. Args: e: exception raised during the svn properties decoding. """ return "", cast(UnicodeDecodeError, e).end DEFAULT_FLAG = 0 EXEC_FLAG = 1 NOEXEC_FLAG = 2 SVN_PROPERTY_EOL = "svn:eol-style" class FileEditor: """File Editor in charge of updating file on disk and memory objects.""" __slots__ = [ "directory", "path", "fullpath", "executable", "link", "state", "svnrepo", "editor", ] def __init__( self, directory: from_disk.Directory, rootpath: bytes, path: bytes, svnrepo: SvnRepo, ): self.directory = directory self.path = path self.fullpath = os.path.join(rootpath, path) self.svnrepo = svnrepo self.editor: Editor = svnrepo.swhreplay.editor def change_prop(self, key: str, value: str) -> None: if self.editor.debug: logger.debug( "Setting property %s to value %s on path %s", key, value, self.path ) def apply_textdelta(self, base_checksum) -> Callable[[Any, bytes, BinaryIO], None]: if self.editor.debug: logger.debug("Applying textdelta to file %s", self.path) # do not apply textdelta, file will be fully exported when closing the editor return lambda *args: None def close(self) -> None: """When done with a file added or modified in the current replayed revision, we export it to disk and update the from_disk model. """ if self.editor.debug: logger.debug("Closing file %s", self.path) if self.path not in self.editor.external_paths: # export file to disk if its path does not match an external self.svnrepo.export( os.path.join(self.svnrepo.remote_url, os.fsdecode(self.path)), to=self.fullpath, rev=self.editor.revnum, peg_rev=self.editor.revnum, ignore_keywords=True, overwrite=True, ) # And now compute file's checksums self.directory[self.path] = from_disk.Content.from_file(path=self.fullpath) ExternalDefinition = Tuple[str, Optional[int], bool] @dataclass class DirState: """Persists some directory states (eg. externals) across revisions while replaying them.""" externals: Dict[str, List[ExternalDefinition]] = field(default_factory=dict) """Map a path in the directory to a list of (external_url, revision, relative_url) targeting it""" externals_paths: Set[bytes] = field(default_factory=set) """Keep track of all external paths reachable from the directory""" class DirEditor: """Directory Editor in charge of updating directory hashes computation. This implementation includes empty folder in the hash computation. """ __slots__ = [ "directory", "rootpath", "path", "dir_states", "svnrepo", "editor", "externals", ] def __init__( self, directory: from_disk.Directory, rootpath: bytes, path: bytes, dir_states: Dict[bytes, DirState], svnrepo: SvnRepo, ): self.directory = directory self.rootpath = rootpath self.path = path # build directory on init os.makedirs(rootpath, exist_ok=True) self.dir_states = dir_states self.svnrepo = svnrepo self.editor = svnrepo.swhreplay.editor self.externals: Dict[str, List[ExternalDefinition]] = {} def remove_child(self, path: bytes) -> None: """Remove a path from the current objects. The path can be resolved as link, file or directory. This function takes also care of removing the link between the child and the parent. Args: path: to remove from the current objects. """ if path in self.directory: entry_removed = self.directory[path] del self.directory[path] fpath = os.path.join(self.rootpath, path) if isinstance(entry_removed, from_disk.Directory): shutil.rmtree(fpath) else: os.remove(fpath) def open_directory(self, path: str, *args) -> DirEditor: """Updating existing directory.""" if self.editor.debug: logger.debug("Opening directory %s", path) return DirEditor( self.directory, rootpath=self.rootpath, path=os.fsencode(path), dir_states=self.dir_states, svnrepo=self.svnrepo, ) def add_directory( self, path: str, copyfrom_path: Optional[str] = None, copyfrom_rev: int = -1 ) -> DirEditor: """Adding a new directory.""" if self.editor.debug: logger.debug( "Adding directory %s, copyfrom_path = %s, copyfrom_rev = %s", path, copyfrom_path, copyfrom_rev, ) path_bytes = os.fsencode(path) fullpath = os.path.join(self.rootpath, path_bytes) os.makedirs(fullpath, exist_ok=True) if copyfrom_rev == -1: if path_bytes and path_bytes not in self.directory: self.dir_states[path_bytes] = DirState() self.directory[path_bytes] = from_disk.Directory() else: url = svn_urljoin(self.svnrepo.remote_url, copyfrom_path) self.remove_child(path_bytes) self.svnrepo.export( url, to=fullpath, peg_rev=copyfrom_rev, ignore_keywords=True, overwrite=True, ) self.directory[path_bytes] = from_disk.Directory.from_disk(path=fullpath) return DirEditor( self.directory, self.rootpath, path_bytes, self.dir_states, svnrepo=self.svnrepo, ) def open_file(self, path: str, *args) -> FileEditor: """Updating existing file.""" if self.editor.debug: logger.debug("Opening file %s", path) path_bytes = os.fsencode(path) self.directory[path_bytes] = from_disk.Content() return FileEditor( self.directory, rootpath=self.rootpath, path=path_bytes, svnrepo=self.svnrepo, ) def add_file( self, path: str, copyfrom_path: Optional[str] = None, copyfrom_rev: int = -1 ) -> FileEditor: """Creating a new file.""" if self.editor.debug: logger.debug( "Adding file %s, copyfrom_path = %s, copyfrom_rev = %s", path, copyfrom_path, copyfrom_rev, ) path_bytes = os.fsencode(path) fullpath = os.path.join(self.rootpath, path_bytes) if copyfrom_rev == -1: self.directory[path_bytes] = from_disk.Content() else: url = svn_urljoin(self.svnrepo.remote_url, copyfrom_path) self.remove_child(path_bytes) self.svnrepo.export( url, to=fullpath, peg_rev=copyfrom_rev, ignore_keywords=True, overwrite=True, ) self.directory[path_bytes] = from_disk.Content.from_file(path=fullpath) return FileEditor( self.directory, self.rootpath, path_bytes, svnrepo=self.svnrepo, ) def change_prop(self, key: str, value: str) -> None: """Change property callback on directory.""" if key == properties.PROP_EXTERNALS: logger.debug( "Setting '%s' property with value '%s' on path %s", key, value, self.path, ) self.externals = defaultdict(list) if value is not None: try: # externals are set on that directory path, parse and store them # for later processing in the close method for external in value.split("\n"): external = external.rstrip("\r") # skip empty line or comment if not external or external.startswith("#"): continue ( path, external_url, revision, relative_url, ) = parse_external_definition( external, os.fsdecode(self.path), self.svnrepo.origin_url ) self.externals[path].append( (external_url, revision, relative_url) ) except ValueError: logger.debug( "Failed to parse external: %s\n" "Externals defined on path %s will not be processed", external, self.path, ) # as the official subversion client, do not process externals in case # of parsing error self.externals = {} if not self.externals: # externals might have been unset on that directory path, # remove associated paths from the reconstructed filesystem externals = self.dir_states[self.path].externals for path in externals.keys(): self.remove_external_path(os.fsencode(path)) self.dir_states[self.path].externals = {} def delete_entry(self, path: str, revision: int) -> None: """Remove a path.""" if self.editor.debug: logger.debug("Deleting directory entry %s", path) path_bytes = os.fsencode(path) fullpath = os.path.join(self.rootpath, path_bytes) if os.path.isdir(fullpath): # remove all external paths associated to the removed directory # (we cannot simply remove a root external directory as externals # paths associated to ancestor directories can overlap) for external_path in self.dir_states[path_bytes].externals_paths: self.remove_external_path( external_path, root_path=path_bytes, remove_subpaths=False, force=True, ) if path_bytes not in self.editor.external_paths: self.remove_child(path_bytes) elif os.path.isdir(fullpath): # versioned and external paths can overlap so we need to iterate on # all subpaths to check which ones to remove for root, dirs, files in os.walk(fullpath): for p in chain(dirs, files): full_repo_path = os.path.join(root, p) repo_path = full_repo_path.replace(self.rootpath + b"/", b"") if repo_path not in self.editor.external_paths: self.remove_child(repo_path) def close(self): """Function called when we finish processing a repository. SVN external definitions are processed by it. """ if self.editor.debug: logger.debug("Closing directory %s", self.path) prev_externals = self.dir_states[self.path].externals if self.externals: # externals definition list might have changed in the current replayed # revision, we need to determine if some were removed and delete the # associated paths externals = self.externals prev_externals_set = { (path, url, rev) for path in prev_externals.keys() for (url, rev, _) in prev_externals[path] } externals_set = { (path, url, rev) for path in externals.keys() for (url, rev, _) in externals[path] } old_externals = prev_externals_set - externals_set for path, _, _ in old_externals: self.remove_external_path(os.fsencode(path)) else: # some external paths might have been removed in the current replayed # revision by a delete operation on an overlapping versioned path so we # need to restore them externals = prev_externals # For each external, try to export it in reconstructed filesystem for path, externals_def in externals.items(): for i, external in enumerate(externals_def): external_url, revision, relative_url = external self.process_external( path, external_url, revision, relative_url, remove_target_path=i == 0, ) # backup externals in directory state if self.externals: self.dir_states[self.path].externals = self.externals # do operations below only when closing the root directory if self.path == b"": self.svnrepo.has_relative_externals = any( relative_url for (_, relative_url) in self.editor.valid_externals.values() ) self.svnrepo.has_recursive_externals = any( is_recursive_external( self.svnrepo.origin_url, os.fsdecode(path), external_path, external_url, ) for path, dir_state in self.dir_states.items() for external_path in dir_state.externals.keys() for (external_url, _, _) in dir_state.externals[external_path] ) if self.svnrepo.has_recursive_externals: # If the repository has recursive externals, we stop processing # externals and remove those already exported, # We will then ignore externals when exporting the revision to # check for divergence with the reconstructed filesystem. for external_path in list(self.editor.external_paths): self.remove_external_path(external_path, force=True) def process_external( self, path: str, external_url: str, revision: Optional[int], relative_url: bool, remove_target_path: bool = True, ) -> None: external = (external_url, revision, relative_url) dest_path = os.fsencode(path) dest_fullpath = os.path.join(self.path, dest_path) prev_externals = self.dir_states[self.path].externals if ( path in prev_externals and external in prev_externals[path] and dest_fullpath in self.directory ): # external already exported, nothing to do return if is_recursive_external( self.svnrepo.origin_url, os.fsdecode(self.path), path, external_url ): # recursive external, skip it return logger.debug( "Exporting external %s%s to path %s", external_url, f"@{revision}" if revision else "", dest_fullpath, ) if external not in self.editor.externals_cache: try: # try to export external in a temporary path, destination path could # be versioned and must be overridden only if the external URL is # still valid temp_dir = os.fsencode( tempfile.mkdtemp(dir=self.editor.externals_cache_dir) ) temp_path = os.path.join(temp_dir, dest_path) os.makedirs(b"/".join(temp_path.split(b"/")[:-1]), exist_ok=True) if external_url not in self.editor.dead_externals: url = external_url.rstrip("/") origin_url = self.svnrepo.origin_url.rstrip("/") if ( url.startswith(origin_url + "/") and not self.svnrepo.has_relative_externals ): url = url.replace(origin_url, self.svnrepo.remote_url) self.svnrepo.export( url, to=temp_path, peg_rev=revision, ignore_keywords=True, ) self.editor.externals_cache[external] = temp_path except SubversionException as se: # external no longer available (404) logger.debug(se) self.editor.dead_externals.add(external_url) else: temp_path = self.editor.externals_cache[external] # subversion export will always create the subdirectories of the external # path regardless the validity of the remote URL dest_path_split = dest_path.split(b"/") current_path = self.path self.add_directory(os.fsdecode(current_path)) for subpath in dest_path_split[:-1]: current_path = os.path.join(current_path, subpath) self.add_directory(os.fsdecode(current_path)) if os.path.exists(temp_path): # external successfully exported if remove_target_path: # remove previous path in from_disk model self.remove_external_path(dest_path, remove_subpaths=False) # mark external as valid self.editor.valid_externals[dest_fullpath] = ( external_url, relative_url, ) # copy exported path to reconstructed filesystem fullpath = os.path.join(self.rootpath, dest_fullpath) if os.path.isfile(temp_path): if os.path.islink(fullpath): # remove destination file if it is a link os.remove(fullpath) shutil.copy(os.fsdecode(temp_path), os.fsdecode(fullpath)) self.directory[dest_fullpath] = from_disk.Content.from_file( path=fullpath ) else: self.add_directory(os.fsdecode(dest_fullpath)) # copy_tree needs sub-directories to exist in destination for root, dirs, files in os.walk(temp_path): for dir in dirs: temp_dir_fullpath = os.path.join(root, dir) if os.path.islink(temp_dir_fullpath): # do not create folder if it's a link or copy_tree will fail continue subdir = temp_dir_fullpath.replace(temp_path + b"/", b"") self.add_directory( os.fsdecode(os.path.join(dest_fullpath, subdir)) ) copy_tree( os.fsdecode(temp_path), os.fsdecode(fullpath), preserve_symlinks=True, ) # TODO: replace code above by the line below once we use Python >= 3.8 in production # noqa # shutil.copytree(temp_path, fullpath, symlinks=True, dirs_exist_ok=True) # noqa self.directory[dest_fullpath] = from_disk.Directory.from_disk( path=fullpath ) # update set of external paths reachable from the directory external_paths = set() dest_path_part = dest_path.split(b"/") for i in range(1, len(dest_path_part) + 1): external_paths.add(b"/".join(dest_path_part[:i])) for root, dirs, files in os.walk(temp_path): external_paths.update( [ os.path.join( dest_path, os.path.join(root, p).replace(temp_path, b"").strip(b"/"), ) for p in chain(dirs, files) ] ) self.dir_states[self.path].externals_paths.update(external_paths) for external_path in external_paths: self.editor.external_paths[os.path.join(self.path, external_path)] += 1 # ensure hash update for the directory with externals set self.directory[self.path].update_hash(force=True) def remove_external_path( self, external_path: bytes, remove_subpaths: bool = True, force: bool = False, root_path: Optional[bytes] = None, ) -> None: """Remove a previously exported SVN external path from the reconstructed filesystem. """ path = root_path if root_path else self.path fullpath = os.path.join(path, external_path) + if self.editor.debug: + logger.debug("Removing external path %s", fullpath) + # decrement number of references for external path when we really remove it # (when remove_subpaths is False, we just cleanup the external path before # copying exported paths in it) if force or (fullpath in self.editor.external_paths and remove_subpaths): self.editor.external_paths[fullpath] -= 1 if ( fullpath in self.editor.external_paths and self.editor.external_paths[fullpath] == 0 ): self.remove_child(fullpath) self.editor.external_paths.pop(fullpath, None) self.editor.valid_externals.pop(fullpath, None) for path in list(self.editor.external_paths): if path.startswith(fullpath + b"/"): self.editor.external_paths[path] -= 1 if self.editor.external_paths[path] == 0: self.editor.external_paths.pop(path) if remove_subpaths: subpath_split = fullpath.split(b"/")[:-1] for i in reversed(range(1, len(subpath_split) + 1)): # delete external sub-directory only if it is not versioned subpath = b"/".join(subpath_split[0:i]) try: self.svnrepo.client.info( svn_urljoin(self.svnrepo.remote_url, os.fsdecode(subpath)), peg_revision=self.editor.revnum, revision=self.editor.revnum, ) except SubversionException: self.remove_child(subpath) else: break try: # externals can overlap with versioned files so we must restore # them after removing the path above dest_path = os.path.join(self.rootpath, fullpath) self.svnrepo.client.export( svn_urljoin(self.svnrepo.remote_url, os.fsdecode(fullpath)), to=dest_path, peg_rev=self.editor.revnum, ignore_keywords=True, ) if os.path.isfile(dest_path) or os.path.islink(dest_path): self.directory[fullpath] = from_disk.Content.from_file(path=dest_path) else: self.directory[fullpath] = from_disk.Directory.from_disk(path=dest_path) except SubversionException: pass class Editor: """Editor in charge of replaying svn events and computing objects along. This implementation accounts for empty folder during hash computations. """ def __init__( self, rootpath: bytes, directory: from_disk.Directory, svnrepo: SvnRepo, temp_dir: str, debug: bool = False, ): self.rootpath = rootpath self.directory = directory self.dir_states: Dict[bytes, DirState] = defaultdict(DirState) self.external_paths: Dict[bytes, int] = defaultdict(int) self.valid_externals: Dict[bytes, Tuple[str, bool]] = {} self.dead_externals: Set[str] = set() self.externals_cache_dir = tempfile.mkdtemp(dir=temp_dir) self.externals_cache: Dict[ExternalDefinition, bytes] = {} self.svnrepo = svnrepo self.revnum = None self.debug = debug def set_target_revision(self, revnum) -> None: self.revnum = revnum def abort(self) -> None: pass def close(self) -> None: pass def open_root(self, base_revnum: int) -> DirEditor: return DirEditor( self.directory, rootpath=self.rootpath, path=b"", dir_states=self.dir_states, svnrepo=self.svnrepo, ) class Replay: """Replay class.""" def __init__( self, conn: RemoteAccess, rootpath: bytes, svnrepo: SvnRepo, temp_dir: str, directory: Optional[from_disk.Directory] = None, debug: bool = False, ): self.conn = conn self.rootpath = rootpath if directory is None: directory = from_disk.Directory() self.directory = directory self.editor = Editor( rootpath=rootpath, directory=directory, svnrepo=svnrepo, temp_dir=temp_dir, debug=debug, ) def replay(self, rev: int, low_water_mark: int) -> from_disk.Directory: """Replay svn actions between rev and rev+1. This method updates in place the self.editor.directory, as well as the filesystem. Returns: The updated root directory """ codecs.register_error("strict", _ra_codecs_error_handler) self.conn.replay(rev, low_water_mark, self.editor) codecs.register_error("strict", codecs.strict_errors) return self.editor.directory def compute_objects( self, rev: int, low_water_mark: int ) -> Tuple[List[Content], List[SkippedContent], List[Directory]]: """Compute objects added or modified at revisions rev. Expects the state to be at previous revision's objects. Args: rev: The revision to start the replay from. Returns: The updated objects between rev and rev+1. Beware that this mutates the filesystem at rootpath accordingly. """ self.replay(rev, low_water_mark) contents: List[Content] = [] skipped_contents: List[SkippedContent] = [] directories: List[Directory] = [] for obj_node in self.directory.collect(): obj = obj_node.to_model() # type: ignore obj_type = obj.object_type if obj_type in (Content.object_type, DiskBackedContent.object_type): contents.append(obj.with_data()) elif obj_type == SkippedContent.object_type: skipped_contents.append(obj) elif obj_type == Directory.object_type: directories.append(obj) else: assert False, obj_type return contents, skipped_contents, directories @click.command() @click.option("--local-url", default="/tmp", help="local svn working copy") @click.option( "--svn-url", default="file:///home/storage/svn/repos/pkg-fox", help="svn repository's url.", ) @click.option( "--revision-start", default=1, type=click.INT, help="svn repository's starting revision.", ) @click.option( "--revision-end", default=-1, type=click.INT, help="svn repository's ending revision.", ) @click.option( "--debug/--nodebug", default=True, help="Indicates if the server should run in debug mode.", ) @click.option( "--cleanup/--nocleanup", default=True, help="Indicates whether to cleanup disk when done or not.", ) def main(local_url, svn_url, revision_start, revision_end, debug, cleanup): """Script to present how to use Replay class.""" conn = RemoteAccess(svn_url.encode("utf-8"), auth=Auth([get_username_provider()])) os.makedirs(local_url, exist_ok=True) rootpath = tempfile.mkdtemp( prefix=local_url, suffix="-" + os.path.basename(svn_url) ) rootpath = os.fsencode(rootpath) # Do not go beyond the repository's latest revision revision_end_max = conn.get_latest_revnum() if revision_end == -1: revision_end = revision_end_max revision_end = min(revision_end, revision_end_max) try: replay = Replay(conn, rootpath) for rev in range(revision_start, revision_end + 1): contents, skipped_contents, directories = replay.compute_objects(rev) print( "r%s %s (%s new contents, %s new directories)" % ( rev, hashutil.hash_to_hex(replay.directory.hash), len(contents) + len(skipped_contents), len(directories), ) ) if debug: print("%s" % rootpath.decode("utf-8")) finally: if cleanup: if os.path.exists(rootpath): shutil.rmtree(rootpath) if __name__ == "__main__": main() diff --git a/swh/loader/svn/tests/test_externals.py b/swh/loader/svn/tests/test_externals.py index 2cbf222..1cc8795 100644 --- a/swh/loader/svn/tests/test_externals.py +++ b/swh/loader/svn/tests/test_externals.py @@ -1,1705 +1,1800 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from swh.loader.svn.loader import SvnLoader, SvnLoaderFromRemoteDump from swh.loader.svn.utils import svn_urljoin from swh.loader.tests import assert_last_visit_matches, check_snapshot from .utils import CommitChange, CommitChangeType, add_commit, create_repo @pytest.fixture def external_repo_url(tmpdir_factory): # create a repository return create_repo(tmpdir_factory.mktemp("external")) def test_loader_with_valid_svn_externals( swh_storage, repo_url, external_repo_url, tmp_path ): # first commit on external add_commit( external_repo_url, "Create some directories and files in an external repository", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="code/hello/hello-world", properties={"svn:executable": "*"}, data=b"#!/bin/bash\necho Hello World !", ), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="foo.sh", properties={"svn:executable": "*"}, data=b"#!/bin/bash\necho foo", ), ], ) # first commit add_commit( repo_url, "Create repository structure.", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="branches/", ), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="tags/", ), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", ), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/bar.sh", properties={"svn:executable": "*"}, data=b"#!/bin/bash\necho bar", ), ], ) # second commit add_commit( repo_url, ( "Set svn:externals property on trunk/externals path of repository to load." "One external targets a remote directory and another one a remote file." ), [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/externals/", properties={ "svn:externals": ( f"{svn_urljoin(external_repo_url, 'code/hello')} hello\n" f"{svn_urljoin(external_repo_url, 'foo.sh')} foo.sh\n" f"{svn_urljoin(repo_url, 'trunk/bar.sh')} bar.sh" ) }, ), ], ) # first load loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) # third commit add_commit( repo_url, "Unset svn:externals property on trunk/externals path", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/externals/", properties={"svn:externals": None}, ), ], ) # second load loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) def test_loader_with_invalid_svn_externals(swh_storage, repo_url, tmp_path, mocker): # first commit add_commit( repo_url, "Create repository structure.", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="branches/", ), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="tags/", ), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", ), ], ) # second commit add_commit( repo_url, ( "Set svn:externals property on trunk/externals path of repository to load." "The externals URLs are not valid." ), [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/externals/", properties={ "svn:externals": ( "file:///tmp/invalid/svn/repo/hello hello\n" "file:///tmp/invalid/svn/repo/foo.sh foo.sh" ) }, ), ], ) loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) def test_loader_with_valid_externals_modification( swh_storage, repo_url, external_repo_url, tmp_path ): # first commit on external add_commit( external_repo_url, "Create some directories and files in an external repository", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="code/hello/hello-world", properties={"svn:executable": "*"}, data=b"#!/bin/bash\necho Hello World !", ), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="code/bar/bar.sh", properties={"svn:executable": "*"}, data=b"#!/bin/bash\necho bar", ), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="foo.sh", properties={"svn:executable": "*"}, data=b"#!/bin/bash\necho foo", ), ], ) # first commit add_commit( repo_url, ("Set svn:externals property on trunk/externals path of repository to load."), [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/externals/", properties={ "svn:externals": ( f"{svn_urljoin(external_repo_url, 'code/hello')} src/code/hello\n" # noqa f"{svn_urljoin(external_repo_url, 'foo.sh')} src/foo.sh\n" ) }, ), ], ) # second commit add_commit( repo_url, ( "Modify svn:externals property on trunk/externals path of repository to load." # noqa ), [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/externals/", properties={ "svn:externals": ( f"{svn_urljoin(external_repo_url, 'code/bar')} src/code/bar\n" # noqa f"{svn_urljoin(external_repo_url, 'foo.sh')} src/foo.sh\n" ) }, ), ], ) loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) def test_loader_with_valid_externals_and_versioned_path( swh_storage, repo_url, external_repo_url, tmp_path ): # first commit on external add_commit( external_repo_url, "Create a file in an external repository", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="code/script.sh", data=b"#!/bin/bash\necho Hello World !", ), ], ) # first commit add_commit( repo_url, "Add file with same name but different content in main repository", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/script.sh", data=b"#!/bin/bash\necho foo", ), ], ) # second commit add_commit( repo_url, "Add externals targeting the versioned file", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", properties={ "svn:externals": ( f"{svn_urljoin(external_repo_url, 'code/script.sh')} script.sh" # noqa ) }, ), ], ) # third commit add_commit( repo_url, "Modify the versioned file", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/script.sh", data=b"#!/bin/bash\necho bar", ), ], ) loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) def test_loader_with_invalid_externals_and_versioned_path( swh_storage, repo_url, tmp_path ): # first commit add_commit( repo_url, "Add file in main repository", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/script.sh", data=b"#!/bin/bash\necho foo", ), ], ) # second commit add_commit( repo_url, "Add invalid externals targeting the versioned file", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", properties={ "svn:externals": ( "file:///tmp/invalid/svn/repo/code/script.sh script.sh" ) }, ), ], ) loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) def test_loader_set_externals_then_remove_and_add_as_local( swh_storage, repo_url, external_repo_url, tmp_path ): # first commit on external add_commit( external_repo_url, "Create a file in an external repository", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="code/script.sh", data=b"#!/bin/bash\necho Hello World !", ), ], ) # first commit add_commit( repo_url, "Add trunk directory and set externals", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", properties={ "svn:externals": (f"{svn_urljoin(external_repo_url, 'code')} code") }, ), ], ) # second commit add_commit( repo_url, "Unset externals on trunk and add remote path as local path", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", properties={"svn:externals": None}, ), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/code/script.sh", data=b"#!/bin/bash\necho Hello World !", ), ], ) loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) def test_loader_set_invalid_externals_then_remove(swh_storage, repo_url, tmp_path): # first commit add_commit( repo_url, "Add trunk directory and set invalid external", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", properties={ "svn:externals": "file:///tmp/invalid/svn/repo/code external/code" }, ), ], ) # second commit add_commit( repo_url, "Unset externals on trunk", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", properties={"svn:externals": None}, ), ], ) loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) def test_loader_set_externals_with_versioned_file_overlap( swh_storage, repo_url, external_repo_url, tmp_path ): # first commit on external add_commit( external_repo_url, "Create a file in an external repository", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="code/script.sh", data=b"#!/bin/bash\necho Hello World !", ), ], ) # first commit add_commit( repo_url, "Add file with same name as in the external repository", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/script.sh", data=b"#!/bin/bash\necho foo", ), ], ) # second commit add_commit( repo_url, "Set external on trunk overlapping versioned file", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", properties={ "svn:externals": ( f"{svn_urljoin(external_repo_url, 'code/script.sh')} script.sh" ) }, ), ], ) # third commit add_commit( repo_url, "Unset externals on trunk", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", properties={"svn:externals": None}, ), ], ) loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) def test_dump_loader_relative_externals_detection( swh_storage, repo_url, external_repo_url, tmp_path ): add_commit( external_repo_url, "Create a file in external repository.", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="project1/foo.sh", data=b"#!/bin/bash\necho foo", ), ], ) add_commit( external_repo_url, "Create another file in repository to load.", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="project2/bar.sh", data=b"#!/bin/bash\necho bar", ), ], ) external_url = f"{external_repo_url.replace('file://', '//')}/project2/bar.sh" add_commit( repo_url, "Set external relative to URL scheme in repository to load", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="project1/", properties={"svn:externals": (f"{external_url} bar.sh")}, ), ], ) loader = SvnLoaderFromRemoteDump( swh_storage, repo_url, temp_directory=tmp_path, check_revision=1 ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) assert loader.svnrepo.has_relative_externals add_commit( repo_url, "Unset external in repository to load", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="project1/", properties={"svn:externals": None}, ), ], ) loader = SvnLoaderFromRemoteDump( swh_storage, repo_url, temp_directory=tmp_path, check_revision=1 ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) assert not loader.svnrepo.has_relative_externals def test_loader_externals_cache(swh_storage, repo_url, external_repo_url, tmp_path): # first commit on external add_commit( external_repo_url, "Create some directories and files in an external repository", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="code/hello/hello-world", properties={"svn:executable": "*"}, data=b"#!/bin/bash\necho Hello World !", ), ], ) # first commit add_commit( repo_url, "Create repository structure.", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="project1/", ), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="project2/", ), ], ) external_url = svn_urljoin(external_repo_url, "code/hello") # second commit add_commit( repo_url, ( "Set svn:externals property on trunk/externals path of repository to load." "One external targets a remote directory and another one a remote file." ), [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="project1/externals/", properties={"svn:externals": (f"{external_url} hello\n")}, ), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="project2/externals/", properties={"svn:externals": (f"{external_url} hello\n")}, ), ], ) loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) assert ( external_url, None, False, ) in loader.svnrepo.swhreplay.editor.externals_cache def test_loader_remove_versioned_path_with_external_overlap( swh_storage, repo_url, external_repo_url, tmp_path ): # first commit on external add_commit( external_repo_url, "Create a file in an external repository", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="code/hello.sh", data=b"#!/bin/bash\necho Hello World !", ), ], ) # first commit add_commit( repo_url, "Add a file", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/project/script.sh", data=b"#!/bin/bash\necho foo", ), ], ) # second commit add_commit( repo_url, "Set external on trunk overlapping versioned path", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", properties={ "svn:externals": ( f"{svn_urljoin(external_repo_url, 'code')} project/code" ) }, ), ], ) # third commit add_commit( repo_url, "Remove trunk/project/ versioned path", [ CommitChange( change_type=CommitChangeType.Delete, path="trunk/project/", ), ], ) loader = SvnLoader( swh_storage, repo_url, temp_directory=tmp_path, check_revision=1, ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) def test_loader_export_external_path_using_peg_rev( swh_storage, repo_url, external_repo_url, tmp_path ): # first commit on external add_commit( external_repo_url, "Create a file in an external repository", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="code/foo.sh", data=b"#!/bin/bash\necho foo", ), ], ) # second commit on external add_commit( external_repo_url, "Remove previously added file", [ CommitChange( change_type=CommitChangeType.Delete, path="code/foo.sh", ), ], ) # third commit on external add_commit( external_repo_url, "Add file again but with different content", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="code/foo.sh", data=b"#!/bin/bash\necho bar", ), ], ) # first commit add_commit( repo_url, "Add trunk dir", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", ), ], ) # second commit add_commit( repo_url, "Set external on trunk targeting first revision of external repo", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", properties={ "svn:externals": ( f"{svn_urljoin(external_repo_url, 'code/foo.sh')}@1 foo.sh" ) }, ), ], ) # third commit add_commit( repo_url, "Modify external on trunk to target third revision of external repo", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", properties={ "svn:externals": ( f"{svn_urljoin(external_repo_url, 'code/foo.sh')}@3 foo.sh" ) }, ), ], ) loader = SvnLoader( swh_storage, repo_url, temp_directory=tmp_path, check_revision=1, ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) def test_loader_remove_external_overlapping_versioned_path( swh_storage, repo_url, external_repo_url, tmp_path ): # first commit on external add_commit( external_repo_url, "Create files in an external repository", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="code/foo.sh", data=b"#!/bin/bash\necho foo", ), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="code/link", data=b"#!/bin/bash\necho link", ), ], ) # first commit add_commit( repo_url, "Add trunk dir and a link file", [ CommitChange(change_type=CommitChangeType.AddOrUpdate, path="trunk/"), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/link", data=b"link ../test", properties={"svn:special": "*"}, ), ], ) # second commit add_commit( repo_url, "Set external on root dir overlapping versioned trunk path", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="", # repo root dir properties={ "svn:externals": ( f"{svn_urljoin(external_repo_url, 'code/foo.sh')} trunk/code/foo.sh\n" # noqa f"{svn_urljoin(external_repo_url, 'code/link')} trunk/link" ) }, ), ], ) # third commit add_commit( repo_url, "Remove external on root dir", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="", properties={"svn:externals": None}, ), ], ) loader = SvnLoader( swh_storage, repo_url, temp_directory=tmp_path, check_revision=1, ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) def test_loader_modify_external_same_path( swh_storage, repo_url, external_repo_url, tmp_path ): # first commit on external add_commit( external_repo_url, "Create a file in an external repository", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="code/foo.sh", data=b"#!/bin/bash\necho foo", ), ], ) # first commit add_commit( repo_url, "Add trunk dir", [CommitChange(change_type=CommitChangeType.AddOrUpdate, path="trunk/")], ) # second commit add_commit( repo_url, "Set external code on trunk dir", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", properties={ "svn:externals": (f"{svn_urljoin(external_repo_url, 'code')} code") }, ), ], ) # third commit add_commit( repo_url, "Change code external on trunk targeting an invalid URL", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", properties={"svn:externals": "file:///tmp/invalid/svn/repo/path code"}, ), ], ) loader = SvnLoader( swh_storage, repo_url, temp_directory=tmp_path, check_revision=1, ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) def test_loader_with_recursive_external( swh_storage, repo_url, external_repo_url, tmp_path ): # first commit on external add_commit( external_repo_url, "Create a file in an external repository", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="code/foo.sh", data=b"#!/bin/bash\necho foo", ), ], ) # first commit add_commit( repo_url, "Add trunk dir and a file", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/bar.sh", data=b"#!/bin/bash\necho bar", ) ], ) # second commit add_commit( repo_url, "Set externals code on trunk/externals dir, one being recursive", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/externals/", properties={ "svn:externals": ( f"{svn_urljoin(external_repo_url, 'code')} code\n" f"{repo_url} recursive" ) }, ), ], ) # first load loader = SvnLoader( swh_storage, repo_url, temp_directory=tmp_path, check_revision=1, ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) assert loader.svnrepo.has_recursive_externals # second load on stale repo loader = SvnLoader( swh_storage, repo_url, temp_directory=tmp_path, check_revision=1, ) assert loader.load() == {"status": "uneventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) assert loader.svnrepo.has_recursive_externals # third commit add_commit( repo_url, "Remove recursive external on trunk/externals dir", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/externals/", properties={ "svn:externals": (f"{svn_urljoin(external_repo_url, 'code')} code") }, ), ], ) # third load loader = SvnLoader( swh_storage, repo_url, temp_directory=tmp_path, check_revision=1, ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) assert not loader.svnrepo.has_recursive_externals def test_loader_externals_with_same_target( swh_storage, repo_url, external_repo_url, tmp_path ): # first commit on external add_commit( external_repo_url, "Create files in an external repository", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="foo/foo.sh", data=b"#!/bin/bash\necho foo", ), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="bar/bar.sh", data=b"#!/bin/bash\necho bar", ), ], ) # first commit add_commit( repo_url, "Add trunk/src dir", [CommitChange(change_type=CommitChangeType.AddOrUpdate, path="trunk/src/")], ) # second commit add_commit( repo_url, "Add externals on trunk targeting same directory", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", properties={ "svn:externals": ( f"{svn_urljoin(external_repo_url, 'foo')} src\n" f"{svn_urljoin(external_repo_url, 'bar')} src" ) }, ), ], ) loader = SvnLoader( swh_storage, repo_url, temp_directory=tmp_path, check_revision=1, ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) def test_loader_external_in_versioned_path( swh_storage, repo_url, external_repo_url, tmp_path ): # first commit on external add_commit( external_repo_url, "Create a file in an external repository", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="src/foo.sh", data=b"#!/bin/bash\necho foo", ), ], ) # first commit add_commit( repo_url, "Add trunk/src dir", [CommitChange(change_type=CommitChangeType.AddOrUpdate, path="trunk/src/")], ) # second commit add_commit( repo_url, "Add a file in trunk/src directory and set external on trunk targeting src", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/src/bar.sh", data=b"#!/bin/bash\necho bar", ), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", properties={ "svn:externals": (f"{svn_urljoin(external_repo_url, 'src')} src") }, ), ], ) loader = SvnLoader( swh_storage, repo_url, temp_directory=tmp_path, check_revision=1, ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) def test_dump_loader_externals_in_loaded_repository(swh_storage, tmp_path, mocker): repo_url = create_repo(tmp_path, repo_name="foo") externa_url = create_repo(tmp_path, repo_name="foobar") # first commit on external add_commit( externa_url, "Create a file in an external repository", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/src/foo.sh", data=b"#!/bin/bash\necho foo", ), ], ) add_commit( repo_url, ( "Add a file and set externals on trunk/externals:" "one external located in this repository, the other in a remote one" ), [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/src/bar.sh", data=b"#!/bin/bash\necho bar", ), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/externals/", properties={ "svn:externals": ( f"{svn_urljoin(repo_url, 'trunk/src/bar.sh')} bar.sh\n" f"{svn_urljoin(externa_url, 'trunk/src/foo.sh')} foo.sh" ) }, ), ], ) from swh.loader.svn.svn import client mock_client = mocker.MagicMock() mocker.patch.object(client, "Client", mock_client) class Info: repos_root_url = repo_url mock_client().info.return_value = {"repo": Info()} loader = SvnLoaderFromRemoteDump(swh_storage, repo_url, temp_directory=tmp_path) loader.load() export_call_args = mock_client().export.call_args_list # first external export should use the base URL of the local repository # mounted from the remote dump as it is located in loaded repository assert export_call_args[0][0][0] != svn_urljoin( loader.svnrepo.origin_url, "trunk/src/bar.sh" ) assert export_call_args[0][0][0] == svn_urljoin( loader.svnrepo.remote_url, "trunk/src/bar.sh" ) # second external export should use the remote URL of the external repository assert export_call_args[1][0][0] == svn_urljoin(externa_url, "trunk/src/foo.sh") def test_loader_externals_add_remove_readd_on_subpath( swh_storage, repo_url, external_repo_url, tmp_path ): # first commit on external add_commit( external_repo_url, "Create files in an external repository", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="src/foo.sh", data=b"#!/bin/bash\necho foo", ), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="src/bar.sh", data=b"#!/bin/bash\necho bar", ), ], ) # first commit add_commit( repo_url, "Set external on two paths targeting the same absolute path", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/src/", properties={ "svn:externals": ( f"{svn_urljoin(external_repo_url, 'src/foo.sh')} foo.sh" ) }, ), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", properties={ "svn:externals": ( f"{svn_urljoin(external_repo_url, 'src/foo.sh')} src/foo.sh" ) }, ), ], ) # second commit add_commit( repo_url, "Remove external on a single path", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", properties={ "svn:externals": ( f"{svn_urljoin(external_repo_url, 'src/bar.sh')} src/bar.sh" ) }, ), ], ) loader = SvnLoader( swh_storage, repo_url, temp_directory=tmp_path, check_revision=1, ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) def test_loader_directory_symlink_in_external( swh_storage, repo_url, external_repo_url, tmp_path ): # first commit on external add_commit( external_repo_url, "Create dirs in an external repository", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="src/apps/", ), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="src/deps/", ), ], ) # second commit on external add_commit( external_repo_url, "Add symlink to src/deps in src/apps directory", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="src/apps/deps", data=b"link ../deps", properties={"svn:special": "*"}, ), ], ) # first commit add_commit( repo_url, "Add deps dir", [CommitChange(change_type=CommitChangeType.AddOrUpdate, path="deps/")], ) # second commit add_commit( repo_url, "Set external to deps folder", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="deps/", properties={"svn:externals": (f"{external_repo_url} external")}, ), ], ) loader = SvnLoader( swh_storage, repo_url, temp_directory=tmp_path, check_revision=1, ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) def test_loader_with_externals_parsing_error( swh_storage, repo_url, external_repo_url, tmp_path ): # first commit on external add_commit( external_repo_url, "Create code directory", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="code/", ), ], ) # second commit on external add_commit( external_repo_url, "Create code/foo.sh file", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="code/foo.sh", properties={"svn:executable": "*"}, data=b"#!/bin/bash\necho foo", ), ], ) # first commit add_commit( repo_url, "Create trunk directory.", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", ), ], ) # second commit add_commit( repo_url, "Set external on trunk directory that will result in a parsing error.", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", properties={ "svn:externals": ( f"-r2{svn_urljoin(external_repo_url, 'code/foo.sh')} foo.sh" ) }, ), ], ) # third commit add_commit( repo_url, "Fix external definition on trunk directory.", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", properties={ "svn:externals": ( f"-r2 {svn_urljoin(external_repo_url, 'code/foo.sh')} foo.sh" ) }, ), ], ) loader = SvnLoader( swh_storage, repo_url, temp_directory=tmp_path, check_revision=1, ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) @pytest.mark.parametrize("remote_external_path", ["src/main/project", "src/main"]) def test_loader_overlapping_external_paths_removal( swh_storage, repo_url, external_repo_url, tmp_path, remote_external_path ): add_commit( external_repo_url, "Create external repository layout", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="src/main/project/foo/bar", data=b"bar", ), ], ) add_commit( repo_url, "Create repository layout", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/src/main/project/", ), ], ) add_commit( repo_url, "Add overlapping externals", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/src/main/", properties={ "svn:externals": f"{svn_urljoin(external_repo_url, remote_external_path)} project" # noqa }, ), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/src/main/project/", properties={ "svn:externals": f'{svn_urljoin(external_repo_url, "src/main/project/foo")} foo' # noqa }, ), ], ) add_commit( repo_url, "Remove directory with externals overlapping with those from ancestor directory", [ CommitChange( change_type=CommitChangeType.Delete, path="trunk/src/main/project/", ), ], ) loader = SvnLoader( swh_storage, repo_url, temp_directory=tmp_path, check_revision=1, ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) def test_loader_copyfrom_rev_with_externals( swh_storage, repo_url, external_repo_url, tmp_path ): add_commit( external_repo_url, "Create some directories and files in an external repository", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="code/hello/hello-world", data=b"#!/bin/bash\necho Hello World !", ), ], ) add_commit( repo_url, "Create repository structure, one externals directory with svn:externals" "property set and one trunk directory", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="externals/", properties={ "svn:externals": f'{svn_urljoin(external_repo_url, "code/hello/")} hello' # noqa }, ), CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/", ), ], ) add_commit( repo_url, "Add copy of externals directory to trunk from revision 1.", [ CommitChange( change_type=CommitChangeType.AddOrUpdate, path="trunk/externals/", copyfrom_path=repo_url + "/externals", copyfrom_rev=1, ), ], ) loader = SvnLoader( swh_storage, repo_url, temp_directory=tmp_path, check_revision=1, ) assert loader.load() == {"status": "eventful"} assert_last_visit_matches( loader.storage, repo_url, status="full", type="svn", ) check_snapshot(loader.snapshot, loader.storage) + + +def test_loader_with_unparsable_external_on_path( + swh_storage, repo_url, external_repo_url, tmp_path +): + # first commit on external + add_commit( + external_repo_url, + "Create some directories and files in an external repository", + [ + CommitChange( + change_type=CommitChangeType.AddOrUpdate, + path="code/hello/hello-world", + properties={"svn:executable": "*"}, + data=b"#!/bin/bash\necho Hello World !", + ), + CommitChange( + change_type=CommitChangeType.AddOrUpdate, + path="code/foo/foo.sh", + properties={"svn:executable": "*"}, + data=b"#!/bin/bash\necho foo", + ), + ], + ) + + # first commit + add_commit( + repo_url, + ( + "Set parsable svn:externals property on project1 path of repository to load." + "Add a code directory with a file in it." + ), + [ + CommitChange( + change_type=CommitChangeType.AddOrUpdate, + path="project1/", + properties={ + "svn:externals": ( + f"{svn_urljoin(external_repo_url, 'code/hello')} hello\n" + ) + }, + ), + CommitChange( + change_type=CommitChangeType.AddOrUpdate, + path="code/foo.sh", + properties={"svn:executable": "*"}, + data=b"#!/bin/bash\necho foo", + ), + ], + ) + + # second commit + add_commit( + repo_url, + ( + "Set unparsable svn:externals property on project2 path of repository to load." + ), + [ + CommitChange( + change_type=CommitChangeType.AddOrUpdate, + path="project2/", + properties={"svn:externals": ("^code/foo foo\n")}, + ), + ], + ) + + # third commit + add_commit( + repo_url, + ( + "Fix unparsable svn:externals property on project2 path of repository to load." + ), + [ + CommitChange( + change_type=CommitChangeType.AddOrUpdate, + path="project2/", + properties={"svn:externals": ("^/code/foo foo\n")}, + ), + ], + ) + + loader = SvnLoader( + swh_storage, + repo_url, + temp_directory=tmp_path, + check_revision=1, + ) + assert loader.load() == {"status": "eventful"} + assert_last_visit_matches( + loader.storage, + repo_url, + status="full", + type="svn", + ) + check_snapshot(loader.snapshot, loader.storage) diff --git a/swh/loader/svn/tests/test_utils.py b/swh/loader/svn/tests/test_utils.py index 63afc58..36f5032 100644 --- a/swh/loader/svn/tests/test_utils.py +++ b/swh/loader/svn/tests/test_utils.py @@ -1,481 +1,494 @@ # Copyright (C) 2016-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os from pathlib import Path import pty import re import shutil from subprocess import Popen, run import pytest from swh.loader.svn import utils from swh.loader.tests import prepare_repository_from_archive def test_outputstream(): stdout_r, stdout_w = pty.openpty() echo = Popen(["echo", "-e", "foo\nbar\nbaz"], stdout=stdout_w) os.close(stdout_w) stdout_stream = utils.OutputStream(stdout_r) lines = [] while True: current_lines, readable = stdout_stream.read_lines() lines += current_lines if not readable: break echo.wait() os.close(stdout_r) assert lines == ["foo", "bar", "baz"] def test_init_svn_repo_from_dump(datadir, tmp_path): """Mounting svn repository out of a dump is ok""" dump_name = "penguinsdbtools2018.dump.gz" dump_path = os.path.join(datadir, dump_name) tmp_repo, repo_path = utils.init_svn_repo_from_dump( dump_path, gzip=True, cleanup_dump=False, root_dir=tmp_path ) assert os.path.exists(dump_path), "Dump path should still exists" assert os.path.exists(repo_path), "Repository should exists" def test_init_svn_repo_from_dump_svnadmin_error(tmp_path): """svnadmin load error should be reported in exception text""" dump_path = os.path.join(tmp_path, "foo") Path(dump_path).touch() with pytest.raises( ValueError, match="svnadmin: E200003: Premature end of content data in dumpstream", ): utils.init_svn_repo_from_dump(dump_path, cleanup_dump=False, root_dir=tmp_path) def test_init_svn_repo_from_dump_and_cleanup(datadir, tmp_path): """Mounting svn repository with a dump cleanup after is ok""" dump_name = "penguinsdbtools2018.dump.gz" dump_ori_path = os.path.join(datadir, dump_name) dump_path = os.path.join(tmp_path, dump_name) shutil.copyfile(dump_ori_path, dump_path) assert os.path.exists(dump_path) assert os.path.exists(dump_ori_path) tmp_repo, repo_path = utils.init_svn_repo_from_dump( dump_path, gzip=True, root_dir=tmp_path ) assert not os.path.exists(dump_path), "Dump path should no longer exists" assert os.path.exists(repo_path), "Repository should exists" assert os.path.exists(dump_ori_path), "Original dump path should still exists" def test_init_svn_repo_from_dump_and_cleanup_already_done( datadir, tmp_path, mocker, caplog ): """Mounting svn repository out of a dump is ok""" caplog.set_level(logging.INFO, "swh.loader.svn.utils") dump_name = "penguinsdbtools2018.dump.gz" dump_ori_path = os.path.join(datadir, dump_name) mock_remove = mocker.patch("os.remove") mock_remove.side_effect = FileNotFoundError dump_path = os.path.join(tmp_path, dump_name) shutil.copyfile(dump_ori_path, dump_path) assert os.path.exists(dump_path) assert os.path.exists(dump_ori_path) tmp_repo, repo_path = utils.init_svn_repo_from_dump( dump_path, gzip=True, root_dir=tmp_path ) assert os.path.exists(repo_path), "Repository should exists" assert os.path.exists(dump_ori_path), "Original dump path should still exists" assert len(caplog.record_tuples) == 1 assert "Failure to remove" in caplog.record_tuples[0][2] assert mock_remove.called def test_init_svn_repo_from_truncated_dump(datadir, tmp_path): """Mounting partial svn repository from a truncated dump should work""" # prepare a repository archive_name = "pkg-gourmet" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) # dump it to file dump_path = str(tmp_path / f"{archive_name}.dump") truncated_dump_path = str(tmp_path / f"{archive_name}_truncated.dump") svnrdump_cmd = ["svnrdump", "dump", repo_url] with open(dump_path, "wb") as dump: run(svnrdump_cmd, stdout=dump) # create a truncated dump file that will generate a "svnadmin load" error with open(dump_path, "rb") as dump, open( truncated_dump_path, "wb" ) as truncated_dump: dump_lines = dump.readlines() assert len(dump_lines) > 150 truncated_dump_content = b"".join(dump_lines[:150]) truncated_dump.write(truncated_dump_content) # compute max revision number with non truncated data revs = re.findall(rb"Revision-number: ([0-9]+)", truncated_dump_content) max_rev = int(revs[-1]) - 1 # prepare repository from truncated dump _, repo_path = utils.init_svn_repo_from_dump( truncated_dump_path, gzip=False, root_dir=tmp_path, max_rev=max_rev ) # check expected number of revisions have been loaded svnadmin_info = run(["svnadmin", "info", repo_path], capture_output=True, text=True) assert f"Revisions: {max_rev}\n" in svnadmin_info.stdout def test_init_svn_repo_from_archive_dump(datadir, tmp_path): """Mounting svn repository out of an archive dump is ok""" dump_name = "penguinsdbtools2018.dump.gz" dump_path = os.path.join(datadir, dump_name) tmp_repo, repo_path = utils.init_svn_repo_from_archive_dump( dump_path, cleanup_dump=False, root_dir=tmp_path ) assert os.path.exists(dump_path), "Dump path should still exists" assert os.path.exists(repo_path), "Repository should exists" def test_init_svn_repo_from_archive_dump_and_cleanup(datadir, tmp_path): """Mounting svn repository out of a dump is ok""" dump_name = "penguinsdbtools2018.dump.gz" dump_ori_path = os.path.join(datadir, dump_name) dump_path = os.path.join(tmp_path, dump_name) shutil.copyfile(dump_ori_path, dump_path) assert os.path.exists(dump_path) assert os.path.exists(dump_ori_path) tmp_repo, repo_path = utils.init_svn_repo_from_archive_dump( dump_path, root_dir=tmp_path ) assert not os.path.exists(dump_path), "Dump path should no longer exists" assert os.path.exists(repo_path), "Repository should exists" assert os.path.exists(dump_ori_path), "Original dump path should still exists" @pytest.mark.parametrize( "base_url, paths_to_join, expected_result", [ ( "https://svn.example.org", ["repos", "test"], "https://svn.example.org/repos/test", ), ( "https://svn.example.org/", ["repos", "test"], "https://svn.example.org/repos/test", ), ( "https://svn.example.org/foo", ["repos", "test"], "https://svn.example.org/foo/repos/test", ), ( "https://svn.example.org/foo/", ["/repos", "test/"], "https://svn.example.org/foo/repos/test", ), ( "https://svn.example.org/foo", ["../bar"], "https://svn.example.org/bar", ), ], ) def test_svn_urljoin(base_url, paths_to_join, expected_result): assert utils.svn_urljoin(base_url, *paths_to_join) == expected_result @pytest.mark.parametrize( "external, dir_path, repo_url, expected_result", [ # subversion < 1.5 ( "third-party/sounds http://svn.example.com/repos/sounds", "trunk/externals", "http://svn.example.org/repos/test", ("third-party/sounds", "http://svn.example.com/repos/sounds", None, False), ), ( "third-party/skins -r148 http://svn.example.com/skinproj", "trunk/externals", "http://svn.example.org/repos/test", ("third-party/skins", "http://svn.example.com/skinproj", 148, False), ), ( "third-party/skins/toolkit -r21 http://svn.example.com/skin-maker", "trunk/externals", "http://svn.example.org/repos/test", ( "third-party/skins/toolkit", "http://svn.example.com/skin-maker", 21, False, ), ), # subversion >= 1.5 ( " http://svn.example.com/repos/sounds third-party/sounds", "trunk/externals", "http://svn.example.org/repos/test", ("third-party/sounds", "http://svn.example.com/repos/sounds", None, False), ), ( "-r148 http://svn.example.com/skinproj third-party/skins", "trunk/externals", "http://svn.example.org/repos/test", ("third-party/skins", "http://svn.example.com/skinproj", 148, False), ), ( "-r 21 http://svn.example.com/skin-maker third-party/skins/toolkit", "trunk/externals", "http://svn.example.org/repos/test", ( "third-party/skins/toolkit", "http://svn.example.com/skin-maker", 21, False, ), ), ( "http://svn.example.com/repos/sounds third-party/sounds", "trunk/externals", "http://svn.example.org/repos/test", ("third-party/sounds", "http://svn.example.com/repos/sounds", None, False), ), ( "http://svn.example.com/skinproj@148 third-party/skins", "trunk/externals", "http://svn.example.org/repos/test", ("third-party/skins", "http://svn.example.com/skinproj", 148, False), ), ( "http://anon:anon@svn.example.com/skin-maker@21 third-party/skins/toolkit", "trunk/externals", "http://svn.example.org/repos/test", ( "third-party/skins/toolkit", "http://anon:anon@svn.example.com/skin-maker", 21, False, ), ), ( "-r21 http://anon:anon@svn.example.com/skin-maker third-party/skins/toolkit", # noqa "trunk/externals", "http://svn.example.org/repos/test", ( "third-party/skins/toolkit", "http://anon:anon@svn.example.com/skin-maker", 21, False, ), ), ( "-r21 http://anon:anon@svn.example.com/skin-maker@21 third-party/skins/toolkit", # noqa "trunk/externals", "http://svn.example.org/repos/test", ( "third-party/skins/toolkit", "http://anon:anon@svn.example.com/skin-maker", 21, False, ), ), # subversion >= 1.5, relative external definitions ( "^/sounds third-party/sounds", "trunk/externals", "http://svn.example.org/repos/test", ( "third-party/sounds", "http://svn.example.org/repos/test/sounds", None, False, ), ), ( "/skinproj@148 third-party/skins", "trunk/externals", "http://svn.example.org/repos/test", ("third-party/skins", "http://svn.example.org/skinproj", 148, True), ), ( "//svn.example.com/skin-maker@21 third-party/skins/toolkit", "trunk/externals", "http://svn.example.org/repos/test", ( "third-party/skins/toolkit", "http://svn.example.com/skin-maker", 21, True, ), ), ( "^/../../skin-maker@21 third-party/skins/toolkit", "trunk/externals", "http://svn.example.org/repos/test", ( "third-party/skins/toolkit", "http://svn.example.org/skin-maker", 21, True, ), ), ( "../skins skins", "trunk/externals", "http://svn.example.org/repos/test", ("skins", "http://svn.example.org/repos/test/trunk/skins", None, False), ), ( "../skins skins", "trunk/externals", "http://svn.example.org/repos/test", ("skins", "http://svn.example.org/repos/test/trunk/skins", None, False), ), # subversion >= 1.6 ( 'http://svn.thirdparty.com/repos/My%20Project "My Project"', "trunk/externals", "http://svn.example.org/repos/test", ("My Project", "http://svn.thirdparty.com/repos/My%20Project", None, False), ), ( 'http://svn.thirdparty.com/repos/My%20%20%20Project "My Project"', "trunk/externals", "http://svn.example.org/repos/test", ( "My Project", "http://svn.thirdparty.com/repos/My%20%20%20Project", None, False, ), ), ( 'http://svn.thirdparty.com/repos/%22Quotes%20Too%22 \\"Quotes\\ Too\\"', "trunk/externals", "http://svn.example.org/repos/test", ( '"Quotes Too"', "http://svn.thirdparty.com/repos/%22Quotes%20Too%22", None, False, ), ), ( 'http://svn.thirdparty.com/repos/%22Quotes%20%20%20Too%22 \\"Quotes\\ \\ \\ Too\\"', # noqa "trunk/externals", "http://svn.example.org/repos/test", ( '"Quotes Too"', "http://svn.thirdparty.com/repos/%22Quotes%20%20%20Too%22", None, False, ), ), # edge cases ( '-r1 http://svn.thirdparty.com/repos/test "trunk/PluginFramework"', "trunk/externals", "http://svn.example.org/repos/test", ("trunk/PluginFramework", "http://svn.thirdparty.com/repos/test", 1, False), ), ( "external -r 9 http://svn.thirdparty.com/repos/test", "tags", "http://svn.example.org/repos/test", ("external", "http://svn.thirdparty.com/repos/test", 9, False), ), ( "./external http://svn.thirdparty.com/repos/test", "tags", "http://svn.example.org/repos/test", ("external", "http://svn.thirdparty.com/repos/test", None, False), ), ( ".external http://svn.thirdparty.com/repos/test", "tags", "http://svn.example.org/repos/test", (".external", "http://svn.thirdparty.com/repos/test", None, False), ), ( "external/ http://svn.thirdparty.com/repos/test", "tags", "http://svn.example.org/repos/test", ("external", "http://svn.thirdparty.com/repos/test", None, False), ), ( "external ttp://svn.thirdparty.com/repos/test", "tags", "http://svn.example.org/repos/test", ("external", "ttp://svn.thirdparty.com/repos/test", None, False), ), ( "external http//svn.thirdparty.com/repos/test", "tags", "http://svn.example.org/repos/test", ("external", "http//svn.thirdparty.com/repos/test", None, False), ), ( "C:\\code\\repo\\external http://svn.thirdparty.com/repos/test", "tags", "http://svn.example.org/repos/test", ("C:coderepoexternal", "http://svn.thirdparty.com/repos/test", None, False), ), ( "C:\\\\code\\\\repo\\\\external http://svn.thirdparty.com/repos/test", "tags", "http://svn.example.org/repos/test", ( "C:\\code\\repo\\external", "http://svn.thirdparty.com/repos/test", None, False, ), ), ( "-r 123 http://svn.example.com/repos/sounds@100 third-party/sounds", "trunk/externals", "http://svn.example.org/repos/test", ("third-party/sounds", "http://svn.example.com/repos/sounds", 123, False), ), ( "-r 123 http://svn.example.com/repos/sounds@150 third-party/sounds", "trunk/externals", "http://svn.example.org/repos/test", ("third-party/sounds", "http://svn.example.com/repos/sounds", 123, False), ), ], ) def test_parse_external_definition(external, dir_path, repo_url, expected_result): assert ( utils.parse_external_definition(external, dir_path, repo_url) == expected_result ) + + +@pytest.mark.parametrize( + "invalid_external", + [ + "^tests@21 tests", + ], +) +def test_parse_invalid_external_definition(invalid_external): + with pytest.raises(ValueError, match="Failed to parse external definition"): + utils.parse_external_definition( + invalid_external, "/trunk/externals", "http://svn.example.org/repo" + ) diff --git a/swh/loader/svn/utils.py b/swh/loader/svn/utils.py index f6c0e1f..68ef22b 100644 --- a/swh/loader/svn/utils.py +++ b/swh/loader/svn/utils.py @@ -1,339 +1,344 @@ # Copyright (C) 2016-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import errno import logging import os import re import shutil from subprocess import PIPE, Popen, call, run import tempfile from typing import Optional, Tuple from urllib.parse import quote, urlparse, urlunparse logger = logging.getLogger(__name__) class OutputStream: """Helper class to read lines from a program output while it is running Args: fileno (int): File descriptor of a program output stream opened in text mode """ def __init__(self, fileno): self._fileno = fileno self._buffer = "" def read_lines(self): """ Read available lines from the output stream and return them. Returns: Tuple[List[str], bool]: A tuple whose first member is the read lines and second member a boolean indicating if there are still some other lines available to read. """ try: output = os.read(self._fileno, 1000).decode() except OSError as e: if e.errno != errno.EIO: raise output = "" output = output.replace("\r\n", "\n") lines = output.split("\n") lines[0] = self._buffer + lines[0] if output: self._buffer = lines[-1] return (lines[:-1], True) else: self._buffer = "" if len(lines) == 1 and not lines[0]: lines = [] return (lines, False) def init_svn_repo_from_dump( dump_path: str, prefix: Optional[str] = None, suffix: Optional[str] = None, root_dir: str = "/tmp", gzip: bool = False, cleanup_dump: bool = True, max_rev: int = -1, ) -> Tuple[str, str]: """Given a path to a svn dump, initialize an svn repository with the content of said dump. Args: dump_path: The dump to the path prefix: optional prefix file name for the working directory suffix: optional suffix file name for the working directory root_dir: the root directory where the working directory is created gzip: Boolean to determine whether we treat the dump as compressed or not. cleanup_dump: Whether we want this function call to clean up the dump at the end of the repository initialization. Raises: ValueError in case of failure to run the command to uncompress and load the dump. Returns: A tuple: - temporary folder: containing the mounted repository - repo_path: path to the mounted repository inside the temporary folder """ project_name = os.path.basename(os.path.dirname(dump_path)) temp_dir = tempfile.mkdtemp(prefix=prefix, suffix=suffix, dir=root_dir) try: repo_path = os.path.join(temp_dir, project_name) # create the repository that will be loaded with the dump cmd = ["svnadmin", "create", repo_path] r = call(cmd) if r != 0: raise ValueError( "Failed to initialize empty svn repo for %s" % project_name ) read_dump_cmd = ["cat", dump_path] if gzip: read_dump_cmd = ["gzip", "-dc", dump_path] with Popen(read_dump_cmd, stdout=PIPE) as dump: # load dump and bypass properties validation as Unicode decoding errors # are already handled in loader implementation (see _ra_codecs_error_handler # in ra.py) cmd = ["svnadmin", "load", "-q", "--bypass-prop-validation"] if max_rev > 0: cmd.append(f"-r1:{max_rev}") cmd.append(repo_path) svnadmin_load = run(cmd, stdin=dump.stdout, capture_output=True, text=True) if svnadmin_load.returncode != 0: if max_rev > 0: # if max_rev is specified, we might have a truncated dump due to # an error when executing svnrdump, check if max_rev have been # loaded and continue loading process if it is the case svnadmin_info = run( ["svnadmin", "info", repo_path], capture_output=True, text=True ) if f"Revisions: {max_rev}\n" in svnadmin_info.stdout: return temp_dir, repo_path raise ValueError( f"Failed to mount the svn dump for project {project_name}\n" + svnadmin_load.stderr ) return temp_dir, repo_path except Exception as e: shutil.rmtree(temp_dir) raise e finally: if cleanup_dump: try: # At this time, the temporary svn repository is mounted from the dump or # the svn repository failed to mount. Either way, we can drop the dump. os.remove(dump_path) assert not os.path.exists(dump_path) except OSError as e: logger.warn("Failure to remove the dump %s: %s", dump_path, e) def init_svn_repo_from_archive_dump( archive_path: str, prefix: Optional[str] = None, suffix: Optional[str] = None, root_dir: str = "/tmp", cleanup_dump: bool = True, ) -> Tuple[str, str]: """Given a path to an archive containing an svn dump, initializes an svn repository with the content of the uncompressed dump. Args: archive_path: The archive svn dump path prefix: optional prefix file name for the working directory suffix: optional suffix file name for the working directory root_dir: the root directory where the working directory is created gzip: Boolean to determine whether we treat the dump as compressed or not. cleanup_dump: Whether we want this function call to clean up the dump at the end of the repository initialization. Raises: ValueError in case of failure to run the command to uncompress and load the dump. Returns: A tuple: - temporary folder: containing the mounted repository - repo_path: path to the mounted repository inside the temporary folder """ return init_svn_repo_from_dump( archive_path, prefix=prefix, suffix=suffix, root_dir=root_dir, gzip=True, cleanup_dump=cleanup_dump, ) def svn_urljoin(base_url: str, *args) -> str: """Join a base URL and a list of paths in a SVN way. For instance: - svn_urljoin("http://example.org", "foo", "bar") will return "https://example.org/foo/bar - svn_urljoin("http://example.org/foo", "../bar") will return "https://example.org/bar Args: base_url: Base URL to join paths with args: path components Returns: The joined URL """ parsed_url = urlparse(base_url) path = os.path.abspath( os.path.join(parsed_url.path or "/", *[arg.strip("/") for arg in args]) ) return f"{parsed_url.scheme}://{parsed_url.netloc}{path}" def parse_external_definition( external: str, dir_path: str, repo_url: str ) -> Tuple[str, str, Optional[int], bool]: """Parse a subversion external definition. Args: external: an external definition, extracted from the lines split of a svn:externals property value dir_path: The path of the directory in the subversion repository where the svn:externals property was set repo_url: URL of the subversion repository Returns: A tuple with the following members: - path relative to dir_path where the external should be exported - URL of the external to export - optional revision of the external to export - boolean indicating if the external URL is relative to the repository URL and targets a path not in the repository """ path = "" external_url = "" revision = None relative_url = False prev_part = None # turn multiple spaces into a single one and split on space for external_part in external.split(): if prev_part == "-r": # parse revision in the form "-r XXX" revision = int(external_part) elif external_part.startswith("-r") and external_part != "-r": # parse revision in the form "-rXXX" revision = int(external_part[2:]) elif external_part.startswith("^/"): # URL relative to the root of the repository in which the svn:externals # property is versioned external_url = svn_urljoin(repo_url, external_part[2:]) relative_url = not external_url.startswith(repo_url) elif external_part.startswith("//"): # URL relative to the scheme of the URL of the directory on which the # svn:externals property is set scheme = urlparse(repo_url).scheme external_url = f"{scheme}:{external_part}" relative_url = not external_url.startswith(repo_url) elif external_part.startswith("/"): # URL relative to the root URL of the server on which the svn:externals # property is versioned parsed_url = urlparse(repo_url) root_url = f"{parsed_url.scheme}://{parsed_url.netloc}" external_url = svn_urljoin(root_url, external_part) relative_url = not external_url.startswith(repo_url) elif external_part.startswith("../"): # URL relative to the URL of the directory on which the svn:externals # property is set external_url = svn_urljoin(repo_url, dir_path, external_part) relative_url = not external_url.startswith(repo_url) elif re.match(r"^.*:*//.*", external_part): # absolute external URL external_url = external_part # subversion >= 1.6 added a quoting and escape mechanism to the syntax so # that the path of the external working copy may contain whitespace. elif external_part.startswith('\\"'): external_split = external.split('\\"') path = [ e.replace("\\ ", " ") for e in external_split if e.startswith(external_part[2:]) ][0] path = f'"{path}"' elif external_part.endswith('\\"'): continue elif external_part.startswith('"'): external_split = external.split('"') path_prefix = external_part.strip('"') path = next(iter([e for e in external_split if e.startswith(path_prefix)])) elif external_part.endswith('"'): continue elif not external_part.startswith("\\") and external_part != "-r": # path of the external relative to dir_path path = external_part.replace("\\\\", "\\") if path == external_part: path = external_part.replace("\\", "") if path.startswith("./"): path = path.replace("./", "", 1) prev_part = external_part if "@" in external_url: # try to extract revision number if external URL is in the form # http://svn.example.org/repos/test/path@XXX url, revision_s = external_url.rsplit("@", maxsplit=1) try: # ensure revision_s can be parsed to int rev = int(revision_s) # -r XXX takes priority over @XXX revision = revision or rev external_url = url except ValueError: # handle URL like http://user@svn.example.org/ pass + + if not external_url or not path: + raise ValueError(f"Failed to parse external definition '{external}'") + return (path.rstrip("/"), external_url, revision, relative_url) def is_recursive_external( origin_url: str, dir_path: str, external_path: str, external_url: str ) -> bool: """ Check if an external definition can lead to a recursive subversion export operation (https://issues.apache.org/jira/browse/SVN-1703). Args: origin_url: repository URL dir_path: path of the directory where external is defined external_path: path of the external relative to the directory external_url: external URL Returns: Whether the external definition is recursive """ + assert external_url parsed_origin_url = urlparse(origin_url) parsed_external_url = urlparse(external_url) external_url = urlunparse( parsed_external_url._replace(scheme=parsed_origin_url.scheme) ) return svn_urljoin(origin_url, quote(dir_path), quote(external_path)).startswith( external_url )