D834.id2657.diff
D834: loader: Rename loaders to more meaningful names
diff --git a/README.md b/README.md
--- a/README.md
+++ b/README.md
@@ -48,9 +48,12 @@
Configuration
-------------
-You can run the loader or the updater directly by calling:
+You can run the loader from a remote origin (*loader*) or from an
+origin on disk (*from_disk*) directly by calling:
+
+
```
-python3 -m swh.loader.git.{loader,updater}
+python3 -m swh.loader.git.{loader,from_disk}
```
### Location
@@ -66,7 +69,8 @@
### Configuration sample
-$SWH_CONFIG_PATH/loader/git-{loader,updater}.yml:
+Respectively, the loader from a remote origin (`git.yml`) and the loader
+from disk (`git-disk.yml`), at $SWH_CONFIG_PATH/loader/git{-disk}.yml:
```
storage:
cls: remote
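
[Editor's note: for context, the fuller shape of that sample would look like the sketch below; the `args` block is an assumption here, as the exact remote-storage settings depend on the deployment.]

```
storage:
  cls: remote
  args:
    url: http://localhost:5002/
```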
diff --git a/swh/loader/git/loader.py b/swh/loader/git/from_disk.py
copy from swh/loader/git/loader.py
copy to swh/loader/git/from_disk.py
--- a/swh/loader/git/loader.py
+++ b/swh/loader/git/from_disk.py
@@ -16,11 +16,12 @@
from . import converters, utils
-class GitLoader(UnbufferedLoader):
+class GitLoaderFromDisk(UnbufferedLoader):
"""Load a git repository from a directory.
+
"""
- CONFIG_BASE_FILENAME = 'loader/git-loader'
+ CONFIG_BASE_FILENAME = 'loader/git-disk'
def __init__(self, config=None):
super().__init__(logging_class='swh.loader.git.Loader', config=config)
@@ -254,7 +255,7 @@
return {'status': ('eventful' if eventful else 'uneventful')}
-class GitLoaderFromArchive(GitLoader):
+class GitLoaderFromArchive(GitLoaderFromDisk):
"""Load a git repository from an archive.
This loader ingests a git repository compressed into an archive.
@@ -316,8 +317,8 @@
def prepare(self, origin_url, archive_path, visit_date):
"""1. Uncompress the archive in temporary location.
- 2. Prepare as the GitLoader does
- 3. Load as GitLoader does
+ 2. Prepare as the GitLoaderFromDisk does
+ 3. Load as GitLoaderFromDisk does
"""
project_name = self.project_name_from_archive(archive_path)
@@ -355,6 +356,6 @@
if not visit_date:
visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
- return GitLoader().load(origin_url, git_directory, visit_date)
+ return GitLoaderFromDisk().load(origin_url, git_directory, visit_date)
main()
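
[Editor's note: a minimal usage sketch of the renamed disk loader, mirroring the CLI `main()` above; the origin URL and directory are hypothetical.]

```
import datetime

from swh.loader.git.from_disk import GitLoaderFromDisk

# Load a local clone; the disk loader reads objects straight from the
# on-disk repository instead of fetching a pack over the network.
loader = GitLoaderFromDisk()
result = loader.load(
    origin_url='https://example.org/some/repo',   # hypothetical
    directory='/tmp/some-repo',                   # hypothetical clone path
    visit_date=datetime.datetime.now(tz=datetime.timezone.utc),
)
print(result)  # {'status': 'eventful'} or {'status': 'uneventful'}
```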
diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py
--- a/swh/loader/git/loader.py
+++ b/swh/loader/git/loader.py
@@ -1,164 +1,395 @@
-# Copyright (C) 2015-2018 The Software Heritage developers
+# Copyright (C) 2016-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
-import dulwich.repo
+import dulwich.client
+import logging
import os
-import shutil
+import pickle
+import sys
-from dulwich.errors import ObjectFormatException, EmptyFileException
from collections import defaultdict
+from io import BytesIO
+from dulwich.object_store import ObjectStoreGraphWalker
+from dulwich.pack import PackData, PackInflater
from swh.model import hashutil
from swh.loader.core.loader import UnbufferedLoader
-from . import converters, utils
+from swh.storage.algos.snapshot import snapshot_get_all_branches
+from . import converters
-class GitLoader(UnbufferedLoader):
- """Load a git repository from a directory.
- """
+class RepoRepresentation:
+ """Repository representation for a Software Heritage origin."""
+ def __init__(self, storage, origin_id, base_snapshot=None,
+ ignore_history=False):
+ self.storage = storage
- CONFIG_BASE_FILENAME = 'loader/git-loader'
+ self._parents_cache = {}
+ self._type_cache = {}
- def __init__(self, config=None):
- super().__init__(logging_class='swh.loader.git.Loader', config=config)
+ self.ignore_history = ignore_history
- def _prepare_origin_visit(self, origin_url, visit_date):
- self.origin_url = origin_url
- self.origin = converters.origin_url_to_origin(self.origin_url)
- self.visit_date = visit_date
+ if origin_id and not ignore_history:
+ self.heads = set(self._cache_heads(origin_id, base_snapshot))
+ else:
+ self.heads = set()
+
+ def _fill_parents_cache(self, commits):
+ """When querying for a commit's parents, we fill the cache to a depth of 1000
+ commits."""
+ root_revs = self._encode_for_storage(commits)
+ for rev, parents in self.storage.revision_shortlog(root_revs, 1000):
+ rev_id = hashutil.hash_to_bytehex(rev)
+ if rev_id not in self._parents_cache:
+ self._parents_cache[rev_id] = [
+ hashutil.hash_to_bytehex(parent) for parent in parents
+ ]
+ for rev in commits:
+ if rev not in self._parents_cache:
+ self._parents_cache[rev] = []
+
+ def _cache_heads(self, origin_id, base_snapshot):
+ """Return all the known head commits for `origin_id`"""
+ _git_types = ['content', 'directory', 'revision', 'release']
+
+ if not base_snapshot:
+ return []
+
+ snapshot_targets = set()
+ for target in base_snapshot['branches'].values():
+ if target and target['target_type'] in _git_types:
+ snapshot_targets.add(target['target'])
+
+ decoded_targets = self._decode_from_storage(snapshot_targets)
+
+ for id, objs in self.get_stored_objects(decoded_targets).items():
+ if not objs:
+ logging.warn('Missing head: %s' % hashutil.hash_to_hex(id))
+ return []
+
+ return decoded_targets
+
+ def get_parents(self, commit):
+ """Bogus method to prevent expensive recursion, at the expense of less
+ efficient downloading"""
+ return []
+
+ def get_heads(self):
+ return self.heads
+
+ @staticmethod
+ def _encode_for_storage(objects):
+ return [hashutil.bytehex_to_hash(object) for object in objects]
+
+ @staticmethod
+ def _decode_from_storage(objects):
+ return set(hashutil.hash_to_bytehex(object) for object in objects)
+
+ def graph_walker(self):
+ return ObjectStoreGraphWalker(self.get_heads(), self.get_parents)
+
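[Editor's note: the walker built here drives dulwich's fetch negotiation: the cached heads are advertised as "have"s, and because `get_parents` always returns an empty list, dulwich never recurses into their ancestry -- cheap negotiation at the cost of possibly over-fetching. A standalone sketch with a hypothetical head:]

```
from dulwich.object_store import ObjectStoreGraphWalker

# One head the archive is assumed to hold already (hypothetical value):
heads = {b'2f01f5ca7e391a2f08905990277faf81e709a649'}
walker = ObjectStoreGraphWalker(heads, lambda commit: [])
```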
+ @staticmethod
+ def filter_unwanted_refs(refs):
+ """Filter the unwanted references from refs"""
+ ret = {}
+ for ref, val in refs.items():
+ if ref.endswith(b'^{}'):
+ # Peeled refs make the git protocol explode
+ continue
+ elif ref.startswith(b'refs/pull/') and ref.endswith(b'/merge'):
+ # We filter-out auto-merged GitHub pull requests
+ continue
+ else:
+ ret[ref] = val
- def prepare_origin_visit(self, origin_url, directory, visit_date):
- self._prepare_origin_visit(origin_url, visit_date)
+ return ret
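
[Editor's note: to illustrate the filtering, a sketch with made-up refs, assuming `RepoRepresentation` from this module is importable.]

```
refs = {
    b'refs/heads/master': b'aa' * 20,
    b'refs/tags/v1.0^{}': b'bb' * 20,    # peeled ref: dropped
    b'refs/pull/42/merge': b'cc' * 20,   # auto-merged GitHub PR: dropped
    b'refs/pull/42/head': b'dd' * 20,    # PR head itself: kept
}
kept = RepoRepresentation.filter_unwanted_refs(refs)
assert set(kept) == {b'refs/heads/master', b'refs/pull/42/head'}
```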
- def prepare(self, origin_url, directory, visit_date):
- self.repo = dulwich.repo.Repo(directory)
+ def determine_wants(self, refs):
+ """Filter the remote references to figure out which ones
+ Software Heritage needs.
+ """
+ if not refs:
+ return []
- def iter_objects(self):
- object_store = self.repo.object_store
+ # Find what objects Software Heritage has
+ refs = self.find_remote_ref_types_in_swh(refs)
- for pack in object_store.packs:
- objs = list(pack.index.iterentries())
- objs.sort(key=lambda x: x[1])
- for sha, offset, crc32 in objs:
- yield hashutil.hash_to_bytehex(sha)
+ # Cache the objects found in swh as existing heads
+ for target in refs.values():
+ if target['target_type'] is not None:
+ self.heads.add(target['target'])
- yield from object_store._iter_loose_objects()
- yield from object_store._iter_alternate_objects()
+ ret = set()
+ for target in self.filter_unwanted_refs(refs).values():
+ if target['target_type'] is None:
+ # The target doesn't exist in Software Heritage, let's retrieve
+ # it.
+ ret.add(target['target'])
- def _check(self, obj):
- """Check the object's repository representation.
+ return list(ret)
- If any errors in check exists, an ObjectFormatException is
- raised.
-
- Args:
- obj (object): Dulwich object read from the repository.
+ def get_stored_objects(self, objects):
+ """Find which of these objects were stored in the archive.
+ Do the request in packets to avoid a server timeout.
+ """
+ if self.ignore_history:
+ return {}
+
+ packet_size = 1000
+
+ ret = {}
+ query = []
+ for object in objects:
+ query.append(object)
+ if len(query) >= packet_size:
+ ret.update(
+ self.storage.object_find_by_sha1_git(
+ self._encode_for_storage(query)
+ )
+ )
+ query = []
+ if query:
+ ret.update(
+ self.storage.object_find_by_sha1_git(
+ self._encode_for_storage(query)
+ )
+ )
+ return ret
+
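[Editor's note: the packeting above is a plain chunked-query pattern; an equivalent standalone sketch, with a hypothetical `lookup` callable standing in for `storage.object_find_by_sha1_git`.]

```
def query_in_packets(objects, lookup, packet_size=1000):
    """Query `lookup` in fixed-size batches to avoid server timeouts."""
    ret = {}
    batch = []
    for obj in objects:
        batch.append(obj)
        if len(batch) >= packet_size:
            ret.update(lookup(batch))
            batch = []
    if batch:  # flush the final partial batch
        ret.update(lookup(batch))
    return ret
```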
+ def find_remote_ref_types_in_swh(self, remote_refs):
+ """Parse the remote refs information and list the objects that exist in
+ Software Heritage.
"""
- obj.check()
- from dulwich.objects import Commit, Tag
- try:
- # For additional checks on dulwich objects with date
- # for now, only checks on *time
- if isinstance(obj, Commit):
- commit_time = obj._commit_time
- utils.check_date_time(commit_time)
- author_time = obj._author_time
- utils.check_date_time(author_time)
- elif isinstance(obj, Tag):
- tag_time = obj._tag_time
- utils.check_date_time(tag_time)
- except Exception as e:
- raise ObjectFormatException(e)
-
- def get_object(self, oid):
- """Given an object id, return the object if it is found and not
- malformed in some way.
- Args:
- oid (bytes): the object's identifier
+ all_objs = set(remote_refs.values()) - set(self._type_cache)
+ type_by_id = {}
+
+ for id, objs in self.get_stored_objects(all_objs).items():
+ id = hashutil.hash_to_bytehex(id)
+ if objs:
+ type_by_id[id] = objs[0]['type']
+
+ self._type_cache.update(type_by_id)
+
+ ret = {}
+ for ref, id in remote_refs.items():
+ ret[ref] = {
+ 'target': id,
+ 'target_type': self._type_cache.get(id),
+ }
+ return ret
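
[Editor's note: the mapping this returns pairs each remote ref with its target and the object type the archive already knows, `None` meaning the target is not archived yet. An illustrative result, reusing ids from the test data further down:]

```
example_result = {
    b'refs/heads/master': {
        'target': b'2f01f5ca7e391a2f08905990277faf81e709a649',
        'target_type': 'revision',   # already in the archive
    },
    b'refs/heads/branch1': {
        'target': b'b0a77609903f767a2fd3d769904ef9ef68468b87',
        'target_type': None,         # unknown: will be fetched
    },
}
```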
- Returns:
- The object if found without malformation
+
+class GitLoader(UnbufferedLoader):
+ """A bulk loader for a git repository"""
+ CONFIG_BASE_FILENAME = 'loader/git'
+
+ ADDITIONAL_CONFIG = {
+ 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024),
+ }
+
+ def __init__(self, repo_representation=RepoRepresentation, config=None):
+ """Initialize the bulk updater.
+
+ Args:
+ repo_representation: swh's repository representation
+ which is in charge of filtering between known and remote
+ data.
"""
- try:
- # some errors are raised when reading the object
- obj = self.repo[oid]
- # some we need to check ourselves
- self._check(obj)
- except KeyError:
- _id = oid.decode('utf-8')
- self.log.warn('object %s not found, skipping' % _id,
- extra={
- 'swh_type': 'swh_loader_git_missing_object',
- 'swh_object_id': _id,
- 'origin_id': self.origin_id,
- })
- return None
- except ObjectFormatException:
- _id = oid.decode('utf-8')
- self.log.warn('object %s malformed, skipping' % _id,
- extra={
- 'swh_type': 'swh_loader_git_missing_object',
- 'swh_object_id': _id,
- 'origin_id': self.origin_id,
- })
- return None
- except EmptyFileException:
- _id = oid.decode('utf-8')
- self.log.warn('object %s corrupted (empty file), skipping' % _id,
- extra={
- 'swh_type': 'swh_loader_git_missing_object',
- 'swh_object_id': _id,
- 'origin_id': self.origin_id,
- })
+ super().__init__(logging_class='swh.loader.git.BulkLoader',
+ config=config)
+ self.repo_representation = repo_representation
+
+ def fetch_pack_from_origin(self, origin_url, base_origin_id,
+ base_snapshot, do_activity):
+ """Fetch a pack from the origin"""
+ pack_buffer = BytesIO()
+
+ base_repo = self.repo_representation(
+ storage=self.storage,
+ origin_id=base_origin_id,
+ base_snapshot=base_snapshot,
+ ignore_history=self.ignore_history,
+ )
+
+ client, path = dulwich.client.get_transport_and_path(origin_url,
+ thin_packs=False)
+
+ size_limit = self.config['pack_size_bytes']
+
+ def do_pack(data,
+ pack_buffer=pack_buffer,
+ limit=size_limit,
+ origin_url=origin_url):
+ cur_size = pack_buffer.tell()
+ would_write = len(data)
+ if cur_size + would_write > limit:
+ raise IOError('Pack file too big for repository %s, '
+ 'limit is %d bytes, current size is %d, '
+ 'would write %d' %
+ (origin_url, limit, cur_size, would_write))
+
+ pack_buffer.write(data)
+
+ remote_refs = client.fetch_pack(path,
+ base_repo.determine_wants,
+ base_repo.graph_walker(),
+ do_pack,
+ progress=do_activity).refs
+
+ if remote_refs:
+ local_refs = base_repo.find_remote_ref_types_in_swh(remote_refs)
else:
- return obj
+ local_refs = remote_refs = {}
+
+ pack_buffer.flush()
+ pack_size = pack_buffer.tell()
+ pack_buffer.seek(0)
+
+ return {
+ 'remote_refs': base_repo.filter_unwanted_refs(remote_refs),
+ 'local_refs': local_refs,
+ 'pack_buffer': pack_buffer,
+ 'pack_size': pack_size,
+ }
+
+ def list_pack(self, pack_data, pack_size):
+ id_to_type = {}
+ type_to_ids = defaultdict(set)
+
+ inflater = self.get_inflater()
+
+ for obj in inflater:
+ type, id = obj.type_name, obj.id
+ id_to_type[id] = type
+ type_to_ids[type].add(id)
+
+ return id_to_type, type_to_ids
+
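[Editor's note: `list_pack` builds two indexes over the fetched pack, object id to git type and type to the set of ids. A toy standalone equivalent with truncated, illustrative ids:]

```
from collections import defaultdict

def index_objects(objs):
    """objs: iterable of (type_name, id) pairs, as dulwich exposes them."""
    id_to_type = {}
    type_to_ids = defaultdict(set)
    for type_name, obj_id in objs:
        id_to_type[obj_id] = type_name
        type_to_ids[type_name].add(obj_id)
    return id_to_type, type_to_ids

id_to_type, type_to_ids = index_objects([
    (b'commit', b'2f01f5ca...'),  # illustrative, truncated ids
    (b'tree', b'e1d0d894...'),
])
```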
+ def prepare_origin_visit(self, origin_url, **kwargs):
+ self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
+ self.origin = converters.origin_url_to_origin(origin_url)
+
+ def get_full_snapshot(self, origin_id):
+ prev_snapshot = self.storage.snapshot_get_latest(origin_id)
+ if prev_snapshot and prev_snapshot.pop('next_branch', None):
+ return snapshot_get_all_branches(self.storage, prev_snapshot['id'])
+
+ return prev_snapshot
+
+ def prepare(self, origin_url, base_url=None, ignore_history=False):
+ base_origin_id = origin_id = self.origin_id
+
+ prev_snapshot = None
+
+ if not ignore_history:
+ prev_snapshot = self.get_full_snapshot(origin_id)
+
+ if base_url and not prev_snapshot:
+ base_origin = converters.origin_url_to_origin(base_url)
+ base_origin = self.storage.origin_get(base_origin)
+ if base_origin:
+ base_origin_id = base_origin['id']
+ prev_snapshot = self.get_full_snapshot(base_origin_id)
+
+ self.base_snapshot = prev_snapshot
+ self.base_origin_id = base_origin_id
+ self.ignore_history = ignore_history
def fetch_data(self):
- """Fetch the data from the data source"""
- self.previous_snapshot = self.storage.snapshot_get_latest(
- self.origin_id
- )
+ def do_progress(msg):
+ sys.stderr.buffer.write(msg)
+ sys.stderr.flush()
- type_to_ids = defaultdict(list)
- for oid in self.iter_objects():
- obj = self.get_object(oid)
- if not obj:
- continue
- type_name = obj.type_name
- type_to_ids[type_name].append(oid)
+ fetch_info = self.fetch_pack_from_origin(
+ self.origin['url'], self.base_origin_id, self.base_snapshot,
+ do_progress)
+
+ self.pack_buffer = fetch_info['pack_buffer']
+ self.pack_size = fetch_info['pack_size']
+
+ self.remote_refs = fetch_info['remote_refs']
+ self.local_refs = fetch_info['local_refs']
+
+ origin_url = self.origin['url']
+
+ self.log.info('Listed %d refs for repo %s' % (
+ len(self.remote_refs), origin_url), extra={
+ 'swh_type': 'git_repo_list_refs',
+ 'swh_repo': origin_url,
+ 'swh_num_refs': len(self.remote_refs),
+ })
+ # We want to load the repository, walk all the objects
+ id_to_type, type_to_ids = self.list_pack(self.pack_buffer,
+ self.pack_size)
+
+ self.id_to_type = id_to_type
self.type_to_ids = type_to_ids
+ def save_data(self):
+ """Store a pack for archival"""
+
+ write_size = 8192
+ pack_dir = self.get_save_data_path()
+
+ pack_name = "%s.pack" % self.visit_date.isoformat()
+ refs_name = "%s.refs" % self.visit_date.isoformat()
+
+ with open(os.path.join(pack_dir, pack_name), 'xb') as f:
+ self.pack_buffer.seek(0)
+ while True:
+ r = self.pack_buffer.read(write_size)
+ if not r:
+ break
+ f.write(r)
+
+ self.pack_buffer.seek(0)
+
+ with open(os.path.join(pack_dir, refs_name), 'xb') as f:
+ pickle.dump(self.remote_refs, f)
+
+ def get_inflater(self):
+ """Reset the pack buffer and get an object inflater from it"""
+ self.pack_buffer.seek(0)
+ return PackInflater.for_pack_data(
+ PackData.from_file(self.pack_buffer, self.pack_size))
+
def has_contents(self):
- """Checks whether we need to load contents"""
return bool(self.type_to_ids[b'blob'])
def get_content_ids(self):
"""Get the content identifiers from the git repository"""
- for oid in self.type_to_ids[b'blob']:
- yield converters.dulwich_blob_to_content_id(self.repo[oid])
+ for raw_obj in self.get_inflater():
+ if raw_obj.type_name != b'blob':
+ continue
+
+ yield converters.dulwich_blob_to_content_id(raw_obj)
def get_contents(self):
- """Get the contents that need to be loaded"""
+ """Format the blobs from the git repository as swh contents"""
max_content_size = self.config['content_size_limit']
missing_contents = set(self.storage.content_missing(
self.get_content_ids(), 'sha1_git'))
- for oid in missing_contents:
+ for raw_obj in self.get_inflater():
+ if raw_obj.type_name != b'blob':
+ continue
+
+ if raw_obj.sha().digest() not in missing_contents:
+ continue
+
yield converters.dulwich_blob_to_content(
- self.repo[hashutil.hash_to_bytehex(oid)], log=self.log,
- max_content_size=max_content_size,
+ raw_obj, log=self.log, max_content_size=max_content_size,
origin_id=self.origin_id)
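
[Editor's note: the same shape recurs in the `get_*` methods below -- re-inflate the pack, keep one object type, skip what the archive already has. A generic sketch of that pattern; the helper name is hypothetical, not part of the patch.]

```
def iter_missing(inflater_factory, type_name, missing_digests):
    """Yield pack objects of `type_name` whose sha1 digests are missing."""
    for raw_obj in inflater_factory():
        if raw_obj.type_name != type_name:
            continue
        if raw_obj.sha().digest() not in missing_digests:
            continue
        yield raw_obj
```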
def has_directories(self):
- """Checks whether we need to load directories"""
return bool(self.type_to_ids[b'tree'])
def get_directory_ids(self):
@@ -167,16 +398,20 @@
for id in self.type_to_ids[b'tree'])
def get_directories(self):
- """Get the directories that need to be loaded"""
+ """Format the trees as swh directories"""
missing_dirs = set(self.storage.directory_missing(
sorted(self.get_directory_ids())))
- for oid in missing_dirs:
- yield converters.dulwich_tree_to_directory(
- self.repo[hashutil.hash_to_bytehex(oid)], log=self.log)
+ for raw_obj in self.get_inflater():
+ if raw_obj.type_name != b'tree':
+ continue
+
+ if raw_obj.sha().digest() not in missing_dirs:
+ continue
+
+ yield converters.dulwich_tree_to_directory(raw_obj, log=self.log)
def has_revisions(self):
- """Checks whether we need to load revisions"""
return bool(self.type_to_ids[b'commit'])
def get_revision_ids(self):
@@ -185,16 +420,20 @@
for id in self.type_to_ids[b'commit'])
def get_revisions(self):
- """Get the revisions that need to be loaded"""
+ """Format commits as swh revisions"""
missing_revs = set(self.storage.revision_missing(
sorted(self.get_revision_ids())))
- for oid in missing_revs:
- yield converters.dulwich_commit_to_revision(
- self.repo[hashutil.hash_to_bytehex(oid)], log=self.log)
+ for raw_obj in self.get_inflater():
+ if raw_obj.type_name != b'commit':
+ continue
+
+ if raw_obj.sha().digest() not in missing_revs:
+ continue
+
+ yield converters.dulwich_commit_to_revision(raw_obj, log=self.log)
def has_releases(self):
- """Checks whether we need to load releases"""
return bool(self.type_to_ids[b'tag'])
def get_release_ids(self):
@@ -203,33 +442,36 @@
for id in self.type_to_ids[b'tag'])
def get_releases(self):
- """Get the releases that need to be loaded"""
+ """Retrieve all the release objects from the git repository"""
missing_rels = set(self.storage.release_missing(
sorted(self.get_release_ids())))
- for oid in missing_rels:
- yield converters.dulwich_tag_to_release(
- self.repo[hashutil.hash_to_bytehex(oid)], log=self.log)
+ for raw_obj in self.get_inflater():
+ if raw_obj.type_name != b'tag':
+ continue
+
+ if raw_obj.sha().digest() not in missing_rels:
+ continue
+
+ yield converters.dulwich_tag_to_release(raw_obj, log=self.log)
def get_snapshot(self):
- """Turn the list of branches into a snapshot to load"""
branches = {}
- for ref, target in self.repo.refs.as_dict().items():
- obj = self.get_object(target)
- if obj:
- branches[ref] = {
- 'target': hashutil.bytehex_to_hash(target),
- 'target_type': converters.DULWICH_TYPES[obj.type_name],
- }
- else:
- branches[ref] = None
+ for ref in self.remote_refs:
+ ret_ref = self.local_refs[ref].copy()
+ if not ret_ref['target_type']:
+ target_type = self.id_to_type[ret_ref['target']]
+ ret_ref['target_type'] = converters.DULWICH_TYPES[target_type]
+
+ ret_ref['target'] = hashutil.bytehex_to_hash(ret_ref['target'])
+
+ branches[ref] = ret_ref
self.snapshot = converters.branches_to_snapshot(branches)
return self.snapshot
def get_fetch_history_result(self):
- """Return the data to store in fetch_history for the current loader"""
return {
'contents': len(self.type_to_ids[b'blob']),
'directories': len(self.type_to_ids[b'tree']),
@@ -237,110 +479,21 @@
'releases': len(self.type_to_ids[b'tag']),
}
- def save_data(self):
- """We already have the data locally, no need to save it"""
- pass
-
def load_status(self):
- """The load was eventful if the current occurrences are different to
- the ones we retrieved at the beginning of the run"""
+ """The load was eventful if the current snapshot is different to
+ the one we retrieved at the beginning of the run"""
eventful = False
- if self.previous_snapshot:
- eventful = self.snapshot['id'] != self.previous_snapshot['id']
+ if self.base_snapshot:
+ eventful = self.snapshot['id'] != self.base_snapshot['id']
else:
eventful = bool(self.snapshot['branches'])
return {'status': ('eventful' if eventful else 'uneventful')}
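
[Editor's note: a standalone restatement of the eventfulness rule, same logic as above.]

```
def is_eventful(snapshot, base_snapshot):
    if base_snapshot:
        return snapshot['id'] != base_snapshot['id']
    return bool(snapshot['branches'])
```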
-class GitLoaderFromArchive(GitLoader):
- """Load a git repository from an archive.
-
- This loader ingests a git repository compressed into an archive.
- The supported archive formats are ``.zip`` and ``.tar.gz``.
-
- From an input tarball named ``my-git-repo.zip``, the following layout is
- expected in it::
-
- my-git-repo/
- ├── .git
- │ ├── branches
- │ ├── COMMIT_EDITMSG
- │ ├── config
- │ ├── description
- │ ├── HEAD
- ...
-
- Nevertheless, the loader is able to ingest tarballs with the following
- layouts too::
-
- .
- ├── .git
- │ ├── branches
- │ ├── COMMIT_EDITMSG
- │ ├── config
- │ ├── description
- │ ├── HEAD
- ...
-
- or::
-
- other-repo-name/
- ├── .git
- │ ├── branches
- │ ├── COMMIT_EDITMSG
- │ ├── config
- │ ├── description
- │ ├── HEAD
- ...
-
- """
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.temp_dir = self.repo_path = None
-
- def project_name_from_archive(self, archive_path):
- """Compute the project name from the archive's path.
-
- """
- archive_name = os.path.basename(archive_path)
- for ext in ('.zip', '.tar.gz', '.tgz'):
- if archive_name.lower().endswith(ext):
- archive_name = archive_name[:-len(ext)]
- break
- return archive_name
-
- def prepare_origin_visit(self, origin_url, archive_path, visit_date):
- self._prepare_origin_visit(origin_url, visit_date)
-
- def prepare(self, origin_url, archive_path, visit_date):
- """1. Uncompress the archive in temporary location.
- 2. Prepare as the GitLoader does
- 3. Load as GitLoader does
-
- """
- project_name = self.project_name_from_archive(archive_path)
- self.temp_dir, self.repo_path = utils.init_git_repo_from_archive(
- project_name, archive_path)
-
- self.log.info('Project %s - Uncompressing archive %s at %s' % (
- origin_url, os.path.basename(archive_path), self.repo_path))
- super().prepare(origin_url, self.repo_path, visit_date)
-
- def cleanup(self):
- """Cleanup the temporary location (if it exists).
-
- """
- if self.temp_dir and os.path.exists(self.temp_dir):
- shutil.rmtree(self.temp_dir)
- self.log.info('Project %s - Done injecting %s' % (
- self.origin_url, self.repo_path))
-
-
if __name__ == '__main__':
import click
- import logging
logging.basicConfig(
level=logging.DEBUG,
@@ -348,13 +501,15 @@
)
@click.command()
- @click.option('--origin-url', help='origin url')
- @click.option('--git-directory', help='Path to git repository to load')
- @click.option('--visit-date', default=None, help='Visit date')
- def main(origin_url, git_directory, visit_date):
- if not visit_date:
- visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
-
- return GitLoader().load(origin_url, git_directory, visit_date)
+ @click.option('--origin-url', help='Origin url', required=True)
+ @click.option('--base-url', default=None, help='Optional Base url')
+ @click.option('--ignore-history/--no-ignore-history',
+ help='Ignore the repository history', default=False)
+ def main(origin_url, base_url, ignore_history):
+ return GitLoader().load(
+ origin_url,
+ base_url=base_url,
+ ignore_history=ignore_history,
+ )
main()
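
[Editor's note: run directly, the renamed remote loader would be invoked as below, with the flags defined in `main()` above; URLs are hypothetical.]

```
python3 -m swh.loader.git.loader --origin-url https://example.org/repo
python3 -m swh.loader.git.loader --origin-url https://example.org/fork \
    --base-url https://example.org/repo --ignore-history
```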
diff --git a/swh/loader/git/tasks.py b/swh/loader/git/tasks.py
--- a/swh/loader/git/tasks.py
+++ b/swh/loader/git/tasks.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2017 The Software Heritage developers
+# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -7,8 +7,8 @@
from swh.scheduler.task import Task
-from .loader import GitLoader, GitLoaderFromArchive
-from .updater import BulkUpdater
+from .from_disk import GitLoaderFromDisk, GitLoaderFromArchive
+from .loader import GitLoader
# TODO: rename to LoadRemoteGitRepository
@@ -18,7 +18,7 @@
def run_task(self, repo_url, base_url=None):
"""Import a git repository"""
- loader = BulkUpdater()
+ loader = GitLoader()
loader.log = self.log
return loader.load(repo_url, base_url=base_url)
@@ -32,7 +32,7 @@
"""Import a git repository, cloned in `directory` from `origin_url` at
`date`."""
- loader = GitLoader()
+ loader = GitLoaderFromDisk()
loader.log = self.log
return loader.load(origin_url, directory, dateutil.parser.parse(date))
diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_from_disk.py
copy from swh/loader/git/tests/test_loader.py
copy to swh/loader/git/tests/test_from_disk.py
--- a/swh/loader/git/tests/test_loader.py
+++ b/swh/loader/git/tests/test_from_disk.py
@@ -6,7 +6,8 @@
import os.path
import subprocess
-from swh.loader.git.loader import GitLoader, GitLoaderFromArchive
+
+from swh.loader.git.from_disk import GitLoaderFromDisk, GitLoaderFromArchive
from swh.loader.core.tests import BaseLoaderTest
from . import TEST_LOADER_CONFIG
@@ -93,7 +94,7 @@
}
-class BaseGitLoaderTest(BaseLoaderTest):
+class BaseGitLoaderFromDiskTest(BaseLoaderTest):
def setUp(self, archive_name, uncompress_archive, filename='testrepo'):
super().setUp(archive_name=archive_name, filename=filename,
prefix_tmp_folder_name='swh.loader.git.',
@@ -101,12 +102,12 @@
uncompress_archive=uncompress_archive)
-class GitLoaderTest(GitLoader):
+class GitLoaderFromDiskTest(GitLoaderFromDisk):
def parse_config_file(self, *args, **kwargs):
return TEST_LOADER_CONFIG
-class BaseDirGitLoaderTest(BaseGitLoaderTest):
+class BaseDirGitLoaderFromDiskTest(BaseGitLoaderFromDiskTest):
"""Mixin base loader test to prepare the git
repository to uncompress, load and test the results.
@@ -115,7 +116,7 @@
"""
def setUp(self):
super().setUp('testrepo.tgz', uncompress_archive=True)
- self.loader = GitLoaderTest()
+ self.loader = GitLoaderFromDiskTest()
self.storage = self.loader.storage
def load(self):
@@ -125,7 +126,7 @@
directory=self.destination_path)
-class BaseGitLoaderFromArchiveTest(BaseGitLoaderTest):
+class BaseGitLoaderFromArchiveTest(BaseGitLoaderFromDiskTest):
"""Mixin base loader test to prepare the git
repository to uncompress, load and test the results.
@@ -144,7 +145,7 @@
archive_path=self.destination_path)
-class GitLoaderTests:
+class GitLoaderFromDiskTests:
"""Common tests for all git loaders."""
def test_load(self):
"""Loads a simple repository (made available by `setUp()`),
@@ -176,8 +177,8 @@
self.assertCountSnapshots(1)
-class DirGitLoaderTest(BaseDirGitLoaderTest, GitLoaderTests):
- """Tests for the GitLoader. Includes the common ones, and
+class DirGitLoaderTest(BaseDirGitLoaderFromDiskTest, GitLoaderFromDiskTests):
+ """Tests for the GitLoaderFromDisk. Includes the common ones, and
add others that only work with a local dir."""
def _git(self, *cmd):
@@ -255,7 +256,8 @@
self.assertEqual(self.loader.visit_status(), 'full')
-class GitLoaderFromArchiveTest(BaseGitLoaderFromArchiveTest, GitLoaderTests):
+class GitLoaderFromArchiveTest(BaseGitLoaderFromArchiveTest,
+ GitLoaderFromDiskTests):
"""Tests for GitLoaderFromArchive. Imports the common ones
from GitLoaderTests."""
pass
diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_loader.py
--- a/swh/loader/git/tests/test_loader.py
+++ b/swh/loader/git/tests/test_loader.py
@@ -3,259 +3,26 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import os.path
-import subprocess
-from swh.loader.git.loader import GitLoader, GitLoaderFromArchive
-from swh.loader.core.tests import BaseLoaderTest
-
-from . import TEST_LOADER_CONFIG
-
-
-class GitLoaderFromArchive(GitLoaderFromArchive):
- def project_name_from_archive(self, archive_path):
- # We don't want the project name to be 'resources'.
- return 'testrepo'
-
- def parse_config_file(self, *args, **kwargs):
- return TEST_LOADER_CONFIG
-
-
-CONTENT1 = {
- '33ab5639bfd8e7b95eb1d8d0b87781d4ffea4d5d', # README v1
- '349c4ff7d21f1ec0eda26f3d9284c293e3425417', # README v2
- '799c11e348d39f1704022b8354502e2f81f3c037', # file1.txt
- '4bdb40dfd6ec75cb730e678b5d7786e30170c5fb', # file2.txt
-}
-
-SNAPSHOT_ID = 'bdf3b06d6017e0d9ad6447a73da6ff1ae9efb8f0'
-
-SNAPSHOT1 = {
- 'id': SNAPSHOT_ID,
- 'branches': {
- 'HEAD': {
- 'target': '2f01f5ca7e391a2f08905990277faf81e709a649',
- 'target_type': 'revision',
- },
- 'refs/heads/master': {
- 'target': '2f01f5ca7e391a2f08905990277faf81e709a649',
- 'target_type': 'revision',
- },
- 'refs/heads/branch1': {
- 'target': 'b0a77609903f767a2fd3d769904ef9ef68468b87',
- 'target_type': 'revision',
- },
- 'refs/heads/branch2': {
- 'target': 'bd746cd1913721b269b395a56a97baf6755151c2',
- 'target_type': 'revision',
- },
- 'refs/tags/branch2-after-delete': {
- 'target': 'bd746cd1913721b269b395a56a97baf6755151c2',
- 'target_type': 'revision',
- },
- 'refs/tags/branch2-before-delete': {
- 'target': '1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b',
- 'target_type': 'revision',
- },
- },
-}
-
-# directory hashes obtained with:
-# gco b6f40292c4e94a8f7e7b4aff50e6c7429ab98e2a
-# swh-hashtree --ignore '.git' --path .
-# gco 2f01f5ca7e391a2f08905990277faf81e709a649
-# swh-hashtree --ignore '.git' --path .
-# gco bcdc5ebfde1a3cd6c96e0c2ea4eed19c13208777
-# swh-hashtree --ignore '.git' --path .
-# gco 1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b
-# swh-hashtree --ignore '.git' --path .
-# gco 79f65ac75f79dda6ff03d66e1242702ab67fb51c
-# swh-hashtree --ignore '.git' --path .
-# gco b0a77609903f767a2fd3d769904ef9ef68468b87
-# swh-hashtree --ignore '.git' --path .
-# gco bd746cd1913721b269b395a56a97baf6755151c2
-# swh-hashtree --ignore '.git' --path .
-REVISIONS1 = {
- 'b6f40292c4e94a8f7e7b4aff50e6c7429ab98e2a':
- '40dbdf55dfd4065422462cc74a949254aefa972e',
- '2f01f5ca7e391a2f08905990277faf81e709a649':
- 'e1d0d894835f91a0f887a4bc8b16f81feefdfbd5',
- 'bcdc5ebfde1a3cd6c96e0c2ea4eed19c13208777':
- 'b43724545b4759244bb54be053c690649161411c',
- '1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b':
- 'fbf70528223d263661b5ad4b80f26caf3860eb8e',
- '79f65ac75f79dda6ff03d66e1242702ab67fb51c':
- '5df34ec74d6f69072d9a0a6677d8efbed9b12e60',
- 'b0a77609903f767a2fd3d769904ef9ef68468b87':
- '9ca0c7d6ffa3f9f0de59fd7912e08f11308a1338',
- 'bd746cd1913721b269b395a56a97baf6755151c2':
- 'e1d0d894835f91a0f887a4bc8b16f81feefdfbd5',
-}
-
-
-class BaseGitLoaderTest(BaseLoaderTest):
- def setUp(self, archive_name, uncompress_archive, filename='testrepo'):
- super().setUp(archive_name=archive_name, filename=filename,
- prefix_tmp_folder_name='swh.loader.git.',
- start_path=os.path.dirname(__file__),
- uncompress_archive=uncompress_archive)
+from swh.loader.git.loader import GitLoader
+from swh.loader.git.tests.test_from_disk import DirGitLoaderTest
class GitLoaderTest(GitLoader):
def parse_config_file(self, *args, **kwargs):
- return TEST_LOADER_CONFIG
-
-
-class BaseDirGitLoaderTest(BaseGitLoaderTest):
- """Mixin base loader test to prepare the git
- repository to uncompress, load and test the results.
+ return {
+ **super().parse_config_file(*args, **kwargs),
+ 'storage': {'cls': 'memory', 'args': {}}
+ }
- This sets up
- """
+class TestGitLoader(DirGitLoaderTest):
+ """Same tests as for the GitLoaderFromDisk, but running on GitLoader."""
def setUp(self):
- super().setUp('testrepo.tgz', uncompress_archive=True)
+ super().setUp()
self.loader = GitLoaderTest()
self.storage = self.loader.storage
def load(self):
return self.loader.load(
- origin_url=self.repo_url,
- visit_date='2016-05-03 15:16:32+00',
- directory=self.destination_path)
-
-
-class BaseGitLoaderFromArchiveTest(BaseGitLoaderTest):
- """Mixin base loader test to prepare the git
- repository to uncompress, load and test the results.
-
- This sets up
-
- """
- def setUp(self):
- super().setUp('testrepo.tgz', uncompress_archive=False)
- self.loader = GitLoaderFromArchive()
- self.storage = self.loader.storage
-
- def load(self):
- return self.loader.load(
- origin_url=self.repo_url,
- visit_date='2016-05-03 15:16:32+00',
- archive_path=self.destination_path)
-
-
-class GitLoaderTests:
- """Common tests for all git loaders."""
- def test_load(self):
- """Loads a simple repository (made available by `setUp()`),
- and checks everything was added in the storage."""
- res = self.load()
- self.assertEqual(res['status'], 'eventful', res)
-
- self.assertContentsContain(CONTENT1)
- self.assertCountDirectories(7)
- self.assertCountReleases(0) # FIXME: why not 2?
- self.assertCountRevisions(7)
- self.assertCountSnapshots(1)
-
- self.assertRevisionsContain(REVISIONS1)
-
- self.assertSnapshotEqual(SNAPSHOT1)
-
- self.assertEqual(self.loader.load_status(), {'status': 'eventful'})
- self.assertEqual(self.loader.visit_status(), 'full')
-
- def test_load_unchanged(self):
- """Checks loading a repository a second time does not add
- any extra data."""
- res = self.load()
- self.assertEqual(res['status'], 'eventful')
-
- res = self.load()
- self.assertEqual(res['status'], 'uneventful')
- self.assertCountSnapshots(1)
-
-
-class DirGitLoaderTest(BaseDirGitLoaderTest, GitLoaderTests):
- """Tests for the GitLoader. Includes the common ones, and
- add others that only work with a local dir."""
-
- def _git(self, *cmd):
- """Small wrapper around subprocess to call Git."""
- try:
- return subprocess.check_output(
- ['git', '-C', self.destination_path] + list(cmd))
- except subprocess.CalledProcessError as e:
- print(e.output)
- print(e.stderr)
- raise
-
- def test_load_changed(self):
- """Loads a repository, makes some changes by adding files, commits,
- and merges, load it again, and check the storage contains everything
- it should."""
- # Initial load
- res = self.load()
- self.assertEqual(res['status'], 'eventful', res)
-
- self._git('config', '--local', 'user.email', 'you@example.com')
- self._git('config', '--local', 'user.name', 'Your Name')
-
- # Load with a new file + revision
- with open(os.path.join(self.destination_path, 'hello.py'), 'a') as fd:
- fd.write("print('Hello world')\n")
-
- self._git('add', 'hello.py')
- self._git('commit', '-m', 'Hello world')
- new_revision = self._git('rev-parse', 'master').decode().strip()
-
- revisions = REVISIONS1.copy()
- assert new_revision not in revisions
- revisions[new_revision] = '85dae072a5aa9923ffa7a7568f819ff21bf49858'
-
- res = self.load()
- self.assertEqual(res['status'], 'eventful')
-
- self.assertCountContents(4 + 1)
- self.assertCountDirectories(7 + 1)
- self.assertCountReleases(0) # FIXME: why not 2?
- self.assertCountRevisions(7 + 1)
- self.assertCountSnapshots(1 + 1)
-
- self.assertRevisionsContain(revisions)
-
- # TODO: how to check the snapshot id?
- # self.assertSnapshotEqual(SNAPSHOT1)
-
- self.assertEqual(self.loader.load_status(), {'status': 'eventful'})
- self.assertEqual(self.loader.visit_status(), 'full')
-
- # Load with a new merge
- self._git('merge', 'branch1', '-m', 'merge')
- new_revision = self._git('rev-parse', 'master').decode().strip()
-
- assert new_revision not in revisions
- revisions[new_revision] = 'dab8a37df8db8666d4e277bef9a546f585b5bedd'
-
- res = self.load()
- self.assertEqual(res['status'], 'eventful')
-
- self.assertCountContents(4 + 1)
- self.assertCountDirectories(7 + 2)
- self.assertCountReleases(0) # FIXME: why not 2?
- self.assertCountRevisions(7 + 2)
- self.assertCountSnapshots(1 + 1 + 1)
-
- self.assertRevisionsContain(revisions)
-
- # TODO: how to check the snapshot id?
- # self.assertSnapshotEqual(SNAPSHOT1)
-
- self.assertEqual(self.loader.load_status(), {'status': 'eventful'})
- self.assertEqual(self.loader.visit_status(), 'full')
-
-
-class GitLoaderFromArchiveTest(BaseGitLoaderFromArchiveTest, GitLoaderTests):
- """Tests for GitLoaderFromArchive. Imports the common ones
- from GitLoaderTests."""
- pass
+ origin_url=self.repo_url)
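
[Editor's note: the `parse_config_file` override above layers an in-memory storage onto the inherited test config; a minimal sketch of that dict-merge idiom, with made-up base values.]

```
base = {'storage': {'cls': 'remote', 'args': {}}, 'content_size_limit': 100}
override = {'storage': {'cls': 'memory', 'args': {}}}
merged = {**base, **override}   # later keys win: storage is replaced wholesale
assert merged['storage']['cls'] == 'memory'
assert merged['content_size_limit'] == 100  # untouched keys carry over
```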
diff --git a/swh/loader/git/tests/test_tasks.py b/swh/loader/git/tests/test_tasks.py
--- a/swh/loader/git/tests/test_tasks.py
+++ b/swh/loader/git/tests/test_tasks.py
@@ -18,7 +18,7 @@
task = UpdateGitRepository()
self.assertEqual(task.task_queue, 'swh_loader_git')
- @patch('swh.loader.git.updater.BulkUpdater.load')
+ @patch('swh.loader.git.loader.GitLoader.load')
def test_task(self, mock_loader):
mock_loader.return_value = {'status': 'eventful'}
task = UpdateGitRepository()
@@ -36,7 +36,7 @@
task = LoadDiskGitRepository()
self.assertEqual(task.task_queue, 'swh_loader_git_express')
- @patch('swh.loader.git.loader.GitLoader.load')
+ @patch('swh.loader.git.from_disk.GitLoaderFromDisk.load')
def test_task(self, mock_loader):
mock_loader.return_value = {'status': 'uneventful'}
task = LoadDiskGitRepository()
@@ -56,7 +56,7 @@
task = UncompressAndLoadDiskGitRepository()
self.assertEqual(task.task_queue, 'swh_loader_git_archive')
- @patch('swh.loader.git.loader.GitLoaderFromArchive.load')
+ @patch('swh.loader.git.from_disk.GitLoaderFromArchive.load')
def test_task(self, mock_loader):
mock_loader.return_value = {'status': 'failed'}
task = UncompressAndLoadDiskGitRepository()
diff --git a/swh/loader/git/tests/test_updater.py b/swh/loader/git/tests/test_updater.py
deleted file mode 100644
--- a/swh/loader/git/tests/test_updater.py
+++ /dev/null
@@ -1,22 +0,0 @@
-from swh.loader.git.updater import BulkUpdater
-from swh.loader.git.tests.test_loader import DirGitLoaderTest
-
-
-class BulkUpdaterTest(BulkUpdater):
- def parse_config_file(self, *args, **kwargs):
- return {
- **super().parse_config_file(*args, **kwargs),
- 'storage': {'cls': 'memory', 'args': {}}
- }
-
-
-class TestBulkUpdater(DirGitLoaderTest):
- """Same tests as for the GitLoader, but running on BulkUpdater."""
- def setUp(self):
- super().setUp()
- self.loader = BulkUpdaterTest()
- self.storage = self.loader.storage
-
- def load(self):
- return self.loader.load(
- origin_url=self.repo_url)
diff --git a/swh/loader/git/updater.py b/swh/loader/git/updater.py
deleted file mode 100644
--- a/swh/loader/git/updater.py
+++ /dev/null
@@ -1,515 +0,0 @@
-# Copyright (C) 2016-2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import datetime
-import dulwich.client
-import logging
-import os
-import pickle
-import sys
-
-from collections import defaultdict
-from io import BytesIO
-from dulwich.object_store import ObjectStoreGraphWalker
-from dulwich.pack import PackData, PackInflater
-
-from swh.model import hashutil
-from swh.loader.core.loader import UnbufferedLoader
-from swh.storage.algos.snapshot import snapshot_get_all_branches
-from . import converters
-
-
-class SWHRepoRepresentation:
- """Repository representation for a Software Heritage origin."""
- def __init__(self, storage, origin_id, base_snapshot=None,
- ignore_history=False):
- self.storage = storage
-
- self._parents_cache = {}
- self._type_cache = {}
-
- self.ignore_history = ignore_history
-
- if origin_id and not ignore_history:
- self.heads = set(self._cache_heads(origin_id, base_snapshot))
- else:
- self.heads = set()
-
- def _fill_parents_cache(self, commits):
- """When querying for a commit's parents, we fill the cache to a depth of 1000
- commits."""
- root_revs = self._encode_for_storage(commits)
- for rev, parents in self.storage.revision_shortlog(root_revs, 1000):
- rev_id = hashutil.hash_to_bytehex(rev)
- if rev_id not in self._parents_cache:
- self._parents_cache[rev_id] = [
- hashutil.hash_to_bytehex(parent) for parent in parents
- ]
- for rev in commits:
- if rev not in self._parents_cache:
- self._parents_cache[rev] = []
-
- def _cache_heads(self, origin_id, base_snapshot):
- """Return all the known head commits for `origin_id`"""
- _git_types = ['content', 'directory', 'revision', 'release']
-
- if not base_snapshot:
- return []
-
- snapshot_targets = set()
- for target in base_snapshot['branches'].values():
- if target and target['target_type'] in _git_types:
- snapshot_targets.add(target['target'])
-
- decoded_targets = self._decode_from_storage(snapshot_targets)
-
- for id, objs in self.get_stored_objects(decoded_targets).items():
- if not objs:
- logging.warn('Missing head: %s' % hashutil.hash_to_hex(id))
- return []
-
- return decoded_targets
-
- def get_parents(self, commit):
- """Bogus method to prevent expensive recursion, at the expense of less
- efficient downloading"""
- return []
-
- def get_heads(self):
- return self.heads
-
- @staticmethod
- def _encode_for_storage(objects):
- return [hashutil.bytehex_to_hash(object) for object in objects]
-
- @staticmethod
- def _decode_from_storage(objects):
- return set(hashutil.hash_to_bytehex(object) for object in objects)
-
- def graph_walker(self):
- return ObjectStoreGraphWalker(self.get_heads(), self.get_parents)
-
- @staticmethod
- def filter_unwanted_refs(refs):
- """Filter the unwanted references from refs"""
- ret = {}
- for ref, val in refs.items():
- if ref.endswith(b'^{}'):
- # Peeled refs make the git protocol explode
- continue
- elif ref.startswith(b'refs/pull/') and ref.endswith(b'/merge'):
- # We filter-out auto-merged GitHub pull requests
- continue
- else:
- ret[ref] = val
-
- return ret
-
- def determine_wants(self, refs):
- """Filter the remote references to figure out which ones
- Software Heritage needs.
- """
- if not refs:
- return []
-
- # Find what objects Software Heritage has
- refs = self.find_remote_ref_types_in_swh(refs)
-
- # Cache the objects found in swh as existing heads
- for target in refs.values():
- if target['target_type'] is not None:
- self.heads.add(target['target'])
-
- ret = set()
- for target in self.filter_unwanted_refs(refs).values():
- if target['target_type'] is None:
- # The target doesn't exist in Software Heritage, let's retrieve
- # it.
- ret.add(target['target'])
-
- return list(ret)
-
- def get_stored_objects(self, objects):
- """Find which of these objects were stored in the archive.
-
- Do the request in packets to avoid a server timeout.
- """
- if self.ignore_history:
- return {}
-
- packet_size = 1000
-
- ret = {}
- query = []
- for object in objects:
- query.append(object)
- if len(query) >= packet_size:
- ret.update(
- self.storage.object_find_by_sha1_git(
- self._encode_for_storage(query)
- )
- )
- query = []
- if query:
- ret.update(
- self.storage.object_find_by_sha1_git(
- self._encode_for_storage(query)
- )
- )
- return ret
-
- def find_remote_ref_types_in_swh(self, remote_refs):
- """Parse the remote refs information and list the objects that exist in
- Software Heritage.
- """
-
- all_objs = set(remote_refs.values()) - set(self._type_cache)
- type_by_id = {}
-
- for id, objs in self.get_stored_objects(all_objs).items():
- id = hashutil.hash_to_bytehex(id)
- if objs:
- type_by_id[id] = objs[0]['type']
-
- self._type_cache.update(type_by_id)
-
- ret = {}
- for ref, id in remote_refs.items():
- ret[ref] = {
- 'target': id,
- 'target_type': self._type_cache.get(id),
- }
- return ret
-
-
-class BulkUpdater(UnbufferedLoader):
- """A bulk loader for a git repository"""
- CONFIG_BASE_FILENAME = 'loader/git-updater'
-
- ADDITIONAL_CONFIG = {
- 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024),
- }
-
- def __init__(self, repo_representation=SWHRepoRepresentation, config=None):
- """Initialize the bulk updater.
-
- Args:
- repo_representation: swh's repository representation
- which is in charge of filtering between known and remote
- data.
-
- """
- super().__init__(logging_class='swh.loader.git.BulkLoader',
- config=config)
- self.repo_representation = repo_representation
-
- def fetch_pack_from_origin(self, origin_url, base_origin_id,
- base_snapshot, do_activity):
- """Fetch a pack from the origin"""
- pack_buffer = BytesIO()
-
- base_repo = self.repo_representation(
- storage=self.storage,
- origin_id=base_origin_id,
- base_snapshot=base_snapshot,
- ignore_history=self.ignore_history,
- )
-
- client, path = dulwich.client.get_transport_and_path(origin_url,
- thin_packs=False)
-
- size_limit = self.config['pack_size_bytes']
-
- def do_pack(data,
- pack_buffer=pack_buffer,
- limit=size_limit,
- origin_url=origin_url):
- cur_size = pack_buffer.tell()
- would_write = len(data)
- if cur_size + would_write > limit:
- raise IOError('Pack file too big for repository %s, '
- 'limit is %d bytes, current size is %d, '
- 'would write %d' %
- (origin_url, limit, cur_size, would_write))
-
- pack_buffer.write(data)
-
- remote_refs = client.fetch_pack(path,
- base_repo.determine_wants,
- base_repo.graph_walker(),
- do_pack,
- progress=do_activity).refs
-
- if remote_refs:
- local_refs = base_repo.find_remote_ref_types_in_swh(remote_refs)
- else:
- local_refs = remote_refs = {}
-
- pack_buffer.flush()
- pack_size = pack_buffer.tell()
- pack_buffer.seek(0)
-
- return {
- 'remote_refs': base_repo.filter_unwanted_refs(remote_refs),
- 'local_refs': local_refs,
- 'pack_buffer': pack_buffer,
- 'pack_size': pack_size,
- }
-
- def list_pack(self, pack_data, pack_size):
- id_to_type = {}
- type_to_ids = defaultdict(set)
-
- inflater = self.get_inflater()
-
- for obj in inflater:
- type, id = obj.type_name, obj.id
- id_to_type[id] = type
- type_to_ids[type].add(id)
-
- return id_to_type, type_to_ids
-
- def prepare_origin_visit(self, origin_url, **kwargs):
- self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
- self.origin = converters.origin_url_to_origin(origin_url)
-
- def get_full_snapshot(self, origin_id):
- prev_snapshot = self.storage.snapshot_get_latest(origin_id)
- if prev_snapshot and prev_snapshot.pop('next_branch', None):
- return snapshot_get_all_branches(self.storage, prev_snapshot['id'])
-
- return prev_snapshot
-
- def prepare(self, origin_url, base_url=None, ignore_history=False):
- base_origin_id = origin_id = self.origin_id
-
- prev_snapshot = None
-
- if not ignore_history:
- prev_snapshot = self.get_full_snapshot(origin_id)
-
- if base_url and not prev_snapshot:
- base_origin = converters.origin_url_to_origin(base_url)
- base_origin = self.storage.origin_get(base_origin)
- if base_origin:
- base_origin_id = base_origin['id']
- prev_snapshot = self.get_full_snapshot(base_origin_id)
-
- self.base_snapshot = prev_snapshot
- self.base_origin_id = base_origin_id
- self.ignore_history = ignore_history
-
- def fetch_data(self):
- def do_progress(msg):
- sys.stderr.buffer.write(msg)
- sys.stderr.flush()
-
- fetch_info = self.fetch_pack_from_origin(
- self.origin['url'], self.base_origin_id, self.base_snapshot,
- do_progress)
-
- self.pack_buffer = fetch_info['pack_buffer']
- self.pack_size = fetch_info['pack_size']
-
- self.remote_refs = fetch_info['remote_refs']
- self.local_refs = fetch_info['local_refs']
-
- origin_url = self.origin['url']
-
- self.log.info('Listed %d refs for repo %s' % (
- len(self.remote_refs), origin_url), extra={
- 'swh_type': 'git_repo_list_refs',
- 'swh_repo': origin_url,
- 'swh_num_refs': len(self.remote_refs),
- })
-
- # We want to load the repository, walk all the objects
- id_to_type, type_to_ids = self.list_pack(self.pack_buffer,
- self.pack_size)
-
- self.id_to_type = id_to_type
- self.type_to_ids = type_to_ids
-
- def save_data(self):
- """Store a pack for archival"""
-
- write_size = 8192
- pack_dir = self.get_save_data_path()
-
- pack_name = "%s.pack" % self.visit_date.isoformat()
- refs_name = "%s.refs" % self.visit_date.isoformat()
-
- with open(os.path.join(pack_dir, pack_name), 'xb') as f:
- self.pack_buffer.seek(0)
- while True:
- r = self.pack_buffer.read(write_size)
- if not r:
- break
- f.write(r)
-
- self.pack_buffer.seek(0)
-
- with open(os.path.join(pack_dir, refs_name), 'xb') as f:
- pickle.dump(self.remote_refs, f)
-
- def get_inflater(self):
- """Reset the pack buffer and get an object inflater from it"""
- self.pack_buffer.seek(0)
- return PackInflater.for_pack_data(
- PackData.from_file(self.pack_buffer, self.pack_size))
-
- def has_contents(self):
- return bool(self.type_to_ids[b'blob'])
-
- def get_content_ids(self):
- """Get the content identifiers from the git repository"""
- for raw_obj in self.get_inflater():
- if raw_obj.type_name != b'blob':
- continue
-
- yield converters.dulwich_blob_to_content_id(raw_obj)
-
- def get_contents(self):
- """Format the blobs from the git repository as swh contents"""
- max_content_size = self.config['content_size_limit']
-
- missing_contents = set(self.storage.content_missing(
- self.get_content_ids(), 'sha1_git'))
-
- for raw_obj in self.get_inflater():
- if raw_obj.type_name != b'blob':
- continue
-
- if raw_obj.sha().digest() not in missing_contents:
- continue
-
- yield converters.dulwich_blob_to_content(
- raw_obj, log=self.log, max_content_size=max_content_size,
- origin_id=self.origin_id)
-
- def has_directories(self):
- return bool(self.type_to_ids[b'tree'])
-
- def get_directory_ids(self):
- """Get the directory identifiers from the git repository"""
- return (hashutil.hash_to_bytes(id.decode())
- for id in self.type_to_ids[b'tree'])
-
- def get_directories(self):
- """Format the trees as swh directories"""
- missing_dirs = set(self.storage.directory_missing(
- sorted(self.get_directory_ids())))
-
- for raw_obj in self.get_inflater():
- if raw_obj.type_name != b'tree':
- continue
-
- if raw_obj.sha().digest() not in missing_dirs:
- continue
-
- yield converters.dulwich_tree_to_directory(raw_obj, log=self.log)
-
- def has_revisions(self):
- return bool(self.type_to_ids[b'commit'])
-
- def get_revision_ids(self):
- """Get the revision identifiers from the git repository"""
- return (hashutil.hash_to_bytes(id.decode())
- for id in self.type_to_ids[b'commit'])
-
- def get_revisions(self):
- """Format commits as swh revisions"""
- missing_revs = set(self.storage.revision_missing(
- sorted(self.get_revision_ids())))
-
- for raw_obj in self.get_inflater():
- if raw_obj.type_name != b'commit':
- continue
-
- if raw_obj.sha().digest() not in missing_revs:
- continue
-
- yield converters.dulwich_commit_to_revision(raw_obj, log=self.log)
-
- def has_releases(self):
- return bool(self.type_to_ids[b'tag'])
-
- def get_release_ids(self):
- """Get the release identifiers from the git repository"""
- return (hashutil.hash_to_bytes(id.decode())
- for id in self.type_to_ids[b'tag'])
-
- def get_releases(self):
- """Retrieve all the release objects from the git repository"""
- missing_rels = set(self.storage.release_missing(
- sorted(self.get_release_ids())))
-
- for raw_obj in self.get_inflater():
- if raw_obj.type_name != b'tag':
- continue
-
- if raw_obj.sha().digest() not in missing_rels:
- continue
-
- yield converters.dulwich_tag_to_release(raw_obj, log=self.log)
-
- def get_snapshot(self):
- branches = {}
-
- for ref in self.remote_refs:
- ret_ref = self.local_refs[ref].copy()
- if not ret_ref['target_type']:
- target_type = self.id_to_type[ret_ref['target']]
- ret_ref['target_type'] = converters.DULWICH_TYPES[target_type]
-
- ret_ref['target'] = hashutil.bytehex_to_hash(ret_ref['target'])
-
- branches[ref] = ret_ref
-
- self.snapshot = converters.branches_to_snapshot(branches)
- return self.snapshot
-
- def get_fetch_history_result(self):
- return {
- 'contents': len(self.type_to_ids[b'blob']),
- 'directories': len(self.type_to_ids[b'tree']),
- 'revisions': len(self.type_to_ids[b'commit']),
- 'releases': len(self.type_to_ids[b'tag']),
- }
-
- def load_status(self):
- """The load was eventful if the current snapshot is different to
- the one we retrieved at the beginning of the run"""
- eventful = False
-
- if self.base_snapshot:
- eventful = self.snapshot['id'] != self.base_snapshot['id']
- else:
- eventful = bool(self.snapshot['branches'])
-
- return {'status': ('eventful' if eventful else 'uneventful')}
-
-
-if __name__ == '__main__':
- import click
-
- logging.basicConfig(
- level=logging.DEBUG,
- format='%(asctime)s %(process)d %(message)s'
- )
-
- @click.command()
- @click.option('--origin-url', help='Origin url', required=True)
- @click.option('--base-url', default=None, help='Optional Base url')
- @click.option('--ignore-history/--no-ignore-history',
- help='Ignore the repository history', default=False)
- def main(origin_url, base_url, ignore_history):
- return BulkUpdater().load(
- origin_url,
- base_url=base_url,
- ignore_history=ignore_history,
- )
-
- main()