# Copyright (C) 2016-2022  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging
import os
import pty
import shutil
from subprocess import Popen

import pytest

from swh.loader.svn import utils


def test_outputstream():
    """Lines written by a child process to a pty are read back one by one."""
    reader_fd, writer_fd = pty.openpty()
    process = Popen(["echo", "-e", "foo\nbar\nbaz"], stdout=writer_fd)
    # the child holds its own copy of the writing end
    os.close(writer_fd)
    stream = utils.OutputStream(reader_fd)

    collected = []
    more_to_read = True
    while more_to_read:
        chunk, more_to_read = stream.read_lines()
        collected.extend(chunk)

    process.wait()
    os.close(reader_fd)
    assert collected == ["foo", "bar", "baz"]


def test_init_svn_repo_from_dump(datadir, tmp_path):
    """A gzipped dump can be mounted and is kept when cleanup is disabled."""
    source_dump = os.path.join(datadir, "penguinsdbtools2018.dump.gz")

    _, mounted_repo = utils.init_svn_repo_from_dump(
        source_dump, root_dir=tmp_path, gzip=True, cleanup_dump=False
    )

    assert os.path.exists(source_dump), "Dump path should still exists"
    assert os.path.exists(mounted_repo), "Repository should exists"


def test_init_svn_repo_from_dump_and_cleanup(datadir, tmp_path):
    """With the default cleanup, the mounted dump is removed afterwards."""
    dump_name = "penguinsdbtools2018.dump.gz"
    pristine_dump = os.path.join(datadir, dump_name)
    working_dump = os.path.join(tmp_path, dump_name)

    # work on a copy so the fixture data survives the cleanup
    shutil.copyfile(pristine_dump, working_dump)
    assert os.path.exists(working_dump)
    assert os.path.exists(pristine_dump)

    _, mounted_repo = utils.init_svn_repo_from_dump(
        working_dump, root_dir=tmp_path, gzip=True
    )

    assert not os.path.exists(working_dump), "Dump path should no longer exists"
    assert os.path.exists(mounted_repo), "Repository should exists"
    assert os.path.exists(pristine_dump), "Original dump path should still exists"


def test_init_svn_repo_from_dump_and_cleanup_already_done(
    datadir, tmp_path, mocker, caplog
):
    """A failing dump removal is logged and does not break the mount."""
    caplog.set_level(logging.INFO, "swh.loader.svn.utils")

    dump_name = "penguinsdbtools2018.dump.gz"
    pristine_dump = os.path.join(datadir, dump_name)

    # simulate a dump that somebody else already deleted
    remove_mock = mocker.patch("os.remove", side_effect=FileNotFoundError)

    working_dump = os.path.join(tmp_path, dump_name)
    shutil.copyfile(pristine_dump, working_dump)
    assert os.path.exists(working_dump)
    assert os.path.exists(pristine_dump)

    _, mounted_repo = utils.init_svn_repo_from_dump(
        working_dump, root_dir=tmp_path, gzip=True
    )

    assert os.path.exists(mounted_repo), "Repository should exists"
    assert os.path.exists(pristine_dump), "Original dump path should still exists"

    records = caplog.record_tuples
    assert len(records) == 1
    assert "Failure to remove" in records[0][2]
    assert remove_mock.called


def test_init_svn_repo_from_archive_dump(datadir, tmp_path):
    """An archive dump can be mounted and is kept when cleanup is disabled."""
    source_dump = os.path.join(datadir, "penguinsdbtools2018.dump.gz")

    _, mounted_repo = utils.init_svn_repo_from_archive_dump(
        source_dump, root_dir=tmp_path, cleanup_dump=False
    )

    assert os.path.exists(source_dump), "Dump path should still exists"
    assert os.path.exists(mounted_repo), "Repository should exists"


def test_init_svn_repo_from_archive_dump_and_cleanup(datadir, tmp_path):
    """With the default cleanup, the mounted archive dump is removed."""
    dump_name = "penguinsdbtools2018.dump.gz"
    pristine_dump = os.path.join(datadir, dump_name)
    working_dump = os.path.join(tmp_path, dump_name)

    # work on a copy so the fixture data survives the cleanup
    shutil.copyfile(pristine_dump, working_dump)
    assert os.path.exists(working_dump)
    assert os.path.exists(pristine_dump)

    _, mounted_repo = utils.init_svn_repo_from_archive_dump(
        working_dump, root_dir=tmp_path
    )

    assert not os.path.exists(working_dump), "Dump path should no longer exists"
    assert os.path.exists(mounted_repo), "Repository should exists"
    assert os.path.exists(pristine_dump), "Original dump path should still exists"
@pytest.mark.parametrize(
    "base_url, paths_to_join, expected_result",
    [
        (
            "https://svn.example.org",
            ["repos", "test"],
            "https://svn.example.org/repos/test",
        ),
        (
            "https://svn.example.org/",
            ["repos", "test"],
            "https://svn.example.org/repos/test",
        ),
        (
            "https://svn.example.org/foo",
            ["repos", "test"],
            "https://svn.example.org/foo/repos/test",
        ),
        (
            "https://svn.example.org/foo/",
            ["/repos", "test/"],
            "https://svn.example.org/foo/repos/test",
        ),
        ("https://svn.example.org/foo", ["../bar"], "https://svn.example.org/bar",),
    ],
)
def test_svn_urljoin(base_url, paths_to_join, expected_result):
    """Joining path components to a base URL yields a normalized URL."""
    assert utils.svn_urljoin(base_url, *paths_to_join) == expected_result


# Each case is (external definition, directory holding the svn:externals
# property, repository URL, expected (path, url, revision, relative_url)).
# NOTE(review): runs of spaces inside some of these literals may have been
# collapsed when this file was extracted (two "../skins skins" cases are now
# identical) -- compare with the upstream test module before relying on them.
@pytest.mark.parametrize(
    "external, dir_path, repo_url, expected_result",
    [
        # subversion < 1.5
        (
            "third-party/sounds http://svn.example.com/repos/sounds",
            "trunk/externals",
            "http://svn.example.org/repos/test",
            ("third-party/sounds", "http://svn.example.com/repos/sounds", None, False),
        ),
        (
            "third-party/skins -r148 http://svn.example.com/skinproj",
            "trunk/externals",
            "http://svn.example.org/repos/test",
            ("third-party/skins", "http://svn.example.com/skinproj", 148, False),
        ),
        (
            "third-party/skins/toolkit -r21 http://svn.example.com/skin-maker",
            "trunk/externals",
            "http://svn.example.org/repos/test",
            (
                "third-party/skins/toolkit",
                "http://svn.example.com/skin-maker",
                21,
                False,
            ),
        ),
        # subversion >= 1.5
        (
            " http://svn.example.com/repos/sounds third-party/sounds",
            "trunk/externals",
            "http://svn.example.org/repos/test",
            ("third-party/sounds", "http://svn.example.com/repos/sounds", None, False),
        ),
        (
            "-r148 http://svn.example.com/skinproj third-party/skins",
            "trunk/externals",
            "http://svn.example.org/repos/test",
            ("third-party/skins", "http://svn.example.com/skinproj", 148, False),
        ),
        (
            "-r 21 http://svn.example.com/skin-maker third-party/skins/toolkit",
            "trunk/externals",
            "http://svn.example.org/repos/test",
            (
                "third-party/skins/toolkit",
                "http://svn.example.com/skin-maker",
                21,
                False,
            ),
        ),
        (
            "http://svn.example.com/repos/sounds third-party/sounds",
            "trunk/externals",
            "http://svn.example.org/repos/test",
            ("third-party/sounds", "http://svn.example.com/repos/sounds", None, False),
        ),
        (
            "http://svn.example.com/skinproj@148 third-party/skins",
            "trunk/externals",
            "http://svn.example.org/repos/test",
            ("third-party/skins", "http://svn.example.com/skinproj", 148, False),
        ),
        (
            "http://anon:anon@svn.example.com/skin-maker@21 third-party/skins/toolkit",
            "trunk/externals",
            "http://svn.example.org/repos/test",
            (
                "third-party/skins/toolkit",
                "http://anon:anon@svn.example.com/skin-maker",
                21,
                False,
            ),
        ),
        (
            "-r21 http://anon:anon@svn.example.com/skin-maker third-party/skins/toolkit",  # noqa
            "trunk/externals",
            "http://svn.example.org/repos/test",
            (
                "third-party/skins/toolkit",
                "http://anon:anon@svn.example.com/skin-maker",
                21,
                False,
            ),
        ),
        (
            "-r21 http://anon:anon@svn.example.com/skin-maker@21 third-party/skins/toolkit",  # noqa
            "trunk/externals",
            "http://svn.example.org/repos/test",
            (
                "third-party/skins/toolkit",
                "http://anon:anon@svn.example.com/skin-maker",
                21,
                False,
            ),
        ),
        # subversion >= 1.5, relative external definitions
        (
            "^/sounds third-party/sounds",
            "trunk/externals",
            "http://svn.example.org/repos/test",
            (
                "third-party/sounds",
                "http://svn.example.org/repos/test/sounds",
                None,
                False,
            ),
        ),
        (
            "/skinproj@148 third-party/skins",
            "trunk/externals",
            "http://svn.example.org/repos/test",
            ("third-party/skins", "http://svn.example.org/skinproj", 148, True),
        ),
        (
            "//svn.example.com/skin-maker@21 third-party/skins/toolkit",
            "trunk/externals",
            "http://svn.example.org/repos/test",
            (
                "third-party/skins/toolkit",
                "http://svn.example.com/skin-maker",
                21,
                True,
            ),
        ),
        (
            "../skins skins",
            "trunk/externals",
            "http://svn.example.org/repos/test",
            ("skins", "http://svn.example.org/repos/test/trunk/skins", None, False),
        ),
        (
            "../skins skins",
            "trunk/externals",
            "http://svn.example.org/repos/test",
            ("skins", "http://svn.example.org/repos/test/trunk/skins", None, False),
        ),
        # subversion >= 1.6
        (
            'http://svn.thirdparty.com/repos/My%20Project "My Project"',
            "trunk/externals",
            "http://svn.example.org/repos/test",
            ("My Project", "http://svn.thirdparty.com/repos/My%20Project", None, False),
        ),
        (
            'http://svn.thirdparty.com/repos/My%20%20%20Project "My Project"',
            "trunk/externals",
            "http://svn.example.org/repos/test",
            (
                "My Project",
                "http://svn.thirdparty.com/repos/My%20%20%20Project",
                None,
                False,
            ),
        ),
        (
            'http://svn.thirdparty.com/repos/%22Quotes%20Too%22 \\"Quotes\\ Too\\"',
            "trunk/externals",
            "http://svn.example.org/repos/test",
            (
                '"Quotes Too"',
                "http://svn.thirdparty.com/repos/%22Quotes%20Too%22",
                None,
                False,
            ),
        ),
        (
            'http://svn.thirdparty.com/repos/%22Quotes%20%20%20Too%22 \\"Quotes\\ \\ \\ Too\\"',  # noqa
            "trunk/externals",
            "http://svn.example.org/repos/test",
            (
                # three escaped spaces in the definition unescape to three
                # plain spaces in the resulting path
                '"Quotes   Too"',
                "http://svn.thirdparty.com/repos/%22Quotes%20%20%20Too%22",
                None,
                False,
            ),
        ),
        # edge cases
        (
            '-r1 http://svn.thirdparty.com/repos/test "trunk/PluginFramework"',
            "trunk/externals",
            "http://svn.example.org/repos/test",
            ("trunk/PluginFramework", "http://svn.thirdparty.com/repos/test", 1, False),
        ),
        (
            "external -r 9 http://svn.thirdparty.com/repos/test",
            "tags",
            "http://svn.example.org/repos/test",
            ("external", "http://svn.thirdparty.com/repos/test", 9, False),
        ),
        (
            "./external http://svn.thirdparty.com/repos/test",
            "tags",
            "http://svn.example.org/repos/test",
            ("external", "http://svn.thirdparty.com/repos/test", None, False),
        ),
        (
            # a leading dot that is part of the name must be preserved
            ".external http://svn.thirdparty.com/repos/test",
            "tags",
            "http://svn.example.org/repos/test",
            (".external", "http://svn.thirdparty.com/repos/test", None, False),
        ),
        (
            "external/ http://svn.thirdparty.com/repos/test",
            "tags",
            "http://svn.example.org/repos/test",
            ("external", "http://svn.thirdparty.com/repos/test", None, False),
        ),
        (
            "external ttp://svn.thirdparty.com/repos/test",
            "tags",
            "http://svn.example.org/repos/test",
            ("external", "ttp://svn.thirdparty.com/repos/test", None, False),
        ),
        (
            "external http//svn.thirdparty.com/repos/test",
            "tags",
            "http://svn.example.org/repos/test",
            ("external", "http//svn.thirdparty.com/repos/test", None, False),
        ),
        (
            "C:\\code\\repo\\external http://svn.thirdparty.com/repos/test",
            "tags",
            "http://svn.example.org/repos/test",
            ("C:coderepoexternal", "http://svn.thirdparty.com/repos/test", None, False),
        ),
        (
            "C:\\\\code\\\\repo\\\\external http://svn.thirdparty.com/repos/test",
            "tags",
            "http://svn.example.org/repos/test",
            (
                "C:\\code\\repo\\external",
                "http://svn.thirdparty.com/repos/test",
                None,
                False,
            ),
        ),
    ],
)
def test_parse_external_definition(external, dir_path, repo_url, expected_result):
    """Each supported svn:externals syntax is parsed into the expected tuple."""
    assert (
        utils.parse_external_definition(external, dir_path, repo_url) == expected_result
    )


# ---- swh/loader/svn/utils.py ----
# Copyright (C) 2016-2022  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import errno
import logging
import os
import re
import shutil
from subprocess import PIPE, Popen, call
import tempfile
from typing import Optional, Tuple
from urllib.parse import quote, urlparse, urlunparse

logger = logging.getLogger(__name__)
class OutputStream:
    """Helper class to read lines from a program output while it is running

    Args:
        fileno (int): File descriptor of a program output stream
            opened in text mode
    """

    def __init__(self, fileno):
        # function-scope import so the class does not depend on a new
        # module-level import
        import codecs

        self._fileno = fileno
        # text received after the last newline, i.e. a line still in progress
        self._buffer = ""
        # an incremental decoder keeps the bytes of a multi-byte character
        # that was cut by the fixed-size read until the rest arrives
        self._decoder = codecs.getincrementaldecoder("utf-8")()

    def read_lines(self):
        """
        Read available lines from the output stream and return them.

        At most 1000 bytes are read per call. Text following the last newline
        is buffered and prepended to the output of the next call; it is
        returned as a final line once the stream is exhausted.

        Returns:
            Tuple[List[str], bool]: A tuple whose first member is the read
            lines and second member a boolean indicating if there are still
            some other lines available to read.

        Raises:
            OSError: if reading fails with an error other than EIO
            UnicodeDecodeError: if the output is not valid UTF-8
        """
        try:
            data = os.read(self._fileno, 1000)
        except OSError as e:
            # EIO is treated as the end of the stream, anything else is fatal
            if e.errno != errno.EIO:
                raise
            data = b""

        if data:
            # Normalize line endings only after prepending the buffer, so a
            # "\r\n" pair split across two reads is still recognized.
            text = self._buffer + self._decoder.decode(data)
            *lines, self._buffer = text.replace("\r\n", "\n").split("\n")
            return (lines, True)

        # End of stream: flush what is left as the last (unterminated) line.
        remainder = self._buffer + self._decoder.decode(b"", final=True)
        self._buffer = ""
        return ([remainder] if remainder else [], False)
def init_svn_repo_from_dump(
    dump_path: str,
    prefix: Optional[str] = None,
    suffix: Optional[str] = None,
    root_dir: str = "/tmp",
    gzip: bool = False,
    cleanup_dump: bool = True,
) -> Tuple[str, str]:
    """Given a path to a svn dump, initialize an svn repository with the content of
    said dump.

    Args:
        dump_path: The path to the dump
        prefix: optional prefix file name for the working directory
        suffix: optional suffix file name for the working directory
        root_dir: the root directory where the working directory is created
        gzip: Boolean to determine whether we treat the dump as compressed or not.
        cleanup_dump: Whether we want this function call to clean up the dump at the
            end of the repository initialization.

    Raises:
        ValueError in case of failure to run the command to uncompress and load the
        dump.

    Returns:
        A tuple:
            - temporary folder: containing the mounted repository
            - repo_path: path to the mounted repository inside the temporary folder

    """
    # the repository is named after the directory holding the dump
    project_name = os.path.basename(os.path.dirname(dump_path))
    temp_dir = tempfile.mkdtemp(prefix=prefix, suffix=suffix, dir=root_dir)

    try:
        repo_path = os.path.join(temp_dir, project_name)

        # create the repository that will be loaded with the dump
        cmd = ["svnadmin", "create", repo_path]
        r = call(cmd)
        if r != 0:
            raise ValueError(
                "Failed to initialize empty svn repo for %s" % project_name
            )

        read_dump_cmd = ["gzip", "-dc", dump_path] if gzip else ["cat", dump_path]

        with Popen(read_dump_cmd, stdout=PIPE) as dump:
            # load dump and bypass properties validation as Unicode decoding errors
            # are already handled in loader implementation (see _ra_codecs_error_handler
            # in ra.py)
            cmd = ["svnadmin", "load", "-q", "--bypass-prop-validation", repo_path]
            r = call(cmd, stdin=dump.stdout)

        if r != 0:
            raise ValueError(
                "Failed to mount the svn dump for project %s" % project_name
            )
        return temp_dir, repo_path
    except Exception:
        # Do not let a failing cleanup hide the error that brought us here,
        # and re-raise with the original traceback.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise
    finally:
        if cleanup_dump:
            try:
                # At this time, the temporary svn repository is mounted from the dump or
                # the svn repository failed to mount. Either way, we can drop the dump.
                os.remove(dump_path)
            except OSError as e:
                logger.warning("Failure to remove the dump %s: %s", dump_path, e)


def init_svn_repo_from_archive_dump(
    archive_path: str,
    prefix: Optional[str] = None,
    suffix: Optional[str] = None,
    root_dir: str = "/tmp",
    cleanup_dump: bool = True,
) -> Tuple[str, str]:
    """Given a path to an archive containing an svn dump, initializes an svn
    repository with the content of the uncompressed dump.

    The archive is always treated as gzip-compressed.

    Args:
        archive_path: The archive svn dump path
        prefix: optional prefix file name for the working directory
        suffix: optional suffix file name for the working directory
        root_dir: the root directory where the working directory is created
        cleanup_dump: Whether we want this function call to clean up the dump at the
            end of the repository initialization.

    Raises:
        ValueError in case of failure to run the command to uncompress and load the
        dump.

    Returns:
        A tuple:
            - temporary folder: containing the mounted repository
            - repo_path: path to the mounted repository inside the temporary folder

    """
    return init_svn_repo_from_dump(
        archive_path,
        prefix=prefix,
        suffix=suffix,
        root_dir=root_dir,
        gzip=True,
        cleanup_dump=cleanup_dump,
    )
def svn_urljoin(base_url: str, *args) -> str:
    """Join a base URL and a list of paths in a SVN way.

    For instance:

        - svn_urljoin("http://example.org", "foo", "bar")
            will return "http://example.org/foo/bar"

        - svn_urljoin("http://example.org/foo", "../bar")
            will return "http://example.org/bar"

    Only the scheme, network location and path of ``base_url`` are kept.

    Args:
        base_url: Base URL to join paths with
        args: path components

    Returns:
        The joined URL

    """
    # URL paths always use POSIX separators, whatever the host platform is
    import posixpath

    parsed_url = urlparse(base_url)
    # Leading and trailing slashes are stripped from each component so that a
    # component never resets the join; anchoring at "/" keeps the result
    # independent from the current working directory.
    path = posixpath.normpath(
        posixpath.join("/", parsed_url.path, *(arg.strip("/") for arg in args))
    )
    return f"{parsed_url.scheme}://{parsed_url.netloc}{path}"
def parse_external_definition(
    external: str, dir_path: str, repo_url: str
) -> Tuple[str, str, Optional[int], bool]:
    """Parse a subversion external definition.

    Args:
        external: an external definition, extracted from the lines split of a
            svn:externals property value
        dir_path: The path of the directory in the subversion repository where
            the svn:externals property was set
        repo_url: URL of the subversion repository

    Returns:
        A tuple with the following members:

            - path relative to dir_path where the external should be exported
            - URL of the external to export
            - optional revision of the external to export
            - boolean indicating if the external URL is relative to the repository
              URL and targets a path not in the repository

    Raises:
        ValueError: if a revision given with ``-r`` is not an integer

    """
    escaped_quote = '\\"'
    path = ""
    external_url = ""
    revision = None
    relative_url = False
    prev_part = None
    # turn multiple spaces into a single one and split on space
    for external_part in external.split():
        if prev_part == "-r":
            # parse revision in the form "-r XXX"
            revision = int(external_part)
        elif external_part.startswith("-r") and external_part != "-r":
            # parse revision in the form "-rXXX"
            revision = int(external_part[2:])
        elif external_part.startswith("^/"):
            # URL relative to the root of the repository in which the svn:externals
            # property is versioned
            external_url = svn_urljoin(repo_url, external_part[2:])
        elif external_part.startswith("//"):
            # URL relative to the scheme of the URL of the directory on which the
            # svn:externals property is set
            scheme = urlparse(repo_url).scheme
            external_url = f"{scheme}:{external_part}"
            relative_url = not external_url.startswith(repo_url)
        elif external_part.startswith("/"):
            # URL relative to the root URL of the server on which the svn:externals
            # property is versioned
            parsed_url = urlparse(repo_url)
            root_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
            external_url = svn_urljoin(root_url, external_part)
            relative_url = not external_url.startswith(repo_url)
        elif external_part.startswith("../"):
            # URL relative to the URL of the directory on which the svn:externals
            # property is set
            external_url = svn_urljoin(repo_url, dir_path, external_part)
            relative_url = not external_url.startswith(repo_url)
        elif "//" in external_part:
            # absolute external URL (any part holding "//" is taken as one, so
            # URLs with a malformed scheme are still kept verbatim)
            external_url = external_part
        # subversion >= 1.6 added a quoting and escape mechanism to the syntax so
        # that the path of the external working copy may contain whitespace.
        elif external_part.startswith(escaped_quote):
            # first word of a path wrapped in escaped quotes: recover the whole
            # path from the unsplit definition, as it may contain escaped spaces
            path_prefix = external_part[2:]
            if path_prefix.endswith(escaped_quote):
                # path without whitespace: opening and closing escaped quotes
                # are in the same part, the closing one is not part of the path
                path_prefix = path_prefix[:-2]
            quoted = next(
                (
                    piece
                    for piece in external.split(escaped_quote)
                    if piece.startswith(path_prefix)
                ),
                path_prefix,
            )
            quoted = quoted.replace("\\ ", " ")
            # escaped quotes are literal characters of the path
            path = f'"{quoted}"'
        elif external_part.endswith(escaped_quote):
            # last word of an escaped-quoted path, already consumed above
            # (prev_part is deliberately left untouched)
            continue
        elif external_part.startswith('"'):
            # first word of a quoted path: recover the whole path from the
            # unsplit definition, as it may contain spaces
            path_prefix = external_part.strip('"')
            path = next(
                (
                    piece
                    for piece in external.split('"')
                    if piece.startswith(path_prefix)
                ),
                path_prefix,
            )
        elif external_part.endswith('"'):
            # last word of a quoted path, already consumed above
            continue
        elif not external_part.startswith("\\") and external_part != "-r":
            # path of the external relative to dir_path
            path = external_part.replace("\\\\", "\\")
            if path == external_part:
                # no escaped backslash found: drop the single ones
                path = external_part.replace("\\", "")
            if path.startswith("./"):
                # only drop an explicit "./" prefix; a leading dot belonging to
                # the name (".external") must be preserved
                path = path[2:]
        prev_part = external_part

    if "@" in external_url:
        # try to extract revision number if external URL is in the form
        # http://svn.example.org/repos/test/path@XXX
        url, revision_s = external_url.rsplit("@", maxsplit=1)
        try:
            revision = int(revision_s)
            external_url = url
        except ValueError:
            # handle URL like http://user@svn.example.org/
            pass

    return (path.rstrip("/"), external_url, revision, relative_url)
def is_recursive_external(
    origin_url: str, dir_path: str, external_path: str, external_url: str
) -> bool:
    """
    Check if an external definition can lead to a recursive subversion
    export operation (https://issues.apache.org/jira/browse/SVN-1703).

    The external is considered recursive when the location it is exported to
    (``origin_url``/``dir_path``/``external_path``) is the external URL itself
    or lies below it. Schemes are not compared: the external URL is given the
    scheme of the origin URL first.

    Args:
        origin_url: repository URL
        dir_path: path of the directory where external is defined
        external_path: path of the external relative to the directory
        external_url: external URL

    Returns:
        Whether the external definition is recursive
    """
    origin_scheme = urlparse(origin_url).scheme
    normalized_external_url = urlunparse(
        urlparse(external_url)._replace(scheme=origin_scheme)
    )
    export_url = svn_urljoin(origin_url, quote(dir_path), quote(external_path))

    # Compare on a path-segment boundary: a plain prefix test would report
    # ".../trunk-old/ext" as being located inside ".../trunk".
    ancestor_url = normalized_external_url.rstrip("/")
    return export_url == ancestor_url or export_url.startswith(ancestor_url + "/")