diff --git a/PKG-INFO b/PKG-INFO index 64a7e74..0da254b 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.loader.git -Version: 0.0.20 +Version: 0.0.21 Summary: Software Heritage git loader Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.loader.git.egg-info/PKG-INFO b/swh.loader.git.egg-info/PKG-INFO index 64a7e74..0da254b 100644 --- a/swh.loader.git.egg-info/PKG-INFO +++ b/swh.loader.git.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.loader.git -Version: 0.0.20 +Version: 0.0.21 Summary: Software Heritage git loader Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh/loader/git/reader.py b/swh/loader/git/reader.py index b069665..a35a971 100644 --- a/swh/loader/git/reader.py +++ b/swh/loader/git/reader.py @@ -1,130 +1,213 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import datetime +import logging from collections import defaultdict from swh.core import hashutil, utils -from .updater import BulkUpdater +from .updater import BulkUpdater, SWHRepoRepresentation from .loader import GitLoader +from . import converters class GitSha1Reader(GitLoader): """Disk git sha1 reader. Only read and dump sha1s in stdout. """ def fetch_data(self): """Fetch the data from the data source""" for oid in self.iter_objects(): type_name = self.repo[oid].type_name if type_name != b'blob': continue yield hashutil.hex_to_hash(oid.decode('utf-8')) def load(self, *args, **kwargs): self.prepare(*args, **kwargs) yield from self.fetch_data() +class SWHRepoFullRepresentation(SWHRepoRepresentation): + """Overridden representation of a swh repository to permit to read + completely the remote repository. + + """ + def __init__(self, storage, origin_id, occurrences=None): + self.storage = storage + self._parents_cache = {} + self._type_cache = {} + self.heads = set() + + def determine_wants(self, refs): + """Filter the remote references to figure out which ones Software + Heritage needs. In this particular context, we want to know + everything. + + """ + if not refs: + return [] + + for target in refs.values(): + self.heads.add(target) + + return self.filter_unwanted_refs(refs).values() + + def find_remote_ref_types_in_swh(self, remote_refs): + """Find the known swh remote. + In that particular context, we know nothing. + + """ + return {} + + +class DummyGraphWalker(object): + """Dummy graph walker which claims that the client doesn’t have any + objects. + + """ + def ack(self, sha): pass + + def next(self): pass + + def __next__(self): pass + + class GitSha1RemoteReader(BulkUpdater): """Disk git sha1 reader to dump only repo's content sha1 list. 
""" CONFIG_BASE_FILENAME = 'loader/git-remote-reader' ADDITIONAL_CONFIG = { 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024), 'pack_storage_base': ('str', ''), # don't want to store packs so empty 'next_task': ( 'dict', { 'queue': 'swh.storage.archiver.tasks.SWHArchiverToBackendTask', 'batch_size': 100, 'destination': 'azure' } ) } def __init__(self): - super().__init__() + super().__init__(SWHRepoFullRepresentation) self.next_task = self.config['next_task'] self.batch_size = self.next_task['batch_size'] self.task_destination = self.next_task.get('queue') self.destination = self.next_task['destination'] + def graph_walker(self): + return DummyGraphWalker() + + def prepare(self, origin_url, base_url=None): + """Only retrieve information about the origin, set everything else to + empty. + + """ + ori = converters.origin_url_to_origin(origin_url) + self.origin = self.storage.origin_get(ori) + self.origin_id = self.origin['id'] + self.base_occurrences = [] + self.base_origin_id = self.origin['id'] + def list_pack(self, pack_data, pack_size): - """Override list_pack to only keep blobs data. + """Override list_pack to only keep contents' sha1. + + Returns: + id_to_type (dict): keys are sha1, values are their associated type + type_to_ids (dict): keys are types, values are list of associated + ids (sha1 for blobs) """ id_to_type = {} type_to_ids = defaultdict(set) inflater = self.get_inflater() for obj in inflater: type, id = obj.type_name, obj.id if type != b'blob': # don't keep other types continue oid = hashutil.hex_to_hash(id.decode('utf-8')) id_to_type[oid] = type type_to_ids[type].add(oid) return id_to_type, type_to_ids def load(self, *args, **kwargs): - self.prepare(*args, **kwargs) - origin = self.get_origin() - self.origin_id = self.send_origin(origin) + """Override the loading part which simply reads the repository's + contents' sha1. + + Returns: + If the configuration holds a destination queue, send those + sha1s as batch of sha1s to it for consumption. Otherwise, + returns the list of discovered sha1s. 
+ + """ + try: + self.prepare(*args, **kwargs) + except: + self.log.error('Unknown repository, skipping...') + return [] self.fetch_data() - data = self.id_to_type.keys() + data = self.type_to_ids[b'blob'] + if not self.task_destination: # to stdout - yield from data - return + return data from swh.scheduler.celery_backend.config import app try: # optional dependency from swh.storage.archiver import tasks # noqa except ImportError: pass from celery import group task_destination = app.tasks[self.task_destination] groups = [] for ids in utils.grouper(data, self.batch_size): sig_ids = task_destination.s(destination=self.destination, batch=list(ids)) groups.append(sig_ids) group(groups).delay() + return data @click.command() @click.option('--origin-url', help='Origin\'s url') -@click.option('--source', help='origin\'s source url (disk or remote)') +@click.option('--source', default=None, + help='origin\'s source url (disk or remote)') def main(origin_url, source): - import logging - logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) - if source.startswith('/'): + local_reader = (source and source.startswith('/')) or origin_url.startswith('/') # noqa + + if local_reader: loader = GitSha1Reader() fetch_date = datetime.datetime.now(tz=datetime.timezone.utc) ids = loader.load(origin_url, source, fetch_date) else: loader = GitSha1RemoteReader() ids = loader.load(origin_url, source) - for oid in ids: - print(oid) + if ids: + count = 0 + for oid in ids: + print(oid) + count += 1 + print("sha1s: %s" % count) if __name__ == '__main__': main() diff --git a/swh/loader/git/tasks.py b/swh/loader/git/tasks.py index 752e6fc..6373378 100644 --- a/swh/loader/git/tasks.py +++ b/swh/loader/git/tasks.py @@ -1,53 +1,53 @@ -# Copyright (C) 2015 The Software Heritage developers +# Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import dateutil.parser from swh.scheduler.task import Task from .loader import GitLoader from .updater import BulkUpdater from .reader import GitSha1RemoteReader # TODO: rename to LoadRemoteGitRepository class UpdateGitRepository(Task): """Import a git repository from a remote location""" task_queue = 'swh_loader_git' def run(self, repo_url, base_url=None): """Import a git repository""" loader = BulkUpdater() loader.log = self.log return loader.load(repo_url, base_url) class LoadDiskGitRepository(Task): """Import a git repository from disk""" task_queue = 'swh_loader_git' def run(self, origin_url, directory, date): """Import a git repository, cloned in `directory` from `origin_url` at `date`.""" loader = GitLoader() loader.log = self.log return loader.load(origin_url, directory, dateutil.parser.parse(date)) class ReaderGitRepository(Task): task_queue = 'swh_reader_git' def run(self, repo_url, base_url=None): """Read a git repository from a remote location and send sha1 to archival. 
""" loader = GitSha1RemoteReader() loader.log = self.log - loader.load(repo_url, base_url) + return loader.load(repo_url) diff --git a/swh/loader/git/updater.py b/swh/loader/git/updater.py index ba4ac95..12d9ce8 100644 --- a/swh/loader/git/updater.py +++ b/swh/loader/git/updater.py @@ -1,422 +1,437 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from io import BytesIO import logging import os import pickle import sys from collections import defaultdict import dulwich.client from dulwich.object_store import ObjectStoreGraphWalker from dulwich.pack import PackData, PackInflater from urllib.parse import urlparse from swh.core import hashutil from . import base, converters class SWHRepoRepresentation: """Repository representation for a Software Heritage origin.""" def __init__(self, storage, origin_id, occurrences=None): self.storage = storage self._parents_cache = {} self._type_cache = {} if origin_id: self.heads = set(self._cache_heads(origin_id, occurrences)) else: self.heads = set() def _fill_parents_cache(self, commit): """When querying for a commit's parents, we fill the cache to a depth of 100 commits.""" root_rev = hashutil.bytehex_to_hash(commit) for rev, parents in self.storage.revision_shortlog([root_rev], 100): rev_id = hashutil.hash_to_bytehex(rev) if rev_id not in self._parents_cache: self._parents_cache[rev_id] = [ hashutil.hash_to_bytehex(parent) for parent in parents ] def _cache_heads(self, origin_id, occurrences): """Return all the known head commits for `origin_id`""" if not occurrences: occurrences = self.storage.occurrence_get(origin_id) return self._decode_from_storage( occurrence['target'] for occurrence in occurrences ) def get_parents(self, commit): """get the parent commits for `commit`""" if commit not in self._parents_cache: self._fill_parents_cache(commit) return self._parents_cache.get(commit, []) def get_heads(self): return self.heads @staticmethod def _encode_for_storage(objects): return [hashutil.bytehex_to_hash(object) for object in objects] @staticmethod def _decode_from_storage(objects): return set(hashutil.hash_to_bytehex(object) for object in objects) def graph_walker(self): return ObjectStoreGraphWalker(self.get_heads(), self.get_parents) @staticmethod def filter_unwanted_refs(refs): """Filter the unwanted references from refs""" ret = {} for ref, val in refs.items(): if ref.endswith(b'^{}'): # Peeled refs make the git protocol explode continue elif ref.startswith(b'refs/pull/') and ref.endswith(b'/merge'): # We filter-out auto-merged GitHub pull requests continue else: ret[ref] = val return ret def determine_wants(self, refs): """Filter the remote references to figure out which ones Software Heritage needs. """ if not refs: return [] # Find what objects Software Heritage has refs = self.find_remote_ref_types_in_swh(refs) # Cache the objects found in swh as existing heads for target in refs.values(): if target['target_type'] is not None: self.heads.add(target['target']) ret = set() for target in self.filter_unwanted_refs(refs).values(): if target['target_type'] is None: # The target doesn't exist in Software Heritage, let's retrieve # it. 
ret.add(target['target']) return list(ret) def get_stored_objects(self, objects): return self.storage.object_find_by_sha1_git( self._encode_for_storage(objects)) def find_remote_ref_types_in_swh(self, remote_refs): """Parse the remote refs information and list the objects that exist in Software Heritage. """ all_objs = set(remote_refs.values()) - set(self._type_cache) type_by_id = {} for id, objs in self.get_stored_objects(all_objs).items(): id = hashutil.hash_to_bytehex(id) if objs: type_by_id[id] = objs[0]['type'] self._type_cache.update(type_by_id) ret = {} for ref, id in remote_refs.items(): ret[ref] = { 'target': id, 'target_type': self._type_cache.get(id), } return ret class BulkUpdater(base.BaseLoader): """A bulk loader for a git repository""" CONFIG_BASE_FILENAME = 'loader/git-updater.ini' ADDITIONAL_CONFIG = { 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024), 'pack_storage_base': ('str', ''), } + def __init__(self, repo_representation=SWHRepoRepresentation): + """Initialize the bulk updater. + + Args: + repo_representation: swh's repository representation + which is in charge of filtering between known and remote + data. + + """ + super().__init__() + self.repo_representation = repo_representation + def store_pack_and_refs(self, pack_buffer, remote_refs): """Store a pack for archival""" write_size = 8192 origin_id = "%08d" % self.origin_id year = str(self.fetch_date.year) pack_dir = os.path.join( self.config['pack_storage_base'], origin_id, year, ) pack_name = "%s.pack" % self.fetch_date.isoformat() refs_name = "%s.refs" % self.fetch_date.isoformat() os.makedirs(pack_dir, exist_ok=True) with open(os.path.join(pack_dir, pack_name), 'xb') as f: while True: r = pack_buffer.read(write_size) if not r: break f.write(r) pack_buffer.seek(0) with open(os.path.join(pack_dir, refs_name), 'xb') as f: pickle.dump(remote_refs, f) def fetch_pack_from_origin(self, origin_url, base_origin_id, base_occurrences, do_activity): """Fetch a pack from the origin""" pack_buffer = BytesIO() - base_repo = SWHRepoRepresentation(self.storage, base_origin_id, - base_occurrences) + base_repo = self.repo_representation(self.storage, base_origin_id, + base_occurrences) parsed_uri = urlparse(origin_url) path = parsed_uri.path if not path.endswith('.git'): path += '.git' client = dulwich.client.TCPGitClient(parsed_uri.netloc, thin_packs=False) size_limit = self.config['pack_size_bytes'] - def do_pack(data, pack_buffer=pack_buffer, limit=size_limit): + def do_pack(data, + pack_buffer=pack_buffer, + limit=size_limit, + origin_url=origin_url): cur_size = pack_buffer.tell() would_write = len(data) if cur_size + would_write > limit: raise IOError('Pack file too big for repository %s, ' 'limit is %d bytes, current size is %d, ' 'would write %d' % (origin_url, limit, cur_size, would_write)) pack_buffer.write(data) remote_refs = client.fetch_pack(path.encode('ascii'), base_repo.determine_wants, base_repo.graph_walker(), do_pack, progress=do_activity) if remote_refs: local_refs = base_repo.find_remote_ref_types_in_swh(remote_refs) else: local_refs = remote_refs = {} pack_buffer.flush() pack_size = pack_buffer.tell() pack_buffer.seek(0) if self.config['pack_storage_base']: self.store_pack_and_refs(pack_buffer, remote_refs) return { 'remote_refs': base_repo.filter_unwanted_refs(remote_refs), 'local_refs': local_refs, 'pack_buffer': pack_buffer, 'pack_size': pack_size, } def list_pack(self, pack_data, pack_size): id_to_type = {} type_to_ids = defaultdict(set) inflater = self.get_inflater() for obj in inflater: type, id = 
obj.type_name, obj.id id_to_type[id] = type type_to_ids[type].add(id) return id_to_type, type_to_ids def prepare(self, origin_url, base_url=None): origin = converters.origin_url_to_origin(origin_url) base_origin = converters.origin_url_to_origin(base_url) base_occurrences = [] base_origin_id = origin_id = None db_origin = self.storage.origin_get(origin) if db_origin: base_origin_id = origin_id = db_origin['id'] if origin_id: base_occurrences = self.storage.occurrence_get(origin_id) if base_url and not base_occurrences: base_origin = self.storage.origin_get(base_origin) if base_origin: base_origin_id = base_origin['id'] base_occurrences = self.storage.occurrence_get(base_origin_id) self.base_occurrences = list(sorted(base_occurrences, key=lambda occ: occ['branch'])) self.base_origin_id = base_origin_id self.origin = origin def get_origin(self): return self.origin def fetch_data(self): def do_progress(msg): sys.stderr.buffer.write(msg) sys.stderr.flush() self.fetch_date = datetime.datetime.now(tz=datetime.timezone.utc) fetch_info = self.fetch_pack_from_origin( self.origin['url'], self.base_origin_id, self.base_occurrences, do_progress) self.pack_buffer = fetch_info['pack_buffer'] self.pack_size = fetch_info['pack_size'] self.remote_refs = fetch_info['remote_refs'] self.local_refs = fetch_info['local_refs'] origin_url = self.origin['url'] self.log.info('Listed %d refs for repo %s' % ( len(self.remote_refs), origin_url), extra={ 'swh_type': 'git_repo_list_refs', 'swh_repo': origin_url, 'swh_num_refs': len(self.remote_refs), }) # We want to load the repository, walk all the objects id_to_type, type_to_ids = self.list_pack(self.pack_buffer, self.pack_size) self.id_to_type = id_to_type self.type_to_ids = type_to_ids def get_inflater(self): """Reset the pack buffer and get an object inflater from it""" self.pack_buffer.seek(0) return PackInflater.for_pack_data( PackData.from_file(self.pack_buffer, self.pack_size)) def has_contents(self): return bool(self.type_to_ids[b'blob']) def get_contents(self): """Format the blobs from the git repository as swh contents""" max_content_size = self.config['content_size_limit'] for raw_obj in self.get_inflater(): if raw_obj.type_name != b'blob': continue yield converters.dulwich_blob_to_content( raw_obj, log=self.log, max_content_size=max_content_size, origin_id=self.origin_id) def has_directories(self): return bool(self.type_to_ids[b'tree']) def get_directories(self): """Format the trees as swh directories""" for raw_obj in self.get_inflater(): if raw_obj.type_name != b'tree': continue yield converters.dulwich_tree_to_directory(raw_obj, log=self.log) def has_revisions(self): return bool(self.type_to_ids[b'commit']) def get_revisions(self): """Format commits as swh revisions""" for raw_obj in self.get_inflater(): if raw_obj.type_name != b'commit': continue yield converters.dulwich_commit_to_revision(raw_obj, log=self.log) def has_releases(self): return bool(self.type_to_ids[b'tag']) def get_releases(self): """Retrieve all the release objects from the git repository""" for raw_obj in self.get_inflater(): if raw_obj.type_name != b'tag': continue yield converters.dulwich_tag_to_release(raw_obj, log=self.log) def has_occurrences(self): return bool(self.remote_refs) def get_occurrences(self): origin_id = self.origin_id visit = self.visit ret = [] for ref in self.remote_refs: ret_ref = self.local_refs[ref].copy() ret_ref.update({ 'branch': ref, 'origin': origin_id, 'visit': visit, }) if not ret_ref['target_type']: target_type = self.id_to_type[ret_ref['target']] 
ret_ref['target_type'] = converters.DULWICH_TYPES[target_type] ret_ref['target'] = hashutil.bytehex_to_hash(ret_ref['target']) ret.append(ret_ref) return ret def get_fetch_history_result(self): return { 'contents': len(self.type_to_ids[b'blob']), 'directories': len(self.type_to_ids[b'tree']), 'revisions': len(self.type_to_ids[b'commit']), 'releases': len(self.type_to_ids[b'tag']), 'occurrences': len(self.remote_refs), } def eventful(self): """The load was eventful if the current occurrences are different to the ones we retrieved at the beginning of the run""" current_occurrences = list(sorted( self.storage.occurrence_get(self.origin_id), key=lambda occ: occ['branch'], )) return self.base_occurrences != current_occurrences if __name__ == '__main__': logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) bulkupdater = BulkUpdater() origin_url = sys.argv[1] base_url = origin_url if len(sys.argv) > 2: base_url = sys.argv[2] print(bulkupdater.load(origin_url, base_url)) diff --git a/version.txt b/version.txt index 3ac068d..9f5f749 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.20-0-g86928fa \ No newline at end of file +v0.0.21-0-g370f17a \ No newline at end of file
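
Aside (not part of the patch): the new remote reader works by pairing SWHRepoFullRepresentation.determine_wants, which asks for every advertised ref, with DummyGraphWalker, which never advertises any local object, so dulwich has to send a complete pack. Below is a minimal sketch of that same "want everything / have nothing" pattern against plain dulwich; the names HaveNothingWalker and fetch_full_pack are invented for illustration, and the fetch_pack call mirrors the one in fetch_pack_from_origin above.

    from io import BytesIO
    from urllib.parse import urlparse

    import dulwich.client


    class HaveNothingWalker:
        """Graph walker that never advertises a local object (cf. DummyGraphWalker)."""
        def ack(self, sha):
            pass

        def next(self):
            return None

        def __next__(self):
            return None


    def fetch_full_pack(origin_url):
        """Fetch a complete pack from origin_url into an in-memory buffer."""
        parsed = urlparse(origin_url)
        client = dulwich.client.TCPGitClient(parsed.netloc, thin_packs=False)
        pack_buffer = BytesIO()

        def want_everything(refs):
            # ask for every ref target, skipping peeled refs the same way
            # filter_unwanted_refs does in the patch
            return list({target for ref, target in refs.items()
                         if not ref.endswith(b'^{}')})

        client.fetch_pack(parsed.path.encode('ascii'),
                          want_everything,
                          HaveNothingWalker(),
                          pack_buffer.write)
        pack_buffer.seek(0)
        return pack_buffer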
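
A hedged usage sketch of the new entry points, mirroring the main() function added in reader.py: it assumes the loader configuration referenced by CONFIG_BASE_FILENAME is in place, and the origin URL and local path below are placeholders. When next_task.queue is configured, GitSha1RemoteReader additionally dispatches the sha1s to that queue in batches of next_task.batch_size.

    import datetime

    from swh.loader.git.reader import GitSha1Reader, GitSha1RemoteReader

    origin_url = 'git://example.org/repo.git'  # placeholder origin
    source = None                              # or a local path such as '/srv/repo'

    if (source and source.startswith('/')) or origin_url.startswith('/'):
        # local clone on disk: GitSha1Reader walks the object store directly
        fetch_date = datetime.datetime.now(tz=datetime.timezone.utc)
        ids = GitSha1Reader().load(origin_url, source, fetch_date)
    else:
        # remote repository: full fetch, only blob sha1s are kept
        ids = GitSha1RemoteReader().load(origin_url, source)

    count = 0
    for oid in ids or []:
        print(oid)
        count += 1
    print("sha1s: %s" % count)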