diff --git a/README.md b/README.md --- a/README.md +++ b/README.md @@ -48,9 +48,12 @@ Configuration ------------- -You can run the loader or the updater directly by calling: +You can run the loader from a remote origin (*loader*) or from an +origin on disk (*from_disk*) directly by calling: + + ``` -python3 -m swh.loader.git.{loader,updater} +python3 -m swh.loader.git.{loader,from_disk} ``` ### Location @@ -66,7 +69,8 @@ ### Configuration sample -$SWH_CONFIG_PATH/loader/git-{loader,updater}.yml: +Respectively the loader from a remote (`git.yml`) and the loader from +a disk (`git-disk.yml`), $SWH_CONFIG_PATH/loader/git{-disk}.yml: ``` storage: cls: remote diff --git a/swh/loader/git/loader.py b/swh/loader/git/from_disk.py copy from swh/loader/git/loader.py copy to swh/loader/git/from_disk.py --- a/swh/loader/git/loader.py +++ b/swh/loader/git/from_disk.py @@ -16,11 +16,12 @@ from . import converters, utils -class GitLoader(UnbufferedLoader): +class GitLoaderFromDisk(UnbufferedLoader): """Load a git repository from a directory. + """ - CONFIG_BASE_FILENAME = 'loader/git-loader' + CONFIG_BASE_FILENAME = 'loader/git-disk' def __init__(self, config=None): super().__init__(logging_class='swh.loader.git.Loader', config=config) @@ -254,7 +255,7 @@ return {'status': ('eventful' if eventful else 'uneventful')} -class GitLoaderFromArchive(GitLoader): +class GitLoaderFromArchive(GitLoaderFromDisk): """Load a git repository from an archive. This loader ingests a git repository compressed into an archive. @@ -316,8 +317,8 @@ def prepare(self, origin_url, archive_path, visit_date): """1. Uncompress the archive in temporary location. - 2. Prepare as the GitLoader does - 3. Load as GitLoader does + 2. Prepare as the GitLoaderFromDisk does + 3. Load as GitLoaderFromDisk does """ project_name = self.project_name_from_archive(archive_path) @@ -355,6 +356,6 @@ if not visit_date: visit_date = datetime.datetime.now(tz=datetime.timezone.utc) - return GitLoader().load(origin_url, git_directory, visit_date) + return GitLoaderFromDisk().load(origin_url, git_directory, visit_date) main() diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py --- a/swh/loader/git/loader.py +++ b/swh/loader/git/loader.py @@ -1,164 +1,395 @@ -# Copyright (C) 2015-2018 The Software Heritage developers +# Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime -import dulwich.repo +import dulwich.client +import logging import os -import shutil +import pickle +import sys -from dulwich.errors import ObjectFormatException, EmptyFileException from collections import defaultdict +from io import BytesIO +from dulwich.object_store import ObjectStoreGraphWalker +from dulwich.pack import PackData, PackInflater from swh.model import hashutil from swh.loader.core.loader import UnbufferedLoader -from . import converters, utils +from swh.storage.algos.snapshot import snapshot_get_all_branches +from . import converters -class GitLoader(UnbufferedLoader): - """Load a git repository from a directory. 
- """ +class RepoRepresentation: + """Repository representation for a Software Heritage origin.""" + def __init__(self, storage, origin_id, base_snapshot=None, + ignore_history=False): + self.storage = storage - CONFIG_BASE_FILENAME = 'loader/git-loader' + self._parents_cache = {} + self._type_cache = {} - def __init__(self, config=None): - super().__init__(logging_class='swh.loader.git.Loader', config=config) + self.ignore_history = ignore_history - def _prepare_origin_visit(self, origin_url, visit_date): - self.origin_url = origin_url - self.origin = converters.origin_url_to_origin(self.origin_url) - self.visit_date = visit_date + if origin_id and not ignore_history: + self.heads = set(self._cache_heads(origin_id, base_snapshot)) + else: + self.heads = set() + + def _fill_parents_cache(self, commits): + """When querying for a commit's parents, we fill the cache to a depth of 1000 + commits.""" + root_revs = self._encode_for_storage(commits) + for rev, parents in self.storage.revision_shortlog(root_revs, 1000): + rev_id = hashutil.hash_to_bytehex(rev) + if rev_id not in self._parents_cache: + self._parents_cache[rev_id] = [ + hashutil.hash_to_bytehex(parent) for parent in parents + ] + for rev in commits: + if rev not in self._parents_cache: + self._parents_cache[rev] = [] + + def _cache_heads(self, origin_id, base_snapshot): + """Return all the known head commits for `origin_id`""" + _git_types = ['content', 'directory', 'revision', 'release'] + + if not base_snapshot: + return [] + + snapshot_targets = set() + for target in base_snapshot['branches'].values(): + if target and target['target_type'] in _git_types: + snapshot_targets.add(target['target']) + + decoded_targets = self._decode_from_storage(snapshot_targets) + + for id, objs in self.get_stored_objects(decoded_targets).items(): + if not objs: + logging.warn('Missing head: %s' % hashutil.hash_to_hex(id)) + return [] + + return decoded_targets + + def get_parents(self, commit): + """Bogus method to prevent expensive recursion, at the expense of less + efficient downloading""" + return [] + + def get_heads(self): + return self.heads + + @staticmethod + def _encode_for_storage(objects): + return [hashutil.bytehex_to_hash(object) for object in objects] + + @staticmethod + def _decode_from_storage(objects): + return set(hashutil.hash_to_bytehex(object) for object in objects) + + def graph_walker(self): + return ObjectStoreGraphWalker(self.get_heads(), self.get_parents) + + @staticmethod + def filter_unwanted_refs(refs): + """Filter the unwanted references from refs""" + ret = {} + for ref, val in refs.items(): + if ref.endswith(b'^{}'): + # Peeled refs make the git protocol explode + continue + elif ref.startswith(b'refs/pull/') and ref.endswith(b'/merge'): + # We filter-out auto-merged GitHub pull requests + continue + else: + ret[ref] = val - def prepare_origin_visit(self, origin_url, directory, visit_date): - self._prepare_origin_visit(origin_url, visit_date) + return ret - def prepare(self, origin_url, directory, visit_date): - self.repo = dulwich.repo.Repo(directory) + def determine_wants(self, refs): + """Filter the remote references to figure out which ones + Software Heritage needs. 
+ """ + if not refs: + return [] - def iter_objects(self): - object_store = self.repo.object_store + # Find what objects Software Heritage has + refs = self.find_remote_ref_types_in_swh(refs) - for pack in object_store.packs: - objs = list(pack.index.iterentries()) - objs.sort(key=lambda x: x[1]) - for sha, offset, crc32 in objs: - yield hashutil.hash_to_bytehex(sha) + # Cache the objects found in swh as existing heads + for target in refs.values(): + if target['target_type'] is not None: + self.heads.add(target['target']) - yield from object_store._iter_loose_objects() - yield from object_store._iter_alternate_objects() + ret = set() + for target in self.filter_unwanted_refs(refs).values(): + if target['target_type'] is None: + # The target doesn't exist in Software Heritage, let's retrieve + # it. + ret.add(target['target']) - def _check(self, obj): - """Check the object's repository representation. + return list(ret) - If any errors in check exists, an ObjectFormatException is - raised. - - Args: - obj (object): Dulwich object read from the repository. + def get_stored_objects(self, objects): + """Find which of these objects were stored in the archive. + Do the request in packets to avoid a server timeout. + """ + if self.ignore_history: + return {} + + packet_size = 1000 + + ret = {} + query = [] + for object in objects: + query.append(object) + if len(query) >= packet_size: + ret.update( + self.storage.object_find_by_sha1_git( + self._encode_for_storage(query) + ) + ) + query = [] + if query: + ret.update( + self.storage.object_find_by_sha1_git( + self._encode_for_storage(query) + ) + ) + return ret + + def find_remote_ref_types_in_swh(self, remote_refs): + """Parse the remote refs information and list the objects that exist in + Software Heritage. """ - obj.check() - from dulwich.objects import Commit, Tag - try: - # For additional checks on dulwich objects with date - # for now, only checks on *time - if isinstance(obj, Commit): - commit_time = obj._commit_time - utils.check_date_time(commit_time) - author_time = obj._author_time - utils.check_date_time(author_time) - elif isinstance(obj, Tag): - tag_time = obj._tag_time - utils.check_date_time(tag_time) - except Exception as e: - raise ObjectFormatException(e) - - def get_object(self, oid): - """Given an object id, return the object if it is found and not - malformed in some way. - Args: - oid (bytes): the object's identifier + all_objs = set(remote_refs.values()) - set(self._type_cache) + type_by_id = {} + + for id, objs in self.get_stored_objects(all_objs).items(): + id = hashutil.hash_to_bytehex(id) + if objs: + type_by_id[id] = objs[0]['type'] + + self._type_cache.update(type_by_id) + + ret = {} + for ref, id in remote_refs.items(): + ret[ref] = { + 'target': id, + 'target_type': self._type_cache.get(id), + } + return ret - Returns: - The object if found without malformation + +class GitLoader(UnbufferedLoader): + """A bulk loader for a git repository""" + CONFIG_BASE_FILENAME = 'loader/git' + + ADDITIONAL_CONFIG = { + 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024), + } + + def __init__(self, repo_representation=RepoRepresentation, config=None): + """Initialize the bulk updater. + + Args: + repo_representation: swh's repository representation + which is in charge of filtering between known and remote + data. 
""" - try: - # some errors are raised when reading the object - obj = self.repo[oid] - # some we need to check ourselves - self._check(obj) - except KeyError: - _id = oid.decode('utf-8') - self.log.warn('object %s not found, skipping' % _id, - extra={ - 'swh_type': 'swh_loader_git_missing_object', - 'swh_object_id': _id, - 'origin_id': self.origin_id, - }) - return None - except ObjectFormatException: - _id = oid.decode('utf-8') - self.log.warn('object %s malformed, skipping' % _id, - extra={ - 'swh_type': 'swh_loader_git_missing_object', - 'swh_object_id': _id, - 'origin_id': self.origin_id, - }) - return None - except EmptyFileException: - _id = oid.decode('utf-8') - self.log.warn('object %s corrupted (empty file), skipping' % _id, - extra={ - 'swh_type': 'swh_loader_git_missing_object', - 'swh_object_id': _id, - 'origin_id': self.origin_id, - }) + super().__init__(logging_class='swh.loader.git.BulkLoader', + config=config) + self.repo_representation = repo_representation + + def fetch_pack_from_origin(self, origin_url, base_origin_id, + base_snapshot, do_activity): + """Fetch a pack from the origin""" + pack_buffer = BytesIO() + + base_repo = self.repo_representation( + storage=self.storage, + origin_id=base_origin_id, + base_snapshot=base_snapshot, + ignore_history=self.ignore_history, + ) + + client, path = dulwich.client.get_transport_and_path(origin_url, + thin_packs=False) + + size_limit = self.config['pack_size_bytes'] + + def do_pack(data, + pack_buffer=pack_buffer, + limit=size_limit, + origin_url=origin_url): + cur_size = pack_buffer.tell() + would_write = len(data) + if cur_size + would_write > limit: + raise IOError('Pack file too big for repository %s, ' + 'limit is %d bytes, current size is %d, ' + 'would write %d' % + (origin_url, limit, cur_size, would_write)) + + pack_buffer.write(data) + + remote_refs = client.fetch_pack(path, + base_repo.determine_wants, + base_repo.graph_walker(), + do_pack, + progress=do_activity).refs + + if remote_refs: + local_refs = base_repo.find_remote_ref_types_in_swh(remote_refs) else: - return obj + local_refs = remote_refs = {} + + pack_buffer.flush() + pack_size = pack_buffer.tell() + pack_buffer.seek(0) + + return { + 'remote_refs': base_repo.filter_unwanted_refs(remote_refs), + 'local_refs': local_refs, + 'pack_buffer': pack_buffer, + 'pack_size': pack_size, + } + + def list_pack(self, pack_data, pack_size): + id_to_type = {} + type_to_ids = defaultdict(set) + + inflater = self.get_inflater() + + for obj in inflater: + type, id = obj.type_name, obj.id + id_to_type[id] = type + type_to_ids[type].add(id) + + return id_to_type, type_to_ids + + def prepare_origin_visit(self, origin_url, **kwargs): + self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) + self.origin = converters.origin_url_to_origin(origin_url) + + def get_full_snapshot(self, origin_id): + prev_snapshot = self.storage.snapshot_get_latest(origin_id) + if prev_snapshot and prev_snapshot.pop('next_branch', None): + return snapshot_get_all_branches(self.storage, prev_snapshot['id']) + + return prev_snapshot + + def prepare(self, origin_url, base_url=None, ignore_history=False): + base_origin_id = origin_id = self.origin_id + + prev_snapshot = None + + if not ignore_history: + prev_snapshot = self.get_full_snapshot(origin_id) + + if base_url and not prev_snapshot: + base_origin = converters.origin_url_to_origin(base_url) + base_origin = self.storage.origin_get(base_origin) + if base_origin: + base_origin_id = base_origin['id'] + prev_snapshot = 
self.get_full_snapshot(base_origin_id) + + self.base_snapshot = prev_snapshot + self.base_origin_id = base_origin_id + self.ignore_history = ignore_history def fetch_data(self): - """Fetch the data from the data source""" - self.previous_snapshot = self.storage.snapshot_get_latest( - self.origin_id - ) + def do_progress(msg): + sys.stderr.buffer.write(msg) + sys.stderr.flush() - type_to_ids = defaultdict(list) - for oid in self.iter_objects(): - obj = self.get_object(oid) - if not obj: - continue - type_name = obj.type_name - type_to_ids[type_name].append(oid) + fetch_info = self.fetch_pack_from_origin( + self.origin['url'], self.base_origin_id, self.base_snapshot, + do_progress) + + self.pack_buffer = fetch_info['pack_buffer'] + self.pack_size = fetch_info['pack_size'] + + self.remote_refs = fetch_info['remote_refs'] + self.local_refs = fetch_info['local_refs'] + + origin_url = self.origin['url'] + + self.log.info('Listed %d refs for repo %s' % ( + len(self.remote_refs), origin_url), extra={ + 'swh_type': 'git_repo_list_refs', + 'swh_repo': origin_url, + 'swh_num_refs': len(self.remote_refs), + }) + # We want to load the repository, walk all the objects + id_to_type, type_to_ids = self.list_pack(self.pack_buffer, + self.pack_size) + + self.id_to_type = id_to_type self.type_to_ids = type_to_ids + def save_data(self): + """Store a pack for archival""" + + write_size = 8192 + pack_dir = self.get_save_data_path() + + pack_name = "%s.pack" % self.visit_date.isoformat() + refs_name = "%s.refs" % self.visit_date.isoformat() + + with open(os.path.join(pack_dir, pack_name), 'xb') as f: + self.pack_buffer.seek(0) + while True: + r = self.pack_buffer.read(write_size) + if not r: + break + f.write(r) + + self.pack_buffer.seek(0) + + with open(os.path.join(pack_dir, refs_name), 'xb') as f: + pickle.dump(self.remote_refs, f) + + def get_inflater(self): + """Reset the pack buffer and get an object inflater from it""" + self.pack_buffer.seek(0) + return PackInflater.for_pack_data( + PackData.from_file(self.pack_buffer, self.pack_size)) + def has_contents(self): - """Checks whether we need to load contents""" return bool(self.type_to_ids[b'blob']) def get_content_ids(self): """Get the content identifiers from the git repository""" - for oid in self.type_to_ids[b'blob']: - yield converters.dulwich_blob_to_content_id(self.repo[oid]) + for raw_obj in self.get_inflater(): + if raw_obj.type_name != b'blob': + continue + + yield converters.dulwich_blob_to_content_id(raw_obj) def get_contents(self): - """Get the contents that need to be loaded""" + """Format the blobs from the git repository as swh contents""" max_content_size = self.config['content_size_limit'] missing_contents = set(self.storage.content_missing( self.get_content_ids(), 'sha1_git')) - for oid in missing_contents: + for raw_obj in self.get_inflater(): + if raw_obj.type_name != b'blob': + continue + + if raw_obj.sha().digest() not in missing_contents: + continue + yield converters.dulwich_blob_to_content( - self.repo[hashutil.hash_to_bytehex(oid)], log=self.log, - max_content_size=max_content_size, + raw_obj, log=self.log, max_content_size=max_content_size, origin_id=self.origin_id) def has_directories(self): - """Checks whether we need to load directories""" return bool(self.type_to_ids[b'tree']) def get_directory_ids(self): @@ -167,16 +398,20 @@ for id in self.type_to_ids[b'tree']) def get_directories(self): - """Get the directories that need to be loaded""" + """Format the trees as swh directories""" missing_dirs = 
set(self.storage.directory_missing( sorted(self.get_directory_ids()))) - for oid in missing_dirs: - yield converters.dulwich_tree_to_directory( - self.repo[hashutil.hash_to_bytehex(oid)], log=self.log) + for raw_obj in self.get_inflater(): + if raw_obj.type_name != b'tree': + continue + + if raw_obj.sha().digest() not in missing_dirs: + continue + + yield converters.dulwich_tree_to_directory(raw_obj, log=self.log) def has_revisions(self): - """Checks whether we need to load revisions""" return bool(self.type_to_ids[b'commit']) def get_revision_ids(self): @@ -185,16 +420,20 @@ for id in self.type_to_ids[b'commit']) def get_revisions(self): - """Get the revisions that need to be loaded""" + """Format commits as swh revisions""" missing_revs = set(self.storage.revision_missing( sorted(self.get_revision_ids()))) - for oid in missing_revs: - yield converters.dulwich_commit_to_revision( - self.repo[hashutil.hash_to_bytehex(oid)], log=self.log) + for raw_obj in self.get_inflater(): + if raw_obj.type_name != b'commit': + continue + + if raw_obj.sha().digest() not in missing_revs: + continue + + yield converters.dulwich_commit_to_revision(raw_obj, log=self.log) def has_releases(self): - """Checks whether we need to load releases""" return bool(self.type_to_ids[b'tag']) def get_release_ids(self): @@ -203,33 +442,36 @@ for id in self.type_to_ids[b'tag']) def get_releases(self): - """Get the releases that need to be loaded""" + """Retrieve all the release objects from the git repository""" missing_rels = set(self.storage.release_missing( sorted(self.get_release_ids()))) - for oid in missing_rels: - yield converters.dulwich_tag_to_release( - self.repo[hashutil.hash_to_bytehex(oid)], log=self.log) + for raw_obj in self.get_inflater(): + if raw_obj.type_name != b'tag': + continue + + if raw_obj.sha().digest() not in missing_rels: + continue + + yield converters.dulwich_tag_to_release(raw_obj, log=self.log) def get_snapshot(self): - """Turn the list of branches into a snapshot to load""" branches = {} - for ref, target in self.repo.refs.as_dict().items(): - obj = self.get_object(target) - if obj: - branches[ref] = { - 'target': hashutil.bytehex_to_hash(target), - 'target_type': converters.DULWICH_TYPES[obj.type_name], - } - else: - branches[ref] = None + for ref in self.remote_refs: + ret_ref = self.local_refs[ref].copy() + if not ret_ref['target_type']: + target_type = self.id_to_type[ret_ref['target']] + ret_ref['target_type'] = converters.DULWICH_TYPES[target_type] + + ret_ref['target'] = hashutil.bytehex_to_hash(ret_ref['target']) + + branches[ref] = ret_ref self.snapshot = converters.branches_to_snapshot(branches) return self.snapshot def get_fetch_history_result(self): - """Return the data to store in fetch_history for the current loader""" return { 'contents': len(self.type_to_ids[b'blob']), 'directories': len(self.type_to_ids[b'tree']), @@ -237,110 +479,21 @@ 'releases': len(self.type_to_ids[b'tag']), } - def save_data(self): - """We already have the data locally, no need to save it""" - pass - def load_status(self): - """The load was eventful if the current occurrences are different to - the ones we retrieved at the beginning of the run""" + """The load was eventful if the current snapshot is different to + the one we retrieved at the beginning of the run""" eventful = False - if self.previous_snapshot: - eventful = self.snapshot['id'] != self.previous_snapshot['id'] + if self.base_snapshot: + eventful = self.snapshot['id'] != self.base_snapshot['id'] else: eventful = 
bool(self.snapshot['branches']) return {'status': ('eventful' if eventful else 'uneventful')} -class GitLoaderFromArchive(GitLoader): - """Load a git repository from an archive. - - This loader ingests a git repository compressed into an archive. - The supported archive formats are ``.zip`` and ``.tar.gz``. - - From an input tarball named ``my-git-repo.zip``, the following layout is - expected in it:: - - my-git-repo/ - ├── .git - │ ├── branches - │ ├── COMMIT_EDITMSG - │ ├── config - │ ├── description - │ ├── HEAD - ... - - Nevertheless, the loader is able to ingest tarballs with the following - layouts too:: - - . - ├── .git - │ ├── branches - │ ├── COMMIT_EDITMSG - │ ├── config - │ ├── description - │ ├── HEAD - ... - - or:: - - other-repo-name/ - ├── .git - │ ├── branches - │ ├── COMMIT_EDITMSG - │ ├── config - │ ├── description - │ ├── HEAD - ... - - """ - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.temp_dir = self.repo_path = None - - def project_name_from_archive(self, archive_path): - """Compute the project name from the archive's path. - - """ - archive_name = os.path.basename(archive_path) - for ext in ('.zip', '.tar.gz', '.tgz'): - if archive_name.lower().endswith(ext): - archive_name = archive_name[:-len(ext)] - break - return archive_name - - def prepare_origin_visit(self, origin_url, archive_path, visit_date): - self._prepare_origin_visit(origin_url, visit_date) - - def prepare(self, origin_url, archive_path, visit_date): - """1. Uncompress the archive in temporary location. - 2. Prepare as the GitLoader does - 3. Load as GitLoader does - - """ - project_name = self.project_name_from_archive(archive_path) - self.temp_dir, self.repo_path = utils.init_git_repo_from_archive( - project_name, archive_path) - - self.log.info('Project %s - Uncompressing archive %s at %s' % ( - origin_url, os.path.basename(archive_path), self.repo_path)) - super().prepare(origin_url, self.repo_path, visit_date) - - def cleanup(self): - """Cleanup the temporary location (if it exists). 
- - """ - if self.temp_dir and os.path.exists(self.temp_dir): - shutil.rmtree(self.temp_dir) - self.log.info('Project %s - Done injecting %s' % ( - self.origin_url, self.repo_path)) - - if __name__ == '__main__': import click - import logging logging.basicConfig( level=logging.DEBUG, @@ -348,13 +501,15 @@ ) @click.command() - @click.option('--origin-url', help='origin url') - @click.option('--git-directory', help='Path to git repository to load') - @click.option('--visit-date', default=None, help='Visit date') - def main(origin_url, git_directory, visit_date): - if not visit_date: - visit_date = datetime.datetime.now(tz=datetime.timezone.utc) - - return GitLoader().load(origin_url, git_directory, visit_date) + @click.option('--origin-url', help='Origin url', required=True) + @click.option('--base-url', default=None, help='Optional Base url') + @click.option('--ignore-history/--no-ignore-history', + help='Ignore the repository history', default=False) + def main(origin_url, base_url, ignore_history): + return GitLoader().load( + origin_url, + base_url=base_url, + ignore_history=ignore_history, + ) main() diff --git a/swh/loader/git/tasks.py b/swh/loader/git/tasks.py --- a/swh/loader/git/tasks.py +++ b/swh/loader/git/tasks.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2017 The Software Heritage developers +# Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -7,8 +7,8 @@ from swh.scheduler.task import Task -from .loader import GitLoader, GitLoaderFromArchive -from .updater import BulkUpdater +from .from_disk import GitLoaderFromDisk, GitLoaderFromArchive +from .loader import GitLoader # TODO: rename to LoadRemoteGitRepository @@ -18,7 +18,7 @@ def run_task(self, repo_url, base_url=None): """Import a git repository""" - loader = BulkUpdater() + loader = GitLoader() loader.log = self.log return loader.load(repo_url, base_url=base_url) @@ -32,7 +32,7 @@ """Import a git repository, cloned in `directory` from `origin_url` at `date`.""" - loader = GitLoader() + loader = GitLoaderFromDisk() loader.log = self.log return loader.load(origin_url, directory, dateutil.parser.parse(date)) diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_from_disk.py copy from swh/loader/git/tests/test_loader.py copy to swh/loader/git/tests/test_from_disk.py --- a/swh/loader/git/tests/test_loader.py +++ b/swh/loader/git/tests/test_from_disk.py @@ -6,7 +6,8 @@ import os.path import subprocess -from swh.loader.git.loader import GitLoader, GitLoaderFromArchive + +from swh.loader.git.from_disk import GitLoaderFromDisk, GitLoaderFromArchive from swh.loader.core.tests import BaseLoaderTest from . 
import TEST_LOADER_CONFIG @@ -93,7 +94,7 @@ } -class BaseGitLoaderTest(BaseLoaderTest): +class BaseGitLoaderFromDiskTest(BaseLoaderTest): def setUp(self, archive_name, uncompress_archive, filename='testrepo'): super().setUp(archive_name=archive_name, filename=filename, prefix_tmp_folder_name='swh.loader.git.', @@ -101,12 +102,12 @@ uncompress_archive=uncompress_archive) -class GitLoaderTest(GitLoader): +class GitLoaderFromDiskTest(GitLoaderFromDisk): def parse_config_file(self, *args, **kwargs): return TEST_LOADER_CONFIG -class BaseDirGitLoaderTest(BaseGitLoaderTest): +class BaseDirGitLoaderFromDiskTest(BaseGitLoaderFromDiskTest): """Mixin base loader test to prepare the git repository to uncompress, load and test the results. @@ -115,7 +116,7 @@ """ def setUp(self): super().setUp('testrepo.tgz', uncompress_archive=True) - self.loader = GitLoaderTest() + self.loader = GitLoaderFromDiskTest() self.storage = self.loader.storage def load(self): @@ -125,7 +126,7 @@ directory=self.destination_path) -class BaseGitLoaderFromArchiveTest(BaseGitLoaderTest): +class BaseGitLoaderFromArchiveTest(BaseGitLoaderFromDiskTest): """Mixin base loader test to prepare the git repository to uncompress, load and test the results. @@ -144,7 +145,7 @@ archive_path=self.destination_path) -class GitLoaderTests: +class GitLoaderFromDiskTests: """Common tests for all git loaders.""" def test_load(self): """Loads a simple repository (made available by `setUp()`), @@ -176,8 +177,8 @@ self.assertCountSnapshots(1) -class DirGitLoaderTest(BaseDirGitLoaderTest, GitLoaderTests): - """Tests for the GitLoader. Includes the common ones, and +class DirGitLoaderTest(BaseDirGitLoaderFromDiskTest, GitLoaderFromDiskTests): + """Tests for the GitLoaderFromDisk. Includes the common ones, and add others that only work with a local dir.""" def _git(self, *cmd): @@ -255,7 +256,8 @@ self.assertEqual(self.loader.visit_status(), 'full') -class GitLoaderFromArchiveTest(BaseGitLoaderFromArchiveTest, GitLoaderTests): +class GitLoaderFromArchiveTest(BaseGitLoaderFromArchiveTest, + GitLoaderFromDiskTests): """Tests for GitLoaderFromArchive. Imports the common ones from GitLoaderTests.""" pass diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_loader.py --- a/swh/loader/git/tests/test_loader.py +++ b/swh/loader/git/tests/test_loader.py @@ -3,259 +3,26 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import os.path -import subprocess -from swh.loader.git.loader import GitLoader, GitLoaderFromArchive -from swh.loader.core.tests import BaseLoaderTest - -from . import TEST_LOADER_CONFIG - - -class GitLoaderFromArchive(GitLoaderFromArchive): - def project_name_from_archive(self, archive_path): - # We don't want the project name to be 'resources'. 
- return 'testrepo' - - def parse_config_file(self, *args, **kwargs): - return TEST_LOADER_CONFIG - - -CONTENT1 = { - '33ab5639bfd8e7b95eb1d8d0b87781d4ffea4d5d', # README v1 - '349c4ff7d21f1ec0eda26f3d9284c293e3425417', # README v2 - '799c11e348d39f1704022b8354502e2f81f3c037', # file1.txt - '4bdb40dfd6ec75cb730e678b5d7786e30170c5fb', # file2.txt -} - -SNAPSHOT_ID = 'bdf3b06d6017e0d9ad6447a73da6ff1ae9efb8f0' - -SNAPSHOT1 = { - 'id': SNAPSHOT_ID, - 'branches': { - 'HEAD': { - 'target': '2f01f5ca7e391a2f08905990277faf81e709a649', - 'target_type': 'revision', - }, - 'refs/heads/master': { - 'target': '2f01f5ca7e391a2f08905990277faf81e709a649', - 'target_type': 'revision', - }, - 'refs/heads/branch1': { - 'target': 'b0a77609903f767a2fd3d769904ef9ef68468b87', - 'target_type': 'revision', - }, - 'refs/heads/branch2': { - 'target': 'bd746cd1913721b269b395a56a97baf6755151c2', - 'target_type': 'revision', - }, - 'refs/tags/branch2-after-delete': { - 'target': 'bd746cd1913721b269b395a56a97baf6755151c2', - 'target_type': 'revision', - }, - 'refs/tags/branch2-before-delete': { - 'target': '1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b', - 'target_type': 'revision', - }, - }, -} - -# directory hashes obtained with: -# gco b6f40292c4e94a8f7e7b4aff50e6c7429ab98e2a -# swh-hashtree --ignore '.git' --path . -# gco 2f01f5ca7e391a2f08905990277faf81e709a649 -# swh-hashtree --ignore '.git' --path . -# gco bcdc5ebfde1a3cd6c96e0c2ea4eed19c13208777 -# swh-hashtree --ignore '.git' --path . -# gco 1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b -# swh-hashtree --ignore '.git' --path . -# gco 79f65ac75f79dda6ff03d66e1242702ab67fb51c -# swh-hashtree --ignore '.git' --path . -# gco b0a77609903f767a2fd3d769904ef9ef68468b87 -# swh-hashtree --ignore '.git' --path . -# gco bd746cd1913721b269b395a56a97baf6755151c2 -# swh-hashtree --ignore '.git' --path . -REVISIONS1 = { - 'b6f40292c4e94a8f7e7b4aff50e6c7429ab98e2a': - '40dbdf55dfd4065422462cc74a949254aefa972e', - '2f01f5ca7e391a2f08905990277faf81e709a649': - 'e1d0d894835f91a0f887a4bc8b16f81feefdfbd5', - 'bcdc5ebfde1a3cd6c96e0c2ea4eed19c13208777': - 'b43724545b4759244bb54be053c690649161411c', - '1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b': - 'fbf70528223d263661b5ad4b80f26caf3860eb8e', - '79f65ac75f79dda6ff03d66e1242702ab67fb51c': - '5df34ec74d6f69072d9a0a6677d8efbed9b12e60', - 'b0a77609903f767a2fd3d769904ef9ef68468b87': - '9ca0c7d6ffa3f9f0de59fd7912e08f11308a1338', - 'bd746cd1913721b269b395a56a97baf6755151c2': - 'e1d0d894835f91a0f887a4bc8b16f81feefdfbd5', -} - - -class BaseGitLoaderTest(BaseLoaderTest): - def setUp(self, archive_name, uncompress_archive, filename='testrepo'): - super().setUp(archive_name=archive_name, filename=filename, - prefix_tmp_folder_name='swh.loader.git.', - start_path=os.path.dirname(__file__), - uncompress_archive=uncompress_archive) +from swh.loader.git.loader import GitLoader +from swh.loader.git.tests.test_from_disk import DirGitLoaderTest class GitLoaderTest(GitLoader): def parse_config_file(self, *args, **kwargs): - return TEST_LOADER_CONFIG - - -class BaseDirGitLoaderTest(BaseGitLoaderTest): - """Mixin base loader test to prepare the git - repository to uncompress, load and test the results. 
+ return { + **super().parse_config_file(*args, **kwargs), + 'storage': {'cls': 'memory', 'args': {}} + } - This sets up - """ +class TestGitLoader(DirGitLoaderTest): + """Same tests as for the GitLoaderFromDisk, but running on GitLoader.""" def setUp(self): - super().setUp('testrepo.tgz', uncompress_archive=True) + super().setUp() self.loader = GitLoaderTest() self.storage = self.loader.storage def load(self): return self.loader.load( - origin_url=self.repo_url, - visit_date='2016-05-03 15:16:32+00', - directory=self.destination_path) - - -class BaseGitLoaderFromArchiveTest(BaseGitLoaderTest): - """Mixin base loader test to prepare the git - repository to uncompress, load and test the results. - - This sets up - - """ - def setUp(self): - super().setUp('testrepo.tgz', uncompress_archive=False) - self.loader = GitLoaderFromArchive() - self.storage = self.loader.storage - - def load(self): - return self.loader.load( - origin_url=self.repo_url, - visit_date='2016-05-03 15:16:32+00', - archive_path=self.destination_path) - - -class GitLoaderTests: - """Common tests for all git loaders.""" - def test_load(self): - """Loads a simple repository (made available by `setUp()`), - and checks everything was added in the storage.""" - res = self.load() - self.assertEqual(res['status'], 'eventful', res) - - self.assertContentsContain(CONTENT1) - self.assertCountDirectories(7) - self.assertCountReleases(0) # FIXME: why not 2? - self.assertCountRevisions(7) - self.assertCountSnapshots(1) - - self.assertRevisionsContain(REVISIONS1) - - self.assertSnapshotEqual(SNAPSHOT1) - - self.assertEqual(self.loader.load_status(), {'status': 'eventful'}) - self.assertEqual(self.loader.visit_status(), 'full') - - def test_load_unchanged(self): - """Checks loading a repository a second time does not add - any extra data.""" - res = self.load() - self.assertEqual(res['status'], 'eventful') - - res = self.load() - self.assertEqual(res['status'], 'uneventful') - self.assertCountSnapshots(1) - - -class DirGitLoaderTest(BaseDirGitLoaderTest, GitLoaderTests): - """Tests for the GitLoader. Includes the common ones, and - add others that only work with a local dir.""" - - def _git(self, *cmd): - """Small wrapper around subprocess to call Git.""" - try: - return subprocess.check_output( - ['git', '-C', self.destination_path] + list(cmd)) - except subprocess.CalledProcessError as e: - print(e.output) - print(e.stderr) - raise - - def test_load_changed(self): - """Loads a repository, makes some changes by adding files, commits, - and merges, load it again, and check the storage contains everything - it should.""" - # Initial load - res = self.load() - self.assertEqual(res['status'], 'eventful', res) - - self._git('config', '--local', 'user.email', 'you@example.com') - self._git('config', '--local', 'user.name', 'Your Name') - - # Load with a new file + revision - with open(os.path.join(self.destination_path, 'hello.py'), 'a') as fd: - fd.write("print('Hello world')\n") - - self._git('add', 'hello.py') - self._git('commit', '-m', 'Hello world') - new_revision = self._git('rev-parse', 'master').decode().strip() - - revisions = REVISIONS1.copy() - assert new_revision not in revisions - revisions[new_revision] = '85dae072a5aa9923ffa7a7568f819ff21bf49858' - - res = self.load() - self.assertEqual(res['status'], 'eventful') - - self.assertCountContents(4 + 1) - self.assertCountDirectories(7 + 1) - self.assertCountReleases(0) # FIXME: why not 2? 
- self.assertCountRevisions(7 + 1) - self.assertCountSnapshots(1 + 1) - - self.assertRevisionsContain(revisions) - - # TODO: how to check the snapshot id? - # self.assertSnapshotEqual(SNAPSHOT1) - - self.assertEqual(self.loader.load_status(), {'status': 'eventful'}) - self.assertEqual(self.loader.visit_status(), 'full') - - # Load with a new merge - self._git('merge', 'branch1', '-m', 'merge') - new_revision = self._git('rev-parse', 'master').decode().strip() - - assert new_revision not in revisions - revisions[new_revision] = 'dab8a37df8db8666d4e277bef9a546f585b5bedd' - - res = self.load() - self.assertEqual(res['status'], 'eventful') - - self.assertCountContents(4 + 1) - self.assertCountDirectories(7 + 2) - self.assertCountReleases(0) # FIXME: why not 2? - self.assertCountRevisions(7 + 2) - self.assertCountSnapshots(1 + 1 + 1) - - self.assertRevisionsContain(revisions) - - # TODO: how to check the snapshot id? - # self.assertSnapshotEqual(SNAPSHOT1) - - self.assertEqual(self.loader.load_status(), {'status': 'eventful'}) - self.assertEqual(self.loader.visit_status(), 'full') - - -class GitLoaderFromArchiveTest(BaseGitLoaderFromArchiveTest, GitLoaderTests): - """Tests for GitLoaderFromArchive. Imports the common ones - from GitLoaderTests.""" - pass + origin_url=self.repo_url) diff --git a/swh/loader/git/tests/test_tasks.py b/swh/loader/git/tests/test_tasks.py --- a/swh/loader/git/tests/test_tasks.py +++ b/swh/loader/git/tests/test_tasks.py @@ -18,7 +18,7 @@ task = UpdateGitRepository() self.assertEqual(task.task_queue, 'swh_loader_git') - @patch('swh.loader.git.updater.BulkUpdater.load') + @patch('swh.loader.git.loader.GitLoader.load') def test_task(self, mock_loader): mock_loader.return_value = {'status': 'eventful'} task = UpdateGitRepository() @@ -36,7 +36,7 @@ task = LoadDiskGitRepository() self.assertEqual(task.task_queue, 'swh_loader_git_express') - @patch('swh.loader.git.loader.GitLoader.load') + @patch('swh.loader.git.from_disk.GitLoaderFromDisk.load') def test_task(self, mock_loader): mock_loader.return_value = {'status': 'uneventful'} task = LoadDiskGitRepository() @@ -56,7 +56,7 @@ task = UncompressAndLoadDiskGitRepository() self.assertEqual(task.task_queue, 'swh_loader_git_archive') - @patch('swh.loader.git.loader.GitLoaderFromArchive.load') + @patch('swh.loader.git.from_disk.GitLoaderFromArchive.load') def test_task(self, mock_loader): mock_loader.return_value = {'status': 'failed'} task = UncompressAndLoadDiskGitRepository() diff --git a/swh/loader/git/tests/test_updater.py b/swh/loader/git/tests/test_updater.py deleted file mode 100644 --- a/swh/loader/git/tests/test_updater.py +++ /dev/null @@ -1,22 +0,0 @@ -from swh.loader.git.updater import BulkUpdater -from swh.loader.git.tests.test_loader import DirGitLoaderTest - - -class BulkUpdaterTest(BulkUpdater): - def parse_config_file(self, *args, **kwargs): - return { - **super().parse_config_file(*args, **kwargs), - 'storage': {'cls': 'memory', 'args': {}} - } - - -class TestBulkUpdater(DirGitLoaderTest): - """Same tests as for the GitLoader, but running on BulkUpdater.""" - def setUp(self): - super().setUp() - self.loader = BulkUpdaterTest() - self.storage = self.loader.storage - - def load(self): - return self.loader.load( - origin_url=self.repo_url) diff --git a/swh/loader/git/updater.py b/swh/loader/git/updater.py deleted file mode 100644 --- a/swh/loader/git/updater.py +++ /dev/null @@ -1,515 +0,0 @@ -# Copyright (C) 2016-2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this 
distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import datetime -import dulwich.client -import logging -import os -import pickle -import sys - -from collections import defaultdict -from io import BytesIO -from dulwich.object_store import ObjectStoreGraphWalker -from dulwich.pack import PackData, PackInflater - -from swh.model import hashutil -from swh.loader.core.loader import UnbufferedLoader -from swh.storage.algos.snapshot import snapshot_get_all_branches -from . import converters - - -class SWHRepoRepresentation: - """Repository representation for a Software Heritage origin.""" - def __init__(self, storage, origin_id, base_snapshot=None, - ignore_history=False): - self.storage = storage - - self._parents_cache = {} - self._type_cache = {} - - self.ignore_history = ignore_history - - if origin_id and not ignore_history: - self.heads = set(self._cache_heads(origin_id, base_snapshot)) - else: - self.heads = set() - - def _fill_parents_cache(self, commits): - """When querying for a commit's parents, we fill the cache to a depth of 1000 - commits.""" - root_revs = self._encode_for_storage(commits) - for rev, parents in self.storage.revision_shortlog(root_revs, 1000): - rev_id = hashutil.hash_to_bytehex(rev) - if rev_id not in self._parents_cache: - self._parents_cache[rev_id] = [ - hashutil.hash_to_bytehex(parent) for parent in parents - ] - for rev in commits: - if rev not in self._parents_cache: - self._parents_cache[rev] = [] - - def _cache_heads(self, origin_id, base_snapshot): - """Return all the known head commits for `origin_id`""" - _git_types = ['content', 'directory', 'revision', 'release'] - - if not base_snapshot: - return [] - - snapshot_targets = set() - for target in base_snapshot['branches'].values(): - if target and target['target_type'] in _git_types: - snapshot_targets.add(target['target']) - - decoded_targets = self._decode_from_storage(snapshot_targets) - - for id, objs in self.get_stored_objects(decoded_targets).items(): - if not objs: - logging.warn('Missing head: %s' % hashutil.hash_to_hex(id)) - return [] - - return decoded_targets - - def get_parents(self, commit): - """Bogus method to prevent expensive recursion, at the expense of less - efficient downloading""" - return [] - - def get_heads(self): - return self.heads - - @staticmethod - def _encode_for_storage(objects): - return [hashutil.bytehex_to_hash(object) for object in objects] - - @staticmethod - def _decode_from_storage(objects): - return set(hashutil.hash_to_bytehex(object) for object in objects) - - def graph_walker(self): - return ObjectStoreGraphWalker(self.get_heads(), self.get_parents) - - @staticmethod - def filter_unwanted_refs(refs): - """Filter the unwanted references from refs""" - ret = {} - for ref, val in refs.items(): - if ref.endswith(b'^{}'): - # Peeled refs make the git protocol explode - continue - elif ref.startswith(b'refs/pull/') and ref.endswith(b'/merge'): - # We filter-out auto-merged GitHub pull requests - continue - else: - ret[ref] = val - - return ret - - def determine_wants(self, refs): - """Filter the remote references to figure out which ones - Software Heritage needs. 
- """ - if not refs: - return [] - - # Find what objects Software Heritage has - refs = self.find_remote_ref_types_in_swh(refs) - - # Cache the objects found in swh as existing heads - for target in refs.values(): - if target['target_type'] is not None: - self.heads.add(target['target']) - - ret = set() - for target in self.filter_unwanted_refs(refs).values(): - if target['target_type'] is None: - # The target doesn't exist in Software Heritage, let's retrieve - # it. - ret.add(target['target']) - - return list(ret) - - def get_stored_objects(self, objects): - """Find which of these objects were stored in the archive. - - Do the request in packets to avoid a server timeout. - """ - if self.ignore_history: - return {} - - packet_size = 1000 - - ret = {} - query = [] - for object in objects: - query.append(object) - if len(query) >= packet_size: - ret.update( - self.storage.object_find_by_sha1_git( - self._encode_for_storage(query) - ) - ) - query = [] - if query: - ret.update( - self.storage.object_find_by_sha1_git( - self._encode_for_storage(query) - ) - ) - return ret - - def find_remote_ref_types_in_swh(self, remote_refs): - """Parse the remote refs information and list the objects that exist in - Software Heritage. - """ - - all_objs = set(remote_refs.values()) - set(self._type_cache) - type_by_id = {} - - for id, objs in self.get_stored_objects(all_objs).items(): - id = hashutil.hash_to_bytehex(id) - if objs: - type_by_id[id] = objs[0]['type'] - - self._type_cache.update(type_by_id) - - ret = {} - for ref, id in remote_refs.items(): - ret[ref] = { - 'target': id, - 'target_type': self._type_cache.get(id), - } - return ret - - -class BulkUpdater(UnbufferedLoader): - """A bulk loader for a git repository""" - CONFIG_BASE_FILENAME = 'loader/git-updater' - - ADDITIONAL_CONFIG = { - 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024), - } - - def __init__(self, repo_representation=SWHRepoRepresentation, config=None): - """Initialize the bulk updater. - - Args: - repo_representation: swh's repository representation - which is in charge of filtering between known and remote - data. 
- - """ - super().__init__(logging_class='swh.loader.git.BulkLoader', - config=config) - self.repo_representation = repo_representation - - def fetch_pack_from_origin(self, origin_url, base_origin_id, - base_snapshot, do_activity): - """Fetch a pack from the origin""" - pack_buffer = BytesIO() - - base_repo = self.repo_representation( - storage=self.storage, - origin_id=base_origin_id, - base_snapshot=base_snapshot, - ignore_history=self.ignore_history, - ) - - client, path = dulwich.client.get_transport_and_path(origin_url, - thin_packs=False) - - size_limit = self.config['pack_size_bytes'] - - def do_pack(data, - pack_buffer=pack_buffer, - limit=size_limit, - origin_url=origin_url): - cur_size = pack_buffer.tell() - would_write = len(data) - if cur_size + would_write > limit: - raise IOError('Pack file too big for repository %s, ' - 'limit is %d bytes, current size is %d, ' - 'would write %d' % - (origin_url, limit, cur_size, would_write)) - - pack_buffer.write(data) - - remote_refs = client.fetch_pack(path, - base_repo.determine_wants, - base_repo.graph_walker(), - do_pack, - progress=do_activity).refs - - if remote_refs: - local_refs = base_repo.find_remote_ref_types_in_swh(remote_refs) - else: - local_refs = remote_refs = {} - - pack_buffer.flush() - pack_size = pack_buffer.tell() - pack_buffer.seek(0) - - return { - 'remote_refs': base_repo.filter_unwanted_refs(remote_refs), - 'local_refs': local_refs, - 'pack_buffer': pack_buffer, - 'pack_size': pack_size, - } - - def list_pack(self, pack_data, pack_size): - id_to_type = {} - type_to_ids = defaultdict(set) - - inflater = self.get_inflater() - - for obj in inflater: - type, id = obj.type_name, obj.id - id_to_type[id] = type - type_to_ids[type].add(id) - - return id_to_type, type_to_ids - - def prepare_origin_visit(self, origin_url, **kwargs): - self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) - self.origin = converters.origin_url_to_origin(origin_url) - - def get_full_snapshot(self, origin_id): - prev_snapshot = self.storage.snapshot_get_latest(origin_id) - if prev_snapshot and prev_snapshot.pop('next_branch', None): - return snapshot_get_all_branches(self.storage, prev_snapshot['id']) - - return prev_snapshot - - def prepare(self, origin_url, base_url=None, ignore_history=False): - base_origin_id = origin_id = self.origin_id - - prev_snapshot = None - - if not ignore_history: - prev_snapshot = self.get_full_snapshot(origin_id) - - if base_url and not prev_snapshot: - base_origin = converters.origin_url_to_origin(base_url) - base_origin = self.storage.origin_get(base_origin) - if base_origin: - base_origin_id = base_origin['id'] - prev_snapshot = self.get_full_snapshot(base_origin_id) - - self.base_snapshot = prev_snapshot - self.base_origin_id = base_origin_id - self.ignore_history = ignore_history - - def fetch_data(self): - def do_progress(msg): - sys.stderr.buffer.write(msg) - sys.stderr.flush() - - fetch_info = self.fetch_pack_from_origin( - self.origin['url'], self.base_origin_id, self.base_snapshot, - do_progress) - - self.pack_buffer = fetch_info['pack_buffer'] - self.pack_size = fetch_info['pack_size'] - - self.remote_refs = fetch_info['remote_refs'] - self.local_refs = fetch_info['local_refs'] - - origin_url = self.origin['url'] - - self.log.info('Listed %d refs for repo %s' % ( - len(self.remote_refs), origin_url), extra={ - 'swh_type': 'git_repo_list_refs', - 'swh_repo': origin_url, - 'swh_num_refs': len(self.remote_refs), - }) - - # We want to load the repository, walk all the objects - id_to_type, 
type_to_ids = self.list_pack(self.pack_buffer, - self.pack_size) - - self.id_to_type = id_to_type - self.type_to_ids = type_to_ids - - def save_data(self): - """Store a pack for archival""" - - write_size = 8192 - pack_dir = self.get_save_data_path() - - pack_name = "%s.pack" % self.visit_date.isoformat() - refs_name = "%s.refs" % self.visit_date.isoformat() - - with open(os.path.join(pack_dir, pack_name), 'xb') as f: - self.pack_buffer.seek(0) - while True: - r = self.pack_buffer.read(write_size) - if not r: - break - f.write(r) - - self.pack_buffer.seek(0) - - with open(os.path.join(pack_dir, refs_name), 'xb') as f: - pickle.dump(self.remote_refs, f) - - def get_inflater(self): - """Reset the pack buffer and get an object inflater from it""" - self.pack_buffer.seek(0) - return PackInflater.for_pack_data( - PackData.from_file(self.pack_buffer, self.pack_size)) - - def has_contents(self): - return bool(self.type_to_ids[b'blob']) - - def get_content_ids(self): - """Get the content identifiers from the git repository""" - for raw_obj in self.get_inflater(): - if raw_obj.type_name != b'blob': - continue - - yield converters.dulwich_blob_to_content_id(raw_obj) - - def get_contents(self): - """Format the blobs from the git repository as swh contents""" - max_content_size = self.config['content_size_limit'] - - missing_contents = set(self.storage.content_missing( - self.get_content_ids(), 'sha1_git')) - - for raw_obj in self.get_inflater(): - if raw_obj.type_name != b'blob': - continue - - if raw_obj.sha().digest() not in missing_contents: - continue - - yield converters.dulwich_blob_to_content( - raw_obj, log=self.log, max_content_size=max_content_size, - origin_id=self.origin_id) - - def has_directories(self): - return bool(self.type_to_ids[b'tree']) - - def get_directory_ids(self): - """Get the directory identifiers from the git repository""" - return (hashutil.hash_to_bytes(id.decode()) - for id in self.type_to_ids[b'tree']) - - def get_directories(self): - """Format the trees as swh directories""" - missing_dirs = set(self.storage.directory_missing( - sorted(self.get_directory_ids()))) - - for raw_obj in self.get_inflater(): - if raw_obj.type_name != b'tree': - continue - - if raw_obj.sha().digest() not in missing_dirs: - continue - - yield converters.dulwich_tree_to_directory(raw_obj, log=self.log) - - def has_revisions(self): - return bool(self.type_to_ids[b'commit']) - - def get_revision_ids(self): - """Get the revision identifiers from the git repository""" - return (hashutil.hash_to_bytes(id.decode()) - for id in self.type_to_ids[b'commit']) - - def get_revisions(self): - """Format commits as swh revisions""" - missing_revs = set(self.storage.revision_missing( - sorted(self.get_revision_ids()))) - - for raw_obj in self.get_inflater(): - if raw_obj.type_name != b'commit': - continue - - if raw_obj.sha().digest() not in missing_revs: - continue - - yield converters.dulwich_commit_to_revision(raw_obj, log=self.log) - - def has_releases(self): - return bool(self.type_to_ids[b'tag']) - - def get_release_ids(self): - """Get the release identifiers from the git repository""" - return (hashutil.hash_to_bytes(id.decode()) - for id in self.type_to_ids[b'tag']) - - def get_releases(self): - """Retrieve all the release objects from the git repository""" - missing_rels = set(self.storage.release_missing( - sorted(self.get_release_ids()))) - - for raw_obj in self.get_inflater(): - if raw_obj.type_name != b'tag': - continue - - if raw_obj.sha().digest() not in missing_rels: - continue - - yield 
converters.dulwich_tag_to_release(raw_obj, log=self.log) - - def get_snapshot(self): - branches = {} - - for ref in self.remote_refs: - ret_ref = self.local_refs[ref].copy() - if not ret_ref['target_type']: - target_type = self.id_to_type[ret_ref['target']] - ret_ref['target_type'] = converters.DULWICH_TYPES[target_type] - - ret_ref['target'] = hashutil.bytehex_to_hash(ret_ref['target']) - - branches[ref] = ret_ref - - self.snapshot = converters.branches_to_snapshot(branches) - return self.snapshot - - def get_fetch_history_result(self): - return { - 'contents': len(self.type_to_ids[b'blob']), - 'directories': len(self.type_to_ids[b'tree']), - 'revisions': len(self.type_to_ids[b'commit']), - 'releases': len(self.type_to_ids[b'tag']), - } - - def load_status(self): - """The load was eventful if the current snapshot is different to - the one we retrieved at the beginning of the run""" - eventful = False - - if self.base_snapshot: - eventful = self.snapshot['id'] != self.base_snapshot['id'] - else: - eventful = bool(self.snapshot['branches']) - - return {'status': ('eventful' if eventful else 'uneventful')} - - -if __name__ == '__main__': - import click - - logging.basicConfig( - level=logging.DEBUG, - format='%(asctime)s %(process)d %(message)s' - ) - - @click.command() - @click.option('--origin-url', help='Origin url', required=True) - @click.option('--base-url', default=None, help='Optional Base url') - @click.option('--ignore-history/--no-ignore-history', - help='Ignore the repository history', default=False) - def main(origin_url, base_url, ignore_history): - return BulkUpdater().load( - origin_url, - base_url=base_url, - ignore_history=ignore_history, - ) - - main()
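
For reference, a minimal usage sketch of the two entry points after this rename, mirroring the `__main__` blocks and task wrappers in the patch above. This is not part of the diff: it assumes a properly configured SWH storage, and the origin URL and clone path are placeholders.

```python
# Hypothetical driver for the renamed loaders (assumes SWH storage is configured).
import datetime

from swh.loader.git.loader import GitLoader              # load from a remote origin
from swh.loader.git.from_disk import GitLoaderFromDisk   # load from a clone on disk

# Remote load; base_url and ignore_history mirror the CLI options added to
# loader.py's __main__ block (previously updater.py / BulkUpdater).
GitLoader().load(
    'https://example.org/some/repo.git',   # placeholder origin URL
    base_url=None,
    ignore_history=False,
)

# On-disk load of an already-cloned repository, with an explicit visit date,
# as done in from_disk.py's __main__ block (previously GitLoader).
GitLoaderFromDisk().load(
    'https://example.org/some/repo.git',   # placeholder origin URL
    '/tmp/some-repo',                      # placeholder path to the local clone
    datetime.datetime.now(tz=datetime.timezone.utc),
)
```

The same entry points can also be invoked from the command line, as documented in the README hunk: `python3 -m swh.loader.git.{loader,from_disk}`.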