diff --git a/PKG-INFO b/PKG-INFO index b23afab..6aba2fa 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.loader.git -Version: 0.0.34 +Version: 0.0.35 Summary: Software Heritage git loader Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/debian/control b/debian/control index f992a0a..6876512 100644 --- a/debian/control +++ b/debian/control @@ -1,29 +1,31 @@ Source: swh-loader-git Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python (>= 2), python3-all, python3-click, - python3-dulwich, + python3-dulwich (>= 0.18.7~), python3-nose, python3-retrying, python3-setuptools, python3-swh.core (>= 0.0.7~), + python3-swh.loader.core (>= 0.0.22), python3-swh.model (>= 0.0.15~), python3-swh.scheduler (>= 0.0.14~), python3-swh.storage (>= 0.0.83~), python3-vcversioner Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/diffusion/DLDG/ Package: python3-swh.loader.git Architecture: all Depends: python3-swh.core (>= 0.0.7~), + python3-swh.loader.core (>= 0.0.22~), python3-swh.model (>= 0.0.15~), python3-swh.scheduler (>= 0.0.14~), python3-swh.storage (>= 0.0.83~), ${misc:Depends}, ${python3:Depends} Description: Software Heritage Git loader diff --git a/docs/index.rst b/docs/index.rst index 8b64117..cef3d76 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,15 +1,17 @@ +.. _swh-loader-git: + Software Heritage - Development Documentation ============================================= .. toctree:: :maxdepth: 2 :caption: Contents: Indices and tables ================== * :ref:`genindex` * :ref:`modindex` * :ref:`search` diff --git a/requirements-swh.txt b/requirements-swh.txt index f0dd184..187f8ca 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,4 +1,5 @@ swh.core >= 0.0.7 +swh.loader.core >= 0.0.22 swh.model >= 0.0.15 swh.scheduler >= 0.0.14 swh.storage >= 0.0.83 diff --git a/requirements.txt b/requirements.txt index 5925f74..b42ee35 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -dulwich +dulwich >= 0.18.7 retrying vcversioner click diff --git a/swh.loader.git.egg-info/PKG-INFO b/swh.loader.git.egg-info/PKG-INFO index b23afab..6aba2fa 100644 --- a/swh.loader.git.egg-info/PKG-INFO +++ b/swh.loader.git.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.loader.git -Version: 0.0.34 +Version: 0.0.35 Summary: Software Heritage git loader Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.loader.git.egg-info/SOURCES.txt b/swh.loader.git.egg-info/SOURCES.txt index b791016..18775fc 100644 --- a/swh.loader.git.egg-info/SOURCES.txt +++ b/swh.loader.git.egg-info/SOURCES.txt @@ -1,47 +1,48 @@ .gitignore .gitmodules AUTHORS LICENSE MANIFEST.in Makefile README requirements-swh.txt requirements.txt setup.py version.txt bin/dir-git-repo-meta.sh debian/changelog debian/compat debian/control debian/copyright debian/rules debian/source/format docs/.gitignore docs/Makefile docs/conf.py docs/index.rst docs/_static/.placeholder docs/_templates/.placeholder docs/attic/api-backend-protocol.txt docs/attic/git-loading-design.txt resources/local-loader-git.ini resources/remote-loader-git.ini resources/updater.ini resources/test/back.ini 
resources/test/db-manager.ini swh/__init__.py swh.loader.git.egg-info/PKG-INFO swh.loader.git.egg-info/SOURCES.txt swh.loader.git.egg-info/dependency_links.txt swh.loader.git.egg-info/requires.txt swh.loader.git.egg-info/top_level.txt swh/loader/__init__.py swh/loader/git/__init__.py swh/loader/git/base.py swh/loader/git/converters.py swh/loader/git/loader.py swh/loader/git/reader.py swh/loader/git/tasks.py swh/loader/git/updater.py swh/loader/git/utils.py -swh/loader/git/tests/test_converters.py \ No newline at end of file +swh/loader/git/tests/test_converters.py +swh/loader/git/tests/test_utils.py \ No newline at end of file diff --git a/swh.loader.git.egg-info/requires.txt b/swh.loader.git.egg-info/requires.txt index 9e96a57..420cd91 100644 --- a/swh.loader.git.egg-info/requires.txt +++ b/swh.loader.git.egg-info/requires.txt @@ -1,8 +1,9 @@ click -dulwich +dulwich>=0.18.7 retrying swh.core>=0.0.7 +swh.loader.core>=0.0.22 swh.model>=0.0.15 swh.scheduler>=0.0.14 swh.storage>=0.0.83 vcversioner diff --git a/swh/loader/git/base.py b/swh/loader/git/base.py index fd0171e..c135a68 100644 --- a/swh/loader/git/base.py +++ b/swh/loader/git/base.py @@ -1,455 +1,168 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import datetime -import logging +import abc import os -import traceback -import uuid -import psycopg2 -import requests -from retrying import retry +from swh.loader.core.loader import SWHLoader -from swh.core import config -from swh.storage import get_storage - -def send_in_packets(objects, sender, packet_size, packet_size_bytes=None): - """Send `objects`, using the `sender`, in packets of `packet_size` objects (and - of max `packet_size_bytes`). - """ - formatted_objects = [] - count = 0 - if not packet_size_bytes: - packet_size_bytes = 0 - for obj in objects: - if not obj: - continue - formatted_objects.append(obj) - if packet_size_bytes: - count += obj['length'] - if len(formatted_objects) >= packet_size or count > packet_size_bytes: - sender(formatted_objects) - formatted_objects = [] - count = 0 - - if formatted_objects: - sender(formatted_objects) - - -def retry_loading(error): - """Retry policy when we catch a recoverable error""" - exception_classes = [ - # raised when two parallel insertions insert the same data. - psycopg2.IntegrityError, - # raised when uWSGI restarts and hungs up on the worker. - requests.exceptions.ConnectionError, - ] - - if not any(isinstance(error, exc) for exc in exception_classes): - return False - - logger = logging.getLogger('swh.loader.git.BulkLoader') - - error_name = error.__module__ + '.' + error.__class__.__name__ - logger.warning('Retry loading a batch', exc_info=False, extra={ - 'swh_type': 'storage_retry', - 'swh_exception_type': error_name, - 'swh_exception': traceback.format_exception( - error.__class__, - error, - error.__traceback__, - ), - }) - - return True - - -class BaseLoader(config.SWHConfig): +class BaseLoader(SWHLoader): """This base class is a pattern for loaders. The external calling convention is as such: - instantiate the class once (loads storage and the configuration) - for each origin, call load with the origin-specific arguments (for instance, an origin URL). 
load calls several methods that must be implemented in subclasses: - prepare(\*args, \**kwargs) prepares the loader for the new origin - get_origin gets the origin object associated to the current loader - fetch_data downloads the necessary data from the origin - get_{contents,directories,revisions,releases,occurrences} retrieve each kind of object from the origin - has\_* checks whether there are some objects to load for that object type - get_fetch_history_result retrieves the data to insert in the fetch_history table once the load was successful - cleanup cleans up an eventual state installed for computations - eventful returns whether the load was eventful or not """ - - CONFIG_BASE_FILENAME = None - DEFAULT_CONFIG = { 'storage': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5002/' }, }), 'send_contents': ('bool', True), 'send_directories': ('bool', True), 'send_revisions': ('bool', True), 'send_releases': ('bool', True), 'send_occurrences': ('bool', True), 'save_data': ('bool', False), 'save_data_path': ('str', ''), 'content_packet_size': ('int', 10000), 'content_packet_size_bytes': ('int', 1024 * 1024 * 1024), 'directory_packet_size': ('int', 25000), 'revision_packet_size': ('int', 100000), 'release_packet_size': ('int', 100000), 'occurrence_packet_size': ('int', 100000), } - ADDITIONAL_CONFIG = {} - def __init__(self): - self.config = self.parse_config_file( - additional_configs=[self.ADDITIONAL_CONFIG]) + super().__init__(logging_class='swh.loader.git.BulkLoader') # Make sure the config is sane if self.config['save_data']: path = self.config['save_data_path'] os.stat(path) if not os.access(path, os.R_OK | os.W_OK): raise PermissionError("Permission denied: %r" % path) - self.storage = get_storage(**self.config['storage']) - - self.log = logging.getLogger('swh.loader.git.BulkLoader') - self.fetch_date = None # possibly overridden in self.prepare method - - def prepare(self, *args, **kwargs): - """Prepare the data source to be loaded""" - raise NotImplementedError - - def cleanup(self): - """Clean up an eventual state installed for computations.""" - pass - - def get_origin(self): - """Get the origin that is currently being loaded""" - raise NotImplementedError - - def fetch_data(self): - """Fetch the data from the data source""" - raise NotImplementedError + self.visit_date = None # possibly overridden in self.prepare method + @abc.abstractmethod def has_contents(self): """Checks whether we need to load contents""" - return True + pass def get_contents(self): """Get the contents that need to be loaded""" raise NotImplementedError + @abc.abstractmethod def has_directories(self): """Checks whether we need to load directories""" - return True + pass def get_directories(self): """Get the directories that need to be loaded""" raise NotImplementedError + @abc.abstractmethod def has_revisions(self): """Checks whether we need to load revisions""" - return True def get_revisions(self): """Get the revisions that need to be loaded""" raise NotImplementedError + @abc.abstractmethod def has_releases(self): """Checks whether we need to load releases""" return True def get_releases(self): """Get the releases that need to be loaded""" raise NotImplementedError + @abc.abstractmethod def has_occurrences(self): """Checks whether we need to load occurrences""" - return True + pass def get_occurrences(self): """Get the occurrences that need to be loaded""" raise NotImplementedError def get_fetch_history_result(self): """Return the data to store in fetch_history for the current loader""" 
raise NotImplementedError def eventful(self): """Whether the load was eventful""" raise NotImplementedError def save_data(self): """Save the data associated to the current load""" raise NotImplementedError def get_save_data_path(self): """The path to which we save the data""" if not hasattr(self, '__save_data_path'): origin_id = self.origin_id - year = str(self.fetch_date.year) + year = str(self.visit_date.year) path = os.path.join( self.config['save_data_path'], "%04d" % (origin_id % 10000), "%08d" % origin_id, year, ) os.makedirs(path, exist_ok=True) self.__save_data_path = path return self.__save_data_path - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_contents(self, content_list): - """Actually send properly formatted contents to the database""" - num_contents = len(content_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d contents" % num_contents, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'content', - 'swh_num': num_contents, - 'swh_id': log_id, - }) - self.storage.content_add(content_list) - self.log.debug("Done sending %d contents" % num_contents, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'content', - 'swh_num': num_contents, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_directories(self, directory_list): - """Actually send properly formatted directories to the database""" - num_directories = len(directory_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d directories" % num_directories, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'directory', - 'swh_num': num_directories, - 'swh_id': log_id, - }) - self.storage.directory_add(directory_list) - self.log.debug("Done sending %d directories" % num_directories, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'directory', - 'swh_num': num_directories, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_revisions(self, revision_list): - """Actually send properly formatted revisions to the database""" - num_revisions = len(revision_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d revisions" % num_revisions, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'revision', - 'swh_num': num_revisions, - 'swh_id': log_id, - }) - self.storage.revision_add(revision_list) - self.log.debug("Done sending %d revisions" % num_revisions, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'revision', - 'swh_num': num_revisions, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_releases(self, release_list): - """Actually send properly formatted releases to the database""" - num_releases = len(release_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d releases" % num_releases, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'release', - 'swh_num': num_releases, - 'swh_id': log_id, - }) - self.storage.release_add(release_list) - self.log.debug("Done sending %d releases" % num_releases, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'release', - 'swh_num': num_releases, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_occurrences(self, occurrence_list): - """Actually send properly formatted occurrences to the database""" - num_occurrences = len(occurrence_list) - log_id = 
str(uuid.uuid4()) - self.log.debug("Sending %d occurrences" % num_occurrences, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'occurrence', - 'swh_num': num_occurrences, - 'swh_id': log_id, - }) - self.storage.occurrence_add(occurrence_list) - self.log.debug("Done sending %d occurrences" % num_occurrences, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'occurrence', - 'swh_num': num_occurrences, - 'swh_id': log_id, - }) - - def send_origin(self, origin): - log_id = str(uuid.uuid4()) - self.log.debug('Creating %s origin for %s' % (origin['type'], - origin['url']), - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'origin', - 'swh_num': 1, - 'swh_id': log_id - }) - origin_id = self.storage.origin_add_one(origin) - self.log.debug('Done creating %s origin for %s' % (origin['type'], - origin['url']), - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'origin', - 'swh_num': 1, - 'swh_id': log_id - }) - - return origin_id - - def send_all_contents(self, contents): - """Send all the contents to the database""" - packet_size = self.config['content_packet_size'] - packet_size_bytes = self.config['content_packet_size_bytes'] - - send_in_packets(contents, self.send_contents, packet_size, - packet_size_bytes=packet_size_bytes) - - def send_all_directories(self, directories): - """Send all the directories to the database""" - packet_size = self.config['directory_packet_size'] - - send_in_packets(directories, self.send_directories, packet_size) - - def send_all_revisions(self, revisions): - """Send all the revisions to the database""" - packet_size = self.config['revision_packet_size'] - - send_in_packets(revisions, self.send_revisions, packet_size) - - def send_all_releases(self, releases): - """Send all the releases to the database - """ - packet_size = self.config['release_packet_size'] - - send_in_packets(releases, self.send_releases, packet_size) + def cleanup(self): + """Clean up an eventual state installed for computations. + Nothing specific for the loader-git is needed. 
- def send_all_occurrences(self, occurrences): - """Send all the occurrences to the database """ - packet_size = self.config['occurrence_packet_size'] - - send_in_packets(occurrences, self.send_occurrences, packet_size) - - def open_fetch_history(self): - return self.storage.fetch_history_start(self.origin_id) - - def close_fetch_history_success(self, fetch_history_id, result): - data = { - 'status': True, - 'result': result, - } - return self.storage.fetch_history_end(fetch_history_id, data) - - def close_fetch_history_failure(self, fetch_history_id): - import traceback - data = { - 'status': False, - 'stderr': traceback.format_exc(), - } - return self.storage.fetch_history_end(fetch_history_id, data) - - def load(self, *args, **kwargs): - - self.prepare(*args, **kwargs) - origin = self.get_origin() - self.origin_id = self.send_origin(origin) - - fetch_history_id = self.open_fetch_history() - if self.fetch_date: # overwriting the visit_date the fetching date - date_visit = self.fetch_date - else: - date_visit = datetime.datetime.now(tz=datetime.timezone.utc) - - origin_visit = self.storage.origin_visit_add( - self.origin_id, - date_visit) - self.visit = origin_visit['visit'] - - try: - self.fetch_data() - - if self.config['save_data']: - self.save_data() - - if self.config['send_contents'] and self.has_contents(): - self.send_all_contents(self.get_contents()) - - if self.config['send_directories'] and self.has_directories(): - self.send_all_directories(self.get_directories()) - - if self.config['send_revisions'] and self.has_revisions(): - self.send_all_revisions(self.get_revisions()) - - if self.config['send_releases'] and self.has_releases(): - self.send_all_releases(self.get_releases()) - - if self.config['send_occurrences'] and self.has_occurrences(): - self.send_all_occurrences(self.get_occurrences()) - - self.close_fetch_history_success(fetch_history_id, - self.get_fetch_history_result()) - self.storage.origin_visit_update( - self.origin_id, self.visit, status='full') + pass - except: - self.close_fetch_history_failure(fetch_history_id) - self.storage.origin_visit_update( - self.origin_id, self.visit, status='partial') - raise - finally: - self.cleanup() + def store_data(self): + """Store data fetched from the git repository. 
- return self.eventful() + """ + if self.config['save_data']: + self.save_data() + + if self.config['send_contents'] and self.has_contents(): + self.send_batch_contents(self.get_contents()) + if self.config['send_directories'] and self.has_directories(): + self.send_batch_directories(self.get_directories()) + if self.config['send_revisions'] and self.has_revisions(): + self.send_batch_revisions(self.get_revisions()) + if self.config['send_releases'] and self.has_releases(): + self.send_batch_releases(self.get_releases()) + if self.config['send_occurrences'] and self.has_occurrences(): + self.send_batch_occurrences(self.get_occurrences()) diff --git a/swh/loader/git/converters.py b/swh/loader/git/converters.py index fc28332..b912a3c 100644 --- a/swh/loader/git/converters.py +++ b/swh/loader/git/converters.py @@ -1,229 +1,232 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Convert dulwich objects to dictionaries suitable for swh.storage""" from swh.model import hashutil HASH_ALGORITHMS = hashutil.DEFAULT_ALGORITHMS - {'sha1_git'} def origin_url_to_origin(origin_url): """Format a pygit2.Repository as an origin suitable for swh.storage""" return { 'type': 'git', 'url': origin_url, } def dulwich_blob_to_content_id(blob): """Convert a dulwich blob to a Software Heritage content id""" if blob.type_name != b'blob': return size = blob.raw_length() ret = { 'sha1_git': blob.sha().digest(), 'length': size, } data = blob.as_raw_string() ret.update(hashutil.hash_data(data, HASH_ALGORITHMS)) return ret def dulwich_blob_to_content(blob, log=None, max_content_size=None, origin_id=None): """Convert a dulwich blob to a Software Heritage content""" if blob.type_name != b'blob': return ret = dulwich_blob_to_content_id(blob) size = ret['length'] if max_content_size: if size > max_content_size: id = hashutil.hash_to_hex(ret['sha1_git']) if log: log.info('Skipping content %s, too large (%s > %s)' % (id, size, max_content_size), extra={ 'swh_type': 'loader_git_content_skip', 'swh_id': id, 'swh_size': size, }) ret['status'] = 'absent' ret['reason'] = 'Content too large' ret['origin'] = origin_id return ret data = blob.as_raw_string() ret['data'] = data ret['status'] = 'visible' return ret def dulwich_tree_to_directory(tree, log=None): """Format a tree as a directory""" if tree.type_name != b'tree': return ret = { 'id': tree.sha().digest(), } entries = [] ret['entries'] = entries entry_mode_map = { 0o040000: 'dir', 0o160000: 'rev', 0o100644: 'file', 0o100755: 'file', 0o120000: 'file', } for entry in tree.iteritems(): entries.append({ 'type': entry_mode_map.get(entry.mode, 'file'), 'perms': entry.mode, 'name': entry.path, 'target': hashutil.hash_to_bytes(entry.sha.decode('ascii')), }) return ret def parse_author(name_email): """Parse an author line""" if name_email is None: return None try: open_bracket = name_email.index(b'<') except ValueError: name = email = None else: raw_name = name_email[:open_bracket] raw_email = name_email[open_bracket+1:] if not raw_name: name = None elif raw_name.endswith(b' '): name = raw_name[:-1] else: name = raw_name try: close_bracket = raw_email.index(b'>') except ValueError: email = None else: email = raw_email[:close_bracket] return { 'name': name, 'email': email, 'fullname': name_email, } def dulwich_tsinfo_to_timestamp(timestamp, timezone, timezone_neg_utc): """Convert the 
dulwich timestamp information to a structure compatible with Software Heritage""" return { 'timestamp': timestamp, 'offset': timezone // 60, 'negative_utc': timezone_neg_utc if timezone == 0 else None, } def dulwich_commit_to_revision(commit, log=None): if commit.type_name != b'commit': return ret = { 'id': commit.sha().digest(), 'author': parse_author(commit.author), 'date': dulwich_tsinfo_to_timestamp( commit.author_time, commit.author_timezone, commit._author_timezone_neg_utc, ), 'committer': parse_author(commit.committer), 'committer_date': dulwich_tsinfo_to_timestamp( commit.commit_time, commit.commit_timezone, commit._commit_timezone_neg_utc, ), 'type': 'git', 'directory': bytes.fromhex(commit.tree.decode()), 'message': commit.message, 'metadata': None, 'synthetic': False, 'parents': [bytes.fromhex(p.decode()) for p in commit.parents], } git_metadata = [] if commit.encoding is not None: git_metadata.append(['encoding', commit.encoding]) if commit.mergetag: for mergetag in commit.mergetag: raw_string = mergetag.as_raw_string() assert raw_string.endswith(b'\n') git_metadata.append(['mergetag', raw_string[:-1]]) if commit.extra: git_metadata.extend([k.decode('utf-8'), v] for k, v in commit.extra) if commit.gpgsig: git_metadata.append(['gpgsig', commit.gpgsig]) if git_metadata: ret['metadata'] = { 'extra_headers': git_metadata, } return ret DULWICH_TYPES = { b'blob': 'content', b'tree': 'directory', b'commit': 'revision', b'tag': 'release', } def dulwich_tag_to_release(tag, log=None): if tag.type_name != b'tag': return target_type, target = tag.object ret = { 'id': tag.sha().digest(), 'name': tag.name, 'target': bytes.fromhex(target.decode()), 'target_type': DULWICH_TYPES[target_type.type_name], 'message': tag._message, 'metadata': None, 'synthetic': False, } if tag.tagger: ret['author'] = parse_author(tag.tagger) - ret['date'] = dulwich_tsinfo_to_timestamp( - tag.tag_time, - tag.tag_timezone, - tag._tag_timezone_neg_utc, - ) + if not tag.tag_time: + ret['date'] = None + else: + ret['date'] = dulwich_tsinfo_to_timestamp( + tag.tag_time, + tag.tag_timezone, + tag._tag_timezone_neg_utc, + ) else: ret['author'] = ret['date'] = None return ret diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py index ee4450b..785ee4a 100644 --- a/swh/loader/git/loader.py +++ b/swh/loader/git/loader.py @@ -1,218 +1,295 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import dulwich.repo import os import shutil +from dulwich.errors import ObjectFormatException, EmptyFileException from collections import defaultdict from swh.model import hashutil from . import base, converters, utils class GitLoader(base.BaseLoader): """Load a git repository from a directory. 
""" CONFIG_BASE_FILENAME = 'loader/git-loader' - def prepare(self, origin_url, directory, fetch_date): + def prepare(self, origin_url, directory, visit_date): self.origin_url = origin_url + self.origin = self.get_origin() self.repo = dulwich.repo.Repo(directory) - self.fetch_date = fetch_date + self.visit_date = visit_date def get_origin(self): """Get the origin that is currently being loaded""" return converters.origin_url_to_origin(self.origin_url) def iter_objects(self): object_store = self.repo.object_store for pack in object_store.packs: objs = list(pack.index.iterentries()) objs.sort(key=lambda x: x[1]) for sha, offset, crc32 in objs: yield hashutil.hash_to_bytehex(sha) yield from object_store._iter_loose_objects() yield from object_store._iter_alternate_objects() + def _check(self, obj): + """Check the object's repository representation. + + If any errors in check exists, an ObjectFormatException is + raised. + + Args: + obj (object): Dulwich object read from the repository. + + """ + obj.check() + from dulwich.objects import Commit, Tag + try: + # For additional checks on dulwich objects with date + # for now, only checks on *time + if isinstance(obj, Commit): + commit_time = obj._commit_time + utils.check_date_time(commit_time) + author_time = obj._author_time + utils.check_date_time(author_time) + elif isinstance(obj, Tag): + tag_time = obj._tag_time + utils.check_date_time(tag_time) + except Exception as e: + raise ObjectFormatException(e) + + def get_object(self, oid): + """Given an object id, return the object if it is found and not + malformed in some way. + + Args: + oid (bytes): the object's identifier + + Returns: + The object if found without malformation + + """ + try: + # some errors are raised when reading the object + obj = self.repo[oid] + # some we need to check ourselves + self._check(obj) + except KeyError: + _id = oid.decode('utf-8') + self.log.warn('object %s not found, skipping' % _id, + extra={ + 'swh_type': 'swh_loader_git_missing_object', + 'swh_object_id': _id, + 'origin_id': self.origin_id, + }) + return None + except ObjectFormatException: + _id = oid.decode('utf-8') + self.log.warn('object %s malformed, skipping' % _id, + extra={ + 'swh_type': 'swh_loader_git_missing_object', + 'swh_object_id': _id, + 'origin_id': self.origin_id, + }) + return None + except EmptyFileException: + _id = oid.decode('utf-8') + self.log.warn('object %s corrupted (empty file), skipping' % _id, + extra={ + 'swh_type': 'swh_loader_git_missing_object', + 'swh_object_id': _id, + 'origin_id': self.origin_id, + }) + else: + return obj + def fetch_data(self): """Fetch the data from the data source""" type_to_ids = defaultdict(list) for oid in self.iter_objects(): - type_name = self.repo[oid].type_name + obj = self.get_object(oid) + if not obj: + continue + type_name = obj.type_name type_to_ids[type_name].append(oid) self.type_to_ids = type_to_ids def has_contents(self): """Checks whether we need to load contents""" return bool(self.type_to_ids[b'blob']) def get_content_ids(self): """Get the content identifiers from the git repository""" for oid in self.type_to_ids[b'blob']: - yield converters.dulwich_blob_to_content_id(self.repo[oid]) def get_contents(self): """Get the contents that need to be loaded""" max_content_size = self.config['content_size_limit'] missing_contents = set(self.storage.content_missing( self.get_content_ids(), 'sha1_git')) for oid in missing_contents: yield converters.dulwich_blob_to_content( self.repo[hashutil.hash_to_bytehex(oid)], log=self.log, 
max_content_size=max_content_size, origin_id=self.origin_id) def has_directories(self): """Checks whether we need to load directories""" return bool(self.type_to_ids[b'tree']) def get_directory_ids(self): """Get the directory identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b'tree']) def get_directories(self): """Get the directories that need to be loaded""" missing_dirs = set(self.storage.directory_missing( sorted(self.get_directory_ids()))) for oid in missing_dirs: yield converters.dulwich_tree_to_directory( self.repo[hashutil.hash_to_bytehex(oid)], log=self.log) def has_revisions(self): """Checks whether we need to load revisions""" return bool(self.type_to_ids[b'commit']) def get_revision_ids(self): """Get the revision identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b'commit']) def get_revisions(self): """Get the revisions that need to be loaded""" missing_revs = set(self.storage.revision_missing( sorted(self.get_revision_ids()))) for oid in missing_revs: yield converters.dulwich_commit_to_revision( self.repo[hashutil.hash_to_bytehex(oid)], log=self.log) def has_releases(self): """Checks whether we need to load releases""" return bool(self.type_to_ids[b'tag']) def get_release_ids(self): """Get the release identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b'tag']) def get_releases(self): """Get the releases that need to be loaded""" missing_rels = set(self.storage.release_missing( sorted(self.get_release_ids()))) for oid in missing_rels: yield converters.dulwich_tag_to_release( self.repo[hashutil.hash_to_bytehex(oid)], log=self.log) def has_occurrences(self): """Checks whether we need to load occurrences""" return True def get_occurrences(self): """Get the occurrences that need to be loaded""" - repo = self.repo origin_id = self.origin_id visit = self.visit + ref_objs = ((refs, target, self.get_object(target)) + for refs, target in self.repo.refs.as_dict().items() + if self.get_object(target)) - for ref, target in repo.refs.as_dict().items(): - target_type_name = repo[target].type_name + for ref, target, obj in ref_objs: + target_type_name = obj.type_name target_type = converters.DULWICH_TYPES[target_type_name] yield { 'branch': ref, 'origin': origin_id, 'target': hashutil.bytehex_to_hash(target), 'target_type': target_type, 'visit': visit, } def get_fetch_history_result(self): """Return the data to store in fetch_history for the current loader""" return { 'contents': len(self.type_to_ids[b'blob']), 'directories': len(self.type_to_ids[b'tree']), 'revisions': len(self.type_to_ids[b'commit']), 'releases': len(self.type_to_ids[b'tag']), 'occurrences': len(self.repo.refs.allkeys()), } def save_data(self): """We already have the data locally, no need to save it""" pass def eventful(self): """Whether the load was eventful""" return True class GitLoaderFromArchive(GitLoader): """Load a git repository from an archive. """ def project_name_from_archive(self, archive_path): """Compute the project name from the archive's path. """ return os.path.basename(os.path.dirname(archive_path)) - def prepare(self, origin_url, archive_path, fetch_date): + def prepare(self, origin_url, archive_path, visit_date): """1. Uncompress the archive in temporary location. 2. Prepare as the GitLoader does 3. 
Load as GitLoader does """ project_name = self.project_name_from_archive(archive_path) self.temp_dir, self.repo_path = utils.init_git_repo_from_archive( project_name, archive_path) self.log.info('Project %s - Uncompressing archive %s at %s' % ( origin_url, os.path.basename(archive_path), self.repo_path)) - super().prepare(origin_url, self.repo_path, fetch_date) + super().prepare(origin_url, self.repo_path, visit_date) def cleanup(self): """Cleanup the temporary location (if it exists). """ if self.temp_dir and os.path.exists(self.temp_dir): shutil.rmtree(self.temp_dir) self.log.info('Project %s - Done injecting %s' % ( self.origin_url, self.repo_path)) if __name__ == '__main__': import logging import sys logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) loader = GitLoader() origin_url = sys.argv[1] directory = sys.argv[2] - fetch_date = datetime.datetime.now(tz=datetime.timezone.utc) + visit_date = datetime.datetime.now(tz=datetime.timezone.utc) - print(loader.load(origin_url, directory, fetch_date)) + print(loader.load(origin_url, directory, visit_date)) diff --git a/swh/loader/git/tests/test_converters.py b/swh/loader/git/tests/test_converters.py index 622c10f..f64aaa3 100644 --- a/swh/loader/git/tests/test_converters.py +++ b/swh/loader/git/tests/test_converters.py @@ -1,179 +1,317 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import shutil import subprocess import tempfile import unittest from nose.tools import istest from nose.plugins.attrib import attr import dulwich.repo import swh.loader.git.converters as converters from swh.model.hashutil import bytehex_to_hash, hash_to_bytes +class SWHTargetType: + """Dulwich lookalike TargetType class + + """ + def __init__(self, type_name): + self.type_name = type_name + + +class SWHTag: + """Dulwich lookalike tag class + + """ + def __init__(self, name, type_name, target, target_type, tagger, tag_time, + tag_timezone, message): + self.name = name + self.type_name = type_name + self.object = SWHTargetType(target_type), target + self.tagger = tagger + self._message = message + self.tag_time = tag_time + self.tag_timezone = tag_timezone + self._tag_timezone_neg_utc = False + + def sha(self): + from hashlib import sha1 + return sha1() + + @attr('fs') class TestConverters(unittest.TestCase): @classmethod def setUpClass(cls): super().setUpClass() cls.repo_path = tempfile.mkdtemp() cls.repo = dulwich.repo.Repo.init_bare(cls.repo_path) fast_export = os.path.join(os.path.dirname(__file__), '../../../../..', 'swh-storage-testdata', 'git-repos', 'example-submodule.fast-export.xz') xz = subprocess.Popen( ['xzcat'], stdin=open(fast_export, 'rb'), stdout=subprocess.PIPE, ) git = subprocess.Popen( ['git', 'fast-import', '--quiet'], stdin=xz.stdout, cwd=cls.repo_path, ) # flush stdout of xz xz.stdout.close() git.communicate() @classmethod def tearDownClass(cls): super().tearDownClass() shutil.rmtree(cls.repo_path) def setUp(self): super().setUp() self.blob_id = b'28c6f4023d65f74e3b59a2dea3c4277ed9ee07b0' self.blob = { 'sha1_git': bytehex_to_hash(self.blob_id), 'sha1': hash_to_bytes('4850a3420a2262ff061cb296fb915430fa92301c'), 'sha256': hash_to_bytes('fee7c8a485a10321ad94b64135073cb5' '5f22cb9f57fa2417d2adfb09d310adef'), 'blake2s256': hash_to_bytes('5d71873f42a137f6d89286e43677721e574' 
'1fa05ce4cd5e3c7ea7c44d4c2d10b'), 'data': (b'[submodule "example-dependency"]\n' b'\tpath = example-dependency\n' b'\turl = https://github.com/githubtraining/' b'example-dependency.git\n'), 'length': 124, 'status': 'visible', } self.blob_hidden = { 'sha1_git': bytehex_to_hash(self.blob_id), 'sha1': hash_to_bytes('4850a3420a2262ff061cb296fb915430fa92301c'), 'sha256': hash_to_bytes('fee7c8a485a10321ad94b64135073cb5' '5f22cb9f57fa2417d2adfb09d310adef'), 'blake2s256': hash_to_bytes('5d71873f42a137f6d89286e43677721e574' '1fa05ce4cd5e3c7ea7c44d4c2d10b'), 'length': 124, 'status': 'absent', 'reason': 'Content too large', 'origin': None, } @istest def blob_to_content(self): content = converters.dulwich_blob_to_content(self.repo[self.blob_id]) self.assertEqual(self.blob, content) @istest def blob_to_content_absent(self): max_length = self.blob['length'] - 1 content = converters.dulwich_blob_to_content( self.repo[self.blob_id], max_content_size=max_length) self.assertEqual(self.blob_hidden, content) @istest def commit_to_revision(self): sha1 = b'9768d0b576dbaaecd80abedad6dfd0d72f1476da' revision = converters.dulwich_commit_to_revision(self.repo[sha1]) expected_revision = { 'id': hash_to_bytes('9768d0b576dbaaecd80abedad6dfd0d72f1476da'), 'directory': b'\xf0i\\./\xa7\xce\x9dW@#\xc3A7a\xa4s\xe5\x00\xca', 'type': 'git', 'committer': { 'name': b'Stefano Zacchiroli', 'fullname': b'Stefano Zacchiroli <zack@upsilon.cc>', 'email': b'zack@upsilon.cc', }, 'author': { 'name': b'Stefano Zacchiroli', 'fullname': b'Stefano Zacchiroli <zack@upsilon.cc>', 'email': b'zack@upsilon.cc', }, 'committer_date': { 'negative_utc': None, 'timestamp': 1443083765, 'offset': 120, }, 'message': b'add submodule dependency\n', 'metadata': None, 'date': { 'negative_utc': None, 'timestamp': 1443083765, 'offset': 120, }, 'parents': [ b'\xc3\xc5\x88q23`\x9f[\xbb\xb2\xd9\xe7\xf3\xfbJf\x0f?r' ], 'synthetic': False, } self.assertEquals(revision, expected_revision) @istest def author_line_to_author(self): tests = { b'a <b@c.com>': { 'name': b'a', 'email': b'b@c.com', 'fullname': b'a <b@c.com>', }, b'<foo@bar.com>': { 'name': None, 'email': b'foo@bar.com', 'fullname': b'<foo@bar.com>', }, b'trailing <sp@c.e>': { 'name': b'trailing', 'email': b'sp@c.e', 'fullname': b'trailing <sp@c.e>', }, b'no<sp@c.e>': { 'name': b'no', 'email': b'sp@c.e', 'fullname': b'no<sp@c.e>', }, b' <>': { 'name': b'', 'email': b'', 'fullname': b' <>', }, } for author in sorted(tests): parsed_author = tests[author] self.assertEquals(parsed_author, converters.parse_author(author)) + + @istest + def dulwich_tag_to_release_no_author_no_date(self): + target = b'641fb6e08ddb2e4fd096dcf18e80b894bf' + message = b'some release message' + tag = SWHTag(name='blah', + type_name=b'tag', + target=target, + target_type=b'commit', + message=message, + tagger=None, + tag_time=None, tag_timezone=None) + + # when + actual_release = converters.dulwich_tag_to_release(tag) + + # then + expected_release = { + 'author': None, + 'date': None, + 'id': b'\xda9\xa3\xee^kK\r2U\xbf\xef\x95`\x18\x90\xaf\xd8\x07\t', + 'message': message, + 'metadata': None, + 'name': 'blah', + 'synthetic': False, + 'target': hash_to_bytes(target.decode()), + 'target_type': 'revision' + } + + self.assertEquals(actual_release, expected_release) + + @istest + def dulwich_tag_to_release_author_and_date(self): + tagger = b'hey dude <hello@mail.org>' + target = b'641fb6e08ddb2e4fd096dcf18e80b894bf' + message = b'some release message' + + import datetime + date = datetime.datetime(2007, 12, 5).timestamp() + + tag = SWHTag(name='blah', + type_name=b'tag', + target=target, + target_type=b'commit', + message=message, + tagger=tagger, +
tag_time=date, + tag_timezone=0) + + # when + actual_release = converters.dulwich_tag_to_release(tag) + + # then + expected_release = { + 'author': { + 'email': b'hello@mail.org', + 'fullname': b'hey dude <hello@mail.org>', + 'name': b'hey dude' + }, + 'date': { + 'negative_utc': False, + 'offset': 0, + 'timestamp': 1196809200.0 + }, + 'id': b'\xda9\xa3\xee^kK\r2U\xbf\xef\x95`\x18\x90\xaf\xd8\x07\t', + 'message': message, + 'metadata': None, + 'name': 'blah', + 'synthetic': False, + 'target': hash_to_bytes(target.decode()), + 'target_type': 'revision' + } + + self.assertEquals(actual_release, expected_release) + + @istest + def dulwich_tag_to_release_author_no_date(self): + # to reproduce bug T815 (fixed) + tagger = b'hey dude <hello@mail.org>' + target = b'641fb6e08ddb2e4fd096dcf18e80b894bf' + message = b'some release message' + tag = SWHTag(name='blah', + type_name=b'tag', + target=target, + target_type=b'commit', + message=message, + tagger=tagger, + tag_time=None, tag_timezone=None) + + # when + actual_release = converters.dulwich_tag_to_release(tag) + + # then + expected_release = { + 'author': { + 'email': b'hello@mail.org', + 'fullname': b'hey dude <hello@mail.org>', + 'name': b'hey dude' + }, + 'date': None, + 'id': b'\xda9\xa3\xee^kK\r2U\xbf\xef\x95`\x18\x90\xaf\xd8\x07\t', + 'message': message, + 'metadata': None, + 'name': 'blah', + 'synthetic': False, + 'target': hash_to_bytes(target.decode()), + 'target_type': 'revision' + } + + self.assertEquals(actual_release, expected_release) diff --git a/swh/loader/git/tests/test_utils.py b/swh/loader/git/tests/test_utils.py new file mode 100644 index 0000000..b288d2b --- /dev/null +++ b/swh/loader/git/tests/test_utils.py @@ -0,0 +1,35 @@ +# Copyright (C) 2015-2017 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import unittest + +from nose.tools import istest + +from swh.loader.git import utils + + +class TestUtils(unittest.TestCase): + @istest + def check_date_time(self): + """As long as the datetime is fine, the date time check does not raise + + """ + for e in range(32, 37): + ts = 2**e + utils.check_date_time(ts) + + @istest + def check_date_time_empty_value(self): + self.assertIsNone(utils.check_date_time(None)) + + @istest + def check_date_time_raises(self): + """From a given threshold, the check no longer passes. + + """ + exp = 38 + timestamp = 2**exp + with self.assertRaisesRegex(ValueError, 'year is out of range'): + utils.check_date_time(timestamp) diff --git a/swh/loader/git/updater.py b/swh/loader/git/updater.py index 73b3590..bb02b54 100644 --- a/swh/loader/git/updater.py +++ b/swh/loader/git/updater.py @@ -1,480 +1,480 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import BytesIO import datetime import logging import os import pickle import sys from collections import defaultdict import dulwich.client from dulwich.object_store import ObjectStoreGraphWalker from dulwich.pack import PackData, PackInflater from urllib.parse import urlparse from swh.model import hashutil from .
import base, converters class SWHRepoRepresentation: """Repository representation for a Software Heritage origin.""" def __init__(self, storage, origin_id, occurrences=None): self.storage = storage self._parents_cache = {} self._type_cache = {} if origin_id: self.heads = set(self._cache_heads(origin_id, occurrences)) else: self.heads = set() def _fill_parents_cache(self, commits): """When querying for a commit's parents, we fill the cache to a depth of 1000 commits.""" root_revs = self._encode_for_storage(commits) for rev, parents in self.storage.revision_shortlog(root_revs, 1000): rev_id = hashutil.hash_to_bytehex(rev) if rev_id not in self._parents_cache: self._parents_cache[rev_id] = [ hashutil.hash_to_bytehex(parent) for parent in parents ] for rev in commits: if rev not in self._parents_cache: self._parents_cache[rev] = [] def _cache_heads(self, origin_id, occurrences): """Return all the known head commits for `origin_id`""" if not occurrences: occurrences = self.storage.occurrence_get(origin_id) return self._decode_from_storage( occurrence['target'] for occurrence in occurrences ) def get_parents(self, commit): """get the parent commits for `commit`""" # Prime the parents cache if not self._parents_cache and self.heads: self._fill_parents_cache(self.heads) if commit not in self._parents_cache: self._fill_parents_cache([commit]) return self._parents_cache[commit] def get_heads(self): return self.heads @staticmethod def _encode_for_storage(objects): return [hashutil.bytehex_to_hash(object) for object in objects] @staticmethod def _decode_from_storage(objects): return set(hashutil.hash_to_bytehex(object) for object in objects) def graph_walker(self): return ObjectStoreGraphWalker(self.get_heads(), self.get_parents) @staticmethod def filter_unwanted_refs(refs): """Filter the unwanted references from refs""" ret = {} for ref, val in refs.items(): if ref.endswith(b'^{}'): # Peeled refs make the git protocol explode continue elif ref.startswith(b'refs/pull/') and ref.endswith(b'/merge'): # We filter-out auto-merged GitHub pull requests continue else: ret[ref] = val return ret def determine_wants(self, refs): """Filter the remote references to figure out which ones Software Heritage needs. """ if not refs: return [] # Find what objects Software Heritage has refs = self.find_remote_ref_types_in_swh(refs) # Cache the objects found in swh as existing heads for target in refs.values(): if target['target_type'] is not None: self.heads.add(target['target']) ret = set() for target in self.filter_unwanted_refs(refs).values(): if target['target_type'] is None: # The target doesn't exist in Software Heritage, let's retrieve # it. ret.add(target['target']) return list(ret) def get_stored_objects(self, objects): return self.storage.object_find_by_sha1_git( self._encode_for_storage(objects)) def find_remote_ref_types_in_swh(self, remote_refs): """Parse the remote refs information and list the objects that exist in Software Heritage. 
""" all_objs = set(remote_refs.values()) - set(self._type_cache) type_by_id = {} for id, objs in self.get_stored_objects(all_objs).items(): id = hashutil.hash_to_bytehex(id) if objs: type_by_id[id] = objs[0]['type'] self._type_cache.update(type_by_id) ret = {} for ref, id in remote_refs.items(): ret[ref] = { 'target': id, 'target_type': self._type_cache.get(id), } return ret class BulkUpdater(base.BaseLoader): """A bulk loader for a git repository""" CONFIG_BASE_FILENAME = 'loader/git-updater' ADDITIONAL_CONFIG = { 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024), } def __init__(self, repo_representation=SWHRepoRepresentation): """Initialize the bulk updater. Args: repo_representation: swh's repository representation which is in charge of filtering between known and remote data. """ super().__init__() self.repo_representation = repo_representation def fetch_pack_from_origin(self, origin_url, base_origin_id, base_occurrences, do_activity): """Fetch a pack from the origin""" pack_buffer = BytesIO() base_repo = self.repo_representation(self.storage, base_origin_id, base_occurrences) parsed_uri = urlparse(origin_url) path = parsed_uri.path if not path.endswith('.git'): path += '.git' client = dulwich.client.TCPGitClient(parsed_uri.netloc, thin_packs=False) size_limit = self.config['pack_size_bytes'] def do_pack(data, pack_buffer=pack_buffer, limit=size_limit, origin_url=origin_url): cur_size = pack_buffer.tell() would_write = len(data) if cur_size + would_write > limit: raise IOError('Pack file too big for repository %s, ' 'limit is %d bytes, current size is %d, ' 'would write %d' % (origin_url, limit, cur_size, would_write)) pack_buffer.write(data) remote_refs = client.fetch_pack(path.encode('ascii'), base_repo.determine_wants, base_repo.graph_walker(), do_pack, progress=do_activity) if remote_refs: local_refs = base_repo.find_remote_ref_types_in_swh(remote_refs) else: local_refs = remote_refs = {} pack_buffer.flush() pack_size = pack_buffer.tell() pack_buffer.seek(0) return { 'remote_refs': base_repo.filter_unwanted_refs(remote_refs), 'local_refs': local_refs, 'pack_buffer': pack_buffer, 'pack_size': pack_size, } def list_pack(self, pack_data, pack_size): id_to_type = {} type_to_ids = defaultdict(set) inflater = self.get_inflater() for obj in inflater: type, id = obj.type_name, obj.id id_to_type[id] = type type_to_ids[type].add(id) return id_to_type, type_to_ids def prepare(self, origin_url, base_url=None): - self.fetch_date = datetime.datetime.now(tz=datetime.timezone.utc) + self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) origin = converters.origin_url_to_origin(origin_url) base_origin = converters.origin_url_to_origin(base_url) base_occurrences = [] base_origin_id = origin_id = None db_origin = self.storage.origin_get(origin) if db_origin: base_origin_id = origin_id = db_origin['id'] if origin_id: base_occurrences = self.storage.occurrence_get(origin_id) if base_url and not base_occurrences: base_origin = self.storage.origin_get(base_origin) if base_origin: base_origin_id = base_origin['id'] base_occurrences = self.storage.occurrence_get(base_origin_id) self.base_occurrences = list(sorted(base_occurrences, key=lambda occ: occ['branch'])) self.base_origin_id = base_origin_id self.origin = origin def get_origin(self): return self.origin def fetch_data(self): def do_progress(msg): sys.stderr.buffer.write(msg) sys.stderr.flush() fetch_info = self.fetch_pack_from_origin( self.origin['url'], self.base_origin_id, self.base_occurrences, do_progress) self.pack_buffer = 
fetch_info['pack_buffer'] self.pack_size = fetch_info['pack_size'] self.remote_refs = fetch_info['remote_refs'] self.local_refs = fetch_info['local_refs'] origin_url = self.origin['url'] self.log.info('Listed %d refs for repo %s' % ( len(self.remote_refs), origin_url), extra={ 'swh_type': 'git_repo_list_refs', 'swh_repo': origin_url, 'swh_num_refs': len(self.remote_refs), }) # We want to load the repository, walk all the objects id_to_type, type_to_ids = self.list_pack(self.pack_buffer, self.pack_size) self.id_to_type = id_to_type self.type_to_ids = type_to_ids def save_data(self): """Store a pack for archival""" write_size = 8192 pack_dir = self.get_save_data_path() - pack_name = "%s.pack" % self.fetch_date.isoformat() - refs_name = "%s.refs" % self.fetch_date.isoformat() + pack_name = "%s.pack" % self.visit_date.isoformat() + refs_name = "%s.refs" % self.visit_date.isoformat() with open(os.path.join(pack_dir, pack_name), 'xb') as f: while True: r = self.pack_buffer.read(write_size) if not r: break f.write(r) self.pack_buffer.seek(0) with open(os.path.join(pack_dir, refs_name), 'xb') as f: pickle.dump(self.remote_refs, f) def get_inflater(self): """Reset the pack buffer and get an object inflater from it""" self.pack_buffer.seek(0) return PackInflater.for_pack_data( PackData.from_file(self.pack_buffer, self.pack_size)) def has_contents(self): return bool(self.type_to_ids[b'blob']) def get_content_ids(self): """Get the content identifiers from the git repository""" for raw_obj in self.get_inflater(): if raw_obj.type_name != b'blob': continue yield converters.dulwich_blob_to_content_id(raw_obj) def get_contents(self): """Format the blobs from the git repository as swh contents""" max_content_size = self.config['content_size_limit'] missing_contents = set(self.storage.content_missing( self.get_content_ids(), 'sha1_git')) for raw_obj in self.get_inflater(): if raw_obj.type_name != b'blob': continue if raw_obj.sha().digest() not in missing_contents: continue yield converters.dulwich_blob_to_content( raw_obj, log=self.log, max_content_size=max_content_size, origin_id=self.origin_id) def has_directories(self): return bool(self.type_to_ids[b'tree']) def get_directory_ids(self): """Get the directory identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b'tree']) def get_directories(self): """Format the trees as swh directories""" missing_dirs = set(self.storage.directory_missing( sorted(self.get_directory_ids()))) for raw_obj in self.get_inflater(): if raw_obj.type_name != b'tree': continue if raw_obj.sha().digest() not in missing_dirs: continue yield converters.dulwich_tree_to_directory(raw_obj, log=self.log) def has_revisions(self): return bool(self.type_to_ids[b'commit']) def get_revision_ids(self): """Get the revision identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b'commit']) def get_revisions(self): """Format commits as swh revisions""" missing_revs = set(self.storage.revision_missing( sorted(self.get_revision_ids()))) for raw_obj in self.get_inflater(): if raw_obj.type_name != b'commit': continue if raw_obj.sha().digest() not in missing_revs: continue yield converters.dulwich_commit_to_revision(raw_obj, log=self.log) def has_releases(self): return bool(self.type_to_ids[b'tag']) def get_release_ids(self): """Get the release identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b'tag']) def get_releases(self): """Retrieve 
all the release objects from the git repository""" missing_rels = set(self.storage.release_missing( sorted(self.get_release_ids()))) for raw_obj in self.get_inflater(): if raw_obj.type_name != b'tag': continue if raw_obj.sha().digest() not in missing_rels: continue yield converters.dulwich_tag_to_release(raw_obj, log=self.log) def has_occurrences(self): return bool(self.remote_refs) def get_occurrences(self): origin_id = self.origin_id visit = self.visit ret = [] for ref in self.remote_refs: ret_ref = self.local_refs[ref].copy() ret_ref.update({ 'branch': ref, 'origin': origin_id, 'visit': visit, }) if not ret_ref['target_type']: target_type = self.id_to_type[ret_ref['target']] ret_ref['target_type'] = converters.DULWICH_TYPES[target_type] ret_ref['target'] = hashutil.bytehex_to_hash(ret_ref['target']) ret.append(ret_ref) return ret def get_fetch_history_result(self): return { 'contents': len(self.type_to_ids[b'blob']), 'directories': len(self.type_to_ids[b'tree']), 'revisions': len(self.type_to_ids[b'commit']), 'releases': len(self.type_to_ids[b'tag']), 'occurrences': len(self.remote_refs), } def eventful(self): """The load was eventful if the current occurrences are different from the ones we retrieved at the beginning of the run""" current_occurrences = list(sorted( self.storage.occurrence_get(self.origin_id), key=lambda occ: occ['branch'], )) return self.base_occurrences != current_occurrences if __name__ == '__main__': logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) bulkupdater = BulkUpdater() origin_url = sys.argv[1] base_url = origin_url if len(sys.argv) > 2: base_url = sys.argv[2] print(bulkupdater.load(origin_url, base_url)) diff --git a/swh/loader/git/utils.py b/swh/loader/git/utils.py index e5b2825..3b46f68 100644 --- a/swh/loader/git/utils.py +++ b/swh/loader/git/utils.py @@ -1,49 +1,68 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +"""Utility helper functions""" + +import datetime import os import shutil import tempfile from subprocess import call def init_git_repo_from_archive(project_name, archive_path, root_temp_dir='/tmp'): """Given a path to an archive containing a git repository, uncompress it to a temporary location and return the path. If any problem whatsoever is raised, clean up the temporary location. Args: project_name (str): Project's name archive_path (str): Full path to the archive root_temp_dir (str): Optional temporary directory mount point (default to /tmp) Returns A tuple: - temporary folder: containing the mounted repository - repo_path, path to the mounted repository inside the temporary folder Raises ValueError in case of failure to run the command to uncompress """ temp_dir = tempfile.mkdtemp( suffix='.swh.loader.git', prefix='tmp.', dir=root_temp_dir) try: # create the repository that will be loaded with the dump r = call(['unzip', '-q', '-o', archive_path, '-d', temp_dir]) if r != 0: raise ValueError('Failed to uncompress archive %s' % archive_path) repo_path = os.path.join(temp_dir, project_name) return temp_dir, repo_path except Exception as e: shutil.rmtree(temp_dir) raise e + + +def check_date_time(timestamp): + """Check date time for overflow errors. + + Args: + timestamp (int): Timestamp in seconds + + Raises: + Any error raised by datetime.fromtimestamp when the conversion fails.
+ + """ + if not timestamp: + return None + datetime.datetime.fromtimestamp(timestamp, + datetime.timezone.utc) diff --git a/version.txt b/version.txt index db20099..1e88f9d 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.34-0-g6fa9c7a \ No newline at end of file +v0.0.35-0-gf8d8d8f \ No newline at end of file