Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/svn/loader.py
# Copyright (C) 2015-2021 The Software Heritage developers | # Copyright (C) 2015-2022 The Software Heritage developers | ||||||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||||||
"""Loader in charge of injecting either new or existing svn mirrors to | """Loader in charge of injecting either new or existing svn mirrors to | ||||||||
swh-storage. | swh-storage. | ||||||||
""" | """ | ||||||||
from datetime import datetime | from datetime import datetime | ||||||||
from mmap import ACCESS_WRITE, mmap | |||||||||
import os | import os | ||||||||
import pty | import pty | ||||||||
import re | import re | ||||||||
import shutil | import shutil | ||||||||
from subprocess import Popen | from subprocess import PIPE, Popen | ||||||||
import tempfile | import tempfile | ||||||||
from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple | from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple | ||||||||
from subvertpy import SubversionException | from subvertpy import SubversionException | ||||||||
from swh.loader.core.loader import BaseLoader | from swh.loader.core.loader import BaseLoader | ||||||||
from swh.loader.core.utils import clean_dangling_folders | from swh.loader.core.utils import clean_dangling_folders | ||||||||
from swh.loader.exception import NotFound | from swh.loader.exception import NotFound | ||||||||
▲ Show 20 Lines • Show All 627 Lines • ▼ Show 20 Lines | def get_last_loaded_svn_rev(self, svn_url: str) -> int: | ||||||||
if latest_snapshot_revision: | if latest_snapshot_revision: | ||||||||
_, latest_revision = latest_snapshot_revision | _, latest_revision = latest_snapshot_revision | ||||||||
latest_revision_headers = dict(latest_revision.extra_headers) | latest_revision_headers = dict(latest_revision.extra_headers) | ||||||||
svn_revision = int(latest_revision_headers[b"svn_revision"]) | svn_revision = int(latest_revision_headers[b"svn_revision"]) | ||||||||
except Exception: | except Exception: | ||||||||
pass | pass | ||||||||
return svn_revision | return svn_revision | ||||||||
def dump_svn_revisions(self, svn_url: str, last_loaded_svn_rev: int = -1) -> str: | def dump_svn_revisions( | ||||||||
self, svn_url: str, last_loaded_svn_rev: int = -1 | |||||||||
) -> Tuple[str, int]: | |||||||||
"""Generate a subversion dump file using the svnrdump tool. If the svnrdump | """Generate a subversion dump file using the svnrdump tool. If the svnrdump | ||||||||
ardumontUnsubmitted Not Done Inline Actions
ardumont: | |||||||||
command failed somehow, the produced dump file is analyzed to determine if a | command failed somehow, the produced dump file is analyzed to determine if a | ||||||||
partial loading is still feasible. | partial loading is still feasible. | ||||||||
Raises: | Raises: | ||||||||
NotFound when the repository is no longer found at url | NotFound when the repository is no longer found at url | ||||||||
Returns: | Returns: | ||||||||
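The dump_path of the repository mounted | A (dump_path, max_rev) tuple: the path of the generated gzipped dump file and the last successfully dumped revision (-1 when svnrdump completed without error) | ||||||||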
Show All 9 Lines | ) -> Tuple[str, int]: | ||||||||
"--password", | "--password", | ||||||||
self.svnrepo.password, | self.svnrepo.password, | ||||||||
] | ] | ||||||||
# Launch the svnrdump command while capturing stderr as | # Launch the svnrdump command while capturing stderr as | ||||||||
# successfully dumped revision numbers are printed to it | # successfully dumped revision numbers are printed to it | ||||||||
dump_temp_dir = tempfile.mkdtemp(dir=self.temp_dir) | dump_temp_dir = tempfile.mkdtemp(dir=self.temp_dir) | ||||||||
dump_name = "".join(c for c in svn_url if c.isalnum()) | dump_name = "".join(c for c in svn_url if c.isalnum()) | ||||||||
dump_path = "%s/%s.svndump" % (dump_temp_dir, dump_name) | dump_path = "%s/%s.svndump.gz" % (dump_temp_dir, dump_name) | ||||||||
stderr_lines = [] | stderr_lines = [] | ||||||||
self.log.debug("Executing %s", " ".join(svnrdump_cmd)) | self.log.debug("Executing %s", " ".join(svnrdump_cmd)) | ||||||||
with open(dump_path, "wb") as dump_file: | with open(dump_path, "wb") as dump_file: | ||||||||
gzip = Popen(["gzip"], stdin=PIPE, stdout=dump_file) | |||||||||
stderr_r, stderr_w = pty.openpty() | stderr_r, stderr_w = pty.openpty() | ||||||||
svnrdump = Popen(svnrdump_cmd, stdout=dump_file, stderr=stderr_w) | svnrdump = Popen(svnrdump_cmd, stdout=gzip.stdin, stderr=stderr_w) | ||||||||
os.close(stderr_w) | os.close(stderr_w) | ||||||||
stderr_stream = OutputStream(stderr_r) | stderr_stream = OutputStream(stderr_r) | ||||||||
readable = True | readable = True | ||||||||
error_codes: List[str] = [] | error_codes: List[str] = [] | ||||||||
error_messages: List[str] = [] | error_messages: List[str] = [] | ||||||||
while readable: | while readable: | ||||||||
lines, readable = stderr_stream.read_lines() | lines, readable = stderr_stream.read_lines() | ||||||||
stderr_lines += lines | stderr_lines += lines | ||||||||
for line in lines: | for line in lines: | ||||||||
self.log.debug(line) | self.log.debug(line) | ||||||||
match = SUBVERSION_ERROR.search(line) | match = SUBVERSION_ERROR.search(line) | ||||||||
if match: | if match: | ||||||||
error_codes.append(match.group(1)) | error_codes.append(match.group(1)) | ||||||||
error_messages.append(line) | error_messages.append(line) | ||||||||
svnrdump.wait() | svnrdump.wait() | ||||||||
os.close(stderr_r) | os.close(stderr_r) | ||||||||
# close our end of the pipe to signal end of input to gzip before waiting for it | |||||||||
gzip.stdin.close() | |||||||||
gzip.wait() | |||||||||
if svnrdump.returncode == 0: | if svnrdump.returncode == 0: | ||||||||
return dump_path | return dump_path, -1 | ||||||||
# There was an error but it does not mean that no revisions | # There was an error but it does not mean that no revisions | ||||||||
# can be loaded. | # can be loaded. | ||||||||
# Get the stderr line with latest dumped revision | # Get the stderr line with latest dumped revision | ||||||||
last_dumped_rev = None | last_dumped_rev = None | ||||||||
for stderr_line in reversed(stderr_lines): | for stderr_line in reversed(stderr_lines): | ||||||||
if stderr_line.startswith("* Dumped revision"): | if stderr_line.startswith("* Dumped revision"): | ||||||||
Show All 11 Lines | ) -> Tuple[str, int]: | ||||||||
"svnrdump did not dump all expected revisions " | "svnrdump did not dump all expected revisions " | ||||||||
"but revisions range %s:%s are available in " | "but revisions range %s:%s are available in " | ||||||||
"the generated dump file and will be loaded " | "the generated dump file and will be loaded " | ||||||||
"into the archive." | "into the archive." | ||||||||
), | ), | ||||||||
last_loaded_svn_rev + 1, | last_loaded_svn_rev + 1, | ||||||||
last_dumped_rev, | last_dumped_rev, | ||||||||
) | ) | ||||||||
# Truncate the dump file after the last successfully dumped | |||||||||
# revision to avoid the loading of corrupted data | |||||||||
self.log.debug( | |||||||||
( | |||||||||
"Truncating dump file after the last " | |||||||||
"successfully dumped revision (%s) to avoid " | |||||||||
"the loading of corrupted data" | |||||||||
), | |||||||||
last_dumped_rev, | |||||||||
) | |||||||||
with open(dump_path, "r+b") as f: | |||||||||
with mmap(f.fileno(), 0, access=ACCESS_WRITE) as s: | |||||||||
pattern = ( | |||||||||
"Revision-number: %s" % (last_dumped_rev + 1) | |||||||||
).encode() | |||||||||
n = s.rfind(pattern) | |||||||||
if n != -1: | |||||||||
s.resize(n) | |||||||||
self.truncated_dump = True | self.truncated_dump = True | ||||||||
return dump_path | return dump_path, last_dumped_rev | ||||||||
elif last_dumped_rev != -1 and last_dumped_rev < last_loaded_svn_rev: | elif last_dumped_rev != -1 and last_dumped_rev < last_loaded_svn_rev: | ||||||||
raise Exception( | raise Exception( | ||||||||
( | ( | ||||||||
"Last dumped subversion revision (%s) is " | "Last dumped subversion revision (%s) is " | ||||||||
"lesser than the last one loaded into the " | "lesser than the last one loaded into the " | ||||||||
"archive (%s)." | "archive (%s)." | ||||||||
) | ) | ||||||||
% (last_dumped_rev, last_loaded_svn_rev) | % (last_dumped_rev, last_loaded_svn_rev) | ||||||||
Show All 36 Lines | def prepare(self): | ||||||||
self._snapshot = last_loaded_snp | self._snapshot = last_loaded_snp | ||||||||
self._last_revision = last_loaded_rev | self._last_revision = last_loaded_rev | ||||||||
self.done = True | self.done = True | ||||||||
self.skip_post_load = True | self.skip_post_load = True | ||||||||
return | return | ||||||||
# Then try to generate a dump file containing relevant svn revisions | # Then try to generate a dump file containing relevant svn revisions | ||||||||
# to load; an exception will be raised if something goes wrong | # to load; an exception will be raised if something goes wrong | ||||||||
dump_path = self.dump_svn_revisions(self.svn_url, last_loaded_svn_rev) | dump_path, max_rev = self.dump_svn_revisions(self.svn_url, last_loaded_svn_rev) | ||||||||
# Finally, mount the dump and load the repository | # Finally, mount the dump and load the repository | ||||||||
self.log.debug('Mounting dump file with "svnadmin load".') | self.log.debug('Mounting dump file with "svnadmin load".') | ||||||||
_, self.repo_path = init_svn_repo_from_dump( | _, self.repo_path = init_svn_repo_from_dump( | ||||||||
dump_path, | dump_path, | ||||||||
prefix=TEMPORARY_DIR_PREFIX_PATTERN, | prefix=TEMPORARY_DIR_PREFIX_PATTERN, | ||||||||
suffix="-%s" % os.getpid(), | suffix="-%s" % os.getpid(), | ||||||||
root_dir=self.temp_dir, | root_dir=self.temp_dir, | ||||||||
gzip=True, | |||||||||
max_rev=max_rev, | |||||||||
) | ) | ||||||||
self.svn_url = "file://%s" % self.repo_path | self.svn_url = "file://%s" % self.repo_path | ||||||||
super().prepare() | super().prepare() | ||||||||
def cleanup(self): | def cleanup(self): | ||||||||
super().cleanup() | super().cleanup() | ||||||||
if self.temp_dir and os.path.exists(self.temp_dir): | if self.temp_dir and os.path.exists(self.temp_dir): | ||||||||
shutil.rmtree(self.temp_dir) | shutil.rmtree(self.temp_dir) | ||||||||
def visit_status(self): | def visit_status(self): | ||||||||
if self.truncated_dump: | if self.truncated_dump: | ||||||||
return "partial" | return "partial" | ||||||||
else: | else: | ||||||||
return super().visit_status() | return super().visit_status() |