diff --git a/debian/control b/debian/control
index 8b42bdc..83cfa83 100644
--- a/debian/control
+++ b/debian/control
@@ -1,31 +1,31 @@
 Source: swh-loader-git
 Maintainer: Software Heritage developers
 Section: python
 Priority: optional
 Build-Depends: debhelper (>= 9),
                dh-python (>= 2),
                python3-all,
                python3-click,
                python3-dulwich (>= 0.18.7~),
                python3-nose,
                python3-retrying,
                python3-setuptools,
                python3-swh.core (>= 0.0.7~),
                python3-swh.loader.core (>= 0.0.32),
-               python3-swh.model (>= 0.0.15~),
+               python3-swh.model (>= 0.0.27~),
                python3-swh.scheduler (>= 0.0.14~),
                python3-swh.storage (>= 0.0.83~),
                python3-vcversioner
 Standards-Version: 3.9.6
 Homepage: https://forge.softwareheritage.org/diffusion/DLDG/
 
 Package: python3-swh.loader.git
 Architecture: all
 Depends: python3-swh.core (>= 0.0.7~),
          python3-swh.loader.core (>= 0.0.32~),
-         python3-swh.model (>= 0.0.15~),
+         python3-swh.model (>= 0.0.27~),
          python3-swh.scheduler (>= 0.0.14~),
          python3-swh.storage (>= 0.0.83~),
          ${misc:Depends},
          ${python3:Depends}
 Description: Software Heritage Git loader
diff --git a/requirements-swh.txt b/requirements-swh.txt
index be87c8b..5e2352e 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,5 @@
 swh.core >= 0.0.7
 swh.loader.core >= 0.0.32
-swh.model >= 0.0.15
+swh.model >= 0.0.27
 swh.scheduler >= 0.0.14
 swh.storage >= 0.0.83
diff --git a/swh/loader/git/converters.py b/swh/loader/git/converters.py
index bcdea5c..6793c9e 100644
--- a/swh/loader/git/converters.py
+++ b/swh/loader/git/converters.py
@@ -1,240 +1,238 @@
 # Copyright (C) 2015-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """Convert dulwich objects to dictionaries suitable for swh.storage"""
 
-from swh.model import hashutil, identifiers
+from swh.model import identifiers
+from swh.model.hashutil import (
+    DEFAULT_ALGORITHMS, hash_to_hex, hash_to_bytes, MultiHash
+)
 
-HASH_ALGORITHMS = hashutil.DEFAULT_ALGORITHMS - {'sha1_git'}
+HASH_ALGORITHMS = DEFAULT_ALGORITHMS - {'sha1_git'}
 
 
 def origin_url_to_origin(origin_url):
     """Format a pygit2.Repository as an origin suitable for swh.storage"""
     return {
         'type': 'git',
         'url': origin_url,
     }
 
 
 def dulwich_blob_to_content_id(blob):
     """Convert a dulwich blob to a Software Heritage content id"""
     if blob.type_name != b'blob':
         return
 
     size = blob.raw_length()
-
-    ret = {
-        'sha1_git': blob.sha().digest(),
-        'length': size,
-    }
-
     data = blob.as_raw_string()
-    ret.update(hashutil.hash_data(data, HASH_ALGORITHMS))
-
-    return ret
+    hashes = MultiHash.from_data(data, HASH_ALGORITHMS, length=size).digest()
+    hashes['sha1_git'] = blob.sha().digest()
+    hashes['length'] = size
+    return hashes
 
 
 def dulwich_blob_to_content(blob, log=None, max_content_size=None,
                             origin_id=None):
     """Convert a dulwich blob to a Software Heritage content"""
     if blob.type_name != b'blob':
         return
 
     ret = dulwich_blob_to_content_id(blob)
     size = ret['length']
 
     if max_content_size:
         if size > max_content_size:
-            id = hashutil.hash_to_hex(ret['sha1_git'])
+            id = hash_to_hex(ret['sha1_git'])
             if log:
                 log.info('Skipping content %s, too large (%s > %s)' %
                          (id, size, max_content_size),
                          extra={
                              'swh_type': 'loader_git_content_skip',
                              'swh_id': id,
                              'swh_size': size,
                          })
             ret['status'] = 'absent'
             ret['reason'] = 'Content too large'
             ret['origin'] = origin_id
             return ret
 
     data = blob.as_raw_string()
     ret['data'] = data
     ret['status'] = 'visible'
 
     return ret
 
 
 def dulwich_tree_to_directory(tree, log=None):
     """Format a tree as a directory"""
     if tree.type_name != b'tree':
         return
 
     ret = {
         'id': tree.sha().digest(),
     }
     entries = []
     ret['entries'] = entries
 
     entry_mode_map = {
         0o040000: 'dir',
         0o160000: 'rev',
         0o100644: 'file',
         0o100755: 'file',
         0o120000: 'file',
     }
 
     for entry in tree.iteritems():
         entries.append({
             'type': entry_mode_map.get(entry.mode, 'file'),
             'perms': entry.mode,
             'name': entry.path,
-            'target': hashutil.hash_to_bytes(entry.sha.decode('ascii')),
+            'target': hash_to_bytes(entry.sha.decode('ascii')),
         })
 
     return ret
 
 
 def parse_author(name_email):
     """Parse an author line"""
     if name_email is None:
         return None
 
     try:
         open_bracket = name_email.index(b'<')
     except ValueError:
         name = email = None
     else:
         raw_name = name_email[:open_bracket]
         raw_email = name_email[open_bracket+1:]
 
         if not raw_name:
             name = None
         elif raw_name.endswith(b' '):
             name = raw_name[:-1]
         else:
             name = raw_name
 
         try:
             close_bracket = raw_email.index(b'>')
         except ValueError:
             email = None
         else:
             email = raw_email[:close_bracket]
 
     return {
         'name': name,
         'email': email,
         'fullname': name_email,
     }
 
 
 def dulwich_tsinfo_to_timestamp(timestamp, timezone, timezone_neg_utc):
     """Convert the dulwich timestamp information to a structure compatible
     with Software Heritage"""
     return {
         'timestamp': timestamp,
         'offset': timezone // 60,
         'negative_utc': timezone_neg_utc if timezone == 0 else None,
     }
 
 
 def dulwich_commit_to_revision(commit, log=None):
     if commit.type_name != b'commit':
         return
 
     ret = {
         'id': commit.sha().digest(),
         'author': parse_author(commit.author),
         'date': dulwich_tsinfo_to_timestamp(
             commit.author_time,
             commit.author_timezone,
             commit._author_timezone_neg_utc,
         ),
         'committer': parse_author(commit.committer),
         'committer_date': dulwich_tsinfo_to_timestamp(
             commit.commit_time,
             commit.commit_timezone,
             commit._commit_timezone_neg_utc,
         ),
         'type': 'git',
         'directory': bytes.fromhex(commit.tree.decode()),
         'message': commit.message,
         'metadata': None,
         'synthetic': False,
         'parents': [bytes.fromhex(p.decode()) for p in commit.parents],
     }
 
     git_metadata = []
     if commit.encoding is not None:
         git_metadata.append(['encoding', commit.encoding])
     if commit.mergetag:
         for mergetag in commit.mergetag:
             raw_string = mergetag.as_raw_string()
             assert raw_string.endswith(b'\n')
             git_metadata.append(['mergetag', raw_string[:-1]])
     if commit.extra:
         git_metadata.extend([k.decode('utf-8'), v] for k, v in commit.extra)
     if commit.gpgsig:
         git_metadata.append(['gpgsig', commit.gpgsig])
     if git_metadata:
         ret['metadata'] = {
             'extra_headers': git_metadata,
         }
 
     return ret
 
 
 DULWICH_TYPES = {
     b'blob': 'content',
     b'tree': 'directory',
     b'commit': 'revision',
     b'tag': 'release',
 }
 
 
 def dulwich_tag_to_release(tag, log=None):
     if tag.type_name != b'tag':
         return
 
     target_type, target = tag.object
     ret = {
         'id': tag.sha().digest(),
         'name': tag.name,
         'target': bytes.fromhex(target.decode()),
         'target_type': DULWICH_TYPES[target_type.type_name],
         'message': tag._message,
         'metadata': None,
         'synthetic': False,
     }
     if tag.tagger:
         ret['author'] = parse_author(tag.tagger)
         if not tag.tag_time:
             ret['date'] = None
         else:
             ret['date'] = dulwich_tsinfo_to_timestamp(
                 tag.tag_time,
                 tag.tag_timezone,
                 tag._tag_timezone_neg_utc,
             )
     else:
         ret['author'] = ret['date'] = None
 
     return ret
 
 
 def branches_to_snapshot(branches):
     snapshot = {'branches': branches}
     snapshot_id = identifiers.snapshot_identifier(snapshot)
     snapshot['id'] = identifiers.identifier_to_bytes(snapshot_id)
     return snapshot
diff --git a/swh/loader/git/reader.py b/swh/loader/git/reader.py
index 2491530..2da2b7f 100644
--- a/swh/loader/git/reader.py
+++ b/swh/loader/git/reader.py
@@ -1,258 +1,258 @@
 # Copyright (C) 2016-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from collections import defaultdict
 import logging
 import pprint
 
 import click
 
 from swh.core import utils
-from swh.model import hashutil
+from swh.model.hashutil import MultiHash, hash_to_hex
 
 from .updater import BulkUpdater, SWHRepoRepresentation
 from . import converters
 
 
 class SWHRepoFullRepresentation(SWHRepoRepresentation):
     """Overridden representation of a swh repository to permit to read
        completely the remote repository.
 
     """
     def __init__(self, storage, origin_id, occurrences=None):
         self.storage = storage
         self._parents_cache = {}
         self._type_cache = {}
         self.heads = set()
 
     def determine_wants(self, refs):
         """Filter the remote references to figure out which ones Software
            Heritage needs. In this particular context, we want to know
            everything.
 
         """
         if not refs:
             return []
 
         for target in refs.values():
             self.heads.add(target)
 
         return self.filter_unwanted_refs(refs).values()
 
     def find_remote_ref_types_in_swh(self, remote_refs):
         """Find the known swh remote. In that particular context, we know
            nothing.
 
         """
         return {}
 
 
 class DummyGraphWalker(object):
     """Dummy graph walker which claims that the client doesn't have any
        objects.
 
     """
     def ack(self, sha):
         pass
 
     def next(self):
         pass
 
     def __next__(self):
         pass
 
 
 class BaseGitRemoteReader(BulkUpdater):
     CONFIG_BASE_FILENAME = 'loader/git-remote-reader'
 
     ADDITIONAL_CONFIG = {
         'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024),
         'pack_storage_base': ('str', ''),  # don't want to store packs so empty
         'next_task': (
             'dict', {
                 'queue': 'swh.storage.archiver.tasks.SWHArchiverToBackendTask',
                 'batch_size': 100,
                 'destination': 'azure'
             }
         )
     }
 
     def __init__(self):
         super().__init__(SWHRepoFullRepresentation)
         self.next_task = self.config['next_task']
         self.batch_size = self.next_task['batch_size']
         self.task_destination = self.next_task['queue']
         self.destination = self.next_task['destination']
 
     def graph_walker(self):
         return DummyGraphWalker()
 
     def prepare_origin_visit(self, origin_url, base_url=None):
         self.origin = converters.origin_url_to_origin(origin_url)
         self.origin_id = 0
 
     def prepare(self, origin_url, base_url=None):
         """Only retrieve information about the origin, set everything else to
            empty.
 
         """
         self.base_occurrences = []
         self.base_origin_id = 0
 
     def keep_object(self, obj):
         """Do we want to keep this object or not?"""
         raise NotImplementedError('Please implement keep_object')
 
     def get_id_and_data(self, obj):
         """Get the id, type and data of the given object"""
         raise NotImplementedError('Please implement get_id_and_data')
 
     def list_pack(self, pack_data, pack_size):
         """Override list_pack to only keep contents' sha1.
 
         Returns:
             id_to_type (dict): keys are sha1, values are their associated type
             type_to_ids (dict): keys are types, values are list of associated
             data (sha1 for blobs)
 
         """
         self.data = {}
         id_to_type = {}
         type_to_ids = defaultdict(set)
 
         inflater = self.get_inflater()
 
         for obj in inflater:
             if not self.keep_object(obj):
                 continue
 
             object_id, type, data = self.get_id_and_data(obj)
 
             id_to_type[object_id] = type
             type_to_ids[type].add(object_id)
             self.data[object_id] = data
 
         return id_to_type, type_to_ids
 
     def load(self, *args, **kwargs):
         """Override the loading part which simply reads the repository's
            contents' sha1.
 
         Returns:
             Returns the list of discovered sha1s for that origin.
 
         """
         self.prepare(*args, **kwargs)
         self.fetch_data()
 
 
 class GitSha1RemoteReader(BaseGitRemoteReader):
     """Read sha1 git from a remote repository and dump only repository's
        content sha1 as list.
 
     """
     def keep_object(self, obj):
         """Only keep blobs"""
         return obj.type_name == b'blob'
 
     def get_id_and_data(self, obj):
         """We want to store only object identifiers"""
         # compute the sha1 (obj.id is the sha1_git)
         data = obj.as_raw_string()
-        hashes = hashutil.hash_data(data, {'sha1'})
+        hashes = MultiHash.from_data(data, {'sha1'}).digest()
         oid = hashes['sha1']
         return (oid, b'blob', oid)
 
 
 class GitSha1RemoteReaderAndSendToQueue(GitSha1RemoteReader):
     """Read sha1 git from a remote repository and dump only repository's
        content sha1 as list and send batch of those sha1s to a celery
        queue for consumption.
 
     """
     def load(self, *args, **kwargs):
         """Retrieve the list of sha1s for a particular origin and send those
            sha1s as group of sha1s to a specific queue.
 
         """
         super().load(*args, **kwargs)
         data = self.type_to_ids[b'blob']
 
         from swh.scheduler.celery_backend.config import app
         try:  # optional dependency
             from swh.storage.archiver import tasks  # noqa
         except ImportError:
             pass
         from celery import group
 
         task_destination = app.tasks[self.task_destination]
         groups = []
         for ids in utils.grouper(data, self.batch_size):
             sig_ids = task_destination.s(destination=self.destination,
                                          batch=list(ids))
             groups.append(sig_ids)
         group(groups).delay()
         return data
 
 
 class GitCommitRemoteReader(BaseGitRemoteReader):
     def keep_object(self, obj):
         return obj.type_name == b'commit'
 
     def get_id_and_data(self, obj):
         return obj.id, b'commit', converters.dulwich_commit_to_revision(obj)
 
     def load(self, *args, **kwargs):
         super().load(*args, **kwargs)
         return self.data
 
 
 @click.group()
 @click.option('--origin-url', help='Origin url')
 @click.pass_context
 def main(ctx, origin_url):
     logging.basicConfig(
         level=logging.DEBUG,
         format='%(asctime)s %(process)d %(message)s'
     )
     ctx.obj['origin_url'] = origin_url
 
 
 @main.command()
 @click.option('--send/--nosend', default=False, help='Origin\'s url')
 @click.pass_context
 def blobs(ctx, send):
     origin_url = ctx.obj['origin_url']
 
     if send:
         loader = GitSha1RemoteReaderAndSendToQueue()
         ids = loader.load(origin_url)
         print('%s sha1s were sent to queue' % len(ids))
         return
 
     loader = GitSha1RemoteReader()
     ids = loader.load(origin_url)
 
     if ids:
         for oid in ids:
             print(hash_to_hex(oid))
 
 
 @main.command()
 @click.option('--ids-only', is_flag=True, help='print ids only')
 @click.pass_context
 def commits(ctx, ids_only):
     origin_url = ctx.obj['origin_url']
     reader = GitCommitRemoteReader()
     commits = reader.load(origin_url)
     for commit_id, commit in commits.items():
         if ids_only:
             print(commit_id.decode())
         else:
             pprint.pprint(commit)
 
 
 if __name__ == '__main__':
     main(obj={})