diff --git a/PKG-INFO b/PKG-INFO index e17131e..9ea45d3 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.loader.mercurial -Version: 0.0.2 +Version: 0.0.3 Summary: Software Heritage Mercurial Loader Home-page: https://forge.softwareheritage.org/diffusion/DLDHG/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/debian/control b/debian/control index 64e51ab..9889981 100644 --- a/debian/control +++ b/debian/control @@ -1,36 +1,36 @@ Source: swh-loader-mercurial Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python (>= 2), python3-all, python3-nose, python3-setuptools, python3-vcversioner, python3-retrying, python3-hglib, patool, python3-swh.core (>= 0.0.36~), python3-swh.model (>= 0.0.20~), python3-swh.storage (>= 0.0.95~), python3-swh.scheduler (>= 0.0.19~), - python3-swh.loader.core (>= 0.0.28~), + python3-swh.loader.core (>= 0.0.30~), python-sqlitedict Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/source/swh-loader-mercurial/ Package: python3-swh.loader.mercurial Architecture: all Depends: python3-swh.core (>= 0.0.36~), - python3-swh.loader.core (>= 0.0.28~), + python3-swh.loader.core (>= 0.0.30~), python3-swh.model (>= 0.0.20~), python3-swh.storage (>= 0.0.95~), python3-swh.scheduler (>= 0.0.19~), patool, python-sqlitedict, python3-hglib, ${misc:Depends}, ${python3:Depends} Description: Software Heritage Mercurial Loader Module in charge of loading hg/mercurial repositories into swh storage. diff --git a/requirements-swh.txt b/requirements-swh.txt index 27581a8..43e440f 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,5 +1,5 @@ swh.core >= 0.0.36 swh.model >= 0.0.20 swh.storage >= 0.0.95 swh.scheduler >= 0.0.19 -swh.loader.core >= 0.0.28 +swh.loader.core >= 0.0.30 diff --git a/swh.loader.mercurial.egg-info/PKG-INFO b/swh.loader.mercurial.egg-info/PKG-INFO index e17131e..9ea45d3 100644 --- a/swh.loader.mercurial.egg-info/PKG-INFO +++ b/swh.loader.mercurial.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.loader.mercurial -Version: 0.0.2 +Version: 0.0.3 Summary: Software Heritage Mercurial Loader Home-page: https://forge.softwareheritage.org/diffusion/DLDHG/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.loader.mercurial.egg-info/requires.txt b/swh.loader.mercurial.egg-info/requires.txt index 056048b..3b56b99 100644 --- a/swh.loader.mercurial.egg-info/requires.txt +++ b/swh.loader.mercurial.egg-info/requires.txt @@ -1,9 +1,9 @@ hglib retrying sqlitedict swh.core>=0.0.36 -swh.loader.core>=0.0.28 +swh.loader.core>=0.0.30 swh.model>=0.0.20 swh.scheduler>=0.0.19 swh.storage>=0.0.95 vcversioner diff --git a/swh/loader/mercurial/bundle20_loader.py b/swh/loader/mercurial/bundle20_loader.py index ed4e57b..e104279 100644 --- a/swh/loader/mercurial/bundle20_loader.py +++ b/swh/loader/mercurial/bundle20_loader.py @@ -1,370 +1,374 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """This document contains a SWH loader for ingesting repository data from Mercurial version 2 bundle files. """ # NOTE: The code here does expensive work twice in places because of the # intermediate need to check for what is missing before sending to the database # and the desire to not juggle very large amounts of data. # TODO: Decide whether to also serialize to disk and read back more quickly # from there. Maybe only for very large repos and fast drives. # - Avi import hglib import os from shutil import rmtree from tempfile import mkdtemp from swh.model import hashutil, identifiers from swh.loader.core.loader import SWHStatelessLoader from swh.loader.core.converters import content_for_storage from . import converters from .archive_extract import tmp_extract from .bundle20_reader import Bundle20Reader from .converters import PRIMARY_ALGO as ALGO from .objects import SelectiveCache, SimpleTree class HgBundle20Loader(SWHStatelessLoader): """Mercurial loader able to deal with remote or local repository. """ CONFIG_BASE_FILENAME = 'loader/hg' ADDITIONAL_CONFIG = { 'bundle_filename': ('str', 'HG20_none_bundle'), } def __init__(self, logging_class='swh.loader.mercurial.Bundle20Loader'): super().__init__(logging_class=logging_class) self.content_max_size_limit = self.config['content_size_limit'] self.bundle_filename = self.config['bundle_filename'] self.hg = None self.tags = [] def cleanup(self): """Clean temporary working directory """ if self.bundle_path and os.path.exists(self.bundle_path): self.log.debug('Cleanup up working bundle %s' % self.bundle_path) os.unlink(self.bundle_path) if self.working_directory and os.path.exists(self.working_directory): self.log.debug('Cleanup up working directory %s' % ( self.working_directory, )) rmtree(self.working_directory) def prepare(self, origin_url, visit_date, directory=None): """Prepare the necessary steps to load an actual remote or local repository. To load a local repository, pass the optional directory parameter as filled with a path to a real local folder. To load a remote repository, pass the optional directory parameter as None. """ self.origin_url = origin_url self.origin = self.get_origin() self.visit_date = visit_date self.working_directory = None self.bundle_path = None try: if not directory: # remote repository self.working_directory = mkdtemp( suffix='.tmp', prefix='swh.loader.mercurial.', dir='/tmp') os.makedirs(self.working_directory, exist_ok=True) self.hgdir = self.working_directory self.log.debug('Cloning %s to %s' % ( self.origin_url, self.hgdir)) hglib.clone(source=self.origin_url, dest=self.hgdir) else: # local repository self.working_directory = None self.hgdir = directory self.bundle_path = os.path.join(self.hgdir, self.bundle_filename) self.log.debug('Bundling at %s' % self.bundle_path) with hglib.open(self.hgdir) as repo: repo.bundle(bytes(self.bundle_path, 'utf-8'), all=True, - type=b'none') - except: + type=b'none-v2') + except Exception: self.cleanup() raise self.br = Bundle20Reader(self.bundle_path) def get_origin(self): """Get the origin that is currently being loaded in format suitable for swh.storage.""" return { 'type': 'hg', 'url': self.origin_url } def fetch_data(self): """Fetch the data from the data source.""" pass def get_contents(self): """Get the contents that need to be loaded.""" self.file_node_to_hash = {} missing_contents = set() hash_to_info = {} self.num_contents = 0 contents = {} for blob, node_info in self.br.yield_all_blobs(): self.num_contents += 1 file_name = node_info[0] header = node_info[2] content = hashutil.hash_data(blob, with_length=True) content['data'] = blob blob_hash = content[ALGO] self.file_node_to_hash[header['node']] = blob_hash hash_to_info[blob_hash] = node_info missing_contents.add(blob_hash) contents[blob_hash] = content if file_name == b'.hgtags': # https://www.mercurial-scm.org/wiki/GitConcepts#Tag_model # overwrite until the last one self.tags = (t for t in blob.split(b'\n') if t != b'') missing_contents = set( self.storage.content_missing( (contents[h] for h in missing_contents), key_hash=ALGO ) ) # Clusters needed blobs by file offset and then only fetches the # groups at the needed offsets. focs = {} # "file/offset/contents" for blob_hash in missing_contents: _, file_offset, header = hash_to_info[blob_hash] focs.setdefault(file_offset, {}) focs[file_offset][header['node']] = blob_hash hash_to_info = None for offset, node_hashes in sorted(focs.items()): for header, data, *_ in self.br.yield_group_objects( group_offset=offset ): node = header['node'] if node in node_hashes: h = node_hashes[node] yield content_for_storage( contents[h], log=self.log, max_content_size=self.content_max_size_limit, origin_id=self.origin_id ) def load_directories(self): """This is where the work is done to convert manifest deltas from the repository bundle into SWH directories. """ self.mnode_to_tree_id = {} base_manifests = self.br.build_manifest_hints() def tree_size(t): return t.size() self.trees = SelectiveCache(cache_hints=base_manifests, size_function=tree_size) tree = SimpleTree() for header, added, removed in self.br.yield_all_manifest_deltas( base_manifests ): node = header['node'] basenode = header['basenode'] tree = self.trees.fetch(basenode) or tree # working tree for path in removed.keys(): tree = tree.remove_tree_node_for_path(path) for path, info in added.items(): file_node, is_symlink, perms_code = info tree = tree.add_blob( path, self.file_node_to_hash[file_node], is_symlink, perms_code ) new_dirs = [] self.mnode_to_tree_id[node] = tree.hash_changed(new_dirs) self.trees.store(node, tree) yield header, tree, new_dirs def get_directories(self): """Get the directories that need to be loaded.""" missing_dirs = [] self.num_directories = 0 for header, tree, new_dirs in self.load_directories(): for d in new_dirs: self.num_directories += 1 missing_dirs.append(d['id']) missing_dirs = set( self.storage.directory_missing(missing_dirs) ) for header, tree, new_dirs in self.load_directories(): for d in new_dirs: if d['id'] in missing_dirs: yield d def get_revisions(self): """Get the revisions that need to be loaded.""" self.branches = {} revisions = {} self.num_revisions = 0 for header, commit in self.br.yield_all_changesets(): self.num_revisions += 1 date_dict = identifiers.normalize_timestamp( int(commit['time'].timestamp()) ) author_dict = converters.parse_author(commit['user']) if commit['manifest'] == Bundle20Reader.NAUGHT_NODE: directory_id = SimpleTree().hash_changed() else: directory_id = self.mnode_to_tree_id[commit['manifest']] extra_meta = [] extra = commit.get('extra') if extra: for e in extra.split(b'\x00'): k, v = e.split(b':', 1) k = k.decode('utf-8') extra_meta.append([k, v]) if k == 'branch': # needed for Occurrences self.branches[v] = header['node'] revision = { 'author': author_dict, 'date': date_dict, 'committer': author_dict, 'committer_date': date_dict, 'type': 'hg', 'directory': directory_id, 'message': commit['message'], 'metadata': { 'node': hashutil.hash_to_hex(header['node']), 'extra_headers': [ ['time_offset_seconds', str(commit['time_offset_seconds']).encode('utf-8')], ] + extra_meta }, 'synthetic': False, 'parents': [ header['p1'], header['p2'] ] } revision['id'] = hashutil.hash_to_bytes( identifiers.revision_identifier(revision)) revisions[revision['id']] = revision missing_revs = revisions.keys() missing_revs = set( self.storage.revision_missing(list(missing_revs)) ) for r in missing_revs: yield revisions[r] self.mnode_to_tree_id = None def get_releases(self): """Get the releases that need to be loaded.""" self.num_releases = 0 releases = {} missing_releases = [] for t in self.tags: self.num_releases += 1 node, name = t.split(b' ') release = { 'name': name, 'target': hashutil.hash_to_bytes(node.decode()), 'target_type': 'revision', 'message': None, 'metadata': None, 'synthetic': False, 'author': {'name': None, 'email': None, 'fullname': b''}, 'date': None } id_hash = hashutil.hash_to_bytes( identifiers.release_identifier(release)) release['id'] = id_hash missing_releases.append(id_hash) releases[id_hash] = release missing_releases = set(self.storage.release_missing(missing_releases)) for _id in missing_releases: yield releases[_id] - def get_occurrences(self): - """Get the occurrences that need to be loaded.""" - self.num_occurrences = 0 - for name, target in self.branches.items(): - self.num_occurrences += 1 - yield { - 'branch': name, - 'origin': self.origin_id, - 'target': target, - 'target_type': 'revision', - 'visit': self.visit, + def get_snapshot(self): + """Get the snapshot that need to be loaded.""" + self.num_snapshot = 1 + snap = { + 'id': None, + 'branches': { + name: { + 'target': target, + 'target_type': 'revision', + } + for name, target in self.branches.items() } + } + snap['id'] = identifiers.identifier_to_bytes( + identifiers.snapshot_identifier(snap)) + return snap def get_fetch_history_result(self): """Return the data to store in fetch_history.""" return { 'contents': self.num_contents, 'directories': self.num_directories, 'revisions': self.num_revisions, 'releases': self.num_releases, - 'occurrences': self.num_occurrences + 'snapshot': self.num_snapshot } class HgArchiveBundle20Loader(HgBundle20Loader): """Mercurial loader for repository wrapped within archives. """ def __init__(self): super().__init__( logging_class='swh.loader.mercurial.HgArchiveBundle20Loader') def prepare(self, origin_url, archive_path, visit_date): self.temp_dir = tmp_extract(archive=archive_path, prefix='swh.loader.mercurial.', log=self.log, source=origin_url) repo_name = os.listdir(self.temp_dir)[0] directory = os.path.join(self.temp_dir, repo_name) try: super().prepare(origin_url, visit_date, directory=directory) - except: + except Exception: self.cleanup() raise def cleanup(self): if os.path.exists(self.temp_dir): rmtree(self.temp_dir) super().cleanup() diff --git a/swh/loader/mercurial/bundle20_loader_verifier.py b/swh/loader/mercurial/bundle20_loader_verifier.py index bf229b5..475f8e2 100644 --- a/swh/loader/mercurial/bundle20_loader_verifier.py +++ b/swh/loader/mercurial/bundle20_loader_verifier.py @@ -1,218 +1,218 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import code import hglib import os import random import sys import time from binascii import hexlify, unhexlify from swh.model import hashutil from .bundle20_loader import HgBundle20Loader from .converters import PRIMARY_ALGO as ALGO from .objects import SimpleTree class HgLoaderValidater(HgBundle20Loader): def generate_all_blobs(self, validate=True, frequency=1): print('GENERATING BLOBS') i = 0 start = time.time() u = set() for blob, node_info in self.br.yield_all_blobs(): filename = node_info[0] header = node_info[2] i += 1 bhash = hashutil.hash_data(blob, algorithms=set([ALGO]))[ALGO] self.file_node_to_hash[header['node']] = bhash u.update([bhash]) if validate: if random.random() < frequency: self.validate_blob(filename, header, blob) if i % 10000 == 0: print(i) print('') print('FOUND', i, 'BLOBS') print('FOUND', len(u), 'UNIQUE BLOBS') print('ELAPSED', time.time()-start) def validate_blob(self, filename, header, blob): if not self.hg: self.hg = hglib.open(self.hgdir) data = bytes(blob) filepath = os.path.join(self.hg.root(), bytes(filename)) linknode = hexlify(header['linknode']) cat_contents = self.hg.cat([filepath], rev=linknode) if cat_contents != data: print('INTERNAL ERROR ERROR ERROR ERROR') print(filename) print(header) print('-----') print(cat_contents) print('---- vs ----') print(data) code.interact(local=dict(globals(), **locals())) quit() else: print('v', end='') def generate_all_trees(self, validate=True, frequency=1): print('GENERATING MANIFEST TREES') c = 0 n = 0 u = set() start = time.time() validated = 0 for header, tree, new_dirs in self.load_directories(): if validate and (c >= validated) and (random.random() < frequency): self.validate_tree(tree, header, c) for d in new_dirs: u.add(d['id']) c += 1 n += len(new_dirs) print('.', end='') if c % 20 == 0: sys.stdout.flush() if c % 10000 == 0: print(c) print('') print('FOUND', c, 'COMMIT MANIFESTS') print('FOUND', n, 'NEW DIRS') print('FOUND', len(u), 'UNIQUE DIRS') print('ELAPSED', time.time()-start) def validate_tree(self, tree, header, i): if not self.hg: self.hg = hglib.open(self.hgdir) commit_id = header['linknode'] if len(commit_id) == 20: commit_id = hexlify(commit_id) base_tree = SimpleTree() base_files = list(self.hg.manifest(rev=commit_id)) bfiles = sorted([f[4] for f in base_files]) for p in base_files: base_tree.add_blob( p[4], self.file_node_to_hash[unhexlify(p[0])], p[3], p[1] ) base_tree.hash_changed() files = sorted(list(tree.flatten().keys())) if tree != base_tree: print('validating rev:', i, 'commit:', commit_id) print('validating files:', len(files), len(base_files)) print(' INVALID TREE') def so1(a): keys = [k['name'] for k in a['entries']] return b''.join(sorted(keys)) - tree_dirs = [d for d in tree.yield_SWH_directories()] - base_dirs = [d for d in base_tree.yield_SWH_directories()] + tree_dirs = [d for d in tree.yield_swh_directories()] + base_dirs = [d for d in base_tree.yield_swh_directories()] tree_dirs.sort(key=so1) base_dirs.sort(key=so1) # for i in range(len(tree_dirs)): # if tree_dirs[i] != base_dirs[i]: # print(i) # code.interact(local=dict(globals(), **locals())) print("Program will quit after your next Ctrl-D") code.interact(local=dict(globals(), **locals())) quit() else: print('v', end='') def generate_all_commits(self, validate=True, frequency=1): i = 0 start = time.time() for rev in self.get_revisions(): print('.', end='') i += 1 if i % 20 == 0: sys.stdout.flush() print('') print('FOUND', i, 'COMMITS') print('ELAPSED', time.time()-start) def runtest(self, hgdir, validate_blobs=False, validate_trees=False, frequency=1.0): """ HgLoaderValidater().runtest('/home/avi/SWH/mozilla-unified') """ - self.prepare('nil', hgdir, 'nil') + self.prepare('nil', 'nil', hgdir) self.file_node_to_hash = {} # blobs_pickle = 'blob_hashes.' + os.path.basename(hgdir) + '.pickle' # if os.path.exists(blobs_pickle): # with open(blobs_pickle, 'rb') as pfile: # self.file_node_to_hash = pickle.load(pfile) # if not self.file_node_to_hash: # self.generate_all_blobs(validate=validate_blobs, # frequency=frequency) # with open(blobs_pickle, 'wb') as pfile: # pickle.dump(self.file_node_to_hash, pfile) # self.generate_all_trees(validate=validate_trees, frequency=frequency) # self.generate_all_commits() for c in self.get_contents(): pass for d in self.get_directories(): pass for rev in self.get_revisions(): pass for rel in self.get_releases(): print(rel) self.visit = 'foo' for o in self.get_occurrences(): print(o) def main(): if len(sys.argv) > 1: test_repo = sys.argv[1] else: print("Please pass in the path to an HG repository.") quit() while test_repo[-1] == '/': test_repo = test_repo[:-1] if len(sys.argv) > 2: validate_frequency = float(sys.argv[2]) else: validate_frequency = 0.001 HgLoaderValidater().runtest(test_repo, True, True, validate_frequency) if __name__ == '__main__': main() diff --git a/swh/loader/mercurial/objects.py b/swh/loader/mercurial/objects.py index 347b38c..a935d70 100644 --- a/swh/loader/mercurial/objects.py +++ b/swh/loader/mercurial/objects.py @@ -1,398 +1,398 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """This document contains various helper classes used in converting Mercurial bundle files into SWH Contents, Directories, etc. """ import binascii import copy import os import sys from collections import OrderedDict from sqlitedict import SqliteDict from swh.model import identifiers OS_PATH_SEP = os.path.sep.encode('utf-8') class SimpleBlob: """Stores basic metadata of a blob object.when constructing deep trees from commit file manifests. args: file_hash: unique hash of the file contents is_symlink: (bool) is this file a symlink? file_perms: (string) 3 digit permission code as a string or bytestring, e.g. '755' or b'755' """ kind = 'file' def __init__(self, file_hash, is_symlink, file_perms): self.hash = file_hash self.perms = 0o100000 + int(file_perms, 8) if is_symlink: self.perms += 0o020000 def __str__(self): return ('SimpleBlob: ' + str(self.hash) + ' -- ' + str(self.perms)) def __eq__(self, other): return ((self.perms == other.perms) and (self.hash == other.hash)) def size(self): """Return the size in byte.""" return sys.getsizeof(self) + sys.getsizeof(self.__dict__) class SimpleTree(dict): """ Stores data for a nested directory object. Uses shallow cloning to stay compact after forking and change monitoring for efficient re-hashing. """ kind = 'dir' perms = 0o040000 def __init__(self): self.hash = None self._size = None def __eq__(self, other): return ((self.hash == other.hash) and (self.items() == other.items())) def _new_tree_node(self, path): """Deeply nests SimpleTrees according to a given subdirectory path and returns a reference to the deepest one. args: path: bytestring containing a relative path from self to a deep subdirectory. e.g. b'foodir/bardir/bazdir' returns: the new node """ node = self for d in path.split(OS_PATH_SEP): if node.get(d): if node[d].hash is not None: node[d] = copy.copy(node[d]) node[d].hash = None node[d]._size = None else: node[d] = SimpleTree() node = node[d] return node def remove_tree_node_for_path(self, path): """Deletes a SimpleBlob or SimpleTree from inside nested SimpleTrees according to the given relative file path, and then recursively removes any newly depopulated SimpleTrees. It keeps the old history by doing a shallow clone before any change. args: path: bytestring containing a relative path from self to a nested file or directory. e.g. b'foodir/bardir/bazdir/quxfile.txt' returns: the new root node """ node = self if node.hash is not None: node = copy.copy(node) node.hash = None node._size = None first, sep, rest = path.partition(OS_PATH_SEP) if rest: node[first] = node[first].remove_tree_node_for_path(rest) if len(node[first]) == 0: del node[first] else: del node[first] return node def add_blob(self, file_path, file_hash, is_symlink, file_perms): """Shallow clones the root node and then deeply nests a SimpleBlob inside nested SimpleTrees according to the given file path, shallow cloning all all intermediate nodes and marking them as changed and in need of new hashes. args: file_path: bytestring containing the relative path from self to a nested file file_hash: primary identifying hash computed from the blob contents is_symlink: True/False whether this item is a symbolic link file_perms: int or string representation of file permissions returns: the new root node """ root = self if root.hash is not None: root = copy.copy(root) root.hash = None root._size = None node = root fdir, fbase = os.path.split(file_path) if fdir: node = root._new_tree_node(fdir) node[fbase] = SimpleBlob(file_hash, is_symlink, file_perms) return root - def yield_SWH_directories(self): + def yield_swh_directories(self): """Converts nested SimpleTrees into a stream of SWH Directories. yields: an SWH Directory for every node in the tree """ for k, v in sorted(self.items()): if isinstance(v, SimpleTree): - yield from v.yield_SWH_directories() + yield from v.yield_swh_directories() yield { 'id': self.hash, 'entries': [ { 'name': k, 'perms': v.perms, 'type': v.kind, 'target': v.hash } for k, v in sorted(self.items()) ] } def hash_changed(self, new_dirs=None): """Computes and sets primary indentifier hashes for unhashed subtrees. args: new_dirs (optional): an empty list to be populated with the SWH Directories for all of the new (not previously hashed) nodes returns: the top level hash of the whole tree """ if self.hash is None: directory = { 'entries': [ { 'name': k, 'perms': v.perms, 'type': v.kind, 'target': (v.hash if v.hash is not None else v.hash_changed(new_dirs)) } for k, v in sorted(self.items()) ] } self.hash = binascii.unhexlify( identifiers.directory_identifier(directory) ) directory['id'] = self.hash if new_dirs is not None: new_dirs.append(directory) return self.hash def flatten(self, _curpath=None, _files=None): """Converts nested sub-SimpleTrees and SimpleBlobs into a list of file paths. Useful for counting the number of files in a manifest. returns: a flat list of all of the contained file paths """ _curpath = _curpath or b'' _files = _files or {} for k, v in sorted(self.items()): p = os.path.join(_curpath, k) if isinstance(v, SimpleBlob): _files[p] = (v.hash, v.perms) else: v.flatten(p, _files) return _files def size(self): """Return the (approximate?) memory utilization in bytes of the nested structure. """ if self._size is None: self._size = ( sys.getsizeof(self) + sys.getsizeof(self.__dict__) + sum([ sys.getsizeof(k)+v.size() for k, v in self.items() ]) ) return self._size class SelectiveCache(OrderedDict): """Special cache for storing past data upon which new data is known to be dependent. Optional hinting of how many instances of which keys will be needed down the line makes utilization more efficient. And, because the distance between related data can be arbitrarily long and the data fragments can be arbitrarily large, a disk-based secondary storage is used if the primary RAM-based storage area is filled to the designated capacity. Storage is occupied in three phases: 1) The most recent key/value pair is always held, regardless of other factors, until the next entry replaces it. 2) Stored key/value pairs are pushed into a randomly accessible expanding buffer in memory with a stored size function, maximum size value, and special hinting about which keys to store for how long optionally declared at instantiation. 3) The in-memory buffer pickles into a randomly accessible disk-backed secondary buffer when it becomes full. Occupied space is calculated by default as whatever the len() function returns on the values being stored. This can be changed by passing in a new size_function at instantiation. The cache_hints parameter is a dict of key/int pairs recording how many subsequent fetches that particular key's value should stay in storage for before being erased. If you provide a set of hints and then try to store a key that is not in that set of hints, the cache will store it only while it is the most recent entry, and will bypass storage phases 2 and 3. """ DEFAULT_SIZE = 800*1024*1024 # bytes or whatever def __init__(self, max_size=None, cache_hints=None, size_function=None): """args: max_size: integer value indicating the maximum size of the part of storage held in memory cache_hints: dict of key/int pairs as described in the class description size_function: callback function that accepts one parameter and returns one int, which should probably be the calculated size of the parameter """ self._max_size = max_size or SelectiveCache.DEFAULT_SIZE self._disk = None if size_function is None: self._size_function = sys.getsizeof else: self._size_function = size_function self._latest = None self._cache_size = 0 self._cache_hints = copy.copy(cache_hints) or None def store(self, key, data): """Primary method for putting data into the cache. args: key: any hashable value data: any python object (preferably one that is measurable) """ self._latest = (key, data) if (self._cache_hints is not None) and (key not in self._cache_hints): return # cache the completed data... self._cache_size += self._size_function(data) + 53 # ...but limit memory expenditure for the cache by offloading to disk should_commit = False while ( self._cache_size > self._max_size and len(self) > 0 ): should_commit = True k, v = self.popitem(last=False) self._cache_size -= self._size_function(v) - 53 self._diskstore(k, v) if should_commit: self._disk.commit(blocking=False) self[key] = data def _diskstore(self, key, value): if self._disk is None: self._disk = SqliteDict(autocommit=False, journal_mode='OFF') self._disk[key] = value def has(self, key): """Tests whether the data for the provided key is being stored. args: key: the key of the data whose storage membership property you wish to discover returns: True or False """ return ( (self._latest and self._latest[0] == key) or (key in self) or (self._disk and (key in self._disk)) ) def fetch(self, key): """Pulls a value out of storage and decrements the hint counter for the given key. args: key: the key of the data that you want to retrieve returns: the retrieved value or None """ retval = None if self._latest and self._latest[0] == key: retval = self._latest[1] if retval is None: retval = self.get(key) if (retval is None) and self._disk: self._disk.commit(blocking=False) retval = self._disk.get(key) or None self.dereference(key) return retval def dereference(self, key): """Remove one instance of expected future retrieval of the data for the given key. This is called automatically by fetch requests that aren't satisfied by phase 1 of storage. args: the key of the data for which the future retrievals hint is to be decremented """ newref = self._cache_hints and self._cache_hints.get(key) if newref: newref -= 1 if newref == 0: del self._cache_hints[key] if key in self: item = self[key] self._cache_size -= self._size_function(item) del self[key] else: if self._disk: del self._disk[key] else: self._cache_hints[key] = newref def keys(self): yield from self.keys() if self._disk: yield from self._disk.keys() def values(self): yield from self.values() if self._disk: yield from self._disk.values() def items(self): yield from self.items() if self._disk: yield from self._disk.items() diff --git a/swh/loader/mercurial/slow_loader.py b/swh/loader/mercurial/slow_loader.py index d0f79d5..1f31818 100644 --- a/swh/loader/mercurial/slow_loader.py +++ b/swh/loader/mercurial/slow_loader.py @@ -1,460 +1,472 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING WARNING WARNING WARNING # hglib is too slow to be super useful. Unfortunately it's also the only # python3 library for mercurial as of this writing. - Avi import datetime import hglib import os from swh.model import identifiers, hashutil from swh.loader.core.loader import SWHStatelessLoader from .converters import parse_author, PRIMARY_ALGO as ALGO OS_PATH_SEP = os.path.sep.encode('utf-8') def data_to_content_id(data): size = len(data) ret = { 'length': size, } ret.update(identifiers.content_identifier({'data': data})) return ret def blob_to_content_dict(data, existing_hashes=None, max_size=None, logger=None): """Convert blob data to a SWH Content. If the blob already has hashes computed, don't recompute them. TODO: This should be unified with similar functions in other places. args: existing_hashes: dict of hash algorithm:value pairs max_size: size over which blobs should be rejected logger: logging class instance returns: A Software Heritage "content". """ existing_hashes = existing_hashes or {} size = len(data) content = { 'length': size, } content.update(existing_hashes) hash_types = list(existing_hashes.keys()) hashes_to_do = hashutil.DEFAULT_ALGORITHMS.difference(hash_types) content.update(hashutil.hash_data(data, algorithms=hashes_to_do)) if max_size and (size > max_size): content.update({ 'status': 'absent', 'reason': 'Content too large', }) if logger: id_hash = hashutil.hash_to_hex(content[ALGO]) logger.info( 'Skipping content %s, too large (%s > %s)' % (id_hash, size, max_size), extra={ 'swh_type': 'loader_content_skip', 'swh_id': id_hash, 'swh_size': size } ) else: content.update({'data': data, 'status': 'visible'}) return content class SimpleBlob: """ Stores basic metadata for a blob object. """ kind = 'file' def __init__(self, file_hash, file_mode): self.hash = file_hash if not isinstance(file_mode, int): self.mode = 0o100000 + int(file_mode, 8) else: self.mode = file_mode class SimpleTree(dict): """ Stores metadata for a nested 'tree'-like object. """ kind = 'dir' mode = 0o040000 def add_tree_node_for_path(self, path): """Deeply nests SimpleTrees according to a directory path and returns a cursor to the deepest one""" node = self for d in path.split(OS_PATH_SEP): node = node.setdefault(d, SimpleTree()) return node def remove_tree_node_for_path(self, path): """Deletes a SimpleBlob from inside nested SimpleTrees according to the given file path""" first, sep, rest = path.partition(OS_PATH_SEP) if rest: self[first].remove_tree_node_for_path(rest) if not self.get(first): del self[first] else: del self[first] def add_blob(self, file_path, file_hash, file_mode): """Deeply nests a SimpleBlob inside nested SimpleTrees according to the given file path""" fdir = os.path.dirname(file_path) fbase = os.path.basename(file_path) if fdir: node = self.add_tree_node_for_path(fdir) else: node = self node[fbase] = SimpleBlob(file_hash, file_mode) class HgLoader(SWHStatelessLoader): """Load a mercurial repository from a directory. """ CONFIG_BASE_FILENAME = 'loader/hg' def __init__(self, logging_class='swh.loader.mercurial.HgLoader'): super().__init__(logging_class=logging_class) def prepare(self, origin_url, directory, visit_date): """see base.BaseLoader.prepare""" - self.origin_url = origin_url + self.origin = { + 'type': 'hg', + 'url': origin_url + } self.repo = hglib.open(directory) self.visit_date = visit_date self.node_to_blob_hash = {} self.blob_hash_to_file_rev = {} self.commit_trees = {} self.unique_trees = {} self.revisions = {} def get_origin(self): """Get the origin that is currently being loaded in format suitable for swh.storage""" - return { - 'type': 'hg', - 'url': self.origin_url - } + return self.origin def fetch_data(self): """Fetch the data from the data source""" pass def has_contents(self): """Checks whether we need to load contents""" # if we have any revisions, then obviously we have contents. return self.has_revisions() def iter_changelog(self): """Iterate over the repository log""" yield from self.repo.log('0:tip', removed=True) def get_node_file_if_new(self, f, rev, node_hash): """Load a blob from disk""" # Fast if the node hash is already cached. Somehow this shortcuts a # meaningful but not huge percentage of the loads for a repository. if node_hash not in self.node_to_blob_hash: file_path = os.path.join(self.repo.root(), f) data = self.repo.cat([file_path], rev) blob_hash = identifiers.content_identifier( {'data': data} )[ALGO] self.node_to_blob_hash[node_hash] = blob_hash if blob_hash not in self.blob_hash_to_file_rev: # new blob self.blob_hash_to_file_rev[blob_hash] = (file_path, rev) return blob_hash, data return self.node_to_blob_hash[node_hash], None def get_content_ids(self): """Get all the contents, but trim away the actual data""" self.node_to_blob_hash = {} self.blob_hash_to_file_rev = {} self.num_contents = 0 for li in self.iter_changelog(): c = self.repo[li] rev = c.rev() manifest = c.manifest() for f in c.added() + c.modified(): node_hash = manifest[f] blob_hash, data = self.get_node_file_if_new(f, rev, node_hash) if data is not None: # new blob self.num_contents += 1 yield data_to_content_id(data) def get_contents(self): """Get the contents that need to be loaded""" # This method unfortunately loads and hashes the blobs twice. max_content_size = self.config['content_size_limit'] missing_contents = set( self.storage.content_missing( self.get_content_ids(), ALGO ) ) for oid in missing_contents: file_path, rev = self.blob_hash_to_file_rev[oid] data = self.repo.cat([file_path], rev) yield blob_to_content_dict( data, max_size=max_content_size, logger=self.log ) def has_directories(self): """Checks whether we need to load directories""" # if we have any revs, we must also have dirs return self.has_revisions() def get_directories(self): """Get the directories that need to be loaded""" missing_dirs = set(self.storage.directory_missing( sorted(self.unique_trees.keys()) )) for dir_hash in missing_dirs: yield self.unique_trees[dir_hash] def has_revisions(self): """Checks whether we need to load revisions""" self.num_revisions = int(self.repo.tip()[0]) + 1 return self.num_revisions > 0 def update_tree_from_rev(self, tree, rev, only_these_files=None): """Iterates over changes in a revision and adds corresponding SimpleBlobs to a SimpleTree""" if rev >= 0: manifest = {k[4]: k for k in self.repo.manifest(rev=rev)} loop_keys = only_these_files or manifest.keys() for f in loop_keys: node_hash = manifest[f][0] file_mode = manifest[f][1] file_hash, _ = self.get_node_file_if_new(f, rev, node_hash) tree.add_blob(f, file_hash, file_mode) return tree def reconstruct_tree(self, directory): """Converts a flat directory into nested SimpleTrees.""" # This method exists because the code was already written to use # SimpleTree before then reducing memory use and converting to the # canonical format. A refactor using lookups instead of nesting could # obviate the need. new_tree = SimpleTree() for entry in directory['entries']: tgt = entry['target'] perms = entry['perms'] name = entry['name'] if tgt in self.unique_trees: # subtree new_tree[name] = self.reconstruct_tree(self.unique_trees[tgt]) else: # blob new_tree[name] = SimpleBlob(tgt, perms) new_tree.hash = directory['id'] return new_tree def collapse_tree(self, tree): """Converts nested SimpleTrees into multiple flat directories.""" # This method exists because the code was already written to use # SimpleTree before then reducing memory use and converting to the # canonical format. A refactor using lookups instead of nesting could # obviate the need. directory = { 'entries': [ { 'name': k, 'perms': v.mode, 'type': v.kind, 'target': (isinstance(v, SimpleBlob) and v.hash or self.collapse_tree(v)) } for k, v in tree.items() ] } tree.hash = identifiers.directory_identifier(directory) directory['id'] = tree.hash self.unique_trees[tree.hash] = directory return tree.hash def get_revision_ids(self): """Get the revisions that need to be loaded""" self.unique_trees = {} commit_tree = None for li in self.iter_changelog(): c = self.repo[li[1]] rev = c.rev() # start from the parent state p1 = c.p1().rev() if p1 in self.commit_trees: if p1 != rev-1: # Most of the time, a revision will inherit from the # previous one. In those cases we can reuse commit_tree, # otherwise build a new one here. parent_tree = self.unique_trees[self.commit_trees[p1]] commit_tree = self.reconstruct_tree(parent_tree) else: commit_tree = self.update_tree_from_rev(SimpleTree(), p1) # remove whatever is removed for f in c.removed(): commit_tree.remove_tree_node_for_path(f) # update whatever is updated self.update_tree_from_rev(commit_tree, rev, c.added()+c.modified()) self.commit_trees[rev] = self.collapse_tree(commit_tree) date_dict = identifiers.normalize_timestamp( int(c.date().timestamp()) ) author_dict = parse_author(c.author()) + parents = [] + for p in c.parents(): + if p.rev() >= 0: + parents.append(self.revisions[p.node()]['id']) + + phase = c.phase() # bytes + rev = str(rev).encode('utf-8') + hidden = str(c.hidden()).encode('utf-8') + hg_headers = [['phase', phase], ['rev', rev], ['hidden', hidden]] + revision = { 'author': author_dict, 'date': date_dict, 'committer': author_dict, 'committer_date': date_dict, 'type': 'hg', - 'directory': commit_tree.hash, + 'directory': identifiers.identifier_to_bytes(commit_tree.hash), 'message': c.description(), 'metadata': { - 'extra_headers': [ - ['phase', c.phase()], - ['rev', rev], - ['hidden', c.hidden()] - ] + 'extra_headers': hg_headers }, 'synthetic': False, - 'parents': [ - self.revisions[p.node()]['id'] for p in c.parents() - if p.rev() >= 0 - ] + 'parents': parents, } - revision['id'] = identifiers.revision_identifier(revision) + revision['id'] = identifiers.identifier_to_bytes( + identifiers.revision_identifier(revision)) self.revisions[c.node()] = revision for n, r in self.revisions.items(): yield {'node': n, 'id': r['id']} def get_revisions(self): """Get the revision identifiers from the repository""" - revs = {r['id']: r['node'] for r in self.get_revision_ids()} + revs = { + r['id']: r['node'] + for r in self.get_revision_ids() + } missing_revs = set(self.storage.revision_missing(revs.keys())) for r in missing_revs: yield self.revisions[revs[r]] def has_releases(self): """Checks whether we need to load releases""" self.num_releases = len([t for t in self.repo.tags() if not t[3]]) return self.num_releases > 0 def get_releases(self): """Get the releases that need to be loaded""" releases = {} for t in self.repo.tags(): islocal = t[3] name = t[0] if (name != b'tip' and not islocal): short_hash = t[2] - target = self.revisions[self.repo[short_hash].node()]['id'] + node_id = self.repo[short_hash].node() + target = self.revisions[node_id]['id'] release = { 'name': name, 'target': target, 'target_type': 'revision', 'message': None, 'metadata': None, 'synthetic': False, - 'author': None, + 'author': {'name': None, 'email': None, 'fullname': b''}, 'date': None } - id_hash = identifiers.release_identifier(release) - release['id'] = id_hash - releases[id_hash] = release + id_bytes = identifiers.identifier_to_bytes( + identifiers.release_identifier(release)) + release['id'] = id_bytes + releases[id_bytes] = release missing_rels = set(self.storage.release_missing( sorted(releases.keys()) )) yield from (releases[r] for r in missing_rels) - def has_occurrences(self): - """Checks whether we need to load occurrences""" - self.num_occurrences = len( - self.repo.tags() + self.repo.branches() + self.repo.bookmarks()[0] - ) - return self.num_occurrences > 0 + def get_snapshot(self): + """Get the snapshot that need to be loaded""" + self.num_snapshot = 1 - def get_occurrences(self): - """Get the occurrences that need to be loaded""" - for t in ( - self.repo.tags() + self.repo.branches() + self.repo.bookmarks()[0] - ): - name = t[0] - short_hash = t[2] - target = self.revisions[self.repo[short_hash].node()]['id'] - yield { - 'branch': name, - 'origin': self.origin_id, - 'target': target, - 'target_type': 'revision', - 'visit': self.visit, + def _get_branches(repo=self.repo): + for t in ( + repo.tags() + repo.branches() + repo.bookmarks()[0] + ): + name = t[0] + short_hash = t[2] + node = self.repo[short_hash].node() + yield name, { + 'target': self.revisions[node]['id'], + 'target_type': 'revision' + } + + snap = { + 'branches': { + name: branch + for name, branch in _get_branches() } + } + snap['id'] = identifiers.identifier_to_bytes( + identifiers.snapshot_identifier(snap)) + return snap def get_fetch_history_result(self): """Return the data to store in fetch_history for the current loader""" return { 'contents': self.num_contents, 'directories': len(self.unique_trees), 'revisions': self.num_revisions, 'releases': self.num_releases, - 'occurrences': self.num_occurrences, + 'snapshot': self.num_snapshot, } def save_data(self): """We already have the data locally, no need to save it""" pass def eventful(self): """Whether the load was eventful""" return True if __name__ == '__main__': import logging import sys logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) loader = HgLoader() origin_url = sys.argv[1] directory = sys.argv[2] visit_date = datetime.datetime.now(tz=datetime.timezone.utc) print(loader.load(origin_url, directory, visit_date)) diff --git a/swh/loader/mercurial/tasks.py b/swh/loader/mercurial/tasks.py index 11ea571..8162a3c 100644 --- a/swh/loader/mercurial/tasks.py +++ b/swh/loader/mercurial/tasks.py @@ -1,43 +1,43 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.task import Task from .bundle20_loader import HgBundle20Loader, HgArchiveBundle20Loader class LoadMercurialTsk(Task): """Mercurial repository loading """ task_queue = 'swh_loader_mercurial' def run_task(self, *, origin_url, visit_date, directory): """Import a mercurial tarball into swh. Args: see :func:`DepositLoader.load`. """ loader = HgBundle20Loader() loader.log = self.log - loader.load(origin_url=origin_url, - directory=directory, - visit_date=visit_date) + return loader.load(origin_url=origin_url, + directory=directory, + visit_date=visit_date) class LoadArchiveMercurialTsk(Task): task_queue = 'swh_loader_mercurial_archive' def run_task(self, *, origin_url, archive_path, visit_date): """Import a mercurial tarball into swh. Args: see :func:`DepositLoader.load`. """ loader = HgArchiveBundle20Loader() loader.log = self.log - loader.load(origin_url=origin_url, - archive_path=archive_path, - visit_date=visit_date) + return loader.load(origin_url=origin_url, + archive_path=archive_path, + visit_date=visit_date) diff --git a/version.txt b/version.txt index 5bb8c9d..c409df3 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.2-0-gfefd4ca \ No newline at end of file +v0.0.3-0-gf9a7657 \ No newline at end of file