diff --git a/swh/loader/mercurial/base.py b/swh/loader/mercurial/base.py
index b76adf7..92a2b62 100644
--- a/swh/loader/mercurial/base.py
+++ b/swh/loader/mercurial/base.py
@@ -1,458 +1,458 @@
# Copyright (C) 2016-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

# TODO: This document mirrors swh-loader-git/swh/loader/git/base.py exactly
# (minus the logger names and this comment)
# Please merge the files at a shared level at next opportunity.

import datetime
import logging
import os
import traceback
import uuid

import psycopg2
import requests

from retrying import retry

from swh.core import config
from swh.storage import get_storage


def send_in_packets(objects, sender, packet_size, packet_size_bytes=None):
    """Send `objects`, using the `sender`, in packets of `packet_size`
    objects (and of max `packet_size_bytes`).

    """
    formatted_objects = []
    count = 0
    if not packet_size_bytes:
        packet_size_bytes = 0
    for obj in objects:
        if not obj:
            continue
        formatted_objects.append(obj)
        if packet_size_bytes:
            count += obj['length']
        if len(formatted_objects) >= packet_size or count > packet_size_bytes:
            sender(formatted_objects)
            formatted_objects = []
            count = 0

    if formatted_objects:
        sender(formatted_objects)


def retry_loading(error):
    """Retry policy when we catch a recoverable error"""
    exception_classes = [
        # raised when two parallel insertions insert the same data.
        psycopg2.IntegrityError,
        # raised when uWSGI restarts and hangs up on the worker.
        requests.exceptions.ConnectionError,
    ]

    if not any(isinstance(error, exc) for exc in exception_classes):
        return False

    logger = logging.getLogger('swh.loader.hg.BulkLoader')

    error_name = error.__module__ + '.' + error.__class__.__name__
    logger.warning('Retry loading a batch', exc_info=False, extra={
        'swh_type': 'storage_retry',
        'swh_exception_type': error_name,
        'swh_exception': traceback.format_exception(
            error.__class__,
            error,
            error.__traceback__,
        ),
    })

    return True


class BaseLoader(config.SWHConfig):
    """This base class is a pattern for loaders.

    The external calling convention is as such:

    - instantiate the class once (loads storage and the configuration)
    - for each origin, call load with the origin-specific arguments (for
      instance, an origin URL).
load calls several methods that must be implemented in subclasses: - prepare(\*args, \**kwargs) prepares the loader for the new origin - get_origin gets the origin object associated to the current loader - fetch_data downloads the necessary data from the origin - get\_{contents,directories,revisions,releases,occurrences} retrieve each kind of object from the origin - has\_* checks whether there are some objects to load for that object type - get\_fetch\_history\_result retrieves the data to insert in the fetch_history table once the load was successful - cleanup cleans up an eventual state installed for computations - eventful returns whether the load was eventful or not """ CONFIG_BASE_FILENAME = None DEFAULT_CONFIG = { 'storage': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5002/' }, }), 'send_contents': ('bool', True), 'send_directories': ('bool', True), 'send_revisions': ('bool', True), 'send_releases': ('bool', True), 'send_occurrences': ('bool', True), 'save_data': ('bool', False), 'save_data_path': ('str', ''), 'content_packet_size': ('int', 10000), 'content_packet_size_bytes': ('int', 1024 * 1024 * 1024), 'directory_packet_size': ('int', 25000), 'revision_packet_size': ('int', 100000), 'release_packet_size': ('int', 100000), 'occurrence_packet_size': ('int', 100000), } ADDITIONAL_CONFIG = {} def __init__(self): self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) # Make sure the config is sane if self.config['save_data']: path = self.config['save_data_path'] os.stat(path) if not os.access(path, os.R_OK | os.W_OK): raise PermissionError("Permission denied: %r" % path) self.storage = get_storage(**self.config['storage']) self.log = logging.getLogger('swh.loader.hg.BulkLoader') - self.fetch_date = None # possibly overridden in self.prepare method + self.visit_date = None # possibly overridden in self.prepare method def prepare(self, *args, **kwargs): """Prepare the data source to be loaded""" raise NotImplementedError def cleanup(self): """Clean up an eventual state installed for computations.""" pass def get_origin(self): """Get the origin that is currently being loaded""" raise NotImplementedError def fetch_data(self): """Fetch the data from the data source""" raise NotImplementedError def has_contents(self): """Checks whether we need to load contents""" return True def get_contents(self): """Get the contents that need to be loaded""" raise NotImplementedError def has_directories(self): """Checks whether we need to load directories""" return True def get_directories(self): """Get the directories that need to be loaded""" raise NotImplementedError def has_revisions(self): """Checks whether we need to load revisions""" return True def get_revisions(self): """Get the revisions that need to be loaded""" raise NotImplementedError def has_releases(self): """Checks whether we need to load releases""" return True def get_releases(self): """Get the releases that need to be loaded""" raise NotImplementedError def has_occurrences(self): """Checks whether we need to load occurrences""" return True def get_occurrences(self): """Get the occurrences that need to be loaded""" raise NotImplementedError def get_fetch_history_result(self): """Return the data to store in fetch_history for the current loader""" raise NotImplementedError def eventful(self): """Whether the load was eventful""" raise NotImplementedError def save_data(self): """Save the data associated to the current load""" raise NotImplementedError def get_save_data_path(self): """The path 
to which we save the data""" if not hasattr(self, '__save_data_path'): origin_id = self.origin_id - year = str(self.fetch_date.year) + year = str(self.visit_date.year) path = os.path.join( self.config['save_data_path'], "%04d" % (origin_id % 10000), "%08d" % origin_id, year, ) os.makedirs(path, exist_ok=True) self.__save_data_path = path return self.__save_data_path @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_contents(self, content_list): """Actually send properly formatted contents to the database""" num_contents = len(content_list) log_id = str(uuid.uuid4()) self.log.debug("Sending %d contents" % num_contents, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'content', 'swh_num': num_contents, 'swh_id': log_id, }) self.storage.content_add(content_list) self.log.debug("Done sending %d contents" % num_contents, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'content', 'swh_num': num_contents, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_directories(self, directory_list): """Actually send properly formatted directories to the database""" num_directories = len(directory_list) log_id = str(uuid.uuid4()) self.log.debug("Sending %d directories" % num_directories, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'directory', 'swh_num': num_directories, 'swh_id': log_id, }) self.storage.directory_add(directory_list) self.log.debug("Done sending %d directories" % num_directories, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'directory', 'swh_num': num_directories, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_revisions(self, revision_list): """Actually send properly formatted revisions to the database""" num_revisions = len(revision_list) log_id = str(uuid.uuid4()) self.log.debug("Sending %d revisions" % num_revisions, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'revision', 'swh_num': num_revisions, 'swh_id': log_id, }) self.storage.revision_add(revision_list) self.log.debug("Done sending %d revisions" % num_revisions, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'revision', 'swh_num': num_revisions, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_releases(self, release_list): """Actually send properly formatted releases to the database""" num_releases = len(release_list) log_id = str(uuid.uuid4()) self.log.debug("Sending %d releases" % num_releases, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'release', 'swh_num': num_releases, 'swh_id': log_id, }) self.storage.release_add(release_list) self.log.debug("Done sending %d releases" % num_releases, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'release', 'swh_num': num_releases, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_occurrences(self, occurrence_list): """Actually send properly formatted occurrences to the database""" num_occurrences = len(occurrence_list) log_id = str(uuid.uuid4()) self.log.debug("Sending %d occurrences" % num_occurrences, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'occurrence', 'swh_num': num_occurrences, 'swh_id': log_id, }) self.storage.occurrence_add(occurrence_list) self.log.debug("Done sending %d occurrences" % num_occurrences, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'occurrence', 'swh_num': num_occurrences, 'swh_id': log_id, }) 
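# Illustrative sketch, not part of this patch: the five send_* methods above
# share the same shape, a @retry-decorated call into swh.storage that is only
# retried for the recoverable errors accepted by retry_loading(). A minimal,
# self-contained demonstration of that retry policy, assuming the `retrying`
# and `psycopg2` packages are installed and retry_loading() is importable
# from this module:

import psycopg2
from retrying import retry

from swh.loader.mercurial.base import retry_loading

attempts = []


@retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
def flaky_send(batch):
    # Fail twice with a recoverable error, then succeed. retry_loading()
    # returns True for psycopg2.IntegrityError, so the decorator retries;
    # an exception outside its allow-list would propagate immediately.
    attempts.append(len(batch))
    if len(attempts) < 3:
        raise psycopg2.IntegrityError('simulated concurrent insertion')
    return 'ok'


assert flaky_send([{'id': 1}]) == 'ok'
assert len(attempts) == 3  # two retried failures, then one success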
def send_origin(self, origin): log_id = str(uuid.uuid4()) self.log.debug('Creating %s origin for %s' % (origin['type'], origin['url']), extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'origin', 'swh_num': 1, 'swh_id': log_id }) origin_id = self.storage.origin_add_one(origin) self.log.debug('Done creating %s origin for %s' % (origin['type'], origin['url']), extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'origin', 'swh_num': 1, 'swh_id': log_id }) return origin_id def send_all_contents(self, contents): """Send all the contents to the database""" packet_size = self.config['content_packet_size'] packet_size_bytes = self.config['content_packet_size_bytes'] send_in_packets(contents, self.send_contents, packet_size, packet_size_bytes=packet_size_bytes) def send_all_directories(self, directories): """Send all the directories to the database""" packet_size = self.config['directory_packet_size'] send_in_packets(directories, self.send_directories, packet_size) def send_all_revisions(self, revisions): """Send all the revisions to the database""" packet_size = self.config['revision_packet_size'] send_in_packets(revisions, self.send_revisions, packet_size) def send_all_releases(self, releases): """Send all the releases to the database """ packet_size = self.config['release_packet_size'] send_in_packets(releases, self.send_releases, packet_size) def send_all_occurrences(self, occurrences): """Send all the occurrences to the database """ packet_size = self.config['occurrence_packet_size'] send_in_packets(occurrences, self.send_occurrences, packet_size) def open_fetch_history(self): return self.storage.fetch_history_start(self.origin_id) def close_fetch_history_success(self, fetch_history_id, result): data = { 'status': True, 'result': result, } return self.storage.fetch_history_end(fetch_history_id, data) def close_fetch_history_failure(self, fetch_history_id): import traceback data = { 'status': False, 'stderr': traceback.format_exc(), } return self.storage.fetch_history_end(fetch_history_id, data) def load(self, *args, **kwargs): self.prepare(*args, **kwargs) origin = self.get_origin() self.origin_id = self.send_origin(origin) fetch_history_id = self.open_fetch_history() - if self.fetch_date: # overwriting the visit_date the fetching date - date_visit = self.fetch_date + if self.visit_date: # overwriting the visit_date the fetching date + date_visit = self.visit_date else: date_visit = datetime.datetime.now(tz=datetime.timezone.utc) origin_visit = self.storage.origin_visit_add( self.origin_id, date_visit) self.visit = origin_visit['visit'] try: self.fetch_data() if self.config['save_data']: self.save_data() if self.config['send_contents'] and self.has_contents(): self.send_all_contents(self.get_contents()) if self.config['send_directories'] and self.has_directories(): self.send_all_directories(self.get_directories()) if self.config['send_revisions'] and self.has_revisions(): self.send_all_revisions(self.get_revisions()) if self.config['send_releases'] and self.has_releases(): self.send_all_releases(self.get_releases()) if self.config['send_occurrences'] and self.has_occurrences(): self.send_all_occurrences(self.get_occurrences()) self.close_fetch_history_success(fetch_history_id, self.get_fetch_history_result()) self.storage.origin_visit_update( self.origin_id, self.visit, status='full') except: self.close_fetch_history_failure(fetch_history_id) self.storage.origin_visit_update( self.origin_id, self.visit, status='partial') raise finally: self.cleanup() return self.eventful() 
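The send_all_* helpers above all funnel their objects through send_in_packets(), which flushes a batch as soon as either the object-count budget or, when given, the cumulative byte budget (summed over each object's 'length' field) is exceeded. A minimal sketch of that behaviour, not part of this patch, using a stand-in sender and made-up packet sizes:

from swh.loader.mercurial.base import send_in_packets

batches = []


def record_batch(objs):
    # Stand-in for send_contents() and friends: just record what would be
    # sent to storage.
    batches.append(list(objs))


# Ten fake contents of 400 "bytes" each; flush after 4 objects, or as soon
# as the running total of the 'length' field exceeds 1000.
objects = [{'id': i, 'length': 400} for i in range(10)]
send_in_packets(objects, record_batch, packet_size=4, packet_size_bytes=1000)

# The byte budget trips before the count budget: batches of 3, 3, 3 and a
# final flush of the single leftover object.
assert [len(b) for b in batches] == [3, 3, 3, 1]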
diff --git a/swh/loader/mercurial/bundle20_loader.py b/swh/loader/mercurial/bundle20_loader.py
index b227af7..106aa91 100644
--- a/swh/loader/mercurial/bundle20_loader.py
+++ b/swh/loader/mercurial/bundle20_loader.py
@@ -1,296 +1,296 @@
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""This document contains a SWH loader for ingesting repository data
from Mercurial version 2 bundle files.

"""

# NOTE: The code here does expensive work twice in places because of the
# intermediate need to check for what is missing before sending to the database
# and the desire to not juggle very large amounts of data.

# TODO: Decide whether to also serialize to disk and read back more quickly
# from there. Maybe only for very large repos and fast drives.
# - Avi

import os

import hglib

from swh.model import hashutil, identifiers

from . import converters
from .bundle20_reader import Bundle20Reader
from .converters import PRIMARY_ALGO as ALGO
from .objects import SelectiveCache, SimpleTree
from .base import BaseLoader

DEBUG = True
MAX_BLOB_SIZE = 100*1024*1024  # bytes
# TODO: What should MAX_BLOB_SIZE be?


class HgBundle20Loader(BaseLoader):
-    CONFIG_BASE_FILENAME = 'loader/hg-loader'
+    CONFIG_BASE_FILENAME = 'loader/hg'

    BUNDLE_FILENAME = 'HG20_none_bundle'

    def __init__(self):
        self.hg = None
        self.tags = []

-    def prepare(self, origin_url, directory, fetch_date):
+    def prepare(self, origin_url, directory, visit_date):
        """see base.BaseLoader.prepare"""
        self.origin_url = origin_url
-        self.fetch_date = fetch_date
+        self.visit_date = visit_date
        self.hgdir = directory

        bundle_path = os.path.join(directory,
                                   HgBundle20Loader.BUNDLE_FILENAME)
        if DEBUG and not os.path.isfile(bundle_path):
            # generate a bundle from the given directory if needed (testing)
            with hglib.open(directory) as repo:
                repo.bundle(
                    bytes(bundle_path, 'utf-8'),
                    all=True,
                    type=b'none'
                )

        self.br = Bundle20Reader(bundle_path)

    def get_origin(self):
        """Get the origin that is currently being loaded in format suitable
        for swh.storage."""
        return {
            'type': 'hg',
            'url': self.origin_url
        }

    def fetch_data(self):
        """Fetch the data from the data source."""
        pass

    def get_contents(self):
        """Get the contents that need to be loaded."""
        self.file_node_to_hash = {}
        missing_contents = set()
        hash_to_info = {}
        self.num_contents = 0

        for blob, node_info in self.br.yield_all_blobs():
            self.num_contents += 1
            file_name = node_info[0]
            header = node_info[2]
            blob_hash = hashutil.hash_data(blob, algorithms=set([ALGO]))[ALGO]
            self.file_node_to_hash[header['node']] = blob_hash
            hash_to_info[blob_hash] = node_info
            missing_contents.add(blob_hash)

            if file_name == b'.hgtags':
                # https://www.mercurial-scm.org/wiki/GitConcepts#Tag_model
                self.tags = blob.split(b'\n')  # overwrite until the last one

        if not DEBUG:
            missing_contents = set(
                self.storage.content_missing(iter(missing_contents), ALGO)
            )

        # Clusters needed blobs by file offset and then only fetches the
        # groups at the needed offsets.
focs = {} # "file/offset/contents" for blob_hash in missing_contents: _, file_offset, header = hash_to_info[blob_hash] focs.setdefault(file_offset, {}) focs[file_offset][header['node']] = blob_hash hash_to_info = None for offset, node_hashes in sorted(focs.items()): for header, data, *_ in self.br.yield_group_objects( group_offset=offset ): node = header['node'] if node in node_hashes: blob, meta = self.br.extract_meta_from_blob(data) yield converters.blob_to_content_dict( data=blob, existing_hashes={ALGO: node_hashes[node]}, max_size=MAX_BLOB_SIZE ) # # NOTE: This is a slower but cleaner version of the code above. # for blob, node_info in self.br.yield_all_blobs(): # header = node_info[2] # node = header['node'] # blob_hash = self.file_node_to_hash[node] # if blob_hash in missing_contents: # yield converters.blob_to_content_dict( # data=blob, # existing_hashes={ALGO: blob_hash}, # max_size=MAX_BLOB_SIZE # ) def load_directories(self): """This is where the work is done to convert manifest deltas from the repository bundle into SWH directories. """ self.mnode_to_tree_id = {} base_manifests = self.br.build_manifest_hints() def tree_size(t): return t.size() self.trees = SelectiveCache(cache_hints=base_manifests, size_function=tree_size) tree = SimpleTree() for header, added, removed in self.br.yield_all_manifest_deltas( base_manifests ): node = header['node'] basenode = header['basenode'] tree = self.trees.fetch(basenode) or tree # working tree for path in removed.keys(): tree = tree.remove_tree_node_for_path(path) for path, info in added.items(): file_node, is_symlink, perms_code = info tree = tree.add_blob( path, self.file_node_to_hash[file_node], is_symlink, perms_code ) new_dirs = [] self.mnode_to_tree_id[node] = tree.hash_changed(new_dirs) self.trees.store(node, tree) yield header, tree, new_dirs def get_directories(self): """Get the directories that need to be loaded.""" missing_dirs = [] self.num_directories = 0 for header, tree, new_dirs in self.load_directories(): for d in new_dirs: self.num_directories += 1 missing_dirs.append(d['id']) missing_dirs = set(missing_dirs) if not DEBUG: missing_dirs = set( self.storage.directory_missing(missing_dirs) ) for header, tree, new_dirs in self.load_directories(): for d in new_dirs: if d['id'] in missing_dirs: yield d def get_revisions(self): """Get the revisions that need to be loaded.""" self.branches = {} revisions = {} self.num_revisions = 0 for header, commit in self.br.yield_all_changesets(): self.num_revisions += 1 date_dict = identifiers.normalize_timestamp( int(commit['time'].timestamp()) ) author_dict = converters.parse_author(commit['user']) if commit['manifest'] == Bundle20Reader.NAUGHT_NODE: directory_id = SimpleTree().hash_changed() else: directory_id = self.mnode_to_tree_id[commit['manifest']] extra_meta = [] extra = commit.get('extra') if extra: for e in extra.split(b'\x00'): k, v = e.split(b':', 1) k = k.decode('utf-8') extra_meta.append([k, v]) if k == 'branch': # needed for Occurrences self.branches[v] = header['node'] revision = { 'author': author_dict, 'date': date_dict, 'committer': author_dict, 'committer_date': date_dict, 'type': 'hg', 'directory': directory_id, 'message': commit['message'], 'metadata': { 'node': header['node'], 'extra_headers': [ ['time_offset_seconds', commit['time_offset_seconds']], ] + extra_meta }, 'synthetic': False, 'parents': [ header['p1'], header['p2'] ] } revision['id'] = identifiers.revision_identifier(revision) revisions[revision['id']] = revision missing_revs = revisions.keys() if not DEBUG: 
missing_revs = set( self.storage.revision_missing(missing_revs) ) for r in missing_revs: yield revisions[r] self.mnode_to_tree_id = None def get_releases(self): """Get the releases that need to be loaded.""" releases = {} self.num_releases = 0 for t in self.tags: self.num_releases += 1 node, name = t.split(b' ') release = { 'name': name, 'target': node, 'target_type': 'revision', 'message': None, 'metadata': None, 'synthetic': False, 'author': None, 'date': None } id_hash = identifiers.release_identifier(release) release['id'] = id_hash releases[id_hash] = release yield from releases.values() def get_occurrences(self): """Get the occurrences that need to be loaded.""" self.num_occurrences = 0 for name, target in self.branches.items(): self.num_occurrences += 1 yield { 'branch': name, 'origin': self.origin_url, 'target': target, 'target_type': 'revision', 'visit': self.visit, } def get_fetch_history_result(self): """Return the data to store in fetch_history.""" return { 'contents': self.num_contents, 'directories': self.num_directories, 'revisions': self.num_revisions, 'releases': self.num_releases, 'occurrences': self.num_occurrences } diff --git a/swh/loader/mercurial/slow_loader.py b/swh/loader/mercurial/slow_loader.py index e9b8ab5..19870bf 100644 --- a/swh/loader/mercurial/slow_loader.py +++ b/swh/loader/mercurial/slow_loader.py @@ -1,415 +1,415 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING WARNING WARNING WARNING # hglib is too slow to be super useful. Unfortunately it's also the only # python3 library for mercurial as of this writing. - Avi import datetime import hglib import os from swh.model import identifiers from . import base, converters from .archive_extract import tmp_extract # TODO: What should this be? # swh-model/identifiers.py:identifier_to_bytes has a restrictive length check # in it which prevents using blake2 with hashutil.hash_to_hex ALGO = 'sha1_git' OS_PATH_SEP = os.path.sep.encode('utf-8') class SimpleBlob: """ Stores basic metadata for a blob object. """ kind = 'file' def __init__(self, file_hash, file_mode): self.hash = file_hash if not isinstance(file_mode, int): self.mode = 0o100000 + int(file_mode, 8) else: self.mode = file_mode class SimpleTree(dict): """ Stores metadata for a nested 'tree'-like object. """ kind = 'dir' mode = 0o040000 def add_tree_node_for_path(self, path): """Deeply nests SimpleTrees according to a directory path and returns a cursor to the deepest one""" node = self for d in path.split(OS_PATH_SEP): node = node.setdefault(d, SimpleTree()) return node def remove_tree_node_for_path(self, path): """Deletes a SimpleBlob from inside nested SimpleTrees according to the given file path""" first, sep, rest = path.partition(OS_PATH_SEP) if rest: self[first].remove_tree_node_for_path(rest) if not self.get(first): del self[first] else: del self[first] def add_blob(self, file_path, file_hash, file_mode): """Deeply nests a SimpleBlob inside nested SimpleTrees according to the given file path""" fdir = os.path.dirname(file_path) fbase = os.path.basename(file_path) if fdir: node = self.add_tree_node_for_path(fdir) else: node = self node[fbase] = SimpleBlob(file_hash, file_mode) class HgLoader(base.BaseLoader): """Load a mercurial repository from a directory. 
""" CONFIG_BASE_FILENAME = 'loader/hg-loader' - def prepare(self, origin_url, directory, fetch_date): + def prepare(self, origin_url, directory, visit_date): """see base.BaseLoader.prepare""" self.origin_url = origin_url self.repo = hglib.open(directory) - self.fetch_date = fetch_date + self.visit_date = visit_date self.node_to_blob_hash = {} self.blob_hash_to_file_rev = {} self.commit_trees = {} self.unique_trees = {} self.revisions = {} def get_origin(self): """Get the origin that is currently being loaded in format suitable for swh.storage""" return { 'type': 'hg', 'url': self.origin_url } def fetch_data(self): """Fetch the data from the data source""" pass def has_contents(self): """Checks whether we need to load contents""" # if we have any revisions, then obviously we have contents. return self.has_revisions() def iter_changelog(self): """Iterate over the repository log""" yield from self.repo.log('0:tip', removed=True) def get_node_file_if_new(self, f, rev, node_hash): """Load a blob from disk""" # Fast if the node hash is already cached. Somehow this shortcuts a # meaningful but not huge percentage of the loads for a repository. if node_hash not in self.node_to_blob_hash: file_path = os.path.join(self.repo.root(), f) data = self.repo.cat([file_path], rev) blob_hash = identifiers.content_identifier( {'data': data} )[ALGO] self.node_to_blob_hash[node_hash] = blob_hash if blob_hash not in self.blob_hash_to_file_rev: # new blob self.blob_hash_to_file_rev[blob_hash] = (file_path, rev) return blob_hash, data return self.node_to_blob_hash[node_hash], None def get_content_ids(self): """Get all the contents, but trim away the actual data""" self.node_to_blob_hash = {} self.blob_hash_to_file_rev = {} self.num_contents = 0 for li in self.iter_changelog(): c = self.repo[li] rev = c.rev() manifest = c.manifest() for f in c.added() + c.modified(): node_hash = manifest[f] blob_hash, data = self.get_node_file_if_new(f, rev, node_hash) if data is not None: # new blob self.num_contents += 1 yield converters.data_to_content_id(data) def get_contents(self): """Get the contents that need to be loaded""" # This method unfortunately loads and hashes the blobs twice. 
max_content_size = self.config['content_size_limit'] missing_contents = set( self.storage.content_missing( self.get_content_ids(), ALGO ) ) for oid in missing_contents: file_path, rev = self.blob_hash_to_file_rev[oid] data = self.repo.cat([file_path], rev) yield converters.blob_to_content_dict( data, max_content_size, self.log, self.origin_id ) def has_directories(self): """Checks whether we need to load directories""" # if we have any revs, we must also have dirs return self.has_revisions() def get_directories(self): """Get the directories that need to be loaded""" missing_dirs = set(self.storage.directory_missing( sorted(self.unique_trees.keys()) )) for dir_hash in missing_dirs: yield self.unique_trees[dir_hash] def has_revisions(self): """Checks whether we need to load revisions""" self.num_revisions = int(self.repo.tip()[0]) + 1 return self.num_revisions > 0 def update_tree_from_rev(self, tree, rev, only_these_files=None): """Iterates over changes in a revision and adds corresponding SimpleBlobs to a SimpleTree""" if rev >= 0: manifest = {k[4]: k for k in self.repo.manifest(rev=rev)} loop_keys = only_these_files or manifest.keys() for f in loop_keys: node_hash = manifest[f][0] file_mode = manifest[f][1] file_hash, _ = self.get_node_file_if_new(f, rev, node_hash) tree.add_blob(f, file_hash, file_mode) return tree def reconstruct_tree(self, directory): """Converts a flat directory into nested SimpleTrees.""" # This method exists because the code was already written to use # SimpleTree before then reducing memory use and converting to the # canonical format. A refactor using lookups instead of nesting could # obviate the need. new_tree = SimpleTree() for entry in directory['entries']: tgt = entry['target'] perms = entry['perms'] name = entry['name'] if tgt in self.unique_trees: # subtree new_tree[name] = self.reconstruct_tree(self.unique_trees[tgt]) else: # blob new_tree[name] = SimpleBlob(tgt, perms) new_tree.hash = directory['id'] return new_tree def collapse_tree(self, tree): """Converts nested SimpleTrees into multiple flat directories.""" # This method exists because the code was already written to use # SimpleTree before then reducing memory use and converting to the # canonical format. A refactor using lookups instead of nesting could # obviate the need. directory = { 'entries': [ { 'name': k, 'perms': v.mode, 'type': v.kind, 'target': (isinstance(v, SimpleBlob) and v.hash or self.collapse_tree(v)) } for k, v in tree.items() ] } tree.hash = identifiers.directory_identifier(directory) directory['id'] = tree.hash self.unique_trees[tree.hash] = directory return tree.hash def get_revision_ids(self): """Get the revisions that need to be loaded""" self.unique_trees = {} commit_tree = None for li in self.iter_changelog(): c = self.repo[li[1]] rev = c.rev() # start from the parent state p1 = c.p1().rev() if p1 in self.commit_trees: if p1 != rev-1: # Most of the time, a revision will inherit from the # previous one. In those cases we can reuse commit_tree, # otherwise build a new one here. 
parent_tree = self.unique_trees[self.commit_trees[p1]] commit_tree = self.reconstruct_tree(parent_tree) else: commit_tree = self.update_tree_from_rev(SimpleTree(), p1) # remove whatever is removed for f in c.removed(): commit_tree.remove_tree_node_for_path(f) # update whatever is updated self.update_tree_from_rev(commit_tree, rev, c.added()+c.modified()) self.commit_trees[rev] = self.collapse_tree(commit_tree) date_dict = identifiers.normalize_timestamp( int(c.date().timestamp()) ) author_dict = converters.parse_author(c.author()) revision = { 'author': author_dict, 'date': date_dict, 'committer': author_dict, 'committer_date': date_dict, 'type': 'hg', 'directory': commit_tree.hash, 'message': c.description(), 'metadata': { 'extra_headers': [ ['phase', c.phase()], ['rev', rev], ['hidden', c.hidden()] ] }, 'synthetic': False, 'parents': [ self.revisions[p.node()]['id'] for p in c.parents() if p.rev() >= 0 ] } revision['id'] = identifiers.revision_identifier(revision) self.revisions[c.node()] = revision for n, r in self.revisions.items(): yield {'node': n, 'id': r['id']} def get_revisions(self): """Get the revision identifiers from the repository""" revs = {r['id']: r['node'] for r in self.get_revision_ids()} missing_revs = set(self.storage.revision_missing(revs.keys())) for r in missing_revs: yield self.revisions[revs[r]] def has_releases(self): """Checks whether we need to load releases""" self.num_releases = len([t for t in self.repo.tags() if not t[3]]) return self.num_releases > 0 def get_releases(self): """Get the releases that need to be loaded""" releases = {} for t in self.repo.tags(): islocal = t[3] name = t[0] if (name != b'tip' and not islocal): short_hash = t[2] target = self.revisions[self.repo[short_hash].node()]['id'] release = { 'name': name, 'target': target, 'target_type': 'revision', 'message': None, 'metadata': None, 'synthetic': False, 'author': None, 'date': None } id_hash = identifiers.release_identifier(release) release['id'] = id_hash releases[id_hash] = release missing_rels = set(self.storage.release_missing( sorted(releases.keys()) )) yield from (releases[r] for r in missing_rels) def has_occurrences(self): """Checks whether we need to load occurrences""" self.num_occurrences = len( self.repo.tags() + self.repo.branches() + self.repo.bookmarks()[0] ) return self.num_occurrences > 0 def get_occurrences(self): """Get the occurrences that need to be loaded""" for t in ( self.repo.tags() + self.repo.branches() + self.repo.bookmarks()[0] ): name = t[0] short_hash = t[2] target = self.revisions[self.repo[short_hash].node()]['id'] yield { 'branch': name, 'origin': self.origin_id, 'target': target, 'target_type': 'revision', 'visit': self.visit, } def get_fetch_history_result(self): """Return the data to store in fetch_history for the current loader""" return { 'contents': self.num_contents, 'directories': len(self.unique_trees), 'revisions': self.num_revisions, 'releases': self.num_releases, 'occurrences': self.num_occurrences, } def save_data(self): """We already have the data locally, no need to save it""" pass def eventful(self): """Whether the load was eventful""" return True class HgLoaderFromArchive(HgLoader): """Load an HG repository from a compressed archive. 
""" - def prepare(self, origin_url, archive_path, fetch_date): + def prepare(self, origin_url, archive_path, visit_date): tmpdir = tmp_extract(archive_path, tmpdir_prefix='swh.loader.hg.', log=self.log, source=origin_url) - super().prepare(origin_url, tmpdir.name, fetch_date) + super().prepare(origin_url, tmpdir.name, visit_date) if __name__ == '__main__': import logging import sys logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) loader = HgLoader() origin_url = sys.argv[1] directory = sys.argv[2] - fetch_date = datetime.datetime.now(tz=datetime.timezone.utc) + visit_date = datetime.datetime.now(tz=datetime.timezone.utc) - print(loader.load(origin_url, directory, fetch_date)) + print(loader.load(origin_url, directory, visit_date)) diff --git a/swh/loader/mercurial/tasks.py b/swh/loader/mercurial/tasks.py index 09403c8..7bff11c 100644 --- a/swh/loader/mercurial/tasks.py +++ b/swh/loader/mercurial/tasks.py @@ -1,27 +1,27 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.task import Task from .bundle20_loader import HgBundle20Loader class LoadMercurialTsk(Task): """Mercurial repository loading """ task_queue = 'swh_loader_mercurial' - def run_task(self, *, origin_url, directory, fetch_date): + def run_task(self, *, origin_url, directory, visit_date): """Import a mercurial tarball into swh. Args: see :func:`DepositLoader.load`. """ loader = HgBundle20Loader() loader.log = self.log loader.load(origin_url=origin_url, directory=directory, - fetch_date=fetch_date) + visit_date=visit_date)