diff --git a/swh/foo/bar.py b/swh/foo/bar.py
deleted file mode 100644
index 8dd21f7..0000000
--- a/swh/foo/bar.py
+++ /dev/null
@@ -1,4 +0,0 @@
-# Copyright (C) 2015-2016 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
diff --git a/swh/loader/mercurial/archive_extract.py b/swh/loader/mercurial/archive_extract.py
new file mode 100644
index 0000000..caa73b4
--- /dev/null
+++ b/swh/loader/mercurial/archive_extract.py
@@ -0,0 +1,45 @@
+# Copyright (C) 2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os
+import tempfile
+
+import patoolib
+
+
+def tmp_extract(archive, tmpdir_prefix=None, log=None, source=None):
+    """Extract an archive to a temporary location with optional logs.
+
+    Args:
+        archive (string): Absolute path of the archive to be extracted
+        tmpdir_prefix (string): Optional prefix for the name of the
+                                temporary storage directory, making it
+                                easier to identify on disk.
+        log (python logging instance): Optional for recording extractions.
+        source (string): Optional source URL of the archive for adding to
+                         log messages.
+    Returns:
+        A context manager for a temporary directory that automatically
+        removes itself. See: help(tempfile.TemporaryDirectory)
+    """
+    archive_base = os.path.basename(archive)
+    if archive_base[0] == '.':
+        package = '.' + archive_base.split('.')[1]
+    else:
+        package = archive_base.split('.')[0]
+
+    tmpdir = tempfile.TemporaryDirectory(prefix=tmpdir_prefix)
+    patoolib.extract_archive(archive, interactive=False, outdir=tmpdir.name)
+
+    repo_path = os.path.join(tmpdir.name, package)
+
+    if log is not None:
+        logstr = ''
+        if source is not None:
+            logstr = 'From %s - ' % source
+        log.info(logstr + 'Uncompressing archive %s at %s' % (
+            archive_base, repo_path))
+
+    return tmpdir
diff --git a/swh/loader/mercurial/base.py b/swh/loader/mercurial/base.py
new file mode 100644
index 0000000..15bc9c1
--- /dev/null
+++ b/swh/loader/mercurial/base.py
@@ -0,0 +1,457 @@
+# Copyright (C) 2016-2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+# TODO: This module mirrors swh-loader-git/swh/loader/git/base.py exactly
+# (minus the logger names and this comment).
+# Please merge the files at a shared level at the next opportunity.
+
+import datetime
+import logging
+import os
+import traceback
+import uuid
+
+import psycopg2
+import requests
+from retrying import retry
+
+from swh.core import config
+from swh.storage import get_storage
+
+
+def send_in_packets(objects, sender, packet_size, packet_size_bytes=None):
+    """Send `objects`, using the `sender`, in packets of `packet_size` objects
+    (and of max `packet_size_bytes`).
+    """
+    formatted_objects = []
+    count = 0
+    if not packet_size_bytes:
+        packet_size_bytes = 0
+    for obj in objects:
+        if not obj:
+            continue
+        formatted_objects.append(obj)
+        if packet_size_bytes:
+            count += obj['length']
+        if len(formatted_objects) >= packet_size or count > packet_size_bytes:
+            sender(formatted_objects)
+            formatted_objects = []
+            count = 0
+
+    if formatted_objects:
+        sender(formatted_objects)
+
+
+def retry_loading(error):
+    """Retry policy when we catch a recoverable error"""
+    exception_classes = [
+        # raised when two parallel insertions insert the same data.
+        psycopg2.IntegrityError,
+        # raised when uWSGI restarts and hangs up on the worker.
+        requests.exceptions.ConnectionError,
+    ]
+
+    if not any(isinstance(error, exc) for exc in exception_classes):
+        return False
+
+    logger = logging.getLogger('swh.loader.hg.BulkLoader')
+
+    error_name = error.__module__ + '.' + error.__class__.__name__
+    logger.warning('Retry loading a batch', exc_info=False, extra={
+        'swh_type': 'storage_retry',
+        'swh_exception_type': error_name,
+        'swh_exception': traceback.format_exception(
+            error.__class__,
+            error,
+            error.__traceback__,
+        ),
+    })
+
+    return True
+
+
+class BaseLoader(config.SWHConfig):
+    """This base class is a pattern for loaders.
+
+    The external calling convention is as such:
+
+    - instantiate the class once (loads storage and the configuration)
+    - for each origin, call load with the origin-specific arguments (for
+      instance, an origin URL).
+
+    load calls several methods that must be implemented in subclasses:
+
+    - prepare(*args, **kwargs) prepares the loader for the new origin
+    - get_origin gets the origin object associated to the current loader
+    - fetch_data downloads the necessary data from the origin
+    - get_{contents,directories,revisions,releases,occurrences} retrieve each
+      kind of object from the origin
+    - has_* checks whether there are some objects to load for that object type
+    - get_fetch_history_result retrieves the data to insert in the
+      fetch_history table once the load was successful
+    - cleanup cleans up any state installed for the computation
+    - eventful returns whether the load was eventful or not
+    """
+
+    CONFIG_BASE_FILENAME = None
+
+    DEFAULT_CONFIG = {
+        'storage': ('dict', {
+            'cls': 'remote',
+            'args': {
+                'url': 'http://localhost:5002/'
+            },
+        }),
+        'send_contents': ('bool', True),
+        'send_directories': ('bool', True),
+        'send_revisions': ('bool', True),
+        'send_releases': ('bool', True),
+        'send_occurrences': ('bool', True),
+
+        'save_data': ('bool', False),
+        'save_data_path': ('str', ''),
+
+        'content_packet_size': ('int', 10000),
+        'content_packet_size_bytes': ('int', 1024 * 1024 * 1024),
+        'directory_packet_size': ('int', 25000),
+        'revision_packet_size': ('int', 100000),
+        'release_packet_size': ('int', 100000),
+        'occurrence_packet_size': ('int', 100000),
+    }
+
+    ADDITIONAL_CONFIG = {}
+
+    def __init__(self):
+        self.config = self.parse_config_file(
+            additional_configs=[self.ADDITIONAL_CONFIG])
+
+        # Make sure the config is sane
+        if self.config['save_data']:
+            path = self.config['save_data_path']
+            os.stat(path)
+            if not os.access(path, os.R_OK | os.W_OK):
+                raise PermissionError("Permission denied: %r" % path)
+
+        self.storage = get_storage(**self.config['storage'])
+
+        self.log = logging.getLogger('swh.loader.hg.BulkLoader')
+        self.fetch_date = None  # possibly overridden in self.prepare method
+
+    def prepare(self, *args, **kwargs):
+        """Prepare the data source to be loaded"""
+        raise NotImplementedError
+
+    def cleanup(self):
+        """Clean up any state installed for the computation."""
+        pass
+
+    def get_origin(self):
+        """Get the origin that is currently being loaded"""
+        raise NotImplementedError
+
+    def fetch_data(self):
+        """Fetch the data from the data source"""
+        raise NotImplementedError
+
+    def has_contents(self):
+        """Checks whether we need to load contents"""
+        return True
+
+    def get_contents(self):
+        """Get the contents that need to be loaded"""
+        raise NotImplementedError
+
+    def has_directories(self):
+        """Checks whether we need to load directories"""
+        return True
+
+    def get_directories(self):
+        """Get the directories that need to be loaded"""
+        raise NotImplementedError
+
+    def has_revisions(self):
+        """Checks whether we need to load revisions"""
+        return True
+
+    def get_revisions(self):
+        """Get the revisions that need to be loaded"""
+        raise NotImplementedError
+
+    def has_releases(self):
+        """Checks whether we need to load releases"""
+        return True
+
+    def get_releases(self):
+        """Get the releases that need to be loaded"""
+        raise NotImplementedError
+
+    def has_occurrences(self):
+        """Checks whether we need to load occurrences"""
+        return True
+
+    def get_occurrences(self):
+        """Get the occurrences that need to be loaded"""
+        raise NotImplementedError
+
+    def get_fetch_history_result(self):
+        """Return the data to store in fetch_history for the current loader"""
+        raise NotImplementedError
+
+    def eventful(self):
+        """Whether the load was eventful"""
+        raise NotImplementedError
+
+    def save_data(self):
+        """Save the data associated to the current load"""
+        raise NotImplementedError
+
+    def get_save_data_path(self):
+        """The path to which we save the data"""
+        if not hasattr(self, '__save_data_path'):
+            origin_id = self.origin_id
+            year = str(self.fetch_date.year)
+
+            path = os.path.join(
+                self.config['save_data_path'],
+                "%04d" % (origin_id % 10000),
+                "%08d" % origin_id,
+                year,
+            )
+
+            os.makedirs(path, exist_ok=True)
+            self.__save_data_path = path
+
+        return self.__save_data_path
+
+    @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
+    def send_contents(self, content_list):
+        """Actually send properly formatted contents to the database"""
+        num_contents = len(content_list)
+        log_id = str(uuid.uuid4())
+        self.log.debug("Sending %d contents" % num_contents,
+                       extra={
+                           'swh_type': 'storage_send_start',
+                           'swh_content_type': 'content',
+                           'swh_num': num_contents,
+                           'swh_id': log_id,
+                       })
+        self.storage.content_add(content_list)
+        self.log.debug("Done sending %d contents" % num_contents,
+                       extra={
+                           'swh_type': 'storage_send_end',
+                           'swh_content_type': 'content',
+                           'swh_num': num_contents,
+                           'swh_id': log_id,
+                       })
+
+    @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
+    def send_directories(self, directory_list):
+        """Actually send properly formatted directories to the database"""
+        num_directories = len(directory_list)
+        log_id = str(uuid.uuid4())
+        self.log.debug("Sending %d directories" % num_directories,
+                       extra={
+                           'swh_type': 'storage_send_start',
+                           'swh_content_type': 'directory',
+                           'swh_num': num_directories,
+                           'swh_id': log_id,
+                       })
+        self.storage.directory_add(directory_list)
+        self.log.debug("Done sending %d directories" % num_directories,
+                       extra={
+                           'swh_type': 'storage_send_end',
+                           'swh_content_type': 'directory',
+                           'swh_num': num_directories,
+                           'swh_id': log_id,
+                       })
+
+    @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
+    def send_revisions(self, revision_list):
+        """Actually send properly formatted revisions to the database"""
+        num_revisions = len(revision_list)
+        log_id = str(uuid.uuid4())
+        self.log.debug("Sending %d revisions" % num_revisions,
+                       extra={
+                           'swh_type': 'storage_send_start',
+                           'swh_content_type': 'revision',
+                           'swh_num': num_revisions,
+                           'swh_id': log_id,
+                       })
+        self.storage.revision_add(revision_list)
+        self.log.debug("Done sending %d revisions" % num_revisions,
+                       extra={
+                           'swh_type': 'storage_send_end',
+                           'swh_content_type': 'revision',
+                           'swh_num': num_revisions,
+                           'swh_id': log_id,
+                       })
+
+    @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
+    def send_releases(self, release_list):
+        """Actually send properly formatted releases to the database"""
+        num_releases = len(release_list)
+        log_id = str(uuid.uuid4())
+        self.log.debug("Sending %d releases" % num_releases,
+                       extra={
+                           'swh_type': 'storage_send_start',
+                           'swh_content_type': 'release',
+                           'swh_num': num_releases,
+                           'swh_id': log_id,
+                       })
+        self.storage.release_add(release_list)
+        self.log.debug("Done sending %d releases" % num_releases,
+                       extra={
+                           'swh_type': 'storage_send_end',
+                           'swh_content_type': 'release',
+                           'swh_num': num_releases,
+                           'swh_id': log_id,
+                       })
+
+    @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
+    def send_occurrences(self, occurrence_list):
+        """Actually send properly formatted occurrences to the database"""
+        num_occurrences = len(occurrence_list)
+        log_id = str(uuid.uuid4())
+        self.log.debug("Sending %d occurrences" % num_occurrences,
+                       extra={
+                           'swh_type': 'storage_send_start',
+                           'swh_content_type': 'occurrence',
+                           'swh_num': num_occurrences,
+                           'swh_id': log_id,
+                       })
+        self.storage.occurrence_add(occurrence_list)
+        self.log.debug("Done sending %d occurrences" % num_occurrences,
+                       extra={
+                           'swh_type': 'storage_send_end',
+                           'swh_content_type': 'occurrence',
+                           'swh_num': num_occurrences,
+                           'swh_id': log_id,
+                       })
+
+    def send_origin(self, origin):
+        log_id = str(uuid.uuid4())
+        self.log.debug('Creating %s origin for %s' % (origin['type'],
+                                                      origin['url']),
+                       extra={
+                           'swh_type': 'storage_send_start',
+                           'swh_content_type': 'origin',
+                           'swh_num': 1,
+                           'swh_id': log_id
+                       })
+        origin_id = self.storage.origin_add_one(origin)
+        self.log.debug('Done creating %s origin for %s' % (origin['type'],
+                                                           origin['url']),
+                       extra={
+                           'swh_type': 'storage_send_end',
+                           'swh_content_type': 'origin',
+                           'swh_num': 1,
+                           'swh_id': log_id
+                       })
+
+        return origin_id
+
+    def send_all_contents(self, contents):
+        """Send all the contents to the database"""
+        packet_size = self.config['content_packet_size']
+        packet_size_bytes = self.config['content_packet_size_bytes']
+
+        send_in_packets(contents, self.send_contents, packet_size,
+                        packet_size_bytes=packet_size_bytes)
+
+    def send_all_directories(self, directories):
+        """Send all the directories to the database"""
+        packet_size = self.config['directory_packet_size']
+
+        send_in_packets(directories, self.send_directories, packet_size)
+
+    def send_all_revisions(self, revisions):
+        """Send all the revisions to the database"""
+        packet_size = self.config['revision_packet_size']
+
+        send_in_packets(revisions, self.send_revisions, packet_size)
+
+    def send_all_releases(self, releases):
+        """Send all the releases to the database"""
+        packet_size = self.config['release_packet_size']
+
+        send_in_packets(releases, self.send_releases, packet_size)
+
+    def send_all_occurrences(self, occurrences):
+        """Send all the occurrences to the database"""
+        packet_size = self.config['occurrence_packet_size']
+
+        send_in_packets(occurrences, self.send_occurrences, packet_size)
+
+    def open_fetch_history(self):
+        return self.storage.fetch_history_start(self.origin_id)
+
+    def close_fetch_history_success(self, fetch_history_id, result):
+        data = {
+            'status': True,
+            'result': result,
+        }
+        return self.storage.fetch_history_end(fetch_history_id, data)
+
+    def close_fetch_history_failure(self, fetch_history_id):
+        import traceback
+        data = {
+            'status': False,
+            'stderr': traceback.format_exc(),
+        }
+        return self.storage.fetch_history_end(fetch_history_id, data)
+
+    def load(self, *args, **kwargs):
+
+        self.prepare(*args, **kwargs)
+        origin = self.get_origin()
+        self.origin_id = self.send_origin(origin)
+
+        fetch_history_id = self.open_fetch_history()
+        if self.fetch_date:  # override the visit date with the fetch date
+            date_visit = self.fetch_date
+        else:
+            date_visit = datetime.datetime.now(tz=datetime.timezone.utc)
+
+        origin_visit = self.storage.origin_visit_add(
+            self.origin_id,
+            date_visit)
+        self.visit = origin_visit['visit']
+
+        try:
+            self.fetch_data()
+
+            if self.config['save_data']:
+                self.save_data()
+
+            if self.config['send_contents'] and self.has_contents():
+                self.send_all_contents(self.get_contents())
+
+            if self.config['send_directories'] and self.has_directories():
+                self.send_all_directories(self.get_directories())
+
+            if self.config['send_revisions'] and self.has_revisions():
+                self.send_all_revisions(self.get_revisions())
+
+            if self.config['send_releases'] and self.has_releases():
+                self.send_all_releases(self.get_releases())
+
+            if self.config['send_occurrences'] and self.has_occurrences():
+                self.send_all_occurrences(self.get_occurrences())
+
+            self.close_fetch_history_success(fetch_history_id,
+                                             self.get_fetch_history_result())
+            self.storage.origin_visit_update(
+                self.origin_id, self.visit, status='full')
+
+        except:
+            self.close_fetch_history_failure(fetch_history_id)
+            self.storage.origin_visit_update(
+                self.origin_id, self.visit, status='partial')
+            raise
+        finally:
+            self.cleanup()
+
+        return self.eventful()
diff --git a/swh/loader/mercurial/converters.py b/swh/loader/mercurial/converters.py
new file mode 100644
index 0000000..062fc69
--- /dev/null
+++ b/swh/loader/mercurial/converters.py
@@ -0,0 +1,90 @@
+# Copyright (C) 2015-2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from swh.model import hashutil, identifiers
+
+
+def data_size_too_big(data, id_hash, max_content_size, logger=None,
+                      origin_id=None):
+    if logger:
+        size = len(data)
+        id_hash = hashutil.hash_to_hex(id_hash)
+        logger.info('Skipping content %s, too large (%s > %s)' %
+                    (id_hash, size, max_content_size),
+                    extra={
+                        'swh_type': 'loader_content_skip',
+                        'swh_id': id_hash,
+                        'swh_size': size
+                    })
+    return {
+        'status': 'absent',
+        'reason': 'Content too large',
+        'origin': origin_id
+    }
+
+
+def data_to_content_id(data):
+    size = len(data)
+    ret = {
+        'length': size,
+    }
+    ret.update(identifiers.content_identifier({'data': data}))
+    return ret
+
+
+def blob_to_content_dict(data, ident, max_content_size=None, logger=None,
+                         origin_id=None):
+    """Convert blob data to a Software Heritage content"""
+    ret = data_to_content_id(data)
+    if max_content_size and (len(data) > max_content_size):
+        ret.update(
+            data_size_too_big(data, ident, max_content_size, logger=logger,
+                              origin_id=origin_id)
+        )
+    else:
+        ret.update(
+            {
+                'data': data,
+                'status': 'visible'
+            }
+        )
+
+    return ret
+
+
+def parse_author(name_email):
+    """Parse an author line"""
+
+    if name_email is None:
+        return None
+
+    try:
+        open_bracket = name_email.index(b'<')
+    except ValueError:
+        name = email = None
+    else:
+        raw_name = name_email[:open_bracket]
+        raw_email = name_email[open_bracket+1:]
+
+        if not raw_name:
+            name = None
+        elif raw_name.endswith(b' '):
+            name = raw_name[:-1]
+        else:
+            name = raw_name
+
+        try:
+            close_bracket = raw_email.index(b'>')
+        except ValueError:
+            email = None
+        else:
+            email = raw_email[:close_bracket]
+
+    return {
+        'name': name,
+        'email': email,
+        'fullname': name_email,
+    }
diff --git a/swh/loader/mercurial/slow_loader.py b/swh/loader/mercurial/slow_loader.py
new file mode 100644
index 0000000..86a7c26
--- /dev/null
+++ b/swh/loader/mercurial/slow_loader.py
@@ -0,0 +1,416 @@
+# Copyright (C) 2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+# WARNING WARNING WARNING WARNING
+# hglib is too slow to be super useful. Unfortunately it's also the only
+# python3 library for mercurial as of this writing. - Avi
+
+import datetime
+import os
+
+import hglib
+
+from swh.model import identifiers
+
+from . import base, converters
+from .archive_extract import tmp_extract
+
+# TODO: What should this be?
+# swh-model/identifiers.py:identifier_to_bytes has a restrictive length check
+# in it which prevents using blake2 with hashutil.hash_to_hex
+ALGO = 'sha1_git'
+
+OS_PATH_SEP = os.path.sep.encode('utf-8')
+
+
+class SimpleBlob:
+    """Stores basic metadata for a blob object.
+    """
+    kind = 'file'
+
+    def __init__(self, file_hash, file_mode):
+        self.hash = file_hash
+        if not isinstance(file_mode, int):
+            self.mode = 0o100000 + int(file_mode, 8)
+        else:
+            self.mode = file_mode
+
+
+class SimpleTree(dict):
+    """Stores metadata for a nested 'tree'-like object.
+    """
+    kind = 'dir'
+    mode = 0o040000
+
+    def add_tree_node_for_path(self, path):
+        """Deeply nests SimpleTrees according to a directory path and returns
+        a cursor to the deepest one"""
+        node = self
+        for d in path.split(OS_PATH_SEP):
+            node = node.setdefault(d, SimpleTree())
+        return node
+
+    def remove_tree_node_for_path(self, path):
+        """Deletes a SimpleBlob from inside nested SimpleTrees according to
+        the given file path"""
+        first, sep, rest = path.partition(OS_PATH_SEP)
+        if rest:
+            self[first].remove_tree_node_for_path(rest)
+            if not self.get(first):
+                del self[first]
+        else:
+            del self[first]
+
+    def add_blob(self, file_path, file_hash, file_mode):
+        """Deeply nests a SimpleBlob inside nested SimpleTrees according to
+        the given file path"""
+        fdir = os.path.dirname(file_path)
+        fbase = os.path.basename(file_path)
+        if fdir:
+            node = self.add_tree_node_for_path(fdir)
+        else:
+            node = self
+        node[fbase] = SimpleBlob(file_hash, file_mode)
+
+
+class HgLoader(base.BaseLoader):
+    """Load a mercurial repository from a directory.
+    """
+
+    CONFIG_BASE_FILENAME = 'loader/hg-loader'
+
+    def prepare(self, origin_url, directory, fetch_date):
+        """see base.BaseLoader.prepare"""
+        self.origin_url = origin_url
+        self.repo = hglib.open(directory)
+        self.fetch_date = fetch_date
+        self.node_to_blob_hash = {}
+        self.blob_hash_to_file_rev = {}
+        self.commit_trees = {}
+        self.unique_trees = {}
+        self.revisions = {}
+
+    def get_origin(self):
+        """Get the origin that is currently being loaded, in a format
+           suitable for swh.storage"""
+        return {
+            'type': 'hg',
+            'url': self.origin_url
+        }
+
+    def fetch_data(self):
+        """Fetch the data from the data source"""
+        pass
+
+    def has_contents(self):
+        """Checks whether we need to load contents"""
+        # if we have any revisions, then obviously we have contents.
+        return self.has_revisions()
+
+    def iter_changelog(self):
+        """Iterate over the repository log"""
+        yield from self.repo.log('0:tip', removed=True)
+
+    def get_node_file_if_new(self, f, rev, node_hash):
+        """Load a blob from disk"""
+        # Fast if the node hash is already cached. Somehow this shortcuts a
+        # meaningful but not huge percentage of the loads for a repository.
+        if node_hash not in self.node_to_blob_hash:
+            file_path = os.path.join(self.repo.root(), f)
+
+            data = self.repo.cat([file_path], rev)
+            blob_hash = identifiers.content_identifier(
+                {'data': data}
+            )[ALGO]
+
+            self.node_to_blob_hash[node_hash] = blob_hash
+
+            if blob_hash not in self.blob_hash_to_file_rev:
+                # new blob
+                self.blob_hash_to_file_rev[blob_hash] = (file_path, rev)
+                return blob_hash, data
+
+        return self.node_to_blob_hash[node_hash], None
+
+    def get_content_ids(self):
+        """Get all the contents, but trim away the actual data"""
+        self.node_to_blob_hash = {}
+        self.blob_hash_to_file_rev = {}
+        self.num_contents = 0
+
+        for li in self.iter_changelog():
+            c = self.repo[li]
+            rev = c.rev()
+            manifest = c.manifest()
+
+            for f in c.added() + c.modified():
+                node_hash = manifest[f]
+                blob_hash, data = self.get_node_file_if_new(f, rev, node_hash)
+                if data is not None:  # new blob
+                    self.num_contents += 1
+                    yield converters.data_to_content_id(data)
+
+    def get_contents(self):
+        """Get the contents that need to be loaded"""
+        # This method unfortunately loads and hashes the blobs twice.
+
+        max_content_size = self.config['content_size_limit']
+        missing_contents = set(
+            self.storage.content_missing(
+                self.get_content_ids(),
+                ALGO
+            )
+        )
+
+        for oid in missing_contents:
+            file_path, rev = self.blob_hash_to_file_rev[oid]
+            data = self.repo.cat([file_path], rev)
+            yield converters.blob_to_content_dict(
+                data, oid, max_content_size, self.log, self.origin_id
+            )
+
+    def has_directories(self):
+        """Checks whether we need to load directories"""
+        # if we have any revs, we must also have dirs
+        return self.has_revisions()
+
+    def get_directories(self):
+        """Get the directories that need to be loaded"""
+        missing_dirs = set(self.storage.directory_missing(
+            sorted(self.unique_trees.keys())
+        ))
+
+        for dir_hash in missing_dirs:
+            yield self.unique_trees[dir_hash]
+
+    def has_revisions(self):
+        """Checks whether we need to load revisions"""
+        self.num_revisions = int(self.repo.tip()[0]) + 1
+        return self.num_revisions > 0
+
+    def update_tree_from_rev(self, tree, rev, only_these_files=None):
+        """Iterates over changes in a revision and adds corresponding
+        SimpleBlobs to a SimpleTree"""
+        if rev >= 0:
+            manifest = {k[4]: k for k in self.repo.manifest(rev=rev)}
+            loop_keys = only_these_files or manifest.keys()
+            for f in loop_keys:
+                node_hash = manifest[f][0]
+                file_mode = manifest[f][1]
+                file_hash, _ = self.get_node_file_if_new(f, rev, node_hash)
+                tree.add_blob(f, file_hash, file_mode)
+
+        return tree
+
+    def reconstruct_tree(self, directory):
+        """Converts a flat directory into nested SimpleTrees."""
+        # This method exists because the code was already written to use
+        # SimpleTree before then reducing memory use and converting to the
+        # canonical format. A refactor using lookups instead of nesting could
+        # obviate the need.
+        new_tree = SimpleTree()
+        for entry in directory['entries']:
+            tgt = entry['target']
+            perms = entry['perms']
+            name = entry['name']
+            if tgt in self.unique_trees:  # subtree
+                new_tree[name] = self.reconstruct_tree(self.unique_trees[tgt])
+            else:  # blob
+                new_tree[name] = SimpleBlob(tgt, perms)
+        new_tree.hash = directory['id']
+        return new_tree
+
+    def collapse_tree(self, tree):
+        """Converts nested SimpleTrees into multiple flat directories."""
+        # This method exists because the code was already written to use
+        # SimpleTree before then reducing memory use and converting to the
+        # canonical format. A refactor using lookups instead of nesting could
+        # obviate the need.
+        directory = {
+            'entries': [
+                {
+                    'name': k,
+                    'perms': v.mode,
+                    'type': v.kind,
+                    'target': (isinstance(v, SimpleBlob)
+                               and v.hash
+                               or self.collapse_tree(v))
+                }
+                for k, v in tree.items()
+            ]
+        }
+        tree.hash = identifiers.directory_identifier(directory)
+        directory['id'] = tree.hash
+        self.unique_trees[tree.hash] = directory
+        return tree.hash
+
+    def get_revision_ids(self):
+        """Get the revisions that need to be loaded"""
+        self.unique_trees = {}
+        commit_tree = None
+        for li in self.iter_changelog():
+            c = self.repo[li[1]]
+            rev = c.rev()
+
+            # start from the parent state
+            p1 = c.p1().rev()
+            if p1 in self.commit_trees:
+                if p1 != rev-1:
+                    # Most of the time, a revision will inherit from the
+                    # previous one. In those cases we can reuse commit_tree,
+                    # otherwise build a new one here.
+                    parent_tree = self.unique_trees[self.commit_trees[p1]]
+                    commit_tree = self.reconstruct_tree(parent_tree)
+            else:
+                commit_tree = self.update_tree_from_rev(SimpleTree(), p1)
+
+            # remove whatever is removed
+            for f in c.removed():
+                commit_tree.remove_tree_node_for_path(f)
+
+            # update whatever is updated
+            self.update_tree_from_rev(commit_tree, rev, c.added()+c.modified())
+
+            self.commit_trees[rev] = self.collapse_tree(commit_tree)
+
+            date_dict = identifiers.normalize_timestamp(
+                int(c.date().timestamp())
+            )
+            author_dict = converters.parse_author(c.author())
+
+            revision = {
+                'author': author_dict,
+                'date': date_dict,
+                'committer': author_dict,
+                'committer_date': date_dict,
+                'type': 'hg',
+                'directory': commit_tree.hash,
+                'message': c.description(),
+                'metadata': {
+                    'extra_headers': [
+                        ['phase', c.phase()],
+                        ['rev', rev],
+                        ['hidden', c.hidden()]
+                    ]
+                },
+                'synthetic': False,
+                'parents': [
+                    self.revisions[p.node()]['id'] for p in c.parents()
+                    if p.rev() >= 0
+                ]
+            }
+            revision['id'] = identifiers.revision_identifier(revision)
+            self.revisions[c.node()] = revision
+
+        for n, r in self.revisions.items():
+            yield {'node': n, 'id': r['id']}
+
+    def get_revisions(self):
+        """Get the revision identifiers from the repository"""
+        revs = {r['id']: r['node'] for r in self.get_revision_ids()}
+        missing_revs = set(self.storage.revision_missing(revs.keys()))
+        for r in missing_revs:
+            yield self.revisions[revs[r]]
+
+    def has_releases(self):
+        """Checks whether we need to load releases"""
+        self.num_releases = len([t for t in self.repo.tags() if not t[3]])
+        return self.num_releases > 0
+
+    def get_releases(self):
+        """Get the releases that need to be loaded"""
+        releases = {}
+        for t in self.repo.tags():
+            islocal = t[3]
+            name = t[0]
+            if (name != b'tip' and not islocal):
+                short_hash = t[2]
+                target = self.revisions[self.repo[short_hash].node()]['id']
+                release = {
+                    'name': name,
+                    'target': target,
+                    'target_type': 'revision',
+                    'message': None,
+                    'metadata': None,
+                    'synthetic': False,
+                    'author': None,
+                    'date': None
+                }
+                id_hash = identifiers.release_identifier(release)
+                release['id'] = id_hash
+                releases[id_hash] = release
+
+        missing_rels = set(self.storage.release_missing(
+            sorted(releases.keys())
+        ))
+
+        yield from (releases[r] for r in missing_rels)
+
+    def has_occurrences(self):
+        """Checks whether we need to load occurrences"""
+        self.num_occurrences = len(
+            self.repo.tags() + self.repo.branches() + self.repo.bookmarks()[0]
+        )
+        return self.num_occurrences > 0
+
+    def get_occurrences(self):
+        """Get the occurrences that need to be loaded"""
+        for t in (
+            self.repo.tags() + self.repo.branches() + self.repo.bookmarks()[0]
+        ):
+            name = t[0]
+            short_hash = t[2]
+            target = self.revisions[self.repo[short_hash].node()]['id']
+            yield {
+                'branch': name,
+                'origin': self.origin_id,
+                'target': target,
+                'target_type': 'revision',
+                'visit': self.visit,
+            }
+
+    def get_fetch_history_result(self):
+        """Return the data to store in fetch_history for the current loader"""
+        return {
+            'contents': self.num_contents,
+            'directories': len(self.unique_trees),
+            'revisions': self.num_revisions,
+            'releases': self.num_releases,
+            'occurrences': self.num_occurrences,
+        }
+
+    def save_data(self):
+        """We already have the data locally, no need to save it"""
+        pass
+
+    def eventful(self):
+        """Whether the load was eventful"""
+        return True
+
+
+class HgLoaderFromArchive(HgLoader):
+    """Load an HG repository from a compressed archive.
+    """
+    def prepare(self, origin_url, archive_path, fetch_date):
+        self.tmpdir = tmp_extract(archive_path,
+                                  tmpdir_prefix='swh.loader.hg.',
+                                  log=self.log, source=origin_url)
+        super().prepare(origin_url, self.tmpdir.name, fetch_date)
+
+
+if __name__ == '__main__':
+    import logging
+    import sys
+
+    logging.basicConfig(
+        level=logging.DEBUG,
+        format='%(asctime)s %(process)d %(message)s'
+    )
+    loader = HgLoader()
+
+    origin_url = sys.argv[1]
+    directory = sys.argv[2]
+    fetch_date = datetime.datetime.now(tz=datetime.timezone.utc)
+
+    print(loader.load(origin_url, directory, fetch_date))