
diff --git a/swh/loader/svn/svn.py b/swh/loader/svn/svn.py
index df6dad4..007aadc 100644
--- a/swh/loader/svn/svn.py
+++ b/swh/loader/svn/svn.py
@@ -1,613 +1,613 @@
# Copyright (C) 2015-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""SVN client in charge of iterating over svn logs and yield commit
representations including the hash tree/content computations per svn
commit.
"""
import logging
import os
import shutil
import tempfile
from typing import Dict, Iterator, List, Optional, Tuple, Union
-from urllib.parse import urlparse, urlunparse
+from urllib.parse import quote, urlparse, urlunparse
from subvertpy import SubversionException, client, properties, wc
from subvertpy.ra import (
Auth,
RemoteAccess,
get_simple_prompt_provider,
get_username_provider,
)
from swh.model.from_disk import Directory as DirectoryFromDisk
from swh.model.model import (
Content,
Directory,
Person,
SkippedContent,
TimestampWithTimezone,
)
from . import converters, replay
from .svn_retry import svn_retry
from .utils import is_recursive_external, parse_external_definition
# When log message contains empty data
DEFAULT_AUTHOR_MESSAGE = ""
logger = logging.getLogger(__name__)
def quote_svn_url(url: str) -> str:
- return url.replace(" ", "%20").replace("#", "%23")
+ return quote(url, safe="/:!$&'()*+,=@")
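# Illustrative note (editor's example, not part of the patch): the previous
# implementation escaped only spaces and '#', leaving characters such as
# '[' or ']' unescaped. urllib.parse.quote with the safe set above escapes
# those too while keeping legitimate URL delimiters ('/', ':', '@', ...) intact:
#
#   >>> quote_svn_url("http://example.org/repo/a b#c[d]")
#   'http://example.org/repo/a%20b%23c%5Bd%5D'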
class SvnRepo:
"""Svn repository representation.
Args:
remote_url: Remote svn repository url
origin_url: Associated origin identifier
local_dirname: Path to write intermediary svn action results
"""
def __init__(
self,
remote_url: str,
origin_url: str,
local_dirname: str,
max_content_length: int,
from_dump: bool = False,
debug: bool = False,
):
self.origin_url = origin_url
self.from_dump = from_dump
# default auth provider for anonymous access
auth_providers = [get_username_provider()]
# check if basic auth is required
parsed_origin_url = urlparse(origin_url)
self.username = parsed_origin_url.username or ""
self.password = parsed_origin_url.password or ""
if self.username:
# add basic auth provider for username/password
auth_providers.append(
get_simple_prompt_provider(
lambda realm, uname, may_save: (
self.username,
self.password,
False,
),
0,
)
)
# we need to remove the authentication part in the origin URL to avoid
# errors when calling subversion API through subvertpy
self.origin_url = urlunparse(
parsed_origin_url._replace(
netloc=parsed_origin_url.netloc.split("@", 1)[1]
)
)
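# Illustrative example (assumed URL, not from the patch): for an origin URL
# "https://user:secret@svn.example.org/repo", the parsed netloc is
# "user:secret@svn.example.org"; splitting on the first "@" and rebuilding
# yields "https://svn.example.org/repo", so subvertpy never sees the
# credentials, which stay in the auth provider registered above.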
if origin_url == remote_url:
remote_url = self.origin_url
self.remote_url = remote_url.rstrip("/")
auth = Auth(auth_providers)
# one client for update operation
self.client = client.Client(auth=auth)
if not self.remote_url.startswith("file://"):
# use redirection URL if any for remote operations
self.remote_url = self.info(self.remote_url).url
# one connection for log iteration
self.conn_log = self.remote_access(auth)
# another for replay
self.conn = self.remote_access(auth)
if not self.from_dump:
self.remote_url = self.info(self.remote_url).repos_root_url
self.local_dirname = local_dirname
local_name = os.path.basename(self.remote_url)
self.local_url = os.path.join(self.local_dirname, local_name).encode("utf-8")
self.uuid = self.conn.get_uuid().encode("utf-8")
self.swhreplay = replay.Replay(
conn=self.conn,
rootpath=self.local_url,
svnrepo=self,
temp_dir=local_dirname,
debug=debug,
)
self.max_content_length = max_content_length
self.has_relative_externals = False
self.has_recursive_externals = False
self.replay_started = False
# compute root directory path from the remote repository URL, required to
# properly load the sub-tree of a repository mounted from a dump file
repos_root_url = self.info(self.origin_url).repos_root_url
self.root_directory = self.origin_url.rstrip("/").replace(repos_root_url, "", 1)
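# Illustrative example (assumed URLs): with origin_url
# "file:///srv/repos/project/trunk" and repos_root_url
# "file:///srv/repos/project", root_directory becomes "/trunk"; when the
# origin is the repository root, it is the empty string.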
def __str__(self):
return str(
{
"swh-origin": self.origin_url,
"remote_url": self.remote_url,
"local_url": self.local_url,
"uuid": self.uuid,
}
)
def head_revision(self) -> int:
"""Retrieve current head revision."""
return self.conn.get_latest_revnum()
def initial_revision(self) -> int:
"""Retrieve the initial revision from which the remote url appeared."""
return 1
def convert_commit_message(self, msg: Union[str, bytes]) -> bytes:
"""Simply encode the commit message.
Args:
msg: the commit message to convert.
Returns:
The transformed message as bytes.
"""
if isinstance(msg, bytes):
return msg
return msg.encode("utf-8")
def convert_commit_date(self, date: bytes) -> TimestampWithTimezone:
"""Convert the message commit date into a timestamp in swh format.
The precision is kept.
Args:
date: the commit date to convert.
Returns:
The transformed date.
"""
return converters.svn_date_to_swh_date(date)
def convert_commit_author(self, author: Optional[bytes]) -> Person:
"""Convert the commit author into an swh person.
Args:
author: the commit author to convert.
Returns:
Person as model object
"""
return converters.svn_author_to_swh_person(author)
def __to_entry(self, log_entry: Tuple) -> Dict:
changed_paths, rev, revprops, has_children = log_entry
author_date = self.convert_commit_date(
revprops.get(properties.PROP_REVISION_DATE)
)
author = self.convert_commit_author(
revprops.get(properties.PROP_REVISION_AUTHOR)
)
message = self.convert_commit_message(
revprops.get(properties.PROP_REVISION_LOG, DEFAULT_AUTHOR_MESSAGE)
)
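# editor's note on the expression below: `and` binds tighter than `or`,
# so it evaluates as (not from_dump) or ((changed_paths is not None)
# and any(...)): revisions from a dump count as having changes only when
# they touch a path under the loaded root directory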
has_changes = (
not self.from_dump
or changed_paths is not None
and any(
changed_path.startswith(self.root_directory)
for changed_path in changed_paths.keys()
)
)
return {
"rev": rev,
"author_date": author_date,
"author_name": author,
"message": message,
"has_changes": has_changes,
"changed_paths": changed_paths,
}
def logs(self, revision_start: int, revision_end: int) -> Iterator[Dict]:
"""Stream svn logs between revision_start and revision_end by chunks of
block_size logs.
Yields revision and associated revision information between the
revision start and revision_end.
Args:
revision_start: the svn revision starting bound
revision_end: the svn revision ending bound
Yields:
tuple: tuple of revisions and logs:
- revisions: list of revisions in order
- logs: Dictionary with key revision number and value the log
entry. The log entry is a dictionary with the following keys:
- author_date: date of the commit
- author_name: name of the author
- message: commit message
"""
for log_entry in self.conn_log.iter_log(
paths=None,
start=revision_start,
end=revision_end,
discover_changed_paths=True,
):
yield self.__to_entry(log_entry)
@svn_retry()
def commit_info(self, revision: int) -> Optional[Dict]:
"""Return commit information.
Args:
revision: svn revision to return commit info
Returns:
A dictionary filled with commit info, see :meth:`swh.loader.svn.svn.logs`
for details about its content.
"""
return next(self.logs(revision, revision), None)
@svn_retry()
def remote_access(self, auth: Auth) -> RemoteAccess:
"""Simple wrapper around subvertpy.ra.RemoteAccess creation
enabling to retry the operation if a network error occurs."""
return RemoteAccess(self.remote_url, auth=auth)
@svn_retry()
def info(self, origin_url: str):
"""Simple wrapper around subvertpy.client.Client.info enabling to retry
the command if a network error occurs."""
info = self.client.info(quote_svn_url(origin_url).rstrip("/"))
return next(iter(info.values()))
@svn_retry()
def export(
self,
url: str,
to: str,
rev: Optional[int] = None,
peg_rev: Optional[int] = None,
recurse: bool = True,
ignore_externals: bool = False,
overwrite: bool = False,
ignore_keywords: bool = False,
) -> int:
"""Simple wrapper around subvertpy.client.Client.export enabling to retry
the command if a network error occurs.
See documentation of svn_client_export5 function from subversion C API
to get details about parameters.
"""
# remove export path as command can be retried
if os.path.isfile(to) or os.path.islink(to):
os.remove(to)
elif os.path.isdir(to):
shutil.rmtree(to)
options = []
if rev is not None:
options.append(f"-r {rev}")
if recurse:
options.append("--depth infinity")
if ignore_externals:
options.append("--ignore-externals")
if overwrite:
options.append("--force")
if ignore_keywords:
options.append("--ignore-keywords")
logger.debug(
"svn export %s %s%s %s",
" ".join(options),
quote_svn_url(url),
f"@{peg_rev}" if peg_rev else "",
to,
)
return self.client.export(
quote_svn_url(url),
to=to,
rev=rev,
peg_rev=peg_rev,
recurse=recurse,
ignore_externals=ignore_externals,
overwrite=overwrite,
ignore_keywords=ignore_keywords,
)
@svn_retry()
def checkout(
self,
url: str,
path: str,
rev: Optional[int] = None,
peg_rev: Optional[int] = None,
recurse: bool = True,
ignore_externals: bool = False,
allow_unver_obstructions: bool = False,
) -> int:
"""Simple wrapper around subvertpy.client.Client.checkout enabling to retry
the command if a network error occurs.
See documentation of svn_client_checkout3 function from subversion C API
to get details about parameters.
"""
if os.path.isdir(os.path.join(path, ".svn")):
# cleanup checkout path as command can be retried and svn working copy might
# be locked
wc.cleanup(path)
elif os.path.isdir(path):
# recursively remove checkout path otherwise if it is not a svn working copy
shutil.rmtree(path)
options = []
if rev is not None:
options.append(f"-r {rev}")
if recurse:
options.append("--depth infinity")
if ignore_externals:
options.append("--ignore-externals")
logger.debug(
"svn checkout %s %s%s %s",
" ".join(options),
quote_svn_url(url),
f"@{peg_rev}" if peg_rev else "",
path,
)
return self.client.checkout(
quote_svn_url(url),
path=path,
rev=rev,
peg_rev=peg_rev,
recurse=recurse,
ignore_externals=ignore_externals,
allow_unver_obstructions=allow_unver_obstructions,
)
@svn_retry()
def propget(
self,
name: str,
target: str,
peg_rev: Optional[int],
rev: Optional[int] = None,
recurse: bool = False,
):
"""Simple wrapper around subvertpy.client.Client.propget enabling to retry
the command if a network error occurs.
See documentation of svn_client_propget5 function from subversion C API
to get details about parameters.
"""
return self.client.propget(name, target, peg_rev, rev, recurse)
def export_temporary(self, revision: int) -> Tuple[str, bytes]:
"""Export the repository to a given revision in a temporary location. This is up
to the caller of this function to clean up the temporary location when done (cf.
self.clean_fs method)
Args:
revision: Revision to export at
Returns:
A tuple (local_dirname, local_url) where local_dirname is the temporary
root folder and local_url the path the repository was exported to.
"""
local_dirname = tempfile.mkdtemp(
dir=self.local_dirname, prefix=f"check-revision-{revision}."
)
local_name = os.path.basename(self.remote_url)
local_url = os.path.join(local_dirname, local_name)
url = self.remote_url
# if some paths have external URLs relative to the repository URL but targeting
# paths outside it, we need to export from the origin URL as the remote URL can
# target a dump mounted on the local filesystem
if self.replay_started and self.has_relative_externals:
# externals detected while replaying revisions
url = self.origin_url
elif not self.replay_started:
# revisions replay has not started, we need to check if svn:externals
# properties are set from a checkout of the revision and if some
# external URLs are relative to pick the right export URL,
# recursive externals are also checked
with tempfile.TemporaryDirectory(
dir=self.local_dirname, prefix=f"checkout-revision-{revision}."
) as co_dirname:
self.checkout(
self.remote_url, co_dirname, revision, ignore_externals=True
)
# get all svn:externals properties recursively
externals = self.propget("svn:externals", co_dirname, None, None, True)
self.has_relative_externals = False
self.has_recursive_externals = False
for path, external_defs in externals.items():
if self.has_relative_externals or self.has_recursive_externals:
break
path = path.replace(self.remote_url.rstrip("/") + "/", "")
for external_def in os.fsdecode(external_defs).split("\n"):
# skip empty line or comment
if not external_def or external_def.startswith("#"):
continue
(
external_path,
external_url,
_,
relative_url,
) = parse_external_definition(
external_def.rstrip("\r"), path, self.origin_url
)
if is_recursive_external(
self.origin_url,
path,
external_path,
external_url,
):
self.has_recursive_externals = True
url = self.remote_url
break
if relative_url:
self.has_relative_externals = True
url = self.origin_url
break
try:
url = url.rstrip("/")
self.export(
url,
to=local_url,
rev=revision,
ignore_keywords=True,
ignore_externals=self.has_recursive_externals,
)
except SubversionException as se:
if se.args[0].startswith(
(
"Error parsing svn:externals property",
"Unrecognized format for the relative external URL",
)
):
pass
else:
raise
# exported paths are relative to the repository root path so we need to
# adjust the URL of the exported filesystem
root_dir_local_url = os.path.join(local_url, self.root_directory.strip("/"))
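# illustrative example (assumed paths): with local_url
# "/tmp/check-revision-3.abc/repo" and root_directory "/trunk", the
# adjusted export path becomes "/tmp/check-revision-3.abc/repo/trunk"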
# check that root directory of a subproject did not get removed in revision
if os.path.exists(root_dir_local_url):
local_url = root_dir_local_url
return local_dirname, os.fsencode(local_url)
def swh_hash_data_per_revision(
self, start_revision: int, end_revision: int
) -> Iterator[
Tuple[
int,
Dict,
Tuple[List[Content], List[SkippedContent], List[Directory]],
DirectoryFromDisk,
],
]:
"""Compute swh hash data per each revision between start_revision and
end_revision.
Args:
start_revision: starting revision
end_revision: ending revision
Yields:
Tuple (rev, commit, objects_per_path, directory):
- rev: current revision
- commit: commit data (author, date, message) for that revision
- objects_per_path: tuple of lists of objects between start_revision and
end_revision
- directory: complete Directory representation
"""
# even in incremental loading mode, we need to replay the whole set of
# path modifications from first revision to restore possible file states induced
# by setting svn properties on those files (end of line style for instance)
self.replay_started = True
first_revision = 1 if start_revision else 0 # handle empty repository edge case
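# illustrative note: an incremental visit resuming at, say, revision 42
# still replays from revision 1; only the empty-repository edge case
# (start_revision == 0) starts replaying at revision 0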
for commit in self.logs(first_revision, end_revision):
rev = commit["rev"]
copyfrom_revs = (
[
copyfrom_rev
for (_, _, copyfrom_rev, _) in commit["changed_paths"].values()
if copyfrom_rev != -1
]
if commit["changed_paths"]
else None
)
low_water_mark = rev + 1
if copyfrom_revs:
# when files or directories in the revision to replay have been copied from
# ancestor revisions, we need to adjust the low water mark revision used by
# svn replay API to handle the copies in our commit editor and to ensure
# replace operations after copy will be replayed
low_water_mark = min(copyfrom_revs)
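# illustrative example (assumed revisions): if revision 10 copies a
# directory from revision 5, low_water_mark becomes 5 so the replay API
# emits enough history for the commit editor to materialize the copy
# source before any replace operation in revision 10 is applied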
objects = self.swhreplay.compute_objects(rev, low_water_mark)
if rev >= start_revision:
# start yielding new data to archive once we reached the revision to
# resume the loading from
if commit["has_changes"] or start_revision == 0:
# yield data only if commit has changes or if repository is empty
root_dir_path = self.root_directory.encode()[1:]
if not root_dir_path or root_dir_path in self.swhreplay.directory:
root_dir = self.swhreplay.directory[root_dir_path]
else:
# root directory of subproject got removed in revision, return
# empty directory for that edge case
root_dir = DirectoryFromDisk()
yield rev, commit, objects, root_dir
def swh_hash_data_at_revision(
self, revision: int
) -> Tuple[Dict, DirectoryFromDisk]:
"""Compute the information at a given svn revision. This is expected to be used
for checks only.
Returns:
The tuple (commit dictionary, targeted directory object).
"""
# Update disk representation of the repository at revision id
local_dirname, local_url = self.export_temporary(revision)
# Compute the current hashes on disk
directory = DirectoryFromDisk.from_disk(
path=local_url, max_content_length=self.max_content_length
)
# Retrieve the commit information for revision
commit = self.commit_info(revision)
# Clean export directory
self.clean_fs(local_dirname)
return commit, directory
def clean_fs(self, local_dirname: Optional[str] = None) -> None:
"""Clean up the local working copy.
Args:
local_dirname: Path to remove recursively if provided. Otherwise, remove the
temporary upper root tree used for svn repository loading.
"""
dirname = local_dirname or self.local_dirname
if os.path.exists(dirname):
logger.debug("cleanup %s", dirname)
shutil.rmtree(dirname)
diff --git a/swh/loader/svn/tests/test_loader.py b/swh/loader/svn/tests/test_loader.py
index 9a82e8d..947cdf5 100644
--- a/swh/loader/svn/tests/test_loader.py
+++ b/swh/loader/svn/tests/test_loader.py
@@ -1,2430 +1,2434 @@
# Copyright (C) 2016-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import itertools
import logging
import os
import shutil
import subprocess
import textwrap
from typing import Any, Dict
import pytest
from subvertpy import SubversionException
from swh.loader.svn.loader import (
SvnLoader,
SvnLoaderFromDumpArchive,
SvnLoaderFromRemoteDump,
)
from swh.loader.svn.svn import SvnRepo
from swh.loader.svn.utils import init_svn_repo_from_dump
from swh.loader.tests import (
assert_last_visit_matches,
check_snapshot,
get_stats,
prepare_repository_from_archive,
)
from swh.model.from_disk import DentryPerms, Directory
from swh.model.hashutil import hash_to_bytes
from swh.model.model import Snapshot, SnapshotBranch, TargetType
from .utils import CommitChange, CommitChangeType, add_commit
GOURMET_SNAPSHOT = Snapshot(
id=hash_to_bytes("889cacc2731e3312abfb2b1a0c18ade82a949e07"),
branches={
b"HEAD": SnapshotBranch(
target=hash_to_bytes("4876cb10aec6f708f7466dddf547567b65f6c39c"),
target_type=TargetType.REVISION,
)
},
)
GOURMET_UPDATES_SNAPSHOT = Snapshot(
id=hash_to_bytes("11086d15317014e43d2438b7ffc712c44f1b8afe"),
branches={
b"HEAD": SnapshotBranch(
target=hash_to_bytes("171dc35522bfd17dda4e90a542a0377fb2fc707a"),
target_type=TargetType.REVISION,
)
},
)
def test_loader_svn_not_found_no_mock(swh_storage, tmp_path):
"""Given an unknown repository, the loader visit ends up in status not_found"""
repo_url = "unknown-repository"
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path)
assert loader.load() == {"status": "uneventful"}
assert_last_visit_matches(
swh_storage,
repo_url,
status="not_found",
type="svn",
)
@pytest.mark.parametrize(
"exception_msg",
[
"Unable to connect to a repository at URL",
"Unknown URL type",
],
)
def test_loader_svn_not_found(swh_storage, tmp_path, exception_msg, mocker):
"""Given unknown repository issues, the loader visit ends up in status not_found"""
mock = mocker.patch("swh.loader.svn.loader.SvnRepo")
mock.side_effect = SubversionException(exception_msg, 0)
unknown_repo_url = "unknown-repository"
loader = SvnLoader(swh_storage, unknown_repo_url, temp_directory=tmp_path)
assert loader.load() == {"status": "uneventful"}
assert_last_visit_matches(
swh_storage,
unknown_repo_url,
status="not_found",
type="svn",
)
@pytest.mark.parametrize(
"exception",
[
SubversionException("Irrelevant message, considered a failure", 10),
SubversionException("Present but fails to read, considered a failure", 20),
ValueError("considered a failure"),
],
)
def test_loader_svn_failures(swh_storage, tmp_path, exception, mocker):
"""Given any errors raised, the loader visit ends up in status failed"""
mock = mocker.patch("swh.loader.svn.loader.SvnRepo")
mock.side_effect = exception
existing_repo_url = "existing-repo-url"
loader = SvnLoader(swh_storage, existing_repo_url, temp_directory=tmp_path)
assert loader.load() == {"status": "failed"}
assert_last_visit_matches(
swh_storage,
existing_repo_url,
status="failed",
type="svn",
)
def test_loader_svnrdump_not_found(swh_storage, tmp_path, mocker):
"""Loading from remote dump which does not exist should end up as not_found visit"""
unknown_repo_url = "file:///tmp/svn.code.sf.net/p/white-rats-studios/svn"
loader = SvnLoaderFromRemoteDump(
swh_storage, unknown_repo_url, temp_directory=tmp_path
)
assert loader.load() == {"status": "uneventful"}
assert_last_visit_matches(
swh_storage,
unknown_repo_url,
status="not_found",
type="svn",
)
def test_loader_svnrdump_no_such_revision(swh_storage, tmp_path, datadir):
"""Visit multiple times an origin with the remote loader should not raise.
It used to fail the ingestion on the second visit with a "No such revision x,
160006" message.
"""
archive_ori_dump = os.path.join(datadir, "penguinsdbtools2018.dump.gz")
archive_dump_dir = os.path.join(tmp_path, "dump")
os.mkdir(archive_dump_dir)
archive_dump = os.path.join(archive_dump_dir, "penguinsdbtools2018.dump.gz")
# loader now drops the dump as soon as it's mounted so we need to make a copy first
shutil.copyfile(archive_ori_dump, archive_dump)
loading_path = str(tmp_path / "loading")
os.mkdir(loading_path)
# Prepare the dump as a local svn repository for test purposes
temp_dir, repo_path = init_svn_repo_from_dump(
archive_dump, root_dir=tmp_path, gzip=True
)
repo_url = f"file://{repo_path}"
loader = SvnLoaderFromRemoteDump(swh_storage, repo_url, temp_directory=loading_path)
assert loader.load() == {"status": "eventful"}
actual_visit = assert_last_visit_matches(
swh_storage,
repo_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)
loader2 = SvnLoaderFromRemoteDump(
swh_storage, repo_url, temp_directory=loading_path
)
# Visiting the same repository a second time should be uneventful...
assert loader2.load() == {"status": "uneventful"}
actual_visit2 = assert_last_visit_matches(
swh_storage,
repo_url,
status="full",
type="svn",
)
assert actual_visit.snapshot is not None
# ... with the same snapshot as the first visit
assert actual_visit2.snapshot == actual_visit.snapshot
def test_loader_svn_new_visit(swh_storage, datadir, tmp_path):
"""Eventful visit should yield 1 snapshot"""
archive_name = "pkg-gourmet"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
snapshot=GOURMET_SNAPSHOT.id,
)
check_snapshot(loader.snapshot, loader.storage)
stats = get_stats(loader.storage)
assert stats == {
"content": 19,
"directory": 17,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 6,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(GOURMET_SNAPSHOT, loader.storage)
def test_loader_svn_2_visits_no_change(swh_storage, datadir, tmp_path):
"""Visit multiple times a repository with no change should yield the same snapshot"""
archive_name = "pkg-gourmet"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path)
assert loader.load() == {"status": "eventful"}
visit_status1 = assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
snapshot=GOURMET_SNAPSHOT.id,
)
check_snapshot(loader.snapshot, loader.storage)
assert loader.load() == {"status": "uneventful"}
visit_status2 = assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
snapshot=GOURMET_SNAPSHOT.id,
)
assert visit_status1.date < visit_status2.date
assert visit_status1.snapshot == visit_status2.snapshot
stats = get_stats(loader.storage)
assert stats["origin_visit"] == 1 + 1 # computed twice the same snapshot
assert stats["snapshot"] == 1
# even starting from previous revision...
start_revision = loader.storage.revision_get(
[hash_to_bytes("95edacc8848369d6fb1608e887d6d2474fd5224f")]
)[0]
assert start_revision is not None
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path)
assert loader.load() == {"status": "uneventful"}
stats = get_stats(loader.storage)
assert stats["origin_visit"] == 2 + 1
# ... with no change in repository, this yields the same snapshot
assert stats["snapshot"] == 1
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
snapshot=GOURMET_SNAPSHOT.id,
)
def test_loader_tampered_repository(swh_storage, datadir, tmp_path):
"""In this scenario, the dump has been tampered with to modify the
commit log [1]. This results in a hash divergence which is
detected at startup after a new run for the same origin.
In effect, this will perform a complete reloading of the repository.
[1] Tampering with revision 6 log message following:
```
tar xvf pkg-gourmet.tgz # initial repository ingested
cd pkg-gourmet/
echo "Tampering with commit log message for fun and profit" > log.txt
svnadmin setlog . -r 6 log.txt --bypass-hooks
tar cvf pkg-gourmet-tampered-rev6-log.tgz pkg-gourmet/
```
"""
archive_name = "pkg-gourmet"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path)
assert loader.load() == {"status": "eventful"}
check_snapshot(GOURMET_SNAPSHOT, loader.storage)
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
snapshot=GOURMET_SNAPSHOT.id,
)
check_snapshot(loader.snapshot, loader.storage)
archive_path2 = os.path.join(datadir, "pkg-gourmet-tampered-rev6-log.tgz")
repo_tampered_url = prepare_repository_from_archive(
archive_path2, archive_name, tmp_path
)
loader2 = SvnLoader(
swh_storage, repo_tampered_url, origin_url=repo_url, temp_directory=tmp_path
)
assert loader2.load() == {"status": "eventful"}
assert_last_visit_matches(
loader2.storage,
repo_url,
status="full",
type="svn",
snapshot=hash_to_bytes("5aa61959e788e281fd6e187053d0f46c68e8d8bb"),
)
check_snapshot(loader.snapshot, loader.storage)
stats = get_stats(loader.storage)
assert stats["origin"] == 1
assert stats["origin_visit"] == 2
assert stats["snapshot"] == 2
def test_loader_svn_visit_with_changes(swh_storage, datadir, tmp_path):
"""In this scenario, the repository has been updated with new changes.
The loading visit should result in new objects stored and 1 new
snapshot.
"""
archive_name = "pkg-gourmet"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_initial_url = prepare_repository_from_archive(
archive_path, archive_name, tmp_path
)
# repo_initial_url becomes the origin_url we want to visit some more below
loader = SvnLoader(swh_storage, repo_initial_url, temp_directory=tmp_path)
assert loader.load() == {"status": "eventful"}
visit_status1 = assert_last_visit_matches(
loader.storage,
repo_initial_url,
status="full",
type="svn",
snapshot=GOURMET_SNAPSHOT.id,
)
check_snapshot(GOURMET_SNAPSHOT, loader.storage)
archive_path = os.path.join(datadir, "pkg-gourmet-with-updates.tgz")
repo_updated_url = prepare_repository_from_archive(
archive_path, "pkg-gourmet", tmp_path
)
loader = SvnLoader(
swh_storage,
repo_updated_url,
origin_url=repo_initial_url,
temp_directory=tmp_path,
)
assert loader.load() == {"status": "eventful"}
visit_status2 = assert_last_visit_matches(
loader.storage,
repo_updated_url,
status="full",
type="svn",
snapshot=GOURMET_UPDATES_SNAPSHOT.id,
)
assert visit_status1.date < visit_status2.date
assert visit_status1.snapshot != visit_status2.snapshot
stats = get_stats(loader.storage)
assert stats == {
"content": 22,
"directory": 28,
"origin": 1,
"origin_visit": 2,
"release": 0,
"revision": 11,
"skipped_content": 0,
"snapshot": 2,
}
check_snapshot(GOURMET_UPDATES_SNAPSHOT, loader.storage)
# Let's restart the ingestion from scratch, this should yield the same result
loader = SvnLoader(
swh_storage,
repo_updated_url,
origin_url=repo_initial_url,
incremental=False,
temp_directory=tmp_path,
)
assert loader.load() == {"status": "eventful"}
visit_status3 = assert_last_visit_matches(
loader.storage,
repo_updated_url,
status="full",
type="svn",
snapshot=GOURMET_UPDATES_SNAPSHOT.id,
)
assert visit_status2.date < visit_status3.date
assert visit_status3.snapshot == visit_status2.snapshot
check_snapshot(GOURMET_UPDATES_SNAPSHOT, loader.storage)
stats = get_stats(loader.storage)
assert stats["origin"] == 1 # always the same visit
assert stats["origin_visit"] == 2 + 1 # 1 more visit
assert stats["snapshot"] == 2 # no new snapshot
def test_loader_svn_visit_start_from_revision(swh_storage, datadir, tmp_path):
"""Starting from existing revision, next visit on changed repo should yield 1 new
snapshot.
"""
archive_name = "pkg-gourmet"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_initial_url = prepare_repository_from_archive(
archive_path, archive_name, tmp_path
)
# repo_initial_url becomes the origin_url we want to visit some more below
loader = SvnLoader(swh_storage, repo_initial_url, temp_directory=tmp_path)
assert loader.load() == {"status": "eventful"}
visit_status1 = assert_last_visit_matches(
loader.storage,
repo_initial_url,
status="full",
type="svn",
snapshot=GOURMET_SNAPSHOT.id,
)
check_snapshot(GOURMET_SNAPSHOT, loader.storage)
start_revision = loader.storage.revision_get(
[hash_to_bytes("95edacc8848369d6fb1608e887d6d2474fd5224f")]
)[0]
assert start_revision is not None
archive_path = os.path.join(datadir, "pkg-gourmet-with-updates.tgz")
repo_updated_url = prepare_repository_from_archive(
archive_path, "pkg-gourmet", tmp_path
)
# we'll start from start_revision
loader = SvnLoader(
swh_storage,
repo_updated_url,
origin_url=repo_initial_url,
temp_directory=tmp_path,
)
assert loader.load() == {"status": "eventful"}
# nonetheless, we obtain the same snapshot (as previous tests on that repository)
visit_status2 = assert_last_visit_matches(
loader.storage,
repo_updated_url,
status="full",
type="svn",
snapshot=GOURMET_UPDATES_SNAPSHOT.id,
)
assert visit_status1.date < visit_status2.date
assert visit_status1.snapshot != visit_status2.snapshot
stats = get_stats(loader.storage)
assert stats == {
"content": 22,
"directory": 28,
"origin": 1,
"origin_visit": 2,
"release": 0,
"revision": 11,
"skipped_content": 0,
"snapshot": 2,
}
check_snapshot(GOURMET_UPDATES_SNAPSHOT, loader.storage)
def test_loader_svn_visit_with_eol_style(swh_storage, datadir, tmp_path):
"""Check that a svn repo containing a versioned file with CRLF line
endings with svn:eol-style property set to 'native' (this is a
violation of svn specification as the file should have been
stored with LF line endings) can be loaded anyway.
"""
archive_name = "mediawiki-repo-r407-eol-native-crlf"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path)
assert loader.load() == {"status": "eventful"}
mediawiki_snapshot = Snapshot(
id=hash_to_bytes("d6d6e9703f157c5702d9a4a5dec878926ed4ab76"),
branches={
b"HEAD": SnapshotBranch(
target=hash_to_bytes("7da4975c363101b819756d33459f30a866d01b1b"),
target_type=TargetType.REVISION,
)
},
)
check_snapshot(mediawiki_snapshot, loader.storage)
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
snapshot=mediawiki_snapshot.id,
)
stats = get_stats(loader.storage)
assert stats["origin"] == 1
assert stats["origin_visit"] == 1
assert stats["snapshot"] == 1
def test_loader_svn_visit_with_mixed_crlf_lf(swh_storage, datadir, tmp_path):
"""Check that a svn repo containing a versioned file with mixed
CRLF/LF line endings with svn:eol-style property set to 'native'
(this is a violation of svn specification as mixed line endings
for textual content should not be stored when the svn:eol-style
property is set) can be loaded anyway.
"""
archive_name = "pyang-repo-r343-eol-native-mixed-lf-crlf"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path)
assert loader.load() == {"status": "eventful"}
pyang_snapshot = Snapshot(
id=hash_to_bytes("6d9590de11b00a5801de0ff3297c5b44bbbf7d24"),
branches={
b"HEAD": SnapshotBranch(
target=hash_to_bytes("9c6962eeb9164a636c374be700672355e34a98a7"),
target_type=TargetType.REVISION,
)
},
)
check_snapshot(pyang_snapshot, loader.storage)
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
snapshot=pyang_snapshot.id,
)
stats = get_stats(loader.storage)
assert stats["origin"] == 1
assert stats["origin_visit"] == 1
assert stats["snapshot"] == 1
def test_loader_svn_with_symlink(swh_storage, datadir, tmp_path):
"""Repository with symlinks should be ingested ok
Edge case:
- first create a file and commit it.
Remove it, then add folder holding the same name, commit.
- do the same scenario with symbolic link (instead of file)
"""
archive_name = "pkg-gourmet"
archive_path = os.path.join(
datadir, "pkg-gourmet-with-edge-case-links-and-files.tgz"
)
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path)
assert loader.load() == {"status": "eventful"}
gourmet_edge_cases_snapshot = Snapshot(
id=hash_to_bytes("18e60982fe521a2546ab8c3c73a535d80462d9d0"),
branches={
b"HEAD": SnapshotBranch(
target=hash_to_bytes("3f43af2578fccf18b0d4198e48563da7929dc608"),
target_type=TargetType.REVISION,
)
},
)
check_snapshot(gourmet_edge_cases_snapshot, loader.storage)
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
snapshot=gourmet_edge_cases_snapshot.id,
)
stats = get_stats(loader.storage)
assert stats["origin"] == 1
assert stats["origin_visit"] == 1
assert stats["snapshot"] == 1
assert stats["revision"] == 19
def test_loader_svn_with_wrong_symlinks(swh_storage, datadir, tmp_path):
"""Repository with wrong symlinks should be ingested ok nonetheless
Edge case:
- wrong symbolic link
- wrong symbolic link with names containing spaces
"""
archive_name = "pkg-gourmet"
archive_path = os.path.join(datadir, "pkg-gourmet-with-wrong-link-cases.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path)
assert loader.load() == {"status": "eventful"}
gourmet_wrong_links_snapshot = Snapshot(
id=hash_to_bytes("b17f38acabb90f066dedd30c29f01a02af88a5c4"),
branches={
b"HEAD": SnapshotBranch(
target=hash_to_bytes("cf30d3bb9d5967d0a2bbeacc405f10a5dd9b138a"),
target_type=TargetType.REVISION,
)
},
)
check_snapshot(gourmet_wrong_links_snapshot, loader.storage)
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
snapshot=gourmet_wrong_links_snapshot.id,
)
stats = get_stats(loader.storage)
assert stats["origin"] == 1
assert stats["origin_visit"] == 1
assert stats["snapshot"] == 1
assert stats["revision"] == 21
def test_loader_svn_cleanup_loader(swh_storage, datadir, tmp_path):
"""Loader should clean up its working directory after the load"""
archive_name = "pkg-gourmet"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
loading_temp_directory = str(tmp_path / "loading")
os.mkdir(loading_temp_directory)
loader = SvnLoader(swh_storage, repo_url, temp_directory=loading_temp_directory)
assert loader.load() == {"status": "eventful"}
# the root temporary directory still exists
assert os.path.exists(loader.temp_directory)
# but it should be empty
assert os.listdir(loader.temp_directory) == []
def test_loader_svn_cleanup_loader_from_remote_dump(swh_storage, datadir, tmp_path):
"""Loader should clean up its working directory after the load"""
archive_name = "pkg-gourmet"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
loading_temp_directory = str(tmp_path / "loading")
os.mkdir(loading_temp_directory)
loader = SvnLoaderFromRemoteDump(
swh_storage, repo_url, temp_directory=loading_temp_directory
)
assert loader.load() == {"status": "eventful"}
# the root temporary directory still exists
assert os.path.exists(loader.temp_directory)
# but it should be empty
assert os.listdir(loader.temp_directory) == []
# the internal temp_dir should be cleaned up though
assert not os.path.exists(loader.temp_dir)
def test_loader_svn_cleanup_loader_from_dump_archive(swh_storage, datadir, tmp_path):
"""Loader should clean up its working directory after the load"""
archive_ori_dump = os.path.join(datadir, "penguinsdbtools2018.dump.gz")
archive_dump_dir = os.path.join(tmp_path, "dump")
os.mkdir(archive_dump_dir)
archive_dump = os.path.join(archive_dump_dir, "penguinsdbtools2018.dump.gz")
# loader now drops the dump as soon as it's mounted so we need to make a copy first
shutil.copyfile(archive_ori_dump, archive_dump)
loading_path = str(tmp_path / "loading")
os.mkdir(loading_path)
# Prepare the dump as a local svn repository for test purposes
temp_dir, repo_path = init_svn_repo_from_dump(
archive_dump, root_dir=tmp_path, gzip=True
)
repo_url = f"file://{repo_path}"
loader = SvnLoaderFromRemoteDump(swh_storage, repo_url, temp_directory=loading_path)
assert loader.load() == {"status": "eventful"}
# the root temporary directory still exists
assert os.path.exists(loader.temp_directory)
# but it should be empty
assert os.listdir(loader.temp_directory) == []
# the internal temp_dir should be cleaned up though
assert not os.path.exists(loader.temp_dir)
def test_svn_loader_from_remote_dump(swh_storage, datadir, tmpdir_factory):
archive_name = "pkg-gourmet"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
tmp_path = tmpdir_factory.mktemp("repo1")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
loaderFromDump = SvnLoaderFromRemoteDump(
swh_storage, repo_url, temp_directory=tmp_path
)
assert loaderFromDump.load() == {"status": "eventful"}
assert_last_visit_matches(
loaderFromDump.storage,
repo_url,
status="full",
type="svn",
snapshot=GOURMET_SNAPSHOT.id,
)
# rename to another origin
tmp_path = tmpdir_factory.mktemp("repo2")
origin_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
loader = SvnLoader(
swh_storage, repo_url, origin_url=origin_url, temp_directory=tmp_path
)
assert loader.load() == {"status": "eventful"} # because are working on new origin
assert_last_visit_matches(
loader.storage,
origin_url,
status="full",
type="svn",
snapshot=GOURMET_SNAPSHOT.id,
)
check_snapshot(GOURMET_SNAPSHOT, loader.storage)
stats = get_stats(loader.storage)
assert stats["origin"] == 2 # created one more origin
assert stats["origin_visit"] == 2
assert stats["snapshot"] == 1
loader = SvnLoader(
swh_storage, repo_url, temp_directory=tmp_path
) # no change on the origin-url
assert loader.load() == {"status": "uneventful"}
assert_last_visit_matches(
loader.storage,
origin_url,
status="full",
type="svn",
snapshot=GOURMET_SNAPSHOT.id,
)
stats = get_stats(loader.storage)
assert stats["origin"] == 2
assert stats["origin_visit"] == 3
assert stats["snapshot"] == 1
# second visit from the dump should be uneventful
loaderFromDump = SvnLoaderFromRemoteDump(
swh_storage, repo_url, temp_directory=tmp_path
)
assert loaderFromDump.load() == {"status": "uneventful"}
def test_svn_loader_from_remote_dump_incremental_load_on_stale_repo(
swh_storage, datadir, tmp_path, mocker
):
archive_name = "pkg-gourmet"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
# first load: a dump file will be created, mounted to a local repository
# and the latter will be loaded into the archive
loaderFromDump = SvnLoaderFromRemoteDump(
swh_storage, repo_url, temp_directory=tmp_path
)
assert loaderFromDump.load() == {"status": "eventful"}
assert_last_visit_matches(
loaderFromDump.storage,
repo_url,
status="full",
type="svn",
snapshot=GOURMET_SNAPSHOT.id,
)
# second load on same repository: the loader will detect there are no changes
# since the last load and will skip the dump, mount and load phases
loaderFromDump = SvnLoaderFromRemoteDump(
swh_storage, repo_url, temp_directory=tmp_path
)
loaderFromDump.dump_svn_revisions = mocker.MagicMock()
init_svn_repo_from_dump = mocker.patch(
"swh.loader.svn.loader.init_svn_repo_from_dump"
)
loaderFromDump.process_svn_revisions = mocker.MagicMock()
loaderFromDump._check_revision_divergence = mocker.MagicMock()
assert loaderFromDump.load() == {"status": "uneventful"}
assert_last_visit_matches(
loaderFromDump.storage,
repo_url,
status="full",
type="svn",
snapshot=GOURMET_SNAPSHOT.id,
)
# no dump
loaderFromDump.dump_svn_revisions.assert_not_called()
# no mount
init_svn_repo_from_dump.assert_not_called()
# no loading
loaderFromDump.process_svn_revisions.assert_not_called()
# no redundant post_load processing
loaderFromDump._check_revision_divergence.assert_not_called()
def test_svn_loader_from_remote_dump_incremental_load_on_non_stale_repo(
swh_storage, datadir, tmp_path, mocker
):
archive_name = "pkg-gourmet"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
# first load
loader = SvnLoaderFromRemoteDump(swh_storage, repo_url, temp_directory=tmp_path)
loader.load()
archive_path = os.path.join(datadir, "pkg-gourmet-with-updates.tgz")
repo_updated_url = prepare_repository_from_archive(
archive_path, archive_name, tmp_path
)
# second load
loader = SvnLoaderFromRemoteDump(
swh_storage, repo_updated_url, temp_directory=tmp_path
)
dump_svn_revisions = mocker.spy(loader, "dump_svn_revisions")
process_svn_revisions = mocker.spy(loader, "process_svn_revisions")
loader.load()
dump_svn_revisions.assert_called()
process_svn_revisions.assert_called()
def test_loader_user_defined_svn_properties(swh_storage, datadir, tmp_path):
"""Edge cases: The repository held some user defined svn-properties with special
encodings, this prevented the repository from being loaded even though we do not
ingest those information.
"""
archive_name = "httthttt"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
loader = SvnLoader(swh_storage, repo_url)
assert loader.load() == {"status": "eventful"}
expected_snapshot = Snapshot(
id=hash_to_bytes("70487267f682c07e52a2371061369b6cf5bffa47"),
branches={
b"HEAD": SnapshotBranch(
target=hash_to_bytes("604a17dbb15e8d7ecb3e9f3768d09bf493667a93"),
target_type=TargetType.REVISION,
)
},
)
check_snapshot(expected_snapshot, loader.storage)
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
snapshot=expected_snapshot.id,
)
stats = get_stats(loader.storage)
assert stats["origin"] == 1
assert stats["origin_visit"] == 1
assert stats["snapshot"] == 1
assert stats["revision"] == 7
def test_loader_svn_dir_added_then_removed(swh_storage, datadir, tmp_path):
"""Loader should handle directory removal when processing a commit"""
archive_name = "pkg-gourmet"
archive_path = os.path.join(datadir, f"{archive_name}-add-remove-dir.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)
def test_loader_svn_loader_from_dump_archive(swh_storage, datadir, tmp_path):
archive_name = "pkg-gourmet"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
dump_filename = f"{archive_name}.dump"
with open(os.path.join(tmp_path, dump_filename), "wb") as dump_file:
# create compressed dump file of pkg-gourmet repo
subprocess.run(["svnrdump", "dump", repo_url], stdout=dump_file)
subprocess.run(["gzip", dump_filename], cwd=tmp_path)
# load svn repo from that compressed dump file
loader = SvnLoaderFromDumpArchive(
swh_storage,
url=repo_url,
archive_path=os.path.join(tmp_path, f"{dump_filename}.gz"),
temp_directory=tmp_path,
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
snapshot=GOURMET_SNAPSHOT.id,
)
check_snapshot(GOURMET_SNAPSHOT, loader.storage)
assert get_stats(loader.storage) == {
"content": 19,
"directory": 17,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 6,
"skipped_content": 0,
"snapshot": 1,
}
def test_loader_eol_style_file_property_handling_edge_case(
swh_storage, repo_url, tmp_path
):
# first commit
add_commit(
repo_url,
(
"Add a directory containing a file with CRLF end of line "
"and set svn:eol-style property to native so CRLF will be "
"replaced by LF in the file when exporting the revision"
),
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="directory/file_with_crlf_eol.txt",
properties={"svn:eol-style": "native"},
data=b"Hello world!\r\n",
)
],
)
# second commit
add_commit(
repo_url,
"Remove previously added directory and file",
[
CommitChange(
change_type=CommitChangeType.Delete,
path="directory/",
)
],
)
# third commit
add_commit(
repo_url,
(
"Add again same directory containing same file with CRLF end of line "
"but do not set svn:eol-style property value so CRLF will not be "
"replaced by LF when exporting the revision"
),
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="directory/file_with_crlf_eol.txt",
data=b"Hello world!\r\n",
)
],
)
# instantiate a svn loader checking after each processed revision that
# the repository filesystem it reconstructed does not differ from a subversion
# export of that revision
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)
assert get_stats(loader.storage) == {
"content": 2,
"directory": 5,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 3,
"skipped_content": 0,
"snapshot": 1,
}
def get_head_revision_paths_info(loader: SvnLoader) -> Dict[bytes, Dict[str, Any]]:
assert loader.snapshot is not None
root_dir = loader.snapshot.branches[b"HEAD"].target
revision = loader.storage.revision_get([root_dir])[0]
assert revision is not None
paths = {}
for entry in loader.storage.directory_ls(revision.directory, recursive=True):
paths[entry["name"]] = entry
return paths
def test_loader_eol_style_on_svn_link_handling(swh_storage, repo_url, tmp_path):
# first commit
add_commit(
repo_url,
(
"Add a regular file, a directory and a link to the regular file "
"in the directory. Set svn:eol-style property for the regular "
"file and the link. Set svn:special property for the link."
),
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="file_with_crlf_eol.txt",
properties={"svn:eol-style": "native"},
data=b"Hello world!\r\n",
),
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="directory/file_with_crlf_eol.txt",
properties={"svn:eol-style": "native", "svn:special": "*"},
data=b"link ../file_with_crlf_eol.txt",
),
],
)
# instantiate a svn loader checking after each processed revision that
# the repository filesystem it reconstructed does not differ from a subversion
# export of that revision
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)
# check loaded objects are those expected
assert get_stats(loader.storage) == {
"content": 2,
"directory": 2,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 1,
"skipped_content": 0,
"snapshot": 1,
}
paths = get_head_revision_paths_info(loader)
assert (
loader.storage.content_get_data(paths[b"file_with_crlf_eol.txt"]["sha1"])
== b"Hello world!\n"
)
assert paths[b"directory/file_with_crlf_eol.txt"]["perms"] == DentryPerms.symlink
assert (
loader.storage.content_get_data(
paths[b"directory/file_with_crlf_eol.txt"]["sha1"]
)
== b"../file_with_crlf_eol.txt"
)
def test_loader_svn_special_property_unset(swh_storage, repo_url, tmp_path):
# first commit
add_commit(
repo_url,
(
"Create a regular file, a link to a file and a link to an "
"external file. Set the svn:special property on the links."
),
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="file.txt",
data=b"Hello world!\n",
),
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="link.txt",
properties={"svn:special": "*"},
data=b"link ./file.txt",
),
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="external_link.txt",
properties={"svn:special": "*"},
data=b"link /home/user/data.txt",
),
],
)
# second commit
add_commit(
repo_url,
"Unset the svn:special property on the links.",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="link.txt",
properties={"svn:special": None},
),
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="external_link.txt",
properties={"svn:special": None},
),
],
)
# instantiate a svn loader checking after each processed revision that
# the repository filesystem it reconstructed does not differ from a subversion
# export of that revision
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)
# check loaded objects are those expected
assert get_stats(loader.storage) == {
"content": 5,
"directory": 2,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 2,
"skipped_content": 0,
"snapshot": 1,
}
paths = get_head_revision_paths_info(loader)
assert paths[b"link.txt"]["perms"] == DentryPerms.content
assert (
loader.storage.content_get_data(paths[b"link.txt"]["sha1"])
== b"link ./file.txt"
)
assert paths[b"external_link.txt"]["perms"] == DentryPerms.content
assert (
loader.storage.content_get_data(paths[b"external_link.txt"]["sha1"])
== b"link /home/user/data.txt"
)
def test_loader_invalid_svn_eol_style_property_value(swh_storage, repo_url, tmp_path):
filename = "file_with_crlf_eol.txt"
file_content = b"Hello world!\r\n"
# first commit
add_commit(
repo_url,
(
"Add a file with CRLF end of line and set svn:eol-style property "
"to an invalid value."
),
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path=filename,
properties={"svn:eol-style": "foo"},
data=file_content,
)
],
)
# instantiate a svn loader checking after each processed revision that
# the repository filesystem it reconstructed does not differ from a subversion
# export of that revision
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)
paths = get_head_revision_paths_info(loader)
# end of lines should not have been processed
assert (
loader.storage.content_get_data(paths[filename.encode()]["sha1"])
== file_content
)
def test_loader_first_revision_is_not_number_one(
swh_storage, mocker, repo_url, tmp_path
):
class SvnRepoSkipFirstRevision(SvnRepo):
def logs(self, revision_start, revision_end):
"""Overrides logs method to skip revision number one in yielded revisions"""
yield from super().logs(revision_start + 1, revision_end)
from swh.loader.svn import loader
mocker.patch.object(loader, "SvnRepo", SvnRepoSkipFirstRevision)
for filename in ("foo", "bar", "baz"):
add_commit(
repo_url,
f"Add {filename} file",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path=filename,
data=f"{filename}\n".encode(),
)
],
)
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path)
# post loading will detect an issue and make a partial visit with a snapshot
assert loader.load() == {"status": "failed"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="partial",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)
assert get_stats(loader.storage) == {
"content": 2,
"directory": 2,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 2,
"skipped_content": 0,
"snapshot": 1,
}
def test_loader_svn_special_property_on_binary_file(swh_storage, repo_url, tmp_path):
"""When a file has the svn:special property set but is not a svn link,
it might be truncated under certain conditions when performing an export
operation."""
data = (
b"!<symlink>\xff\xfea\x00p\x00t\x00-\x00c\x00y\x00g\x00.\x00s\x00h\x00\x00\x00"
)
# first commit
add_commit(
repo_url,
(
"Add a non svn link binary file and set the svn:special property on it."
"That file will be truncated when exporting it."
),
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="binary_file",
properties={"svn:special": "*"},
data=data,
),
],
)
# second commit
add_commit(
repo_url,
(
"Add a non svn link binary file and set the svn:special and "
"svn:mime-type properties on it."
"That file will not be truncated when exporting it."
),
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="another_binary_file",
properties={
"svn:special": "*",
"svn:mime-type": "application/octet-stream",
},
data=data,
),
],
)
# third commit
add_commit(
repo_url,
"Remove the svn:special property on the previously added files",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="binary_file",
properties={"svn:special": None},
),
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="another_binary_file",
properties={"svn:special": None},
),
],
)
# instantiate a svn loader checking after each processed revision that
# the repository filesystem it reconstructed does not differ from a subversion
# export of that revision
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)
def test_loader_last_revision_divergence(swh_storage, datadir, tmp_path):
archive_name = "pkg-gourmet"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
class SvnLoaderRevisionDivergence(SvnLoader):
def _check_revision_divergence(self, count, rev, dir_id):
raise ValueError("revision divergence detected")
loader = SvnLoaderRevisionDivergence(swh_storage, repo_url, temp_directory=tmp_path)
assert loader.load() == {"status": "failed"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="partial",
type="svn",
snapshot=GOURMET_SNAPSHOT.id,
)
check_snapshot(GOURMET_SNAPSHOT, loader.storage)
def test_loader_delete_directory_while_file_has_same_prefix(
swh_storage, repo_url, tmp_path
):
# first commit
add_commit(
repo_url,
"Add a file and a directory with same prefix",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="foo/bar.c",
data=b'#include "../foo.c"',
),
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="foo.c",
data=b"int foo() {return 0;}",
),
],
)
# second commit
add_commit(
repo_url,
"Delete previously added directory and update file content",
[
CommitChange(change_type=CommitChangeType.Delete, path="foo"),
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="foo.c",
data=b"int foo() {return 1;}",
),
],
)
# instantiate a svn loader checking after each processed revision that
# the repository filesystem it reconstructed does not differ from a subversion
# export of that revision
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)
def test_svn_loader_incremental(swh_storage, repo_url, tmp_path):
# first commit
add_commit(
repo_url,
(
"Add a directory containing a file with CRLF end of line "
"and set svn:eol-style property to native so CRLF will be "
"replaced by LF in the file when exporting the revision"
),
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="file_with_crlf_eol.txt",
properties={"svn:eol-style": "native"},
data=b"Hello world!\r\n",
)
],
)
# first load
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)
# second commit
add_commit(
repo_url,
"Modify previously added file",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="file_with_crlf_eol.txt",
data=b"Hello World!\r\n",
)
],
)
# second load, incremental
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)
# third commit
add_commit(
repo_url,
"Unset svn:eol-style property on file",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="file_with_crlf_eol.txt",
properties={"svn:eol-style": None},
)
],
)
# third load, incremental
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)


def test_svn_loader_incremental_replay_start_with_empty_directory(
swh_storage, mocker, repo_url, tmp_path
):
# first commit
add_commit(
repo_url,
("Add a file"),
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="foo.txt",
data=b"foo\n",
)
],
)
# first load
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)
# second commit
add_commit(
repo_url,
"Modify previously added file",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="foo.txt",
data=b"bar\n",
)
],
)

    class SvnRepoCheckReplayStartWithEmptyDirectory(SvnRepo):
def swh_hash_data_per_revision(self, start_revision: int, end_revision: int):
"""Overrides swh_hash_data_per_revision method to grab the content
of the directory where the svn revisions will be replayed before that
process starts."""
self.replay_dir_content_before_start = [
os.path.join(root, name)
for root, _, files in os.walk(self.local_url)
for name in files
]
yield from super().swh_hash_data_per_revision(start_revision, end_revision)

    from swh.loader.svn import loader
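    # have the loader instantiate the subclass above instead of SvnRepo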
mocker.patch.object(loader, "SvnRepo", SvnRepoCheckReplayStartWithEmptyDirectory)
# second load, incremental
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path)
loader.load()
# check work directory was empty before replaying revisions
assert loader.svnrepo.replay_dir_content_before_start == []


def test_loader_svn_executable_property_on_svn_link_handling(
swh_storage, repo_url, tmp_path
):
# first commit
add_commit(
repo_url,
        (
            "Add an executable file and a svn link to it. "
            "Set svn:executable property for both paths."
        ),
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="hello-world",
properties={"svn:executable": "*"},
data=b"#!/bin/bash\necho Hello World !",
),
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="hello",
properties={"svn:executable": "*", "svn:special": "*"},
data=b"link hello-world",
),
],
)
# second commit
add_commit(
repo_url,
        (
            "Remove executable file, unset link and replace it with executable content. "
            "As the link was previously marked as executable, execution rights should "
            "be set after turning it into a regular file."
        ),
[
CommitChange(change_type=CommitChangeType.Delete, path="hello-world"),
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="hello",
properties={"svn:special": None},
data=b"#!/bin/bash\necho Hello World !",
),
],
)
# instantiate a svn loader checking after each processed revision that
# the repository filesystem it reconstructed does not differ from a subversion
# export of that revision
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)


def test_loader_svn_add_property_on_link(swh_storage, repo_url, tmp_path):
# first commit
add_commit(
repo_url,
"Add an executable file and a svn link to it.",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="hello-world",
properties={"svn:executable": "*"},
data=b"#!/bin/bash\necho Hello World !",
),
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="hello",
properties={"svn:special": "*"},
data=b"link hello-world",
),
],
)
# second commit
add_commit(
repo_url,
"Set svn:eol-style property on link",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="hello",
properties={"svn:eol-style": "native"},
),
],
)
# instantiate a svn loader checking after each processed revision that
# the repository filesystem it reconstructed does not differ from a subversion
# export of that revision
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)


def test_loader_svn_link_parsing(swh_storage, repo_url, tmp_path):
# first commit
add_commit(
repo_url,
"Add an executable file and a svn link to it.",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="hello-world",
properties={"svn:executable": "*"},
data=b"#!/bin/bash\necho Hello World !",
),
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="hello",
properties={"svn:special": "*"},
data=b"link hello-world",
),
],
)
# second commit
add_commit(
repo_url,
"Update svn link content",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="hello",
data=b"link hello-world\r\n",
),
],
)
# instantiate a svn loader checking after each processed revision that
# the repository filesystem it reconstructed does not differ from a subversion
# export of that revision
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)


def test_loader_svn_empty_local_dir_before_post_load(swh_storage, datadir, tmp_path):
archive_name = "pkg-gourmet"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)

    class SvnLoaderPostLoadLocalDirIsEmpty(SvnLoader):
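        # grab the content of the loader working directory when post_load runs
        # so the test can check it was emptied at the end of the load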
def post_load(self, success=True):
if success:
self.local_dirname_content = [
os.path.join(root, name)
for root, _, files in os.walk(self.svnrepo.local_dirname)
for name in files
]
return super().post_load(success)

    loader = SvnLoaderPostLoadLocalDirIsEmpty(
swh_storage, repo_url, temp_directory=tmp_path
)
assert loader.load() == {"status": "eventful"}
assert loader.local_dirname_content == []
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
snapshot=GOURMET_SNAPSHOT.id,
)
check_snapshot(GOURMET_SNAPSHOT, loader.storage)


def _dump_project(tmp_path, origin_url):
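    """Dump the remote repository at origin_url with svnrdump, gzip the dump
    and return the path to the compressed dump file."""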
svnrdump_cmd = ["svnrdump", "dump", origin_url]
dump_path = f"{tmp_path}/repo.dump"
with open(dump_path, "wb") as dump_file:
subprocess.run(svnrdump_cmd, stdout=dump_file)
subprocess.run(["gzip", dump_path])
return dump_path + ".gz"


def test_loader_svn_add_property_on_directory_link(swh_storage, repo_url, tmp_path):
# first commit
add_commit(
repo_url,
"Add an executable file in a directory and a svn link to the directory.",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="code/hello-world",
properties={"svn:executable": "*"},
data=b"#!/bin/bash\necho Hello World !",
),
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="hello",
properties={"svn:special": "*"},
data=b"link code",
),
],
)
# second commit
add_commit(
repo_url,
"Set svn:eol-style property on link",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="hello",
properties={"svn:eol-style": "native"},
),
],
)
# instantiate a svn loader checking after each processed revision that
# the repository filesystem it reconstructed does not differ from a subversion
# export of that revision
loader = SvnLoader(swh_storage, repo_url, temp_directory=tmp_path, check_revision=1)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)


@pytest.mark.parametrize(
"svn_loader_cls", [SvnLoader, SvnLoaderFromDumpArchive, SvnLoaderFromRemoteDump]
)
def test_loader_with_subprojects(swh_storage, repo_url, tmp_path, svn_loader_cls):
# first commit
add_commit(
repo_url,
"Add first project in repository",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="project1/foo.sh",
data=b"#!/bin/bash\necho foo",
),
],
)
# second commit
add_commit(
repo_url,
"Add second project in repository",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="project2/bar.sh",
data=b"#!/bin/bash\necho bar",
),
],
)
# third commit
add_commit(
repo_url,
"Add third project in repository",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="project3/baz.sh",
data=b"#!/bin/bash\necho baz",
),
],
)
for i in range(1, 4):
# load each project in the repository separately and check behavior
# is the same if origin URL has a trailing slash or not
origin_url = f"{repo_url}/project{i}{'/' if i%2 else ''}"
loader_params = {
"storage": swh_storage,
"url": origin_url,
"origin_url": origin_url,
"temp_directory": tmp_path,
"incremental": True,
"check_revision": 1,
}
if svn_loader_cls == SvnLoaderFromDumpArchive:
loader_params["archive_path"] = _dump_project(tmp_path, origin_url)
loader = svn_loader_cls(**loader_params)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
origin_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)
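        # reload each project origin: as no new revision was added, the new
        # visit must be uneventful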
if svn_loader_cls == SvnLoaderFromDumpArchive:
loader_params["archive_path"] = _dump_project(tmp_path, origin_url)
loader = svn_loader_cls(**loader_params)
assert loader.load() == {"status": "uneventful"}
        # check the expected numbers of archived objects after loading each
        # project origin
assert get_stats(loader.storage) == {
"content": i, # one content
"directory": 2 * i, # two directories
"origin": i,
"origin_visit": 2 * i, # two visits
"release": 0,
"revision": i, # one revision
"skipped_content": 0,
"snapshot": i, # one snapshot
}


@pytest.mark.parametrize(
"svn_loader_cls", [SvnLoader, SvnLoaderFromDumpArchive, SvnLoaderFromRemoteDump]
)
def test_loader_subproject_root_dir_removal(
swh_storage, repo_url, tmp_path, svn_loader_cls
):
# first commit
add_commit(
repo_url,
"Add project in repository",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="project/foo.sh",
data=b"#!/bin/bash\necho foo",
),
],
)
# second commit
add_commit(
repo_url,
"Remove project root directory",
[CommitChange(change_type=CommitChangeType.Delete, path="project/")],
)
# third commit
add_commit(
repo_url,
"Re-add project in repository",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="project/foo.sh",
data=b"#!/bin/bash\necho foo",
),
],
)
origin_url = f"{repo_url}/project"
loader_params = {
"storage": swh_storage,
"url": origin_url,
"origin_url": origin_url,
"temp_directory": tmp_path,
"incremental": True,
"check_revision": 1,
}
if svn_loader_cls == SvnLoaderFromDumpArchive:
loader_params["archive_path"] = _dump_project(tmp_path, origin_url)
loader = svn_loader_cls(**loader_params)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
origin_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)
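    # reload the origin: as no new revision was added, the visit must be uneventful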
if svn_loader_cls == SvnLoaderFromDumpArchive:
loader_params["archive_path"] = _dump_project(tmp_path, origin_url)
loader = svn_loader_cls(**loader_params)
assert loader.load() == {"status": "uneventful"}
@pytest.mark.parametrize("svn_loader_cls", [SvnLoader, SvnLoaderFromRemoteDump])
def test_loader_svn_not_found_after_successful_visit(
swh_storage, datadir, tmp_path, svn_loader_cls
):
archive_name = "pkg-gourmet"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
loader = svn_loader_cls(swh_storage, repo_url, temp_directory=tmp_path)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
snapshot=GOURMET_SNAPSHOT.id,
)
check_snapshot(loader.snapshot, loader.storage)
# simulate removal of remote repository
shutil.rmtree(repo_url.replace("file://", ""))
loader = svn_loader_cls(swh_storage, repo_url, temp_directory=tmp_path)
assert loader.load() == {"status": "uneventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="not_found",
type="svn",
snapshot=None,
)


def test_loader_svn_from_remote_dump_url_redirect(swh_storage, tmp_path, mocker):
repo_url = "http://svn.example.org/repo"
repo_redirect_url = "https://svn.example.org/repo"
# mock remote subversion operations
from swh.loader.svn.svn import client
mocker.patch("swh.loader.svn.svn.RemoteAccess")
init_svn_repo_from_dump = mocker.patch(
"swh.loader.svn.loader.init_svn_repo_from_dump"
)
init_svn_repo_from_dump.return_value = ("", "")
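    # make the local repository initialization from the dump a no-op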
mock_client = mocker.MagicMock()
mocker.patch.object(client, "Client", mock_client)

    class Info:
repos_root_url = repo_redirect_url
url = repo_redirect_url
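
    # have svn info return the redirection URL for the queried repository path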
mock_client().info.return_value = {"repo": Info()}
# init remote dump loader and mock some methods
loader = SvnLoaderFromRemoteDump(swh_storage, repo_url, temp_directory=tmp_path)
loader.dump_svn_revisions = mocker.MagicMock(return_value=("", -1))
loader.start_from = mocker.MagicMock(return_value=(0, 0))
# prepare loading
loader.prepare()
# check redirection URL has been used to dump repository
assert loader.dump_svn_revisions.call_args_list[0][0][0] == repo_redirect_url
@pytest.mark.parametrize("svn_loader_cls", [SvnLoader, SvnLoaderFromRemoteDump])
def test_loader_basic_authentication_required(
swh_storage, repo_url, tmp_path, svn_loader_cls, svnserve
):
# add file to empty test repo
add_commit(
repo_url,
"Add project in repository",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="project/foo.sh",
data=b"#!/bin/bash\necho foo",
),
],
)
# compute repo URLs that will be made available by svnserve
repo_path = repo_url.replace("file://", "")
repo_root = os.path.dirname(repo_path)
repo_name = os.path.basename(repo_path)
username = "anonymous"
password = "anonymous"
port = 12000
repo_url_no_auth = f"svn://localhost:{port}/{repo_name}"
repo_url = f"svn://{username}:{password}@localhost:{port}/{repo_name}"
# disable anonymous access and require authentication on test repo
with open(os.path.join(repo_path, "conf", "svnserve.conf"), "w") as svnserve_conf:
svnserve_conf.write(
textwrap.dedent(
"""
[general]
# Authentication realm of the repository.
realm = test-repository
password-db = passwd
# Deny all anonymous access
anon-access = none
# Grant authenticated users read and write privileges
auth-access = write
"""
)
)
# add a user with read/write access on test repo
with open(os.path.join(repo_path, "conf", "passwd"), "w") as passwd:
passwd.write(f"[users]\n{username} = {password}")
# execute svnserve
svnserve(repo_root, port)
# check loading failed with no authentication
loader = svn_loader_cls(swh_storage, repo_url_no_auth, temp_directory=tmp_path)
assert loader.load() == {"status": "uneventful"}
# check loading succeeded with authentication
loader = svn_loader_cls(swh_storage, repo_url, temp_directory=tmp_path)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)


-@pytest.mark.parametrize(
- "filename", ["file with spaces.txt", "file#with#hash#signs.txt"]
-)
-def test_loader_with_special_chars_in_svn_url(repo_url, tmp_path, filename):
+def test_loader_with_special_chars_in_svn_url(repo_url, tmp_path):
content = b"foo"
+ filename = "".join(
+ itertools.chain(
+ (chr(i) for i in range(32, 127)), (chr(i) for i in range(161, 256))
+ )
+ )
+
add_commit(
repo_url,
- "Add file with spaces in its name",
+ "Add file with characters to quote in its name",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path=filename,
data=content,
),
],
)
svnrepo = SvnRepo(repo_url, repo_url, tmp_path, max_content_length=10000)
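    # exporting a URL containing special characters only works if they get
    # properly percent-encoded beforehand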
dest_path = f"{tmp_path}/file"
svnrepo.export(f"{repo_url}/{filename}", to=dest_path)
with open(dest_path, "rb") as f:
assert f.read() == content
@pytest.mark.parametrize("svn_loader_cls", [SvnLoader, SvnLoaderFromRemoteDump])
def test_loader_repo_with_copyfrom_and_replace_operations(
swh_storage, repo_url, tmp_path, svn_loader_cls
):
add_commit(
repo_url,
"Create trunk/data folder",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="trunk/data/foo",
data=b"foo",
),
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="trunk/data/bar",
data=b"bar",
),
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="trunk/data/baz/",
),
],
)
add_commit(
repo_url,
"Create trunk/project folder",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="trunk/project/",
),
],
)
add_commit(
repo_url,
"Create trunk/project/bar as copy of trunk/data/bar from revision 1",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="trunk/project/bar",
copyfrom_path=repo_url + "/trunk/data/bar",
copyfrom_rev=1,
),
],
)
add_commit(
repo_url,
        (
            "Create trunk/project/data/ folder as a copy of /trunk/data from revision 1"
            " and replace the trunk/project/data/baz/ folder with a trunk/project/data/baz file"
        ),
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="trunk/project/data/",
copyfrom_path=repo_url + "/trunk/data/",
copyfrom_rev=1,
),
CommitChange(
change_type=CommitChangeType.Delete,
path="trunk/project/data/baz/",
),
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="trunk/project/data/baz",
data=b"baz",
),
],
)
loader = svn_loader_cls(
swh_storage, repo_url, temp_directory=tmp_path, check_revision=1
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)
@pytest.mark.parametrize("svn_loader_cls", [SvnLoader, SvnLoaderFromRemoteDump])
def test_loader_repo_with_copyfrom_operations_and_eol_style(
swh_storage, repo_url, tmp_path, svn_loader_cls
):
add_commit(
repo_url,
"Create trunk/code/foo file",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="trunk/code/foo",
data=b"foo\n",
properties={"svn:eol-style": "CRLF"},
),
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="branches/code/",
),
],
)
add_commit(
repo_url,
"Modify svn:eol-style property for the trunk/code/foo file",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="trunk/code/foo",
properties={"svn:eol-style": "native"},
),
],
)
add_commit(
repo_url,
"Copy trunk/code/foo folder from revision 1",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="branches/code/foo",
copyfrom_path=repo_url + "/trunk/code/foo",
copyfrom_rev=1,
),
],
)
add_commit(
repo_url,
"Modify branches/code/foo previously copied",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="branches/code/foo",
data=b"foo\r\nbar\n",
),
],
)
loader = svn_loader_cls(
swh_storage, repo_url, temp_directory=tmp_path, check_revision=1
)
assert loader.load() == {"status": "eventful"}
assert_last_visit_matches(
loader.storage,
repo_url,
status="full",
type="svn",
)
check_snapshot(loader.snapshot, loader.storage)


def test_loader_check_tree_divergence(swh_storage, repo_url, tmp_path, caplog):
# create sample repository
add_commit(
repo_url,
"Create trunk/data folder",
[
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="trunk/data/foo",
data=b"foo",
),
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="trunk/data/bar",
data=b"bar",
),
CommitChange(
change_type=CommitChangeType.AddOrUpdate,
path="trunk/data/baz/",
),
],
)
# load it
loader = SvnLoader(
swh_storage,
repo_url,
temp_directory=tmp_path,
debug=True,
check_revision=1,
)
assert loader.load() == {"status": "eventful"}
# export it to a temporary directory
export_path, _ = loader.svnrepo.export_temporary(revision=1)
export_path = os.path.join(export_path, repo_url.split("/")[-1])
# modify some file content in the export and remove a path
with open(os.path.join(export_path, "trunk/data/foo"), "wb") as f:
f.write(b"baz")
shutil.rmtree(os.path.join(export_path, "trunk/data/baz/"))
# create directory model from the modified export
export_dir = Directory.from_disk(path=export_path.encode())
# ensure debug logs
caplog.set_level(logging.DEBUG)
# check exported tree and repository tree are diverging
with pytest.raises(ValueError):
loader._check_revision_divergence(1, export_dir.hash, export_dir)
# check diverging paths have been detected and logged
for debug_log in (
"directory with path b'trunk' has different hash in reconstructed repository filesystem", # noqa
"directory with path b'trunk/data' has different hash in reconstructed repository filesystem", # noqa
"content with path b'trunk/data/foo' has different hash in reconstructed repository filesystem", # noqa
"directory with path b'trunk/data/baz' is missing in reconstructed repository filesystem", # noqa
"below is diff between files:",
"@@ -1 +1 @@",
"-foo",
"+baz",
):
assert debug_log in caplog.text
