diff --git a/swh/loader/git/reader.py b/swh/loader/git/reader.py
index 69def1b..b069665 100644
--- a/swh/loader/git/reader.py
+++ b/swh/loader/git/reader.py
@@ -1,95 +1,130 @@
 # Copyright (C) 2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import click
 import datetime
 
 from collections import defaultdict
 
+from swh.core import hashutil, utils
+
 from .updater import BulkUpdater
 from .loader import GitLoader
 
 
 class GitSha1Reader(GitLoader):
     """Disk git sha1 reader. Only read and dump sha1s in stdout.
 
     """
     def fetch_data(self):
         """Fetch the data from the data source"""
         for oid in self.iter_objects():
             type_name = self.repo[oid].type_name
             if type_name != b'blob':
                 continue
-            yield oid
+            yield hashutil.hex_to_hash(oid.decode('utf-8'))
 
     def load(self, *args, **kwargs):
         self.prepare(*args, **kwargs)
-        try:
-            for oid in self.fetch_data():
-                yield oid.decode('utf-8')
-        except:
-            pass
+        yield from self.fetch_data()
 
 
 class GitSha1RemoteReader(BulkUpdater):
     """Disk git sha1 reader to dump only repo's content sha1 list.
 
     """
+    CONFIG_BASE_FILENAME = 'loader/git-remote-reader'
+
+    ADDITIONAL_CONFIG = {
+        'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024),
+        'pack_storage_base': ('str', ''),  # don't want to store packs so empty
+        'next_task': (
+            'dict', {
+                'queue': 'swh.storage.archiver.tasks.SWHArchiverToBackendTask',
+                'batch_size': 100,
+                'destination': 'azure'
+            }
+        )
+    }
+
+    def __init__(self):
+        super().__init__()
+        self.next_task = self.config['next_task']
+        self.batch_size = self.next_task['batch_size']
+        self.task_destination = self.next_task.get('queue')
+        self.destination = self.next_task['destination']
+
     def list_pack(self, pack_data, pack_size):
         """Override list_pack to only keep blobs data.
 
         """
         id_to_type = {}
         type_to_ids = defaultdict(set)
 
         inflater = self.get_inflater()
 
         for obj in inflater:
             type, id = obj.type_name, obj.id
-            if type != b'blob':
+            if type != b'blob':  # don't keep other types
                 continue
-            id_to_type[id] = type
-            type_to_ids[type].add(id)
+            oid = hashutil.hex_to_hash(id.decode('utf-8'))
+            id_to_type[oid] = type
+            type_to_ids[type].add(oid)
 
         return id_to_type, type_to_ids
 
     def load(self, *args, **kwargs):
         self.prepare(*args, **kwargs)
         origin = self.get_origin()
         self.origin_id = self.send_origin(origin)
 
+        self.fetch_data()
+        data = self.id_to_type.keys()
+        if not self.task_destination:  # to stdout
+            yield from data
+            return
+
+        from swh.scheduler.celery_backend.config import app
         try:
-            self.fetch_data()
-            for oid in self.id_to_type.keys():
-                yield oid.decode('utf-8')
-        except:
+            # optional dependency
+            from swh.storage.archiver import tasks  # noqa
+        except ImportError:
             pass
+        from celery import group
+
+        task_destination = app.tasks[self.task_destination]
+        groups = []
+        for ids in utils.grouper(data, self.batch_size):
+            sig_ids = task_destination.s(destination=self.destination,
+                                         batch=list(ids))
+            groups.append(sig_ids)
+        group(groups).delay()
 
 
 @click.command()
 @click.option('--origin-url', help='Origin\'s url')
 @click.option('--source', help='origin\'s source url (disk or remote)')
 def main(origin_url, source):
     import logging
 
     logging.basicConfig(
         level=logging.DEBUG,
         format='%(asctime)s %(process)d %(message)s'
     )
 
     if source.startswith('/'):
         loader = GitSha1Reader()
         fetch_date = datetime.datetime.now(tz=datetime.timezone.utc)
-        r = loader.load(origin_url, source, fetch_date)
+        ids = loader.load(origin_url, source, fetch_date)
     else:
         loader = GitSha1RemoteReader()
-        r = loader.load(origin_url, source)
+        ids = loader.load(origin_url, source)
 
-    for id in r:
-        print(id)
+    for oid in ids:
+        print(oid)
 
 
 if __name__ == '__main__':
     main()
diff --git a/swh/loader/git/tasks.py b/swh/loader/git/tasks.py
index 98e2333..752e6fc 100644
--- a/swh/loader/git/tasks.py
+++ b/swh/loader/git/tasks.py
@@ -1,38 +1,53 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import dateutil.parser
 
 from swh.scheduler.task import Task
 
 from .loader import GitLoader
 from .updater import BulkUpdater
+from .reader import GitSha1RemoteReader
 
 
 # TODO: rename to LoadRemoteGitRepository
 class UpdateGitRepository(Task):
     """Import a git repository from a remote location"""
     task_queue = 'swh_loader_git'
 
     def run(self, repo_url, base_url=None):
         """Import a git repository"""
         loader = BulkUpdater()
         loader.log = self.log
 
         return loader.load(repo_url, base_url)
 
 
 class LoadDiskGitRepository(Task):
     """Import a git repository from disk"""
     task_queue = 'swh_loader_git'
 
     def run(self, origin_url, directory, date):
         """Import a git repository, cloned in `directory` from `origin_url` at
         `date`."""
 
         loader = GitLoader()
         loader.log = self.log
 
         return loader.load(origin_url, directory, dateutil.parser.parse(date))
+
+
+class ReaderGitRepository(Task):
+    task_queue = 'swh_reader_git'
+
+    def run(self, repo_url, base_url=None):
+        """Read a git repository from a remote location and send sha1 to
+        archival.
+
+        """
+        loader = GitSha1RemoteReader()
+        loader.log = self.log
+
+        loader.load(repo_url, base_url)