diff --git a/PKG-INFO b/PKG-INFO index 0da254b..a95130a 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.loader.git -Version: 0.0.21 +Version: 0.0.22 Summary: Software Heritage git loader Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.loader.git.egg-info/PKG-INFO b/swh.loader.git.egg-info/PKG-INFO index 0da254b..a95130a 100644 --- a/swh.loader.git.egg-info/PKG-INFO +++ b/swh.loader.git.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.loader.git -Version: 0.0.21 +Version: 0.0.22 Summary: Software Heritage git loader Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh/loader/git/reader.py b/swh/loader/git/reader.py index a35a971..634e23d 100644 --- a/swh/loader/git/reader.py +++ b/swh/loader/git/reader.py @@ -1,213 +1,204 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click -import datetime import logging from collections import defaultdict from swh.core import hashutil, utils from .updater import BulkUpdater, SWHRepoRepresentation -from .loader import GitLoader from . import converters -class GitSha1Reader(GitLoader): - """Disk git sha1 reader. Only read and dump sha1s in stdout. - - """ - def fetch_data(self): - """Fetch the data from the data source""" - for oid in self.iter_objects(): - type_name = self.repo[oid].type_name - if type_name != b'blob': - continue - yield hashutil.hex_to_hash(oid.decode('utf-8')) - - def load(self, *args, **kwargs): - self.prepare(*args, **kwargs) - yield from self.fetch_data() - - class SWHRepoFullRepresentation(SWHRepoRepresentation): """Overridden representation of a swh repository to permit to read completely the remote repository. """ def __init__(self, storage, origin_id, occurrences=None): self.storage = storage self._parents_cache = {} self._type_cache = {} self.heads = set() def determine_wants(self, refs): """Filter the remote references to figure out which ones Software Heritage needs. In this particular context, we want to know everything. """ if not refs: return [] for target in refs.values(): self.heads.add(target) return self.filter_unwanted_refs(refs).values() def find_remote_ref_types_in_swh(self, remote_refs): """Find the known swh remote. In that particular context, we know nothing. """ return {} class DummyGraphWalker(object): """Dummy graph walker which claims that the client doesn’t have any objects. """ def ack(self, sha): pass def next(self): pass def __next__(self): pass class GitSha1RemoteReader(BulkUpdater): - """Disk git sha1 reader to dump only repo's content sha1 list. + """Read sha1 git from a remote repository and dump only repository's + content sha1 as list. """ CONFIG_BASE_FILENAME = 'loader/git-remote-reader' ADDITIONAL_CONFIG = { 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024), 'pack_storage_base': ('str', ''), # don't want to store packs so empty 'next_task': ( 'dict', { 'queue': 'swh.storage.archiver.tasks.SWHArchiverToBackendTask', 'batch_size': 100, 'destination': 'azure' } ) } def __init__(self): super().__init__(SWHRepoFullRepresentation) self.next_task = self.config['next_task'] self.batch_size = self.next_task['batch_size'] - self.task_destination = self.next_task.get('queue') + self.task_destination = self.next_task['queue'] self.destination = self.next_task['destination'] def graph_walker(self): return DummyGraphWalker() def prepare(self, origin_url, base_url=None): """Only retrieve information about the origin, set everything else to empty. """ ori = converters.origin_url_to_origin(origin_url) self.origin = self.storage.origin_get(ori) self.origin_id = self.origin['id'] self.base_occurrences = [] self.base_origin_id = self.origin['id'] def list_pack(self, pack_data, pack_size): """Override list_pack to only keep contents' sha1. Returns: id_to_type (dict): keys are sha1, values are their associated type type_to_ids (dict): keys are types, values are list of associated ids (sha1 for blobs) """ id_to_type = {} type_to_ids = defaultdict(set) inflater = self.get_inflater() for obj in inflater: - type, id = obj.type_name, obj.id + type = obj.type_name if type != b'blob': # don't keep other types continue - oid = hashutil.hex_to_hash(id.decode('utf-8')) + + # compute the sha1 (obj.id is the sha1_git) + data = obj.as_raw_string() + hashes = hashutil.hashdata(data, {'sha1'}) + oid = hashes['sha1'] + id_to_type[oid] = type type_to_ids[type].add(oid) return id_to_type, type_to_ids def load(self, *args, **kwargs): """Override the loading part which simply reads the repository's contents' sha1. Returns: - If the configuration holds a destination queue, send those - sha1s as batch of sha1s to it for consumption. Otherwise, - returns the list of discovered sha1s. + Returns the list of discovered sha1s for that origin. """ try: self.prepare(*args, **kwargs) except: self.log.error('Unknown repository, skipping...') return [] self.fetch_data() - data = self.type_to_ids[b'blob'] + return self.type_to_ids[b'blob'] + - if not self.task_destination: # to stdout - return data +class GitSha1RemoteReaderAndSendToQueue(GitSha1RemoteReader): + """Read sha1 git from a remote repository and dump only repository's + content sha1 as list and send batch of those sha1s to a celery + queue for consumption. + + """ + def load(self, *args, **kwargs): + """Retrieve the list of sha1s for a particular origin and send those + sha1s as group of sha1s to a specific queue. + + """ + data = super().load(*args, **kwargs) from swh.scheduler.celery_backend.config import app try: # optional dependency from swh.storage.archiver import tasks # noqa except ImportError: pass from celery import group task_destination = app.tasks[self.task_destination] groups = [] for ids in utils.grouper(data, self.batch_size): sig_ids = task_destination.s(destination=self.destination, batch=list(ids)) groups.append(sig_ids) group(groups).delay() return data @click.command() @click.option('--origin-url', help='Origin\'s url') -@click.option('--source', default=None, - help='origin\'s source url (disk or remote)') -def main(origin_url, source): +@click.option('--send/--nosend', default=False, help='Origin\'s url') +def main(origin_url, send): logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) - local_reader = (source and source.startswith('/')) or origin_url.startswith('/') # noqa + if send: + loader = GitSha1RemoteReaderAndSendToQueue() + ids = loader.load(origin_url) + print('%s sha1s were sent to queue' % len(ids)) + return - if local_reader: - loader = GitSha1Reader() - fetch_date = datetime.datetime.now(tz=datetime.timezone.utc) - ids = loader.load(origin_url, source, fetch_date) - else: - loader = GitSha1RemoteReader() - ids = loader.load(origin_url, source) + loader = GitSha1RemoteReader() + ids = loader.load(origin_url) if ids: - count = 0 for oid in ids: - print(oid) - count += 1 - print("sha1s: %s" % count) + print(hashutil.hash_to_hex(oid)) if __name__ == '__main__': main() diff --git a/swh/loader/git/tasks.py b/swh/loader/git/tasks.py index 6373378..6b979de 100644 --- a/swh/loader/git/tasks.py +++ b/swh/loader/git/tasks.py @@ -1,53 +1,53 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import dateutil.parser from swh.scheduler.task import Task from .loader import GitLoader from .updater import BulkUpdater -from .reader import GitSha1RemoteReader +from .reader import GitSha1RemoteReaderAndSendToQueue # TODO: rename to LoadRemoteGitRepository class UpdateGitRepository(Task): """Import a git repository from a remote location""" task_queue = 'swh_loader_git' def run(self, repo_url, base_url=None): """Import a git repository""" loader = BulkUpdater() loader.log = self.log return loader.load(repo_url, base_url) class LoadDiskGitRepository(Task): """Import a git repository from disk""" task_queue = 'swh_loader_git' def run(self, origin_url, directory, date): """Import a git repository, cloned in `directory` from `origin_url` at `date`.""" loader = GitLoader() loader.log = self.log return loader.load(origin_url, directory, dateutil.parser.parse(date)) class ReaderGitRepository(Task): task_queue = 'swh_reader_git' def run(self, repo_url, base_url=None): """Read a git repository from a remote location and send sha1 to archival. """ - loader = GitSha1RemoteReader() + loader = GitSha1RemoteReaderAndSendToQueue() loader.log = self.log return loader.load(repo_url) diff --git a/version.txt b/version.txt index 9f5f749..9ae8190 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.21-0-g370f17a \ No newline at end of file +v0.0.22-0-gae4606d \ No newline at end of file