diff --git a/swh/loader/mercurial/bundle20_loader.py b/swh/loader/mercurial/bundle20_loader.py new file mode 100644 --- /dev/null +++ b/swh/loader/mercurial/bundle20_loader.py @@ -0,0 +1,292 @@ +# Copyright (C) 2017 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""This document contains a SWH loader for ingesting repository data from +Mercurial version 2 bundle files. +""" + +# NOTE: The code here does expensive work twice in places because of the +# intermediate need to check for what is missing before sending to the database +# and the desire to not juggle very large amounts of data. + +# TODO: Decide whether to also serialize to disk and read back more quickly +# from there. Maybe only for very large repos and fast drives. +# - Avi + +import os + +import hglib + +import base +import converters +from bundle20_reader import Bundle20Reader +from converters import PRIMARY_ALGO as ALGO +from objects import SelectiveCache, SimpleTree +from swh.model import hashutil, identifiers + +DEBUG = True +MAX_BLOB_SIZE = 100*1024*1024 # bytes +# TODO: What should MAX_BLOB_SIZE be? + + +class HgBundle20Loader(base.BaseLoader): + CONFIG_BASE_FILENAME = 'loader/hg-loader' + BUNDLE_FILENAME = 'HG20_none_bundle' + + def __init__(self): + self.hg = None + self.tags = [] + + def prepare(self, origin_url, directory, fetch_date): + """see base.BaseLoader.prepare""" + self.origin_url = origin_url + self.fetch_date = fetch_date + self.hgdir = directory + bundle_path = os.path.join(directory, HgBundle20Loader.BUNDLE_FILENAME) + + if DEBUG and not os.path.isfile(bundle_path): + # generate a bundle from the given directory if needed (testing) + with hglib.open(directory) as repo: + repo.bundle( + bytes(bundle_path, 'utf-8'), + all=True, + type=b'none' + ) + + self.br = Bundle20Reader(bundle_path) + + def get_origin(self): + """Get the origin that is currently being loaded in format suitable for + swh.storage.""" + return { + 'type': 'hg', + 'url': self.origin_url + } + + def fetch_data(self): + """Fetch the data from the data source.""" + pass + + def get_contents(self): + """Get the contents that need to be loaded.""" + self.file_node_to_hash = {} + missing_contents = set() + hash_to_info = {} + self.num_contents = 0 + + for blob, node_info in self.br.yield_all_blobs(): + self.num_contents += 1 + file_name = node_info[0] + header = node_info[2] + blob_hash = hashutil.hash_data(blob, algorithms=set([ALGO]))[ALGO] + self.file_node_to_hash[header['node']] = blob_hash + hash_to_info[blob_hash] = node_info + missing_contents.add(blob_hash) + + if file_name == b'.hgtags': + # https://www.mercurial-scm.org/wiki/GitConcepts#Tag_model + self.tags = blob.split(b'\n') # overwrite until the last one + + if not DEBUG: + missing_contents = set( + self.storage.content_missing(iter(missing_contents), ALGO) + ) + + # Clusters needed blobs by file offset and then only fetches the + # groups at the needed offsets. 
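+        # Illustrative shape of the grouping built below (values are
+        # hypothetical):
+        #   focs = {
+        #       <file section offset>: {<20-byte file node>: <sha1_git hash>,
+        #                               ...},
+        #       ...
+        #   }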
+ focs = {} # "file/offset/contents" + for blob_hash in missing_contents: + _, file_offset, header = hash_to_info[blob_hash] + focs.setdefault(file_offset, {}) + focs[file_offset][header['node']] = blob_hash + hash_to_info = None + + for offset, node_hashes in sorted(focs.items()): + for header, data, *_ in self.br.yield_group_objects( + group_offset=offset + ): + node = header['node'] + if node in node_hashes: + blob, meta = self.br.extract_meta_from_blob(data) + yield converters.blob_to_content_dict( + data=blob, + existing_hashes={ALGO: node_hashes[node]}, + max_size=MAX_BLOB_SIZE + ) + # # NOTE: This is a slower but cleaner version of the code above. + # for blob, node_info in self.br.yield_all_blobs(): + # header = node_info[2] + # node = header['node'] + # blob_hash = self.file_node_to_hash[node] + # if blob_hash in missing_contents: + # yield converters.blob_to_content_dict( + # data=blob, + # existing_hashes={ALGO: blob_hash}, + # max_size=MAX_BLOB_SIZE + # ) + + def load_directories(self): + """This is where the work is done to convert manifest deltas from the + repository bundle into SWH directories. + """ + self.mnode_to_tree_id = {} + base_manifests = self.br.build_manifest_hints() + + def tree_size(t): + return t.size() + + self.trees = SelectiveCache(cache_hints=base_manifests, + size_function=tree_size) + + tree = SimpleTree() + for header, added, removed in self.br.yield_all_manifest_deltas( + base_manifests + ): + node = header['node'] + basenode = header['basenode'] + tree = self.trees.fetch(basenode) or tree # working tree + + for path in removed.keys(): + tree = tree.remove_tree_node_for_path(path) + for path, info in added.items(): + file_node, is_symlink, perms_code = info + tree = tree.add_blob( + path, + self.file_node_to_hash[file_node], + is_symlink, + perms_code + ) + + new_dirs = [] + self.mnode_to_tree_id[node] = tree.hash_changed(new_dirs) + self.trees.store(node, tree) + yield header, tree, new_dirs + + def get_directories(self): + """Get the directories that need to be loaded.""" + missing_dirs = [] + self.num_directories = 0 + + for header, tree, new_dirs in self.load_directories(): + for d in new_dirs: + self.num_directories += 1 + missing_dirs.append(d['id']) + missing_dirs = set(missing_dirs) + + if not DEBUG: + missing_dirs = set( + self.storage.directory_missing(missing_dirs) + ) + + for header, tree, new_dirs in self.load_directories(): + for d in new_dirs: + if d['id'] in missing_dirs: + yield d + + def get_revisions(self): + """Get the revisions that need to be loaded.""" + self.branches = {} + revisions = {} + self.num_revisions = 0 + for header, commit in self.br.yield_all_changesets(): + self.num_revisions += 1 + date_dict = identifiers.normalize_timestamp( + int(commit['time'].timestamp()) + ) + author_dict = converters.parse_author(commit['user']) + if commit['manifest'] == Bundle20Reader.NAUGHT_NODE: + directory_id = SimpleTree().hash_changed() + else: + directory_id = self.mnode_to_tree_id[commit['manifest']] + + extra_meta = [] + extra = commit.get('extra') + if extra: + for e in extra.split(b'\x00'): + k, v = e.split(b':', 1) + k = k.decode('utf-8') + extra_meta.append([k, v]) + if k == 'branch': # needed for Occurrences + self.branches[v] = header['node'] + + revision = { + 'author': author_dict, + 'date': date_dict, + 'committer': author_dict, + 'committer_date': date_dict, + 'type': 'hg', + 'directory': directory_id, + 'message': commit['message'], + 'metadata': { + 'node': header['node'], + 'extra_headers': [ + ['time_offset_seconds', 
commit['time_offset_seconds']], + ] + extra_meta + }, + 'synthetic': False, + 'parents': [ + header['p1'], + header['p2'] + ] + } + revision['id'] = identifiers.revision_identifier(revision) + revisions[revision['id']] = revision + + missing_revs = revisions.keys() + + if not DEBUG: + missing_revs = set( + self.storage.revision_missing(missing_revs) + ) + + for r in missing_revs: + yield revisions[r] + self.mnode_to_tree_id = None + + def get_releases(self): + """Get the releases that need to be loaded.""" + releases = {} + self.num_releases = 0 + for t in self.tags: + self.num_releases += 1 + node, name = t.split(b' ') + release = { + 'name': name, + 'target': node, + 'target_type': 'revision', + 'message': None, + 'metadata': None, + 'synthetic': False, + 'author': None, + 'date': None + } + id_hash = identifiers.release_identifier(release) + release['id'] = id_hash + releases[id_hash] = release + + yield from releases.values() + + def get_occurrences(self): + """Get the occurrences that need to be loaded.""" + self.num_occurrences = 0 + for name, target in self.branches.items(): + self.num_occurrences += 1 + yield { + 'branch': name, + 'origin': self.origin_url, + 'target': target, + 'target_type': 'revision', + 'visit': self.visit, + } + + def get_fetch_history_result(self): + """Return the data to store in fetch_history.""" + return { + 'contents': self.num_contents, + 'directories': self.num_directories, + 'revisions': self.num_revisions, + 'releases': self.num_releases, + 'occurrences': self.num_occurrences + } diff --git a/swh/loader/mercurial/bundle20_loader_verifier.py b/swh/loader/mercurial/bundle20_loader_verifier.py new file mode 100644 --- /dev/null +++ b/swh/loader/mercurial/bundle20_loader_verifier.py @@ -0,0 +1,217 @@ +# Copyright (C) 2017 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import code +import os +import random +import sys +import time +from binascii import hexlify, unhexlify + +import hglib + +from bundle20_loader import HgBundle20Loader +from converters import PRIMARY_ALGO as ALGO +from objects import SimpleTree +from swh.model import hashutil + + +class HgLoaderValidater(HgBundle20Loader): + def generate_all_blobs(self, validate=True, frequency=1): + print('GENERATING BLOBS') + i = 0 + start = time.time() + u = set() + for blob, node_info in self.br.yield_all_blobs(): + filename = node_info[0] + header = node_info[2] + i += 1 + + bhash = hashutil.hash_data(blob, algorithms=set([ALGO]))[ALGO] + self.file_node_to_hash[header['node']] = bhash + + u.update([bhash]) + + if validate: + if random.random() < frequency: + self.validate_blob(filename, header, blob) + + if i % 10000 == 0: + print(i) + + print('') + print('FOUND', i, 'BLOBS') + print('FOUND', len(u), 'UNIQUE BLOBS') + print('ELAPSED', time.time()-start) + + def validate_blob(self, filename, header, blob): + if not self.hg: + self.hg = hglib.open(self.hgdir) + + data = bytes(blob) + + filepath = os.path.join(self.hg.root(), bytes(filename)) + linknode = hexlify(header['linknode']) + cat_contents = self.hg.cat([filepath], rev=linknode) + + if cat_contents != data: + print('INTERNAL ERROR ERROR ERROR ERROR') + print(filename) + print(header) + print('-----') + print(cat_contents) + print('---- vs ----') + print(data) + code.interact(local=dict(globals(), **locals())) + quit() + else: + print('v', end='') + + def 
generate_all_trees(self, validate=True, frequency=1): + print('GENERATING MANIFEST TREES') + + c = 0 + n = 0 + u = set() + + start = time.time() + validated = 0 + + for header, tree, new_dirs in self.load_directories(): + if validate and (c >= validated) and (random.random() < frequency): + self.validate_tree(tree, header, c) + + for d in new_dirs: + u.add(d['id']) + + c += 1 + n += len(new_dirs) + + print('.', end='') + if c % 20 == 0: + sys.stdout.flush() + if c % 10000 == 0: + print(c) + + print('') + print('FOUND', c, 'COMMIT MANIFESTS') + print('FOUND', n, 'NEW DIRS') + print('FOUND', len(u), 'UNIQUE DIRS') + print('ELAPSED', time.time()-start) + + def validate_tree(self, tree, header, i): + if not self.hg: + self.hg = hglib.open(self.hgdir) + + commit_id = header['linknode'] + if len(commit_id) == 20: + commit_id = hexlify(commit_id) + + base_tree = SimpleTree() + base_files = list(self.hg.manifest(rev=commit_id)) + bfiles = sorted([f[4] for f in base_files]) + + for p in base_files: + base_tree.add_blob( + p[4], self.file_node_to_hash[unhexlify(p[0])], p[3], p[1] + ) + base_tree.hash_changed() + + files = sorted(list(tree.flatten().keys())) + + if tree != base_tree: + print('validating rev:', i, 'commit:', commit_id) + print('validating files:', len(files), len(base_files)) + print(' INVALID TREE') + + def so1(a): + keys = [k['name'] for k in a['entries']] + return b''.join(sorted(keys)) + + tree_dirs = [d for d in tree.yield_SWH_directories()] + base_dirs = [d for d in base_tree.yield_SWH_directories()] + tree_dirs.sort(key=so1) + base_dirs.sort(key=so1) + + # for i in range(len(tree_dirs)): + # if tree_dirs[i] != base_dirs[i]: + # print(i) + # code.interact(local=dict(globals(), **locals())) + + print("Program will quit after your next Ctrl-D") + code.interact(local=dict(globals(), **locals())) + quit() + else: + print('v', end='') + + def generate_all_commits(self, validate=True, frequency=1): + i = 0 + start = time.time() + for rev in self.get_revisions(): + print('.', end='') + i += 1 + if i % 20 == 0: + sys.stdout.flush() + + print('') + print('FOUND', i, 'COMMITS') + print('ELAPSED', time.time()-start) + + def runtest(self, hgdir, validate_blobs=False, validate_trees=False, + frequency=1.0): + """ + HgLoaderValidater().runtest('/home/avi/SWH/mozilla-unified') + """ + self.prepare('nil', hgdir, 'nil') + + self.file_node_to_hash = {} + + # blobs_pickle = 'blob_hashes.' 
+ os.path.basename(hgdir) + '.pickle' + # if os.path.exists(blobs_pickle): + # with open(blobs_pickle, 'rb') as pfile: + # self.file_node_to_hash = pickle.load(pfile) + + # if not self.file_node_to_hash: + # self.generate_all_blobs(validate=validate_blobs, + # frequency=frequency) + + # with open(blobs_pickle, 'wb') as pfile: + # pickle.dump(self.file_node_to_hash, pfile) + + # self.generate_all_trees(validate=validate_trees, frequency=frequency) + # self.generate_all_commits() + for c in self.get_contents(): + pass + for d in self.get_directories(): + pass + for rev in self.get_revisions(): + pass + for rel in self.get_releases(): + print(rel) + self.visit = 'foo' + for o in self.get_occurrences(): + print(o) + + +def main(): + if len(sys.argv) > 1: + test_repo = sys.argv[1] + else: + print("Please pass in the path to an HG repository.") + quit() + + while test_repo[-1] == '/': + test_repo = test_repo[:-1] + + if len(sys.argv) > 2: + validate_frequency = float(sys.argv[2]) + else: + validate_frequency = 0.001 + + HgLoaderValidater().runtest(test_repo, True, True, validate_frequency) + + +if __name__ == '__main__': + main() diff --git a/swh/loader/mercurial/bundle20_reader.py b/swh/loader/mercurial/bundle20_reader.py new file mode 100644 --- /dev/null +++ b/swh/loader/mercurial/bundle20_reader.py @@ -0,0 +1,604 @@ +# Copyright (C) 2017 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""This document contains code for extracting all of the data from Mercurial +version 2 bundle file. It is referenced by bundle20_loader.py +""" + +# ============================================================================= +# ============================================================================= +# BACKGROUND +# ============================================================================= +# ============================================================================= +# +# https://www.mercurial-scm.org/wiki/BundleFormat says: +# "The new bundle format design is described on the BundleFormat2 page." +# +# https://www.mercurial-scm.org/wiki/BundleFormat2#Format_of_the_Bundle2_Container says: # noqa +# "The latest description of the binary format can be found as comment in the +# Mercurial source code." +# +# https://www.mercurial-scm.org/repo/hg/file/default/mercurial/help/internals/bundles.txt says: # noqa +# "The 'HG20' format is not yet documented here. See the inline comments in +# 'mercurial/exchange.py' for now." +# +# ----------------------------------------------------------------------------- +# Avi says: +# ----------------------------------------------------------------------------- +# +# All of the above official(?) statements seem to be quite wrong. +# +# The mercurial-scm wiki is a cluster#@*& of missing pages, bad links, wrong +# information, obsolete information, undecipherable names, and half-started +# leavings that only sort of look like content. I don't understand who or what +# it's there for. I think that means it's not there for me? +# +# https://www.mercurial-scm.org/wiki/BundleFormat2#New_header is wrong and +# bizarre, and there isn't any other information on the page. +# +# https://www.mercurial-scm.org/repo/hg/file/de86a6872d06/mercurial/help/internals/changegroups.txt # noqa +# is very close to what we need. It is accurate, current, and thorough. 
+# It describes much of the internal structure, which is super helpful if you +# know in advance which info can be trusted, but it doesn't describe any of the +# file-level details, including the file headers and that the entire bundle +# is broken into overlayed 4KB chunks starting from just after the bundle +# header, nor does it describe what any of the component elements are used for, +# nor does it explain the meta-message segment in the blob deltas, nor does it +# explain the file flags occasionally appended to manifest file hashes. Also it +# says: "The [delta data] format is described more fully in 'hg help +# internals.bdiff'", which is also wrong. As far as I can tell, that file has +# never existed. +# +# It does however have one potentially extremely useful note buried in the +# middle that, in hindsight, could have significant implications for complexity +# and performance in future Mercurial loading work. +# +# It says: "In version 1, the delta is always applied against the previous node +# from the changegroup or the first parent if this is the first entry in the +# changegroup." +# +# If the next version of HG support for SWH can reliably get version 1 data, +# then it could be implemented entirely without worrying about ballooning +# memory utilization, which would shrink the code significantly and probably be +# faster too. So maybe HG10 bundles instead of HG20 bundles are superior for +# this task? But then I read that servers can optionally disable serving +# version 1 content, and I like to think that this loader could eventually +# be applied directly to a network stream without an intermediate phase for +# cloning and local bundling, so...It seemed like a good idea at the time? +# +# ----------------------------------------------------------------------------- +# Other notes and thoughts: +# ----------------------------------------------------------------------------- +# 1) +# This is a relatively minor detail, but +# Mercurial nodes are not content-addressable like Git's are. +# +# https://www.mercurial-scm.org/wiki/Nodeid explains: "If you modify a file, +# commit the change, and then modify it to restore the original contents, the +# contents are the same but the history is different, so the file will get a +# new nodeid. This history-sensitivity is obtained by calculating the nodeid +# from the concatentation of the parent nodeids with the file's contents..." +# +# The result is that we always have to collect and hash everything at least +# once in order to know if we've seen something like it before, because nothing +# tells us that the node we're looking at is unique. We can use node ids for +# linking disparate elements together (e.g. commit to manifest) but not for +# determining whether two elements in the same group are identical in all but +# descendency. So there's no way to save time on duplicate hashing. Well... +# there is the copied_file blob metadata, but, lol. +# +# 2) +# Most of the code complexity is due to dealing with 'version 2' changegroups, +# for which we need to keep track of the entire history of all updates made +# to a given file or working directory tree structure, because a revision +# delta could be applied over any of the prior revisions all the way back to +# rev 0, according to whenever files were branched/merged/uncommitted/etc. 
For +# very large repositories with a lot of churn, this can quickly expand to +# require multiple gigabytes of space, possibly exceeding RAM availability if +# one desires to keep as much data resident in memory as possible to boost +# performance. mozilla-unified, for instance, produces some 2 million+ blobs +# (1.6 million+ unique). Nested umpteen subdirectory levels deep, those blobs +# balloon into a quantity of directory subtrees that rapidly exceeds an 8GB RAM +# laptop's ability to keep them all active without a good amount of care and +# pruning. The code here tries to strike a balance between memory utilization +# and performance. +# +# This problem is also referenced in the last paragraph of the previous +# section, where potentially this problem wouldn't exist for 'version 1' data +# if we can reliably get it. Can we? Either that or not use bundles at all, +# which has other costs. +# +# 3) +# If the list of changed files stored by the changesets had indicated which +# of those changed files were added or modified and which ones were removed, +# this code could be much faster. Right now we have to perform a huge number of +# substring replacements (see the apply_revdata method) to produce a complete +# file manifest for each commit (as a string!!!) in order to know how to get +# the set of removed files from the next delta. We can intuit from every +# manifest delta which files were modified or added, but I believe there's no +# way to intuit which files were removed without actually having the complete +# prior state and without the list of removals being explicitly given. If you +# have an explicit list of all the files that were removed for any given commit +# changegroup, and you combine that with the delta updates in the manifest +# changegroups which detail the set of files that have been added or modified, +# then you wouldn't even have to apply any of the string deltas to get a +# complete understanding of the set of differences between one manifest and the +# next. Not having this effective speed boost is rather unfortunate; it would +# require only one extra stored byte per commit to differentiate removals and +# would make extracting bundles lightning fast. +# ============================================================================ +## + +import itertools +import struct +from binascii import unhexlify +from collections import OrderedDict +from datetime import datetime + +from chunked_reader import ChunkedFileReader +from objects import SelectiveCache + + +def unpack(fmt_str, source): + """Utility function for fetching the right number of bytes from a stream to + satisfy a struct.unpack pattern. + + args: + fmt_str: a struct.unpack string pattern + (e.g. '>I' for 4 bytes big-endian) + source: any IO object that has a read() method which + returns an appropriate sequence of bytes + """ + ret = struct.unpack(fmt_str, source.read(struct.calcsize(fmt_str))) + if len(ret) == 1: + return ret[0] + return ret + + +class Bundle20Reader(object): + """Parser for extracting data from Mercurial Bundle20 files. + + args: + bundlefile: string name of the binary repository bundle file + cache_size: int tuning parameter for the upper RAM limit used by + historical data caches. The default is defined in the + SelectiveCache class. 
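+
+    example (hypothetical usage; the bundle path is illustrative):
+        br = Bundle20Reader('/tmp/HG20_none_bundle')
+        for header, commit in br.yield_all_changesets():
+            print(header['node'], commit['message'])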
+ """ + NAUGHT_NODE = b'\x00' * 20 + + def __init__(self, bundlefile, cache_size=None): + self.bundlefile = bundlefile + bfile = open(bundlefile, 'rb', buffering=200*1024*1024) + + btype = bfile.read(4) # 'HG20' + if btype != b'HG20': + raise Exception(bundlefile, + b'Not an HG20 bundle. First 4 bytes:' + btype) + bfile.read(4) # '\x00\x00\x00\x00' + + self.params = self.read_bundle_header(bfile) + print('PARAMETERS', self.params) + self.num_commits = self.params[b'nbchanges'] + + self.filereader = ChunkedFileReader(bfile) + + self.cache_size = cache_size + self.blobs_offset = None + self.changes_offset = self.filereader.tell() + self.changes_next_offset = None + self.manifests_offset = None + self.manifests_next_offset = None + self.id_to_info = {} + + def read_bundle_header(self, bfile): + """Parse the file header which describes the format and parameters. + See the structure diagram at the top of the file for more insight. + + args: + bfile: bundle file handle with the cursor at the start offset of + the content header (the 9th byte in the file) + + returns: + dict of decoded bundle parameters + """ + unpack('>I', bfile) # header length + chg_len = unpack('>B', bfile) # len('CHANGEGROUP') == 11 + bfile.read(chg_len) # should say 'CHANGEGROUP' + unpack('>I', bfile) # probably \x00\x00\x00\x00 + + n_mandatory, n_advisory = unpack('>BB', bfile) # parameter counts + mandatory_params = [ + (key_len, val_len) + for key_len, val_len + in [unpack('>BB', bfile) for i in range(n_mandatory)] + ] + advisory_params = [ + (key_len, val_len) + for key_len, val_len + in [unpack('>BB', bfile) for i in range(n_advisory)] + ] + params = {} + + for key_len, val_len in mandatory_params+advisory_params: + key = unpack('>%ds' % key_len, bfile) + val = int(unpack('>%ds' % val_len, bfile)) + params[key] = val + + return params + + def revdata_iterator(self, bytes_to_read): + """A chunk's revdata section is a series of start/end/length/data_delta + content updates called RevDiffs that indicate components of a text diff + applied to the node's basenode. The sum length of all diffs is the + length indicated at the beginning of the chunk at the start of the + header. + See the structure diagram at the top of the file for more insight. + + args: + bytes_to_read: int total number of bytes in the chunk's revdata + yields: + (int, int, read iterator) representing a single text diff component + """ + while bytes_to_read > 0: + start_offset = unpack('>I', self.filereader) + end_offset = unpack('>I', self.filereader) + blocklen = unpack('>I', self.filereader) + delta_it = self.filereader.read_iterator(blocklen) + bytes_to_read -= (12 + blocklen) + yield (start_offset, end_offset, delta_it) # RevDiff + + def read_chunk_header(self): + """The header of a RevChunk describes the id ('node') for the current + change, the commit id ('linknode') associated with this change, + the parental heritage ('p1' and 'p2'), and the node to which the + revdata updates will apply ('basenode'). 'linknode' is the same as + 'node' when reading the commit log because any commit is already + itself. 'basenode' for a changeset will be NAUGHT_NODE, because + changeset chunks include complete information and not diffs. + See the structure diagram at the top of the file for more insight. 
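+        The header is a fixed 100-byte block, parsed below as five 20-byte
+        node ids in order: node, p1, p2, basenode, linknode.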
+ + returns: + dict of the next delta header + """ + header = self.filereader.read(100) + header = { + 'node': header[0:20], + 'p1': header[20:40], + 'p2': header[40:60], + 'basenode': header[60:80], + 'linknode': header[80:100] + } + return header + + def read_revchunk(self): + """Fetch a complete RevChunk. + A RevChunk contains the collection of line changes made in a particular + update. header['node'] identifies which update. Commits, manifests, and + files all have these. Each chunk contains an indicator of the whole + chunk size, an update header, and then the body of the update as a + series of text diff components. + See the structure diagram at the top of the file for more insight. + + returns: + tuple(dict, iterator) of (header, chunk data) if there is another + chunk in the group, else None + """ + size = unpack('>I', self.filereader) - 104 + if size >= 0: + header = self.read_chunk_header() + return (header, self.revdata_iterator(size)) + else: + return None # NullChunk + + def extract_commit_metadata(self, data): + """Converts the binary commit metadata format into a dict. + + args: + data: bytestring of encoded commit information + + returns: + dict of decoded commit information + """ + parts, message = data.split(b'\n\n', 1) + parts = parts.split(b'\n') + commit = {} + commit['message'] = message + commit['manifest'] = unhexlify(parts[0]) + commit['user'] = parts[1] + tstamp, tz, *extra = parts[2].split(b' ') + commit['time'] = datetime.fromtimestamp(float(tstamp)) + commit['time_offset_seconds'] = int(tz) + if extra: + commit['extra'] = extra[0] + commit['changed_files'] = parts[3:] + return commit + + def skip_sections(self, num_sections=1): + """Skip past sections quickly. + + args: + num_sections: int number of sections to skip + """ + for i in range(num_sections): + size = unpack('>I', self.filereader) + while size >= 104: + self.filereader.seek(size - 4, from_current=True) + size = unpack('>I', self.filereader) + + def apply_revdata(self, revdata_it, prev_state): + """Compose the complete text body for a change from component deltas. + + args: + revdata_it: output from the revdata_iterator method + prev_state: bytestring the base complete text on which the new + deltas will be applied + returns: + (bytestring, list, list) the new complete string and lists of added + and removed components (used in manifest processing) + """ + state = [] + added = [] + removed = [] + next_start = 0 + + for delta_start, delta_end, rev_diff_it in revdata_it: + removed.append(prev_state[delta_start:delta_end]) + added.append(b''.join(rev_diff_it)) + + state.append(prev_state[next_start:delta_start]) + state.append(added[-1]) + next_start = delta_end + + state.append(prev_state[next_start:]) + state = b''.join(state) + + return (state, added, removed) + + def skim_headers(self): + """Get all header data from a change group but bypass processing of the + contained delta components. + + yields: + output of read_chunk_header method for all chunks in the group + """ + size = unpack('>I', self.filereader) - 104 + while size >= 0: + header = self.read_chunk_header() + self.filereader.seek(size, from_current=True) + yield header + size = unpack('>I', self.filereader) - 104 + + def group_iterator(self): + """Bundle sections are called groups. These are composed of one or more + revision chunks of delta components. Iterate over all the chunks in a + group and hand each one back. 
+ + yields: + see output of read_revchunk method + """ + revchunk = self.read_revchunk() + while revchunk: # A group is terminated by a NullChunk + yield revchunk # (header, revdata_iterator) + revchunk = self.read_revchunk() + + def yield_group_objects(self, cache_hints=None, group_offset=None): + """Bundles are sectioned into groups: the log of all commits, the log + of all manifest changes, and a series of logs of blob changes (one for + each file). All groups are structured the same way, as a series of + revisions each with a series of delta components. Iterate over the + current group and return the completed object data for the current + update by applying all of the internal delta components to each prior + revision. + + args: + cache_hints: see build_cache_hints (this will be built + automatically if not pre-built and passed in) + group_offset: int file position of the start of the desired group + + yields: + (dict, bytestring, list, list) the output from read_chunk_header + followed by the output from apply_revdata + """ + if group_offset is not None: + self.filereader.seek(group_offset) + + if cache_hints is None: + cache_hints = self.build_cache_hints() + + data_cache = SelectiveCache(self.cache_size, cache_hints) + + # Loop over all revisions in the group + data = b'' + for header, revdata_it in self.group_iterator(): + node = header['node'] + basenode = header['basenode'] + + data = data_cache.fetch(basenode) or b'' + + data, added, removed = self.apply_revdata(revdata_it, data) + + data_cache.store(node, data) + + yield (header, data, added, removed) # each RevChunk + + def extract_meta_from_blob(self, data): + """File revision data sometimes begins with a metadata section of + dubious value. Strip it off and maybe decode it. It seems to be mostly + useless. Why indicate that a file node is a copy of another node? You + can already get that information from the delta header. + + args: + data: bytestring of one revision of a file, possibly with metadata + embedded at the start + + returns: + (bytestring, dict) of (the blob data, the meta information) + """ + meta = {} + if data.startswith(b'\x01\n'): + empty, metainfo, data = data.split(b'\x01\n', 2) + metainfo = b'\x01\n' + metainfo + b'\x01\n' + if metainfo.startswith(b'copy:'): + # direct file copy (?) + copyinfo = metainfo.split(b'\n') + meta['copied_file'] = copyinfo[0][6:] + meta['copied_rev'] = copyinfo[1][9:] + elif metainfo.startswith(b'censored:'): + # censored revision deltas must be full-replacements (?) + meta['censored'] = metainfo + else: + # no idea + meta['text'] = metainfo + return data, meta + + def seek_changelog(self): + """Seek to the beginning of the change logs section. + """ + self.filereader.seek(self.changes_offset) + + def seek_manifests(self): + """Seek to the beginning of the manifests section. + """ + if self.manifests_offset is None: + self.seek_changelog() + self.skip_sections(1) # skip past commits + self.manifests_offset = self.filereader.tell() + else: + self.filereader.seek(self.manifests_offset) + + def seek_filelist(self): + """Seek to the beginning of the file changes section. + """ + if self.blobs_offset is None: + self.seek_manifests() + self.skip_sections(1) # skip past manifests + self.blobs_offset = self.filereader.tell() + else: + self.filereader.seek(self.blobs_offset) + + def yield_all_blobs(self): + """Gets blob data from the bundle. 
+ + yields: + (bytestring, (bytestring, int, dict)) of + (blob data, (file name, start offset of the file within the + bundle, node header)) + """ + self.seek_filelist() + + # Loop through all files that have commits + size = unpack('>I', self.filereader) + while size > 0: + file_name = self.filereader.read(size-4) + file_start_offset = self.filereader.tell() + # get all of the blobs for each file + for header, data, *_ in self.yield_group_objects(): + blob, meta = self.extract_meta_from_blob(data) + yield blob, (file_name, file_start_offset, header) + size = unpack('>I', self.filereader) + + def yield_all_changesets(self): + """Gets commit data from the bundle. + + yields: + (dict, dict) of (read_chunk_header output, + extract_commit_metadata output) + """ + self.seek_changelog() + for header, data, *_ in self.yield_group_objects(): + changeset = self.extract_commit_metadata(data) + yield (header, changeset) + + def yield_all_manifest_deltas(self, cache_hints=None): + """Gets manifest data from the bundle. + In order to process the manifests in a reasonable amount of time, we + want to use only the deltas and not the entire manifest at each change, + because if we're processing them in sequential order (we are) then we + already have the previous state so we only need the changes. + + args: + cache_hints: see build_cache_hints method + + yields: + (dict, dict, dict) of (read_chunk_header output, + extract_manifest_elements output on added/modified files, + extract_manifest_elements on removed files) + """ + self.seek_manifests() + for header, data, added, removed in self.yield_group_objects( + cache_hints=cache_hints + ): + added = self.extract_manifest_elements(added) + removed = self.extract_manifest_elements(removed) + yield (header, added, removed) + + def build_manifest_hints(self): + """Just a minor abstraction shortcut for the build_cache_hints method. + + returns: + see build_cache_hints method + """ + self.seek_manifests() + return self.build_cache_hints() + + def build_cache_hints(self): + """The SelectiveCache class that we use in building nodes can accept a + set of key counters that makes its memory usage much more efficient. + + returns: + dict of key=a node id, value=the number of times we + will need data from that node when building subsequent nodes + """ + cur_pos = self.filereader.tell() + hints = OrderedDict() + prev_node = None + for header in self.skim_headers(): + basenode = header['basenode'] + if (basenode != self.NAUGHT_NODE) and (basenode != prev_node): + hints[basenode] = hints.get(basenode, 0) + 1 + prev_node = header['node'] + self.filereader.seek(cur_pos) + return hints + + def extract_manifest_elements(self, data): + """Parses data that looks like a manifest. In practice we only pass in + the bits extracted from the application of a manifest delta describing + which files were added/modified or which ones were removed. + + args: + data: either a string or a list of strings that, when joined, + embodies the composition of a manifest. This takes the form + of repetitions of (without the brackets): + b'\x00[flag]\n' ...repeat... 
+ where [flag] may or may not be there depending on whether the + file is specially flagged as executable or something + + returns: + dict of key=file_path, value=(file_node, permissions) where + permissions is given according to the flag that optionally exists + in the data + """ + elements = {} + if isinstance(data, str): + data = data.split(b'\n') + else: + data = itertools.chain.from_iterable( + [chunk.split(b'\n') for chunk in data] + ) + + for line in data: + if line != b'': + f = line.split(b'\x00') + + node = f[1] + flag_bytes = node[40:] + + elements[f[0]] = ( + unhexlify(node[:40]), + b'l' in flag_bytes, + b'755' if (b'x' in flag_bytes) else b'644' + ) + + return elements diff --git a/swh/loader/mercurial/bundle_loader.py b/swh/loader/mercurial/bundle_loader.py deleted file mode 100644 --- a/swh/loader/mercurial/bundle_loader.py +++ /dev/null @@ -1,322 +0,0 @@ -# Copyright (C) 2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -# https://www.mercurial-scm.org/wiki/BundleFormat says: -# "The new bundle format design is described on the BundleFormat2 page." -# -# https://www.mercurial-scm.org/wiki/BundleFormat2#Format_of_the_Bundle2_Container says: # noqa -# "The latest description of the binary format can be found as comment in the -# Mercurial source code." -# -# https://www.mercurial-scm.org/repo/hg/file/default/mercurial/help/internals/bundles.txt says: # noqa -# "The 'HG20' format is not yet documented here. See the inline comments in -# 'mercurial/exchange.py' for now." -# -# Avi says: -------------------- -# All of the official statements shown above are wrong. -# The bundle20 format seems to be accurately documented nowhere. -# https://www.mercurial-scm.org/wiki/BundleFormatHG19 is predictably invalid -# and absent in a number of details besides. -# https://www.mercurial-scm.org/wiki/BundleFormat2#New_header is wrong and -# bizarre, and there isn't any other information on the page. -# We get something reasonably close in -# https://www.mercurial-scm.org/repo/hg/file/tip/mercurial/help/internals/changegroups.txt # noqa -# which describes much of the internal structure, but it doesn't describe any -# of the file-level structure, including the file header and that the entire -# bundle is broken into overlayed 4KB chunks starting from byte 9, nor does it -# describe what any of the component elements are used for. -# Also it says "The [delta data] format is described more fully in -# 'hg help internals.bdiff'", which is also wrong. As far as I can tell, -# that file has never existed. -# ------------------------------ -# - -import struct -from binascii import hexlify -from datetime import datetime - -from swh.model import identifiers - -""" -mkdir WORKSPACE -cd WORKSPACE -hg init -hg pull -hg bundle -f -a bundle_file -t none-v2 -""" - - -BLOB_MAX_SIZE = 1024*1024*100 -ALGO = 'sha1_git' - - -class ChunkedFileReader(object): - """A file reader that gives seamless read access to files such as the - Mercurial bundle2 HG20 format which are partitioned into chunks of - [4Bytes:, Bytes:]. - - init args: - file: either a filename string or pre-opened binary-read file handle. 
- """ - def __init__(self, file): - if isinstance(file, str): - self._file = open(file, "rb", buffering=12*1024*1024) - else: - self._file = file - self._chunk_bytes_left = struct.unpack('>I', self._file.read(4))[0] - - def read(self, bytes_to_read): - """Return N bytes from the file as a single block. - """ - return b''.join(self.read_iterator(bytes_to_read)) - - def read_iterator(self, bytes_to_read): - """Return a generator that eventually yields N bytes from the file - one file chunk at a time. - """ - while bytes_to_read > self._chunk_bytes_left: - yield self._file.read(self._chunk_bytes_left) - bytes_to_read -= self._chunk_bytes_left - self._chunk_bytes_left = struct.unpack('>I', self._file.read(4))[0] - self._chunk_bytes_left -= bytes_to_read - yield self._file.read(bytes_to_read) - - -def DEBUG1(*args, **kwargs): - print(*args, **kwargs) - - -def DEBUG2(*args, **kwargs): - pass - # print(*args, **kwargs) - - -def unpack(fmt_str, source): - ret = struct.unpack(fmt_str, source.read(struct.calcsize(fmt_str))) - if len(ret) == 1: - return ret[0] - return ret - - -class Bundle20Reader(object): - """Purpose-built bundle20 file parser for SWH loading. - """ - def __init__(self, bundlefile): - """ - args: - bundlefile: (string) name of the binary repository bundle file - """ - bfile = open(bundlefile, 'rb') - btype = bfile.read(4) # 'HG20' - if btype != b'HG20': - raise Exception(bundlefile, - b'Not an HG20 bundle. First 4 bytes:'+btype) - bfile.read(4) # '\x00\x00\x00\x00' - self.chunkreader = ChunkedFileReader(bfile) - self.commits = [] - self.manifests = [] - self.files = {} - - def read_deltaheader(self): - """Yield the complete header section of a delta chunk. - """ - chunkreader = self.chunkreader - node = hexlify(chunkreader.read(20)) - p1 = hexlify(chunkreader.read(20)) - p2 = hexlify(chunkreader.read(20)) - basenode = hexlify(chunkreader.read(20)) - linknode = hexlify(chunkreader.read(20)) - return {'node': node, 'p1': p1, 'p2': p2, - 'basenode': basenode, 'linknode': linknode} - - def read_deltadata(self, size): - """Yield the complete data section of a delta chunk. - """ - read_bytes = 104 - while read_bytes < size: - start_offset = unpack(">I", self.chunkreader) - end_offset = unpack(">I", self.chunkreader) - data_size = unpack(">I", self.chunkreader) - DEBUG2("DATA SIZE:", data_size) - if data_size > 0: - data_it = self.chunkreader.read_iterator(data_size) - else: - data_it = None - read_bytes += data_size+12 - yield (start_offset, end_offset, data_size, data_it) - - def commit_handler(self, header, data_block_it): - """Handler method for changeset delta components. - """ - data_block = next(data_block_it)[3] - data = b''.join(data_block) - firstpart, message = data.split(b'\n\n', 1) - firstpart = firstpart.split(b'\n') - commit = header - commit['message'] = message - commit['manifest'] = firstpart[0] - commit['user'] = firstpart[1] - tstamp, tz, *extra = firstpart[2].split(b' ') - commit['time'] = datetime.fromtimestamp(float(tstamp)) - commit['time_offset_seconds'] = int(tz) - if extra: - commit['extra'] = extra - commit['changed_files'] = firstpart[3:] - self.commits.append((header['node'], commit)) - - def manifest_handler(self, header, data_block_it): - """Handler method for manifest delta components. 
- """ - commit = header - commit['manifest'] = [] - for data_block in data_block_it: - data_it = data_block[3] - if data_it is not None: - data = b''.join(data_it)[:-1] - commit['manifest'] += [tuple(file.split(b'\x00')) - for file in data.split(b'\n')] - self.manifests.append((header['node'], commit)) - - def filedelta_handler(self, header, commits_iterator): - """Handler method for filelog delta components. - """ - commit = {} - self.num_commits += 1 - blob = None - - # pieces of a commit - for commit_stuff in commits_iterator: - delta_start = commit_stuff[0] - delta_end = commit_stuff[1] - delta_data_it = commit_stuff[3] - - blob = bytearray(self.last_blob[0:delta_start]) - - # gather all the data for the current blob - if delta_data_it is not None: - blob.extend(b''.join(delta_data_it)) -# blob_size = delta_start -# for more_data in delta_data_it: -# part_size = len(more_data) -# blob_size += part_size -# if blob_size <= BLOB_MAX_SIZE: -# blob.extend(more_data) -# blob_size += len(self.last_blob)-delta_end -# if blob_size <= BLOB_MAX_SIZE: - blob.extend(self.last_blob[delta_end:-1]) - - # remove meta garbage - if (delta_start == 0) and blob.startswith(b'\x01\n'): - empty, metainfo, blob = blob.split(b'\x01\n', 2) -# We may not actually care about this meta stuff, but here it is anyway -# if metainfo.startswith(b'copy:'): # direct file copy -# copyinfo = metainfo.split(b'\n') -# commit['copied_file'] = copyinfo[0][6:] -# commit['copied_filerev'] = copyinfo[1][9:] -# elif metainfo.startswith(b'censored:'): -# # censored revision deltas must be full-replacements -# commit['censored'] = metainfo -# else: -# commit['meta'] = metainfo - - self.last_blob = blob - - # commit['blob'] = blob - commit.update( - identifiers.content_identifier({'data': blob or bytearray()}) - ) - self.current_file.append((header['node'], commit)) - - def loop_deltagroups(self, section_handler): - """Bundle sections are composed of one or more groups of deltas. - Iterate over them and hand each one to the current section-specific - handler method. - """ - size = unpack(">I", self.chunkreader) - while size > 0: - DEBUG2("SIZE ", size) - section_handler( - self.read_deltaheader(), - (size > 104) and self.read_deltadata(size) or [] - ) - size = unpack(">I", self.chunkreader) - - def process_changesets(self): - """Parsing stage for the changeset section, containing metadata about - each commit. - """ - DEBUG1("\nREADING COMMITS\n") - self.loop_deltagroups(self.commit_handler) - - def process_manifest(self): - """Parsing stage for the manifest section, containing manifest deltas - for each changeset. - """ - DEBUG1("\nREADING MANIFEST\n") - self.loop_deltagroups(self.manifest_handler) - - def process_filelog(self): - """Parsing stage for the filelog section, containing data deltas for - each change to each file. - """ - DEBUG1("\nREADING DELTAS\n") - name_size = unpack(">I", self.chunkreader) - while name_size > 0: - name = b''.join(self.chunkreader.read_iterator(name_size-4)) - DEBUG1("\nFILE", name, "\n") - self.cur_file_name = name - self.last_blob = bytearray() - self.current_file = [] - self.num_commits = 0 - self.loop_deltagroups(self.filedelta_handler) - print("NUMCOMMITS: ", self.num_commits) - # import code - # code.interact(local=dict(globals(), **locals())) - name_size = unpack(">I", self.chunkreader) - - def process_bundle_header(self): - """Parsing stage for the file header which describes format and - parameters. 
- """ - chunkreader = self.chunkreader - DEBUG1("\nREADING BUNDLE HEADER\n") - chg_len = unpack('>B', chunkreader) # len('CHANGEGROUP') == 11 - chunkreader.read(chg_len) # 'CHANGEGROUP' - unpack('>I', chunkreader) # probably \x00\x00\x00\x00 - n_mandatory, n_advisory = unpack('>BB', chunkreader) # parameters - mandatory_params = [ - (key_len, val_len) - for key_len, val_len - in [unpack('>BB', chunkreader) for i in range(n_mandatory)] - ] - advisory_params = [ - (key_len, val_len) - for key_len, val_len - in [unpack('>BB', chunkreader) for i in range(n_advisory)] - ] - params = {} - - for key_len, val_len in mandatory_params+advisory_params: - key = unpack('>%ds' % key_len, chunkreader) - val = int(unpack('>%ds' % val_len, chunkreader)) - params[key] = val - DEBUG1(params) - - def read_bundle(self): - """Initiate loading of the bundle. - """ - self.process_bundle_header() - self.process_changesets() - self.commits = None - self.process_manifest() - self.manifests = None - self.process_filelog() - - -if __name__ == "__main__": - import sys - if len(sys.argv) > 1: - br = Bundle20Reader(sys.argv[1]) - br.read_bundle() diff --git a/swh/loader/mercurial/chunked_reader.py b/swh/loader/mercurial/chunked_reader.py --- a/swh/loader/mercurial/chunked_reader.py +++ b/swh/loader/mercurial/chunked_reader.py @@ -6,12 +6,13 @@ class ChunkedFileReader(object): - """A binary file reader that gives seamless read access to files such as - the Mercurial bundle2 HG20 format which are partitioned for no great reason - into chunks of [4Bytes:, Bytes:]. + """A binary stream reader that gives seamless read access to Mercurial's + bundle2 HG20 format which is partitioned for some reason at the file level + into chunks of [4Bytes:, Bytes:] as if it were + encoding transport packets. - init args: - file: rb file handle pre-seeked to the start of the chunked portion + args: + file: rb file handle pre-aligned to the start of the chunked portion size_unpack_fmt: struct format string for unpacking the next chunk size """ def __init__(self, file, size_unpack_fmt='>I'): @@ -27,18 +28,11 @@ self._file.seek(self._offset, 0) # seek back to original position def _chunk_size(self, first_time=False): - """Unpack the next bytes from the file - to get the next chunk size. + """Unpack the next bytes from the + file to get the next file chunk size. """ size = struct.unpack(self._size_pattern, self._file.read(self._size_bytes))[0] - - # TODO: remove this assert after verifying a few - if (not first_time) and (size != self._bytes_per_chunk) \ - and self._file.tell() < (self._size - self._bytes_per_chunk): - raise Exception("inconsistent chunk %db at offset %d" - % (size, self._file.tell())) - return size def size(self): @@ -48,12 +42,18 @@ def read(self, bytes_to_read): """Return N bytes from the file as a single block. + + args: + bytes_to_read: int number of bytes of content """ return b''.join(self.read_iterator(bytes_to_read)) def read_iterator(self, bytes_to_read): - """Return a generator that eventually yields N bytes from the file - one file chunk at a time. + """Return a generator that yields N bytes from the file one file chunk + at a time. 
+ + args: + bytes_to_read: int number of bytes of content """ while bytes_to_read > self._chunk_bytes_left: yield self._file.read(self._chunk_bytes_left) @@ -62,23 +62,43 @@ self._chunk_bytes_left -= bytes_to_read yield self._file.read(bytes_to_read) - def seek(self, seek_pos=None): + def seek(self, new_pos=None, from_current=False): """Wraps the underlying file seek, additionally updating the - _chunk_bytes_left counter appropriately so that we can start reading + chunk_bytes_left counter appropriately so that we can start reading from the new location. - WARNING: Expects all chunks to be the same size. + args: + + new_pos: new cursor byte position + from_current: if True, it treats new_pos as an offset from the + current cursor position, bypassing any chunk boundaries as if + they weren't there. This should give the same end position as a + read except without the reading data part. """ - seek_pos = seek_pos or self._offset # None -> start position - assert seek_pos >= self._offset, \ - "Seek position %d is before starting offset %d" % (seek_pos, - self._offset) + if from_current: + new_pos = new_pos or 0 # None -> current + if new_pos <= self._chunk_bytes_left: + new_pos += self._file.tell() + else: + new_pos += ( + self._file.tell() + + self._size_bytes + + ( + ( + (new_pos - self._chunk_bytes_left - 1) # aliasing + // self._bytes_per_chunk + ) + * self._size_bytes + ) + ) + else: + new_pos = new_pos or self._offset # None -> start position self._chunk_bytes_left = self._bytes_per_chunk - ( - (seek_pos - self._offset) + (new_pos - self._offset) % (self._bytes_per_chunk + self._size_bytes) ) - self._file.seek(seek_pos) + self._file.seek(new_pos) def __getattr__(self, item): """Forward other calls to the underlying file object. diff --git a/swh/loader/mercurial/converters.py b/swh/loader/mercurial/converters.py --- a/swh/loader/mercurial/converters.py +++ b/swh/loader/mercurial/converters.py @@ -4,55 +4,60 @@ # See top-level LICENSE file for more information -from swh.model import hashutil, identifiers - - -def data_size_too_big(data, id_hash, max_content_size, logger=None, - origin_id=None): - if logger: - size = len(data) - id_hash = hashutil.hash_to_hex(id_hash) - logger.info('Skipping content %s, too large (%s > %s)' % - (id_hash, size, max_content_size), - extra={ - 'swh_type': 'loader_content_skip', - 'swh_id': id_hash, - 'swh_size': size - }) - return { - 'status': 'absent', - 'reason': 'Content too large', - 'origin': origin_id - } +from swh.model import hashutil + + +# TODO: What should this be set to? +# swh-model/identifiers.py:identifier_to_bytes has a restrictive length check +# in it which prevents using blake2 with hashutil.hash_to_hex +PRIMARY_ALGO = 'sha1_git' -def data_to_content_id(data): +def blob_to_content_dict(data, existing_hashes=None, max_size=None, + logger=None): + """Convert blob data to a SWH Content. If the blob already + has hashes computed, don't recompute them. + TODO: This should be unified with similar functions in other places. + + args: + existing_hashes: dict of hash algorithm:value pairs + max_size: size over which blobs should be rejected + logger: logging class instance + returns: + A Software Heritage "content". 
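+
+    example (hypothetical values):
+        blob_to_content_dict(b'hello', max_size=100*1024*1024)
+        # -> {'length': 5, 'status': 'visible', 'data': b'hello',
+        #     plus one hash entry per algorithm in
+        #     hashutil.DEFAULT_ALGORITHMS}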
+ """ + existing_hashes = existing_hashes or {} + size = len(data) - ret = { + content = { 'length': size, } - ret.update(identifiers.content_identifier({'data': data})) - return ret - - -def blob_to_content_dict(data, ident, max_content_size=None, logger=None, - origin_id=None): - """Convert blob data to a Software Heritage content""" - ret = data_to_content_id(data) - if max_content_size and (len(data) > max_content_size): - ret.update( - data_size_too_big(data, ident, max_content_size, logger=logger, - origin_id=origin_id) - ) + content.update(existing_hashes) + + hash_types = list(existing_hashes.keys()) + hashes_to_do = hashutil.DEFAULT_ALGORITHMS.difference(hash_types) + content.update(hashutil.hash_data(data, algorithms=hashes_to_do)) + + if max_size and (size > max_size): + content.update({ + 'status': 'absent', + 'reason': 'Content too large', + }) + if logger: + id_hash = hashutil.hash_to_hex(content[PRIMARY_ALGO]) + logger.info( + 'Skipping content %s, too large (%s > %s)' + % (id_hash, size, max_size), + extra={ + 'swh_type': 'loader_content_skip', + 'swh_id': id_hash, + 'swh_size': size + } + ) else: - ret.update( - { - 'data': data, - 'status': 'visible' - } - ) - - return ret + content.update({'data': data, 'status': 'visible'}) + + return content def parse_author(name_email): diff --git a/swh/loader/mercurial/objects.py b/swh/loader/mercurial/objects.py new file mode 100644 --- /dev/null +++ b/swh/loader/mercurial/objects.py @@ -0,0 +1,398 @@ +# Copyright (C) 2017 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""This document contains various helper classes used in converting Mercurial +bundle files into SWH Contents, Directories, etc. +""" + +import binascii +import copy +import os +import sys +from collections import OrderedDict + +from sqlitedict import SqliteDict + +from swh.model import identifiers + +OS_PATH_SEP = os.path.sep.encode('utf-8') + + +class SimpleBlob: + """Stores basic metadata of a blob object.when constructing deep trees from + commit file manifests. + + args: + file_hash: unique hash of the file contents + is_symlink: (bool) is this file a symlink? + file_perms: (string) 3 digit permission code as a string or bytestring, + e.g. '755' or b'755' + """ + + kind = 'file' + + def __init__(self, file_hash, is_symlink, file_perms): + self.hash = file_hash + self.perms = 0o100000 + int(file_perms, 8) + if is_symlink: + self.perms += 0o020000 + + def __str__(self): + return ('SimpleBlob: ' + str(self.hash) + ' -- ' + str(self.perms)) + + def __eq__(self, other): + return ((self.perms == other.perms) and (self.hash == other.hash)) + + def size(self): + """Return the size in byte.""" + return sys.getsizeof(self) + sys.getsizeof(self.__dict__) + + +class SimpleTree(dict): + """ Stores data for a nested directory object. Uses shallow cloning to stay + compact after forking and change monitoring for efficient re-hashing. + """ + + kind = 'dir' + perms = 0o040000 + + def __init__(self): + self.hash = None + self._size = None + + def __eq__(self, other): + return ((self.hash == other.hash) and (self.items() == other.items())) + + def _new_tree_node(self, path): + """Deeply nests SimpleTrees according to a given subdirectory path and + returns a reference to the deepest one. + + args: + path: bytestring containing a relative path from self to a deep + subdirectory. e.g. 
b'foodir/bardir/bazdir' + + returns: + the new node + """ + node = self + for d in path.split(OS_PATH_SEP): + if node.get(d): + if node[d].hash is not None: + node[d] = copy.copy(node[d]) + node[d].hash = None + node[d]._size = None + else: + node[d] = SimpleTree() + node = node[d] + return node + + def remove_tree_node_for_path(self, path): + """Deletes a SimpleBlob or SimpleTree from inside nested SimpleTrees + according to the given relative file path, and then recursively removes + any newly depopulated SimpleTrees. It keeps the old history by doing a + shallow clone before any change. + + args: + path: bytestring containing a relative path from self to a nested + file or directory. e.g. b'foodir/bardir/bazdir/quxfile.txt' + + returns: + the new root node + """ + node = self + if node.hash is not None: + node = copy.copy(node) + node.hash = None + node._size = None + first, sep, rest = path.partition(OS_PATH_SEP) + if rest: + node[first] = node[first].remove_tree_node_for_path(rest) + if len(node[first]) == 0: + del node[first] + else: + del node[first] + + return node + + def add_blob(self, file_path, file_hash, is_symlink, file_perms): + """Shallow clones the root node and then deeply nests a SimpleBlob + inside nested SimpleTrees according to the given file path, shallow + cloning all all intermediate nodes and marking them as changed and + in need of new hashes. + + args: + file_path: bytestring containing the relative path from self to a + nested file + file_hash: primary identifying hash computed from the blob contents + is_symlink: True/False whether this item is a symbolic link + file_perms: int or string representation of file permissions + + returns: + the new root node + """ + root = self + if root.hash is not None: + root = copy.copy(root) + root.hash = None + root._size = None + node = root + fdir, fbase = os.path.split(file_path) + if fdir: + node = root._new_tree_node(fdir) + node[fbase] = SimpleBlob(file_hash, is_symlink, file_perms) + return root + + def yield_SWH_directories(self): + """Converts nested SimpleTrees into a stream of SWH Directories. + + yields: + an SWH Directory for every node in the tree + """ + for k, v in sorted(self.items()): + if isinstance(v, SimpleTree): + yield from v.yield_SWH_directories() + + yield { + 'id': self.hash, + 'entries': [ + { + 'name': k, + 'perms': v.perms, + 'type': v.kind, + 'target': v.hash + } + for k, v in sorted(self.items()) + ] + } + + def hash_changed(self, new_dirs=None): + """Computes and sets primary indentifier hashes for unhashed subtrees. + + args: + new_dirs (optional): an empty list to be populated with the SWH + Directories for all of the new (not previously + hashed) nodes + + returns: + the top level hash of the whole tree + """ + if self.hash is None: + directory = { + 'entries': [ + { + 'name': k, + 'perms': v.perms, + 'type': v.kind, + 'target': (v.hash if v.hash is not None + else v.hash_changed(new_dirs)) + } + for k, v in sorted(self.items()) + ] + } + + self.hash = binascii.unhexlify( + identifiers.directory_identifier(directory) + ) + directory['id'] = self.hash + if new_dirs is not None: + new_dirs.append(directory) + + return self.hash + + def flatten(self, _curpath=None, _files=None): + """Converts nested sub-SimpleTrees and SimpleBlobs into a list of + file paths. Useful for counting the number of files in a manifest. 
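+        e.g. a tree holding a single regular file might flatten to
+        {b'foodir/quxfile.txt': (<file_hash>, 0o100644)} (hypothetical path).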
+
+
+class SelectiveCache(OrderedDict):
+    """Special cache for storing past data upon which new data is known to be
+    dependent. Optional hinting of how many instances of which keys will be
+    needed down the line makes utilization more efficient. Because the
+    distance between related data can be arbitrarily long and the data
+    fragments can be arbitrarily large, a disk-based secondary store is used
+    once the primary RAM-based storage area fills to the designated capacity.
+
+    Storage is occupied in three phases:
+        1) The most recent key/value pair is always held, regardless of other
+           factors, until the next entry replaces it.
+        2) Stored key/value pairs are pushed into a randomly accessible,
+           expanding in-memory buffer, governed by the size function, maximum
+           size and key hints (how long to keep which keys) optionally
+           declared at instantiation.
+        3) When the in-memory buffer becomes full, entries are pickled into a
+           randomly accessible disk-backed secondary buffer.
+
+    Occupied space is calculated by default with sys.getsizeof() on the values
+    being stored. This can be changed by passing in a different size_function
+    at instantiation.
+
+    The cache_hints parameter is a dict of key/int pairs recording how many
+    subsequent fetches that particular key's value should stay in storage for
+    before being erased. If you provide a set of hints and then try to store a
+    key that is not in that set of hints, the cache will store it only while it
+    is the most recent entry, and will bypass storage phases 2 and 3.
+    """
+    DEFAULT_SIZE = 800*1024*1024  # default in-memory budget, in bytes
+
+    def __init__(self, max_size=None, cache_hints=None,
+                 size_function=None):
+        """args:
+            max_size: integer value indicating the maximum size of the part
+                      of storage held in memory
+            cache_hints: dict of key/int pairs as described in the class
+                         description
+            size_function: callback function that accepts one parameter and
+                           returns one int, the calculated size of that
+                           parameter
+        """
+        self._max_size = max_size or SelectiveCache.DEFAULT_SIZE
+        self._disk = None
+        if size_function is None:
+            self._size_function = sys.getsizeof
+        else:
+            self._size_function = size_function
+        self._latest = None
+        self._cache_size = 0
+        self._cache_hints = copy.copy(cache_hints) or None
+
+    def store(self, key, data):
+        """Primary method for putting data into the cache.
+
+        args:
+            key: any hashable value
+            data: any Python object (preferably one whose size can be measured)
+
+        """
+        self._latest = (key, data)
+
+        if (self._cache_hints is not None) and (key not in self._cache_hints):
+            return
+
+        # cache the completed data...
+        self._cache_size += self._size_function(data) + 53
+
+        # ...but limit memory expenditure for the cache by offloading to disk
+        should_commit = False
+        while (
+            self._cache_size > self._max_size
+            and len(self) > 0
+        ):
+            should_commit = True
+            k, v = self.popitem(last=False)
+            self._cache_size -= self._size_function(v) - 53
+            self._diskstore(k, v)
+
+        if should_commit:
+            self._disk.commit(blocking=False)
+
+        self[key] = data
+
+    def _diskstore(self, key, value):
+        if self._disk is None:
+            self._disk = SqliteDict(autocommit=False, journal_mode='OFF')
+        self._disk[key] = value
+
+    def has(self, key):
+        """Tests whether the data for the provided key is being stored.
+
+        args:
+            key: the key of the data whose storage membership property you wish
+                 to discover
+
+        returns:
+            True or False
+        """
+        return (
+            (self._latest and self._latest[0] == key)
+            or (key in self)
+            or (self._disk and (key in self._disk))
+        )
+
+    def fetch(self, key):
+        """Pulls a value out of storage and decrements the hint counter for the
+        given key.
+
+        args:
+            key: the key of the data that you want to retrieve
+
+        returns:
+            the retrieved value or None
+        """
+        retval = None
+        if self._latest and self._latest[0] == key:
+            retval = self._latest[1]
+        if retval is None:
+            retval = self.get(key)
+            if (retval is None) and self._disk:
+                self._disk.commit(blocking=False)
+                retval = self._disk.get(key) or None
+            self.dereference(key)
+        return retval
+
+    def dereference(self, key):
+        """Remove one instance of expected future retrieval of the data for the
+        given key. This is called automatically by fetch requests that aren't
+        satisfied by phase 1 of storage.
+
+        args:
+            key: the key of the data for which the future retrievals hint is
+                 to be decremented
+        """
+        newref = self._cache_hints and self._cache_hints.get(key)
+        if newref:
+            newref -= 1
+            if newref == 0:
+                del self._cache_hints[key]
+                if key in self:
+                    item = self[key]
+                    self._cache_size -= self._size_function(item)
+                    del self[key]
+                else:
+                    if self._disk:
+                        del self._disk[key]
+            else:
+                self._cache_hints[key] = newref
+
+    def keys(self):
+        yield from super().keys()
+        if self._disk:
+            yield from self._disk.keys()
+
+    def values(self):
+        yield from super().values()
+        if self._disk:
+            yield from self._disk.values()
+
+    def items(self):
+        yield from super().items()
+        if self._disk:
+            yield from self._disk.items()
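Reviewer note (not part of the patch): a hedged usage sketch of SelectiveCache as defined above. The keys, values and cache_hints are made up and the import path is an assumption; with these small values the disk spill path (SqliteDict) is never triggered.

# Illustrative only; tiny values, so everything stays in the in-memory phase.
from swh.loader.mercurial.objects import SelectiveCache

cache = SelectiveCache(max_size=1024,
                       cache_hints={b'a': 2, b'b': 1},  # expected fetch counts
                       size_function=len)

cache.store(b'a', b'first payload')
cache.store(b'b', b'second payload')
cache.store(b'zzz', b'unhinted, kept only while it is the latest entry')

assert cache.has(b'a') and cache.has(b'zzz')
assert cache.fetch(b'b') == b'second payload'   # hint 1 -> 0, entry evicted
assert cache.fetch(b'a') == b'first payload'    # hint 2 -> 1, entry kept
assert cache.fetch(b'a') == b'first payload'    # hint 1 -> 0, entry evicted
assert cache.fetch(b'a') is None                # hinted retrievals used up

Storing past max_size pushes the oldest in-memory entries into a temporary SqliteDict, which fetch() falls back to after checking the latest entry and the in-memory dict.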