Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/svn/loader.py
# Copyright (C) 2015-2018 The Software Heritage developers | # Copyright (C) 2015-2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
"""Loader in charge of injecting either new or existing svn mirrors to | """Loader in charge of injecting either new or existing svn mirrors to | ||||
swh-storage. | swh-storage. | ||||
""" | """ | ||||
import os | import os | ||||
import shutil | import shutil | ||||
import tempfile | import tempfile | ||||
from swh.core import utils | |||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.model.from_disk import Directory | from swh.model.from_disk import Directory | ||||
from swh.model.identifiers import identifier_to_bytes, revision_identifier | from swh.model.identifiers import identifier_to_bytes, revision_identifier | ||||
from swh.model.identifiers import snapshot_identifier | from swh.model.identifiers import snapshot_identifier | ||||
from swh.loader.core.loader import SWHLoader | from swh.loader.core.loader import SWHLoader | ||||
from swh.loader.core.utils import clean_dangling_folders | from swh.loader.core.utils import clean_dangling_folders | ||||
from . import svn, converters | from . import svn, converters | ||||
from .utils import init_svn_repo_from_archive_dump | from .utils import init_svn_repo_from_archive_dump | ||||
from .exception import SvnLoaderEventful, SvnLoaderUneventful | from .exception import SvnLoaderUneventful | ||||
from .exception import SvnLoaderHistoryAltered | from .exception import SvnLoaderHistoryAltered | ||||
DEFAULT_BRANCH = b'master' | DEFAULT_BRANCH = b'master' | ||||
def _revision_id(revision):
    """Compute the swh identifier (as bytes) of a revision dict."""
    hex_id = revision_identifier(revision)
    return identifier_to_bytes(hex_id)
Show All 12 Lines | return { | ||||
} | } | ||||
} | } | ||||
} | } | ||||
TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.svn.' | TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.svn.' | ||||
class SWHSvnLoader(SWHLoader): | class SvnLoader(SWHLoader): | ||||
"""Swh svn loader to load an svn repository The repository is either | """Swh svn loader to load an svn repository The repository is either | ||||
remote or local. The loader deals with update on an already | remote or local. The loader deals with update on an already | ||||
previously loaded repository. | previously loaded repository. | ||||
Default policy: | Default policy: | ||||
Keep data as close as possible from the original svn data. We | Keep data as close as possible from the original svn data. We | ||||
only add information that are needed for update or continuing | only add information that are needed for update or continuing | ||||
from last known revision (svn revision and svn repository's | from last known revision (svn revision and svn repository's | ||||
Show All 10 Lines | class SvnLoader(SWHLoader): | ||||
def __init__(self):
    """Read the loader configuration and reset the internal state."""
    super().__init__(logging_class='swh.loader.svn.SvnLoader')
    # configuration knobs
    self.check_revision = self.config['check_revision']
    self.debug = self.config['debug']
    self.temp_directory = self.config['temp_directory']
    # per-visit state
    self.origin_id = None
    self.last_seen_revision = None
    self.done = False
    self.svnrepo = None
    # internal state used to accumulate swh objects between
    # fetch_data/store_data iterations
    self._contents = []
    self._directories = []
    self._revisions = []
    self._snapshot = None
    self._last_revision = None
    self._visit_status = 'full'
    self._load_status = 'uneventful'
def pre_cleanup(self):
    """Remove dangling temporary folders left behind by prior,
    interrupted runs (e.g. tasks killed by the OOM killer).
    """
    clean_dangling_folders(
        self.temp_directory,
        pattern_check=TEMPORARY_DIR_PREFIX_PATTERN,
        log=self.log,
    )
def cleanup(self):
    """Clean up the svn repository's working representation on disk.

    No-op when the repository was never instantiated (which could
    happen if `prepare` fails). In debug mode, the local copy is
    deliberately kept on disk for post-mortem investigation.
    """
    # Bug fix: the guard was inverted (`if self.svnrepo: return`),
    # which skipped cleanup whenever a repository existed and crashed
    # on None when `prepare` had failed.
    if not self.svnrepo:  # could happen if `prepare` fails
        return
    if self.debug:
        self.log.error('''NOT FOR PRODUCTION - debug flag activated
Local repository not cleaned up for investigation: %s''' % (
            self.svnrepo.local_url.decode('utf-8'), ))
        return
    self.svnrepo.clean_fs()
def swh_revision_hash_tree_at_svn_revision(self, revision):
    """Compute and return the hash tree at a given svn revision.

    Args:
        revision (int): the svn revision to export and hash

    Returns:
        The hash tree directory as bytes.
    """
    tmp_dirname, exported_path = self.svnrepo.export_temporary(revision)
    tree_hash = Directory.from_disk(path=exported_path).hash
    # drop the temporary export once hashed
    self.svnrepo.clean_fs(tmp_dirname)
    return tree_hash
def get_svn_repo(self, svn_url, local_dirname, origin_id):
    """Instantiate the svnrepo collaborator needed to read the svn
    repository.

    Args:
        svn_url (str): the svn repository url to read from
        local_dirname (str): the local path on disk to compute data
        origin_id (int): the corresponding origin id

    Returns:
        Instance of :mod:`swh.loader.svn.svn` clients
    """
    return svn.SvnRepo(
        svn_url, local_dirname=local_dirname, origin_id=origin_id)
def swh_latest_snapshot_revision(self, origin_id, | def swh_latest_snapshot_revision(self, origin_id, | ||||
previous_swh_revision=None): | previous_swh_revision=None): | ||||
"""Look for latest snapshot revision and returns it if any. | """Look for latest snapshot revision and returns it if any. | ||||
Args: | Args: | ||||
origin_id (int): Origin identifier | origin_id (int): Origin identifier | ||||
previous_swh_revision: (optional) id of a possible | previous_swh_revision: (optional) id of a possible | ||||
Show All 20 Lines | def swh_latest_snapshot_revision(self, origin_id, | ||||
return {} | return {} | ||||
target_type = branch['target_type'] | target_type = branch['target_type'] | ||||
if target_type != 'revision': | if target_type != 'revision': | ||||
return {} | return {} | ||||
previous_swh_revision = branch['target'] | previous_swh_revision = branch['target'] | ||||
else: | else: | ||||
return {} | return {} | ||||
revs = list(storage.revision_get([previous_swh_revision])) | if isinstance(previous_swh_revision, dict): | ||||
swh_id = previous_swh_revision['id'] | |||||
else: | |||||
swh_id = previous_swh_revision | |||||
revs = list(storage.revision_get([swh_id])) | |||||
if revs: | if revs: | ||||
return { | return { | ||||
'snapshot': latest_snap, | 'snapshot': latest_snap, | ||||
'revision': revs[0] | 'revision': revs[0] | ||||
} | } | ||||
return {} | return {} | ||||
def build_swh_revision(self, rev, commit, dir_id, parents): | def build_swh_revision(self, rev, commit, dir_id, parents): | ||||
Show All 10 Lines | def build_swh_revision(self, rev, commit, dir_id, parents): | ||||
commit (dict): the commit metadata | commit (dict): the commit metadata | ||||
dir_id (bytes): the upper tree's hash identifier | dir_id (bytes): the upper tree's hash identifier | ||||
parents ([bytes]): the parents' identifiers | parents ([bytes]): the parents' identifiers | ||||
Returns: | Returns: | ||||
The swh revision corresponding to the svn revision. | The swh revision corresponding to the svn revision. | ||||
""" | """ | ||||
return converters.build_swh_revision(rev, | return converters.build_swh_revision( | ||||
commit, | rev, commit, self.svnrepo.uuid, dir_id, parents) | ||||
self.svnrepo.uuid, | |||||
dir_id, | |||||
parents) | |||||
def check_history_not_altered(self, svnrepo, revision_start, swh_rev):
    """Given a svn repository, check if the history was not tampered with.

    Recompute the swh revision at `revision_start` from the svn data
    and compare its identifier with the one previously stored in
    `swh_rev`.

    Returns:
        bool: True when the recomputed identifier matches.
    """
    hash_data = svnrepo.swh_hash_data_at_revision(revision_start)
    rev, _, commit, _, root_dir = list(hash_data)[0]
    rebuilt = self.build_swh_revision(
        rev, commit, root_dir.hash, swh_rev['parents'])
    return _revision_id(rebuilt) == swh_rev['id']
def process_repository(self, origin_visit, | def _init_from(self, partial_swh_revision, previous_swh_revision): | ||||
last_known_swh_revision=None, | """Function to determine from where to start from. | ||||
start_from_scratch=False): | |||||
"""The main idea of this function is to: | Args: | ||||
partial_swh_revision (dict): A known revision from which | |||||
the previous loading did not | |||||
finish. | |||||
known_previous_revision (dict): A known revision from | |||||
which the previous loading | |||||
did finish. | |||||
- iterate over the svn commit logs | Returns: | ||||
- extract the svn commit log metadata | The revision from which to start or None if nothing (fresh | ||||
- compute the hashes from the current directory down to the file | start). | ||||
- compute the equivalent swh revision | |||||
- send all those objects for storage | |||||
- create an swh occurrence pointing to the last swh revision seen | |||||
- send that occurrence for storage in swh-storage. | |||||
""" | """ | ||||
svnrepo = self.svnrepo | if partial_swh_revision and not previous_swh_revision: | ||||
return partial_swh_revision | |||||
if not partial_swh_revision and previous_swh_revision: | |||||
return previous_swh_revision | |||||
if partial_swh_revision and previous_swh_revision: | |||||
# will determine from which to start from | |||||
extra_headers1 = dict( | |||||
partial_swh_revision['metadata']['extra_headers']) | |||||
extra_headers2 = dict( | |||||
previous_swh_revision['metadata']['extra_headers']) | |||||
rev_start1 = int(extra_headers1['svn_revision']) | |||||
rev_start2 = int(extra_headers2['svn_revision']) | |||||
if rev_start1 <= rev_start2: | |||||
return previous_swh_revision | |||||
return partial_swh_revision | |||||
return None | |||||
def start_from(self, last_known_swh_revision=None,
               start_from_scratch=False):
    """Determine from where to start the loading.

    Args:
        last_known_swh_revision (dict): Last known swh revision or None
        start_from_scratch (bool): To start loading from scratch or not

    Returns:
        tuple (revision_start, revision_end, revision_parents)

    Raises:
        SvnLoaderHistoryAltered: When a hash divergence has been
            detected (should not happen)
        SvnLoaderUneventful: Nothing changed since last visit
    """
    revision_head = self.svnrepo.head_revision()
    if revision_head == 0:  # empty repository case
        revision_start = 0
        revision_end = 0
    else:  # default configuration
        revision_start = self.svnrepo.initial_revision()
        revision_end = revision_head
    revision_parents = {
        revision_start: []
    }
    if not start_from_scratch:
        # Check if we already know a previous revision for that origin
        if self.latest_snapshot:
            swh_rev = self.latest_snapshot['revision']
        else:
            swh_rev = None
        # Determine from which known revision to start
        swh_rev = self._init_from(last_known_swh_revision,
                                  previous_swh_revision=swh_rev)
        if swh_rev:  # Yes, we know a previous revision. Try and update it.
            extra_headers = dict(swh_rev['metadata']['extra_headers'])
            revision_start = int(extra_headers['svn_revision'])
            revision_parents = {
                revision_start: swh_rev['parents'],
            }
            self.log.debug('svn export --ignore-keywords %s@%s' % (
                self.svnrepo.remote_url,
                revision_start))
            if swh_rev and not self.check_history_not_altered(
                    self.svnrepo,
                    revision_start,
                    swh_rev):
                msg = 'History of svn %s@%s altered. ' \
                      'Skipping...' % (
                          self.svnrepo.remote_url, revision_start)
                raise SvnLoaderHistoryAltered(msg)
            # now we know history is ok, we start at next revision
            revision_start = revision_start + 1
            # and the parent becomes the latest known revision for
            # that repository
            revision_parents[revision_start] = [swh_rev['id']]
    # Bug fix: `revision_start is not 1` compared *identity* with an
    # int literal (implementation-dependent, SyntaxWarning on modern
    # Python); use value inequality instead.
    if revision_start > revision_end and revision_start != 1:
        msg = '%s@%s already injected.' % (self.svnrepo.remote_url,
                                           revision_end)
        raise SvnLoaderUneventful(msg)
    self.log.info('Processing revisions [%s-%s] for %s' % (
        revision_start, revision_end, self.svnrepo))
    return revision_start, revision_end, revision_parents
def process_svn_revisions(self, svnrepo, revision_start, revision_end,
                          revision_parents):
    """Process svn revisions from revision_start to revision_end.

    At each svn revision, apply new diffs and simultaneously compute
    the swh hashes. This yields those computed swh hashes as tuples
    (contents, directories, revision).

    Every `self.check_revision` revisions, a supplementary check
    takes place to detect hash-tree divergence (related T570). A
    `check_revision` of 0 now disables that check instead of
    crashing with a modulo-by-zero.

    Yields:
        tuple (contents, directories, revision) of dicts with keys
        sha1_git, sha1, etc...

    Raises:
        ValueError in case of a hash divergence detection
    """
    gen_revs = svnrepo.swh_hash_data_per_revision(
        revision_start,
        revision_end)
    swh_revision = None
    count = 0
    for rev, nextrev, commit, new_objects, root_directory in gen_revs:
        count += 1
        # The associated contents/directories for that revision
        _contents = new_objects.get('content', {}).values()
        _directories = new_objects.get('directory', {}).values()
        # compute the fs tree's checksums
        dir_id = root_directory.hash
        swh_revision = self.build_swh_revision(
            rev, commit, dir_id, revision_parents[rev])
        swh_revision['id'] = _revision_id(swh_revision)
        self.log.debug('rev: %s, swhrev: %s, dir: %s' % (
            rev,
            hashutil.hash_to_hex(swh_revision['id']),
            hashutil.hash_to_hex(dir_id)))
        # FIXME: Is that still necessary? Rationale: T570 is now closed
        # Bug fix: guard against check_revision == 0 (check disabled
        # through configuration) which previously raised
        # ZeroDivisionError on the modulo below.
        if (self.check_revision != 0
                and count % self.check_revision == 0):
            self.log.debug('Checking hash computations on revision %s...' %
                           rev)
            checked_dir_id = self.swh_revision_hash_tree_at_svn_revision(
                rev)
            if checked_dir_id != dir_id:
                err = 'Hash tree computation divergence detected ' \
                      '(%s != %s), stopping!' % (
                          hashutil.hash_to_hex(dir_id),
                          hashutil.hash_to_hex(checked_dir_id))
                raise ValueError(err)
        if nextrev:
            revision_parents[nextrev] = [swh_revision['id']]
        yield _contents, _directories, swh_revision
def process_swh_revisions(self, | |||||
svnrepo, | |||||
revision_start, | |||||
revision_end, | |||||
revision_parents): | |||||
"""Process and store revision to swh (sent by blocks of | |||||
revision_packet_size) | |||||
Returns: | |||||
The latest revision stored. | |||||
""" | |||||
try: | |||||
swh_revision_gen = self.process_svn_revisions(svnrepo, | |||||
revision_start, | |||||
revision_end, | |||||
revision_parents) | |||||
revs = [] | |||||
for revisions in utils.grouper( | |||||
swh_revision_gen, | |||||
self.config['revision_packet_size']): | |||||
revs = list(revisions) | |||||
self.maybe_load_revisions(revs) | |||||
last_revision = revs[-1] | |||||
self.log.debug('Processed %s revisions: [..., %s]' % ( | |||||
len(revs), hashutil.hash_to_hex(last_revision['id']))) | |||||
self.last_seen_revision = last_revision | |||||
except Exception as e: | |||||
if revs: | |||||
# flush remaining revisions | |||||
self.maybe_load_revisions(revs) | |||||
# Take the last one as the last known revisions | |||||
known_swh_rev = revs[-1] | |||||
elif self.last_seen_revision: # We'll try to make a snapshot | |||||
known_swh_rev = self.last_seen_revision | |||||
else: | |||||
raise | |||||
_id = known_swh_rev.get('id') | |||||
if not _id: | |||||
_id = _revision_id(known_swh_rev) | |||||
# Then notify something is wrong, and we stopped at that rev. | |||||
raise SvnLoaderEventful(e, swh_revision={ | |||||
'id': _id, | |||||
}) | |||||
return last_revision | |||||
def process_swh_snapshot(self, revision=None, snapshot=None): | |||||
"""Create the snapshot either from existing snapshot or revision. | |||||
""" | |||||
if snapshot: | |||||
snap = snapshot | |||||
elif revision: | |||||
snap = build_swh_snapshot(revision['id']) | |||||
snap['id'] = identifier_to_bytes(snapshot_identifier(snap)) | |||||
else: | |||||
return None | |||||
self.log.debug('snapshot: %s' % snap) | |||||
self.maybe_load_snapshot(snap) | |||||
def prepare_origin_visit(self, *, svn_url, visit_date=None,
                         origin_url=None, **kwargs):
    """Set up the origin (url, type) and the visit date for this run.

    The origin url falls back to the svn url when no explicit
    origin_url is provided.
    """
    url = origin_url if origin_url else svn_url
    self.origin = {
        'url': url,
        'type': 'svn',
    }
    self.visit_date = visit_date
def prepare(self, *, svn_url, destination_path=None,
            swh_revision=None, start_from_scratch=False, **kwargs):
    """Prepare the loading: resolve the last known state, set up the
    local working directory and the svn repository collaborator, and
    prime the revision generator.

    Known early-termination conditions (SvnLoaderUneventful,
    SvnLoaderHistoryAltered) mark the run as done instead of
    propagating.
    """
    self.start_from_scratch = start_from_scratch
    if swh_revision:
        self.last_known_swh_revision = swh_revision
    else:
        self.last_known_swh_revision = None
    self.latest_snapshot = self.swh_latest_snapshot_revision(
        self.origin_id, self.last_known_swh_revision)
    if destination_path:
        local_dirname = destination_path
    else:
        local_dirname = tempfile.mkdtemp(
            suffix='-%s' % os.getpid(),
            prefix=TEMPORARY_DIR_PREFIX_PATTERN,
            dir=self.temp_directory)
    self.svnrepo = self.get_svn_repo(
        svn_url, local_dirname, self.origin_id)
    try:
        revision_start, revision_end, revision_parents = self.start_from(
            self.last_known_swh_revision, self.start_from_scratch)
        self.swh_revision_gen = self.process_svn_revisions(
            self.svnrepo, revision_start, revision_end, revision_parents)
    except SvnLoaderUneventful as e:
        # `warning` replaces the deprecated `Logger.warn` alias
        self.log.warning(e)
        if self.latest_snapshot and 'snapshot' in self.latest_snapshot:
            self._snapshot = self.latest_snapshot['snapshot']
        self.done = True
    except SvnLoaderHistoryAltered as e:
        self.log.error(e)
        self.done = True
        self._visit_status = 'partial'
def fetch_data(self):
    """Fetch one svn revision's worth of data.

    Pulls the next item from the revision generator (which applies
    the svn revision as a patch on disk while computing the swh
    hashes at the same time) and stores the result in the internal
    state for `store_data` to persist.

    Returns:
        bool: True to continue fetching data (next svn revision),
        False to stop.
    """
    if self.done:
        return False
    try:
        data = next(self.swh_revision_gen)
        self._load_status = 'eventful'
    except StopIteration:
        self.done = True
        self._visit_status = 'full'
        return False  # Stopping iteration
    except Exception as e:  # Potential: svn:external, i/o error...
        # Bug fix: log the failure instead of swallowing it silently;
        # the visit is still recorded as partial.
        self.log.exception(e)
        self.done = True
        self._visit_status = 'partial'
        return False  # Stopping iteration
    self._contents, self._directories, revision = data
    if revision:
        self._last_revision = revision
        self._revisions.append(revision)
    return True  # next svn revision
def store_data(self):
    """Flush the data accumulated in the internal state to storage.

    Once the iteration over the svn revisions is done, also create
    the snapshot and flush to storage.

    This resets the internal instance variable state.
    """
    loaders = (
        (self.maybe_load_contents, self._contents),
        (self.maybe_load_directories, self._directories),
        (self.maybe_load_revisions, self._revisions),
    )
    for load, objects in loaders:
        load(objects)
    if self.done:  # finish line, snapshot!
        self.generate_and_load_snapshot(revision=self._last_revision,
                                        snapshot=self._snapshot)
        self.flush()
    # reset the per-iteration accumulators
    self._contents = []
    self._directories = []
    self._revisions = []
if rev_start1 <= rev_start2: | |||||
def generate_and_load_snapshot(self, revision=None, snapshot=None):
    """Create and load the snapshot from either an existing revision
    or an existing snapshot.

    The (supposedly new) revision has priority over the (supposedly
    pre-existing) snapshot.

    Args:
        revision (dict): Last revision seen if any (None by default)
        snapshot (dict): Snapshot to use if any (None by default)
    """
    snap = None
    if revision:
        # the revision wins over a pre-existing snapshot
        snap = build_swh_snapshot(revision['id'])
        snap['id'] = identifier_to_bytes(snapshot_identifier(snap))
    elif snapshot:
        snap = snapshot
    if snap is None:
        return None
    self.log.debug('snapshot: %s' % snap)
    self.maybe_load_snapshot(snap)
def load_status(self):
    """Report whether this visit stored anything new."""
    return {'status': self._load_status}
def visit_status(self):
    """Report the visit status (internal state, e.g. 'full'/'partial')."""
    return self._visit_status
class SWHSvnLoaderFromDumpArchive(SWHSvnLoader): | class SvnLoaderFromDumpArchive(SvnLoader): | ||||
"""Uncompress an archive containing an svn dump, mount the svn dump as | """Uncompress an archive containing an svn dump, mount the svn dump as | ||||
an svn repository and load said repository. | an svn repository and load said repository. | ||||
""" | """ | ||||
def __init__(self, archive_path): | def __init__(self, archive_path): | ||||
super().__init__() | super().__init__() | ||||
self.log.info('Archive to mount and load %s' % archive_path) | self.log.info('Archive to mount and load %s' % archive_path) | ||||
self.temp_dir, self.repo_path = init_svn_repo_from_archive_dump( | self.temp_dir, self.repo_path = init_svn_repo_from_archive_dump( | ||||
Show All 13 Lines |
I would rather initialize svnrepo to None in the constructor and then test against self.svnrepo here.