Page MenuHomeSoftware Heritage

D834.id2657.diff
No OneTemporary

D834.id2657.diff

diff --git a/README.md b/README.md
--- a/README.md
+++ b/README.md
@@ -48,9 +48,12 @@
Configuration
-------------
-You can run the loader or the updater directly by calling:
+You can run the loader from a remote origin (*loader*) or from an
+origin on disk (*from_disk*) directly by calling:
+
+
```
-python3 -m swh.loader.git.{loader,updater}
+python3 -m swh.loader.git.{loader,from_disk}
```
### Location
@@ -66,7 +69,8 @@
### Configuration sample
-$SWH_CONFIG_PATH/loader/git-{loader,updater}.yml:
+The loader from a remote origin reads `git.yml` and the loader from
+disk reads `git-disk.yml`, i.e. $SWH_CONFIG_PATH/loader/git{,-disk}.yml:
```
storage:
cls: remote
diff --git a/swh/loader/git/loader.py b/swh/loader/git/from_disk.py
copy from swh/loader/git/loader.py
copy to swh/loader/git/from_disk.py
--- a/swh/loader/git/loader.py
+++ b/swh/loader/git/from_disk.py
@@ -16,11 +16,12 @@
from . import converters, utils
-class GitLoader(UnbufferedLoader):
+class GitLoaderFromDisk(UnbufferedLoader):
"""Load a git repository from a directory.
+
"""
- CONFIG_BASE_FILENAME = 'loader/git-loader'
+ CONFIG_BASE_FILENAME = 'loader/git-disk'
def __init__(self, config=None):
super().__init__(logging_class='swh.loader.git.Loader', config=config)
@@ -254,7 +255,7 @@
return {'status': ('eventful' if eventful else 'uneventful')}
-class GitLoaderFromArchive(GitLoader):
+class GitLoaderFromArchive(GitLoaderFromDisk):
"""Load a git repository from an archive.
This loader ingests a git repository compressed into an archive.
@@ -316,8 +317,8 @@
def prepare(self, origin_url, archive_path, visit_date):
"""1. Uncompress the archive in temporary location.
- 2. Prepare as the GitLoader does
- 3. Load as GitLoader does
+ 2. Prepare as the GitLoaderFromDisk does
+ 3. Load as GitLoaderFromDisk does
"""
project_name = self.project_name_from_archive(archive_path)
@@ -355,6 +356,6 @@
if not visit_date:
visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
- return GitLoader().load(origin_url, git_directory, visit_date)
+ return GitLoaderFromDisk().load(origin_url, git_directory, visit_date)
main()
diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py
--- a/swh/loader/git/loader.py
+++ b/swh/loader/git/loader.py
@@ -1,164 +1,395 @@
-# Copyright (C) 2015-2018 The Software Heritage developers
+# Copyright (C) 2016-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
-import dulwich.repo
+import dulwich.client
+import logging
import os
-import shutil
+import pickle
+import sys
-from dulwich.errors import ObjectFormatException, EmptyFileException
from collections import defaultdict
+from io import BytesIO
+from dulwich.object_store import ObjectStoreGraphWalker
+from dulwich.pack import PackData, PackInflater
from swh.model import hashutil
from swh.loader.core.loader import UnbufferedLoader
-from . import converters, utils
+from swh.storage.algos.snapshot import snapshot_get_all_branches
+from . import converters
-class GitLoader(UnbufferedLoader):
- """Load a git repository from a directory.
- """
+class RepoRepresentation:
+ """Repository representation for a Software Heritage origin."""
+ def __init__(self, storage, origin_id, base_snapshot=None,
+ ignore_history=False):
+ self.storage = storage
- CONFIG_BASE_FILENAME = 'loader/git-loader'
+ self._parents_cache = {}
+ self._type_cache = {}
- def __init__(self, config=None):
- super().__init__(logging_class='swh.loader.git.Loader', config=config)
+ self.ignore_history = ignore_history
- def _prepare_origin_visit(self, origin_url, visit_date):
- self.origin_url = origin_url
- self.origin = converters.origin_url_to_origin(self.origin_url)
- self.visit_date = visit_date
+ if origin_id and not ignore_history:
+ self.heads = set(self._cache_heads(origin_id, base_snapshot))
+ else:
+ self.heads = set()
+
+ def _fill_parents_cache(self, commits):
+ """When querying for a commit's parents, we fill the cache to a depth of 1000
+ commits."""
+ root_revs = self._encode_for_storage(commits)
+ for rev, parents in self.storage.revision_shortlog(root_revs, 1000):
+ rev_id = hashutil.hash_to_bytehex(rev)
+ if rev_id not in self._parents_cache:
+ self._parents_cache[rev_id] = [
+ hashutil.hash_to_bytehex(parent) for parent in parents
+ ]
+ for rev in commits:
+ if rev not in self._parents_cache:
+ self._parents_cache[rev] = []
+
+ def _cache_heads(self, origin_id, base_snapshot):
+ """Return all the known head commits for `origin_id`"""
+ _git_types = ['content', 'directory', 'revision', 'release']
+
+ if not base_snapshot:
+ return []
+
+ snapshot_targets = set()
+ for target in base_snapshot['branches'].values():
+ if target and target['target_type'] in _git_types:
+ snapshot_targets.add(target['target'])
+
+ decoded_targets = self._decode_from_storage(snapshot_targets)
+
+ for id, objs in self.get_stored_objects(decoded_targets).items():
+ if not objs:
+ logging.warn('Missing head: %s' % hashutil.hash_to_hex(id))
+ return []
+
+ return decoded_targets
+
+ def get_parents(self, commit):
+ """Bogus method to prevent expensive recursion, at the expense of less
+ efficient downloading"""
+ return []
+
+ def get_heads(self):
+ return self.heads
+
+ @staticmethod
+ def _encode_for_storage(objects):
+ return [hashutil.bytehex_to_hash(object) for object in objects]
+
+ @staticmethod
+ def _decode_from_storage(objects):
+ return set(hashutil.hash_to_bytehex(object) for object in objects)
+
+ def graph_walker(self):
+ return ObjectStoreGraphWalker(self.get_heads(), self.get_parents)
+
+ @staticmethod
+ def filter_unwanted_refs(refs):
+ """Filter the unwanted references from refs"""
+ ret = {}
+ for ref, val in refs.items():
+ if ref.endswith(b'^{}'):
+ # Peeled refs make the git protocol explode
+ continue
+ elif ref.startswith(b'refs/pull/') and ref.endswith(b'/merge'):
+ # We filter-out auto-merged GitHub pull requests
+ continue
+ else:
+ ret[ref] = val
- def prepare_origin_visit(self, origin_url, directory, visit_date):
- self._prepare_origin_visit(origin_url, visit_date)
+ return ret
- def prepare(self, origin_url, directory, visit_date):
- self.repo = dulwich.repo.Repo(directory)
+ def determine_wants(self, refs):
+ """Filter the remote references to figure out which ones
+ Software Heritage needs.
+ """
+ if not refs:
+ return []
- def iter_objects(self):
- object_store = self.repo.object_store
+ # Find what objects Software Heritage has
+ refs = self.find_remote_ref_types_in_swh(refs)
- for pack in object_store.packs:
- objs = list(pack.index.iterentries())
- objs.sort(key=lambda x: x[1])
- for sha, offset, crc32 in objs:
- yield hashutil.hash_to_bytehex(sha)
+ # Cache the objects found in swh as existing heads
+ for target in refs.values():
+ if target['target_type'] is not None:
+ self.heads.add(target['target'])
- yield from object_store._iter_loose_objects()
- yield from object_store._iter_alternate_objects()
+ ret = set()
+ for target in self.filter_unwanted_refs(refs).values():
+ if target['target_type'] is None:
+ # The target doesn't exist in Software Heritage, let's retrieve
+ # it.
+ ret.add(target['target'])
- def _check(self, obj):
- """Check the object's repository representation.
+ return list(ret)
- If any errors in check exists, an ObjectFormatException is
- raised.
-
- Args:
- obj (object): Dulwich object read from the repository.
+ def get_stored_objects(self, objects):
+ """Find which of these objects were stored in the archive.
+ Do the request in packets to avoid a server timeout.
+ """
+ if self.ignore_history:
+ return {}
+
+ packet_size = 1000
+
+ ret = {}
+ query = []
+ for object in objects:
+ query.append(object)
+ if len(query) >= packet_size:
+ ret.update(
+ self.storage.object_find_by_sha1_git(
+ self._encode_for_storage(query)
+ )
+ )
+ query = []
+ if query:
+ ret.update(
+ self.storage.object_find_by_sha1_git(
+ self._encode_for_storage(query)
+ )
+ )
+ return ret
+
+ def find_remote_ref_types_in_swh(self, remote_refs):
+ """Parse the remote refs information and list the objects that exist in
+ Software Heritage.
"""
- obj.check()
- from dulwich.objects import Commit, Tag
- try:
- # For additional checks on dulwich objects with date
- # for now, only checks on *time
- if isinstance(obj, Commit):
- commit_time = obj._commit_time
- utils.check_date_time(commit_time)
- author_time = obj._author_time
- utils.check_date_time(author_time)
- elif isinstance(obj, Tag):
- tag_time = obj._tag_time
- utils.check_date_time(tag_time)
- except Exception as e:
- raise ObjectFormatException(e)
-
- def get_object(self, oid):
- """Given an object id, return the object if it is found and not
- malformed in some way.
- Args:
- oid (bytes): the object's identifier
+ all_objs = set(remote_refs.values()) - set(self._type_cache)
+ type_by_id = {}
+
+ for id, objs in self.get_stored_objects(all_objs).items():
+ id = hashutil.hash_to_bytehex(id)
+ if objs:
+ type_by_id[id] = objs[0]['type']
+
+ self._type_cache.update(type_by_id)
+
+ ret = {}
+ for ref, id in remote_refs.items():
+ ret[ref] = {
+ 'target': id,
+ 'target_type': self._type_cache.get(id),
+ }
+ return ret
- Returns:
- The object if found without malformation
+
+class GitLoader(UnbufferedLoader):
+ """A bulk loader for a git repository"""
+ CONFIG_BASE_FILENAME = 'loader/git'
+
+ ADDITIONAL_CONFIG = {
+ 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024),
+ }
+
+ def __init__(self, repo_representation=RepoRepresentation, config=None):
+ """Initialize the bulk updater.
+
+ Args:
+ repo_representation: swh's repository representation
+ which is in charge of filtering between known and remote
+ data.
"""
- try:
- # some errors are raised when reading the object
- obj = self.repo[oid]
- # some we need to check ourselves
- self._check(obj)
- except KeyError:
- _id = oid.decode('utf-8')
- self.log.warn('object %s not found, skipping' % _id,
- extra={
- 'swh_type': 'swh_loader_git_missing_object',
- 'swh_object_id': _id,
- 'origin_id': self.origin_id,
- })
- return None
- except ObjectFormatException:
- _id = oid.decode('utf-8')
- self.log.warn('object %s malformed, skipping' % _id,
- extra={
- 'swh_type': 'swh_loader_git_missing_object',
- 'swh_object_id': _id,
- 'origin_id': self.origin_id,
- })
- return None
- except EmptyFileException:
- _id = oid.decode('utf-8')
- self.log.warn('object %s corrupted (empty file), skipping' % _id,
- extra={
- 'swh_type': 'swh_loader_git_missing_object',
- 'swh_object_id': _id,
- 'origin_id': self.origin_id,
- })
+ super().__init__(logging_class='swh.loader.git.BulkLoader',
+ config=config)
+ self.repo_representation = repo_representation
+
+ def fetch_pack_from_origin(self, origin_url, base_origin_id,
+ base_snapshot, do_activity):
+ """Fetch a pack from the origin"""
+ pack_buffer = BytesIO()
+
+ base_repo = self.repo_representation(
+ storage=self.storage,
+ origin_id=base_origin_id,
+ base_snapshot=base_snapshot,
+ ignore_history=self.ignore_history,
+ )
+
+ client, path = dulwich.client.get_transport_and_path(origin_url,
+ thin_packs=False)
+
+ size_limit = self.config['pack_size_bytes']
+
+ def do_pack(data,
+ pack_buffer=pack_buffer,
+ limit=size_limit,
+ origin_url=origin_url):
+ cur_size = pack_buffer.tell()
+ would_write = len(data)
+ if cur_size + would_write > limit:
+ raise IOError('Pack file too big for repository %s, '
+ 'limit is %d bytes, current size is %d, '
+ 'would write %d' %
+ (origin_url, limit, cur_size, would_write))
+
+ pack_buffer.write(data)
+
+ remote_refs = client.fetch_pack(path,
+ base_repo.determine_wants,
+ base_repo.graph_walker(),
+ do_pack,
+ progress=do_activity).refs
+
+ if remote_refs:
+ local_refs = base_repo.find_remote_ref_types_in_swh(remote_refs)
else:
- return obj
+ local_refs = remote_refs = {}
+
+ pack_buffer.flush()
+ pack_size = pack_buffer.tell()
+ pack_buffer.seek(0)
+
+ return {
+ 'remote_refs': base_repo.filter_unwanted_refs(remote_refs),
+ 'local_refs': local_refs,
+ 'pack_buffer': pack_buffer,
+ 'pack_size': pack_size,
+ }
+
+ def list_pack(self, pack_data, pack_size):
+ id_to_type = {}
+ type_to_ids = defaultdict(set)
+
+ inflater = self.get_inflater()
+
+ for obj in inflater:
+ type, id = obj.type_name, obj.id
+ id_to_type[id] = type
+ type_to_ids[type].add(id)
+
+ return id_to_type, type_to_ids
+
+ def prepare_origin_visit(self, origin_url, **kwargs):
+ self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
+ self.origin = converters.origin_url_to_origin(origin_url)
+
+ def get_full_snapshot(self, origin_id):
+ prev_snapshot = self.storage.snapshot_get_latest(origin_id)
+ if prev_snapshot and prev_snapshot.pop('next_branch', None):
+ return snapshot_get_all_branches(self.storage, prev_snapshot['id'])
+
+ return prev_snapshot
+
+ def prepare(self, origin_url, base_url=None, ignore_history=False):
+ base_origin_id = origin_id = self.origin_id
+
+ prev_snapshot = None
+
+ if not ignore_history:
+ prev_snapshot = self.get_full_snapshot(origin_id)
+
+ if base_url and not prev_snapshot:
+ base_origin = converters.origin_url_to_origin(base_url)
+ base_origin = self.storage.origin_get(base_origin)
+ if base_origin:
+ base_origin_id = base_origin['id']
+ prev_snapshot = self.get_full_snapshot(base_origin_id)
+
+ self.base_snapshot = prev_snapshot
+ self.base_origin_id = base_origin_id
+ self.ignore_history = ignore_history
def fetch_data(self):
- """Fetch the data from the data source"""
- self.previous_snapshot = self.storage.snapshot_get_latest(
- self.origin_id
- )
+ def do_progress(msg):
+ sys.stderr.buffer.write(msg)
+ sys.stderr.flush()
- type_to_ids = defaultdict(list)
- for oid in self.iter_objects():
- obj = self.get_object(oid)
- if not obj:
- continue
- type_name = obj.type_name
- type_to_ids[type_name].append(oid)
+ fetch_info = self.fetch_pack_from_origin(
+ self.origin['url'], self.base_origin_id, self.base_snapshot,
+ do_progress)
+
+ self.pack_buffer = fetch_info['pack_buffer']
+ self.pack_size = fetch_info['pack_size']
+
+ self.remote_refs = fetch_info['remote_refs']
+ self.local_refs = fetch_info['local_refs']
+
+ origin_url = self.origin['url']
+
+ self.log.info('Listed %d refs for repo %s' % (
+ len(self.remote_refs), origin_url), extra={
+ 'swh_type': 'git_repo_list_refs',
+ 'swh_repo': origin_url,
+ 'swh_num_refs': len(self.remote_refs),
+ })
+ # We want to load the repository, walk all the objects
+ id_to_type, type_to_ids = self.list_pack(self.pack_buffer,
+ self.pack_size)
+
+ self.id_to_type = id_to_type
self.type_to_ids = type_to_ids
+ def save_data(self):
+ """Store a pack for archival"""
+
+ write_size = 8192
+ pack_dir = self.get_save_data_path()
+
+ pack_name = "%s.pack" % self.visit_date.isoformat()
+ refs_name = "%s.refs" % self.visit_date.isoformat()
+
+ with open(os.path.join(pack_dir, pack_name), 'xb') as f:
+ self.pack_buffer.seek(0)
+ while True:
+ r = self.pack_buffer.read(write_size)
+ if not r:
+ break
+ f.write(r)
+
+ self.pack_buffer.seek(0)
+
+ with open(os.path.join(pack_dir, refs_name), 'xb') as f:
+ pickle.dump(self.remote_refs, f)
+
+ def get_inflater(self):
+ """Reset the pack buffer and get an object inflater from it"""
+ self.pack_buffer.seek(0)
+ return PackInflater.for_pack_data(
+ PackData.from_file(self.pack_buffer, self.pack_size))
+
def has_contents(self):
- """Checks whether we need to load contents"""
return bool(self.type_to_ids[b'blob'])
def get_content_ids(self):
"""Get the content identifiers from the git repository"""
- for oid in self.type_to_ids[b'blob']:
- yield converters.dulwich_blob_to_content_id(self.repo[oid])
+ for raw_obj in self.get_inflater():
+ if raw_obj.type_name != b'blob':
+ continue
+
+ yield converters.dulwich_blob_to_content_id(raw_obj)
def get_contents(self):
- """Get the contents that need to be loaded"""
+ """Format the blobs from the git repository as swh contents"""
max_content_size = self.config['content_size_limit']
missing_contents = set(self.storage.content_missing(
self.get_content_ids(), 'sha1_git'))
- for oid in missing_contents:
+ for raw_obj in self.get_inflater():
+ if raw_obj.type_name != b'blob':
+ continue
+
+ if raw_obj.sha().digest() not in missing_contents:
+ continue
+
yield converters.dulwich_blob_to_content(
- self.repo[hashutil.hash_to_bytehex(oid)], log=self.log,
- max_content_size=max_content_size,
+ raw_obj, log=self.log, max_content_size=max_content_size,
origin_id=self.origin_id)
def has_directories(self):
- """Checks whether we need to load directories"""
return bool(self.type_to_ids[b'tree'])
def get_directory_ids(self):
@@ -167,16 +398,20 @@
for id in self.type_to_ids[b'tree'])
def get_directories(self):
- """Get the directories that need to be loaded"""
+ """Format the trees as swh directories"""
missing_dirs = set(self.storage.directory_missing(
sorted(self.get_directory_ids())))
- for oid in missing_dirs:
- yield converters.dulwich_tree_to_directory(
- self.repo[hashutil.hash_to_bytehex(oid)], log=self.log)
+ for raw_obj in self.get_inflater():
+ if raw_obj.type_name != b'tree':
+ continue
+
+ if raw_obj.sha().digest() not in missing_dirs:
+ continue
+
+ yield converters.dulwich_tree_to_directory(raw_obj, log=self.log)
def has_revisions(self):
- """Checks whether we need to load revisions"""
return bool(self.type_to_ids[b'commit'])
def get_revision_ids(self):
@@ -185,16 +420,20 @@
for id in self.type_to_ids[b'commit'])
def get_revisions(self):
- """Get the revisions that need to be loaded"""
+ """Format commits as swh revisions"""
missing_revs = set(self.storage.revision_missing(
sorted(self.get_revision_ids())))
- for oid in missing_revs:
- yield converters.dulwich_commit_to_revision(
- self.repo[hashutil.hash_to_bytehex(oid)], log=self.log)
+ for raw_obj in self.get_inflater():
+ if raw_obj.type_name != b'commit':
+ continue
+
+ if raw_obj.sha().digest() not in missing_revs:
+ continue
+
+ yield converters.dulwich_commit_to_revision(raw_obj, log=self.log)
def has_releases(self):
- """Checks whether we need to load releases"""
return bool(self.type_to_ids[b'tag'])
def get_release_ids(self):
@@ -203,33 +442,36 @@
for id in self.type_to_ids[b'tag'])
def get_releases(self):
- """Get the releases that need to be loaded"""
+ """Retrieve all the release objects from the git repository"""
missing_rels = set(self.storage.release_missing(
sorted(self.get_release_ids())))
- for oid in missing_rels:
- yield converters.dulwich_tag_to_release(
- self.repo[hashutil.hash_to_bytehex(oid)], log=self.log)
+ for raw_obj in self.get_inflater():
+ if raw_obj.type_name != b'tag':
+ continue
+
+ if raw_obj.sha().digest() not in missing_rels:
+ continue
+
+ yield converters.dulwich_tag_to_release(raw_obj, log=self.log)
def get_snapshot(self):
- """Turn the list of branches into a snapshot to load"""
branches = {}
- for ref, target in self.repo.refs.as_dict().items():
- obj = self.get_object(target)
- if obj:
- branches[ref] = {
- 'target': hashutil.bytehex_to_hash(target),
- 'target_type': converters.DULWICH_TYPES[obj.type_name],
- }
- else:
- branches[ref] = None
+ for ref in self.remote_refs:
+ ret_ref = self.local_refs[ref].copy()
+ if not ret_ref['target_type']:
+ target_type = self.id_to_type[ret_ref['target']]
+ ret_ref['target_type'] = converters.DULWICH_TYPES[target_type]
+
+ ret_ref['target'] = hashutil.bytehex_to_hash(ret_ref['target'])
+
+ branches[ref] = ret_ref
self.snapshot = converters.branches_to_snapshot(branches)
return self.snapshot
def get_fetch_history_result(self):
- """Return the data to store in fetch_history for the current loader"""
return {
'contents': len(self.type_to_ids[b'blob']),
'directories': len(self.type_to_ids[b'tree']),
@@ -237,110 +479,21 @@
'releases': len(self.type_to_ids[b'tag']),
}
- def save_data(self):
- """We already have the data locally, no need to save it"""
- pass
-
def load_status(self):
- """The load was eventful if the current occurrences are different to
- the ones we retrieved at the beginning of the run"""
+ """The load was eventful if the current snapshot is different to
+ the one we retrieved at the beginning of the run"""
eventful = False
- if self.previous_snapshot:
- eventful = self.snapshot['id'] != self.previous_snapshot['id']
+ if self.base_snapshot:
+ eventful = self.snapshot['id'] != self.base_snapshot['id']
else:
eventful = bool(self.snapshot['branches'])
return {'status': ('eventful' if eventful else 'uneventful')}
-class GitLoaderFromArchive(GitLoader):
- """Load a git repository from an archive.
-
- This loader ingests a git repository compressed into an archive.
- The supported archive formats are ``.zip`` and ``.tar.gz``.
-
- From an input tarball named ``my-git-repo.zip``, the following layout is
- expected in it::
-
- my-git-repo/
- ├── .git
- │ ├── branches
- │ ├── COMMIT_EDITMSG
- │ ├── config
- │ ├── description
- │ ├── HEAD
- ...
-
- Nevertheless, the loader is able to ingest tarballs with the following
- layouts too::
-
- .
- ├── .git
- │ ├── branches
- │ ├── COMMIT_EDITMSG
- │ ├── config
- │ ├── description
- │ ├── HEAD
- ...
-
- or::
-
- other-repo-name/
- ├── .git
- │ ├── branches
- │ ├── COMMIT_EDITMSG
- │ ├── config
- │ ├── description
- │ ├── HEAD
- ...
-
- """
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.temp_dir = self.repo_path = None
-
- def project_name_from_archive(self, archive_path):
- """Compute the project name from the archive's path.
-
- """
- archive_name = os.path.basename(archive_path)
- for ext in ('.zip', '.tar.gz', '.tgz'):
- if archive_name.lower().endswith(ext):
- archive_name = archive_name[:-len(ext)]
- break
- return archive_name
-
- def prepare_origin_visit(self, origin_url, archive_path, visit_date):
- self._prepare_origin_visit(origin_url, visit_date)
-
- def prepare(self, origin_url, archive_path, visit_date):
- """1. Uncompress the archive in temporary location.
- 2. Prepare as the GitLoader does
- 3. Load as GitLoader does
-
- """
- project_name = self.project_name_from_archive(archive_path)
- self.temp_dir, self.repo_path = utils.init_git_repo_from_archive(
- project_name, archive_path)
-
- self.log.info('Project %s - Uncompressing archive %s at %s' % (
- origin_url, os.path.basename(archive_path), self.repo_path))
- super().prepare(origin_url, self.repo_path, visit_date)
-
- def cleanup(self):
- """Cleanup the temporary location (if it exists).
-
- """
- if self.temp_dir and os.path.exists(self.temp_dir):
- shutil.rmtree(self.temp_dir)
- self.log.info('Project %s - Done injecting %s' % (
- self.origin_url, self.repo_path))
-
-
if __name__ == '__main__':
import click
- import logging
logging.basicConfig(
level=logging.DEBUG,
@@ -348,13 +501,15 @@
)
@click.command()
- @click.option('--origin-url', help='origin url')
- @click.option('--git-directory', help='Path to git repository to load')
- @click.option('--visit-date', default=None, help='Visit date')
- def main(origin_url, git_directory, visit_date):
- if not visit_date:
- visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
-
- return GitLoader().load(origin_url, git_directory, visit_date)
+ @click.option('--origin-url', help='Origin url', required=True)
+ @click.option('--base-url', default=None, help='Optional Base url')
+ @click.option('--ignore-history/--no-ignore-history',
+ help='Ignore the repository history', default=False)
+ def main(origin_url, base_url, ignore_history):
+ return GitLoader().load(
+ origin_url,
+ base_url=base_url,
+ ignore_history=ignore_history,
+ )
main()
diff --git a/swh/loader/git/tasks.py b/swh/loader/git/tasks.py
--- a/swh/loader/git/tasks.py
+++ b/swh/loader/git/tasks.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2017 The Software Heritage developers
+# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -7,8 +7,8 @@
from swh.scheduler.task import Task
-from .loader import GitLoader, GitLoaderFromArchive
-from .updater import BulkUpdater
+from .from_disk import GitLoaderFromDisk, GitLoaderFromArchive
+from .loader import GitLoader
# TODO: rename to LoadRemoteGitRepository
@@ -18,7 +18,7 @@
def run_task(self, repo_url, base_url=None):
"""Import a git repository"""
- loader = BulkUpdater()
+ loader = GitLoader()
loader.log = self.log
return loader.load(repo_url, base_url=base_url)
@@ -32,7 +32,7 @@
"""Import a git repository, cloned in `directory` from `origin_url` at
`date`."""
- loader = GitLoader()
+ loader = GitLoaderFromDisk()
loader.log = self.log
return loader.load(origin_url, directory, dateutil.parser.parse(date))
diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_from_disk.py
copy from swh/loader/git/tests/test_loader.py
copy to swh/loader/git/tests/test_from_disk.py
--- a/swh/loader/git/tests/test_loader.py
+++ b/swh/loader/git/tests/test_from_disk.py
@@ -6,7 +6,8 @@
import os.path
import subprocess
-from swh.loader.git.loader import GitLoader, GitLoaderFromArchive
+
+from swh.loader.git.from_disk import GitLoaderFromDisk, GitLoaderFromArchive
from swh.loader.core.tests import BaseLoaderTest
from . import TEST_LOADER_CONFIG
@@ -93,7 +94,7 @@
}
-class BaseGitLoaderTest(BaseLoaderTest):
+class BaseGitLoaderFromDiskTest(BaseLoaderTest):
def setUp(self, archive_name, uncompress_archive, filename='testrepo'):
super().setUp(archive_name=archive_name, filename=filename,
prefix_tmp_folder_name='swh.loader.git.',
@@ -101,12 +102,12 @@
uncompress_archive=uncompress_archive)
-class GitLoaderTest(GitLoader):
+class GitLoaderFromDiskTest(GitLoaderFromDisk):
def parse_config_file(self, *args, **kwargs):
return TEST_LOADER_CONFIG
-class BaseDirGitLoaderTest(BaseGitLoaderTest):
+class BaseDirGitLoaderFromDiskTest(BaseGitLoaderFromDiskTest):
"""Mixin base loader test to prepare the git
repository to uncompress, load and test the results.
@@ -115,7 +116,7 @@
"""
def setUp(self):
super().setUp('testrepo.tgz', uncompress_archive=True)
- self.loader = GitLoaderTest()
+ self.loader = GitLoaderFromDiskTest()
self.storage = self.loader.storage
def load(self):
@@ -125,7 +126,7 @@
directory=self.destination_path)
-class BaseGitLoaderFromArchiveTest(BaseGitLoaderTest):
+class BaseGitLoaderFromArchiveTest(BaseGitLoaderFromDiskTest):
"""Mixin base loader test to prepare the git
repository to uncompress, load and test the results.
@@ -144,7 +145,7 @@
archive_path=self.destination_path)
-class GitLoaderTests:
+class GitLoaderFromDiskTests:
"""Common tests for all git loaders."""
def test_load(self):
"""Loads a simple repository (made available by `setUp()`),
@@ -176,8 +177,8 @@
self.assertCountSnapshots(1)
-class DirGitLoaderTest(BaseDirGitLoaderTest, GitLoaderTests):
- """Tests for the GitLoader. Includes the common ones, and
+class DirGitLoaderTest(BaseDirGitLoaderFromDiskTest, GitLoaderFromDiskTests):
+ """Tests for the GitLoaderFromDisk. Includes the common ones, and
add others that only work with a local dir."""
def _git(self, *cmd):
@@ -255,7 +256,8 @@
self.assertEqual(self.loader.visit_status(), 'full')
-class GitLoaderFromArchiveTest(BaseGitLoaderFromArchiveTest, GitLoaderTests):
+class GitLoaderFromArchiveTest(BaseGitLoaderFromArchiveTest,
+ GitLoaderFromDiskTests):
"""Tests for GitLoaderFromArchive. Imports the common ones
from GitLoaderTests."""
pass
diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_loader.py
--- a/swh/loader/git/tests/test_loader.py
+++ b/swh/loader/git/tests/test_loader.py
@@ -3,259 +3,26 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import os.path
-import subprocess
-from swh.loader.git.loader import GitLoader, GitLoaderFromArchive
-from swh.loader.core.tests import BaseLoaderTest
-
-from . import TEST_LOADER_CONFIG
-
-
-class GitLoaderFromArchive(GitLoaderFromArchive):
- def project_name_from_archive(self, archive_path):
- # We don't want the project name to be 'resources'.
- return 'testrepo'
-
- def parse_config_file(self, *args, **kwargs):
- return TEST_LOADER_CONFIG
-
-
-CONTENT1 = {
- '33ab5639bfd8e7b95eb1d8d0b87781d4ffea4d5d', # README v1
- '349c4ff7d21f1ec0eda26f3d9284c293e3425417', # README v2
- '799c11e348d39f1704022b8354502e2f81f3c037', # file1.txt
- '4bdb40dfd6ec75cb730e678b5d7786e30170c5fb', # file2.txt
-}
-
-SNAPSHOT_ID = 'bdf3b06d6017e0d9ad6447a73da6ff1ae9efb8f0'
-
-SNAPSHOT1 = {
- 'id': SNAPSHOT_ID,
- 'branches': {
- 'HEAD': {
- 'target': '2f01f5ca7e391a2f08905990277faf81e709a649',
- 'target_type': 'revision',
- },
- 'refs/heads/master': {
- 'target': '2f01f5ca7e391a2f08905990277faf81e709a649',
- 'target_type': 'revision',
- },
- 'refs/heads/branch1': {
- 'target': 'b0a77609903f767a2fd3d769904ef9ef68468b87',
- 'target_type': 'revision',
- },
- 'refs/heads/branch2': {
- 'target': 'bd746cd1913721b269b395a56a97baf6755151c2',
- 'target_type': 'revision',
- },
- 'refs/tags/branch2-after-delete': {
- 'target': 'bd746cd1913721b269b395a56a97baf6755151c2',
- 'target_type': 'revision',
- },
- 'refs/tags/branch2-before-delete': {
- 'target': '1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b',
- 'target_type': 'revision',
- },
- },
-}
-
-# directory hashes obtained with:
-# gco b6f40292c4e94a8f7e7b4aff50e6c7429ab98e2a
-# swh-hashtree --ignore '.git' --path .
-# gco 2f01f5ca7e391a2f08905990277faf81e709a649
-# swh-hashtree --ignore '.git' --path .
-# gco bcdc5ebfde1a3cd6c96e0c2ea4eed19c13208777
-# swh-hashtree --ignore '.git' --path .
-# gco 1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b
-# swh-hashtree --ignore '.git' --path .
-# gco 79f65ac75f79dda6ff03d66e1242702ab67fb51c
-# swh-hashtree --ignore '.git' --path .
-# gco b0a77609903f767a2fd3d769904ef9ef68468b87
-# swh-hashtree --ignore '.git' --path .
-# gco bd746cd1913721b269b395a56a97baf6755151c2
-# swh-hashtree --ignore '.git' --path .
-REVISIONS1 = {
- 'b6f40292c4e94a8f7e7b4aff50e6c7429ab98e2a':
- '40dbdf55dfd4065422462cc74a949254aefa972e',
- '2f01f5ca7e391a2f08905990277faf81e709a649':
- 'e1d0d894835f91a0f887a4bc8b16f81feefdfbd5',
- 'bcdc5ebfde1a3cd6c96e0c2ea4eed19c13208777':
- 'b43724545b4759244bb54be053c690649161411c',
- '1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b':
- 'fbf70528223d263661b5ad4b80f26caf3860eb8e',
- '79f65ac75f79dda6ff03d66e1242702ab67fb51c':
- '5df34ec74d6f69072d9a0a6677d8efbed9b12e60',
- 'b0a77609903f767a2fd3d769904ef9ef68468b87':
- '9ca0c7d6ffa3f9f0de59fd7912e08f11308a1338',
- 'bd746cd1913721b269b395a56a97baf6755151c2':
- 'e1d0d894835f91a0f887a4bc8b16f81feefdfbd5',
-}
-
-
-class BaseGitLoaderTest(BaseLoaderTest):
- def setUp(self, archive_name, uncompress_archive, filename='testrepo'):
- super().setUp(archive_name=archive_name, filename=filename,
- prefix_tmp_folder_name='swh.loader.git.',
- start_path=os.path.dirname(__file__),
- uncompress_archive=uncompress_archive)
+from swh.loader.git.loader import GitLoader
+from swh.loader.git.tests.test_from_disk import DirGitLoaderTest
class GitLoaderTest(GitLoader):
def parse_config_file(self, *args, **kwargs):
- return TEST_LOADER_CONFIG
-
-
-class BaseDirGitLoaderTest(BaseGitLoaderTest):
- """Mixin base loader test to prepare the git
- repository to uncompress, load and test the results.
+ return {
+ **super().parse_config_file(*args, **kwargs),
+ 'storage': {'cls': 'memory', 'args': {}}
+ }
- This sets up
- """
+class TestGitLoader(DirGitLoaderTest):
+ """Same tests as for the GitLoaderFromDisk, but running on GitLoader."""
def setUp(self):
- super().setUp('testrepo.tgz', uncompress_archive=True)
+ super().setUp()
self.loader = GitLoaderTest()
self.storage = self.loader.storage
def load(self):
return self.loader.load(
- origin_url=self.repo_url,
- visit_date='2016-05-03 15:16:32+00',
- directory=self.destination_path)
-
-
-class BaseGitLoaderFromArchiveTest(BaseGitLoaderTest):
- """Mixin base loader test to prepare the git
- repository to uncompress, load and test the results.
-
- This sets up
-
- """
- def setUp(self):
- super().setUp('testrepo.tgz', uncompress_archive=False)
- self.loader = GitLoaderFromArchive()
- self.storage = self.loader.storage
-
- def load(self):
- return self.loader.load(
- origin_url=self.repo_url,
- visit_date='2016-05-03 15:16:32+00',
- archive_path=self.destination_path)
-
-
-class GitLoaderTests:
- """Common tests for all git loaders."""
- def test_load(self):
- """Loads a simple repository (made available by `setUp()`),
- and checks everything was added in the storage."""
- res = self.load()
- self.assertEqual(res['status'], 'eventful', res)
-
- self.assertContentsContain(CONTENT1)
- self.assertCountDirectories(7)
- self.assertCountReleases(0) # FIXME: why not 2?
- self.assertCountRevisions(7)
- self.assertCountSnapshots(1)
-
- self.assertRevisionsContain(REVISIONS1)
-
- self.assertSnapshotEqual(SNAPSHOT1)
-
- self.assertEqual(self.loader.load_status(), {'status': 'eventful'})
- self.assertEqual(self.loader.visit_status(), 'full')
-
- def test_load_unchanged(self):
- """Checks loading a repository a second time does not add
- any extra data."""
- res = self.load()
- self.assertEqual(res['status'], 'eventful')
-
- res = self.load()
- self.assertEqual(res['status'], 'uneventful')
- self.assertCountSnapshots(1)
-
-
-class DirGitLoaderTest(BaseDirGitLoaderTest, GitLoaderTests):
- """Tests for the GitLoader. Includes the common ones, and
- add others that only work with a local dir."""
-
- def _git(self, *cmd):
- """Small wrapper around subprocess to call Git."""
- try:
- return subprocess.check_output(
- ['git', '-C', self.destination_path] + list(cmd))
- except subprocess.CalledProcessError as e:
- print(e.output)
- print(e.stderr)
- raise
-
- def test_load_changed(self):
- """Loads a repository, makes some changes by adding files, commits,
- and merges, load it again, and check the storage contains everything
- it should."""
- # Initial load
- res = self.load()
- self.assertEqual(res['status'], 'eventful', res)
-
- self._git('config', '--local', 'user.email', 'you@example.com')
- self._git('config', '--local', 'user.name', 'Your Name')
-
- # Load with a new file + revision
- with open(os.path.join(self.destination_path, 'hello.py'), 'a') as fd:
- fd.write("print('Hello world')\n")
-
- self._git('add', 'hello.py')
- self._git('commit', '-m', 'Hello world')
- new_revision = self._git('rev-parse', 'master').decode().strip()
-
- revisions = REVISIONS1.copy()
- assert new_revision not in revisions
- revisions[new_revision] = '85dae072a5aa9923ffa7a7568f819ff21bf49858'
-
- res = self.load()
- self.assertEqual(res['status'], 'eventful')
-
- self.assertCountContents(4 + 1)
- self.assertCountDirectories(7 + 1)
- self.assertCountReleases(0) # FIXME: why not 2?
- self.assertCountRevisions(7 + 1)
- self.assertCountSnapshots(1 + 1)
-
- self.assertRevisionsContain(revisions)
-
- # TODO: how to check the snapshot id?
- # self.assertSnapshotEqual(SNAPSHOT1)
-
- self.assertEqual(self.loader.load_status(), {'status': 'eventful'})
- self.assertEqual(self.loader.visit_status(), 'full')
-
- # Load with a new merge
- self._git('merge', 'branch1', '-m', 'merge')
- new_revision = self._git('rev-parse', 'master').decode().strip()
-
- assert new_revision not in revisions
- revisions[new_revision] = 'dab8a37df8db8666d4e277bef9a546f585b5bedd'
-
- res = self.load()
- self.assertEqual(res['status'], 'eventful')
-
- self.assertCountContents(4 + 1)
- self.assertCountDirectories(7 + 2)
- self.assertCountReleases(0) # FIXME: why not 2?
- self.assertCountRevisions(7 + 2)
- self.assertCountSnapshots(1 + 1 + 1)
-
- self.assertRevisionsContain(revisions)
-
- # TODO: how to check the snapshot id?
- # self.assertSnapshotEqual(SNAPSHOT1)
-
- self.assertEqual(self.loader.load_status(), {'status': 'eventful'})
- self.assertEqual(self.loader.visit_status(), 'full')
-
-
-class GitLoaderFromArchiveTest(BaseGitLoaderFromArchiveTest, GitLoaderTests):
- """Tests for GitLoaderFromArchive. Imports the common ones
- from GitLoaderTests."""
- pass
+ origin_url=self.repo_url)
diff --git a/swh/loader/git/tests/test_tasks.py b/swh/loader/git/tests/test_tasks.py
--- a/swh/loader/git/tests/test_tasks.py
+++ b/swh/loader/git/tests/test_tasks.py
@@ -18,7 +18,7 @@
task = UpdateGitRepository()
self.assertEqual(task.task_queue, 'swh_loader_git')
- @patch('swh.loader.git.updater.BulkUpdater.load')
+ @patch('swh.loader.git.loader.GitLoader.load')
def test_task(self, mock_loader):
mock_loader.return_value = {'status': 'eventful'}
task = UpdateGitRepository()
@@ -36,7 +36,7 @@
task = LoadDiskGitRepository()
self.assertEqual(task.task_queue, 'swh_loader_git_express')
- @patch('swh.loader.git.loader.GitLoader.load')
+ @patch('swh.loader.git.from_disk.GitLoaderFromDisk.load')
def test_task(self, mock_loader):
mock_loader.return_value = {'status': 'uneventful'}
task = LoadDiskGitRepository()
@@ -56,7 +56,7 @@
task = UncompressAndLoadDiskGitRepository()
self.assertEqual(task.task_queue, 'swh_loader_git_archive')
- @patch('swh.loader.git.loader.GitLoaderFromArchive.load')
+ @patch('swh.loader.git.from_disk.GitLoaderFromArchive.load')
def test_task(self, mock_loader):
mock_loader.return_value = {'status': 'failed'}
task = UncompressAndLoadDiskGitRepository()
diff --git a/swh/loader/git/tests/test_updater.py b/swh/loader/git/tests/test_updater.py
deleted file mode 100644
--- a/swh/loader/git/tests/test_updater.py
+++ /dev/null
@@ -1,22 +0,0 @@
-from swh.loader.git.updater import BulkUpdater
-from swh.loader.git.tests.test_loader import DirGitLoaderTest
-
-
-class BulkUpdaterTest(BulkUpdater):
- def parse_config_file(self, *args, **kwargs):
- return {
- **super().parse_config_file(*args, **kwargs),
- 'storage': {'cls': 'memory', 'args': {}}
- }
-
-
-class TestBulkUpdater(DirGitLoaderTest):
- """Same tests as for the GitLoader, but running on BulkUpdater."""
- def setUp(self):
- super().setUp()
- self.loader = BulkUpdaterTest()
- self.storage = self.loader.storage
-
- def load(self):
- return self.loader.load(
- origin_url=self.repo_url)
diff --git a/swh/loader/git/updater.py b/swh/loader/git/updater.py
deleted file mode 100644
--- a/swh/loader/git/updater.py
+++ /dev/null
@@ -1,515 +0,0 @@
-# Copyright (C) 2016-2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import datetime
-import dulwich.client
-import logging
-import os
-import pickle
-import sys
-
-from collections import defaultdict
-from io import BytesIO
-from dulwich.object_store import ObjectStoreGraphWalker
-from dulwich.pack import PackData, PackInflater
-
-from swh.model import hashutil
-from swh.loader.core.loader import UnbufferedLoader
-from swh.storage.algos.snapshot import snapshot_get_all_branches
-from . import converters
-
-
-class SWHRepoRepresentation:
- """Repository representation for a Software Heritage origin."""
- def __init__(self, storage, origin_id, base_snapshot=None,
- ignore_history=False):
- self.storage = storage
-
- self._parents_cache = {}
- self._type_cache = {}
-
- self.ignore_history = ignore_history
-
- if origin_id and not ignore_history:
- self.heads = set(self._cache_heads(origin_id, base_snapshot))
- else:
- self.heads = set()
-
- def _fill_parents_cache(self, commits):
- """When querying for a commit's parents, we fill the cache to a depth of 1000
- commits."""
- root_revs = self._encode_for_storage(commits)
- for rev, parents in self.storage.revision_shortlog(root_revs, 1000):
- rev_id = hashutil.hash_to_bytehex(rev)
- if rev_id not in self._parents_cache:
- self._parents_cache[rev_id] = [
- hashutil.hash_to_bytehex(parent) for parent in parents
- ]
- for rev in commits:
- if rev not in self._parents_cache:
- self._parents_cache[rev] = []
-
- def _cache_heads(self, origin_id, base_snapshot):
- """Return all the known head commits for `origin_id`"""
- _git_types = ['content', 'directory', 'revision', 'release']
-
- if not base_snapshot:
- return []
-
- snapshot_targets = set()
- for target in base_snapshot['branches'].values():
- if target and target['target_type'] in _git_types:
- snapshot_targets.add(target['target'])
-
- decoded_targets = self._decode_from_storage(snapshot_targets)
-
- for id, objs in self.get_stored_objects(decoded_targets).items():
- if not objs:
- logging.warn('Missing head: %s' % hashutil.hash_to_hex(id))
- return []
-
- return decoded_targets
-
- def get_parents(self, commit):
- """Bogus method to prevent expensive recursion, at the expense of less
- efficient downloading"""
- return []
-
- def get_heads(self):
- return self.heads
-
- @staticmethod
- def _encode_for_storage(objects):
- return [hashutil.bytehex_to_hash(object) for object in objects]
-
- @staticmethod
- def _decode_from_storage(objects):
- return set(hashutil.hash_to_bytehex(object) for object in objects)
-
- def graph_walker(self):
- return ObjectStoreGraphWalker(self.get_heads(), self.get_parents)
-
- @staticmethod
- def filter_unwanted_refs(refs):
- """Filter the unwanted references from refs"""
- ret = {}
- for ref, val in refs.items():
- if ref.endswith(b'^{}'):
- # Peeled refs make the git protocol explode
- continue
- elif ref.startswith(b'refs/pull/') and ref.endswith(b'/merge'):
- # We filter-out auto-merged GitHub pull requests
- continue
- else:
- ret[ref] = val
-
- return ret
-
- def determine_wants(self, refs):
- """Filter the remote references to figure out which ones
- Software Heritage needs.
- """
- if not refs:
- return []
-
- # Find what objects Software Heritage has
- refs = self.find_remote_ref_types_in_swh(refs)
-
- # Cache the objects found in swh as existing heads
- for target in refs.values():
- if target['target_type'] is not None:
- self.heads.add(target['target'])
-
- ret = set()
- for target in self.filter_unwanted_refs(refs).values():
- if target['target_type'] is None:
- # The target doesn't exist in Software Heritage, let's retrieve
- # it.
- ret.add(target['target'])
-
- return list(ret)
-
- def get_stored_objects(self, objects):
- """Find which of these objects were stored in the archive.
-
- Do the request in packets to avoid a server timeout.
- """
- if self.ignore_history:
- return {}
-
- packet_size = 1000
-
- ret = {}
- query = []
- for object in objects:
- query.append(object)
- if len(query) >= packet_size:
- ret.update(
- self.storage.object_find_by_sha1_git(
- self._encode_for_storage(query)
- )
- )
- query = []
- if query:
- ret.update(
- self.storage.object_find_by_sha1_git(
- self._encode_for_storage(query)
- )
- )
- return ret
-
- def find_remote_ref_types_in_swh(self, remote_refs):
- """Parse the remote refs information and list the objects that exist in
- Software Heritage.
- """
-
- all_objs = set(remote_refs.values()) - set(self._type_cache)
- type_by_id = {}
-
- for id, objs in self.get_stored_objects(all_objs).items():
- id = hashutil.hash_to_bytehex(id)
- if objs:
- type_by_id[id] = objs[0]['type']
-
- self._type_cache.update(type_by_id)
-
- ret = {}
- for ref, id in remote_refs.items():
- ret[ref] = {
- 'target': id,
- 'target_type': self._type_cache.get(id),
- }
- return ret
-
-
-class BulkUpdater(UnbufferedLoader):
- """A bulk loader for a git repository"""
- CONFIG_BASE_FILENAME = 'loader/git-updater'
-
- ADDITIONAL_CONFIG = {
- 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024),
- }
-
- def __init__(self, repo_representation=SWHRepoRepresentation, config=None):
- """Initialize the bulk updater.
-
- Args:
- repo_representation: swh's repository representation
- which is in charge of filtering between known and remote
- data.
-
- """
- super().__init__(logging_class='swh.loader.git.BulkLoader',
- config=config)
- self.repo_representation = repo_representation
-
- def fetch_pack_from_origin(self, origin_url, base_origin_id,
- base_snapshot, do_activity):
- """Fetch a pack from the origin"""
- pack_buffer = BytesIO()
-
- base_repo = self.repo_representation(
- storage=self.storage,
- origin_id=base_origin_id,
- base_snapshot=base_snapshot,
- ignore_history=self.ignore_history,
- )
-
- client, path = dulwich.client.get_transport_and_path(origin_url,
- thin_packs=False)
-
- size_limit = self.config['pack_size_bytes']
-
- def do_pack(data,
- pack_buffer=pack_buffer,
- limit=size_limit,
- origin_url=origin_url):
- cur_size = pack_buffer.tell()
- would_write = len(data)
- if cur_size + would_write > limit:
- raise IOError('Pack file too big for repository %s, '
- 'limit is %d bytes, current size is %d, '
- 'would write %d' %
- (origin_url, limit, cur_size, would_write))
-
- pack_buffer.write(data)
-
- remote_refs = client.fetch_pack(path,
- base_repo.determine_wants,
- base_repo.graph_walker(),
- do_pack,
- progress=do_activity).refs
-
- if remote_refs:
- local_refs = base_repo.find_remote_ref_types_in_swh(remote_refs)
- else:
- local_refs = remote_refs = {}
-
- pack_buffer.flush()
- pack_size = pack_buffer.tell()
- pack_buffer.seek(0)
-
- return {
- 'remote_refs': base_repo.filter_unwanted_refs(remote_refs),
- 'local_refs': local_refs,
- 'pack_buffer': pack_buffer,
- 'pack_size': pack_size,
- }
-
- def list_pack(self, pack_data, pack_size):
- id_to_type = {}
- type_to_ids = defaultdict(set)
-
- inflater = self.get_inflater()
-
- for obj in inflater:
- type, id = obj.type_name, obj.id
- id_to_type[id] = type
- type_to_ids[type].add(id)
-
- return id_to_type, type_to_ids
-
- def prepare_origin_visit(self, origin_url, **kwargs):
- self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
- self.origin = converters.origin_url_to_origin(origin_url)
-
- def get_full_snapshot(self, origin_id):
- prev_snapshot = self.storage.snapshot_get_latest(origin_id)
- if prev_snapshot and prev_snapshot.pop('next_branch', None):
- return snapshot_get_all_branches(self.storage, prev_snapshot['id'])
-
- return prev_snapshot
-
- def prepare(self, origin_url, base_url=None, ignore_history=False):
- base_origin_id = origin_id = self.origin_id
-
- prev_snapshot = None
-
- if not ignore_history:
- prev_snapshot = self.get_full_snapshot(origin_id)
-
- if base_url and not prev_snapshot:
- base_origin = converters.origin_url_to_origin(base_url)
- base_origin = self.storage.origin_get(base_origin)
- if base_origin:
- base_origin_id = base_origin['id']
- prev_snapshot = self.get_full_snapshot(base_origin_id)
-
- self.base_snapshot = prev_snapshot
- self.base_origin_id = base_origin_id
- self.ignore_history = ignore_history
-
- def fetch_data(self):
- def do_progress(msg):
- sys.stderr.buffer.write(msg)
- sys.stderr.flush()
-
- fetch_info = self.fetch_pack_from_origin(
- self.origin['url'], self.base_origin_id, self.base_snapshot,
- do_progress)
-
- self.pack_buffer = fetch_info['pack_buffer']
- self.pack_size = fetch_info['pack_size']
-
- self.remote_refs = fetch_info['remote_refs']
- self.local_refs = fetch_info['local_refs']
-
- origin_url = self.origin['url']
-
- self.log.info('Listed %d refs for repo %s' % (
- len(self.remote_refs), origin_url), extra={
- 'swh_type': 'git_repo_list_refs',
- 'swh_repo': origin_url,
- 'swh_num_refs': len(self.remote_refs),
- })
-
- # We want to load the repository, walk all the objects
- id_to_type, type_to_ids = self.list_pack(self.pack_buffer,
- self.pack_size)
-
- self.id_to_type = id_to_type
- self.type_to_ids = type_to_ids
-
- def save_data(self):
- """Store a pack for archival"""
-
- write_size = 8192
- pack_dir = self.get_save_data_path()
-
- pack_name = "%s.pack" % self.visit_date.isoformat()
- refs_name = "%s.refs" % self.visit_date.isoformat()
-
- with open(os.path.join(pack_dir, pack_name), 'xb') as f:
- self.pack_buffer.seek(0)
- while True:
- r = self.pack_buffer.read(write_size)
- if not r:
- break
- f.write(r)
-
- self.pack_buffer.seek(0)
-
- with open(os.path.join(pack_dir, refs_name), 'xb') as f:
- pickle.dump(self.remote_refs, f)
-
- def get_inflater(self):
- """Reset the pack buffer and get an object inflater from it"""
- self.pack_buffer.seek(0)
- return PackInflater.for_pack_data(
- PackData.from_file(self.pack_buffer, self.pack_size))
-
- def has_contents(self):
- return bool(self.type_to_ids[b'blob'])
-
- def get_content_ids(self):
- """Get the content identifiers from the git repository"""
- for raw_obj in self.get_inflater():
- if raw_obj.type_name != b'blob':
- continue
-
- yield converters.dulwich_blob_to_content_id(raw_obj)
-
- def get_contents(self):
- """Format the blobs from the git repository as swh contents"""
- max_content_size = self.config['content_size_limit']
-
- missing_contents = set(self.storage.content_missing(
- self.get_content_ids(), 'sha1_git'))
-
- for raw_obj in self.get_inflater():
- if raw_obj.type_name != b'blob':
- continue
-
- if raw_obj.sha().digest() not in missing_contents:
- continue
-
- yield converters.dulwich_blob_to_content(
- raw_obj, log=self.log, max_content_size=max_content_size,
- origin_id=self.origin_id)
-
- def has_directories(self):
- return bool(self.type_to_ids[b'tree'])
-
- def get_directory_ids(self):
- """Get the directory identifiers from the git repository"""
- return (hashutil.hash_to_bytes(id.decode())
- for id in self.type_to_ids[b'tree'])
-
- def get_directories(self):
- """Format the trees as swh directories"""
- missing_dirs = set(self.storage.directory_missing(
- sorted(self.get_directory_ids())))
-
- for raw_obj in self.get_inflater():
- if raw_obj.type_name != b'tree':
- continue
-
- if raw_obj.sha().digest() not in missing_dirs:
- continue
-
- yield converters.dulwich_tree_to_directory(raw_obj, log=self.log)
-
- def has_revisions(self):
- return bool(self.type_to_ids[b'commit'])
-
- def get_revision_ids(self):
- """Get the revision identifiers from the git repository"""
- return (hashutil.hash_to_bytes(id.decode())
- for id in self.type_to_ids[b'commit'])
-
- def get_revisions(self):
- """Format commits as swh revisions"""
- missing_revs = set(self.storage.revision_missing(
- sorted(self.get_revision_ids())))
-
- for raw_obj in self.get_inflater():
- if raw_obj.type_name != b'commit':
- continue
-
- if raw_obj.sha().digest() not in missing_revs:
- continue
-
- yield converters.dulwich_commit_to_revision(raw_obj, log=self.log)
-
- def has_releases(self):
- return bool(self.type_to_ids[b'tag'])
-
- def get_release_ids(self):
- """Get the release identifiers from the git repository"""
- return (hashutil.hash_to_bytes(id.decode())
- for id in self.type_to_ids[b'tag'])
-
- def get_releases(self):
- """Retrieve all the release objects from the git repository"""
- missing_rels = set(self.storage.release_missing(
- sorted(self.get_release_ids())))
-
- for raw_obj in self.get_inflater():
- if raw_obj.type_name != b'tag':
- continue
-
- if raw_obj.sha().digest() not in missing_rels:
- continue
-
- yield converters.dulwich_tag_to_release(raw_obj, log=self.log)
-
- def get_snapshot(self):
- branches = {}
-
- for ref in self.remote_refs:
- ret_ref = self.local_refs[ref].copy()
- if not ret_ref['target_type']:
- target_type = self.id_to_type[ret_ref['target']]
- ret_ref['target_type'] = converters.DULWICH_TYPES[target_type]
-
- ret_ref['target'] = hashutil.bytehex_to_hash(ret_ref['target'])
-
- branches[ref] = ret_ref
-
- self.snapshot = converters.branches_to_snapshot(branches)
- return self.snapshot
-
- def get_fetch_history_result(self):
- return {
- 'contents': len(self.type_to_ids[b'blob']),
- 'directories': len(self.type_to_ids[b'tree']),
- 'revisions': len(self.type_to_ids[b'commit']),
- 'releases': len(self.type_to_ids[b'tag']),
- }
-
- def load_status(self):
- """The load was eventful if the current snapshot is different to
- the one we retrieved at the beginning of the run"""
- eventful = False
-
- if self.base_snapshot:
- eventful = self.snapshot['id'] != self.base_snapshot['id']
- else:
- eventful = bool(self.snapshot['branches'])
-
- return {'status': ('eventful' if eventful else 'uneventful')}
-
-
-if __name__ == '__main__':
- import click
-
- logging.basicConfig(
- level=logging.DEBUG,
- format='%(asctime)s %(process)d %(message)s'
- )
-
- @click.command()
- @click.option('--origin-url', help='Origin url', required=True)
- @click.option('--base-url', default=None, help='Optional Base url')
- @click.option('--ignore-history/--no-ignore-history',
- help='Ignore the repository history', default=False)
- def main(origin_url, base_url, ignore_history):
- return BulkUpdater().load(
- origin_url,
- base_url=base_url,
- ignore_history=ignore_history,
- )
-
- main()

File Metadata

Mime Type
text/plain
Expires
Thu, Dec 19, 12:37 AM (20 h, 34 m ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3227381

Event Timeline