diff --git a/debian/control b/debian/control index 5d78de5..a8d9566 100644 --- a/debian/control +++ b/debian/control @@ -1,28 +1,28 @@ Source: swh-loader-git Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python, python3-all, python3-nose, python3-dulwich, python3-retrying, python3-setuptools, python3-click, python3-swh.core (>= 0.0.7~), - python3-swh.model (>= 0.0.3~), + python3-swh.model (>= 0.0.14~), python3-swh.scheduler, python3-swh.storage (>= 0.0.76~), python3-vcversioner Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/diffusion/DLDG/ Package: python3-swh.loader.git Architecture: all Depends: python3-swh.core (>= 0.0.7~), python3-swh.storage (>= 0.0.76~), python3-swh.model (>= 0.0.3~), ${misc:Depends}, ${python3:Depends} Description: Software Heritage Git loader diff --git a/requirements-swh.txt b/requirements-swh.txt index 9c3c311..498ea27 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,4 +1,4 @@ swh.core >= 0.0.7 -swh.model >= 0.0.3 +swh.model >= 0.0.14 swh.scheduler swh.storage >= 0.0.76 diff --git a/swh/loader/git/converters.py b/swh/loader/git/converters.py index b84116c..9161925 100644 --- a/swh/loader/git/converters.py +++ b/swh/loader/git/converters.py @@ -1,216 +1,216 @@ -# Copyright (C) 2015 The Software Heritage developers +# Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Convert dulwich objects to dictionaries suitable for swh.storage""" -from swh.core import hashutil +from swh.model import hashutil HASH_ALGORITHMS = hashutil.ALGORITHMS - {'sha1_git'} def origin_url_to_origin(origin_url): """Format a pygit2.Repository as an origin suitable for swh.storage""" return { 'type': 'git', 'url': origin_url, } def dulwich_blob_to_content(blob, log=None, max_content_size=None, origin_id=None): """Convert a dulwich blob to a Software Heritage content""" if blob.type_name != b'blob': return size = blob.raw_length() ret = { 'sha1_git': blob.sha().digest(), 'length': size, 'status': 'absent' } if max_content_size: if size > max_content_size: id = hashutil.hash_to_hex(ret['sha1_git']) if log: log.info('Skipping content %s, too large (%s > %s)' % (id, size, max_content_size), extra={ 'swh_type': 'loader_git_content_skip', 'swh_id': id, 'swh_size': size, }) ret['reason'] = 'Content too large' ret['origin'] = origin_id return ret data = blob.as_raw_string() - hashes = hashutil.hashdata(data, HASH_ALGORITHMS) + hashes = hashutil.hash_data(data, HASH_ALGORITHMS) ret.update(hashes) ret['data'] = data ret['status'] = 'visible' return ret def dulwich_tree_to_directory(tree, log=None): """Format a tree as a directory""" if tree.type_name != b'tree': return ret = { 'id': tree.sha().digest(), } entries = [] ret['entries'] = entries entry_mode_map = { 0o040000: 'dir', 0o160000: 'rev', 0o100644: 'file', 0o100755: 'file', 0o120000: 'file', } for entry in tree.iteritems(): entries.append({ 'type': entry_mode_map.get(entry.mode, 'file'), 'perms': entry.mode, 'name': entry.path, - 'target': hashutil.hex_to_hash(entry.sha.decode('ascii')), + 'target': hashutil.hash_to_bytes(entry.sha.decode('ascii')), }) return ret def parse_author(name_email): """Parse an author line""" if name_email is None: return None try: open_bracket = name_email.index(b'<') except ValueError: name = email = None else: raw_name = name_email[:open_bracket] raw_email = name_email[open_bracket+1:] if not raw_name: name = None elif raw_name.endswith(b' '): name = raw_name[:-1] else: name = raw_name try: close_bracket = raw_email.index(b'>') except ValueError: email = None else: email = raw_email[:close_bracket] return { 'name': name, 'email': email, 'fullname': name_email, } def dulwich_tsinfo_to_timestamp(timestamp, timezone, timezone_neg_utc): """Convert the dulwich timestamp information to a structure compatible with Software Heritage""" return { 'timestamp': timestamp, 'offset': timezone // 60, 'negative_utc': timezone_neg_utc if timezone == 0 else None, } def dulwich_commit_to_revision(commit, log=None): if commit.type_name != b'commit': return ret = { 'id': commit.sha().digest(), 'author': parse_author(commit.author), 'date': dulwich_tsinfo_to_timestamp( commit.author_time, commit.author_timezone, commit._author_timezone_neg_utc, ), 'committer': parse_author(commit.committer), 'committer_date': dulwich_tsinfo_to_timestamp( commit.commit_time, commit.commit_timezone, commit._commit_timezone_neg_utc, ), 'type': 'git', 'directory': bytes.fromhex(commit.tree.decode()), 'message': commit.message, 'metadata': None, 'synthetic': False, 'parents': [bytes.fromhex(p.decode()) for p in commit.parents], } git_metadata = [] if commit.encoding is not None: git_metadata.append(['encoding', commit.encoding]) if commit.mergetag: for mergetag in commit.mergetag: raw_string = mergetag.as_raw_string() assert raw_string.endswith(b'\n') git_metadata.append(['mergetag', raw_string[:-1]]) if commit.extra: git_metadata.extend([k.decode('utf-8'), v] for k, v in commit.extra) if commit.gpgsig: git_metadata.append(['gpgsig', commit.gpgsig]) if git_metadata: ret['metadata'] = { 'extra_headers': git_metadata, } return ret DULWICH_TYPES = { b'blob': 'content', b'tree': 'directory', b'commit': 'revision', b'tag': 'release', } def dulwich_tag_to_release(tag, log=None): if tag.type_name != b'tag': return target_type, target = tag.object ret = { 'id': tag.sha().digest(), 'name': tag.name, 'target': bytes.fromhex(target.decode()), 'target_type': DULWICH_TYPES[target_type.type_name], 'message': tag._message, 'metadata': None, 'synthetic': False, } if tag.tagger: ret['author'] = parse_author(tag.tagger) ret['date'] = dulwich_tsinfo_to_timestamp( tag.tag_time, tag.tag_timezone, tag._tag_timezone_neg_utc, ) else: ret['author'] = ret['date'] = None return ret diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py index 76be640..498f312 100644 --- a/swh/loader/git/loader.py +++ b/swh/loader/git/loader.py @@ -1,185 +1,185 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import dulwich.repo import os import shutil from collections import defaultdict -from swh.core import hashutil +from swh.model import hashutil from . import base, converters, utils class GitLoader(base.BaseLoader): """Load a git repository from a directory. """ CONFIG_BASE_FILENAME = 'loader/git-loader' def prepare(self, origin_url, directory, fetch_date): self.origin_url = origin_url self.repo = dulwich.repo.Repo(directory) self.fetch_date = fetch_date def get_origin(self): """Get the origin that is currently being loaded""" return converters.origin_url_to_origin(self.origin_url) def iter_objects(self): object_store = self.repo.object_store for pack in object_store.packs: objs = list(pack.index.iterentries()) objs.sort(key=lambda x: x[1]) for sha, offset, crc32 in objs: yield hashutil.hash_to_bytehex(sha) yield from object_store._iter_loose_objects() yield from object_store._iter_alternate_objects() def fetch_data(self): """Fetch the data from the data source""" type_to_ids = defaultdict(list) for oid in self.iter_objects(): type_name = self.repo[oid].type_name type_to_ids[type_name].append(oid) self.type_to_ids = type_to_ids def has_contents(self): """Checks whether we need to load contents""" return bool(self.type_to_ids[b'blob']) def get_contents(self): """Get the contents that need to be loaded""" max_content_size = self.config['content_size_limit'] for oid in self.type_to_ids[b'blob']: yield converters.dulwich_blob_to_content( self.repo[oid], log=self.log, max_content_size=max_content_size, origin_id=self.origin_id) def has_directories(self): """Checks whether we need to load directories""" return bool(self.type_to_ids[b'tree']) def get_directories(self): """Get the directories that need to be loaded""" for oid in self.type_to_ids[b'tree']: yield converters.dulwich_tree_to_directory( self.repo[oid], log=self.log) def has_revisions(self): """Checks whether we need to load revisions""" return bool(self.type_to_ids[b'commit']) def get_revisions(self): """Get the revisions that need to be loaded""" for oid in self.type_to_ids[b'commit']: yield converters.dulwich_commit_to_revision( self.repo[oid], log=self.log) def has_releases(self): """Checks whether we need to load releases""" return bool(self.type_to_ids[b'tag']) def get_releases(self): """Get the releases that need to be loaded""" for oid in self.type_to_ids[b'tag']: yield converters.dulwich_tag_to_release( self.repo[oid], log=self.log) def has_occurrences(self): """Checks whether we need to load occurrences""" return True def get_occurrences(self): """Get the occurrences that need to be loaded""" repo = self.repo origin_id = self.origin_id visit = self.visit for ref, target in repo.refs.as_dict().items(): target_type_name = repo[target].type_name target_type = converters.DULWICH_TYPES[target_type_name] yield { 'branch': ref, 'origin': origin_id, 'target': hashutil.bytehex_to_hash(target), 'target_type': target_type, 'visit': visit, } def get_fetch_history_result(self): """Return the data to store in fetch_history for the current loader""" return { 'contents': len(self.type_to_ids[b'blob']), 'directories': len(self.type_to_ids[b'tree']), 'revisions': len(self.type_to_ids[b'commit']), 'releases': len(self.type_to_ids[b'tag']), 'occurrences': len(self.repo.refs.allkeys()), } def save_data(self): """We already have the data locally, no need to save it""" pass def eventful(self): """Whether the load was eventful""" return True class GitLoaderFromArchive(GitLoader): """Load a git repository from an archive. """ def project_name_from_archive(self, archive_path): """Compute the project name from the archive's path. """ return os.path.basename(os.path.dirname(archive_path)) def prepare(self, origin_url, archive_path, fetch_date): """1. Uncompress the archive in temporary location. 2. Prepare as the GitLoader does 3. Load as GitLoader does """ project_name = self.project_name_from_archive(archive_path) self.temp_dir, self.repo_path = utils.init_git_repo_from_archive( project_name, archive_path) self.log.info('Project %s - Uncompressing archive %s at %s' % ( origin_url, os.path.basename(archive_path), self.repo_path)) super().prepare(origin_url, self.repo_path, fetch_date) def cleanup(self): """Cleanup the temporary location (if it exists). """ if self.temp_dir and os.path.exists(self.temp_dir): shutil.rmtree(self.temp_dir) self.log.info('Project %s - Done injecting %s' % ( self.origin_url, self.repo_path)) if __name__ == '__main__': import logging import sys logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) loader = GitLoader() origin_url = sys.argv[1] directory = sys.argv[2] fetch_date = datetime.datetime.now(tz=datetime.timezone.utc) print(loader.load(origin_url, directory, fetch_date)) diff --git a/swh/loader/git/reader.py b/swh/loader/git/reader.py index 6646e63..7f717c1 100644 --- a/swh/loader/git/reader.py +++ b/swh/loader/git/reader.py @@ -1,255 +1,256 @@ -# Copyright (C) 2016 The Software Heritage developers +# Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import logging import pprint import click -from swh.core import hashutil, utils +from swh.core import utils +from swh.model import hashutil from .updater import BulkUpdater, SWHRepoRepresentation from . import converters class SWHRepoFullRepresentation(SWHRepoRepresentation): """Overridden representation of a swh repository to permit to read completely the remote repository. """ def __init__(self, storage, origin_id, occurrences=None): self.storage = storage self._parents_cache = {} self._type_cache = {} self.heads = set() def determine_wants(self, refs): """Filter the remote references to figure out which ones Software Heritage needs. In this particular context, we want to know everything. """ if not refs: return [] for target in refs.values(): self.heads.add(target) return self.filter_unwanted_refs(refs).values() def find_remote_ref_types_in_swh(self, remote_refs): """Find the known swh remote. In that particular context, we know nothing. """ return {} class DummyGraphWalker(object): """Dummy graph walker which claims that the client doesn’t have any objects. """ def ack(self, sha): pass def next(self): pass def __next__(self): pass class BaseGitRemoteReader(BulkUpdater): CONFIG_BASE_FILENAME = 'loader/git-remote-reader' ADDITIONAL_CONFIG = { 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024), 'pack_storage_base': ('str', ''), # don't want to store packs so empty 'next_task': ( 'dict', { 'queue': 'swh.storage.archiver.tasks.SWHArchiverToBackendTask', 'batch_size': 100, 'destination': 'azure' } ) } def __init__(self): super().__init__(SWHRepoFullRepresentation) self.next_task = self.config['next_task'] self.batch_size = self.next_task['batch_size'] self.task_destination = self.next_task['queue'] self.destination = self.next_task['destination'] def graph_walker(self): return DummyGraphWalker() def prepare(self, origin_url, base_url=None): """Only retrieve information about the origin, set everything else to empty. """ self.origin = converters.origin_url_to_origin(origin_url) self.origin_id = 0 self.base_occurrences = [] self.base_origin_id = 0 def keep_object(self, obj): """Do we want to keep this object or not?""" raise NotImplementedError('Please implement keep_object') def get_id_and_data(self, obj): """Get the id, type and data of the given object""" raise NotImplementedError('Please implement get_id_and_data') def list_pack(self, pack_data, pack_size): """Override list_pack to only keep contents' sha1. Returns: id_to_type (dict): keys are sha1, values are their associated type type_to_ids (dict): keys are types, values are list of associated data (sha1 for blobs) """ self.data = {} id_to_type = {} type_to_ids = defaultdict(set) inflater = self.get_inflater() for obj in inflater: if not self.keep_object(obj): continue object_id, type, data = self.get_id_and_data(obj) id_to_type[object_id] = type type_to_ids[type].add(object_id) self.data[object_id] = data return id_to_type, type_to_ids def load(self, *args, **kwargs): """Override the loading part which simply reads the repository's contents' sha1. Returns: Returns the list of discovered sha1s for that origin. """ self.prepare(*args, **kwargs) self.fetch_data() class GitSha1RemoteReader(BaseGitRemoteReader): """Read sha1 git from a remote repository and dump only repository's content sha1 as list. """ def keep_object(self, obj): """Only keep blobs""" return obj.type_name == b'blob' def get_id_and_data(self, obj): """We want to store only object identifiers""" # compute the sha1 (obj.id is the sha1_git) data = obj.as_raw_string() - hashes = hashutil.hashdata(data, {'sha1'}) + hashes = hashutil.hash_data(data, {'sha1'}) oid = hashes['sha1'] return (oid, b'blob', oid) class GitSha1RemoteReaderAndSendToQueue(GitSha1RemoteReader): """Read sha1 git from a remote repository and dump only repository's content sha1 as list and send batch of those sha1s to a celery queue for consumption. """ def load(self, *args, **kwargs): """Retrieve the list of sha1s for a particular origin and send those sha1s as group of sha1s to a specific queue. """ super().load(*args, **kwargs) data = self.type_to_ids[b'blob'] from swh.scheduler.celery_backend.config import app try: # optional dependency from swh.storage.archiver import tasks # noqa except ImportError: pass from celery import group task_destination = app.tasks[self.task_destination] groups = [] for ids in utils.grouper(data, self.batch_size): sig_ids = task_destination.s(destination=self.destination, batch=list(ids)) groups.append(sig_ids) group(groups).delay() return data class GitCommitRemoteReader(BaseGitRemoteReader): def keep_object(self, obj): return obj.type_name == b'commit' def get_id_and_data(self, obj): return obj.id, b'commit', converters.dulwich_commit_to_revision(obj) def load(self, *args, **kwargs): super().load(*args, **kwargs) return self.data @click.group() @click.option('--origin-url', help='Origin url') @click.pass_context def main(ctx, origin_url): logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) ctx.obj['origin_url'] = origin_url @main.command() @click.option('--send/--nosend', default=False, help='Origin\'s url') @click.pass_context def blobs(ctx, send): origin_url = ctx.obj['origin_url'] if send: loader = GitSha1RemoteReaderAndSendToQueue() ids = loader.load(origin_url) print('%s sha1s were sent to queue' % len(ids)) return loader = GitSha1RemoteReader() ids = loader.load(origin_url) if ids: for oid in ids: print(hashutil.hash_to_hex(oid)) @main.command() @click.option('--ids-only', is_flag=True, help='print ids only') @click.pass_context def commits(ctx, ids_only): origin_url = ctx.obj['origin_url'] reader = GitCommitRemoteReader() commits = reader.load(origin_url) for commit_id, commit in commits.items(): if ids_only: print(commit_id.decode()) else: pprint.pprint(commit) if __name__ == '__main__': main(obj={}) diff --git a/swh/loader/git/tests/test_converters.py b/swh/loader/git/tests/test_converters.py index fddf249..3b37cbb 100644 --- a/swh/loader/git/tests/test_converters.py +++ b/swh/loader/git/tests/test_converters.py @@ -1,170 +1,170 @@ -# Copyright (C) 2015 The Software Heritage developers +# Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import shutil import subprocess import tempfile import unittest from nose.tools import istest import dulwich.repo import swh.loader.git.converters as converters -from swh.core.hashutil import bytehex_to_hash, hex_to_hash +from swh.model.hashutil import bytehex_to_hash, hash_to_bytes class TestConverters(unittest.TestCase): @classmethod def setUpClass(cls): super().setUpClass() cls.repo_path = tempfile.mkdtemp() cls.repo = dulwich.repo.Repo.init_bare(cls.repo_path) fast_export = os.path.join(os.path.dirname(__file__), '../../../../..', 'swh-storage-testdata', 'git-repos', 'example-submodule.fast-export.xz') xz = subprocess.Popen( ['xzcat'], stdin=open(fast_export, 'rb'), stdout=subprocess.PIPE, ) git = subprocess.Popen( ['git', 'fast-import', '--quiet'], stdin=xz.stdout, cwd=cls.repo_path, ) # flush stdout of xz xz.stdout.close() git.communicate() @classmethod def tearDownClass(cls): super().tearDownClass() shutil.rmtree(cls.repo_path) print(cls.repo_path) def setUp(self): super().setUp() self.blob_id = b'28c6f4023d65f74e3b59a2dea3c4277ed9ee07b0' self.blob = { 'sha1_git': bytehex_to_hash(self.blob_id), - 'sha1': hex_to_hash('4850a3420a2262ff061cb296fb915430fa92301c'), - 'sha256': hex_to_hash('fee7c8a485a10321ad94b64135073cb5' - '5f22cb9f57fa2417d2adfb09d310adef'), + 'sha1': hash_to_bytes('4850a3420a2262ff061cb296fb915430fa92301c'), + 'sha256': hash_to_bytes('fee7c8a485a10321ad94b64135073cb5' + '5f22cb9f57fa2417d2adfb09d310adef'), 'data': (b'[submodule "example-dependency"]\n' b'\tpath = example-dependency\n' b'\turl = https://github.com/githubtraining/' b'example-dependency.git\n'), 'length': 124, 'status': 'visible', } self.blob_hidden = { 'sha1_git': bytehex_to_hash(self.blob_id), 'length': 124, 'status': 'absent', 'reason': 'Content too large', 'origin': None, } @istest def blob_to_content(self): content = converters.dulwich_blob_to_content(self.repo[self.blob_id]) self.assertEqual(self.blob, content) @istest def blob_to_content_absent(self): max_length = self.blob['length'] - 1 content = converters.dulwich_blob_to_content( self.repo[self.blob_id], max_content_size=max_length) self.assertEqual(self.blob_hidden, content) @istest def commit_to_revision(self): sha1 = b'9768d0b576dbaaecd80abedad6dfd0d72f1476da' revision = converters.dulwich_commit_to_revision(self.repo[sha1]) expected_revision = { - 'id': hex_to_hash('9768d0b576dbaaecd80abedad6dfd0d72f1476da'), + 'id': hash_to_bytes('9768d0b576dbaaecd80abedad6dfd0d72f1476da'), 'directory': b'\xf0i\\./\xa7\xce\x9dW@#\xc3A7a\xa4s\xe5\x00\xca', 'type': 'git', 'committer': { 'name': b'Stefano Zacchiroli', 'fullname': b'Stefano Zacchiroli ', 'email': b'zack@upsilon.cc', }, 'author': { 'name': b'Stefano Zacchiroli', 'fullname': b'Stefano Zacchiroli ', 'email': b'zack@upsilon.cc', }, 'committer_date': { 'negative_utc': None, 'timestamp': 1443083765, 'offset': 120, }, 'message': b'add submodule dependency\n', 'metadata': None, 'date': { 'negative_utc': None, 'timestamp': 1443083765, 'offset': 120, }, 'parents': [ b'\xc3\xc5\x88q23`\x9f[\xbb\xb2\xd9\xe7\xf3\xfbJf\x0f?r' ], 'synthetic': False, } self.assertEquals(revision, expected_revision) @istest def author_line_to_author(self): tests = { b'a ': { 'name': b'a', 'email': b'b@c.com', 'fullname': b'a ', }, b'': { 'name': None, 'email': b'foo@bar.com', 'fullname': b'', }, b'malformed ': { 'name': b'trailing', 'email': b'sp@c.e', 'fullname': b'trailing ', }, b'no': { 'name': b'no', 'email': b'sp@c.e', 'fullname': b'no', }, b' <>': { 'name': b'', 'email': b'', 'fullname': b' <>', }, } for author in sorted(tests): parsed_author = tests[author] self.assertEquals(parsed_author, converters.parse_author(author)) diff --git a/swh/loader/git/updater.py b/swh/loader/git/updater.py index d925e0a..7c6e4ac 100644 --- a/swh/loader/git/updater.py +++ b/swh/loader/git/updater.py @@ -1,425 +1,425 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import BytesIO import datetime import logging import os import pickle import sys from collections import defaultdict import dulwich.client from dulwich.object_store import ObjectStoreGraphWalker from dulwich.pack import PackData, PackInflater from urllib.parse import urlparse -from swh.core import hashutil +from swh.model import hashutil from . import base, converters class SWHRepoRepresentation: """Repository representation for a Software Heritage origin.""" def __init__(self, storage, origin_id, occurrences=None): self.storage = storage self._parents_cache = {} self._type_cache = {} if origin_id: self.heads = set(self._cache_heads(origin_id, occurrences)) else: self.heads = set() def _fill_parents_cache(self, commit): """When querying for a commit's parents, we fill the cache to a depth of 100 commits.""" root_rev = hashutil.bytehex_to_hash(commit) for rev, parents in self.storage.revision_shortlog([root_rev], 100): rev_id = hashutil.hash_to_bytehex(rev) if rev_id not in self._parents_cache: self._parents_cache[rev_id] = [ hashutil.hash_to_bytehex(parent) for parent in parents ] def _cache_heads(self, origin_id, occurrences): """Return all the known head commits for `origin_id`""" if not occurrences: occurrences = self.storage.occurrence_get(origin_id) return self._decode_from_storage( occurrence['target'] for occurrence in occurrences ) def get_parents(self, commit): """get the parent commits for `commit`""" if commit not in self._parents_cache: self._fill_parents_cache(commit) return self._parents_cache.get(commit, []) def get_heads(self): return self.heads @staticmethod def _encode_for_storage(objects): return [hashutil.bytehex_to_hash(object) for object in objects] @staticmethod def _decode_from_storage(objects): return set(hashutil.hash_to_bytehex(object) for object in objects) def graph_walker(self): return ObjectStoreGraphWalker(self.get_heads(), self.get_parents) @staticmethod def filter_unwanted_refs(refs): """Filter the unwanted references from refs""" ret = {} for ref, val in refs.items(): if ref.endswith(b'^{}'): # Peeled refs make the git protocol explode continue elif ref.startswith(b'refs/pull/') and ref.endswith(b'/merge'): # We filter-out auto-merged GitHub pull requests continue else: ret[ref] = val return ret def determine_wants(self, refs): """Filter the remote references to figure out which ones Software Heritage needs. """ if not refs: return [] # Find what objects Software Heritage has refs = self.find_remote_ref_types_in_swh(refs) # Cache the objects found in swh as existing heads for target in refs.values(): if target['target_type'] is not None: self.heads.add(target['target']) ret = set() for target in self.filter_unwanted_refs(refs).values(): if target['target_type'] is None: # The target doesn't exist in Software Heritage, let's retrieve # it. ret.add(target['target']) return list(ret) def get_stored_objects(self, objects): return self.storage.object_find_by_sha1_git( self._encode_for_storage(objects)) def find_remote_ref_types_in_swh(self, remote_refs): """Parse the remote refs information and list the objects that exist in Software Heritage. """ all_objs = set(remote_refs.values()) - set(self._type_cache) type_by_id = {} for id, objs in self.get_stored_objects(all_objs).items(): id = hashutil.hash_to_bytehex(id) if objs: type_by_id[id] = objs[0]['type'] self._type_cache.update(type_by_id) ret = {} for ref, id in remote_refs.items(): ret[ref] = { 'target': id, 'target_type': self._type_cache.get(id), } return ret class BulkUpdater(base.BaseLoader): """A bulk loader for a git repository""" CONFIG_BASE_FILENAME = 'loader/git-updater' ADDITIONAL_CONFIG = { 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024), } def __init__(self, repo_representation=SWHRepoRepresentation): """Initialize the bulk updater. Args: repo_representation: swh's repository representation which is in charge of filtering between known and remote data. """ super().__init__() self.repo_representation = repo_representation def fetch_pack_from_origin(self, origin_url, base_origin_id, base_occurrences, do_activity): """Fetch a pack from the origin""" pack_buffer = BytesIO() base_repo = self.repo_representation(self.storage, base_origin_id, base_occurrences) parsed_uri = urlparse(origin_url) path = parsed_uri.path if not path.endswith('.git'): path += '.git' client = dulwich.client.TCPGitClient(parsed_uri.netloc, thin_packs=False) size_limit = self.config['pack_size_bytes'] def do_pack(data, pack_buffer=pack_buffer, limit=size_limit, origin_url=origin_url): cur_size = pack_buffer.tell() would_write = len(data) if cur_size + would_write > limit: raise IOError('Pack file too big for repository %s, ' 'limit is %d bytes, current size is %d, ' 'would write %d' % (origin_url, limit, cur_size, would_write)) pack_buffer.write(data) remote_refs = client.fetch_pack(path.encode('ascii'), base_repo.determine_wants, base_repo.graph_walker(), do_pack, progress=do_activity) if remote_refs: local_refs = base_repo.find_remote_ref_types_in_swh(remote_refs) else: local_refs = remote_refs = {} pack_buffer.flush() pack_size = pack_buffer.tell() pack_buffer.seek(0) return { 'remote_refs': base_repo.filter_unwanted_refs(remote_refs), 'local_refs': local_refs, 'pack_buffer': pack_buffer, 'pack_size': pack_size, } def list_pack(self, pack_data, pack_size): id_to_type = {} type_to_ids = defaultdict(set) inflater = self.get_inflater() for obj in inflater: type, id = obj.type_name, obj.id id_to_type[id] = type type_to_ids[type].add(id) return id_to_type, type_to_ids def prepare(self, origin_url, base_url=None): self.fetch_date = datetime.datetime.now(tz=datetime.timezone.utc) origin = converters.origin_url_to_origin(origin_url) base_origin = converters.origin_url_to_origin(base_url) base_occurrences = [] base_origin_id = origin_id = None db_origin = self.storage.origin_get(origin) if db_origin: base_origin_id = origin_id = db_origin['id'] if origin_id: base_occurrences = self.storage.occurrence_get(origin_id) if base_url and not base_occurrences: base_origin = self.storage.origin_get(base_origin) if base_origin: base_origin_id = base_origin['id'] base_occurrences = self.storage.occurrence_get(base_origin_id) self.base_occurrences = list(sorted(base_occurrences, key=lambda occ: occ['branch'])) self.base_origin_id = base_origin_id self.origin = origin def get_origin(self): return self.origin def fetch_data(self): def do_progress(msg): sys.stderr.buffer.write(msg) sys.stderr.flush() fetch_info = self.fetch_pack_from_origin( self.origin['url'], self.base_origin_id, self.base_occurrences, do_progress) self.pack_buffer = fetch_info['pack_buffer'] self.pack_size = fetch_info['pack_size'] self.remote_refs = fetch_info['remote_refs'] self.local_refs = fetch_info['local_refs'] origin_url = self.origin['url'] self.log.info('Listed %d refs for repo %s' % ( len(self.remote_refs), origin_url), extra={ 'swh_type': 'git_repo_list_refs', 'swh_repo': origin_url, 'swh_num_refs': len(self.remote_refs), }) # We want to load the repository, walk all the objects id_to_type, type_to_ids = self.list_pack(self.pack_buffer, self.pack_size) self.id_to_type = id_to_type self.type_to_ids = type_to_ids def save_data(self): """Store a pack for archival""" write_size = 8192 pack_dir = self.get_save_data_path() pack_name = "%s.pack" % self.fetch_date.isoformat() refs_name = "%s.refs" % self.fetch_date.isoformat() with open(os.path.join(pack_dir, pack_name), 'xb') as f: while True: r = self.pack_buffer.read(write_size) if not r: break f.write(r) self.pack_buffer.seek(0) with open(os.path.join(pack_dir, refs_name), 'xb') as f: pickle.dump(self.remote_refs, f) def get_inflater(self): """Reset the pack buffer and get an object inflater from it""" self.pack_buffer.seek(0) return PackInflater.for_pack_data( PackData.from_file(self.pack_buffer, self.pack_size)) def has_contents(self): return bool(self.type_to_ids[b'blob']) def get_contents(self): """Format the blobs from the git repository as swh contents""" max_content_size = self.config['content_size_limit'] for raw_obj in self.get_inflater(): if raw_obj.type_name != b'blob': continue yield converters.dulwich_blob_to_content( raw_obj, log=self.log, max_content_size=max_content_size, origin_id=self.origin_id) def has_directories(self): return bool(self.type_to_ids[b'tree']) def get_directories(self): """Format the trees as swh directories""" for raw_obj in self.get_inflater(): if raw_obj.type_name != b'tree': continue yield converters.dulwich_tree_to_directory(raw_obj, log=self.log) def has_revisions(self): return bool(self.type_to_ids[b'commit']) def get_revisions(self): """Format commits as swh revisions""" for raw_obj in self.get_inflater(): if raw_obj.type_name != b'commit': continue yield converters.dulwich_commit_to_revision(raw_obj, log=self.log) def has_releases(self): return bool(self.type_to_ids[b'tag']) def get_releases(self): """Retrieve all the release objects from the git repository""" for raw_obj in self.get_inflater(): if raw_obj.type_name != b'tag': continue yield converters.dulwich_tag_to_release(raw_obj, log=self.log) def has_occurrences(self): return bool(self.remote_refs) def get_occurrences(self): origin_id = self.origin_id visit = self.visit ret = [] for ref in self.remote_refs: ret_ref = self.local_refs[ref].copy() ret_ref.update({ 'branch': ref, 'origin': origin_id, 'visit': visit, }) if not ret_ref['target_type']: target_type = self.id_to_type[ret_ref['target']] ret_ref['target_type'] = converters.DULWICH_TYPES[target_type] ret_ref['target'] = hashutil.bytehex_to_hash(ret_ref['target']) ret.append(ret_ref) return ret def get_fetch_history_result(self): return { 'contents': len(self.type_to_ids[b'blob']), 'directories': len(self.type_to_ids[b'tree']), 'revisions': len(self.type_to_ids[b'commit']), 'releases': len(self.type_to_ids[b'tag']), 'occurrences': len(self.remote_refs), } def eventful(self): """The load was eventful if the current occurrences are different to the ones we retrieved at the beginning of the run""" current_occurrences = list(sorted( self.storage.occurrence_get(self.origin_id), key=lambda occ: occ['branch'], )) return self.base_occurrences != current_occurrences if __name__ == '__main__': logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) bulkupdater = BulkUpdater() origin_url = sys.argv[1] base_url = origin_url if len(sys.argv) > 2: base_url = sys.argv[2] print(bulkupdater.load(origin_url, base_url))