diff --git a/PKG-INFO b/PKG-INFO
index 6aba2fa..9d36bca 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.loader.git
-Version: 0.0.35
+Version: 0.0.36
 Summary: Software Heritage git loader
-Home-page: https://forge.softwareheritage.org/diffusion/DCORE/
+Home-page: https://forge.softwareheritage.org/diffusion/DLDG/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/README b/README
index d5c000b..a95a5f9 100644
--- a/README
+++ b/README
@@ -1,83 +1,82 @@
 The Software Heritage Git Loader is a tool and a library to walk a local
 Git repository and inject into the SWH dataset all contained files that
 weren't known before.

 License
 =======

 This program is free software: you can redistribute it and/or modify it
 under the terms of the GNU General Public License as published by the
 Free Software Foundation, either version 3 of the License, or (at your
 option) any later version.

 This program is distributed in the hope that it will be useful, but
 WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 General Public License for more details.

 See top-level LICENSE file for the full text of the GNU General Public
 License along with this program.

 Dependencies
 ============

 Runtime
 -------

 - python3
 - python3-dulwich
 - python3-retrying
 - python3-swh.core
 - python3-swh.model
 - python3-swh.storage
 - python3-swh.scheduler

 Test
 ----

 - python3-nose

 Requirements
 ============

 - implementation language, Python3
 - coding guidelines: conform to PEP8
 - Git access: via dulwich

 Configuration
 =============

 You can run the loader or the updater directly by calling
 python3 -m swh.loader.git.{loader,updater}.

 Both tools expect a configuration file in .ini format to be present in
 ~/.config/swh/loader/git-{loader,updater}.ini

 The configuration file contains the following directives:

 ```
 [main]
 # the storage class used. one of remote_storage, local_storage
 storage_class = remote_storage

 # arguments passed to the storage class
 # for remote_storage: URI of the storage server
 storage_args = http://localhost:5002/
 # for local_storage: database connection string and root of the
 # storage, comma separated
 # storage_args = dbname=softwareheritage-dev, /tmp/swh/storage

 # Whether to send the given types of objects
 send_contents = True
 send_directories = True
 send_revisions = True
 send_releases = True
-send_occurrences = True
+send_snapshot = True

 # The size of the packets sent to storage for each kind of object
 content_packet_size = 100000
 content_packet_size_bytes = 1073741824
 directory_packet_size = 25000
 revision_packet_size = 100000
 release_packet_size = 100000
-occurrence_packet_size = 100000
 ```
diff --git a/debian/control b/debian/control
index 6876512..f35292d 100644
--- a/debian/control
+++ b/debian/control
@@ -1,31 +1,31 @@
 Source: swh-loader-git
 Maintainer: Software Heritage developers
 Section: python
 Priority: optional
 Build-Depends: debhelper (>= 9),
                dh-python (>= 2),
                python3-all,
                python3-click,
                python3-dulwich (>= 0.18.7~),
                python3-nose,
                python3-retrying,
                python3-setuptools,
                python3-swh.core (>= 0.0.7~),
-               python3-swh.loader.core (>= 0.0.22),
+               python3-swh.loader.core (>= 0.0.30),
                python3-swh.model (>= 0.0.15~),
                python3-swh.scheduler (>= 0.0.14~),
                python3-swh.storage (>= 0.0.83~),
                python3-vcversioner
 Standards-Version: 3.9.6
 Homepage: https://forge.softwareheritage.org/diffusion/DLDG/

 Package: python3-swh.loader.git
 Architecture: all
 Depends: python3-swh.core (>= 0.0.7~),
-         python3-swh.loader.core (>= 0.0.22~),
+         python3-swh.loader.core (>= 0.0.30~),
          python3-swh.model (>= 0.0.15~),
          python3-swh.scheduler (>= 0.0.14~),
          python3-swh.storage (>= 0.0.83~),
          ${misc:Depends},
          ${python3:Depends}
 Description: Software Heritage Git loader
diff --git a/requirements-swh.txt b/requirements-swh.txt
index 187f8ca..9fec7cf 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,5 @@
 swh.core >= 0.0.7
-swh.loader.core >= 0.0.22
+swh.loader.core >= 0.0.30
 swh.model >= 0.0.15
 swh.scheduler >= 0.0.14
 swh.storage >= 0.0.83
diff --git a/setup.py b/setup.py
old mode 100644
new mode 100755
index 9949208..6472a33
--- a/setup.py
+++ b/setup.py
@@ -1,30 +1,30 @@
 #!/usr/bin/env python3

 from setuptools import setup, find_packages


 def parse_requirements():
     requirements = []
     for reqf in ('requirements.txt', 'requirements-swh.txt'):
         with open(reqf) as f:
             for line in f.readlines():
                 line = line.strip()
                 if not line or line.startswith('#'):
                     continue
                 requirements.append(line)
     return requirements


 setup(
     name='swh.loader.git',
     description='Software Heritage git loader',
     author='Software Heritage developers',
     author_email='swh-devel@inria.fr',
-    url='https://forge.softwareheritage.org/diffusion/DCORE/',
+    url='https://forge.softwareheritage.org/diffusion/DLDG/',
     packages=find_packages(),
     scripts=[],
     install_requires=parse_requirements(),
     setup_requires=['vcversioner'],
     vcversioner={},
     include_package_data=True,
 )
diff --git a/swh.loader.git.egg-info/PKG-INFO b/swh.loader.git.egg-info/PKG-INFO
index 6aba2fa..9d36bca 100644
--- a/swh.loader.git.egg-info/PKG-INFO
+++ b/swh.loader.git.egg-info/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.loader.git
-Version: 0.0.35
+Version: 0.0.36
 Summary: Software Heritage git loader
-Home-page: https://forge.softwareheritage.org/diffusion/DCORE/
+Home-page: https://forge.softwareheritage.org/diffusion/DLDG/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/swh.loader.git.egg-info/SOURCES.txt b/swh.loader.git.egg-info/SOURCES.txt
index 18775fc..bd9bf47 100644
--- a/swh.loader.git.egg-info/SOURCES.txt
+++ b/swh.loader.git.egg-info/SOURCES.txt
@@ -1,48 +1,47 @@
 .gitignore
 .gitmodules
 AUTHORS
 LICENSE
 MANIFEST.in
 Makefile
 README
 requirements-swh.txt
 requirements.txt
 setup.py
 version.txt
 bin/dir-git-repo-meta.sh
 debian/changelog
 debian/compat
 debian/control
 debian/copyright
 debian/rules
 debian/source/format
 docs/.gitignore
 docs/Makefile
 docs/conf.py
 docs/index.rst
 docs/_static/.placeholder
 docs/_templates/.placeholder
 docs/attic/api-backend-protocol.txt
 docs/attic/git-loading-design.txt
 resources/local-loader-git.ini
 resources/remote-loader-git.ini
 resources/updater.ini
 resources/test/back.ini
 resources/test/db-manager.ini
 swh/__init__.py
 swh.loader.git.egg-info/PKG-INFO
 swh.loader.git.egg-info/SOURCES.txt
 swh.loader.git.egg-info/dependency_links.txt
 swh.loader.git.egg-info/requires.txt
 swh.loader.git.egg-info/top_level.txt
 swh/loader/__init__.py
 swh/loader/git/__init__.py
-swh/loader/git/base.py
 swh/loader/git/converters.py
 swh/loader/git/loader.py
 swh/loader/git/reader.py
 swh/loader/git/tasks.py
 swh/loader/git/updater.py
 swh/loader/git/utils.py
 swh/loader/git/tests/test_converters.py
 swh/loader/git/tests/test_utils.py
\ No newline at end of file
diff --git a/swh.loader.git.egg-info/requires.txt b/swh.loader.git.egg-info/requires.txt
index 420cd91..725e08b 100644
--- a/swh.loader.git.egg-info/requires.txt
+++ b/swh.loader.git.egg-info/requires.txt
@@ -1,9 +1,9 @@
 click
 dulwich>=0.18.7
 retrying
 swh.core>=0.0.7
-swh.loader.core>=0.0.22
+swh.loader.core>=0.0.30
 swh.model>=0.0.15
 swh.scheduler>=0.0.14
 swh.storage>=0.0.83
 vcversioner
diff --git a/swh/loader/git/base.py b/swh/loader/git/base.py
deleted file mode 100644
index c135a68..0000000
--- a/swh/loader/git/base.py
+++ /dev/null
@@ -1,168 +0,0 @@
-# Copyright (C) 2016-2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import abc
-import os
-
-from swh.loader.core.loader import SWHLoader
-
-
-class BaseLoader(SWHLoader):
-    """This base class is a pattern for loaders.
-
-    The external calling convention is as such:
-
-      - instantiate the class once (loads storage and the configuration)
-      - for each origin, call load with the origin-specific arguments (for
-        instance, an origin URL).
- - load calls several methods that must be implemented in subclasses: - - - prepare(\*args, \**kwargs) prepares the loader for the new origin - - get_origin gets the origin object associated to the current loader - - fetch_data downloads the necessary data from the origin - - get_{contents,directories,revisions,releases,occurrences} retrieve each - kind of object from the origin - - has\_* checks whether there are some objects to load for that object type - - get_fetch_history_result retrieves the data to insert in the - fetch_history table once the load was successful - - cleanup cleans up an eventual state installed for computations - - eventful returns whether the load was eventful or not - - """ - DEFAULT_CONFIG = { - 'storage': ('dict', { - 'cls': 'remote', - 'args': { - 'url': 'http://localhost:5002/' - }, - }), - 'send_contents': ('bool', True), - 'send_directories': ('bool', True), - 'send_revisions': ('bool', True), - 'send_releases': ('bool', True), - 'send_occurrences': ('bool', True), - - 'save_data': ('bool', False), - 'save_data_path': ('str', ''), - - 'content_packet_size': ('int', 10000), - 'content_packet_size_bytes': ('int', 1024 * 1024 * 1024), - 'directory_packet_size': ('int', 25000), - 'revision_packet_size': ('int', 100000), - 'release_packet_size': ('int', 100000), - 'occurrence_packet_size': ('int', 100000), - } - - def __init__(self): - super().__init__(logging_class='swh.loader.git.BulkLoader') - - # Make sure the config is sane - if self.config['save_data']: - path = self.config['save_data_path'] - os.stat(path) - if not os.access(path, os.R_OK | os.W_OK): - raise PermissionError("Permission denied: %r" % path) - - self.visit_date = None # possibly overridden in self.prepare method - - @abc.abstractmethod - def has_contents(self): - """Checks whether we need to load contents""" - pass - - def get_contents(self): - """Get the contents that need to be loaded""" - raise NotImplementedError - - @abc.abstractmethod - def has_directories(self): - """Checks whether we need to load directories""" - pass - - def get_directories(self): - """Get the directories that need to be loaded""" - raise NotImplementedError - - @abc.abstractmethod - def has_revisions(self): - """Checks whether we need to load revisions""" - - def get_revisions(self): - """Get the revisions that need to be loaded""" - raise NotImplementedError - - @abc.abstractmethod - def has_releases(self): - """Checks whether we need to load releases""" - return True - - def get_releases(self): - """Get the releases that need to be loaded""" - raise NotImplementedError - - @abc.abstractmethod - def has_occurrences(self): - """Checks whether we need to load occurrences""" - pass - - def get_occurrences(self): - """Get the occurrences that need to be loaded""" - raise NotImplementedError - - def get_fetch_history_result(self): - """Return the data to store in fetch_history for the current loader""" - raise NotImplementedError - - def eventful(self): - """Whether the load was eventful""" - raise NotImplementedError - - def save_data(self): - """Save the data associated to the current load""" - raise NotImplementedError - - def get_save_data_path(self): - """The path to which we save the data""" - if not hasattr(self, '__save_data_path'): - origin_id = self.origin_id - year = str(self.visit_date.year) - - path = os.path.join( - self.config['save_data_path'], - "%04d" % (origin_id % 10000), - "%08d" % origin_id, - year, - ) - - os.makedirs(path, exist_ok=True) - self.__save_data_path = path - - return 
self.__save_data_path - - def cleanup(self): - """Clean up an eventual state installed for computations. - Nothing specific for the loader-git is needed. - - """ - pass - - def store_data(self): - """Store data fetched from the git repository. - - """ - if self.config['save_data']: - self.save_data() - - if self.config['send_contents'] and self.has_contents(): - self.send_batch_contents(self.get_contents()) - if self.config['send_directories'] and self.has_directories(): - self.send_batch_directories(self.get_directories()) - if self.config['send_revisions'] and self.has_revisions(): - self.send_batch_revisions(self.get_revisions()) - if self.config['send_releases'] and self.has_releases(): - self.send_batch_releases(self.get_releases()) - if self.config['send_occurrences'] and self.has_occurrences(): - self.send_batch_occurrences(self.get_occurrences()) diff --git a/swh/loader/git/converters.py b/swh/loader/git/converters.py index b912a3c..bcdea5c 100644 --- a/swh/loader/git/converters.py +++ b/swh/loader/git/converters.py @@ -1,232 +1,240 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Convert dulwich objects to dictionaries suitable for swh.storage""" -from swh.model import hashutil +from swh.model import hashutil, identifiers HASH_ALGORITHMS = hashutil.DEFAULT_ALGORITHMS - {'sha1_git'} def origin_url_to_origin(origin_url): """Format a pygit2.Repository as an origin suitable for swh.storage""" return { 'type': 'git', 'url': origin_url, } def dulwich_blob_to_content_id(blob): """Convert a dulwich blob to a Software Heritage content id""" if blob.type_name != b'blob': return size = blob.raw_length() ret = { 'sha1_git': blob.sha().digest(), 'length': size, } data = blob.as_raw_string() ret.update(hashutil.hash_data(data, HASH_ALGORITHMS)) return ret def dulwich_blob_to_content(blob, log=None, max_content_size=None, origin_id=None): """Convert a dulwich blob to a Software Heritage content""" if blob.type_name != b'blob': return ret = dulwich_blob_to_content_id(blob) size = ret['length'] if max_content_size: if size > max_content_size: id = hashutil.hash_to_hex(ret['sha1_git']) if log: log.info('Skipping content %s, too large (%s > %s)' % (id, size, max_content_size), extra={ 'swh_type': 'loader_git_content_skip', 'swh_id': id, 'swh_size': size, }) ret['status'] = 'absent' ret['reason'] = 'Content too large' ret['origin'] = origin_id return ret data = blob.as_raw_string() ret['data'] = data ret['status'] = 'visible' return ret def dulwich_tree_to_directory(tree, log=None): """Format a tree as a directory""" if tree.type_name != b'tree': return ret = { 'id': tree.sha().digest(), } entries = [] ret['entries'] = entries entry_mode_map = { 0o040000: 'dir', 0o160000: 'rev', 0o100644: 'file', 0o100755: 'file', 0o120000: 'file', } for entry in tree.iteritems(): entries.append({ 'type': entry_mode_map.get(entry.mode, 'file'), 'perms': entry.mode, 'name': entry.path, 'target': hashutil.hash_to_bytes(entry.sha.decode('ascii')), }) return ret def parse_author(name_email): """Parse an author line""" if name_email is None: return None try: open_bracket = name_email.index(b'<') except ValueError: name = email = None else: raw_name = name_email[:open_bracket] raw_email = name_email[open_bracket+1:] if not raw_name: name = None elif raw_name.endswith(b' '): name = raw_name[:-1] else: name = raw_name 
try: close_bracket = raw_email.index(b'>') except ValueError: email = None else: email = raw_email[:close_bracket] return { 'name': name, 'email': email, 'fullname': name_email, } def dulwich_tsinfo_to_timestamp(timestamp, timezone, timezone_neg_utc): """Convert the dulwich timestamp information to a structure compatible with Software Heritage""" return { 'timestamp': timestamp, 'offset': timezone // 60, 'negative_utc': timezone_neg_utc if timezone == 0 else None, } def dulwich_commit_to_revision(commit, log=None): if commit.type_name != b'commit': return ret = { 'id': commit.sha().digest(), 'author': parse_author(commit.author), 'date': dulwich_tsinfo_to_timestamp( commit.author_time, commit.author_timezone, commit._author_timezone_neg_utc, ), 'committer': parse_author(commit.committer), 'committer_date': dulwich_tsinfo_to_timestamp( commit.commit_time, commit.commit_timezone, commit._commit_timezone_neg_utc, ), 'type': 'git', 'directory': bytes.fromhex(commit.tree.decode()), 'message': commit.message, 'metadata': None, 'synthetic': False, 'parents': [bytes.fromhex(p.decode()) for p in commit.parents], } git_metadata = [] if commit.encoding is not None: git_metadata.append(['encoding', commit.encoding]) if commit.mergetag: for mergetag in commit.mergetag: raw_string = mergetag.as_raw_string() assert raw_string.endswith(b'\n') git_metadata.append(['mergetag', raw_string[:-1]]) if commit.extra: git_metadata.extend([k.decode('utf-8'), v] for k, v in commit.extra) if commit.gpgsig: git_metadata.append(['gpgsig', commit.gpgsig]) if git_metadata: ret['metadata'] = { 'extra_headers': git_metadata, } return ret DULWICH_TYPES = { b'blob': 'content', b'tree': 'directory', b'commit': 'revision', b'tag': 'release', } def dulwich_tag_to_release(tag, log=None): if tag.type_name != b'tag': return target_type, target = tag.object ret = { 'id': tag.sha().digest(), 'name': tag.name, 'target': bytes.fromhex(target.decode()), 'target_type': DULWICH_TYPES[target_type.type_name], 'message': tag._message, 'metadata': None, 'synthetic': False, } if tag.tagger: ret['author'] = parse_author(tag.tagger) if not tag.tag_time: ret['date'] = None else: ret['date'] = dulwich_tsinfo_to_timestamp( tag.tag_time, tag.tag_timezone, tag._tag_timezone_neg_utc, ) else: ret['author'] = ret['date'] = None return ret + + +def branches_to_snapshot(branches): + snapshot = {'branches': branches} + snapshot_id = identifiers.snapshot_identifier(snapshot) + snapshot['id'] = identifiers.identifier_to_bytes(snapshot_id) + + return snapshot diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py index 785ee4a..2320008 100644 --- a/swh/loader/git/loader.py +++ b/swh/loader/git/loader.py @@ -1,295 +1,304 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import dulwich.repo import os import shutil from dulwich.errors import ObjectFormatException, EmptyFileException from collections import defaultdict from swh.model import hashutil -from . import base, converters, utils +from swh.loader.core.loader import SWHStatelessLoader +from . import converters, utils -class GitLoader(base.BaseLoader): +class GitLoader(SWHStatelessLoader): """Load a git repository from a directory. 
""" CONFIG_BASE_FILENAME = 'loader/git-loader' + def __init__(self, config=None): + super().__init__(logging_class='swh.loader.git.Loader', config=config) + def prepare(self, origin_url, directory, visit_date): self.origin_url = origin_url self.origin = self.get_origin() self.repo = dulwich.repo.Repo(directory) self.visit_date = visit_date def get_origin(self): """Get the origin that is currently being loaded""" return converters.origin_url_to_origin(self.origin_url) def iter_objects(self): object_store = self.repo.object_store for pack in object_store.packs: objs = list(pack.index.iterentries()) objs.sort(key=lambda x: x[1]) for sha, offset, crc32 in objs: yield hashutil.hash_to_bytehex(sha) yield from object_store._iter_loose_objects() yield from object_store._iter_alternate_objects() def _check(self, obj): """Check the object's repository representation. If any errors in check exists, an ObjectFormatException is raised. Args: obj (object): Dulwich object read from the repository. """ obj.check() from dulwich.objects import Commit, Tag try: # For additional checks on dulwich objects with date # for now, only checks on *time if isinstance(obj, Commit): commit_time = obj._commit_time utils.check_date_time(commit_time) author_time = obj._author_time utils.check_date_time(author_time) elif isinstance(obj, Tag): tag_time = obj._tag_time utils.check_date_time(tag_time) except Exception as e: raise ObjectFormatException(e) def get_object(self, oid): """Given an object id, return the object if it is found and not malformed in some way. Args: oid (bytes): the object's identifier Returns: The object if found without malformation """ try: # some errors are raised when reading the object obj = self.repo[oid] # some we need to check ourselves self._check(obj) except KeyError: _id = oid.decode('utf-8') self.log.warn('object %s not found, skipping' % _id, extra={ 'swh_type': 'swh_loader_git_missing_object', 'swh_object_id': _id, 'origin_id': self.origin_id, }) return None except ObjectFormatException: _id = oid.decode('utf-8') self.log.warn('object %s malformed, skipping' % _id, extra={ 'swh_type': 'swh_loader_git_missing_object', 'swh_object_id': _id, 'origin_id': self.origin_id, }) return None except EmptyFileException: _id = oid.decode('utf-8') self.log.warn('object %s corrupted (empty file), skipping' % _id, extra={ 'swh_type': 'swh_loader_git_missing_object', 'swh_object_id': _id, 'origin_id': self.origin_id, }) else: return obj def fetch_data(self): """Fetch the data from the data source""" + self.previous_snapshot = self.storage.snapshot_get_latest( + self.origin_id + ) + type_to_ids = defaultdict(list) for oid in self.iter_objects(): obj = self.get_object(oid) if not obj: continue type_name = obj.type_name type_to_ids[type_name].append(oid) self.type_to_ids = type_to_ids def has_contents(self): """Checks whether we need to load contents""" return bool(self.type_to_ids[b'blob']) def get_content_ids(self): """Get the content identifiers from the git repository""" for oid in self.type_to_ids[b'blob']: yield converters.dulwich_blob_to_content_id(self.repo[oid]) def get_contents(self): """Get the contents that need to be loaded""" max_content_size = self.config['content_size_limit'] missing_contents = set(self.storage.content_missing( self.get_content_ids(), 'sha1_git')) for oid in missing_contents: yield converters.dulwich_blob_to_content( self.repo[hashutil.hash_to_bytehex(oid)], log=self.log, max_content_size=max_content_size, origin_id=self.origin_id) def has_directories(self): """Checks whether 
we need to load directories""" return bool(self.type_to_ids[b'tree']) def get_directory_ids(self): """Get the directory identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b'tree']) def get_directories(self): """Get the directories that need to be loaded""" missing_dirs = set(self.storage.directory_missing( sorted(self.get_directory_ids()))) for oid in missing_dirs: yield converters.dulwich_tree_to_directory( self.repo[hashutil.hash_to_bytehex(oid)], log=self.log) def has_revisions(self): """Checks whether we need to load revisions""" return bool(self.type_to_ids[b'commit']) def get_revision_ids(self): """Get the revision identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b'commit']) def get_revisions(self): """Get the revisions that need to be loaded""" missing_revs = set(self.storage.revision_missing( sorted(self.get_revision_ids()))) for oid in missing_revs: yield converters.dulwich_commit_to_revision( self.repo[hashutil.hash_to_bytehex(oid)], log=self.log) def has_releases(self): """Checks whether we need to load releases""" return bool(self.type_to_ids[b'tag']) def get_release_ids(self): """Get the release identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b'tag']) def get_releases(self): """Get the releases that need to be loaded""" missing_rels = set(self.storage.release_missing( sorted(self.get_release_ids()))) for oid in missing_rels: yield converters.dulwich_tag_to_release( self.repo[hashutil.hash_to_bytehex(oid)], log=self.log) - def has_occurrences(self): - """Checks whether we need to load occurrences""" - return True - - def get_occurrences(self): - """Get the occurrences that need to be loaded""" - origin_id = self.origin_id - visit = self.visit - ref_objs = ((refs, target, self.get_object(target)) - for refs, target in self.repo.refs.as_dict().items() - if self.get_object(target)) - - for ref, target, obj in ref_objs: - target_type_name = obj.type_name - target_type = converters.DULWICH_TYPES[target_type_name] - yield { - 'branch': ref, - 'origin': origin_id, - 'target': hashutil.bytehex_to_hash(target), - 'target_type': target_type, - 'visit': visit, - } + def get_snapshot(self): + """Turn the list of branches into a snapshot to load""" + branches = {} + + for ref, target in self.repo.refs.as_dict().items(): + obj = self.get_object(target) + if obj: + branches[ref] = { + 'target': hashutil.bytehex_to_hash(target), + 'target_type': converters.DULWICH_TYPES[obj.type_name], + } + else: + branches[ref] = None + + self.snapshot = converters.branches_to_snapshot(branches) + return self.snapshot def get_fetch_history_result(self): """Return the data to store in fetch_history for the current loader""" return { 'contents': len(self.type_to_ids[b'blob']), 'directories': len(self.type_to_ids[b'tree']), 'revisions': len(self.type_to_ids[b'commit']), 'releases': len(self.type_to_ids[b'tag']), - 'occurrences': len(self.repo.refs.allkeys()), } def save_data(self): """We already have the data locally, no need to save it""" pass - def eventful(self): - """Whether the load was eventful""" - return True + def load_status(self): + """The load was eventful if the current occurrences are different to + the ones we retrieved at the beginning of the run""" + eventful = False + + if self.previous_snapshot: + eventful = self.snapshot['id'] != self.previous_snapshot['id'] + else: + eventful = bool(self.snapshot['branches']) + 
+ return {'status': ('eventful' if eventful else 'uneventful')} class GitLoaderFromArchive(GitLoader): """Load a git repository from an archive. """ def project_name_from_archive(self, archive_path): """Compute the project name from the archive's path. """ return os.path.basename(os.path.dirname(archive_path)) def prepare(self, origin_url, archive_path, visit_date): """1. Uncompress the archive in temporary location. 2. Prepare as the GitLoader does 3. Load as GitLoader does """ project_name = self.project_name_from_archive(archive_path) self.temp_dir, self.repo_path = utils.init_git_repo_from_archive( project_name, archive_path) self.log.info('Project %s - Uncompressing archive %s at %s' % ( origin_url, os.path.basename(archive_path), self.repo_path)) super().prepare(origin_url, self.repo_path, visit_date) def cleanup(self): """Cleanup the temporary location (if it exists). """ if self.temp_dir and os.path.exists(self.temp_dir): shutil.rmtree(self.temp_dir) self.log.info('Project %s - Done injecting %s' % ( self.origin_url, self.repo_path)) if __name__ == '__main__': import logging import sys logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) loader = GitLoader() origin_url = sys.argv[1] directory = sys.argv[2] visit_date = datetime.datetime.now(tz=datetime.timezone.utc) print(loader.load(origin_url, directory, visit_date)) diff --git a/swh/loader/git/tests/test_utils.py b/swh/loader/git/tests/test_utils.py index b288d2b..adb20c4 100644 --- a/swh/loader/git/tests/test_utils.py +++ b/swh/loader/git/tests/test_utils.py @@ -1,35 +1,35 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from nose.tools import istest from swh.loader.git import utils class TestUtils(unittest.TestCase): @istest def check_date_time(self): """A long as datetime is fine, date time check does not raise """ for e in range(32, 37): ts = 2**e utils.check_date_time(ts) @istest def check_date_time_empty_value(self): self.assertIsNone(utils.check_date_time(None)) @istest def check_date_time_raises(self): """From a give threshold, check will no longer works. """ exp = 38 timestamp = 2**exp - with self.assertRaisesRegex(ValueError, 'year is out of range'): + with self.assertRaisesRegex(ValueError, 'is out of range'): utils.check_date_time(timestamp) diff --git a/swh/loader/git/updater.py b/swh/loader/git/updater.py index bb02b54..7cc4dfe 100644 --- a/swh/loader/git/updater.py +++ b/swh/loader/git/updater.py @@ -1,480 +1,472 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import BytesIO import datetime import logging import os import pickle import sys from collections import defaultdict import dulwich.client from dulwich.object_store import ObjectStoreGraphWalker from dulwich.pack import PackData, PackInflater -from urllib.parse import urlparse from swh.model import hashutil - -from . import base, converters +from swh.loader.core.loader import SWHStatelessLoader +from . 
import converters class SWHRepoRepresentation: """Repository representation for a Software Heritage origin.""" - def __init__(self, storage, origin_id, occurrences=None): + def __init__(self, storage, origin_id, base_snapshot=None): self.storage = storage self._parents_cache = {} self._type_cache = {} if origin_id: - self.heads = set(self._cache_heads(origin_id, occurrences)) + self.heads = set(self._cache_heads(origin_id, base_snapshot)) else: self.heads = set() def _fill_parents_cache(self, commits): """When querying for a commit's parents, we fill the cache to a depth of 1000 commits.""" root_revs = self._encode_for_storage(commits) for rev, parents in self.storage.revision_shortlog(root_revs, 1000): rev_id = hashutil.hash_to_bytehex(rev) if rev_id not in self._parents_cache: self._parents_cache[rev_id] = [ hashutil.hash_to_bytehex(parent) for parent in parents ] for rev in commits: if rev not in self._parents_cache: self._parents_cache[rev] = [] - def _cache_heads(self, origin_id, occurrences): + def _cache_heads(self, origin_id, base_snapshot): """Return all the known head commits for `origin_id`""" - if not occurrences: - occurrences = self.storage.occurrence_get(origin_id) - - return self._decode_from_storage( - occurrence['target'] for occurrence in occurrences - ) + if not base_snapshot: + base_snapshot = self.storage.snapshot_get_latest(origin_id) + + if base_snapshot: + return self._decode_from_storage( + target['target'] + for target in base_snapshot['branches'].values() + ) + else: + return [] def get_parents(self, commit): """get the parent commits for `commit`""" # Prime the parents cache if not self._parents_cache and self.heads: self._fill_parents_cache(self.heads) if commit not in self._parents_cache: self._fill_parents_cache([commit]) return self._parents_cache[commit] def get_heads(self): return self.heads @staticmethod def _encode_for_storage(objects): return [hashutil.bytehex_to_hash(object) for object in objects] @staticmethod def _decode_from_storage(objects): return set(hashutil.hash_to_bytehex(object) for object in objects) def graph_walker(self): return ObjectStoreGraphWalker(self.get_heads(), self.get_parents) @staticmethod def filter_unwanted_refs(refs): """Filter the unwanted references from refs""" ret = {} for ref, val in refs.items(): if ref.endswith(b'^{}'): # Peeled refs make the git protocol explode continue elif ref.startswith(b'refs/pull/') and ref.endswith(b'/merge'): # We filter-out auto-merged GitHub pull requests continue else: ret[ref] = val return ret def determine_wants(self, refs): """Filter the remote references to figure out which ones Software Heritage needs. """ if not refs: return [] # Find what objects Software Heritage has refs = self.find_remote_ref_types_in_swh(refs) # Cache the objects found in swh as existing heads for target in refs.values(): if target['target_type'] is not None: self.heads.add(target['target']) ret = set() for target in self.filter_unwanted_refs(refs).values(): if target['target_type'] is None: # The target doesn't exist in Software Heritage, let's retrieve # it. ret.add(target['target']) return list(ret) def get_stored_objects(self, objects): return self.storage.object_find_by_sha1_git( self._encode_for_storage(objects)) def find_remote_ref_types_in_swh(self, remote_refs): """Parse the remote refs information and list the objects that exist in Software Heritage. 
""" all_objs = set(remote_refs.values()) - set(self._type_cache) type_by_id = {} for id, objs in self.get_stored_objects(all_objs).items(): id = hashutil.hash_to_bytehex(id) if objs: type_by_id[id] = objs[0]['type'] self._type_cache.update(type_by_id) ret = {} for ref, id in remote_refs.items(): ret[ref] = { 'target': id, 'target_type': self._type_cache.get(id), } return ret -class BulkUpdater(base.BaseLoader): +class BulkUpdater(SWHStatelessLoader): """A bulk loader for a git repository""" CONFIG_BASE_FILENAME = 'loader/git-updater' ADDITIONAL_CONFIG = { 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024), } - def __init__(self, repo_representation=SWHRepoRepresentation): + def __init__(self, repo_representation=SWHRepoRepresentation, config=None): """Initialize the bulk updater. Args: repo_representation: swh's repository representation which is in charge of filtering between known and remote data. """ - super().__init__() + super().__init__(logging_class='swh.loader.git.BulkLoader', + config=config) self.repo_representation = repo_representation def fetch_pack_from_origin(self, origin_url, base_origin_id, - base_occurrences, do_activity): + base_snapshot, do_activity): """Fetch a pack from the origin""" pack_buffer = BytesIO() base_repo = self.repo_representation(self.storage, base_origin_id, - base_occurrences) - - parsed_uri = urlparse(origin_url) - - path = parsed_uri.path - if not path.endswith('.git'): - path += '.git' + base_snapshot) - client = dulwich.client.TCPGitClient(parsed_uri.netloc, - thin_packs=False) + client, path = dulwich.client.get_transport_and_path(origin_url, + thin_packs=False) size_limit = self.config['pack_size_bytes'] def do_pack(data, pack_buffer=pack_buffer, limit=size_limit, origin_url=origin_url): cur_size = pack_buffer.tell() would_write = len(data) if cur_size + would_write > limit: raise IOError('Pack file too big for repository %s, ' 'limit is %d bytes, current size is %d, ' 'would write %d' % (origin_url, limit, cur_size, would_write)) pack_buffer.write(data) - remote_refs = client.fetch_pack(path.encode('ascii'), + remote_refs = client.fetch_pack(path, base_repo.determine_wants, base_repo.graph_walker(), do_pack, progress=do_activity) if remote_refs: local_refs = base_repo.find_remote_ref_types_in_swh(remote_refs) else: local_refs = remote_refs = {} pack_buffer.flush() pack_size = pack_buffer.tell() pack_buffer.seek(0) return { 'remote_refs': base_repo.filter_unwanted_refs(remote_refs), 'local_refs': local_refs, 'pack_buffer': pack_buffer, 'pack_size': pack_size, } def list_pack(self, pack_data, pack_size): id_to_type = {} type_to_ids = defaultdict(set) inflater = self.get_inflater() for obj in inflater: type, id = obj.type_name, obj.id id_to_type[id] = type type_to_ids[type].add(id) return id_to_type, type_to_ids def prepare(self, origin_url, base_url=None): self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) origin = converters.origin_url_to_origin(origin_url) base_origin = converters.origin_url_to_origin(base_url) - base_occurrences = [] + prev_snapshot = {} base_origin_id = origin_id = None db_origin = self.storage.origin_get(origin) if db_origin: base_origin_id = origin_id = db_origin['id'] if origin_id: - base_occurrences = self.storage.occurrence_get(origin_id) + prev_snapshot = self.storage.snapshot_get_latest(origin_id) - if base_url and not base_occurrences: + if base_url and not prev_snapshot: base_origin = self.storage.origin_get(base_origin) if base_origin: base_origin_id = base_origin['id'] - base_occurrences = 
self.storage.occurrence_get(base_origin_id) + prev_snapshot = self.storage.snapshot_get_latest( + base_origin_id + ) - self.base_occurrences = list(sorted(base_occurrences, - key=lambda occ: occ['branch'])) + self.base_snapshot = prev_snapshot self.base_origin_id = base_origin_id self.origin = origin def get_origin(self): return self.origin def fetch_data(self): def do_progress(msg): sys.stderr.buffer.write(msg) sys.stderr.flush() fetch_info = self.fetch_pack_from_origin( - self.origin['url'], self.base_origin_id, self.base_occurrences, + self.origin['url'], self.base_origin_id, self.base_snapshot, do_progress) self.pack_buffer = fetch_info['pack_buffer'] self.pack_size = fetch_info['pack_size'] self.remote_refs = fetch_info['remote_refs'] self.local_refs = fetch_info['local_refs'] origin_url = self.origin['url'] self.log.info('Listed %d refs for repo %s' % ( len(self.remote_refs), origin_url), extra={ 'swh_type': 'git_repo_list_refs', 'swh_repo': origin_url, 'swh_num_refs': len(self.remote_refs), }) # We want to load the repository, walk all the objects id_to_type, type_to_ids = self.list_pack(self.pack_buffer, self.pack_size) self.id_to_type = id_to_type self.type_to_ids = type_to_ids def save_data(self): """Store a pack for archival""" write_size = 8192 pack_dir = self.get_save_data_path() pack_name = "%s.pack" % self.visit_date.isoformat() refs_name = "%s.refs" % self.visit_date.isoformat() with open(os.path.join(pack_dir, pack_name), 'xb') as f: while True: r = self.pack_buffer.read(write_size) if not r: break f.write(r) self.pack_buffer.seek(0) with open(os.path.join(pack_dir, refs_name), 'xb') as f: pickle.dump(self.remote_refs, f) def get_inflater(self): """Reset the pack buffer and get an object inflater from it""" self.pack_buffer.seek(0) return PackInflater.for_pack_data( PackData.from_file(self.pack_buffer, self.pack_size)) def has_contents(self): return bool(self.type_to_ids[b'blob']) def get_content_ids(self): """Get the content identifiers from the git repository""" for raw_obj in self.get_inflater(): if raw_obj.type_name != b'blob': continue yield converters.dulwich_blob_to_content_id(raw_obj) def get_contents(self): """Format the blobs from the git repository as swh contents""" max_content_size = self.config['content_size_limit'] missing_contents = set(self.storage.content_missing( self.get_content_ids(), 'sha1_git')) for raw_obj in self.get_inflater(): if raw_obj.type_name != b'blob': continue if raw_obj.sha().digest() not in missing_contents: continue yield converters.dulwich_blob_to_content( raw_obj, log=self.log, max_content_size=max_content_size, origin_id=self.origin_id) def has_directories(self): return bool(self.type_to_ids[b'tree']) def get_directory_ids(self): """Get the directory identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b'tree']) def get_directories(self): """Format the trees as swh directories""" missing_dirs = set(self.storage.directory_missing( sorted(self.get_directory_ids()))) for raw_obj in self.get_inflater(): if raw_obj.type_name != b'tree': continue if raw_obj.sha().digest() not in missing_dirs: continue yield converters.dulwich_tree_to_directory(raw_obj, log=self.log) def has_revisions(self): return bool(self.type_to_ids[b'commit']) def get_revision_ids(self): """Get the revision identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b'commit']) def get_revisions(self): """Format commits as swh revisions""" missing_revs = 
set(self.storage.revision_missing( sorted(self.get_revision_ids()))) for raw_obj in self.get_inflater(): if raw_obj.type_name != b'commit': continue if raw_obj.sha().digest() not in missing_revs: continue yield converters.dulwich_commit_to_revision(raw_obj, log=self.log) def has_releases(self): return bool(self.type_to_ids[b'tag']) def get_release_ids(self): """Get the release identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b'tag']) def get_releases(self): """Retrieve all the release objects from the git repository""" missing_rels = set(self.storage.release_missing( sorted(self.get_release_ids()))) for raw_obj in self.get_inflater(): if raw_obj.type_name != b'tag': continue if raw_obj.sha().digest() not in missing_rels: continue yield converters.dulwich_tag_to_release(raw_obj, log=self.log) - def has_occurrences(self): - return bool(self.remote_refs) + def get_snapshot(self): + branches = {} - def get_occurrences(self): - origin_id = self.origin_id - visit = self.visit - - ret = [] for ref in self.remote_refs: ret_ref = self.local_refs[ref].copy() - ret_ref.update({ - 'branch': ref, - 'origin': origin_id, - 'visit': visit, - }) if not ret_ref['target_type']: target_type = self.id_to_type[ret_ref['target']] ret_ref['target_type'] = converters.DULWICH_TYPES[target_type] ret_ref['target'] = hashutil.bytehex_to_hash(ret_ref['target']) - ret.append(ret_ref) + branches[ref] = ret_ref - return ret + self.snapshot = converters.branches_to_snapshot(branches) + return self.snapshot def get_fetch_history_result(self): return { 'contents': len(self.type_to_ids[b'blob']), 'directories': len(self.type_to_ids[b'tree']), 'revisions': len(self.type_to_ids[b'commit']), 'releases': len(self.type_to_ids[b'tag']), - 'occurrences': len(self.remote_refs), } - def eventful(self): - """The load was eventful if the current occurrences are different to - the ones we retrieved at the beginning of the run""" - current_occurrences = list(sorted( - self.storage.occurrence_get(self.origin_id), - key=lambda occ: occ['branch'], - )) + def load_status(self): + """The load was eventful if the current snapshot is different to + the one we retrieved at the beginning of the run""" + eventful = False + + if self.base_snapshot: + print(self.snapshot, self.base_snapshot) + eventful = self.snapshot['id'] != self.base_snapshot['id'] + else: + eventful = bool(self.snapshot['branches']) - return self.base_occurrences != current_occurrences + return {'status': ('eventful' if eventful else 'uneventful')} if __name__ == '__main__': logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) bulkupdater = BulkUpdater() origin_url = sys.argv[1] base_url = origin_url if len(sys.argv) > 2: base_url = sys.argv[2] print(bulkupdater.load(origin_url, base_url)) diff --git a/version.txt b/version.txt index 1e88f9d..9d54566 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.35-0-gf8d8d8f \ No newline at end of file +v0.0.36-0-g356b937 \ No newline at end of file
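
For reference, a minimal sketch (not part of the patch) of how the snapshot-based flow introduced by this release fits together: branches collected from the repository refs are turned into a snapshot whose id is computed intrinsically, and eventfulness is decided by comparing that id with the previously stored snapshot. It only reuses the swh.model calls that the patch itself adds in swh/loader/git/converters.py; the branch map and its target hash below are hypothetical placeholders.

```
from swh.model import hashutil, identifiers


def branches_to_snapshot(branches):
    # Same logic as the helper added to swh/loader/git/converters.py: the
    # snapshot id is derived from its branches, so identical branch maps
    # always yield the same snapshot id.
    snapshot = {'branches': branches}
    snapshot_id = identifiers.snapshot_identifier(snapshot)
    snapshot['id'] = identifiers.identifier_to_bytes(snapshot_id)
    return snapshot


def load_status(snapshot, previous_snapshot):
    # Mirrors the loaders' new load_status(): a load is eventful when the
    # freshly computed snapshot differs from the last one stored for the
    # origin, or when there was no previous snapshot and branches exist.
    if previous_snapshot:
        eventful = snapshot['id'] != previous_snapshot['id']
    else:
        eventful = bool(snapshot['branches'])
    return {'status': 'eventful' if eventful else 'uneventful'}


# Hypothetical branch map, shaped like what GitLoader.get_snapshot builds
# from the repository refs (target hash is a made-up placeholder).
branches = {
    b'refs/heads/master': {
        'target': hashutil.hash_to_bytes(
            '95d26c16cea8d4d99e1c9da4f7e5e6ee4dbc4e3c'),
        'target_type': 'revision',
    },
}
print(load_status(branches_to_snapshot(branches), None))
# -> {'status': 'eventful'}
```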