diff --git a/debian/control b/debian/control index 642dfcf..8917b97 100644 --- a/debian/control +++ b/debian/control @@ -1,37 +1,37 @@ Source: swh-loader-mercurial Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python (>= 2), python3-all, python3-dateutil, python3-nose, python3-setuptools, python3-vcversioner, python3-retrying, python3-hglib, patool, python3-swh.core (>= 0.0.36~), - python3-swh.model (>= 0.0.20~), + python3-swh.model (>= 0.0.27~), python3-swh.storage (>= 0.0.95~), python3-swh.scheduler (>= 0.0.19~), python3-swh.loader.core (>= 0.0.33~), python-sqlitedict Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/source/swh-loader-mercurial/ Package: python3-swh.loader.mercurial Architecture: all Depends: python3-swh.core (>= 0.0.36~), python3-swh.loader.core (>= 0.0.33~), - python3-swh.model (>= 0.0.20~), + python3-swh.model (>= 0.0.27~), python3-swh.storage (>= 0.0.95~), python3-swh.scheduler (>= 0.0.19~), patool, python-sqlitedict, python3-hglib, ${misc:Depends}, ${python3:Depends} Description: Software Heritage Mercurial Loader Module in charge of loading hg/mercurial repositories into swh storage. diff --git a/requirements-swh.txt b/requirements-swh.txt index bf7d5cf..db44d1e 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,5 +1,5 @@ swh.core >= 0.0.36 -swh.model >= 0.0.20 +swh.model >= 0.0.27 swh.storage >= 0.0.95 swh.scheduler >= 0.0.19 swh.loader.core >= 0.0.33 diff --git a/swh/loader/mercurial/bundle20_loader.py b/swh/loader/mercurial/bundle20_loader.py index 3ad1a41..e26b817 100644 --- a/swh/loader/mercurial/bundle20_loader.py +++ b/swh/loader/mercurial/bundle20_loader.py @@ -1,522 +1,526 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """This document contains a SWH loader for ingesting repository data from Mercurial version 2 bundle files. """ # NOTE: The code here does expensive work twice in places because of the # intermediate need to check for what is missing before sending to the database # and the desire to not juggle very large amounts of data. # TODO: Decide whether to also serialize to disk and read back more quickly # from there. Maybe only for very large repos and fast drives. # - Avi import datetime import hglib import os import random import re from dateutil import parser from shutil import rmtree from tempfile import mkdtemp -from swh.model import hashutil, identifiers +from swh.model import identifiers +from swh.model.hashutil import ( + MultiHash, hash_to_hex, hash_to_bytes, + DEFAULT_ALGORITHMS +) from swh.loader.core.loader import SWHStatelessLoader from swh.loader.core.converters import content_for_storage from swh.loader.core.utils import clean_dangling_folders from . import converters from .archive_extract import tmp_extract from .bundle20_reader import Bundle20Reader from .converters import PRIMARY_ALGO as ALGO from .objects import SelectiveCache, SimpleTree TAG_PATTERN = re.compile('[0-9A-Fa-f]{40}') TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.mercurial.' class HgBundle20Loader(SWHStatelessLoader): """Mercurial loader able to deal with remote or local repository. """ CONFIG_BASE_FILENAME = 'loader/hg' ADDITIONAL_CONFIG = { 'bundle_filename': ('str', 'HG20_none_bundle'), 'reduce_effort': ('bool', True), # default: Try to be smart about time 'temp_directory': ('str', '/tmp'), 'cache1_size': ('int', 800*1024*1024), 'cache2_size': ('int', 800*1024*1024), } def __init__(self, logging_class='swh.loader.mercurial.Bundle20Loader'): super().__init__(logging_class=logging_class) self.content_max_size_limit = self.config['content_size_limit'] self.bundle_filename = self.config['bundle_filename'] self.reduce_effort_flag = self.config['reduce_effort'] self.empty_repository = None self.temp_directory = self.config['temp_directory'] self.cache1_size = self.config['cache1_size'] self.cache2_size = self.config['cache2_size'] self.working_directory = None self.bundle_path = None def pre_cleanup(self): """Cleanup potential dangling files from prior runs (e.g. OOM killed tasks) """ clean_dangling_folders(self.temp_directory, pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, log=self.log) def cleanup(self): """Clean temporary working directory """ if self.bundle_path and os.path.exists(self.bundle_path): self.log.debug('Cleanup up working bundle %s' % self.bundle_path) os.unlink(self.bundle_path) if self.working_directory and os.path.exists(self.working_directory): self.log.debug('Cleanup up working directory %s' % ( self.working_directory, )) rmtree(self.working_directory) def get_heads(self, repo): """Read the closed branches heads (branch, bookmarks) and returns a dict with branch_name (bytes) and mercurial's node id (bytes). Those needs conversion to swh-ids. This is taken care of in get_revisions. """ b = {} for _, node_hash_id, _, branch_name, *_ in repo.heads(): - b[branch_name] = hashutil.hash_to_bytes( + b[branch_name] = hash_to_bytes( node_hash_id.decode()) bookmarks = repo.bookmarks() if bookmarks and bookmarks[0]: for bookmark_name, _, target_short in bookmarks[0]: target = repo[target_short].node() - b[bookmark_name] = hashutil.hash_to_bytes( - target.decode()) + b[bookmark_name] = hash_to_bytes(target.decode()) return b def prepare_origin_visit(self, *, origin_url, visit_date, **kwargs): self.origin_url = origin_url self.origin = {'url': self.origin_url, 'type': 'hg'} if isinstance(visit_date, str): # visit_date can be string or datetime visit_date = parser.parse(visit_date) self.visit_date = visit_date def prepare(self, *, origin_url, visit_date, directory=None): """Prepare the necessary steps to load an actual remote or local repository. To load a local repository, pass the optional directory parameter as filled with a path to a real local folder. To load a remote repository, pass the optional directory parameter as None. Args: origin_url (str): Origin url to load visit_date (str/datetime): Date of the visit directory (str/None): The local directory to load """ self.branches = {} self.tags = [] self.releases = {} self.node_2_rev = {} if not directory: # remote repository self.working_directory = mkdtemp( prefix=TEMPORARY_DIR_PREFIX_PATTERN, suffix='-%s' % os.getpid(), dir=self.temp_directory) os.makedirs(self.working_directory, exist_ok=True) self.hgdir = self.working_directory self.log.debug('Cloning %s to %s' % ( self.origin['url'], self.hgdir)) hglib.clone(source=self.origin['url'], dest=self.hgdir) else: # local repository self.working_directory = None self.hgdir = directory self.bundle_path = os.path.join(self.hgdir, self.bundle_filename) self.log.debug('Bundling at %s' % self.bundle_path) with hglib.open(self.hgdir) as repo: self.heads = self.get_heads(repo) repo.bundle(bytes(self.bundle_path, 'utf-8'), all=True, type=b'none-v2') self.cache_filename1 = os.path.join( self.hgdir, 'swh-cache-1-%s' % ( hex(random.randint(0, 0xffffff))[2:], )) self.cache_filename2 = os.path.join( self.hgdir, 'swh-cache-2-%s' % ( hex(random.randint(0, 0xffffff))[2:], )) try: self.br = Bundle20Reader(bundlefile=self.bundle_path, cache_filename=self.cache_filename1, cache_size=self.cache1_size) except FileNotFoundError as e: # Empty repository! Still a successful visit targeting an # empty snapshot self.log.warn('%s is an empty repository!' % self.hgdir) self.empty_repository = True else: self.reduce_effort = set() if self.reduce_effort_flag: now = datetime.datetime.now(tz=datetime.timezone.utc) if (now - self.visit_date).days > 1: # Assuming that self.visit_date would be today for # a new visit, treat older visit dates as # indication of wanting to skip some processing # effort. for header, commit in self.br.yield_all_changesets(): ts = commit['time'].timestamp() if ts < self.visit_date.timestamp(): self.reduce_effort.add(header['node']) def has_contents(self): return not self.empty_repository def has_directories(self): return not self.empty_repository def has_revisions(self): return not self.empty_repository def has_releases(self): return not self.empty_repository def fetch_data(self): """Fetch the data from the data source.""" pass def get_contents(self): """Get the contents that need to be loaded.""" # NOTE: This method generates blobs twice to reduce memory usage # without generating disk writes. self.file_node_to_hash = {} hash_to_info = {} self.num_contents = 0 contents = {} missing_contents = set() for blob, node_info in self.br.yield_all_blobs(): self.num_contents += 1 file_name = node_info[0] header = node_info[2] + length = len(blob) if header['linknode'] in self.reduce_effort: - content = hashutil.hash_data(blob, algorithms=[ALGO], - with_length=True) + algorithms = [ALGO] else: - content = hashutil.hash_data(blob, with_length=True) - + algorithms = DEFAULT_ALGORITHMS + h = MultiHash.from_data(blob, hash_names=algorithms, length=length) + content = h.digest() + content['length'] = length blob_hash = content[ALGO] self.file_node_to_hash[header['node']] = blob_hash if header['linknode'] in self.reduce_effort: continue hash_to_info[blob_hash] = node_info contents[blob_hash] = content missing_contents.add(blob_hash) if file_name == b'.hgtags': # https://www.mercurial-scm.org/wiki/GitConcepts#Tag_model # overwrite until the last one self.tags = (t for t in blob.split(b'\n') if t != b'') if contents: missing_contents = set( self.storage.content_missing( list(contents.values()), key_hash=ALGO ) ) # Clusters needed blobs by file offset and then only fetches the # groups at the needed offsets. focs = {} # "file/offset/contents" for blob_hash in missing_contents: _, file_offset, header = hash_to_info[blob_hash] focs.setdefault(file_offset, {}) focs[file_offset][header['node']] = blob_hash hash_to_info = None for offset, node_hashes in sorted(focs.items()): for header, data, *_ in self.br.yield_group_objects( group_offset=offset ): node = header['node'] if node in node_hashes: blob, meta = self.br.extract_meta_from_blob(data) content = contents.pop(node_hashes[node], None) if content: content['data'] = blob - content['length'] = len(blob) yield content_for_storage( content, log=self.log, max_content_size=self.content_max_size_limit, origin_id=self.origin_id ) def load_directories(self): """This is where the work is done to convert manifest deltas from the repository bundle into SWH directories. """ self.mnode_to_tree_id = {} cache_hints = self.br.build_manifest_hints() def tree_size(t): return t.size() self.trees = SelectiveCache(cache_hints=cache_hints, size_function=tree_size, filename=self.cache_filename2, max_size=self.cache2_size) tree = SimpleTree() for header, added, removed in self.br.yield_all_manifest_deltas( cache_hints ): node = header['node'] basenode = header['basenode'] tree = self.trees.fetch(basenode) or tree # working tree for path in removed.keys(): tree = tree.remove_tree_node_for_path(path) for path, info in added.items(): file_node, is_symlink, perms_code = info tree = tree.add_blob( path, self.file_node_to_hash[file_node], is_symlink, perms_code ) if header['linknode'] in self.reduce_effort: self.trees.store(node, tree) else: new_dirs = [] self.mnode_to_tree_id[node] = tree.hash_changed(new_dirs) self.trees.store(node, tree) yield header, tree, new_dirs def get_directories(self): """Compute directories to load """ dirs = {} self.num_directories = 0 for _, _, new_dirs in self.load_directories(): for d in new_dirs: self.num_directories += 1 dirs[d['id']] = d missing_dirs = list(dirs.keys()) if missing_dirs: missing_dirs = self.storage.directory_missing(missing_dirs) for _id in missing_dirs: yield dirs[_id] dirs = {} def get_revisions(self): """Compute revisions to load """ revisions = {} self.num_revisions = 0 for header, commit in self.br.yield_all_changesets(): if header['node'] in self.reduce_effort: continue self.num_revisions += 1 date_dict = identifiers.normalize_timestamp( int(commit['time'].timestamp()) ) author_dict = converters.parse_author(commit['user']) if commit['manifest'] == Bundle20Reader.NAUGHT_NODE: directory_id = SimpleTree().hash_changed() else: directory_id = self.mnode_to_tree_id[commit['manifest']] extra_meta = [] extra = commit.get('extra') if extra: for e in extra.split(b'\x00'): k, v = e.split(b':', 1) k = k.decode('utf-8') extra_meta.append([k, v]) revision = { 'author': author_dict, 'date': date_dict, 'committer': author_dict, 'committer_date': date_dict, 'type': 'hg', 'directory': directory_id, 'message': commit['message'], 'metadata': { - 'node': hashutil.hash_to_hex(header['node']), + 'node': hash_to_hex(header['node']), 'extra_headers': [ ['time_offset_seconds', str(commit['time_offset_seconds']).encode('utf-8')], ] + extra_meta }, 'synthetic': False, 'parents': [] } p1 = self.node_2_rev.get(header['p1']) p2 = self.node_2_rev.get(header['p2']) if p1: revision['parents'].append(p1) if p2: revision['parents'].append(p2) - revision['id'] = hashutil.hash_to_bytes( + revision['id'] = hash_to_bytes( identifiers.revision_identifier(revision) ) self.node_2_rev[header['node']] = revision['id'] revisions[revision['id']] = revision # Converts heads to use swh ids self.heads = { branch_name: self.node_2_rev[node_id] for branch_name, node_id in self.heads.items() } missing_revs = revisions.keys() if missing_revs: missing_revs = set( self.storage.revision_missing(list(missing_revs)) ) for r in missing_revs: yield revisions[r] self.mnode_to_tree_id = None def _read_tag(self, tag, split_byte=b' '): node, *name = tag.split(split_byte) name = split_byte.join(name) return node, name def get_releases(self): """Get the releases that need to be loaded.""" self.num_releases = 0 releases = {} missing_releases = [] for t in self.tags: self.num_releases += 1 node, name = self._read_tag(t) node = node.decode() - node_bytes = hashutil.hash_to_bytes(node) + node_bytes = hash_to_bytes(node) if not TAG_PATTERN.match(node): self.log.warn('Wrong pattern (%s) found in tags. Skipping' % ( node, )) continue if node_bytes not in self.node_2_rev: self.log.warn('No matching revision for tag %s ' '(hg changeset: %s). Skipping' % (name.decode(), node)) continue tgt_rev = self.node_2_rev[node_bytes] release = { 'name': name, 'target': tgt_rev, 'target_type': 'revision', 'message': None, 'metadata': None, 'synthetic': False, 'author': {'name': None, 'email': None, 'fullname': b''}, 'date': None } - id_hash = hashutil.hash_to_bytes( + id_hash = hash_to_bytes( identifiers.release_identifier(release)) release['id'] = id_hash missing_releases.append(id_hash) releases[id_hash] = release self.releases[name] = id_hash if missing_releases: missing_releases = set( self.storage.release_missing(missing_releases)) for _id in missing_releases: yield releases[_id] def get_snapshot(self): """Get the snapshot that need to be loaded.""" branches = {} for name, target in self.heads.items(): branches[name] = {'target': target, 'target_type': 'revision'} for name, target in self.releases.items(): branches[name] = {'target': target, 'target_type': 'release'} snap = { 'id': None, 'branches': branches, } snap['id'] = identifiers.identifier_to_bytes( identifiers.snapshot_identifier(snap)) return snap def get_fetch_history_result(self): """Return the data to store in fetch_history.""" return { 'contents': self.num_contents, 'directories': self.num_directories, 'revisions': self.num_revisions, 'releases': self.num_releases, } class HgArchiveBundle20Loader(HgBundle20Loader): """Mercurial loader for repository wrapped within archives. """ def __init__(self): super().__init__( logging_class='swh.loader.mercurial.HgArchiveBundle20Loader') self.temp_dir = None def prepare(self, *, origin_url, archive_path, visit_date): self.temp_dir = tmp_extract(archive=archive_path, dir=self.temp_directory, prefix=TEMPORARY_DIR_PREFIX_PATTERN, suffix='.dump-%s' % os.getpid(), log=self.log, source=origin_url) repo_name = os.listdir(self.temp_dir)[0] directory = os.path.join(self.temp_dir, repo_name) super().prepare(origin_url=origin_url, visit_date=visit_date, directory=directory) def cleanup(self): if self.temp_dir and os.path.exists(self.temp_dir): rmtree(self.temp_dir) super().cleanup() diff --git a/swh/loader/mercurial/bundle20_loader_verifier.py b/swh/loader/mercurial/bundle20_loader_verifier.py index ca8f308..111c41e 100644 --- a/swh/loader/mercurial/bundle20_loader_verifier.py +++ b/swh/loader/mercurial/bundle20_loader_verifier.py @@ -1,254 +1,255 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import code import datetime import hglib import os import random import sys import time from binascii import hexlify, unhexlify -from swh.model import hashutil +from swh.model.hashutil import MultiHash from .bundle20_loader import HgBundle20Loader from .converters import PRIMARY_ALGO as ALGO from .objects import SimpleTree class HgLoaderValidater(HgBundle20Loader): def generate_all_blobs(self, validate=True, frequency=1): print('GENERATING BLOBS') i = 0 start = time.time() u = set() for blob, node_info in self.br.yield_all_blobs(): filename = node_info[0] header = node_info[2] i += 1 - bhash = hashutil.hash_data(blob, algorithms=set([ALGO]))[ALGO] + hashes = MultiHash.from_data(blob, hash_names=set([ALGO])).digest() + bhash = hashes[ALGO] self.file_node_to_hash[header['node']] = bhash u.update([bhash]) if validate: if random.random() < frequency: self.validate_blob(filename, header, blob) if i % 10000 == 0: print(i) print('') print('FOUND', i, 'BLOBS') print('FOUND', len(u), 'UNIQUE BLOBS') print('ELAPSED', time.time()-start) def validate_blob(self, filename, header, blob): if not self.hg: self.hg = hglib.open(self.hgdir) data = bytes(blob) filepath = os.path.join(self.hg.root(), bytes(filename)) linknode = hexlify(header['linknode']) cat_contents = self.hg.cat([filepath], rev=linknode) if cat_contents != data: print('INTERNAL ERROR ERROR ERROR ERROR') print(filename) print(header) print('-----') print(cat_contents) print('---- vs ----') print(data) code.interact(local=dict(globals(), **locals())) quit() else: print('v', end='') def generate_all_trees(self, validate=True, frequency=1): print('GENERATING MANIFEST TREES') c = 0 n = 0 u = set() start = time.time() validated = 0 for header, tree, new_dirs in self.load_directories(): if validate and (c >= validated) and (random.random() < frequency): self.validate_tree(tree, header, c) for d in new_dirs: u.add(d['id']) c += 1 n += len(new_dirs) print('.', end='') if c % 20 == 0: sys.stdout.flush() if c % 10000 == 0: print(c) print('') print('FOUND', c, 'COMMIT MANIFESTS') print('FOUND', n, 'NEW DIRS') print('FOUND', len(u), 'UNIQUE DIRS') print('ELAPSED', time.time()-start) def validate_tree(self, tree, header, i): if not self.hg: self.hg = hglib.open(self.hgdir) commit_id = header['linknode'] if len(commit_id) == 20: commit_id = hexlify(commit_id) base_tree = SimpleTree() base_files = list(self.hg.manifest(rev=commit_id)) bfiles = sorted([f[4] for f in base_files]) for p in base_files: base_tree.add_blob( p[4], self.file_node_to_hash[unhexlify(p[0])], p[3], p[1] ) base_tree.hash_changed() files = sorted(list(tree.flatten().keys())) if tree != base_tree: print('validating rev:', i, 'commit:', commit_id) print('validating files:', len(files), len(base_files)) print(' INVALID TREE') def so1(a): keys = [k['name'] for k in a['entries']] return b''.join(sorted(keys)) tree_dirs = [d for d in tree.yield_swh_directories()] base_dirs = [d for d in base_tree.yield_swh_directories()] tree_dirs.sort(key=so1) base_dirs.sort(key=so1) # for i in range(len(tree_dirs)): # if tree_dirs[i] != base_dirs[i]: # print(i) # code.interact(local=dict(globals(), **locals())) print('Program will quit after your next Ctrl-D') code.interact(local=dict(globals(), **locals())) quit() else: print('v', end='') def generate_all_commits(self, validate=True, frequency=1): i = 0 start = time.time() for rev in self.get_revisions(): print('.', end='') i += 1 if i % 20 == 0: sys.stdout.flush() print('') print('FOUND', i, 'COMMITS') print('ELAPSED', time.time()-start) def runtest(self, hgdir, validate_blobs=False, validate_trees=False, frequency=1.0, test_iterative=False): """ HgLoaderValidater().runtest('/home/avi/SWH/mozilla-unified') """ self.origin_id = 'test' dt = datetime.datetime.now(tz=datetime.timezone.utc) if test_iterative: dt = dt - datetime.timedelta(10) hgrepo = None if (hgdir.lower().startswith('http:') or hgdir.lower().startswith('https:')): hgrepo, hgdir = hgdir, hgrepo self.hgdir = hgdir try: print('preparing') self.prepare(hgrepo, dt, hgdir) self.file_node_to_hash = {} # self.generate_all_blobs(validate=validate_blobs, # frequency=frequency) # self.generate_all_trees(validate=validate_trees, frequency=frequency) # self.generate_all_commits() print('getting contents') cs = 0 for c in self.get_contents(): cs += 1 pass print('getting directories') ds = 0 for d in self.get_directories(): ds += 1 pass revs = 0 print('getting revisions') for rev in self.get_revisions(): revs += 1 pass print('getting releases') rels = 0 for rel in self.get_releases(): rels += 1 print(rel) self.visit = 'foo' print('getting snapshot') o = self.get_snapshot() print(o['branches'].keys()) finally: self.cleanup() print('final count: ', 'cs', cs, 'ds', ds, 'revs', revs, 'rels', rels) def main(): if len(sys.argv) > 1: test_repo = sys.argv[1] else: print('Please pass in the path to an HG repository.') quit() while test_repo[-1] == '/': test_repo = test_repo[:-1] if len(sys.argv) > 2: validate_frequency = float(sys.argv[2]) else: validate_frequency = 0.001 if len(sys.argv) > 3: test_iterative = True else: test_iterative = False HgLoaderValidater().runtest(test_repo, True, True, validate_frequency, test_iterative) if __name__ == '__main__': main() diff --git a/swh/loader/mercurial/slow_loader.py b/swh/loader/mercurial/slow_loader.py index a2d53e4..90740fe 100644 --- a/swh/loader/mercurial/slow_loader.py +++ b/swh/loader/mercurial/slow_loader.py @@ -1,469 +1,471 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING WARNING WARNING WARNING # hglib is too slow to be super useful. Unfortunately it's also the only # python3 library for mercurial as of this writing. - Avi import datetime import hglib import os -from swh.model import identifiers, hashutil +from swh.model import identifiers +from swh.model.hashutil import MultiHash, DEFAULT_ALGORITHMS, hash_to_hex from swh.loader.core.loader import SWHStatelessLoader from .converters import parse_author, PRIMARY_ALGO as ALGO OS_PATH_SEP = os.path.sep.encode('utf-8') def data_to_content_id(data): size = len(data) ret = { 'length': size, } ret.update(identifiers.content_identifier({'data': data})) return ret def blob_to_content_dict(data, existing_hashes=None, max_size=None, logger=None): """Convert blob data to a SWH Content. If the blob already has hashes computed, don't recompute them. TODO: This should be unified with similar functions in other places. args: existing_hashes: dict of hash algorithm:value pairs max_size: size over which blobs should be rejected logger: logging class instance returns: A Software Heritage "content". """ existing_hashes = existing_hashes or {} size = len(data) content = { 'length': size, } content.update(existing_hashes) hash_types = list(existing_hashes.keys()) - hashes_to_do = hashutil.DEFAULT_ALGORITHMS.difference(hash_types) - content.update(hashutil.hash_data(data, algorithms=hashes_to_do)) + hashes_to_do = DEFAULT_ALGORITHMS.difference(hash_types) + hashes = MultiHash.from_data(data, hash_names=hashes_to_do).digest() + content.update(hashes) if max_size and (size > max_size): content.update({ 'status': 'absent', 'reason': 'Content too large', }) if logger: - id_hash = hashutil.hash_to_hex(content[ALGO]) + id_hash = hash_to_hex(content[ALGO]) logger.info( 'Skipping content %s, too large (%s > %s)' % (id_hash, size, max_size), extra={ 'swh_type': 'loader_content_skip', 'swh_id': id_hash, 'swh_size': size } ) else: content.update({'data': data, 'status': 'visible'}) return content class SimpleBlob: """ Stores basic metadata for a blob object. """ kind = 'file' def __init__(self, file_hash, file_mode): self.hash = file_hash if not isinstance(file_mode, int): self.mode = 0o100000 + int(file_mode, 8) else: self.mode = file_mode class SimpleTree(dict): """ Stores metadata for a nested 'tree'-like object. """ kind = 'dir' mode = 0o040000 def add_tree_node_for_path(self, path): """Deeply nests SimpleTrees according to a directory path and returns a cursor to the deepest one""" node = self for d in path.split(OS_PATH_SEP): node = node.setdefault(d, SimpleTree()) return node def remove_tree_node_for_path(self, path): """Deletes a SimpleBlob from inside nested SimpleTrees according to the given file path""" first, sep, rest = path.partition(OS_PATH_SEP) if rest: self[first].remove_tree_node_for_path(rest) if not self.get(first): del self[first] else: del self[first] def add_blob(self, file_path, file_hash, file_mode): """Deeply nests a SimpleBlob inside nested SimpleTrees according to the given file path""" fdir = os.path.dirname(file_path) fbase = os.path.basename(file_path) if fdir: node = self.add_tree_node_for_path(fdir) else: node = self node[fbase] = SimpleBlob(file_hash, file_mode) class HgLoader(SWHStatelessLoader): """Load a mercurial repository from a directory. """ CONFIG_BASE_FILENAME = 'loader/hg' def __init__(self, logging_class='swh.loader.mercurial.HgLoader'): super().__init__(logging_class=logging_class) def prepare_origin_visit(self, origin_url, directory, visit_date): self.origin = { 'type': 'hg', 'url': origin_url } self.visit_date = visit_date def prepare(self, origin_url, directory, visit_date): """see base.BaseLoader.prepare""" self.repo = hglib.open(directory) self.node_to_blob_hash = {} self.blob_hash_to_file_rev = {} self.commit_trees = {} self.unique_trees = {} self.revisions = {} def fetch_data(self): """Fetch the data from the data source""" pass def has_contents(self): """Checks whether we need to load contents""" # if we have any revisions, then obviously we have contents. return self.has_revisions() def iter_changelog(self): """Iterate over the repository log""" yield from self.repo.log('0:tip', removed=True) def get_node_file_if_new(self, f, rev, node_hash): """Load a blob from disk""" # Fast if the node hash is already cached. Somehow this shortcuts a # meaningful but not huge percentage of the loads for a repository. if node_hash not in self.node_to_blob_hash: file_path = os.path.join(self.repo.root(), f) data = self.repo.cat([file_path], rev) blob_hash = identifiers.content_identifier( {'data': data} )[ALGO] self.node_to_blob_hash[node_hash] = blob_hash if blob_hash not in self.blob_hash_to_file_rev: # new blob self.blob_hash_to_file_rev[blob_hash] = (file_path, rev) return blob_hash, data return self.node_to_blob_hash[node_hash], None def get_content_ids(self): """Get all the contents, but trim away the actual data""" self.node_to_blob_hash = {} self.blob_hash_to_file_rev = {} self.num_contents = 0 for li in self.iter_changelog(): c = self.repo[li] rev = c.rev() manifest = c.manifest() for f in c.added() + c.modified(): node_hash = manifest[f] blob_hash, data = self.get_node_file_if_new(f, rev, node_hash) if data is not None: # new blob self.num_contents += 1 yield data_to_content_id(data) def get_contents(self): """Get the contents that need to be loaded""" # This method unfortunately loads and hashes the blobs twice. max_content_size = self.config['content_size_limit'] missing_contents = set( self.storage.content_missing( self.get_content_ids(), ALGO ) ) for oid in missing_contents: file_path, rev = self.blob_hash_to_file_rev[oid] data = self.repo.cat([file_path], rev) yield blob_to_content_dict( data, max_size=max_content_size, logger=self.log ) def has_directories(self): """Checks whether we need to load directories""" # if we have any revs, we must also have dirs return self.has_revisions() def get_directories(self): """Get the directories that need to be loaded""" missing_dirs = set(self.storage.directory_missing( sorted(self.unique_trees.keys()) )) for dir_hash in missing_dirs: yield self.unique_trees[dir_hash] def has_revisions(self): """Checks whether we need to load revisions""" self.num_revisions = int(self.repo.tip()[0]) + 1 return self.num_revisions > 0 def update_tree_from_rev(self, tree, rev, only_these_files=None): """Iterates over changes in a revision and adds corresponding SimpleBlobs to a SimpleTree""" if rev >= 0: manifest = {k[4]: k for k in self.repo.manifest(rev=rev)} loop_keys = only_these_files or manifest.keys() for f in loop_keys: node_hash = manifest[f][0] file_mode = manifest[f][1] file_hash, _ = self.get_node_file_if_new(f, rev, node_hash) tree.add_blob(f, file_hash, file_mode) return tree def reconstruct_tree(self, directory): """Converts a flat directory into nested SimpleTrees.""" # This method exists because the code was already written to use # SimpleTree before then reducing memory use and converting to the # canonical format. A refactor using lookups instead of nesting could # obviate the need. new_tree = SimpleTree() for entry in directory['entries']: tgt = entry['target'] perms = entry['perms'] name = entry['name'] if tgt in self.unique_trees: # subtree new_tree[name] = self.reconstruct_tree(self.unique_trees[tgt]) else: # blob new_tree[name] = SimpleBlob(tgt, perms) new_tree.hash = directory['id'] return new_tree def collapse_tree(self, tree): """Converts nested SimpleTrees into multiple flat directories.""" # This method exists because the code was already written to use # SimpleTree before then reducing memory use and converting to the # canonical format. A refactor using lookups instead of nesting could # obviate the need. directory = { 'entries': [ { 'name': k, 'perms': v.mode, 'type': v.kind, 'target': (isinstance(v, SimpleBlob) and v.hash or self.collapse_tree(v)) } for k, v in tree.items() ] } tree.hash = identifiers.directory_identifier(directory) directory['id'] = tree.hash self.unique_trees[tree.hash] = directory return tree.hash def get_revision_ids(self): """Get the revisions that need to be loaded""" self.unique_trees = {} commit_tree = None for li in self.iter_changelog(): c = self.repo[li[1]] rev = c.rev() # start from the parent state p1 = c.p1().rev() if p1 in self.commit_trees: if p1 != rev-1: # Most of the time, a revision will inherit from the # previous one. In those cases we can reuse commit_tree, # otherwise build a new one here. parent_tree = self.unique_trees[self.commit_trees[p1]] commit_tree = self.reconstruct_tree(parent_tree) else: commit_tree = self.update_tree_from_rev(SimpleTree(), p1) # remove whatever is removed for f in c.removed(): commit_tree.remove_tree_node_for_path(f) # update whatever is updated self.update_tree_from_rev(commit_tree, rev, c.added()+c.modified()) self.commit_trees[rev] = self.collapse_tree(commit_tree) date_dict = identifiers.normalize_timestamp( int(c.date().timestamp()) ) author_dict = parse_author(c.author()) parents = [] for p in c.parents(): if p.rev() >= 0: parents.append(self.revisions[p.node()]['id']) phase = c.phase() # bytes rev = str(rev).encode('utf-8') hidden = str(c.hidden()).encode('utf-8') hg_headers = [['phase', phase], ['rev', rev], ['hidden', hidden]] revision = { 'author': author_dict, 'date': date_dict, 'committer': author_dict, 'committer_date': date_dict, 'type': 'hg', 'directory': identifiers.identifier_to_bytes(commit_tree.hash), 'message': c.description(), 'metadata': { 'extra_headers': hg_headers }, 'synthetic': False, 'parents': parents, } revision['id'] = identifiers.identifier_to_bytes( identifiers.revision_identifier(revision)) self.revisions[c.node()] = revision for n, r in self.revisions.items(): yield {'node': n, 'id': r['id']} def get_revisions(self): """Get the revision identifiers from the repository""" revs = { r['id']: r['node'] for r in self.get_revision_ids() } missing_revs = set(self.storage.revision_missing(revs.keys())) for r in missing_revs: yield self.revisions[revs[r]] def has_releases(self): """Checks whether we need to load releases""" self.num_releases = len([t for t in self.repo.tags() if not t[3]]) return self.num_releases > 0 def get_releases(self): """Get the releases that need to be loaded""" releases = {} for t in self.repo.tags(): islocal = t[3] name = t[0] if (name != b'tip' and not islocal): short_hash = t[2] node_id = self.repo[short_hash].node() target = self.revisions[node_id]['id'] release = { 'name': name, 'target': target, 'target_type': 'revision', 'message': None, 'metadata': None, 'synthetic': False, 'author': {'name': None, 'email': None, 'fullname': b''}, 'date': None } id_bytes = identifiers.identifier_to_bytes( identifiers.release_identifier(release)) release['id'] = id_bytes releases[id_bytes] = release missing_rels = set(self.storage.release_missing( sorted(releases.keys()) )) yield from (releases[r] for r in missing_rels) def get_snapshot(self): """Get the snapshot that need to be loaded""" self.num_snapshot = 1 def _get_branches(repo=self.repo): for t in ( repo.tags() + repo.branches() + repo.bookmarks()[0] ): name = t[0] short_hash = t[2] node = self.repo[short_hash].node() yield name, { 'target': self.revisions[node]['id'], 'target_type': 'revision' } snap = { 'branches': { name: branch for name, branch in _get_branches() } } snap['id'] = identifiers.identifier_to_bytes( identifiers.snapshot_identifier(snap)) return snap def get_fetch_history_result(self): """Return the data to store in fetch_history for the current loader""" return { 'contents': self.num_contents, 'directories': len(self.unique_trees), 'revisions': self.num_revisions, 'releases': self.num_releases, 'snapshot': self.num_snapshot, } def save_data(self): """We already have the data locally, no need to save it""" pass def eventful(self): """Whether the load was eventful""" return True if __name__ == '__main__': import logging import sys logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) loader = HgLoader() origin_url = sys.argv[1] directory = sys.argv[2] visit_date = datetime.datetime.now(tz=datetime.timezone.utc) print(loader.load(origin_url, directory, visit_date))