diff --git a/requirements-swh.txt b/requirements-swh.txt
index fcbecde..913ac00 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,4 @@
 swh.core >= 0.0.36
 swh.storage >= 0.0.95
 swh.scheduler >= 0.0.19
+swh.loader.core >= 0.0.28
diff --git a/swh/loader/mercurial/base.py b/swh/loader/mercurial/base.py
deleted file mode 100644
index a6a3fb3..0000000
--- a/swh/loader/mercurial/base.py
+++ /dev/null
@@ -1,460 +0,0 @@
-# Copyright (C) 2016-2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-# TODO: This document mirrors swh-loader-git/swh/loader/git/base.py exactly
-# (minus the logger names and this comment)
-# Please merge the files at a shared level at next opportunity.
-
-import datetime
-import logging
-import os
-import traceback
-import uuid
-
-import psycopg2
-import requests
-from retrying import retry
-
-from swh.core import config
-from swh.storage import get_storage
-
-
-def send_in_packets(objects, sender, packet_size, packet_size_bytes=None):
-    """Send `objects`, using the `sender`, in packets of `packet_size` objects (and
-    of max `packet_size_bytes`).
-    """
-    formatted_objects = []
-    count = 0
-    if not packet_size_bytes:
-        packet_size_bytes = 0
-    for obj in objects:
-        if not obj:
-            continue
-        formatted_objects.append(obj)
-        if packet_size_bytes:
-            count += obj['length']
-        if len(formatted_objects) >= packet_size or count > packet_size_bytes:
-            sender(formatted_objects)
-            formatted_objects = []
-            count = 0
-
-    if formatted_objects:
-        sender(formatted_objects)
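The `send_in_packets` helper above drives every `send_all_*` method further down: it buffers objects until either the object-count threshold or the byte-size threshold trips, then flushes the buffer through the supplied sender callback. A minimal sketch of the behavior, using a hypothetical in-memory sender in place of a storage method:

```python
batches = []

def fake_storage_sender(batch):
    # hypothetical stand-in for e.g. storage.content_add
    batches.append(list(batch))

# five dummy contents of 300 bytes each, flushed every 2 objects or
# whenever the accumulated packet exceeds 500 bytes, whichever comes first
contents = [{'id': i, 'length': 300} for i in range(5)]
send_in_packets(contents, fake_storage_sender, packet_size=2,
                packet_size_bytes=500)

assert [len(b) for b in batches] == [2, 2, 1]  # trailing partial batch flushed
```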
-
-
-def retry_loading(error):
-    """Retry policy when we catch a recoverable error"""
-    exception_classes = [
-        # raised when two parallel insertions insert the same data.
-        psycopg2.IntegrityError,
-        # raised when uWSGI restarts and hangs up on the worker.
-        requests.exceptions.ConnectionError,
-    ]
-
-    if not any(isinstance(error, exc) for exc in exception_classes):
-        return False
-
-    logger = logging.getLogger('swh.loader.hg.BulkLoader')
-
-    error_name = error.__module__ + '.' + error.__class__.__name__
-    logger.warning('Retry loading a batch', exc_info=False, extra={
-        'swh_type': 'storage_retry',
-        'swh_exception_type': error_name,
-        'swh_exception': traceback.format_exception(
-            error.__class__,
-            error,
-            error.__traceback__,
-        ),
-    })
-
-    return True
-
-
-class BaseLoader(config.SWHConfig):
-    """This base class is a pattern for loaders.
-
-    The external calling convention is as such:
-
-    - instantiate the class once (loads storage and the configuration)
-    - for each origin, call load with the origin-specific arguments (for
-      instance, an origin URL).
-
-    load calls several methods that must be implemented in subclasses:
-
-    - prepare(\*args, \**kwargs) prepares the loader for the new origin
-    - get_origin gets the origin object associated to the current loader
-    - fetch_data downloads the necessary data from the origin
-    - get\_{contents,directories,revisions,releases,occurrences} retrieve each
-      kind of object from the origin
-    - has\_* checks whether there are some objects to load for that object
-      type
-    - get\_fetch\_history\_result retrieves the data to insert in the
-      fetch_history table once the load was successful
-    - cleanup cleans up an eventual state installed for computations
-    - eventful returns whether the load was eventful or not
-    """
-
-    CONFIG_BASE_FILENAME = None
-
-    DEFAULT_CONFIG = {
-        'storage': ('dict', {
-            'cls': 'remote',
-            'args': {
-                'url': 'http://localhost:5002/'
-            },
-        }),
-        'send_contents': ('bool', True),
-        'send_directories': ('bool', True),
-        'send_revisions': ('bool', True),
-        'send_releases': ('bool', True),
-        'send_occurrences': ('bool', True),
-
-        'save_data': ('bool', False),
-        'save_data_path': ('str', ''),
-
-        'content_packet_size': ('int', 10000),
-        'content_packet_size_bytes': ('int', 1024 * 1024 * 1024),
-        'directory_packet_size': ('int', 25000),
-        'revision_packet_size': ('int', 100000),
-        'release_packet_size': ('int', 100000),
-        'occurrence_packet_size': ('int', 100000),
-    }
-
-    ADDITIONAL_CONFIG = {}
-
-    def __init__(self):
-        self.config = self.parse_config_file(
-            additional_configs=[self.ADDITIONAL_CONFIG])
-
-        # Make sure the config is sane
-        if self.config['save_data']:
-            path = self.config['save_data_path']
-            os.stat(path)
-            if not os.access(path, os.R_OK | os.W_OK):
-                raise PermissionError("Permission denied: %r" % path)
-
-        self.storage = get_storage(**self.config['storage'])
-
-        self.log = logging.getLogger('swh.loader.hg.BulkLoader')
-        l = logging.getLogger('requests.packages.urllib3.connectionpool')
-        l.setLevel(logging.WARN)
-        self.visit_date = None  # possibly overridden in self.prepare method
-
-    def prepare(self, *args, **kwargs):
-        """Prepare the data source to be loaded"""
-        raise NotImplementedError
-
-    def cleanup(self):
-        """Clean up an eventual state installed for computations."""
-        pass
-
-    def get_origin(self):
-        """Get the origin that is currently being loaded"""
-        raise NotImplementedError
-
-    def fetch_data(self):
-        """Fetch the data from the data source"""
-        raise NotImplementedError
-
-    def has_contents(self):
-        """Checks whether we need to load contents"""
-        return True
-
-    def get_contents(self):
-        """Get the contents that need to be loaded"""
-        raise NotImplementedError
-
-    def has_directories(self):
-        """Checks whether we need to load directories"""
-        return True
-
-    def get_directories(self):
-        """Get the directories that need to be loaded"""
-        raise NotImplementedError
-
-    def has_revisions(self):
-        """Checks whether we need to load revisions"""
-        return True
-
-    def get_revisions(self):
-        """Get the revisions that need to be loaded"""
-        raise NotImplementedError
-
-    def has_releases(self):
-        """Checks whether we need to load releases"""
-        return True
-
-    def get_releases(self):
-        """Get the releases that need to be loaded"""
-        raise NotImplementedError
-
-    def has_occurrences(self):
-        """Checks whether we need to load occurrences"""
-        return True
-
-    def get_occurrences(self):
-        """Get the occurrences that need to be loaded"""
-        raise NotImplementedError
-
-    def get_fetch_history_result(self):
-        """Return the data to store in fetch_history for the current
-        loader"""
-        raise NotImplementedError
-
-    def eventful(self):
-        """Whether the load was eventful"""
-        raise NotImplementedError
-
-    def save_data(self):
-        """Save the data associated to the current load"""
-        raise NotImplementedError
-
-    def get_save_data_path(self):
-        """The path to which we save the data"""
-        if not hasattr(self, '__save_data_path'):
-            origin_id = self.origin_id
-            year = str(self.visit_date.year)
-
-            path = os.path.join(
-                self.config['save_data_path'],
-                "%04d" % (origin_id % 10000),
-                "%08d" % origin_id,
-                year,
-            )
-
-            os.makedirs(path, exist_ok=True)
-            self.__save_data_path = path
-
-        return self.__save_data_path
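The `send_*` methods below lean on the `retrying` library: each is decorated with `@retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)`, so a batch is retried up to three times, but only for the error classes that `retry_loading` whitelists. A self-contained sketch of the same pattern (`flaky_send` and `is_recoverable` are illustrative stand-ins):

```python
from retrying import retry

attempts = []

def is_recoverable(error):
    # retry only errors we consider transient, cf. retry_loading above
    return isinstance(error, ConnectionError)

@retry(retry_on_exception=is_recoverable, stop_max_attempt_number=3)
def flaky_send(batch):
    attempts.append(batch)
    if len(attempts) < 3:
        raise ConnectionError('storage briefly unavailable')
    return 'sent %d objects' % len(batch)

print(flaky_send([1, 2, 3]))  # fails twice, then succeeds on the third attempt
```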
loader""" - raise NotImplementedError - - def eventful(self): - """Whether the load was eventful""" - raise NotImplementedError - - def save_data(self): - """Save the data associated to the current load""" - raise NotImplementedError - - def get_save_data_path(self): - """The path to which we save the data""" - if not hasattr(self, '__save_data_path'): - origin_id = self.origin_id - year = str(self.visit_date.year) - - path = os.path.join( - self.config['save_data_path'], - "%04d" % (origin_id % 10000), - "%08d" % origin_id, - year, - ) - - os.makedirs(path, exist_ok=True) - self.__save_data_path = path - - return self.__save_data_path - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_contents(self, content_list): - """Actually send properly formatted contents to the database""" - num_contents = len(content_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d contents" % num_contents, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'content', - 'swh_num': num_contents, - 'swh_id': log_id, - }) - self.storage.content_add(content_list) - self.log.debug("Done sending %d contents" % num_contents, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'content', - 'swh_num': num_contents, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_directories(self, directory_list): - """Actually send properly formatted directories to the database""" - num_directories = len(directory_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d directories" % num_directories, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'directory', - 'swh_num': num_directories, - 'swh_id': log_id, - }) - self.storage.directory_add(directory_list) - self.log.debug("Done sending %d directories" % num_directories, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'directory', - 'swh_num': num_directories, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_revisions(self, revision_list): - """Actually send properly formatted revisions to the database""" - num_revisions = len(revision_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d revisions" % num_revisions, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'revision', - 'swh_num': num_revisions, - 'swh_id': log_id, - }) - self.storage.revision_add(revision_list) - self.log.debug("Done sending %d revisions" % num_revisions, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'revision', - 'swh_num': num_revisions, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_releases(self, release_list): - """Actually send properly formatted releases to the database""" - num_releases = len(release_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d releases" % num_releases, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'release', - 'swh_num': num_releases, - 'swh_id': log_id, - }) - self.storage.release_add(release_list) - self.log.debug("Done sending %d releases" % num_releases, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'release', - 'swh_num': num_releases, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_occurrences(self, occurrence_list): - """Actually send properly formatted occurrences to the database""" - num_occurrences = 
len(occurrence_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d occurrences" % num_occurrences, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'occurrence', - 'swh_num': num_occurrences, - 'swh_id': log_id, - }) - self.storage.occurrence_add(occurrence_list) - self.log.debug("Done sending %d occurrences" % num_occurrences, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'occurrence', - 'swh_num': num_occurrences, - 'swh_id': log_id, - }) - - def send_origin(self, origin): - log_id = str(uuid.uuid4()) - self.log.debug('Creating %s origin for %s' % (origin['type'], - origin['url']), - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'origin', - 'swh_num': 1, - 'swh_id': log_id - }) - origin_id = self.storage.origin_add_one(origin) - self.log.debug('Done creating %s origin for %s' % (origin['type'], - origin['url']), - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'origin', - 'swh_num': 1, - 'swh_id': log_id - }) - - return origin_id - - def send_all_contents(self, contents): - """Send all the contents to the database""" - packet_size = self.config['content_packet_size'] - packet_size_bytes = self.config['content_packet_size_bytes'] - - send_in_packets(contents, self.send_contents, packet_size, - packet_size_bytes=packet_size_bytes) - - def send_all_directories(self, directories): - """Send all the directories to the database""" - packet_size = self.config['directory_packet_size'] - - send_in_packets(directories, self.send_directories, packet_size) - - def send_all_revisions(self, revisions): - """Send all the revisions to the database""" - packet_size = self.config['revision_packet_size'] - - send_in_packets(revisions, self.send_revisions, packet_size) - - def send_all_releases(self, releases): - """Send all the releases to the database - """ - packet_size = self.config['release_packet_size'] - - send_in_packets(releases, self.send_releases, packet_size) - - def send_all_occurrences(self, occurrences): - """Send all the occurrences to the database - """ - packet_size = self.config['occurrence_packet_size'] - - send_in_packets(occurrences, self.send_occurrences, packet_size) - - def open_fetch_history(self): - return self.storage.fetch_history_start(self.origin_id) - - def close_fetch_history_success(self, fetch_history_id, result): - data = { - 'status': True, - 'result': result, - } - return self.storage.fetch_history_end(fetch_history_id, data) - - def close_fetch_history_failure(self, fetch_history_id): - import traceback - data = { - 'status': False, - 'stderr': traceback.format_exc(), - } - return self.storage.fetch_history_end(fetch_history_id, data) - - def load(self, *args, **kwargs): - - self.prepare(*args, **kwargs) - origin = self.get_origin() - self.origin_id = self.send_origin(origin) - - fetch_history_id = self.open_fetch_history() - if self.visit_date: # overwriting the visit_date the fetching date - date_visit = self.visit_date - else: - date_visit = datetime.datetime.now(tz=datetime.timezone.utc) - - origin_visit = self.storage.origin_visit_add( - self.origin_id, - date_visit) - self.visit = origin_visit['visit'] - - try: - self.fetch_data() - - if self.config['save_data']: - self.save_data() - - if self.config['send_contents'] and self.has_contents(): - self.send_all_contents(self.get_contents()) - - if self.config['send_directories'] and self.has_directories(): - self.send_all_directories(self.get_directories()) - - if self.config['send_revisions'] and self.has_revisions(): - 
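With base.py removed, the `load()` method just deleted above is worth reading as the contract that swh.loader.core now provides instead: prepare, record one origin visit that ends 'full' or 'partial', and gate each object kind behind its `send_*` config flag and `has_*` check. A hypothetical minimal subclass of the deleted base class, just to make the hook surface explicit (all names and bodies are illustrative):

```python
class DummyLoader(BaseLoader):
    """Hypothetical minimal loader, for illustration only."""
    def prepare(self, origin_url):
        self.origin_url = origin_url      # stash origin-specific state

    def get_origin(self):
        return {'type': 'hg', 'url': self.origin_url}

    def fetch_data(self):
        pass                              # nothing to download in this sketch

    def get_contents(self):
        return []                         # likewise for the other get_* hooks

    def get_directories(self):
        return []

    def get_revisions(self):
        return []

    def get_releases(self):
        return []

    def get_occurrences(self):
        return []

    def get_fetch_history_result(self):
        return {'contents': 0, 'directories': 0, 'revisions': 0,
                'releases': 0, 'occurrences': 0}

    def eventful(self):
        return False
```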
diff --git a/swh/loader/mercurial/bundle20_loader.py b/swh/loader/mercurial/bundle20_loader.py
index 1af8315..81c8eb0 100644
--- a/swh/loader/mercurial/bundle20_loader.py
+++ b/swh/loader/mercurial/bundle20_loader.py
@@ -1,297 +1,298 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """This document contains a SWH loader for ingesting repository data
 from Mercurial version 2 bundle files.
 """
 
 # NOTE: The code here does expensive work twice in places because of the
 # intermediate need to check for what is missing before sending to the database
 # and the desire to not juggle very large amounts of data.
 
 # TODO: Decide whether to also serialize to disk and read back more quickly
 # from there. Maybe only for very large repos and fast drives.
 # - Avi
 
 import os
 
 import hglib
 
 from swh.model import hashutil, identifiers
-
+from swh.loader.core.loader import SWHStatelessLoader
 from . import converters
 from .bundle20_reader import Bundle20Reader
 from .converters import PRIMARY_ALGO as ALGO
 from .objects import SelectiveCache, SimpleTree
-from .base import BaseLoader
 
 DEBUG = True
 MAX_BLOB_SIZE = 100*1024*1024  # bytes
 # TODO: What should MAX_BLOB_SIZE be?
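Both loaders in this patch now subclass `SWHStatelessLoader` from swh.loader.core instead of the local `BaseLoader`. A sketch of the new subclassing pattern, assuming (as the hunks below show) that the constructor accepts a `logging_class` keyword and that the `prepare`/`get_origin` hooks keep their old signatures; the loader name itself is hypothetical:

```python
from swh.loader.core.loader import SWHStatelessLoader

class MyHgFlavoredLoader(SWHStatelessLoader):
    """Hypothetical loader, for illustration of the migration only."""
    CONFIG_BASE_FILENAME = 'loader/hg'

    def __init__(self):
        # logging_class names the logger, replacing the hardcoded
        # 'swh.loader.hg.BulkLoader' of the deleted base class
        super().__init__(logging_class='swh.loader.mercurial.MyLoader')

    def prepare(self, origin_url, directory, visit_date):
        self.origin_url = origin_url
        self.visit_date = visit_date

    def get_origin(self):
        return {'type': 'hg', 'url': self.origin_url}
```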
 
 
-class HgBundle20Loader(BaseLoader):
+class HgBundle20Loader(SWHStatelessLoader):
     CONFIG_BASE_FILENAME = 'loader/hg'
 
     BUNDLE_FILENAME = 'HG20_none_bundle'
 
     def __init__(self):
-        super().__init__()
+        super().__init__(logging_class='swh.loader.mercurial.Bundle20Loader')
         self.hg = None
         self.tags = []
 
     def prepare(self, origin_url, directory, visit_date):
         """see base.BaseLoader.prepare"""
         self.origin_url = origin_url
+        self.origin = self.get_origin()
         self.visit_date = visit_date
         self.hgdir = directory
+
         bundle_path = os.path.join(directory,
                                    HgBundle20Loader.BUNDLE_FILENAME)
         if DEBUG and not os.path.isfile(bundle_path):
             # generate a bundle from the given directory if needed (testing)
             with hglib.open(directory) as repo:
                 repo.bundle(
                     bytes(bundle_path, 'utf-8'),
                     all=True,
                     type=b'none'
                 )
 
         self.br = Bundle20Reader(bundle_path)
 
     def get_origin(self):
         """Get the origin that is currently being loaded in format suitable
            for swh.storage."""
         return {
             'type': 'hg',
             'url': self.origin_url
         }
 
     def fetch_data(self):
         """Fetch the data from the data source."""
         pass
 
     def get_contents(self):
         """Get the contents that need to be loaded."""
         self.file_node_to_hash = {}
         missing_contents = set()
         hash_to_info = {}
         self.num_contents = 0
 
         for blob, node_info in self.br.yield_all_blobs():
             self.num_contents += 1
             file_name = node_info[0]
             header = node_info[2]
 
             blob_hash = hashutil.hash_data(blob, algorithms=set([ALGO]))[ALGO]
             self.file_node_to_hash[header['node']] = blob_hash
             hash_to_info[blob_hash] = node_info
             missing_contents.add(blob_hash)
 
             if file_name == b'.hgtags':
                 # https://www.mercurial-scm.org/wiki/GitConcepts#Tag_model
                 self.tags = blob.split(b'\n')  # overwrite until the last one
 
         if not DEBUG:
             missing_contents = set(
                 self.storage.content_missing(iter(missing_contents), ALGO)
             )
 
         # Clusters needed blobs by file offset and then only fetches the
         # groups at the needed offsets.
         focs = {}  # "file/offset/contents"
         for blob_hash in missing_contents:
             _, file_offset, header = hash_to_info[blob_hash]
             focs.setdefault(file_offset, {})
             focs[file_offset][header['node']] = blob_hash
         hash_to_info = None
 
         for offset, node_hashes in sorted(focs.items()):
             for header, data, *_ in self.br.yield_group_objects(
                 group_offset=offset
             ):
                 node = header['node']
                 if node in node_hashes:
                     blob, meta = self.br.extract_meta_from_blob(data)
                     yield converters.blob_to_content_dict(
                         data=blob,
                         existing_hashes={ALGO: node_hashes[node]},
                         max_size=MAX_BLOB_SIZE
                     )
 
         # # NOTE: This is a slower but cleaner version of the code above.
         # for blob, node_info in self.br.yield_all_blobs():
         #     header = node_info[2]
         #     node = header['node']
         #     blob_hash = self.file_node_to_hash[node]
         #     if blob_hash in missing_contents:
         #         yield converters.blob_to_content_dict(
         #             data=blob,
         #             existing_hashes={ALGO: blob_hash},
         #             max_size=MAX_BLOB_SIZE
         #         )
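`get_contents` above is built around one idea: hash every blob cheaply on a first pass, let the storage's `content_missing` filter the candidates, and re-read only the blobs that are genuinely new. A toy sketch of that filter step, with `content_missing` replaced by an in-memory stand-in:

```python
import hashlib

def content_missing(hashes, known):
    # stand-in for storage.content_missing: keep only unseen hashes
    return {h for h in hashes if h not in known}

already_stored = {hashlib.sha1(b'old blob').digest()}
candidates = [b'old blob', b'new blob']

# pass 1: hash everything, remember how to find each blob again
hashes = {hashlib.sha1(data).digest(): data for data in candidates}
missing = content_missing(hashes.keys(), already_stored)

# pass 2: only the genuinely new blob is re-read and sent
to_send = [hashes[h] for h in missing]
assert to_send == [b'new blob']
```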
""" self.mnode_to_tree_id = {} base_manifests = self.br.build_manifest_hints() def tree_size(t): return t.size() self.trees = SelectiveCache(cache_hints=base_manifests, size_function=tree_size) tree = SimpleTree() for header, added, removed in self.br.yield_all_manifest_deltas( base_manifests ): node = header['node'] basenode = header['basenode'] tree = self.trees.fetch(basenode) or tree # working tree for path in removed.keys(): tree = tree.remove_tree_node_for_path(path) for path, info in added.items(): file_node, is_symlink, perms_code = info tree = tree.add_blob( path, self.file_node_to_hash[file_node], is_symlink, perms_code ) new_dirs = [] self.mnode_to_tree_id[node] = tree.hash_changed(new_dirs) self.trees.store(node, tree) yield header, tree, new_dirs def get_directories(self): """Get the directories that need to be loaded.""" missing_dirs = [] self.num_directories = 0 for header, tree, new_dirs in self.load_directories(): for d in new_dirs: self.num_directories += 1 missing_dirs.append(d['id']) missing_dirs = set(missing_dirs) if not DEBUG: missing_dirs = set( self.storage.directory_missing(missing_dirs) ) for header, tree, new_dirs in self.load_directories(): for d in new_dirs: if d['id'] in missing_dirs: yield d def get_revisions(self): """Get the revisions that need to be loaded.""" self.branches = {} revisions = {} self.num_revisions = 0 for header, commit in self.br.yield_all_changesets(): self.num_revisions += 1 date_dict = identifiers.normalize_timestamp( int(commit['time'].timestamp()) ) author_dict = converters.parse_author(commit['user']) if commit['manifest'] == Bundle20Reader.NAUGHT_NODE: directory_id = SimpleTree().hash_changed() else: directory_id = self.mnode_to_tree_id[commit['manifest']] extra_meta = [] extra = commit.get('extra') if extra: for e in extra.split(b'\x00'): k, v = e.split(b':', 1) k = k.decode('utf-8') extra_meta.append([k, v]) if k == 'branch': # needed for Occurrences self.branches[v] = header['node'] revision = { 'author': author_dict, 'date': date_dict, 'committer': author_dict, 'committer_date': date_dict, 'type': 'hg', 'directory': directory_id, 'message': commit['message'], 'metadata': { 'node': header['node'], 'extra_headers': [ ['time_offset_seconds', commit['time_offset_seconds']], ] + extra_meta }, 'synthetic': False, 'parents': [ header['p1'], header['p2'] ] } revision['id'] = identifiers.revision_identifier(revision) revisions[revision['id']] = revision missing_revs = revisions.keys() if not DEBUG: missing_revs = set( self.storage.revision_missing(missing_revs) ) for r in missing_revs: yield revisions[r] self.mnode_to_tree_id = None def get_releases(self): """Get the releases that need to be loaded.""" releases = {} self.num_releases = 0 for t in self.tags: self.num_releases += 1 node, name = t.split(b' ') release = { 'name': name, 'target': node, 'target_type': 'revision', 'message': None, 'metadata': None, 'synthetic': False, 'author': None, 'date': None } id_hash = identifiers.release_identifier(release) release['id'] = id_hash releases[id_hash] = release yield from releases.values() def get_occurrences(self): """Get the occurrences that need to be loaded.""" self.num_occurrences = 0 for name, target in self.branches.items(): self.num_occurrences += 1 yield { 'branch': name, 'origin': self.origin_url, 'target': target, 'target_type': 'revision', 'visit': self.visit, } def get_fetch_history_result(self): """Return the data to store in fetch_history.""" return { 'contents': self.num_contents, 'directories': self.num_directories, 
diff --git a/swh/loader/mercurial/slow_loader.py b/swh/loader/mercurial/slow_loader.py
index d2c05a9..c7cb3c7 100644
--- a/swh/loader/mercurial/slow_loader.py
+++ b/swh/loader/mercurial/slow_loader.py
@@ -1,415 +1,424 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 # WARNING WARNING WARNING WARNING
 # hglib is too slow to be super useful. Unfortunately it's also the only
 # python3 library for mercurial as of this writing. - Avi
 
 import datetime
 import hglib
 import os
 
 from swh.model import identifiers
+from swh.loader.core.loader import SWHStatelessLoader
 
-from . import base, converters
+from . import converters
 from .archive_extract import tmp_extract
 
 # TODO: What should this be?
 # swh-model/identifiers.py:identifier_to_bytes has a restrictive length check
 # in it which prevents using blake2 with hashutil.hash_to_hex
 ALGO = 'sha1_git'
 
 OS_PATH_SEP = os.path.sep.encode('utf-8')
 
 
 class SimpleBlob:
     """ Stores basic metadata for a blob object.
     """
     kind = 'file'
 
     def __init__(self, file_hash, file_mode):
         self.hash = file_hash
         if not isinstance(file_mode, int):
             self.mode = 0o100000 + int(file_mode, 8)
         else:
             self.mode = file_mode
 
 
 class SimpleTree(dict):
     """ Stores metadata for a nested 'tree'-like object.
     """
     kind = 'dir'
     mode = 0o040000
 
     def add_tree_node_for_path(self, path):
         """Deeply nests SimpleTrees according to a directory path and returns
         a cursor to the deepest one"""
         node = self
         for d in path.split(OS_PATH_SEP):
             node = node.setdefault(d, SimpleTree())
         return node
 
     def remove_tree_node_for_path(self, path):
         """Deletes a SimpleBlob from inside nested SimpleTrees according to
         the given file path"""
         first, sep, rest = path.partition(OS_PATH_SEP)
         if rest:
             self[first].remove_tree_node_for_path(rest)
             if not self.get(first):
                 del self[first]
         else:
             del self[first]
 
     def add_blob(self, file_path, file_hash, file_mode):
         """Deeply nests a SimpleBlob inside nested SimpleTrees according to
         the given file path"""
         fdir = os.path.dirname(file_path)
         fbase = os.path.basename(file_path)
         if fdir:
             node = self.add_tree_node_for_path(fdir)
         else:
             node = self
         node[fbase] = SimpleBlob(file_hash, file_mode)
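`SimpleBlob` and `SimpleTree` above are the in-memory model this loader manipulates. A quick usage illustration (POSIX path separator assumed; the 20-byte hashes are dummy placeholders):

```python
tree = SimpleTree()
tree.add_blob(b'docs/readme.txt', b'\x00' * 20, '644')
tree.add_blob(b'setup.py', b'\x01' * 20, '755')

assert isinstance(tree[b'docs'], SimpleTree)        # nested directory
assert tree[b'docs'][b'readme.txt'].kind == 'file'
assert tree[b'docs'][b'readme.txt'].mode == 0o100644  # 0o100000 + octal perms

tree.remove_tree_node_for_path(b'docs/readme.txt')
assert b'docs' not in tree  # emptied parent directories are pruned
```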
 
 
-class HgLoader(base.BaseLoader):
+class HgLoader(SWHStatelessLoader):
     """Load a mercurial repository from a directory.
-    """
+    """
     CONFIG_BASE_FILENAME = 'loader/hg'
 
+    def __init__(self, logging_class='swh.loader.mercurial.HgLoader'):
+        super().__init__(logging_class=logging_class)
+
     def prepare(self, origin_url, directory, visit_date):
         """see base.BaseLoader.prepare"""
         self.origin_url = origin_url
         self.repo = hglib.open(directory)
         self.visit_date = visit_date
         self.node_to_blob_hash = {}
         self.blob_hash_to_file_rev = {}
         self.commit_trees = {}
         self.unique_trees = {}
         self.revisions = {}
 
     def get_origin(self):
         """Get the origin that is currently being loaded in format suitable
            for swh.storage"""
         return {
             'type': 'hg',
             'url': self.origin_url
         }
 
     def fetch_data(self):
         """Fetch the data from the data source"""
         pass
 
     def has_contents(self):
         """Checks whether we need to load contents"""
         # if we have any revisions, then obviously we have contents.
         return self.has_revisions()
 
     def iter_changelog(self):
         """Iterate over the repository log"""
         yield from self.repo.log('0:tip', removed=True)
 
     def get_node_file_if_new(self, f, rev, node_hash):
         """Load a blob from disk"""
         # Fast if the node hash is already cached. Somehow this shortcuts a
         # meaningful but not huge percentage of the loads for a repository.
         if node_hash not in self.node_to_blob_hash:
             file_path = os.path.join(self.repo.root(), f)
             data = self.repo.cat([file_path], rev)
             blob_hash = identifiers.content_identifier(
                 {'data': data}
             )[ALGO]
 
             self.node_to_blob_hash[node_hash] = blob_hash
 
             if blob_hash not in self.blob_hash_to_file_rev:  # new blob
                 self.blob_hash_to_file_rev[blob_hash] = (file_path, rev)
 
             return blob_hash, data
 
         return self.node_to_blob_hash[node_hash], None
 
     def get_content_ids(self):
         """Get all the contents, but trim away the actual data"""
         self.node_to_blob_hash = {}
         self.blob_hash_to_file_rev = {}
         self.num_contents = 0
 
         for li in self.iter_changelog():
             c = self.repo[li]
             rev = c.rev()
             manifest = c.manifest()
 
             for f in c.added() + c.modified():
                 node_hash = manifest[f]
                 blob_hash, data = self.get_node_file_if_new(f, rev, node_hash)
 
                 if data is not None:  # new blob
                     self.num_contents += 1
                     yield converters.data_to_content_id(data)
 
     def get_contents(self):
         """Get the contents that need to be loaded"""
         # This method unfortunately loads and hashes the blobs twice.
         max_content_size = self.config['content_size_limit']
         missing_contents = set(
             self.storage.content_missing(
                 self.get_content_ids(),
                 ALGO
             )
         )
 
         for oid in missing_contents:
             file_path, rev = self.blob_hash_to_file_rev[oid]
             data = self.repo.cat([file_path], rev)
             yield converters.blob_to_content_dict(
                 data,
                 max_size=max_content_size,
                 logger=self.log
             )
 
     def has_directories(self):
         """Checks whether we need to load directories"""
         # if we have any revs, we must also have dirs
         return self.has_revisions()
 
     def get_directories(self):
         """Get the directories that need to be loaded"""
         missing_dirs = set(self.storage.directory_missing(
             sorted(self.unique_trees.keys())
         ))
 
         for dir_hash in missing_dirs:
             yield self.unique_trees[dir_hash]
 
     def has_revisions(self):
         """Checks whether we need to load revisions"""
         self.num_revisions = int(self.repo.tip()[0]) + 1
         return self.num_revisions > 0
 
     def update_tree_from_rev(self, tree, rev, only_these_files=None):
         """Iterates over changes in a revision and adds corresponding
         SimpleBlobs to a SimpleTree"""
         if rev >= 0:
             manifest = {k[4]: k for k in self.repo.manifest(rev=rev)}
             loop_keys = only_these_files or manifest.keys()
             for f in loop_keys:
                 node_hash = manifest[f][0]
                 file_mode = manifest[f][1]
                 file_hash, _ = self.get_node_file_if_new(f, rev, node_hash)
                 tree.add_blob(f, file_hash, file_mode)
 
         return tree
 
     def reconstruct_tree(self, directory):
         """Converts a flat directory into nested SimpleTrees."""
         # This method exists because the code was already written to use
         # SimpleTree before then reducing memory use and converting to the
         # canonical format. A refactor using lookups instead of nesting could
         # obviate the need.
         new_tree = SimpleTree()
         for entry in directory['entries']:
             tgt = entry['target']
             perms = entry['perms']
             name = entry['name']
             if tgt in self.unique_trees:  # subtree
                 new_tree[name] = self.reconstruct_tree(self.unique_trees[tgt])
             else:  # blob
                 new_tree[name] = SimpleBlob(tgt, perms)
         new_tree.hash = directory['id']
         return new_tree
 
     def collapse_tree(self, tree):
         """Converts nested SimpleTrees into multiple flat directories."""
         # This method exists because the code was already written to use
         # SimpleTree before then reducing memory use and converting to the
         # canonical format. A refactor using lookups instead of nesting could
         # obviate the need.
         directory = {
             'entries': [
                 {
                     'name': k,
                     'perms': v.mode,
                     'type': v.kind,
                     'target': (isinstance(v, SimpleBlob)
                                and v.hash
                                or self.collapse_tree(v))
                 }
                 for k, v in tree.items()
             ]
         }
         tree.hash = identifiers.directory_identifier(directory)
         directory['id'] = tree.hash
         self.unique_trees[tree.hash] = directory
         return tree.hash
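`collapse_tree` and `reconstruct_tree` above round-trip between nested SimpleTrees and flat, id-keyed directory dicts so that a parent state can be rebuilt on demand. The shape of that round-trip in miniature, with `identifiers.directory_identifier` swapped for a toy `fake_id`:

```python
unique_trees = {}

def fake_id(directory):
    # toy replacement for identifiers.directory_identifier
    return hash(tuple(sorted(e['name'] for e in directory['entries'])))

def collapse(tree):
    # flatten one level of a nested mapping into a canonical-ish dict
    directory = {'entries': [{'name': k, 'target': v}
                             for k, v in tree.items()]}
    directory['id'] = fake_id(directory)
    unique_trees[directory['id']] = directory
    return directory['id']

def reconstruct(directory):
    # rebuild the mapping from the flat store
    return {e['name']: e['target'] for e in directory['entries']}

root_id = collapse({b'a.txt': b'blob-hash-1', b'b.txt': b'blob-hash-2'})
assert reconstruct(unique_trees[root_id]) == {b'a.txt': b'blob-hash-1',
                                              b'b.txt': b'blob-hash-2'}
```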
 
     def get_revision_ids(self):
         """Get the revisions that need to be loaded"""
         self.unique_trees = {}
         commit_tree = None
         for li in self.iter_changelog():
             c = self.repo[li[1]]
             rev = c.rev()
 
             # start from the parent state
             p1 = c.p1().rev()
             if p1 in self.commit_trees:
                 if p1 != rev-1:
                     # Most of the time, a revision will inherit from the
                     # previous one. In those cases we can reuse commit_tree,
                     # otherwise build a new one here.
                     parent_tree = self.unique_trees[self.commit_trees[p1]]
                     commit_tree = self.reconstruct_tree(parent_tree)
             else:
                 commit_tree = self.update_tree_from_rev(SimpleTree(), p1)
 
             # remove whatever is removed
             for f in c.removed():
                 commit_tree.remove_tree_node_for_path(f)
 
             # update whatever is updated
             self.update_tree_from_rev(commit_tree, rev, c.added()+c.modified())
 
             self.commit_trees[rev] = self.collapse_tree(commit_tree)
 
             date_dict = identifiers.normalize_timestamp(
                 int(c.date().timestamp())
             )
             author_dict = converters.parse_author(c.author())
 
             revision = {
                 'author': author_dict,
                 'date': date_dict,
                 'committer': author_dict,
                 'committer_date': date_dict,
                 'type': 'hg',
                 'directory': commit_tree.hash,
                 'message': c.description(),
                 'metadata': {
                     'extra_headers': [
                         ['phase', c.phase()],
                         ['rev', rev],
                         ['hidden', c.hidden()]
                     ]
                 },
                 'synthetic': False,
                 'parents': [
                     self.revisions[p.node()]['id']
                     for p in c.parents()
                     if p.rev() >= 0
                 ]
             }
 
             revision['id'] = identifiers.revision_identifier(revision)
             self.revisions[c.node()] = revision
 
         for n, r in self.revisions.items():
             yield {'node': n, 'id': r['id']}
 
     def get_revisions(self):
         """Get the revision identifiers from the repository"""
         revs = {r['id']: r['node'] for r in self.get_revision_ids()}
         missing_revs = set(self.storage.revision_missing(revs.keys()))
 
         for r in missing_revs:
             yield self.revisions[revs[r]]
 
     def has_releases(self):
         """Checks whether we need to load releases"""
         self.num_releases = len([t for t in self.repo.tags() if not t[3]])
         return self.num_releases > 0
 
     def get_releases(self):
         """Get the releases that need to be loaded"""
         releases = {}
         for t in self.repo.tags():
             islocal = t[3]
             name = t[0]
             if (name != b'tip' and not islocal):
                 short_hash = t[2]
                 target = self.revisions[self.repo[short_hash].node()]['id']
                 release = {
                     'name': name,
                     'target': target,
                     'target_type': 'revision',
                     'message': None,
                     'metadata': None,
                     'synthetic': False,
                     'author': None,
                     'date': None
                 }
                 id_hash = identifiers.release_identifier(release)
                 release['id'] = id_hash
                 releases[id_hash] = release
 
         missing_rels = set(self.storage.release_missing(
             sorted(releases.keys())
         ))
 
         yield from (releases[r] for r in missing_rels)
 
     def has_occurrences(self):
         """Checks whether we need to load occurrences"""
         self.num_occurrences = len(
             self.repo.tags() + self.repo.branches() + self.repo.bookmarks()[0]
         )
         return self.num_occurrences > 0
 
     def get_occurrences(self):
         """Get the occurrences that need to be loaded"""
         for t in (
             self.repo.tags() + self.repo.branches() + self.repo.bookmarks()[0]
         ):
             name = t[0]
             short_hash = t[2]
             target = self.revisions[self.repo[short_hash].node()]['id']
             yield {
                 'branch': name,
                 'origin': self.origin_id,
                 'target': target,
                 'target_type': 'revision',
                 'visit': self.visit,
             }
 
     def get_fetch_history_result(self):
         """Return the data to store in fetch_history for the current
         loader"""
         return {
             'contents': self.num_contents,
             'directories': len(self.unique_trees),
             'revisions': self.num_revisions,
             'releases': self.num_releases,
             'occurrences': self.num_occurrences,
         }
 
     def save_data(self):
         """We already have the data locally, no need to save it"""
         pass
 
     def eventful(self):
         """Whether the load was eventful"""
         return True
 
 
 class HgLoaderFromArchive(HgLoader):
     """Load an HG repository from a compressed archive.
+    """
+    def __init__(self):
+        super().__init__(
+            logging_class='swh.loader.mercurial.HgLoaderFromArchive')
+
     def prepare(self, origin_url, archive_path, visit_date):
         tmpdir = tmp_extract(archive=archive_path, prefix='swh.loader.hg.',
                              log=self.log, source=origin_url)
         super().prepare(origin_url, tmpdir.name, visit_date)
 
 
 if __name__ == '__main__':
     import logging
     import sys
 
     logging.basicConfig(
         level=logging.DEBUG,
         format='%(asctime)s %(process)d %(message)s'
     )
     loader = HgLoader()
 
     origin_url = sys.argv[1]
     directory = sys.argv[2]
     visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
 
     print(loader.load(origin_url, directory, visit_date))