Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9696035
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
56 KB
Subscribers
None
View Options
diff --git a/swh/loader/git/updater.py b/swh/loader/git/base.py
similarity index 52%
copy from swh/loader/git/updater.py
copy to swh/loader/git/base.py
index e7d9e45..ba22de4 100644
--- a/swh/loader/git/updater.py
+++ b/swh/loader/git/base.py
@@ -1,767 +1,390 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import datetime
-from io import BytesIO
import logging
-import sys
import traceback
import uuid
-from collections import defaultdict
-import dulwich.client
-from dulwich.object_store import ObjectStoreGraphWalker
-from dulwich.pack import PackData, PackInflater
import psycopg2
import requests
from retrying import retry
-from urllib.parse import urlparse
-from swh.core import config, hashutil
+from swh.core import config
from swh.storage import get_storage
-from . import converters
-
-
-class SWHRepoRepresentation:
- """Repository representation for a Software Heritage origin."""
- def __init__(self, storage, origin_id, occurrences=None):
- self.storage = storage
-
- self._parents_cache = {}
- self._type_cache = {}
-
- if origin_id:
- self.heads = set(self._cache_heads(origin_id, occurrences))
- else:
- self.heads = set()
-
- def _fill_parents_cache(self, commit):
- """When querying for a commit's parents, we fill the cache to a depth of 100
- commits."""
- root_rev = hashutil.bytehex_to_hash(commit)
- for rev, parents in self.storage.revision_shortlog([root_rev], 100):
- rev_id = hashutil.hash_to_bytehex(rev)
- if rev_id not in self._parents_cache:
- self._parents_cache[rev_id] = [
- hashutil.hash_to_bytehex(parent) for parent in parents
- ]
-
- def _cache_heads(self, origin_id, occurrences):
- """Return all the known head commits for `origin_id`"""
- if not occurrences:
- occurrences = self.storage.occurrence_get(origin_id)
-
- return self._decode_from_storage(
- occurrence['target'] for occurrence in occurrences
- )
-
- def get_parents(self, commit):
- """get the parent commits for `commit`"""
- if commit not in self._parents_cache:
- self._fill_parents_cache(commit)
- return self._parents_cache.get(commit, [])
-
- def get_heads(self):
- return self.heads
-
- @staticmethod
- def _encode_for_storage(objects):
- return [hashutil.bytehex_to_hash(object) for object in objects]
-
- @staticmethod
- def _decode_from_storage(objects):
- return set(hashutil.hash_to_bytehex(object) for object in objects)
-
- def graph_walker(self):
- return ObjectStoreGraphWalker(self.get_heads(), self.get_parents)
-
- @staticmethod
- def filter_unwanted_refs(refs):
- """Filter the unwanted references from refs"""
- ret = {}
- for ref, val in refs.items():
- if ref.endswith(b'^{}'):
- # Peeled refs make the git protocol explode
- continue
- elif ref.startswith(b'refs/pull/') and ref.endswith(b'/merge'):
- # We filter-out auto-merged GitHub pull requests
- continue
- else:
- ret[ref] = val
-
- return ret
-
- def determine_wants(self, refs):
- """Filter the remote references to figure out which ones
- Software Heritage needs.
- """
- if not refs:
- return []
-
- # Find what objects Software Heritage has
- refs = self.find_remote_ref_types_in_swh(refs)
-
- # Cache the objects found in swh as existing heads
- for target in refs.values():
- if target['target_type'] is not None:
- self.heads.add(target['target'])
-
- ret = set()
- for target in self.filter_unwanted_refs(refs).values():
- if target['target_type'] is None:
- # The target doesn't exist in Software Heritage, let's retrieve
- # it.
- ret.add(target['target'])
-
- return list(ret)
-
- def get_stored_objects(self, objects):
- return self.storage.object_find_by_sha1_git(
- self._encode_for_storage(objects))
-
- def find_remote_ref_types_in_swh(self, remote_refs):
- """Parse the remote refs information and list the objects that exist in
- Software Heritage.
- """
-
- all_objs = set(remote_refs.values()) - set(self._type_cache)
- type_by_id = {}
-
- for id, objs in self.get_stored_objects(all_objs).items():
- id = hashutil.hash_to_bytehex(id)
- if objs:
- type_by_id[id] = objs[0]['type']
-
- self._type_cache.update(type_by_id)
-
- ret = {}
- for ref, id in remote_refs.items():
- ret[ref] = {
- 'target': id,
- 'target_type': self._type_cache.get(id),
- }
- return ret
-
def send_in_packets(objects, sender, packet_size, packet_size_bytes=None):
"""Send `objects`, using the `sender`, in packets of `packet_size` objects (and
of max `packet_size_bytes`).
"""
formatted_objects = []
count = 0
if not packet_size_bytes:
packet_size_bytes = 0
for obj in objects:
if not obj:
continue
formatted_objects.append(obj)
if packet_size_bytes:
count += obj['length']
if len(formatted_objects) >= packet_size or count > packet_size_bytes:
sender(formatted_objects)
formatted_objects = []
count = 0
if formatted_objects:
sender(formatted_objects)
def retry_loading(error):
"""Retry policy when we catch a recoverable error"""
exception_classes = [
# raised when two parallel insertions insert the same data.
psycopg2.IntegrityError,
# raised when uWSGI restarts and hungs up on the worker.
requests.exceptions.ConnectionError,
]
if not any(isinstance(error, exc) for exc in exception_classes):
return False
logger = logging.getLogger('swh.loader.git.BulkLoader')
error_name = error.__module__ + '.' + error.__class__.__name__
logger.warning('Retry loading a batch', exc_info=False, extra={
'swh_type': 'storage_retry',
'swh_exception_type': error_name,
'swh_exception': traceback.format_exception(
error.__class__,
error,
error.__traceback__,
),
})
return True
class BaseLoader(config.SWHConfig):
"""This base class is a pattern for loaders.
The external calling convention is as such:
- instantiate the class once (loads storage and the configuration)
- for each origin, call load with the origin-specific arguments (for
instance, an origin URL).
load calls several methods that must be implemented in subclasses:
- prepare(*args, **kwargs) prepares the loader for the new origin
- get_origin gets the origin object associated to the current loader
- fetch_data downloads the necessary data from the origin
- get_{contents,directories,revisions,releases,occurrences} retrieve each
kind of object from the origin
- has_* checks whether there are some objects to load for that object type
- get_fetch_history_result retrieves the data to insert in the
fetch_history table once the load was successful
- eventful returns whether the load was eventful or not
"""
CONFIG_BASE_FILENAME = None
DEFAULT_CONFIG = {
'storage_class': ('str', 'remote_storage'),
'storage_args': ('list[str]', ['http://localhost:5000/']),
'send_contents': ('bool', True),
'send_directories': ('bool', True),
'send_revisions': ('bool', True),
'send_releases': ('bool', True),
'send_occurrences': ('bool', True),
'content_packet_size': ('int', 10000),
'content_packet_size_bytes': ('int', 1024 * 1024 * 1024),
'directory_packet_size': ('int', 25000),
'revision_packet_size': ('int', 100000),
'release_packet_size': ('int', 100000),
'occurrence_packet_size': ('int', 100000),
}
ADDITIONAL_CONFIG = {}
def __init__(self):
self.config = self.parse_config_file(
additional_configs=[self.ADDITIONAL_CONFIG])
self.storage = get_storage(self.config['storage_class'],
self.config['storage_args'])
self.log = logging.getLogger('swh.loader.git.BulkLoader')
def prepare(self, *args, **kwargs):
"""Prepare the data source to be loaded"""
raise NotImplementedError
def get_origin(self):
"""Get the origin that is currently being loaded"""
raise NotImplementedError
def fetch_data(self):
"""Fetch the data from the data source"""
raise NotImplementedError
def has_contents(self):
"""Checks whether we need to load contents"""
return True
def get_contents(self):
"""Get the contents that need to be loaded"""
raise NotImplementedError
def has_directories(self):
"""Checks whether we need to load directories"""
return True
def get_directories(self):
"""Get the directories that need to be loaded"""
raise NotImplementedError
def has_revisions(self):
"""Checks whether we need to load revisions"""
return True
def get_revisions(self):
"""Get the revisions that need to be loaded"""
raise NotImplementedError
def has_releases(self):
"""Checks whether we need to load releases"""
return True
def get_releases(self):
"""Get the releases that need to be loaded"""
raise NotImplementedError
def has_occurrences(self):
"""Checks whether we need to load occurrences"""
return True
def get_occurrences(self, refs):
"""Get the occurrences that need to be loaded"""
raise NotImplementedError
def get_fetch_history_result(self):
"""Return the data to store in fetch_history for the current loader"""
raise NotImplementedError
def eventful(self):
"""Whether the load was eventful"""
raise NotImplementedError
@retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
def send_contents(self, content_list):
"""Actually send properly formatted contents to the database"""
num_contents = len(content_list)
log_id = str(uuid.uuid4())
self.log.debug("Sending %d contents" % num_contents,
extra={
'swh_type': 'storage_send_start',
'swh_content_type': 'content',
'swh_num': num_contents,
'swh_id': log_id,
})
self.storage.content_add(content_list)
self.log.debug("Done sending %d contents" % num_contents,
extra={
'swh_type': 'storage_send_end',
'swh_content_type': 'content',
'swh_num': num_contents,
'swh_id': log_id,
})
@retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
def send_directories(self, directory_list):
"""Actually send properly formatted directories to the database"""
num_directories = len(directory_list)
log_id = str(uuid.uuid4())
self.log.debug("Sending %d directories" % num_directories,
extra={
'swh_type': 'storage_send_start',
'swh_content_type': 'directory',
'swh_num': num_directories,
'swh_id': log_id,
})
self.storage.directory_add(directory_list)
self.log.debug("Done sending %d directories" % num_directories,
extra={
'swh_type': 'storage_send_end',
'swh_content_type': 'directory',
'swh_num': num_directories,
'swh_id': log_id,
})
@retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
def send_revisions(self, revision_list):
"""Actually send properly formatted revisions to the database"""
num_revisions = len(revision_list)
log_id = str(uuid.uuid4())
self.log.debug("Sending %d revisions" % num_revisions,
extra={
'swh_type': 'storage_send_start',
'swh_content_type': 'revision',
'swh_num': num_revisions,
'swh_id': log_id,
})
self.storage.revision_add(revision_list)
self.log.debug("Done sending %d revisions" % num_revisions,
extra={
'swh_type': 'storage_send_end',
'swh_content_type': 'revision',
'swh_num': num_revisions,
'swh_id': log_id,
})
@retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
def send_releases(self, release_list):
"""Actually send properly formatted releases to the database"""
num_releases = len(release_list)
log_id = str(uuid.uuid4())
self.log.debug("Sending %d releases" % num_releases,
extra={
'swh_type': 'storage_send_start',
'swh_content_type': 'release',
'swh_num': num_releases,
'swh_id': log_id,
})
self.storage.release_add(release_list)
self.log.debug("Done sending %d releases" % num_releases,
extra={
'swh_type': 'storage_send_end',
'swh_content_type': 'release',
'swh_num': num_releases,
'swh_id': log_id,
})
@retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
def send_occurrences(self, occurrence_list):
"""Actually send properly formatted occurrences to the database"""
num_occurrences = len(occurrence_list)
log_id = str(uuid.uuid4())
self.log.debug("Sending %d occurrences" % num_occurrences,
extra={
'swh_type': 'storage_send_start',
'swh_content_type': 'occurrence',
'swh_num': num_occurrences,
'swh_id': log_id,
})
self.storage.occurrence_add(occurrence_list)
self.log.debug("Done sending %d occurrences" % num_occurrences,
extra={
'swh_type': 'storage_send_end',
'swh_content_type': 'occurrence',
'swh_num': num_occurrences,
'swh_id': log_id,
})
def send_origin(self, origin):
log_id = str(uuid.uuid4())
- self.log.debug('Creating origin for %s' % origin_url,
+ self.log.debug('Creating %s origin for %s' % (origin['type'],
+ origin['url']),
extra={
'swh_type': 'storage_send_start',
'swh_content_type': 'origin',
'swh_num': 1,
'swh_id': log_id
})
origin_id = self.storage.origin_add_one(origin)
- self.log.debug('Done creating origin for %s' % origin_url,
+ self.log.debug('Done creating %s origin for %s' % (origin['type'],
+ origin['url']),
extra={
'swh_type': 'storage_send_end',
'swh_content_type': 'origin',
'swh_num': 1,
'swh_id': log_id
})
return origin_id
def send_all_contents(self, contents):
"""Send all the contents to the database"""
packet_size = self.config['content_packet_size']
packet_size_bytes = self.config['content_packet_size_bytes']
send_in_packets(contents, self.send_contents, packet_size,
- packet_size_bytes=packet_size_bytes, log=self.log)
+ packet_size_bytes=packet_size_bytes)
def send_all_directories(self, directories):
"""Send all the directories to the database"""
packet_size = self.config['directory_packet_size']
- send_in_packets(directories, self.send_directories, packet_size,
- log=self.log)
+ send_in_packets(directories, self.send_directories, packet_size)
def send_all_revisions(self, revisions):
"""Send all the revisions to the database"""
packet_size = self.config['revision_packet_size']
- send_in_packets(revisions, self.send_revisions, packet_size,
- log=self.log)
+ send_in_packets(revisions, self.send_revisions, packet_size)
def send_all_releases(self, releases):
"""Send all the releases to the database
"""
packet_size = self.config['release_packet_size']
- send_in_packets(releases, self.send_releases, packet_size,
- log=self.log)
+ send_in_packets(releases, self.send_releases, packet_size)
def send_all_occurrences(self, occurrences):
"""Send all the occurrences to the database
"""
packet_size = self.config['occurrence_packet_size']
send_in_packets(occurrences, self.send_occurrences, packet_size)
def open_fetch_history(self):
return self.storage.fetch_history_start(self.origin_id)
def close_fetch_history_success(self, fetch_history_id, result):
data = {
'status': True,
'result': result,
}
return self.storage.fetch_history_end(fetch_history_id, data)
def close_fetch_history_failure(self, fetch_history_id):
import traceback
data = {
'status': False,
'stderr': traceback.format_exc(),
}
return self.storage.fetch_history_end(fetch_history_id, data)
def load(self, *args, **kwargs):
self.prepare(*args, **kwargs)
origin = self.get_origin()
self.origin_id = self.send_origin(origin)
fetch_history_id = self.open_fetch_history()
try:
self.fetch_data()
if self.config['send_contents'] and self.has_contents():
self.send_all_contents(self.get_contents())
if self.config['send_directories'] and self.has_directories():
self.send_all_directories(self.get_directories())
if self.config['send_revisions'] and self.has_revisions():
self.send_all_revisions(self.get_revisions())
if self.config['send_releases'] and self.has_releases():
self.send_all_releases(self.get_releases())
if self.config['send_occurrences'] and self.has_occurrences():
self.send_all_occurrences(self.get_occurrences())
self.close_fetch_history_success(fetch_history_id,
self.get_fetch_history_result())
except:
self.close_fetch_history_failure(fetch_history_id)
raise
return self.eventful()
-
-
-class BulkUpdater(BaseLoader):
- """A bulk loader for a git repository"""
- CONFIG_BASE_FILENAME = 'loader/git-updater.ini'
-
- ADDITIONAL_CONFIG = {
- 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024),
- }
-
- def fetch_pack_from_origin(self, origin_url, base_origin_id,
- base_occurrences, do_activity):
- """Fetch a pack from the origin"""
- pack_buffer = BytesIO()
-
- base_repo = SWHRepoRepresentation(self.storage, base_origin_id,
- base_occurrences)
-
- parsed_uri = urlparse(origin_url)
-
- path = parsed_uri.path
- if not path.endswith('.git'):
- path += '.git'
-
- client = dulwich.client.TCPGitClient(parsed_uri.netloc,
- thin_packs=False)
-
- size_limit = self.config['pack_size_bytes']
-
- def do_pack(data, pack_buffer=pack_buffer, limit=size_limit):
- cur_size = pack_buffer.tell()
- would_write = len(data)
- if cur_size + would_write > limit:
- raise IOError('Pack file too big for repository %s, '
- 'limit is %d bytes, current size is %d, '
- 'would write %d' %
- (origin_url, limit, cur_size, would_write))
-
- pack_buffer.write(data)
-
- remote_refs = client.fetch_pack(path.encode('ascii'),
- base_repo.determine_wants,
- base_repo.graph_walker(),
- do_pack,
- progress=do_activity)
-
- if remote_refs:
- local_refs = base_repo.find_remote_ref_types_in_swh(remote_refs)
- else:
- local_refs = remote_refs = {}
-
- pack_buffer.flush()
- pack_size = pack_buffer.tell()
- pack_buffer.seek(0)
- return {
- 'remote_refs': base_repo.filter_unwanted_refs(remote_refs),
- 'local_refs': local_refs,
- 'pack_buffer': pack_buffer,
- 'pack_size': pack_size,
- }
-
- def list_pack(self, pack_data, pack_size):
- id_to_type = {}
- type_to_ids = defaultdict(set)
-
- inflater = self.get_inflater()
-
- for obj in inflater:
- type, id = obj.type_name, obj.id
- id_to_type[id] = type
- type_to_ids[type].add(id)
-
- return id_to_type, type_to_ids
-
- def prepare(self, origin_url, base_url=None):
- origin = converters.origin_url_to_origin(origin_url)
- base_origin = converters.origin_url_to_origin(base_url)
-
- base_occurrences = []
- base_origin_id = origin_id = None
-
- db_origin = self.storage.origin_get(origin)
- if db_origin:
- base_origin_id = origin_id = db_origin['id']
-
- if origin_id:
- base_occurrences = self.storage.occurrence_get(origin_id)
-
- if base_url and not base_occurrences:
- base_origin = self.storage.origin_get(base_origin)
- if base_origin:
- base_origin_id = base_origin['id']
- base_occurrences = self.storage.occurrence_get(base_origin_id)
-
- self.base_occurrences = list(sorted(base_occurrences,
- key=lambda occ: occ['branch']))
- self.base_origin_id = base_origin_id
- self.origin = origin
-
- def get_origin(self):
- return self.origin
-
- def fetch_data(self):
- def do_progress(msg):
- sys.stderr.buffer.write(msg)
- sys.stderr.flush()
-
- self.fetch_date = datetime.datetime.now(tz=datetime.timezone.utc)
-
- fetch_info = self.fetch_pack_from_origin(
- self.origin['url'], self.base_origin_id, self.base_occurrences,
- do_progress)
-
- self.pack_buffer = fetch_info['pack_buffer']
- self.pack_size = fetch_info['pack_size']
-
- self.remote_refs = fetch_info['remote_refs']
- self.local_refs = fetch_info['local_refs']
- if not self.remote_refs:
- raise ValueError('Handle no remote refs')
-
- origin_url = self.origin['url']
-
- self.log.info('Listed %d refs for repo %s' % (
- len(self.remote_refs), origin_url), extra={
- 'swh_type': 'git_repo_list_refs',
- 'swh_repo': origin_url,
- 'swh_num_refs': len(self.remote_refs),
- })
-
- # We want to load the repository, walk all the objects
- id_to_type, type_to_ids = self.list_pack(self.pack_buffer,
- self.pack_size)
-
- self.id_to_type = id_to_type
- self.type_to_ids = type_to_ids
-
- def get_inflater(self):
- """Reset the pack buffer and get an object inflater from it"""
- self.pack_buffer.seek(0)
- return PackInflater.for_pack_data(
- PackData.from_file(self.pack_buffer, self.pack_size))
-
- def has_contents(self):
- return bool(self.type_to_ids[b'blob'])
-
- def get_contents(self):
- """Format the blobs from the git repository as swh contents"""
- max_content_size = self.config['content_size_limit']
- for raw_obj in self.get_inflater():
- if raw_obj.type_name != b'blob':
- continue
-
- yield converters.dulwich_blob_to_content(
- raw_obj, log=self.log, max_content_size=max_content_size,
- origin_id=self.origin_id)
-
- def has_directories(self):
- return bool(self.type_to_ids[b'tree'])
-
- def get_directories(self):
- """Format the trees as swh directories"""
- for raw_obj in self.get_inflater():
- if raw_obj.type_name != b'tree':
- continue
-
- yield converters.dulwich_tree_to_directory(raw_obj, log=self.log)
-
- def has_revisions(self):
- return bool(self.type_to_ids[b'commit'])
-
- def get_revisions(self):
- """Format commits as swh revisions"""
- for raw_obj in self.get_inflater():
- if raw_obj.type_name != b'commit':
- continue
-
- yield converters.dulwich_commit_to_revision(raw_obj, log=self.log)
-
- def has_releases(self):
- return bool(self.type_to_ids[b'tag'])
-
- def get_releases(self):
- """Retrieve all the release objects from the git repository"""
- for raw_obj in self.get_inflater():
- if raw_obj.type_name != b'tag':
- continue
-
- yield converters.dulwich_tag_to_release(raw_obj, log=self.log)
-
- def has_occurrences(self):
- return bool(self.remote_refs)
-
- def get_occurrences(self):
- ret = []
- for ref in self.remote_refs:
- ret_ref = self.local_refs[ref].copy()
- ret_ref.update({
- 'branch': ref,
- 'origin': self.origin_id,
- 'date': self.fetch_date,
- })
- if not ret_ref['target_type']:
- target_type = self.id_to_type[ret_ref['target']]
- ret_ref['target_type'] = converters.DULWICH_TYPES[target_type]
-
- ret_ref['target'] = hashutil.bytehex_to_hash(ret_ref['target'])
-
- ret.append(ret_ref)
-
- return ret
-
- def get_fetch_history_result(self):
- return {
- 'contents': len(self.type_to_ids[b'blob']),
- 'directories': len(self.type_to_ids[b'tree']),
- 'revisions': len(self.type_to_ids[b'commit']),
- 'releases': len(self.type_to_ids[b'tag']),
- 'occurrences': len(self.remote_refs),
- }
-
- def eventful(self):
- """The load was eventful if the current occurrences are different to
- the ones we retrieved at the beginning of the run"""
- current_occurrences = list(sorted(
- self.storage.occurrence_get(self.origin_id),
- key=lambda occ: occ['branch'],
- ))
-
- return self.base_occurrences != current_occurrences
-
-
-if __name__ == '__main__':
- logging.basicConfig(
- level=logging.DEBUG,
- format='%(asctime)s %(process)d %(message)s'
- )
- bulkupdater = BulkUpdater()
-
- origin_url = sys.argv[1]
- base_url = origin_url
- if len(sys.argv) > 2:
- base_url = sys.argv[2]
-
- print(bulkupdater.load(origin_url, base_url))
diff --git a/swh/loader/git/updater.py b/swh/loader/git/updater.py
index e7d9e45..73b73d9 100644
--- a/swh/loader/git/updater.py
+++ b/swh/loader/git/updater.py
@@ -1,767 +1,385 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from io import BytesIO
import logging
import sys
-import traceback
-import uuid
from collections import defaultdict
import dulwich.client
from dulwich.object_store import ObjectStoreGraphWalker
from dulwich.pack import PackData, PackInflater
-import psycopg2
-import requests
-from retrying import retry
from urllib.parse import urlparse
-from swh.core import config, hashutil
-from swh.storage import get_storage
+from swh.core import hashutil
-from . import converters
+from . import base, converters
class SWHRepoRepresentation:
"""Repository representation for a Software Heritage origin."""
def __init__(self, storage, origin_id, occurrences=None):
self.storage = storage
self._parents_cache = {}
self._type_cache = {}
if origin_id:
self.heads = set(self._cache_heads(origin_id, occurrences))
else:
self.heads = set()
def _fill_parents_cache(self, commit):
"""When querying for a commit's parents, we fill the cache to a depth of 100
commits."""
root_rev = hashutil.bytehex_to_hash(commit)
for rev, parents in self.storage.revision_shortlog([root_rev], 100):
rev_id = hashutil.hash_to_bytehex(rev)
if rev_id not in self._parents_cache:
self._parents_cache[rev_id] = [
hashutil.hash_to_bytehex(parent) for parent in parents
]
def _cache_heads(self, origin_id, occurrences):
"""Return all the known head commits for `origin_id`"""
if not occurrences:
occurrences = self.storage.occurrence_get(origin_id)
return self._decode_from_storage(
occurrence['target'] for occurrence in occurrences
)
def get_parents(self, commit):
"""get the parent commits for `commit`"""
if commit not in self._parents_cache:
self._fill_parents_cache(commit)
return self._parents_cache.get(commit, [])
def get_heads(self):
return self.heads
@staticmethod
def _encode_for_storage(objects):
return [hashutil.bytehex_to_hash(object) for object in objects]
@staticmethod
def _decode_from_storage(objects):
return set(hashutil.hash_to_bytehex(object) for object in objects)
def graph_walker(self):
return ObjectStoreGraphWalker(self.get_heads(), self.get_parents)
@staticmethod
def filter_unwanted_refs(refs):
"""Filter the unwanted references from refs"""
ret = {}
for ref, val in refs.items():
if ref.endswith(b'^{}'):
# Peeled refs make the git protocol explode
continue
elif ref.startswith(b'refs/pull/') and ref.endswith(b'/merge'):
# We filter-out auto-merged GitHub pull requests
continue
else:
ret[ref] = val
return ret
def determine_wants(self, refs):
"""Filter the remote references to figure out which ones
Software Heritage needs.
"""
if not refs:
return []
# Find what objects Software Heritage has
refs = self.find_remote_ref_types_in_swh(refs)
# Cache the objects found in swh as existing heads
for target in refs.values():
if target['target_type'] is not None:
self.heads.add(target['target'])
ret = set()
for target in self.filter_unwanted_refs(refs).values():
if target['target_type'] is None:
# The target doesn't exist in Software Heritage, let's retrieve
# it.
ret.add(target['target'])
return list(ret)
def get_stored_objects(self, objects):
return self.storage.object_find_by_sha1_git(
self._encode_for_storage(objects))
def find_remote_ref_types_in_swh(self, remote_refs):
"""Parse the remote refs information and list the objects that exist in
Software Heritage.
"""
all_objs = set(remote_refs.values()) - set(self._type_cache)
type_by_id = {}
for id, objs in self.get_stored_objects(all_objs).items():
id = hashutil.hash_to_bytehex(id)
if objs:
type_by_id[id] = objs[0]['type']
self._type_cache.update(type_by_id)
ret = {}
for ref, id in remote_refs.items():
ret[ref] = {
'target': id,
'target_type': self._type_cache.get(id),
}
return ret
-def send_in_packets(objects, sender, packet_size, packet_size_bytes=None):
- """Send `objects`, using the `sender`, in packets of `packet_size` objects (and
- of max `packet_size_bytes`).
- """
- formatted_objects = []
- count = 0
- if not packet_size_bytes:
- packet_size_bytes = 0
- for obj in objects:
- if not obj:
- continue
- formatted_objects.append(obj)
- if packet_size_bytes:
- count += obj['length']
- if len(formatted_objects) >= packet_size or count > packet_size_bytes:
- sender(formatted_objects)
- formatted_objects = []
- count = 0
-
- if formatted_objects:
- sender(formatted_objects)
-
-
-def retry_loading(error):
- """Retry policy when we catch a recoverable error"""
- exception_classes = [
- # raised when two parallel insertions insert the same data.
- psycopg2.IntegrityError,
- # raised when uWSGI restarts and hungs up on the worker.
- requests.exceptions.ConnectionError,
- ]
-
- if not any(isinstance(error, exc) for exc in exception_classes):
- return False
-
- logger = logging.getLogger('swh.loader.git.BulkLoader')
-
- error_name = error.__module__ + '.' + error.__class__.__name__
- logger.warning('Retry loading a batch', exc_info=False, extra={
- 'swh_type': 'storage_retry',
- 'swh_exception_type': error_name,
- 'swh_exception': traceback.format_exception(
- error.__class__,
- error,
- error.__traceback__,
- ),
- })
-
- return True
-
-
-class BaseLoader(config.SWHConfig):
- """This base class is a pattern for loaders.
-
- The external calling convention is as such:
- - instantiate the class once (loads storage and the configuration)
- - for each origin, call load with the origin-specific arguments (for
- instance, an origin URL).
-
- load calls several methods that must be implemented in subclasses:
-
- - prepare(*args, **kwargs) prepares the loader for the new origin
- - get_origin gets the origin object associated to the current loader
- - fetch_data downloads the necessary data from the origin
- - get_{contents,directories,revisions,releases,occurrences} retrieve each
- kind of object from the origin
- - has_* checks whether there are some objects to load for that object type
- - get_fetch_history_result retrieves the data to insert in the
- fetch_history table once the load was successful
- - eventful returns whether the load was eventful or not
- """
-
- CONFIG_BASE_FILENAME = None
-
- DEFAULT_CONFIG = {
- 'storage_class': ('str', 'remote_storage'),
- 'storage_args': ('list[str]', ['http://localhost:5000/']),
-
- 'send_contents': ('bool', True),
- 'send_directories': ('bool', True),
- 'send_revisions': ('bool', True),
- 'send_releases': ('bool', True),
- 'send_occurrences': ('bool', True),
-
- 'content_packet_size': ('int', 10000),
- 'content_packet_size_bytes': ('int', 1024 * 1024 * 1024),
- 'directory_packet_size': ('int', 25000),
- 'revision_packet_size': ('int', 100000),
- 'release_packet_size': ('int', 100000),
- 'occurrence_packet_size': ('int', 100000),
- }
-
- ADDITIONAL_CONFIG = {}
-
- def __init__(self):
- self.config = self.parse_config_file(
- additional_configs=[self.ADDITIONAL_CONFIG])
-
- self.storage = get_storage(self.config['storage_class'],
- self.config['storage_args'])
- self.log = logging.getLogger('swh.loader.git.BulkLoader')
-
- def prepare(self, *args, **kwargs):
- """Prepare the data source to be loaded"""
- raise NotImplementedError
-
- def get_origin(self):
- """Get the origin that is currently being loaded"""
- raise NotImplementedError
-
- def fetch_data(self):
- """Fetch the data from the data source"""
- raise NotImplementedError
-
- def has_contents(self):
- """Checks whether we need to load contents"""
- return True
-
- def get_contents(self):
- """Get the contents that need to be loaded"""
- raise NotImplementedError
-
- def has_directories(self):
- """Checks whether we need to load directories"""
- return True
-
- def get_directories(self):
- """Get the directories that need to be loaded"""
- raise NotImplementedError
-
- def has_revisions(self):
- """Checks whether we need to load revisions"""
- return True
-
- def get_revisions(self):
- """Get the revisions that need to be loaded"""
- raise NotImplementedError
-
- def has_releases(self):
- """Checks whether we need to load releases"""
- return True
-
- def get_releases(self):
- """Get the releases that need to be loaded"""
- raise NotImplementedError
-
- def has_occurrences(self):
- """Checks whether we need to load occurrences"""
- return True
-
- def get_occurrences(self, refs):
- """Get the occurrences that need to be loaded"""
- raise NotImplementedError
-
- def get_fetch_history_result(self):
- """Return the data to store in fetch_history for the current loader"""
- raise NotImplementedError
-
- def eventful(self):
- """Whether the load was eventful"""
- raise NotImplementedError
-
- @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
- def send_contents(self, content_list):
- """Actually send properly formatted contents to the database"""
- num_contents = len(content_list)
- log_id = str(uuid.uuid4())
- self.log.debug("Sending %d contents" % num_contents,
- extra={
- 'swh_type': 'storage_send_start',
- 'swh_content_type': 'content',
- 'swh_num': num_contents,
- 'swh_id': log_id,
- })
- self.storage.content_add(content_list)
- self.log.debug("Done sending %d contents" % num_contents,
- extra={
- 'swh_type': 'storage_send_end',
- 'swh_content_type': 'content',
- 'swh_num': num_contents,
- 'swh_id': log_id,
- })
-
- @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
- def send_directories(self, directory_list):
- """Actually send properly formatted directories to the database"""
- num_directories = len(directory_list)
- log_id = str(uuid.uuid4())
- self.log.debug("Sending %d directories" % num_directories,
- extra={
- 'swh_type': 'storage_send_start',
- 'swh_content_type': 'directory',
- 'swh_num': num_directories,
- 'swh_id': log_id,
- })
- self.storage.directory_add(directory_list)
- self.log.debug("Done sending %d directories" % num_directories,
- extra={
- 'swh_type': 'storage_send_end',
- 'swh_content_type': 'directory',
- 'swh_num': num_directories,
- 'swh_id': log_id,
- })
-
- @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
- def send_revisions(self, revision_list):
- """Actually send properly formatted revisions to the database"""
- num_revisions = len(revision_list)
- log_id = str(uuid.uuid4())
- self.log.debug("Sending %d revisions" % num_revisions,
- extra={
- 'swh_type': 'storage_send_start',
- 'swh_content_type': 'revision',
- 'swh_num': num_revisions,
- 'swh_id': log_id,
- })
- self.storage.revision_add(revision_list)
- self.log.debug("Done sending %d revisions" % num_revisions,
- extra={
- 'swh_type': 'storage_send_end',
- 'swh_content_type': 'revision',
- 'swh_num': num_revisions,
- 'swh_id': log_id,
- })
-
- @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
- def send_releases(self, release_list):
- """Actually send properly formatted releases to the database"""
- num_releases = len(release_list)
- log_id = str(uuid.uuid4())
- self.log.debug("Sending %d releases" % num_releases,
- extra={
- 'swh_type': 'storage_send_start',
- 'swh_content_type': 'release',
- 'swh_num': num_releases,
- 'swh_id': log_id,
- })
- self.storage.release_add(release_list)
- self.log.debug("Done sending %d releases" % num_releases,
- extra={
- 'swh_type': 'storage_send_end',
- 'swh_content_type': 'release',
- 'swh_num': num_releases,
- 'swh_id': log_id,
- })
-
- @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
- def send_occurrences(self, occurrence_list):
- """Actually send properly formatted occurrences to the database"""
- num_occurrences = len(occurrence_list)
- log_id = str(uuid.uuid4())
- self.log.debug("Sending %d occurrences" % num_occurrences,
- extra={
- 'swh_type': 'storage_send_start',
- 'swh_content_type': 'occurrence',
- 'swh_num': num_occurrences,
- 'swh_id': log_id,
- })
- self.storage.occurrence_add(occurrence_list)
- self.log.debug("Done sending %d occurrences" % num_occurrences,
- extra={
- 'swh_type': 'storage_send_end',
- 'swh_content_type': 'occurrence',
- 'swh_num': num_occurrences,
- 'swh_id': log_id,
- })
-
- def send_origin(self, origin):
- log_id = str(uuid.uuid4())
- self.log.debug('Creating origin for %s' % origin_url,
- extra={
- 'swh_type': 'storage_send_start',
- 'swh_content_type': 'origin',
- 'swh_num': 1,
- 'swh_id': log_id
- })
- origin_id = self.storage.origin_add_one(origin)
- self.log.debug('Done creating origin for %s' % origin_url,
- extra={
- 'swh_type': 'storage_send_end',
- 'swh_content_type': 'origin',
- 'swh_num': 1,
- 'swh_id': log_id
- })
-
- return origin_id
-
- def send_all_contents(self, contents):
- """Send all the contents to the database"""
- packet_size = self.config['content_packet_size']
- packet_size_bytes = self.config['content_packet_size_bytes']
-
- send_in_packets(contents, self.send_contents, packet_size,
- packet_size_bytes=packet_size_bytes, log=self.log)
-
- def send_all_directories(self, directories):
- """Send all the directories to the database"""
- packet_size = self.config['directory_packet_size']
-
- send_in_packets(directories, self.send_directories, packet_size,
- log=self.log)
-
- def send_all_revisions(self, revisions):
- """Send all the revisions to the database"""
- packet_size = self.config['revision_packet_size']
-
- send_in_packets(revisions, self.send_revisions, packet_size,
- log=self.log)
-
- def send_all_releases(self, releases):
- """Send all the releases to the database
- """
- packet_size = self.config['release_packet_size']
-
- send_in_packets(releases, self.send_releases, packet_size,
- log=self.log)
-
- def send_all_occurrences(self, occurrences):
- """Send all the occurrences to the database
- """
- packet_size = self.config['occurrence_packet_size']
-
- send_in_packets(occurrences, self.send_occurrences, packet_size)
-
- def open_fetch_history(self):
- return self.storage.fetch_history_start(self.origin_id)
-
- def close_fetch_history_success(self, fetch_history_id, result):
- data = {
- 'status': True,
- 'result': result,
- }
- return self.storage.fetch_history_end(fetch_history_id, data)
-
- def close_fetch_history_failure(self, fetch_history_id):
- import traceback
- data = {
- 'status': False,
- 'stderr': traceback.format_exc(),
- }
- return self.storage.fetch_history_end(fetch_history_id, data)
-
- def load(self, *args, **kwargs):
-
- self.prepare(*args, **kwargs)
- origin = self.get_origin()
- self.origin_id = self.send_origin(origin)
-
- fetch_history_id = self.open_fetch_history()
- try:
- self.fetch_data()
-
- if self.config['send_contents'] and self.has_contents():
- self.send_all_contents(self.get_contents())
-
- if self.config['send_directories'] and self.has_directories():
- self.send_all_directories(self.get_directories())
-
- if self.config['send_revisions'] and self.has_revisions():
- self.send_all_revisions(self.get_revisions())
-
- if self.config['send_releases'] and self.has_releases():
- self.send_all_releases(self.get_releases())
-
- if self.config['send_occurrences'] and self.has_occurrences():
- self.send_all_occurrences(self.get_occurrences())
-
- self.close_fetch_history_success(fetch_history_id,
- self.get_fetch_history_result())
- except:
- self.close_fetch_history_failure(fetch_history_id)
- raise
-
- return self.eventful()
-
-
-class BulkUpdater(BaseLoader):
+class BulkUpdater(base.BaseLoader):
"""A bulk loader for a git repository"""
CONFIG_BASE_FILENAME = 'loader/git-updater.ini'
ADDITIONAL_CONFIG = {
'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024),
}
def fetch_pack_from_origin(self, origin_url, base_origin_id,
base_occurrences, do_activity):
"""Fetch a pack from the origin"""
pack_buffer = BytesIO()
base_repo = SWHRepoRepresentation(self.storage, base_origin_id,
base_occurrences)
parsed_uri = urlparse(origin_url)
path = parsed_uri.path
if not path.endswith('.git'):
path += '.git'
client = dulwich.client.TCPGitClient(parsed_uri.netloc,
thin_packs=False)
size_limit = self.config['pack_size_bytes']
def do_pack(data, pack_buffer=pack_buffer, limit=size_limit):
cur_size = pack_buffer.tell()
would_write = len(data)
if cur_size + would_write > limit:
raise IOError('Pack file too big for repository %s, '
'limit is %d bytes, current size is %d, '
'would write %d' %
(origin_url, limit, cur_size, would_write))
pack_buffer.write(data)
remote_refs = client.fetch_pack(path.encode('ascii'),
base_repo.determine_wants,
base_repo.graph_walker(),
do_pack,
progress=do_activity)
if remote_refs:
local_refs = base_repo.find_remote_ref_types_in_swh(remote_refs)
else:
local_refs = remote_refs = {}
pack_buffer.flush()
pack_size = pack_buffer.tell()
pack_buffer.seek(0)
return {
'remote_refs': base_repo.filter_unwanted_refs(remote_refs),
'local_refs': local_refs,
'pack_buffer': pack_buffer,
'pack_size': pack_size,
}
def list_pack(self, pack_data, pack_size):
id_to_type = {}
type_to_ids = defaultdict(set)
inflater = self.get_inflater()
for obj in inflater:
type, id = obj.type_name, obj.id
id_to_type[id] = type
type_to_ids[type].add(id)
return id_to_type, type_to_ids
def prepare(self, origin_url, base_url=None):
origin = converters.origin_url_to_origin(origin_url)
base_origin = converters.origin_url_to_origin(base_url)
base_occurrences = []
base_origin_id = origin_id = None
db_origin = self.storage.origin_get(origin)
if db_origin:
base_origin_id = origin_id = db_origin['id']
if origin_id:
base_occurrences = self.storage.occurrence_get(origin_id)
if base_url and not base_occurrences:
base_origin = self.storage.origin_get(base_origin)
if base_origin:
base_origin_id = base_origin['id']
base_occurrences = self.storage.occurrence_get(base_origin_id)
self.base_occurrences = list(sorted(base_occurrences,
key=lambda occ: occ['branch']))
self.base_origin_id = base_origin_id
self.origin = origin
def get_origin(self):
return self.origin
def fetch_data(self):
def do_progress(msg):
sys.stderr.buffer.write(msg)
sys.stderr.flush()
self.fetch_date = datetime.datetime.now(tz=datetime.timezone.utc)
fetch_info = self.fetch_pack_from_origin(
self.origin['url'], self.base_origin_id, self.base_occurrences,
do_progress)
self.pack_buffer = fetch_info['pack_buffer']
self.pack_size = fetch_info['pack_size']
self.remote_refs = fetch_info['remote_refs']
self.local_refs = fetch_info['local_refs']
if not self.remote_refs:
raise ValueError('Handle no remote refs')
origin_url = self.origin['url']
self.log.info('Listed %d refs for repo %s' % (
len(self.remote_refs), origin_url), extra={
'swh_type': 'git_repo_list_refs',
'swh_repo': origin_url,
'swh_num_refs': len(self.remote_refs),
})
# We want to load the repository, walk all the objects
id_to_type, type_to_ids = self.list_pack(self.pack_buffer,
self.pack_size)
self.id_to_type = id_to_type
self.type_to_ids = type_to_ids
def get_inflater(self):
"""Reset the pack buffer and get an object inflater from it"""
self.pack_buffer.seek(0)
return PackInflater.for_pack_data(
PackData.from_file(self.pack_buffer, self.pack_size))
def has_contents(self):
return bool(self.type_to_ids[b'blob'])
def get_contents(self):
"""Format the blobs from the git repository as swh contents"""
max_content_size = self.config['content_size_limit']
for raw_obj in self.get_inflater():
if raw_obj.type_name != b'blob':
continue
yield converters.dulwich_blob_to_content(
raw_obj, log=self.log, max_content_size=max_content_size,
origin_id=self.origin_id)
def has_directories(self):
return bool(self.type_to_ids[b'tree'])
def get_directories(self):
"""Format the trees as swh directories"""
for raw_obj in self.get_inflater():
if raw_obj.type_name != b'tree':
continue
yield converters.dulwich_tree_to_directory(raw_obj, log=self.log)
def has_revisions(self):
return bool(self.type_to_ids[b'commit'])
def get_revisions(self):
"""Format commits as swh revisions"""
for raw_obj in self.get_inflater():
if raw_obj.type_name != b'commit':
continue
yield converters.dulwich_commit_to_revision(raw_obj, log=self.log)
def has_releases(self):
return bool(self.type_to_ids[b'tag'])
def get_releases(self):
"""Retrieve all the release objects from the git repository"""
for raw_obj in self.get_inflater():
if raw_obj.type_name != b'tag':
continue
yield converters.dulwich_tag_to_release(raw_obj, log=self.log)
def has_occurrences(self):
return bool(self.remote_refs)
def get_occurrences(self):
ret = []
for ref in self.remote_refs:
ret_ref = self.local_refs[ref].copy()
ret_ref.update({
'branch': ref,
'origin': self.origin_id,
'date': self.fetch_date,
})
if not ret_ref['target_type']:
target_type = self.id_to_type[ret_ref['target']]
ret_ref['target_type'] = converters.DULWICH_TYPES[target_type]
ret_ref['target'] = hashutil.bytehex_to_hash(ret_ref['target'])
ret.append(ret_ref)
return ret
def get_fetch_history_result(self):
return {
'contents': len(self.type_to_ids[b'blob']),
'directories': len(self.type_to_ids[b'tree']),
'revisions': len(self.type_to_ids[b'commit']),
'releases': len(self.type_to_ids[b'tag']),
'occurrences': len(self.remote_refs),
}
def eventful(self):
"""The load was eventful if the current occurrences are different to
the ones we retrieved at the beginning of the run"""
current_occurrences = list(sorted(
self.storage.occurrence_get(self.origin_id),
key=lambda occ: occ['branch'],
))
return self.base_occurrences != current_occurrences
if __name__ == '__main__':
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s %(process)d %(message)s'
)
bulkupdater = BulkUpdater()
origin_url = sys.argv[1]
base_url = origin_url
if len(sys.argv) > 2:
base_url = sys.argv[2]
print(bulkupdater.load(origin_url, base_url))
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Mon, Aug 18, 6:42 PM (1 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3463915
Attached To
rDLDG Git loader
Event Timeline
Log In to Comment