diff --git a/README.md b/README.md --- a/README.md +++ b/README.md @@ -6,13 +6,10 @@ before. The main entry points are: -- :class:`swh.loader.mercurial.loader.HgBundle20Loader` which reads and loads a remote - repository (or local). +- :class:`swh.loader.mercurial.loader.HgLoader` which reads and loads a repository + (local or remote) into an SWH archive. -- :class:`swh.loader.mercurial.from_disk.HgLoaderFromDisk` which reads and loads a local - repository into an SWH archive. - -- :class:`swh.loader.mercurial.from_disk.HgArchiveLoaderFromDisk` which reads and loads +- :class:`swh.loader.mercurial.loader.HgArchiveLoader` which reads and loads a local repository wrapped within a tarball # CLI run @@ -32,4 +29,3 @@ ``` bash swh loader --C /tmp/mercurial.yml run mercurial https://www.mercurial-scm.org/repo/hello ``` -or `mercurial_from_disk` diff --git a/conftest.py b/conftest.py --- a/conftest.py +++ b/conftest.py @@ -1,4 +1,4 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -16,5 +16,4 @@ def swh_scheduler_celery_includes(swh_scheduler_celery_includes): return swh_scheduler_celery_includes + [ "swh.loader.mercurial.tasks", - "swh.loader.mercurial.tasks_from_disk", ] diff --git a/setup.py b/setup.py --- a/setup.py +++ b/setup.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright (C) 2015-2020 The Software Heritage developers +# Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -54,7 +54,6 @@ entry_points=""" [swh.workers] loader.mercurial=swh.loader.mercurial:register - loader.mercurial_from_disk=swh.loader.mercurial:register_from_disk [console_scripts] swh-hg-identify=swh.loader.mercurial.identify:main """, diff --git a/swh/loader/mercurial/__init__.py b/swh/loader/mercurial/__init__.py --- a/swh/loader/mercurial/__init__.py +++ b/swh/loader/mercurial/__init__.py @@ -1,4 +1,4 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -9,19 +9,9 @@ def register() -> Mapping[str, Any]: """Register the current worker module's definition""" - from .loader import HgBundle20Loader + from .loader import HgLoader return { "task_modules": [f"{__name__}.tasks"], - "loader": HgBundle20Loader, - } - - -def register_from_disk() -> Mapping[str, Any]: - """Register the current worker module's definition""" - from .from_disk import HgLoaderFromDisk - - return { - "task_modules": [f"{__name__}.tasks_from_disk"], - "loader": HgLoaderFromDisk, + "loader": HgLoader, } diff --git a/swh/loader/mercurial/bundle20_reader.py b/swh/loader/mercurial/bundle20_reader.py deleted file mode 100644 --- a/swh/loader/mercurial/bundle20_reader.py +++ /dev/null @@ -1,621 +0,0 @@ -# Copyright (C) 2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information 
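The README above shows the CLI invocation; for completeness, here is a hedged sketch of driving the unified `HgLoader` directly from Python, in the spirit of the `__main__` helpers removed later in this patch. The in-memory storage and the origin URL are illustrative choices, not something this patch prescribes.

``` python
# Minimal sketch, assuming swh.storage and swh.loader.mercurial are installed.
from swh.loader.mercurial.loader import HgLoader
from swh.storage import get_storage

storage = get_storage(cls="memory")  # throwaway storage, fine for a local test
loader = HgLoader(storage, url="https://www.mercurial-scm.org/repo/hello")
result = loader.load()
print(result)  # e.g. {'status': 'eventful'}
```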
- -"""This document contains code for extracting all of the data from -Mercurial version 2 bundle file. It is referenced by -bundle20_loader.py - -""" - -# ============================================================================= -# ============================================================================= -# BACKGROUND -# ============================================================================= -# ============================================================================= -# -# https://www.mercurial-scm.org/wiki/BundleFormat says: -# "The new bundle format design is described on the BundleFormat2 page." -# -# https://www.mercurial-scm.org/wiki/BundleFormat2#Format_of_the_Bundle2_Container says: # noqa -# "The latest description of the binary format can be found as comment in the -# Mercurial source code." -# -# https://www.mercurial-scm.org/repo/hg/file/default/mercurial/help/internals/bundles.txt says: # noqa -# "The 'HG20' format is not yet documented here. See the inline comments in -# 'mercurial/exchange.py' for now." -# -# ----------------------------------------------------------------------------- -# Avi says: -# ----------------------------------------------------------------------------- -# -# All of the above official(?) statements seem to be quite wrong. -# -# The mercurial-scm wiki is a cluster#@*& of missing pages, bad links, wrong -# information, obsolete information, undecipherable names, and half-started -# leavings that only sort of look like content. I don't understand who or what -# it's there for. I think that means it's not there for me? -# -# https://www.mercurial-scm.org/wiki/BundleFormat2#New_header is wrong and -# bizarre, and there isn't any other information on the page. -# -# -# https://www.mercurial-scm.org/repo/hg/file/de86a6872d06/mercurial/help/internals/changegroups.txt # noqa -# (`hg help internals.changegroups`) is very close to what we need. -# It is accurate, current, and thorough. -# It describes much of the internal structure, which is super helpful if you -# know in advance which info can be trusted, but it doesn't describe any of the -# file-level details, including the file headers and that the entire bundle -# is broken into overlaid 4KB chunks starting from just after the bundle -# header, nor does it describe what any of the component elements are used for, -# nor does it explain the meta-message segment in the blob deltas, nor does it -# explain the file flags occasionally appended to manifest file hashes. Also it -# says: "The [delta data] format is described more fully in 'hg help -# internals.bdiff'", which is also wrong. As far as I can tell, that -# file has never existed. -# -# It does however have one potentially extremely useful note buried in the -# middle that, in hindsight, could have significant implications for complexity -# and performance in future Mercurial loading work. -# -# It says: "In version 1, the delta is always applied against the previous node -# from the changegroup or the first parent if this is the first entry in the -# changegroup." -# -# If the next version of HG support for SWH can reliably get version 1 data, -# then it could be implemented entirely without worrying about ballooning -# memory utilization, which would shrink the code significantly and probably be -# faster too. So maybe HG10 bundles instead of HG20 bundles are superior for -# this task? 
But then I read that servers can optionally disable serving -# version 1 content, and I like to think that this loader could eventually -# be applied directly to a network stream without an intermediate phase for -# cloning and local bundling, so...It seemed like a good idea at the time? -# -# ----------------------------------------------------------------------------- -# Other notes and thoughts: -# ----------------------------------------------------------------------------- -# 1) -# This is a relatively minor detail, but -# Mercurial nodes are not content-addressable like Git's are. -# -# https://www.mercurial-scm.org/wiki/Nodeid explains: "If you modify a file, -# commit the change, and then modify it to restore the original contents, the -# contents are the same but the history is different, so the file will get a -# new nodeid. This history-sensitivity is obtained by calculating the nodeid -# from the concatenation of the parent nodeids with the file's contents..." -# -# The result is that we always have to collect and hash everything at least -# once in order to know if we've seen something like it before, because nothing -# tells us that the node we're looking at is unique. We can use node ids for -# linking disparate elements together (e.g. commit to manifest) but not for -# determining whether two elements in the same group are identical in all but -# descendency. So there's no way to save time on duplicate hashing. Well... -# there is the copied_file blob metadata, but, lol. -# -# 2) -# Most of the code complexity is due to dealing with 'version 2' changegroups, -# for which we need to keep track of the entire history of all updates made -# to a given file or working directory tree structure, because a revision -# delta could be applied over any of the prior revisions all the way back to -# rev 0, according to whenever files were branched/merged/uncommitted/etc. For -# very large repositories with a lot of churn, this can quickly expand to -# require multiple gigabytes of space, possibly exceeding RAM availability if -# one desires to keep as much data resident in memory as possible to boost -# performance. mozilla-unified, for instance, produces some 2 million+ blobs -# (1.6 million+ unique). Nested umpteen subdirectory levels deep, those blobs -# balloon into a quantity of directory subtrees that rapidly exceeds an 8GB RAM -# laptop's ability to keep them all active without a good amount of care and -# pruning. The code here tries to strike a balance between memory utilization -# and performance. -# -# This problem is also referenced in the last paragraph of the previous -# section, where potentially this problem wouldn't exist for 'version 1' data -# if we can reliably get it. Can we? Either that or not use bundles at all, -# which has other costs. -# -# 3) -# If the list of changed files stored by the changesets had indicated which -# of those changed files were added or modified and which ones were removed, -# this code could be much faster. Right now we have to perform a huge number of -# substring replacements (see the apply_revdata method) to produce a complete -# file manifest for each commit (as a string!!!) in order to know how to get -# the set of removed files from the next delta. We can intuit from every -# manifest delta which files were modified or added, but I believe there's no -# way to intuit which files were removed without actually having the complete -# prior state and without the list of removals being explicitly given. 
If you -# have an explicit list of all the files that were removed for any given commit -# changegroup, and you combine that with the delta updates in the manifest -# changegroups which detail the set of files that have been added or modified, -# then you wouldn't even have to apply any of the string deltas to get a -# complete understanding of the set of differences between one manifest and the -# next. Not having this effective speed boost is rather unfortunate; it would -# require only one extra stored byte per commit to differentiate removals and -# would make extracting bundles lightning fast. -# ============================================================================ -## - -from binascii import unhexlify -from collections import OrderedDict -from datetime import datetime -import itertools -import struct - -from .chunked_reader import ChunkedFileReader -from .objects import SelectiveCache - - -def unpack(fmt_str, source): - """Utility function for fetching the right number of bytes from a stream to - satisfy a struct.unpack pattern. - - args: - fmt_str: a struct.unpack string pattern - (e.g. '>I' for 4 bytes big-endian) - source: any IO object that has a read() method which - returns an appropriate sequence of bytes - """ - ret = struct.unpack(fmt_str, source.read(struct.calcsize(fmt_str))) - if len(ret) == 1: - return ret[0] - return ret - - -class Bundle20Reader(object): - """Parser for extracting data from Mercurial Bundle20 files. - NOTE: Currently only works on uncompressed HG20 bundles, but checking for - COMPRESSION=<2chars> and loading the appropriate stream decompressor - at that point would be trivial to add if necessary. - - args: - bundlefile (str): name of the binary repository bundle file - cache_filename (str): path to the disk cache used (transited - to the SelectiveCache instance) - cache_size (int): tuning parameter for the upper RAM limit used by - historical data caches. The default is defined in the - SelectiveCache class. - - """ - - NAUGHT_NODE = b"\x00" * 20 - - def __init__(self, bundlefile, cache_filename, cache_size=None): - self.bundlefile = bundlefile - self.cache_filename = cache_filename - bfile = open(bundlefile, "rb", buffering=200 * 1024 * 1024) - - btype = bfile.read(4) # 'HG20' - if btype != b"HG20": - raise Exception(bundlefile, b"Not an HG20 bundle. First 4 bytes:" + btype) - bfile.read(4) # '\x00\x00\x00\x00' - - self.params = self.read_bundle_header(bfile) - # print('PARAMETERS', self.params) - self.num_commits = self.params[b"nbchanges"] - - self.filereader = ChunkedFileReader(bfile) - - self.cache_size = cache_size - self.blobs_offset = None - self.changes_offset = self.filereader.tell() - self.changes_next_offset = None - self.manifests_offset = None - self.manifests_next_offset = None - self.id_to_info = {} - - def read_bundle_header(self, bfile): - """Parse the file header which describes the format and parameters. - See the structure diagram at the top of the file for more insight. 
- - args: - bfile: bundle file handle with the cursor at the start offset of - the content header (the 9th byte in the file) - - returns: - dict of decoded bundle parameters - """ - unpack(">I", bfile) # header length - chg_len = unpack(">B", bfile) # len('CHANGEGROUP') == 11 - bfile.read(chg_len) # should say 'CHANGEGROUP' - unpack(">I", bfile) # probably \x00\x00\x00\x00 - - n_mandatory, n_advisory = unpack(">BB", bfile) # parameter counts - mandatory_params = [ - (key_len, val_len) - for key_len, val_len in [unpack(">BB", bfile) for i in range(n_mandatory)] - ] - advisory_params = [ - (key_len, val_len) - for key_len, val_len in [unpack(">BB", bfile) for i in range(n_advisory)] - ] - params = {} - - for key_len, val_len in mandatory_params + advisory_params: - key = unpack(">%ds" % key_len, bfile) - val = int(unpack(">%ds" % val_len, bfile)) - params[key] = val - - return params - - def revdata_iterator(self, bytes_to_read): - """A chunk's revdata section is a series of start/end/length/data_delta - content updates called RevDiffs that indicate components of a text diff - applied to the node's basenode. The sum length of all diffs is the - length indicated at the beginning of the chunk at the start of the - header. - See the structure diagram at the top of the file for more insight. - - args: - bytes_to_read: int total number of bytes in the chunk's revdata - yields: - (int, int, read iterator) representing a single text diff component - """ - while bytes_to_read > 0: - start_offset = unpack(">I", self.filereader) - end_offset = unpack(">I", self.filereader) - blocklen = unpack(">I", self.filereader) - delta_it = self.filereader.read_iterator(blocklen) - bytes_to_read -= 12 + blocklen - yield (start_offset, end_offset, delta_it) # RevDiff - - def read_chunk_header(self): - """The header of a RevChunk describes the id ('node') for the current - change, the commit id ('linknode') associated with this change, - the parental heritage ('p1' and 'p2'), and the node to which the - revdata updates will apply ('basenode'). 'linknode' is the same as - 'node' when reading the commit log because any commit is already - itself. 'basenode' for a changeset will be NAUGHT_NODE, because - changeset chunks include complete information and not diffs. - See the structure diagram at the top of the file for more insight. - - returns: - dict of the next delta header - """ - header = self.filereader.read(100) - header = { - "node": header[0:20], - "p1": header[20:40], - "p2": header[40:60], - "basenode": header[60:80], - "linknode": header[80:100], - } - return header - - def read_revchunk(self): - """Fetch a complete RevChunk. - A RevChunk contains the collection of line changes made in a particular - update. header['node'] identifies which update. Commits, manifests, and - files all have these. Each chunk contains an indicator of the whole - chunk size, an update header, and then the body of the update as a - series of text diff components. - See the structure diagram at the top of the file for more insight. - - returns: - tuple(dict, iterator) of (header, chunk data) if there is another - chunk in the group, else None - """ - size = unpack(">I", self.filereader) - 104 - if size >= 0: - header = self.read_chunk_header() - return (header, self.revdata_iterator(size)) - else: - return None # NullChunk - - def extract_commit_metadata(self, data): - """Converts the binary commit metadata format into a dict. 
- - args: - data: bytestring of encoded commit information - - returns: - dict of decoded commit information - """ - parts, message = data.split(b"\n\n", 1) - parts = parts.split(b"\n") - commit = {} - commit["message"] = message - commit["manifest"] = unhexlify(parts[0]) - commit["user"] = parts[1] - tstamp, tz, *extra = parts[2].split(b" ") - commit["time"] = datetime.fromtimestamp(float(tstamp)) - commit["time_offset_seconds"] = int(tz) - if extra: - commit["extra"] = b" ".join(extra) - commit["changed_files"] = parts[3:] - return commit - - def skip_sections(self, num_sections=1): - """Skip past sections quickly. - - args: - num_sections: int number of sections to skip - """ - for i in range(num_sections): - size = unpack(">I", self.filereader) - while size >= 104: - self.filereader.seek(size - 4, from_current=True) - size = unpack(">I", self.filereader) - - def apply_revdata(self, revdata_it, prev_state): - """Compose the complete text body for a change from component deltas. - - args: - revdata_it: output from the revdata_iterator method - prev_state: bytestring the base complete text on which the new - deltas will be applied - returns: - (bytestring, list, list) the new complete string and lists of added - and removed components (used in manifest processing) - """ - state = [] - added = [] - removed = [] - next_start = 0 - - for delta_start, delta_end, rev_diff_it in revdata_it: - removed.append(prev_state[delta_start:delta_end]) - added.append(b"".join(rev_diff_it)) - - state.append(prev_state[next_start:delta_start]) - state.append(added[-1]) - next_start = delta_end - - state.append(prev_state[next_start:]) - state = b"".join(state) - - return (state, added, removed) - - def skim_headers(self): - """Get all header data from a change group but bypass processing of the - contained delta components. - - yields: - output of read_chunk_header method for all chunks in the group - """ - size = unpack(">I", self.filereader) - 104 - while size >= 0: - header = self.read_chunk_header() - self.filereader.seek(size, from_current=True) - yield header - size = unpack(">I", self.filereader) - 104 - - def group_iterator(self): - """Bundle sections are called groups. These are composed of one or more - revision chunks of delta components. Iterate over all the chunks in a - group and hand each one back. - - yields: - see output of read_revchunk method - """ - revchunk = self.read_revchunk() - while revchunk: # A group is terminated by a NullChunk - yield revchunk # (header, revdata_iterator) - revchunk = self.read_revchunk() - - def yield_group_objects(self, cache_hints=None, group_offset=None): - """Bundles are sectioned into groups: the log of all commits, the log - of all manifest changes, and a series of logs of blob changes (one for - each file). All groups are structured the same way, as a series of - revisions each with a series of delta components. Iterate over the - current group and return the completed object data for the current - update by applying all of the internal delta components to each prior - revision. 
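The `apply_revdata` method above is where these deltas are put back together: each RevDiff replaces the byte range `[start, end)` of the previous text with new data. A self-contained illustration of that operation (a simplified sketch, not code from this module):

``` python
# Apply Mercurial-style (start, end, data) deltas to a base bytestring,
# in the spirit of Bundle20Reader.apply_revdata above.
def apply_deltas(base: bytes, deltas) -> bytes:
    out = []
    cursor = 0
    for start, end, replacement in deltas:
        out.append(base[cursor:start])  # unchanged prefix
        out.append(replacement)         # new content
        cursor = end                    # skip the replaced span
    out.append(base[cursor:])           # unchanged suffix
    return b"".join(out)

assert apply_deltas(b"hello world", [(0, 5, b"goodbye")]) == b"goodbye world"
```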
- - args: - cache_hints: see build_cache_hints (this will be built - automatically if not pre-built and passed in) - group_offset: int file position of the start of the desired group - - yields: - (dict, bytestring, list, list) the output from read_chunk_header - followed by the output from apply_revdata - """ - if group_offset is not None: - self.filereader.seek(group_offset) - - if cache_hints is None: - cache_hints = self.build_cache_hints() - - data_cache = SelectiveCache( - max_size=self.cache_size, - cache_hints=cache_hints, - filename=self.cache_filename, - ) - - # Loop over all revisions in the group - data = b"" - for header, revdata_it in self.group_iterator(): - node = header["node"] - basenode = header["basenode"] - - data = data_cache.fetch(basenode) or b"" - - data, added, removed = self.apply_revdata(revdata_it, data) - - data_cache.store(node, data) - - yield (header, data, added, removed) # each RevChunk - - def extract_meta_from_blob(self, data): - """File revision data sometimes begins with a metadata section of - dubious value. Strip it off and maybe decode it. It seems to be mostly - useless. Why indicate that a file node is a copy of another node? You - can already get that information from the delta header. - - args: - data: bytestring of one revision of a file, possibly with metadata - embedded at the start - - returns: - (bytestring, dict) of (the blob data, the meta information) - """ - meta = {} - if data.startswith(b"\x01\n"): - empty, metainfo, data = data.split(b"\x01\n", 2) - metainfo = b"\x01\n" + metainfo + b"\x01\n" - if metainfo.startswith(b"copy:"): - # direct file copy (?) - copyinfo = metainfo.split(b"\n") - meta["copied_file"] = copyinfo[0][6:] - meta["copied_rev"] = copyinfo[1][9:] - elif metainfo.startswith(b"censored:"): - # censored revision deltas must be full-replacements (?) - meta["censored"] = metainfo - else: - # no idea - meta["text"] = metainfo - return data, meta - - def seek_changelog(self): - """Seek to the beginning of the change logs section. - """ - self.filereader.seek(self.changes_offset) - - def seek_manifests(self): - """Seek to the beginning of the manifests section. - """ - if self.manifests_offset is None: - self.seek_changelog() - self.skip_sections(1) # skip past commits - self.manifests_offset = self.filereader.tell() - else: - self.filereader.seek(self.manifests_offset) - - def seek_filelist(self): - """Seek to the beginning of the file changes section. - """ - if self.blobs_offset is None: - self.seek_manifests() - self.skip_sections(1) # skip past manifests - self.blobs_offset = self.filereader.tell() - else: - self.filereader.seek(self.blobs_offset) - - def yield_all_blobs(self): - """Gets blob data from the bundle. - - yields: - (bytestring, (bytestring, int, dict)) of - (blob data, (file name, start offset of the file within the - bundle, node header)) - """ - self.seek_filelist() - - # Loop through all files that have commits - size = unpack(">I", self.filereader) - while size > 0: - file_name = self.filereader.read(size - 4) - file_start_offset = self.filereader.tell() - # get all of the blobs for each file - for header, data, *_ in self.yield_group_objects(): - blob, meta = self.extract_meta_from_blob(data) - yield blob, (file_name, file_start_offset, header) - size = unpack(">I", self.filereader) - - def yield_all_changesets(self): - """Gets commit data from the bundle. 
- - yields: - (dict, dict) of (read_chunk_header output, - extract_commit_metadata output) - """ - self.seek_changelog() - for header, data, *_ in self.yield_group_objects(): - changeset = self.extract_commit_metadata(data) - yield (header, changeset) - - def yield_all_manifest_deltas(self, cache_hints=None): - """Gets manifest data from the bundle. - In order to process the manifests in a reasonable amount of time, we - want to use only the deltas and not the entire manifest at each change, - because if we're processing them in sequential order (we are) then we - already have the previous state so we only need the changes. - - args: - cache_hints: see build_cache_hints method - - yields: - (dict, dict, dict) of (read_chunk_header output, - extract_manifest_elements output on added/modified files, - extract_manifest_elements on removed files) - """ - self.seek_manifests() - for header, data, added, removed in self.yield_group_objects( - cache_hints=cache_hints - ): - added = self.extract_manifest_elements(added) - removed = self.extract_manifest_elements(removed) - yield (header, added, removed) - - def build_manifest_hints(self): - """Just a minor abstraction shortcut for the build_cache_hints method. - - returns: - see build_cache_hints method - """ - self.seek_manifests() - return self.build_cache_hints() - - def build_cache_hints(self): - """The SelectiveCache class that we use in building nodes can accept a - set of key counters that makes its memory usage much more efficient. - - returns: - dict of key=a node id, value=the number of times we - will need data from that node when building subsequent nodes - """ - cur_pos = self.filereader.tell() - hints = OrderedDict() - prev_node = None - for header in self.skim_headers(): - basenode = header["basenode"] - if (basenode != self.NAUGHT_NODE) and (basenode != prev_node): - # If the base isn't immediately prior, then cache it once more. - hints[basenode] = hints.get(basenode, 0) + 1 - prev_node = header["node"] - self.filereader.seek(cur_pos) - return hints - - def extract_manifest_elements(self, data): - """Parses data that looks like a manifest. In practice we only pass in - the bits extracted from the application of a manifest delta describing - which files were added/modified or which ones were removed. - - args: - data: either a string or a list of strings that, when joined, - embodies the composition of a manifest. - - This takes the form - of repetitions of (without the brackets):: - - b'\x00[flag]\\n' ...repeat... 
- - where ``[flag]`` may or may not be there depending on - whether the file is specially flagged as executable or - something - - returns: - dict: ``{file_path: (file_node, permissions), ...}`` where - permissions is given according to the flag that optionally exists - in the data - """ - elements = {} - if isinstance(data, str): - data = data.split(b"\n") - else: - data = itertools.chain.from_iterable([chunk.split(b"\n") for chunk in data]) - - for line in data: - if line != b"": - f = line.split(b"\x00") - - node = f[1] - flag_bytes = node[40:] - - elements[f[0]] = ( - unhexlify(node[:40]), - b"l" in flag_bytes, - b"755" if (b"x" in flag_bytes) else b"644", - ) - - return elements diff --git a/swh/loader/mercurial/chunked_reader.py b/swh/loader/mercurial/chunked_reader.py deleted file mode 100644 --- a/swh/loader/mercurial/chunked_reader.py +++ /dev/null @@ -1,105 +0,0 @@ -# Copyright (C) 2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import struct - - -class ChunkedFileReader(object): - """A binary stream reader that gives seamless read access to Mercurial's - bundle2 HG20 format which is partitioned for some reason at the file level - into chunks of [4Bytes:, Bytes:] as if it were - encoding transport packets. - - args: - file: rb file handle pre-aligned to the start of the chunked portion - size_unpack_fmt: struct format string for unpacking the next chunk size - """ - - def __init__(self, file, size_unpack_fmt=">I"): - self._file = file - self._size_pattern = size_unpack_fmt - self._size_bytes = struct.calcsize(size_unpack_fmt) - self._bytes_per_chunk = self._chunk_size(True) - self._chunk_bytes_left = self._bytes_per_chunk - self._offset = self._file.tell() - # find the file size - self._file.seek(0, 2) # seek to end - self._size = self._file.tell() - self._file.seek(self._offset, 0) # seek back to original position - - def _chunk_size(self, first_time=False): - """Unpack the next bytes from the file to get the next file chunk size. - """ - size = struct.unpack(self._size_pattern, self._file.read(self._size_bytes))[0] - return size - - def size(self): - """Returns the file size in bytes. - """ - return self._size - - def read(self, bytes_to_read): - """Return N bytes from the file as a single block. - - args: - bytes_to_read: int number of bytes of content - """ - return b"".join(self.read_iterator(bytes_to_read)) - - def read_iterator(self, bytes_to_read): - """Return a generator that yields N bytes from the file one file chunk - at a time. - - args: - bytes_to_read: int number of bytes of content - """ - while bytes_to_read > self._chunk_bytes_left: - yield self._file.read(self._chunk_bytes_left) - bytes_to_read -= self._chunk_bytes_left - self._chunk_bytes_left = self._chunk_size() - self._chunk_bytes_left -= bytes_to_read - yield self._file.read(bytes_to_read) - - def seek(self, new_pos=None, from_current=False): - """Wraps the underlying file seek, additionally updating the - chunk_bytes_left counter appropriately so that we can start reading - from the new location. - - args: - - new_pos: new cursor byte position - from_current: if True, it treats new_pos as an offset from the - current cursor position, bypassing any chunk boundaries as if - they weren't there. This should give the same end position as a - read except without the reading data part. 
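To make the framing that `ChunkedFileReader` hides more concrete: the bundle payload is cut into frames of a 4-byte big-endian length followed by that many bytes of data, and the reader stitches them back into one seamless stream. A toy sketch of that layout (illustrative only, not the class's actual implementation):

``` python
import io
import struct

def iter_frames(stream):
    # Read [4-byte big-endian length][payload] frames until the stream ends.
    while True:
        raw = stream.read(4)
        if len(raw) < 4:
            return
        (length,) = struct.unpack(">I", raw)
        yield stream.read(length)

framed = struct.pack(">I", 5) + b"hello" + struct.pack(">I", 6) + b" world"
print(b"".join(iter_frames(io.BytesIO(framed))))  # b'hello world'
```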
- """ - if from_current: - new_pos = new_pos or 0 # None -> current - if new_pos <= self._chunk_bytes_left: - new_pos += self._file.tell() - else: - new_pos += ( - self._file.tell() - + self._size_bytes - + ( - ( - (new_pos - self._chunk_bytes_left - 1) # aliasing - // self._bytes_per_chunk - ) - * self._size_bytes - ) - ) - else: - new_pos = new_pos or self._offset # None -> start position - - self._chunk_bytes_left = self._bytes_per_chunk - ( - (new_pos - self._offset) % (self._bytes_per_chunk + self._size_bytes) - ) - self._file.seek(new_pos) - - def __getattr__(self, item): - """Forward other calls to the underlying file object. - """ - return getattr(self._file, item) diff --git a/swh/loader/mercurial/cli.py b/swh/loader/mercurial/cli.py deleted file mode 100644 --- a/swh/loader/mercurial/cli.py +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright (C) 2018-2021 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from itertools import chain -import logging - -import click -from deprecated import deprecated - -from swh.loader.mercurial.utils import parse_visit_date - -LOGLEVELS = list( - chain.from_iterable( - (logging._levelToName[lvl], logging._levelToName[lvl].lower()) - for lvl in sorted(logging._levelToName.keys()) - ) -) - - -@click.command() -@click.argument("origin-url") -@click.option( - "--hg-directory", - "-d", - help=( - "Path to the hg (local) directory to load from. " - "If unset, the hg repo will be cloned from the " - "given (origin) url." - ), -) -@click.option("--hg-archive", "-a", help=("Path to the hg archive file to load from.")) -@click.option("--visit-date", "-D", help="Visit date (defaults to now).") -@click.option("--log-level", "-l", type=click.Choice(LOGLEVELS), help="Log level.") -@deprecated( - version="0.4.0", reason="Use `swh loader run mercurial|mercurial_from_disk` instead" -) -def main( - origin_url, hg_directory=None, hg_archive=None, visit_date=None, log_level=None -): - from swh.storage import get_storage - - logging.basicConfig( - level=(log_level or "DEBUG").upper(), - format="%(asctime)s %(process)d %(message)s", - ) - - visit_date = parse_visit_date(visit_date or "now") - - kwargs = {"visit_date": visit_date, "url": origin_url} - if hg_archive: - from .loader import HgArchiveBundle20Loader as HgLoader - - kwargs["archive_path"] = hg_archive - else: - from .loader import HgBundle20Loader as HgLoader - - kwargs["directory"] = hg_directory - - storage = get_storage(cls="memory") - return HgLoader(storage, **kwargs).load() - - -if __name__ == "__main__": - main() diff --git a/swh/loader/mercurial/from_disk.py b/swh/loader/mercurial/from_disk.py deleted file mode 100644 --- a/swh/loader/mercurial/from_disk.py +++ /dev/null @@ -1,815 +0,0 @@ -# Copyright (C) 2020-2021 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from collections import deque -from datetime import datetime -import os -from shutil import rmtree -from tempfile import mkdtemp -from typing import Deque, Dict, Iterator, List, Optional, Set, Tuple, TypeVar, Union - -from swh.core.utils import grouper -from swh.loader.core.loader import BaseLoader -from swh.loader.core.utils import clean_dangling_folders -from swh.loader.mercurial.utils import 
get_minimum_env, parse_visit_date -from swh.model import identifiers -from swh.model.from_disk import Content, DentryPerms, Directory -from swh.model.hashutil import hash_to_bytehex -from swh.model.model import ( - ExtID, - ObjectType, - Origin, - Person, - Release, - Revision, - RevisionType, - Sha1Git, - Snapshot, - SnapshotBranch, - TargetType, - TimestampWithTimezone, -) -from swh.model.model import Content as ModelContent -from swh.storage.algos.snapshot import snapshot_get_latest -from swh.storage.interface import StorageInterface - -from . import hgutil -from .archive_extract import tmp_extract -from .hgutil import HgFilteredSet, HgNodeId, HgSpanSet - -FLAG_PERMS = { - b"l": DentryPerms.symlink, - b"x": DentryPerms.executable_content, - b"": DentryPerms.content, -} # type: Dict[bytes, DentryPerms] - -TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.mercurial.from_disk" - -EXTID_TYPE = "hg-nodeid" -EXTID_VERSION: int = 1 - -T = TypeVar("T") - - -class CorruptedRevision(ValueError): - """Raised when a revision is corrupted.""" - - def __init__(self, hg_nodeid: HgNodeId) -> None: - super().__init__(hg_nodeid.hex()) - self.hg_nodeid = hg_nodeid - - -class HgDirectory(Directory): - """A more practical directory. - - - creates missing parent directories - - removes empty directories - """ - - def __setitem__(self, path: bytes, value: Union[Content, "HgDirectory"]) -> None: - if b"/" in path: - head, tail = path.split(b"/", 1) - - directory = self.get(head) - if directory is None or isinstance(directory, Content): - directory = HgDirectory() - self[head] = directory - - directory[tail] = value - else: - super().__setitem__(path, value) - - def __delitem__(self, path: bytes) -> None: - super().__delitem__(path) - - while b"/" in path: # remove empty parent directories - path = path.rsplit(b"/", 1)[0] - if len(self[path]) == 0: - super().__delitem__(path) - else: - break - - def get( - self, path: bytes, default: Optional[T] = None - ) -> Optional[Union[Content, "HgDirectory", T]]: - # TODO move to swh.model.from_disk.Directory - try: - return self[path] - except KeyError: - return default - - -class HgLoaderFromDisk(BaseLoader): - """Load a mercurial repository from a local repository. - - Mercurial's branching model is more complete than Git's; it allows for multiple - heads per branch, closed heads and bookmarks. The following mapping is used to - represent the branching state of a Mercurial project in a given snapshot: - - - `HEAD` (optional) either the node pointed by the `@` bookmark or the tip of - the `default` branch - - `branch-tip/` (required) the first head of the branch, sorted by - nodeid if there are multiple heads. - - `bookmarks/` (optional) holds the bookmarks mapping if any - - `branch-heads//0..n` (optional) for any branch with multiple open - heads, list all *open* heads - - `branch-closed-heads//0..n` (optional) for any branch with at least - one closed head, list all *closed* heads - - `tags/` (optional) record tags - - The format is not ambiguous regardless of branch name since we know it ends with a - `/`, as long as we have a stable sorting of the heads (we sort by nodeid). - There may be some overlap between the refs, but it's simpler not to try to figure - out de-duplication. - However, to reduce the redundancy between snapshot branches in the most common case, - when a branch has a single open head, it will only be referenced as - `branch-tip/`. 
The `branch-heads/` hierarchy only appears when a branch - has multiple open heads, which we consistently sort by increasing nodeid. - The `branch-closed-heads/` hierarchy is also sorted by increasing nodeid. - """ - - CONFIG_BASE_FILENAME = "loader/mercurial" - - visit_type = "hg" - - def __init__( - self, - storage: StorageInterface, - url: str, - directory: Optional[str] = None, - logging_class: str = "swh.loader.mercurial.LoaderFromDisk", - visit_date: Optional[datetime] = None, - temp_directory: str = "/tmp", - clone_timeout_seconds: int = 7200, - content_cache_size: int = 10_000, - max_content_size: Optional[int] = None, - ): - """Initialize the loader. - - Args: - url: url of the repository. - directory: directory of the local repository. - logging_class: class of the loader logger. - visit_date: visit date of the repository - config: loader configuration - """ - super().__init__( - storage=storage, - logging_class=logging_class, - max_content_size=max_content_size, - ) - - self._temp_directory = temp_directory - self._clone_timeout = clone_timeout_seconds - - self.origin_url = url - self.visit_date = visit_date - self.directory = directory - - self._repo: Optional[hgutil.Repository] = None - self._revision_nodeid_to_sha1git: Dict[HgNodeId, Sha1Git] = {} - self._repo_directory: Optional[str] = None - - # keeps the last processed hg nodeid - # it is used for differential tree update by store_directories - # NULLID is the parent of the first revision - self._last_hg_nodeid = hgutil.NULLID - - # keeps the last revision tree - # it is used for differential tree update by store_directories - self._last_root = HgDirectory() - - # Cache the content hash across revisions to avoid recalculation. - self._content_hash_cache: hgutil.LRUCacheDict = hgutil.LRUCacheDict( - content_cache_size, - ) - - # hg node id of the latest snapshot branch heads - # used to find what are the new revisions since last snapshot - self._latest_heads: List[HgNodeId] = [] - # hg node ids of all the tags recorded on previous runs - # Used to compute which tags need to be added, even across incremental runs - # that might separate a changeset introducing a tag from its target. - self._saved_tags: Set[HgNodeId] = set() - - self._load_status = "eventful" - # If set, will override the default value - self._visit_status = None - - def pre_cleanup(self) -> None: - """As a first step, will try and check for dangling data to cleanup. - This should do its best to avoid raising issues. - - """ - clean_dangling_folders( - self._temp_directory, - pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, - log=self.log, - ) - - self.old_environ = os.environ.copy() - os.environ.clear() - os.environ.update(get_minimum_env()) - - def cleanup(self) -> None: - """Last step executed by the loader.""" - os.environ.clear() - os.environ.update(self.old_environ) - - # Don't cleanup if loading from a local directory - was_remote = self.directory is None - if was_remote and self._repo_directory and os.path.exists(self._repo_directory): - self.log.debug(f"Cleanup up repository {self._repo_directory}") - rmtree(self._repo_directory) - - def prepare_origin_visit(self) -> None: - """First step executed by the loader to prepare origin and visit - references. Set/update self.origin, and - optionally self.origin_url, self.visit_date. - - """ - self.origin = Origin(url=self.origin_url) - - def prepare(self) -> None: - """Second step executed by the loader to prepare some state needed by - the loader. 
- - """ - # Set here to allow multiple calls to load on the same loader instance - self._latest_heads = [] - - latest_snapshot = snapshot_get_latest(self.storage, self.origin_url) - if latest_snapshot: - self._set_recorded_state(latest_snapshot) - - def _set_recorded_state(self, latest_snapshot: Snapshot) -> None: - """ - Looks up the nodeid for all revisions in the snapshot via extid_get_from_target, - and adds them to `self._latest_heads`. - Also looks up the currently saved releases ("tags" in Mercurial speak). - The tags are all listed for easy comparison at the end, while only the latest - heads are needed for revisions. - """ - heads = [] - tags = [] - - for branch in latest_snapshot.branches.values(): - if branch.target_type == TargetType.REVISION: - heads.append(branch.target) - elif branch.target_type == TargetType.RELEASE: - tags.append(branch.target) - - self._latest_heads.extend( - HgNodeId(extid.extid) for extid in self._get_extids_for_targets(heads) - ) - self._saved_tags.update( - HgNodeId(extid.extid) for extid in self._get_extids_for_targets(tags) - ) - - def _get_extids_for_targets(self, targets: List[Sha1Git]) -> List[ExtID]: - """Get all Mercurial ExtIDs for the targets in the latest snapshot""" - extids = [] - for extid in self.storage.extid_get_from_target( - identifiers.ObjectType.REVISION, - targets, - extid_type=EXTID_TYPE, - extid_version=EXTID_VERSION, - ): - extids.append(extid) - self._revision_nodeid_to_sha1git[ - HgNodeId(extid.extid) - ] = extid.target.object_id - - if extids: - # Filter out dangling extids, we need to load their target again - revisions_missing = self.storage.revision_missing( - [extid.target.object_id for extid in extids] - ) - extids = [ - extid - for extid in extids - if extid.target.object_id not in revisions_missing - ] - return extids - - def _get_extids_for_hgnodes(self, hgnode_ids: List[HgNodeId]) -> List[ExtID]: - """Get all Mercurial ExtIDs for the mercurial nodes in the list which point to - a known revision. - - """ - extids = [] - - for group_ids in grouper(hgnode_ids, n=1000): - for extid in self.storage.extid_get_from_extid( - EXTID_TYPE, group_ids, version=EXTID_VERSION - ): - extids.append(extid) - self._revision_nodeid_to_sha1git[ - HgNodeId(extid.extid) - ] = extid.target.object_id - - if extids: - # Filter out dangling extids, we need to load their target again - revisions_missing = self.storage.revision_missing( - [extid.target.object_id for extid in extids] - ) - extids = [ - extid - for extid in extids - if extid.target.object_id not in revisions_missing - ] - return extids - - def fetch_data(self) -> bool: - """Fetch the data from the source the loader is currently loading - - Returns: - a value that is interpreted as a boolean. If True, fetch_data needs - to be called again to complete loading. - - """ - if not self.directory: # no local repository - self._repo_directory = mkdtemp( - prefix=TEMPORARY_DIR_PREFIX_PATTERN, - suffix=f"-{os.getpid()}", - dir=self._temp_directory, - ) - self.log.debug( - f"Cloning {self.origin_url} to {self.directory} " - f"with timeout {self._clone_timeout} seconds" - ) - hgutil.clone(self.origin_url, self._repo_directory, self._clone_timeout) - else: # existing local repository - # Allow to load on disk repository without cloning - # for testing purpose. - self._repo_directory = self.directory - - self._repo = hgutil.repository(self._repo_directory) - - return False - - def _new_revs(self, heads: List[HgNodeId]) -> Union[HgFilteredSet, HgSpanSet]: - """Return unseen revisions. 
That is, filter out revisions that are not ancestors of - heads""" - assert self._repo is not None - existing_heads = [] - - for hg_nodeid in heads: - try: - rev = self._repo[hg_nodeid].rev() - existing_heads.append(rev) - except KeyError: # the node does not exist anymore - pass - - # select revisions that are not ancestors of heads - # and not the heads themselves - new_revs = self._repo.revs("not ::(%ld)", existing_heads) - - if new_revs: - self.log.info("New revisions found: %d", len(new_revs)) - - return new_revs - - def get_hg_revs_to_load(self) -> Iterator[int]: - """Yield hg revision numbers to load. - - """ - assert self._repo is not None - repo: hgutil.Repository = self._repo - - seen_revs: Set[int] = set() - # 1. use snapshot to reuse existing seen heads from it - if self._latest_heads: - for rev in self._new_revs(self._latest_heads): - seen_revs.add(rev) - yield rev - - # 2. Then filter out remaining revisions through the overall extid mappings - # across hg origins - revs_left = repo.revs("all() - ::(%ld)", seen_revs) - hg_nodeids = [repo[nodeid].node() for nodeid in revs_left] - yield from self._new_revs( - [ - HgNodeId(extid.extid) - for extid in self._get_extids_for_hgnodes(hg_nodeids) - ] - ) - - def store_data(self): - """Store fetched data in the database.""" - revs = self.get_hg_revs_to_load() - length_ingested_revs = 0 - - assert self._repo is not None - repo = self._repo - - ignored_revs: Set[int] = set() - for rev in revs: - if rev in ignored_revs: - continue - try: - self.store_revision(repo[rev]) - length_ingested_revs += 1 - except CorruptedRevision as e: - self._visit_status = "partial" - self.log.warning("Corrupted revision %s", e) - descendents = repo.revs("(%ld)::", [rev]) - ignored_revs.update(descendents) - - if length_ingested_revs == 0: - # no new revision ingested, so uneventful - # still we'll make a snapshot, so we continue - self._load_status = "uneventful" - - branching_info = hgutil.branching_info(repo, ignored_revs) - tags_by_name: Dict[bytes, HgNodeId] = repo.tags() - - snapshot_branches: Dict[bytes, SnapshotBranch] = {} - - for tag_name, hg_nodeid in tags_by_name.items(): - if tag_name == b"tip": - # `tip` is listed in the tags by the Mercurial API but its not a tag - # defined by the user in `.hgtags`. 
- continue - if hg_nodeid not in self._saved_tags: - label = b"tags/%s" % tag_name - target = self.get_revision_id_from_hg_nodeid(hg_nodeid) - snapshot_branches[label] = SnapshotBranch( - target=self.store_release(tag_name, target), - target_type=TargetType.RELEASE, - ) - - for branch_name, node_id in branching_info.tips.items(): - name = b"branch-tip/%s" % branch_name - target = self.get_revision_id_from_hg_nodeid(node_id) - snapshot_branches[name] = SnapshotBranch( - target=target, target_type=TargetType.REVISION - ) - - for bookmark_name, node_id in branching_info.bookmarks.items(): - name = b"bookmarks/%s" % bookmark_name - target = self.get_revision_id_from_hg_nodeid(node_id) - snapshot_branches[name] = SnapshotBranch( - target=target, target_type=TargetType.REVISION - ) - - for branch_name, branch_heads in branching_info.open_heads.items(): - for index, head in enumerate(branch_heads): - name = b"branch-heads/%s/%d" % (branch_name, index) - target = self.get_revision_id_from_hg_nodeid(head) - snapshot_branches[name] = SnapshotBranch( - target=target, target_type=TargetType.REVISION - ) - - for branch_name, closed_heads in branching_info.closed_heads.items(): - for index, head in enumerate(closed_heads): - name = b"branch-closed-heads/%s/%d" % (branch_name, index) - target = self.get_revision_id_from_hg_nodeid(head) - snapshot_branches[name] = SnapshotBranch( - target=target, target_type=TargetType.REVISION - ) - - # If the repo is broken enough or if it has none of the "normal" default - # mechanisms, we ignore `HEAD`. - default_branch_alias = branching_info.default_branch_alias - if default_branch_alias is not None: - snapshot_branches[b"HEAD"] = SnapshotBranch( - target=default_branch_alias, target_type=TargetType.ALIAS, - ) - snapshot = Snapshot(branches=snapshot_branches) - self.storage.snapshot_add([snapshot]) - - self.flush() - self.loaded_snapshot_id = snapshot.id - - def load_status(self) -> Dict[str, str]: - """Detailed loading status. - - Defaults to logging an eventful load. - - Returns: a dictionary that is eventually passed back as the task's - result to the scheduler, allowing tuning of the task recurrence - mechanism. - """ - return { - "status": self._load_status, - } - - def visit_status(self) -> str: - """Allow overriding the visit status in case of partial load""" - if self._visit_status is not None: - return self._visit_status - return super().visit_status() - - def get_revision_id_from_hg_nodeid(self, hg_nodeid: HgNodeId) -> Sha1Git: - """Return the git sha1 of a revision given its hg nodeid. - - Args: - hg_nodeid: the hg nodeid of the revision. - - Returns: - the sha1_git of the revision. - """ - - from_cache = self._revision_nodeid_to_sha1git.get(hg_nodeid) - if from_cache is not None: - return from_cache - # The parent was not loaded in this run, get it from storage - from_storage = [ - extid - for extid in self.storage.extid_get_from_extid( - EXTID_TYPE, ids=[hg_nodeid], version=EXTID_VERSION - ) - ] - - msg = "Expected 1 match from storage for hg node %r, got %d" - assert len(from_storage) == 1, msg % (hg_nodeid.hex(), len(from_storage)) - return from_storage[0].target.object_id - - def get_revision_parents(self, rev_ctx: hgutil.BaseContext) -> Tuple[Sha1Git, ...]: - """Return the git sha1 of the parent revisions. - - Args: - hg_nodeid: the hg nodeid of the revision. - - Returns: - the sha1_git of the parent revisions. 
- """ - parents = [] - for parent_ctx in rev_ctx.parents(): - parent_hg_nodeid = parent_ctx.node() - # nullid is the value of a parent that does not exist - if parent_hg_nodeid == hgutil.NULLID: - continue - revision_id = self.get_revision_id_from_hg_nodeid(parent_hg_nodeid) - parents.append(revision_id) - - return tuple(parents) - - def store_revision(self, rev_ctx: hgutil.BaseContext) -> None: - """Store a revision given its hg nodeid. - - Args: - rev_ctx: the he revision context. - - Returns: - the sha1_git of the stored revision. - """ - hg_nodeid = rev_ctx.node() - - root_sha1git = self.store_directories(rev_ctx) - - # `Person.from_fullname` is compatible with mercurial's freeform author - # as fullname is what is used in revision hash when available. - author = Person.from_fullname(rev_ctx.user()) - - (timestamp, offset) = rev_ctx.date() - - # TimestampWithTimezone.from_dict will change name - # as it accept more than just dicts - rev_date = TimestampWithTimezone.from_dict(int(timestamp)) - - extra_headers = [ - (b"time_offset_seconds", str(offset).encode(),), - ] - for key, value in rev_ctx.extra().items(): - # The default branch is skipped to match - # the historical implementation - if key == b"branch" and value == b"default": - continue - - # transplant_source is converted to match - # the historical implementation - if key == b"transplant_source": - value = hash_to_bytehex(value) - extra_headers.append((key, value)) - - revision = Revision( - author=author, - date=rev_date, - committer=author, - committer_date=rev_date, - type=RevisionType.MERCURIAL, - directory=root_sha1git, - message=rev_ctx.description(), - extra_headers=tuple(extra_headers), - synthetic=False, - parents=self.get_revision_parents(rev_ctx), - ) - - self._revision_nodeid_to_sha1git[hg_nodeid] = revision.id - self.storage.revision_add([revision]) - - # Save the mapping from SWHID to hg id - revision_swhid = identifiers.CoreSWHID( - object_type=identifiers.ObjectType.REVISION, object_id=revision.id, - ) - self.storage.extid_add( - [ - ExtID( - extid_type=EXTID_TYPE, - extid_version=EXTID_VERSION, - extid=hg_nodeid, - target=revision_swhid, - ) - ] - ) - - def store_release(self, name: bytes, target: Sha1Git) -> Sha1Git: - """Store a release given its name and its target. - - A release correspond to a user defined tag in mercurial. - The mercurial api as a `tip` tag that must be ignored. - - Args: - name: name of the release. - target: sha1_git of the target revision. - - Returns: - the sha1_git of the stored release. - """ - release = Release( - name=name, - target=target, - target_type=ObjectType.REVISION, - message=None, - metadata=None, - synthetic=False, - author=Person(name=None, email=None, fullname=b""), - date=None, - ) - - self.storage.release_add([release]) - - return release.id - - def store_content(self, rev_ctx: hgutil.BaseContext, file_path: bytes) -> Content: - """Store a revision content hg nodeid and file path. - - Content is a mix of file content at a given revision - and its permissions found in the changeset's manifest. - - Args: - rev_ctx: the he revision context. - file_path: the hg path of the content. - - Returns: - the sha1_git of the top level directory. - """ - hg_nodeid = rev_ctx.node() - file_ctx = rev_ctx[file_path] - - try: - file_nodeid = file_ctx.filenode() - except hgutil.LookupError: - # TODO - # Raising CorruptedRevision avoid crashing the whole loading - # but can lead to a lot of missing revisions. - # SkippedContent could be used but need actual content to calculate its id. 
- # Maybe the hg_nodeid can be used instead. - # Another option could be to just ignore the missing content. - # This point is left to future commits. - # Check for other uses to apply the same logic there. - raise CorruptedRevision(hg_nodeid) - - perms = FLAG_PERMS[file_ctx.flags()] - - # Key is file_nodeid + perms because permissions does not participate - # in content hash in hg while it is the case in swh. - cache_key = (file_nodeid, perms) - - sha1_git = self._content_hash_cache.get(cache_key) - if sha1_git is None: - try: - data = file_ctx.data() - except hgutil.error.RevlogError: - # TODO - # See above use of `CorruptedRevision` - raise CorruptedRevision(hg_nodeid) - - content = ModelContent.from_data(data) - - self.storage.content_add([content]) - - sha1_git = content.sha1_git - self._content_hash_cache[cache_key] = sha1_git - - # Here we make sure to return only necessary data. - return Content({"sha1_git": sha1_git, "perms": perms}) - - def store_directories(self, rev_ctx: hgutil.BaseContext) -> Sha1Git: - """Store a revision directories given its hg nodeid. - - Mercurial as no directory as in git. A Git like tree must be build - from file paths to obtain each directory hash. - - Args: - rev_ctx: the he revision context. - - Returns: - the sha1_git of the top level directory. - """ - repo: hgutil.Repository = self._repo # mypy can't infer that repo is not None - prev_ctx = repo[self._last_hg_nodeid] - - # TODO maybe do diff on parents - try: - status = prev_ctx.status(rev_ctx) - except hgutil.error.LookupError: - raise CorruptedRevision(rev_ctx.node()) - - for file_path in status.removed: - try: - del self._last_root[file_path] - except KeyError: - raise CorruptedRevision(rev_ctx.node()) - - for file_path in status.added: - content = self.store_content(rev_ctx, file_path) - self._last_root[file_path] = content - - for file_path in status.modified: - content = self.store_content(rev_ctx, file_path) - self._last_root[file_path] = content - - self._last_hg_nodeid = rev_ctx.node() - - directories: Deque[Directory] = deque([self._last_root]) - while directories: - directory = directories.pop() - self.storage.directory_add([directory.to_model()]) - directories.extend( - [item for item in directory.values() if isinstance(item, Directory)] - ) - - return self._last_root.hash - - -class HgArchiveLoaderFromDisk(HgLoaderFromDisk): - """Mercurial loader for repository wrapped within tarballs.""" - - def __init__( - self, - storage: StorageInterface, - url: str, - visit_date: Optional[datetime] = None, - archive_path: str = None, - temp_directory: str = "/tmp", - max_content_size: Optional[int] = None, - ): - super().__init__( - storage=storage, - url=url, - visit_date=visit_date, - logging_class="swh.loader.mercurial.ArchiveLoaderFromDisk", - temp_directory=temp_directory, - max_content_size=max_content_size, - ) - self.archive_extract_temp_dir = None - self.archive_path = archive_path - - def prepare(self): - """Extract the archive instead of cloning.""" - self.archive_extract_temp_dir = tmp_extract( - archive=self.archive_path, - dir=self._temp_directory, - prefix=TEMPORARY_DIR_PREFIX_PATTERN, - suffix=f".dump-{os.getpid()}", - log=self.log, - source=self.origin_url, - ) - - repo_name = os.listdir(self.temp_dir)[0] - self.directory = os.path.join(self.archive_extract_temp_dir, repo_name) - super().prepare() - - -# Allow direct usage of the loader from the command line with -# `python -m swh.loader.mercurial.from_disk $ORIGIN_URL` -if __name__ == "__main__": - import logging - - import 
click - - logging.basicConfig( - level=logging.DEBUG, format="%(asctime)s %(process)d %(message)s" - ) - - @click.command() - @click.option("--origin-url", help="origin url") - @click.option("--hg-directory", help="Path to mercurial repository to load") - @click.option("--visit-date", default=None, help="Visit date") - def main(origin_url, hg_directory, visit_date): - from swh.storage import get_storage - - storage = get_storage(cls="memory") - return HgLoaderFromDisk( - storage, - origin_url, - directory=hg_directory, - visit_date=parse_visit_date(visit_date), - ).load() - - main() diff --git a/swh/loader/mercurial/loader.py b/swh/loader/mercurial/loader.py --- a/swh/loader/mercurial/loader.py +++ b/swh/loader/mercurial/loader.py @@ -1,51 +1,23 @@ -# Copyright (C) 2017-2021 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -"""This document contains a SWH loader for ingesting repository data -from Mercurial version 2 bundle files. - -""" - -# NOTE: The code here does expensive work twice in places because of the -# intermediate need to check for what is missing before sending to the database -# and the desire to not juggle very large amounts of data. - -# TODO: Decide whether to also serialize to disk and read back more quickly -# from there. Maybe only for very large repos and fast drives. -# - Avi - - -import datetime +from collections import deque +from datetime import datetime import os -from queue import Empty -import random -import re from shutil import rmtree from tempfile import mkdtemp -import time -from typing import Any, Dict, Iterable, List, Optional - -import billiard -import hglib -from hglib.error import CommandError +from typing import Deque, Dict, Iterator, List, Optional, Set, Tuple, TypeVar, Union -from swh.loader.core.loader import DVCSLoader +from swh.core.utils import grouper +from swh.loader.core.loader import BaseLoader from swh.loader.core.utils import clean_dangling_folders -from swh.loader.exception import NotFound from swh.loader.mercurial.utils import get_minimum_env from swh.model import identifiers -from swh.model.hashutil import ( - DEFAULT_ALGORITHMS, - MultiHash, - hash_to_bytehex, - hash_to_bytes, -) +from swh.model.from_disk import Content, DentryPerms, Directory +from swh.model.hashutil import hash_to_bytehex from swh.model.model import ( - BaseContent, - Content, - Directory, ExtID, ObjectType, Origin, @@ -54,615 +26,736 @@ Revision, RevisionType, Sha1Git, - SkippedContent, Snapshot, SnapshotBranch, TargetType, TimestampWithTimezone, ) -from swh.storage.algos.origin import origin_get_latest_visit_status +from swh.model.model import Content as ModelContent +from swh.storage.algos.snapshot import snapshot_get_latest from swh.storage.interface import StorageInterface -from . import converters +from . import hgutil from .archive_extract import tmp_extract -from .bundle20_reader import Bundle20Reader -from .converters import PRIMARY_ALGO as ALGO -from .objects import SelectiveCache, SimpleTree - -TAG_PATTERN = re.compile("[0-9A-Fa-f]{40}") +from .hgutil import HgFilteredSet, HgNodeId, HgSpanSet -TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.mercurial." 
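The rewritten loader keeps the external-identifier plumbing that makes incremental loading work: every ingested changeset is recorded as an `ExtID` of type `hg-nodeid` (version 1, see the constants just below) targeting the SWHID of the corresponding revision, and later visits query that mapping instead of re-ingesting. A hedged sketch of reading the mapping back through the storage API; the changeset id is made up:

``` python
# Assumes `storage` is a swh.storage instance already populated by HgLoader.
hg_nodeid = bytes.fromhex("0a04b987be5ae354b710cefeba0e2d9de7ad41a9")  # hypothetical changeset
for extid in storage.extid_get_from_extid("hg-nodeid", [hg_nodeid], version=1):
    print(extid.target)  # CoreSWHID of the revision stored for that changeset
```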
+FLAG_PERMS = { + b"l": DentryPerms.symlink, + b"x": DentryPerms.executable_content, + b"": DentryPerms.content, +} # type: Dict[bytes, DentryPerms] -HEAD_POINTER_NAME = b"tip" +TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.mercurial.loader" EXTID_TYPE = "hg-nodeid" +EXTID_VERSION: int = 1 + +T = TypeVar("T") + + +class CorruptedRevision(ValueError): + """Raised when a revision is corrupted.""" + def __init__(self, hg_nodeid: HgNodeId) -> None: + super().__init__(hg_nodeid.hex()) + self.hg_nodeid = hg_nodeid -class CommandErrorWrapper(Exception): - """This exception is raised in place of a 'CommandError' - exception (raised by the underlying hglib library) - This is needed because billiard.Queue is serializing the - queued object and as CommandError doesn't have a constructor without - parameters, the deserialization is failing +class HgDirectory(Directory): + """A more practical directory. + + - creates missing parent directories + - removes empty directories """ - def __init__(self, err: Optional[bytes]): - self.err = err + def __setitem__(self, path: bytes, value: Union[Content, "HgDirectory"]) -> None: + if b"/" in path: + head, tail = path.split(b"/", 1) + directory = self.get(head) + if directory is None or isinstance(directory, Content): + directory = HgDirectory() + self[head] = directory -class CloneTimeoutError(Exception): - pass + directory[tail] = value + else: + super().__setitem__(path, value) + def __delitem__(self, path: bytes) -> None: + super().__delitem__(path) -class HgBundle20Loader(DVCSLoader): - """Mercurial loader able to deal with remote or local repository. + while b"/" in path: # remove empty parent directories + path = path.rsplit(b"/", 1)[0] + if len(self[path]) == 0: + super().__delitem__(path) + else: + break + def get( + self, path: bytes, default: Optional[T] = None + ) -> Optional[Union[Content, "HgDirectory", T]]: + # TODO move to swh.model.from_disk.Directory + try: + return self[path] + except KeyError: + return default + + +class HgLoader(BaseLoader): + """Load a mercurial repository from a local repository. + + Mercurial's branching model is more complete than Git's; it allows for multiple + heads per branch, closed heads and bookmarks. The following mapping is used to + represent the branching state of a Mercurial project in a given snapshot: + + - `HEAD` (optional) either the node pointed by the `@` bookmark or the tip of + the `default` branch + - `branch-tip/` (required) the first head of the branch, sorted by + nodeid if there are multiple heads. + - `bookmarks/` (optional) holds the bookmarks mapping if any + - `branch-heads//0..n` (optional) for any branch with multiple open + heads, list all *open* heads + - `branch-closed-heads//0..n` (optional) for any branch with at least + one closed head, list all *closed* heads + - `tags/` (optional) record tags + + The format is not ambiguous regardless of branch name since we know it ends with a + `/`, as long as we have a stable sorting of the heads (we sort by nodeid). + There may be some overlap between the refs, but it's simpler not to try to figure + out de-duplication. + However, to reduce the redundancy between snapshot branches in the most common case, + when a branch has a single open head, it will only be referenced as + `branch-tip/`. The `branch-heads/` hierarchy only appears when a branch + has multiple open heads, which we consistently sort by increasing nodeid. + The `branch-closed-heads/` hierarchy is also sorted by increasing nodeid. 
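To make the branch layout described above concrete, here is an illustrative-only sketch (the branch name, bookmark, tag and every hash value are invented) of the snapshot such a mapping would yield for a hypothetical repository with two open heads on `default`, one bookmark and one user tag:

```python
# Illustrative sketch only; all identifiers below are made up.
from swh.model.model import Snapshot, SnapshotBranch, TargetType

rev_a = bytes.fromhex("aa" * 20)  # first head of "default" in nodeid order
rev_b = bytes.fromhex("bb" * 20)  # second open head of "default"
rel_1 = bytes.fromhex("cc" * 20)  # release object created for the tag

snapshot = Snapshot(
    branches={
        b"HEAD": SnapshotBranch(
            target=b"branch-tip/default", target_type=TargetType.ALIAS
        ),
        b"branch-tip/default": SnapshotBranch(
            target=rev_a, target_type=TargetType.REVISION
        ),
        b"branch-heads/default/0": SnapshotBranch(
            target=rev_a, target_type=TargetType.REVISION
        ),
        b"branch-heads/default/1": SnapshotBranch(
            target=rev_b, target_type=TargetType.REVISION
        ),
        b"bookmarks/stable": SnapshotBranch(
            target=rev_b, target_type=TargetType.REVISION
        ),
        b"tags/1.0": SnapshotBranch(target=rel_1, target_type=TargetType.RELEASE),
    }
)
```

`branch-tip/default` and `branch-heads/default/0` deliberately point at the same revision: the tip is defined as the first head in nodeid order, and the `branch-heads/` hierarchy only appears because the branch has more than one open head.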
""" + CONFIG_BASE_FILENAME = "loader/mercurial" + visit_type = "hg" def __init__( self, storage: StorageInterface, url: str, - visit_date: Optional[datetime.datetime] = None, directory: Optional[str] = None, - logging_class="swh.loader.mercurial.Bundle20Loader", - bundle_filename: Optional[str] = "HG20_none_bundle", - reduce_effort: bool = False, + logging_class: str = "swh.loader.mercurial.LoaderFromDisk", + visit_date: Optional[datetime] = None, temp_directory: str = "/tmp", - cache1_size: int = 800 * 1024 * 1024, - cache2_size: int = 800 * 1024 * 1024, clone_timeout_seconds: int = 7200, - save_data_path: Optional[str] = None, + content_cache_size: int = 10_000, max_content_size: Optional[int] = None, ): + """Initialize the loader. + + Args: + url: url of the repository. + directory: directory of the local repository. + logging_class: class of the loader logger. + visit_date: visit date of the repository + config: loader configuration + """ super().__init__( storage=storage, logging_class=logging_class, - save_data_path=save_data_path, max_content_size=max_content_size, ) + + self._temp_directory = temp_directory + self._clone_timeout = clone_timeout_seconds + self.origin_url = url self.visit_date = visit_date self.directory = directory - self.bundle_filename = bundle_filename - self.reduce_effort_flag = reduce_effort - self.empty_repository = None - self.temp_directory = temp_directory - self.cache1_size = cache1_size - self.cache2_size = cache2_size - self.clone_timeout = clone_timeout_seconds - self.working_directory = None - self.bundle_path = None - self.heads: Dict[bytes, Any] = {} - self.releases: Dict[bytes, Any] = {} - self.last_snapshot_id: Optional[bytes] = None - self.old_environ = os.environ.copy() - os.environ.clear() - os.environ.update(get_minimum_env()) - def pre_cleanup(self): - """Cleanup potential dangling files from prior runs (e.g. OOM killed - tasks) + self._repo: Optional[hgutil.Repository] = None + self._revision_nodeid_to_sha1git: Dict[HgNodeId, Sha1Git] = {} + self._repo_directory: Optional[str] = None + + # keeps the last processed hg nodeid + # it is used for differential tree update by store_directories + # NULLID is the parent of the first revision + self._last_hg_nodeid = hgutil.NULLID + + # keeps the last revision tree + # it is used for differential tree update by store_directories + self._last_root = HgDirectory() + + # Cache the content hash across revisions to avoid recalculation. + self._content_hash_cache: hgutil.LRUCacheDict = hgutil.LRUCacheDict( + content_cache_size, + ) + + # hg node id of the latest snapshot branch heads + # used to find what are the new revisions since last snapshot + self._latest_heads: List[HgNodeId] = [] + # hg node ids of all the tags recorded on previous runs + # Used to compute which tags need to be added, even across incremental runs + # that might separate a changeset introducing a tag from its target. + self._saved_tags: Set[HgNodeId] = set() + + self._load_status = "eventful" + # If set, will override the default value + self._visit_status = None + + def pre_cleanup(self) -> None: + """As a first step, will try and check for dangling data to cleanup. + This should do its best to avoid raising issues. 
""" clean_dangling_folders( - self.temp_directory, + self._temp_directory, pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, log=self.log, ) - def cleanup(self): - """Clean temporary working directory + self.old_environ = os.environ.copy() + os.environ.clear() + os.environ.update(get_minimum_env()) - """ + def cleanup(self) -> None: + """Last step executed by the loader.""" os.environ.clear() os.environ.update(self.old_environ) - if self.bundle_path and os.path.exists(self.bundle_path): - self.log.debug("Cleanup up working bundle %s" % self.bundle_path) - os.unlink(self.bundle_path) - if self.working_directory and os.path.exists(self.working_directory): - self.log.debug( - "Cleanup up working directory %s" % (self.working_directory,) - ) - rmtree(self.working_directory) - def get_heads(self, repo): - """Read the closed branches heads (branch, bookmarks) and returns a - dict with key the branch_name (bytes) and values the tuple - (pointer nature (bytes), mercurial's node id - (bytes)). Those needs conversion to swh-ids. This is taken - care of in get_revisions. + # Don't cleanup if loading from a local directory + was_remote = self.directory is None + if was_remote and self._repo_directory and os.path.exists(self._repo_directory): + self.log.debug(f"Cleanup up repository {self._repo_directory}") + rmtree(self._repo_directory) + + def prepare_origin_visit(self) -> None: + """First step executed by the loader to prepare origin and visit + references. Set/update self.origin, and + optionally self.origin_url, self.visit_date. """ - b = {} - for _, node_hash_id, pointer_nature, branch_name, *_ in repo.heads(): - b[branch_name] = (pointer_nature, hash_to_bytes(node_hash_id.decode())) + self.origin = Origin(url=self.origin_url) - bookmarks = repo.bookmarks() - if bookmarks and bookmarks[0]: - for bookmark_name, _, target_short in bookmarks[0]: - target = repo[target_short].node() - b[bookmark_name] = (None, hash_to_bytes(target.decode())) + def prepare(self) -> None: + """Second step executed by the loader to prepare some state needed by + the loader. - return b + """ + # Set here to allow multiple calls to load on the same loader instance + self._latest_heads = [] - def prepare_origin_visit(self) -> None: - self.origin = Origin(url=self.origin_url) - visit_status = origin_get_latest_visit_status( - self.storage, self.origin_url, require_snapshot=True + latest_snapshot = snapshot_get_latest(self.storage, self.origin_url) + if latest_snapshot: + self._set_recorded_state(latest_snapshot) + + def _set_recorded_state(self, latest_snapshot: Snapshot) -> None: + """ + Looks up the nodeid for all revisions in the snapshot via extid_get_from_target, + and adds them to `self._latest_heads`. + Also looks up the currently saved releases ("tags" in Mercurial speak). + The tags are all listed for easy comparison at the end, while only the latest + heads are needed for revisions. 
+ """ + heads = [] + tags = [] + + for branch in latest_snapshot.branches.values(): + if branch.target_type == TargetType.REVISION: + heads.append(branch.target) + elif branch.target_type == TargetType.RELEASE: + tags.append(branch.target) + + self._latest_heads.extend( + HgNodeId(extid.extid) for extid in self._get_extids_for_targets(heads) ) - self.last_snapshot_id = None if visit_status is None else visit_status.snapshot + self._saved_tags.update( + HgNodeId(extid.extid) for extid in self._get_extids_for_targets(tags) + ) + + def _get_extids_for_targets(self, targets: List[Sha1Git]) -> List[ExtID]: + """Get all Mercurial ExtIDs for the targets in the latest snapshot""" + extids = [] + for extid in self.storage.extid_get_from_target( + identifiers.ObjectType.REVISION, + targets, + extid_type=EXTID_TYPE, + extid_version=EXTID_VERSION, + ): + extids.append(extid) + self._revision_nodeid_to_sha1git[ + HgNodeId(extid.extid) + ] = extid.target.object_id + + if extids: + # Filter out dangling extids, we need to load their target again + revisions_missing = self.storage.revision_missing( + [extid.target.object_id for extid in extids] + ) + extids = [ + extid + for extid in extids + if extid.target.object_id not in revisions_missing + ] + return extids - @staticmethod - def clone_with_timeout(log, origin, destination, timeout): - queue = billiard.Queue() - start = time.monotonic() + def _get_extids_for_hgnodes(self, hgnode_ids: List[HgNodeId]) -> List[ExtID]: + """Get all Mercurial ExtIDs for the mercurial nodes in the list which point to + a known revision. - def do_clone(queue, origin, destination): - try: - result = hglib.clone(source=origin, dest=destination, noupdate=True) - except CommandError as e: - # the queued object need an empty constructor to be deserialized later - queue.put(CommandErrorWrapper(e.err)) - except BaseException as e: - queue.put(e) - else: - queue.put(result) + """ + extids = [] + + for group_ids in grouper(hgnode_ids, n=1000): + for extid in self.storage.extid_get_from_extid( + EXTID_TYPE, group_ids, version=EXTID_VERSION + ): + extids.append(extid) + self._revision_nodeid_to_sha1git[ + HgNodeId(extid.extid) + ] = extid.target.object_id + + if extids: + # Filter out dangling extids, we need to load their target again + revisions_missing = self.storage.revision_missing( + [extid.target.object_id for extid in extids] + ) + extids = [ + extid + for extid in extids + if extid.target.object_id not in revisions_missing + ] + return extids - process = billiard.Process(target=do_clone, args=(queue, origin, destination)) - process.start() + def fetch_data(self) -> bool: + """Fetch the data from the source the loader is currently loading - while True: - try: - result = queue.get(timeout=0.1) - break - except Empty: - duration = time.monotonic() - start - if timeout and duration > timeout: - log.warning( - "Timeout cloning `%s` within %s seconds", origin, timeout - ) - process.terminate() - process.join() - raise CloneTimeoutError(origin, timeout) - continue + Returns: + a value that is interpreted as a boolean. If True, fetch_data needs + to be called again to complete loading. 
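Stepping back from `fetch_data` itself, a hedged usage sketch of the new entry point as a whole, assuming an in-memory storage and a made-up origin URL; it mirrors the `__main__` helper that the removed `from_disk` module provided:

```python
# Usage sketch only: the URL is hypothetical and a memory storage is assumed.
from swh.loader.mercurial.loader import HgLoader
from swh.storage import get_storage

storage = get_storage(cls="memory")
loader = HgLoader(
    storage,
    url="https://hg.example.org/some/repo",  # hypothetical origin
    temp_directory="/tmp",
)
result = loader.load()  # e.g. {"status": "eventful"} or {"status": "uneventful"}
```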
- process.join() + """ + if not self.directory: # no local repository + self._repo_directory = mkdtemp( + prefix=TEMPORARY_DIR_PREFIX_PATTERN, + suffix=f"-{os.getpid()}", + dir=self._temp_directory, + ) + self.log.debug( + f"Cloning {self.origin_url} to {self.directory} " + f"with timeout {self._clone_timeout} seconds" + ) + hgutil.clone(self.origin_url, self._repo_directory, self._clone_timeout) + else: # existing local repository + # Allow to load on disk repository without cloning + # for testing purpose. + self._repo_directory = self.directory - if isinstance(result, Exception): - raise result from None + self._repo = hgutil.repository(self._repo_directory) - return result + return False - def prepare(self): - """Prepare the necessary steps to load an actual remote or local - repository. + def _new_revs(self, heads: List[HgNodeId]) -> Union[HgFilteredSet, HgSpanSet]: + """Return unseen revisions. That is, filter out revisions that are not ancestors of + heads""" + assert self._repo is not None + existing_heads = [] - To load a local repository, pass the optional directory - parameter as filled with a path to a real local folder. + for hg_nodeid in heads: + try: + rev = self._repo[hg_nodeid].rev() + existing_heads.append(rev) + except KeyError: # the node does not exist anymore + pass - To load a remote repository, pass the optional directory - parameter as None. + # select revisions that are not ancestors of heads + # and not the heads themselves + new_revs = self._repo.revs("not ::(%ld)", existing_heads) - Args: - origin_url (str): Origin url to load - visit_date (str/datetime): Date of the visit - directory (str/None): The local directory to load + if new_revs: + self.log.info("New revisions found: %d", len(new_revs)) - """ - self.branches = {} - self.tags = [] - self.releases = {} - self.node_2_rev = {} - self.heads = {} - self.extids = [] + return new_revs - directory = self.directory + def get_hg_revs_to_load(self) -> Iterator[int]: + """Yield hg revision numbers to load. - if not directory: # remote repository - self.working_directory = mkdtemp( - prefix=TEMPORARY_DIR_PREFIX_PATTERN, - suffix="-%s" % os.getpid(), - dir=self.temp_directory, - ) - os.makedirs(self.working_directory, exist_ok=True) - self.hgdir = self.working_directory + """ + assert self._repo is not None + repo: hgutil.Repository = self._repo + + seen_revs: Set[int] = set() + # 1. use snapshot to reuse existing seen heads from it + if self._latest_heads: + for rev in self._new_revs(self._latest_heads): + seen_revs.add(rev) + yield rev + + # 2. 
Then filter out remaining revisions through the overall extid mappings + # across hg origins + revs_left = repo.revs("all() - ::(%ld)", seen_revs) + hg_nodeids = [repo[nodeid].node() for nodeid in revs_left] + yield from self._new_revs( + [ + HgNodeId(extid.extid) + for extid in self._get_extids_for_hgnodes(hg_nodeids) + ] + ) - self.log.debug( - "Cloning %s to %s with timeout %s seconds", - self.origin_url, - self.hgdir, - self.clone_timeout, - ) + def store_data(self): + """Store fetched data in the database.""" + revs = self.get_hg_revs_to_load() + length_ingested_revs = 0 + + assert self._repo is not None + repo = self._repo + ignored_revs: Set[int] = set() + for rev in revs: + if rev in ignored_revs: + continue try: - self.clone_with_timeout( - self.log, self.origin_url, self.hgdir, self.clone_timeout + self.store_revision(repo[rev]) + length_ingested_revs += 1 + except CorruptedRevision as e: + self._visit_status = "partial" + self.log.warning("Corrupted revision %s", e) + descendents = repo.revs("(%ld)::", [rev]) + ignored_revs.update(descendents) + + if length_ingested_revs == 0: + # no new revision ingested, so uneventful + # still we'll make a snapshot, so we continue + self._load_status = "uneventful" + + branching_info = hgutil.branching_info(repo, ignored_revs) + tags_by_name: Dict[bytes, HgNodeId] = repo.tags() + + snapshot_branches: Dict[bytes, SnapshotBranch] = {} + + for tag_name, hg_nodeid in tags_by_name.items(): + if tag_name == b"tip": + # `tip` is listed in the tags by the Mercurial API but its not a tag + # defined by the user in `.hgtags`. + continue + if hg_nodeid not in self._saved_tags: + label = b"tags/%s" % tag_name + target = self.get_revision_id_from_hg_nodeid(hg_nodeid) + snapshot_branches[label] = SnapshotBranch( + target=self.store_release(tag_name, target), + target_type=TargetType.RELEASE, ) - except CommandErrorWrapper as e: - for msg in [ - b"does not appear to be an hg repository", - b"404: not found", - b"name or service not known", - ]: - if msg in e.err.lower(): - raise NotFound(e.args[0]) from None - raise e - - else: # local repository - self.working_directory = None - self.hgdir = directory - - self.bundle_path = os.path.join(self.hgdir, self.bundle_filename) - self.log.debug("Bundling at %s" % self.bundle_path) - - with hglib.open(self.hgdir) as repo: - self.heads = self.get_heads(repo) - repo.bundle(bytes(self.bundle_path, "utf-8"), all=True, type=b"none-v2") - - self.cache_filename1 = os.path.join( - self.hgdir, "swh-cache-1-%s" % (hex(random.randint(0, 0xFFFFFF))[2:],) - ) - self.cache_filename2 = os.path.join( - self.hgdir, "swh-cache-2-%s" % (hex(random.randint(0, 0xFFFFFF))[2:],) - ) - try: - self.br = Bundle20Reader( - bundlefile=self.bundle_path, - cache_filename=self.cache_filename1, - cache_size=self.cache1_size, + for branch_name, node_id in branching_info.tips.items(): + name = b"branch-tip/%s" % branch_name + target = self.get_revision_id_from_hg_nodeid(node_id) + snapshot_branches[name] = SnapshotBranch( + target=target, target_type=TargetType.REVISION ) - except FileNotFoundError: - # Empty repository! Still a successful visit targeting an - # empty snapshot - self.log.warn("%s is an empty repository!" 
% self.hgdir) - self.empty_repository = True - else: - self.reduce_effort = set() - if self.reduce_effort_flag: - now = datetime.datetime.now(tz=datetime.timezone.utc) - if (now - self.visit_date).days > 1: - # Assuming that self.visit_date would be today for - # a new visit, treat older visit dates as - # indication of wanting to skip some processing - # effort. - for header, commit in self.br.yield_all_changesets(): - ts = commit["time"].timestamp() - if ts < self.visit_date.timestamp(): - self.reduce_effort.add(header["node"]) - - def has_contents(self): - return not self.empty_repository - - def has_directories(self): - return not self.empty_repository - - def has_revisions(self): - return not self.empty_repository - - def has_releases(self): - return not self.empty_repository - - def fetch_data(self): - """Fetch the data from the data source.""" - pass - - def get_contents(self) -> Iterable[BaseContent]: - """Get the contents that need to be loaded.""" - - # NOTE: This method generates blobs twice to reduce memory usage - # without generating disk writes. - self.file_node_to_hash = {} - hash_to_info = {} - self.num_contents = 0 - contents: Dict[bytes, BaseContent] = {} - missing_contents = set() - - for blob, node_info in self.br.yield_all_blobs(): - self.num_contents += 1 - file_name = node_info[0] - header = node_info[2] - - length = len(blob) - if header["linknode"] in self.reduce_effort: - algorithms = set([ALGO]) - else: - algorithms = DEFAULT_ALGORITHMS - h = MultiHash.from_data(blob, hash_names=algorithms) - content = h.digest() - content["length"] = length - blob_hash = content[ALGO] - self.file_node_to_hash[header["node"]] = blob_hash - - if header["linknode"] in self.reduce_effort: - continue - hash_to_info[blob_hash] = node_info - if self.max_content_size is not None and length >= self.max_content_size: - contents[blob_hash] = SkippedContent( - status="absent", reason="Content too large", **content - ) - else: - contents[blob_hash] = Content(data=blob, status="visible", **content) + for bookmark_name, node_id in branching_info.bookmarks.items(): + name = b"bookmarks/%s" % bookmark_name + target = self.get_revision_id_from_hg_nodeid(node_id) + snapshot_branches[name] = SnapshotBranch( + target=target, target_type=TargetType.REVISION + ) - if file_name == b".hgtags": - # https://www.mercurial-scm.org/wiki/GitConcepts#Tag_model - # overwrite until the last one - self.tags = (t for t in blob.split(b"\n") if t != b"") + for branch_name, branch_heads in branching_info.open_heads.items(): + for index, head in enumerate(branch_heads): + name = b"branch-heads/%s/%d" % (branch_name, index) + target = self.get_revision_id_from_hg_nodeid(head) + snapshot_branches[name] = SnapshotBranch( + target=target, target_type=TargetType.REVISION + ) - if contents: - missing_contents = set( - self.storage.content_missing( - [c.to_dict() for c in contents.values()], key_hash=ALGO + for branch_name, closed_heads in branching_info.closed_heads.items(): + for index, head in enumerate(closed_heads): + name = b"branch-closed-heads/%s/%d" % (branch_name, index) + target = self.get_revision_id_from_hg_nodeid(head) + snapshot_branches[name] = SnapshotBranch( + target=target, target_type=TargetType.REVISION ) + + # If the repo is broken enough or if it has none of the "normal" default + # mechanisms, we ignore `HEAD`. 
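For clarity, a small standalone sketch of the `HEAD` selection rule stated in the class docstring; this is not the `hgutil.branching_info` implementation (which lives outside this diff), only an illustration of the documented behaviour:

```python
# Sketch of the documented rule: prefer the "@" bookmark, otherwise the tip of
# "default", otherwise no HEAD alias at all (broken or unusual repositories).
from typing import Dict, Optional


def pick_head_alias(
    bookmarks: Dict[bytes, bytes], branch_tips: Dict[bytes, bytes]
) -> Optional[bytes]:
    """Return the snapshot branch name that b"HEAD" should alias, if any."""
    if b"@" in bookmarks:
        return b"bookmarks/@"
    if b"default" in branch_tips:
        return b"branch-tip/default"
    return None


assert pick_head_alias({b"@": b"x"}, {b"default": b"y"}) == b"bookmarks/@"
assert pick_head_alias({}, {}) is None
```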
+ default_branch_alias = branching_info.default_branch_alias + if default_branch_alias is not None: + snapshot_branches[b"HEAD"] = SnapshotBranch( + target=default_branch_alias, target_type=TargetType.ALIAS, ) + snapshot = Snapshot(branches=snapshot_branches) + self.storage.snapshot_add([snapshot]) + + self.flush() + self.loaded_snapshot_id = snapshot.id - # Clusters needed blobs by file offset and then only fetches the - # groups at the needed offsets. - focs: Dict[int, Dict[bytes, bytes]] = {} # "file/offset/contents" - for blob_hash in missing_contents: - _, file_offset, header = hash_to_info[blob_hash] - focs.setdefault(file_offset, {}) - focs[file_offset][header["node"]] = blob_hash - - for offset, node_hashes in sorted(focs.items()): - for header, data, *_ in self.br.yield_group_objects(group_offset=offset): - node = header["node"] - if node in node_hashes: - blob, meta = self.br.extract_meta_from_blob(data) - content = contents.pop(node_hashes[node], None) - if content: - if ( - self.max_content_size is not None - and len(blob) >= self.max_content_size - ): - yield SkippedContent.from_data( - blob, reason="Content too large" - ) - else: - yield Content.from_data(blob) - - def load_directories(self): - """This is where the work is done to convert manifest deltas from the - repository bundle into SWH directories. + def load_status(self) -> Dict[str, str]: + """Detailed loading status. + Defaults to logging an eventful load. + + Returns: a dictionary that is eventually passed back as the task's + result to the scheduler, allowing tuning of the task recurrence + mechanism. """ - self.mnode_to_tree_id = {} - cache_hints = self.br.build_manifest_hints() + return { + "status": self._load_status, + } - def tree_size(t): - return t.size() + def visit_status(self) -> str: + """Allow overriding the visit status in case of partial load""" + if self._visit_status is not None: + return self._visit_status + return super().visit_status() - self.trees = SelectiveCache( - cache_hints=cache_hints, - size_function=tree_size, - filename=self.cache_filename2, - max_size=self.cache2_size, - ) + def get_revision_id_from_hg_nodeid(self, hg_nodeid: HgNodeId) -> Sha1Git: + """Return the git sha1 of a revision given its hg nodeid. - tree = SimpleTree() - for header, added, removed in self.br.yield_all_manifest_deltas(cache_hints): - node = header["node"] - basenode = header["basenode"] - tree = self.trees.fetch(basenode) or tree # working tree - - for path in removed.keys(): - tree = tree.remove_tree_node_for_path(path) - for path, info in added.items(): - file_node, is_symlink, perms_code = info - tree = tree.add_blob( - path, self.file_node_to_hash[file_node], is_symlink, perms_code - ) + Args: + hg_nodeid: the hg nodeid of the revision. - if header["linknode"] in self.reduce_effort: - self.trees.store(node, tree) - else: - new_dirs = [] - self.mnode_to_tree_id[node] = tree.hash_changed(new_dirs) - self.trees.store(node, tree) - yield header, tree, new_dirs + Returns: + the sha1_git of the revision. 
+ """ + + from_cache = self._revision_nodeid_to_sha1git.get(hg_nodeid) + if from_cache is not None: + return from_cache + # The parent was not loaded in this run, get it from storage + from_storage = [ + extid + for extid in self.storage.extid_get_from_extid( + EXTID_TYPE, ids=[hg_nodeid], version=EXTID_VERSION + ) + ] + + msg = "Expected 1 match from storage for hg node %r, got %d" + assert len(from_storage) == 1, msg % (hg_nodeid.hex(), len(from_storage)) + return from_storage[0].target.object_id + + def get_revision_parents(self, rev_ctx: hgutil.BaseContext) -> Tuple[Sha1Git, ...]: + """Return the git sha1 of the parent revisions. - def get_directories(self) -> Iterable[Directory]: - """Compute directories to load + Args: + hg_nodeid: the hg nodeid of the revision. + Returns: + the sha1_git of the parent revisions. """ - dirs: Dict[Sha1Git, Directory] = {} - self.num_directories = 0 - for _, _, new_dirs in self.load_directories(): - for d in new_dirs: - self.num_directories += 1 - dirs[d["id"]] = Directory.from_dict(d) + parents = [] + for parent_ctx in rev_ctx.parents(): + parent_hg_nodeid = parent_ctx.node() + # nullid is the value of a parent that does not exist + if parent_hg_nodeid == hgutil.NULLID: + continue + revision_id = self.get_revision_id_from_hg_nodeid(parent_hg_nodeid) + parents.append(revision_id) - missing_dirs: List[Sha1Git] = list(dirs.keys()) - if missing_dirs: - missing_dirs = list(self.storage.directory_missing(missing_dirs)) + return tuple(parents) - for _id in missing_dirs: - yield dirs[_id] + def store_revision(self, rev_ctx: hgutil.BaseContext) -> None: + """Store a revision given its hg nodeid. - def get_revisions(self) -> Iterable[Revision]: - """Compute revisions to load + Args: + rev_ctx: the he revision context. + Returns: + the sha1_git of the stored revision. """ - revisions = {} - self.num_revisions = 0 - for header, commit in self.br.yield_all_changesets(): - if header["node"] in self.reduce_effort: + hg_nodeid = rev_ctx.node() + + root_sha1git = self.store_directories(rev_ctx) + + # `Person.from_fullname` is compatible with mercurial's freeform author + # as fullname is what is used in revision hash when available. 
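A short, hedged illustration of what `Person.from_fullname` does with Mercurial's free-form user strings (the example values are made up):

```python
# Mercurial "user" strings are free-form bytes; Person.from_fullname keeps the raw
# value as `fullname` (the part that enters the revision hash) and parses name/email
# out of the "name <email>" form on a best-effort basis.
from swh.model.model import Person

author = Person.from_fullname(b"Jane Doe <jane@example.org>")
# author.fullname == b"Jane Doe <jane@example.org>"; name and email are split out
# because an "<email>" part is present.

bot = Person.from_fullname(b"convert-repo")
# No "<email>" part: the whole string is preserved as the fullname.
```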
+ author = Person.from_fullname(rev_ctx.user()) + + (timestamp, offset) = rev_ctx.date() + + # TimestampWithTimezone.from_dict will change name + # as it accept more than just dicts + rev_date = TimestampWithTimezone.from_dict(int(timestamp)) + + extra_headers = [ + (b"time_offset_seconds", str(offset).encode(),), + ] + for key, value in rev_ctx.extra().items(): + # The default branch is skipped to match + # the historical implementation + if key == b"branch" and value == b"default": continue - self.num_revisions += 1 - date_dict = identifiers.normalize_timestamp(int(commit["time"].timestamp())) - author_dict = converters.parse_author(commit["user"]) - if commit["manifest"] == Bundle20Reader.NAUGHT_NODE: - directory_id = SimpleTree().hash_changed() - else: - directory_id = self.mnode_to_tree_id[commit["manifest"]] + # transplant_source is converted to match + # the historical implementation + if key == b"transplant_source": + value = hash_to_bytehex(value) + extra_headers.append((key, value)) + + revision = Revision( + author=author, + date=rev_date, + committer=author, + committer_date=rev_date, + type=RevisionType.MERCURIAL, + directory=root_sha1git, + message=rev_ctx.description(), + extra_headers=tuple(extra_headers), + synthetic=False, + parents=self.get_revision_parents(rev_ctx), + ) - extra_headers = [ - ( - b"time_offset_seconds", - str(commit["time_offset_seconds"]).encode("utf-8"), + self._revision_nodeid_to_sha1git[hg_nodeid] = revision.id + self.storage.revision_add([revision]) + + # Save the mapping from SWHID to hg id + revision_swhid = identifiers.CoreSWHID( + object_type=identifiers.ObjectType.REVISION, object_id=revision.id, + ) + self.storage.extid_add( + [ + ExtID( + extid_type=EXTID_TYPE, + extid_version=EXTID_VERSION, + extid=hg_nodeid, + target=revision_swhid, ) ] - extra = commit.get("extra") - if extra: - for e in extra.split(b"\x00"): - k, v = e.split(b":", 1) - # transplant_source stores binary reference to a changeset - # prefer to dump hexadecimal one in the revision metadata - if k == b"transplant_source": - v = hash_to_bytehex(v) - extra_headers.append((k, v)) - - parents = [] - p1 = self.node_2_rev.get(header["p1"]) - p2 = self.node_2_rev.get(header["p2"]) - if p1: - parents.append(p1) - if p2: - parents.append(p2) - - revision = Revision( - author=Person.from_dict(author_dict), - date=TimestampWithTimezone.from_dict(date_dict), - committer=Person.from_dict(author_dict), - committer_date=TimestampWithTimezone.from_dict(date_dict), - type=RevisionType.MERCURIAL, - directory=directory_id, - message=commit["message"], - extra_headers=tuple(extra_headers), - synthetic=False, - parents=tuple(parents), - ) + ) - self.node_2_rev[header["node"]] = revision.id - revisions[revision.id] = revision + def store_release(self, name: bytes, target: Sha1Git) -> Sha1Git: + """Store a release given its name and its target. - revision_swhid = identifiers.CoreSWHID( - object_type=identifiers.ObjectType.REVISION, object_id=revision.id, - ) - self.extids.append( - ExtID( - extid_type=EXTID_TYPE, extid=header["node"], target=revision_swhid - ) - ) + A release correspond to a user defined tag in mercurial. + The mercurial api as a `tip` tag that must be ignored. - # Converts heads to use swh ids - self.heads = { - branch_name: (pointer_nature, self.node_2_rev[node_id]) - for branch_name, (pointer_nature, node_id) in self.heads.items() - } + Args: + name: name of the release. + target: sha1_git of the target revision. 
- missing_revs = set(revisions.keys()) - if missing_revs: - missing_revs = set(self.storage.revision_missing(list(missing_revs))) - - for rev in missing_revs: - yield revisions[rev] - self.mnode_to_tree_id = None - - def _read_tag(self, tag, split_byte=b" "): - node, *name = tag.split(split_byte) - name = split_byte.join(name) - return node, name - - def get_releases(self) -> Iterable[Release]: - """Get the releases that need to be loaded.""" - self.num_releases = 0 - releases = {} - missing_releases = set() - for t in self.tags: - self.num_releases += 1 - node, name = self._read_tag(t) - node = node.decode() - node_bytes = hash_to_bytes(node) - if not TAG_PATTERN.match(node): - self.log.warn("Wrong pattern (%s) found in tags. Skipping" % (node,)) - continue - if node_bytes not in self.node_2_rev: - self.log.warn( - "No matching revision for tag %s " - "(hg changeset: %s). Skipping" % (name.decode(), node) - ) - continue - tgt_rev = self.node_2_rev[node_bytes] - release = Release( - name=name, - target=tgt_rev, - target_type=ObjectType.REVISION, - message=None, - metadata=None, - synthetic=False, - author=Person(name=None, email=None, fullname=b""), - date=None, - ) - missing_releases.add(release.id) - releases[release.id] = release - self.releases[name] = release.id + Returns: + the sha1_git of the stored release. + """ + release = Release( + name=name, + target=target, + target_type=ObjectType.REVISION, + message=None, + metadata=None, + synthetic=False, + author=Person(name=None, email=None, fullname=b""), + date=None, + ) - if missing_releases: - missing_releases = set(self.storage.release_missing(list(missing_releases))) + self.storage.release_add([release]) - for _id in missing_releases: - yield releases[_id] + return release.id - def get_snapshot(self) -> Snapshot: - """Get the snapshot that need to be loaded.""" - branches: Dict[bytes, Optional[SnapshotBranch]] = {} - for name, (pointer_nature, target) in self.heads.items(): - branches[name] = SnapshotBranch( - target=target, target_type=TargetType.REVISION - ) - if pointer_nature == HEAD_POINTER_NAME: - branches[b"HEAD"] = SnapshotBranch( - target=name, target_type=TargetType.ALIAS - ) - for name, target in self.releases.items(): - branches[name] = SnapshotBranch( - target=target, target_type=TargetType.RELEASE - ) + def store_content(self, rev_ctx: hgutil.BaseContext, file_path: bytes) -> Content: + """Store a revision content hg nodeid and file path. - self.snapshot = Snapshot(branches=branches) - return self.snapshot + Content is a mix of file content at a given revision + and its permissions found in the changeset's manifest. - def store_data(self) -> None: - super().store_data() - self.storage.extid_add(self.extids) + Args: + rev_ctx: the he revision context. + file_path: the hg path of the content. - def get_fetch_history_result(self): - """Return the data to store in fetch_history.""" - return { - "contents": self.num_contents, - "directories": self.num_directories, - "revisions": self.num_revisions, - "releases": self.num_releases, - } + Returns: + the sha1_git of the top level directory. 
+        """
+        hg_nodeid = rev_ctx.node()
+        file_ctx = rev_ctx[file_path]

-    def load_status(self):
-        snapshot = self.get_snapshot()
-        load_status = "eventful"
-        if self.last_snapshot_id is not None and self.last_snapshot_id == snapshot.id:
-            load_status = "uneventful"
-        return {
-            "status": load_status,
-        }
+        try:
+            file_nodeid = file_ctx.filenode()
+        except hgutil.LookupError:
+            # TODO
+            # Raising CorruptedRevision avoids crashing the whole loading
+            # but can lead to a lot of missing revisions.
+            # SkippedContent could be used but needs the actual content to calculate its id.
+            # Maybe the hg_nodeid can be used instead.
+            # Another option could be to just ignore the missing content.
+            # This point is left to future commits.
+            # Check for other uses to apply the same logic there.
+            raise CorruptedRevision(hg_nodeid)
+
+        perms = FLAG_PERMS[file_ctx.flags()]
+
+        # The cache key is file_nodeid + perms because permissions do not participate
+        # in the content hash in hg, while they do in swh.
+        cache_key = (file_nodeid, perms)
+
+        sha1_git = self._content_hash_cache.get(cache_key)
+        if sha1_git is None:
+            try:
+                data = file_ctx.data()
+            except hgutil.error.RevlogError:
+                # TODO
+                # See above use of `CorruptedRevision`
+                raise CorruptedRevision(hg_nodeid)

+            content = ModelContent.from_data(data)

-class HgArchiveBundle20Loader(HgBundle20Loader):
-    """Mercurial loader for repository wrapped within archives.
+            self.storage.content_add([content])

-    """
+            sha1_git = content.sha1_git
+            self._content_hash_cache[cache_key] = sha1_git
+
+        # Here we make sure to return only necessary data.
+        return Content({"sha1_git": sha1_git, "perms": perms})
+
+    def store_directories(self, rev_ctx: hgutil.BaseContext) -> Sha1Git:
+        """Store a revision's directories given its hg nodeid.
+
+        Mercurial has no directory objects as in git; a Git-like tree must be built
+        from file paths to obtain each directory hash.
+
+        Args:
+            rev_ctx: the revision context.
+
+        Returns:
+            the sha1_git of the top level directory.
+ """ + repo: hgutil.Repository = self._repo # mypy can't infer that repo is not None + prev_ctx = repo[self._last_hg_nodeid] + + # TODO maybe do diff on parents + try: + status = prev_ctx.status(rev_ctx) + except hgutil.error.LookupError: + raise CorruptedRevision(rev_ctx.node()) + + for file_path in status.removed: + try: + del self._last_root[file_path] + except KeyError: + raise CorruptedRevision(rev_ctx.node()) + + for file_path in status.added: + content = self.store_content(rev_ctx, file_path) + self._last_root[file_path] = content + + for file_path in status.modified: + content = self.store_content(rev_ctx, file_path) + self._last_root[file_path] = content + + self._last_hg_nodeid = rev_ctx.node() + + directories: Deque[Directory] = deque([self._last_root]) + while directories: + directory = directories.pop() + self.storage.directory_add([directory.to_model()]) + directories.extend( + [item for item in directory.values() if isinstance(item, Directory)] + ) + + return self._last_root.hash + + +class HgArchiveLoader(HgLoader): + """Mercurial loader for repository wrapped within tarballs.""" def __init__( self, storage: StorageInterface, url: str, - visit_date: Optional[datetime.datetime] = None, - archive_path=None, + visit_date: Optional[datetime] = None, + archive_path: str = None, temp_directory: str = "/tmp", max_content_size: Optional[int] = None, ): @@ -670,7 +763,7 @@ storage=storage, url=url, visit_date=visit_date, - logging_class="swh.loader.mercurial.HgArchiveBundle20Loader", + logging_class="swh.loader.mercurial.ArchiveLoaderFromDisk", temp_directory=temp_directory, max_content_size=max_content_size, ) @@ -678,15 +771,16 @@ self.archive_path = archive_path def prepare(self): + """Extract the archive instead of cloning.""" self.archive_extract_temp_dir = tmp_extract( archive=self.archive_path, - dir=self.temp_directory, + dir=self._temp_directory, prefix=TEMPORARY_DIR_PREFIX_PATTERN, - suffix=".dump-%s" % os.getpid(), + suffix=f".dump-{os.getpid()}", log=self.log, source=self.origin_url, ) - repo_name = os.listdir(self.archive_extract_temp_dir)[0] + repo_name = os.listdir(self.temp_dir)[0] self.directory = os.path.join(self.archive_extract_temp_dir, repo_name) super().prepare() diff --git a/swh/loader/mercurial/tasks.py b/swh/loader/mercurial/tasks.py --- a/swh/loader/mercurial/tasks.py +++ b/swh/loader/mercurial/tasks.py @@ -9,7 +9,7 @@ from swh.loader.mercurial.utils import parse_visit_date -from .from_disk import HgArchiveLoaderFromDisk, HgLoaderFromDisk +from .loader import HgArchiveLoader, HgLoader @shared_task(name=__name__ + ".LoadMercurial") @@ -20,11 +20,11 @@ Import a mercurial tarball into swh. - Args: see :func:`HgLoaderFromDisk.load`. + Args: see :func:`HgLoader.load`. """ - loader = HgLoaderFromDisk.from_configfile( + loader = HgLoader.from_configfile( url=url, directory=directory, visit_date=parse_visit_date(visit_date) ) return loader.load() @@ -36,9 +36,9 @@ ): """Import a mercurial tarball into swh. - Args: see :func:`HgArchiveLoaderFromDisk.load`. + Args: see :func:`HgArchiveLoader.load`. 
""" - loader = HgArchiveLoaderFromDisk.from_configfile( + loader = HgArchiveLoader.from_configfile( url=url, archive_path=archive_path, visit_date=parse_visit_date(visit_date) ) return loader.load() diff --git a/swh/loader/mercurial/tasks_from_disk.py b/swh/loader/mercurial/tasks_from_disk.py deleted file mode 100644 --- a/swh/loader/mercurial/tasks_from_disk.py +++ /dev/null @@ -1,43 +0,0 @@ -# Copyright (C) 2020-2021 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from typing import Optional - -from celery import shared_task - -from swh.loader.mercurial.utils import parse_visit_date - -from .from_disk import HgArchiveLoaderFromDisk, HgLoaderFromDisk - - -@shared_task(name=__name__ + ".LoadMercurialFromDisk") -def load_hg( - *, url: str, directory: Optional[str] = None, visit_date: Optional[str] = None -): - """Mercurial repository loading - - Import a mercurial tarball into swh. - - Args: see :func:`HgLoaderFromDisk` constructor. - - """ - loader = HgLoaderFromDisk.from_configfile( - url=url, directory=directory, visit_date=parse_visit_date(visit_date) - ) - return loader.load() - - -@shared_task(name=__name__ + ".LoadArchiveMercurialFromDisk") -def load_hg_from_archive( - *, url: str, archive_path: Optional[str] = None, visit_date: Optional[str] = None -): - """Import a mercurial tarball into swh. - - Args: see :func:`HgArchiveLoaderFromDisk` constructor. - """ - loader = HgArchiveLoaderFromDisk.from_configfile( - url=url, archive_path=archive_path, visit_date=parse_visit_date(visit_date) - ) - return loader.load() diff --git a/swh/loader/mercurial/tests/test_from_disk.py b/swh/loader/mercurial/tests/test_from_disk.py deleted file mode 100644 --- a/swh/loader/mercurial/tests/test_from_disk.py +++ /dev/null @@ -1,738 +0,0 @@ -# Copyright (C) 2020-2021 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information -from datetime import datetime -from hashlib import sha1 -import os -from pathlib import Path -import subprocess -import unittest - -import attr -import pytest - -from swh.loader.mercurial.loader import HgBundle20Loader -from swh.loader.mercurial.utils import parse_visit_date -from swh.loader.tests import ( - assert_last_visit_matches, - check_snapshot, - get_stats, - prepare_repository_from_archive, -) -from swh.model.from_disk import Content, DentryPerms -from swh.model.hashutil import hash_to_bytes, hash_to_hex -from swh.model.identifiers import ObjectType -from swh.model.model import RevisionType, Snapshot, SnapshotBranch, TargetType -from swh.storage import get_storage -from swh.storage.algos.snapshot import snapshot_get_latest - -from ..from_disk import EXTID_VERSION, HgDirectory, HgLoaderFromDisk -from .loader_checker import ExpectedSwhids, LoaderChecker - -VISIT_DATE = parse_visit_date("2016-05-03 15:16:32+00") -assert VISIT_DATE is not None - - -def random_content() -> Content: - """Create minimal content object.""" - data = str(datetime.now()).encode() - return Content({"sha1_git": sha1(data).digest(), "perms": DentryPerms.content}) - - -def test_hg_directory_creates_missing_directories(): - directory = HgDirectory() - directory[b"path/to/some/content"] = random_content() - - -def test_hg_directory_get(): - content = 
random_content() - directory = HgDirectory() - - assert directory.get(b"path/to/content") is None - assert directory.get(b"path/to/content", content) == content - - directory[b"path/to/content"] = content - assert directory.get(b"path/to/content") == content - - -def test_hg_directory_deletes_empty_directories(): - directory = HgDirectory() - content = random_content() - directory[b"path/to/content"] = content - directory[b"path/to/some/deep/content"] = random_content() - - del directory[b"path/to/some/deep/content"] - - assert directory.get(b"path/to/some/deep") is None - assert directory.get(b"path/to/some") is None - assert directory.get(b"path/to/content") == content - - -def test_hg_directory_when_directory_replaces_file(): - directory = HgDirectory() - directory[b"path/to/some"] = random_content() - directory[b"path/to/some/content"] = random_content() - - -# Those tests assert expectations on repository loading -# by reading expected values from associated json files -# produced by the `swh-hg-identify` command line utility. -# -# It has more granularity than historical tests. -# Assertions will tell if the error comes from the directories -# revisions or release rather than only checking the snapshot. -# -# With more work it should event be possible to know which part -# of an object is faulty. -@pytest.mark.parametrize( - "archive_name", ("hello", "transplant", "the-sandbox", "example") -) -def test_examples(swh_storage, datadir, tmp_path, archive_name): - archive_path = Path(datadir, f"{archive_name}.tgz") - json_path = Path(datadir, f"{archive_name}.json") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - LoaderChecker( - loader=HgLoaderFromDisk(swh_storage, repo_url), - expected=ExpectedSwhids.load(json_path), - ).check() - - -# This test has as been adapted from the historical `HgBundle20Loader` tests -# to ensure compatibility of `HgLoaderFromDisk`. -# Hashes as been produced by copy pasting the result of the implementation -# to prevent regressions. 
-def test_loader_hg_new_visit_no_release(swh_storage, datadir, tmp_path): - """Eventful visit should yield 1 snapshot""" - archive_name = "the-sandbox" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - loader = HgLoaderFromDisk(swh_storage, url=repo_url) - - assert loader.load() == {"status": "eventful"} - - tips = { - b"branch-tip/default": "70e750bb046101fdced06f428e73fee471509c56", - b"branch-tip/develop": "a9c4534552df370f43f0ef97146f393ef2f2a08c", - } - closed = { - b"feature/fun_time": "4d640e8064fe69b4c851dfd43915c431e80c7497", - b"feature/green2_loader": "94be9abcf9558213ff301af0ecd8223451ce991d", - b"feature/greenloader": "9f82d95bd3edfb7f18b1a21d6171170395ea44ce", - b"feature/my_test": "dafa445964230e808148db043c126063ea1dc9b6", - b"feature/read2_loader": "9e912851eb64e3a1e08fbb587de7a4c897ce5a0a", - b"feature/readloader": "ddecbc16f4c916c39eacfcb2302e15a9e70a231e", - b"feature/red": "cb36b894129ca7910bb81c457c72d69d5ff111bc", - b"feature/split5_loader": "3ed4b85d30401fe32ae3b1d650f215a588293a9e", - b"feature/split_causing": "c346f6ff7f42f2a8ff867f92ab83a6721057d86c", - b"feature/split_loader": "5f4eba626c3f826820c4475d2d81410759ec911b", - b"feature/split_loader5": "5017ce0b285351da09a2029ea2cf544f79b593c7", - b"feature/split_loading": "4e2dc6d6073f0b6d348f84ded52f9143b10344b9", - b"feature/split_redload": "2d4a801c9a9645fcd3a9f4c06418d8393206b1f3", - b"feature/splitloading": "88b80615ed8561be74a700b92883ec0374ddacb0", - b"feature/test": "61d762d65afb3150e2653d6735068241779c1fcf", - b"feature/test_branch": "be44d5e6cc66580f59c108f8bff5911ee91a22e4", - b"feature/test_branching": "d2164061453ecb03d4347a05a77db83f706b8e15", - b"feature/test_dog": "2973e5dc9568ac491b198f6b7f10c44ddc04e0a3", - } - - mapping = {b"branch-closed-heads/%s/0" % b: n for b, n in closed.items()} - mapping.update(tips) - - expected_branches = { - k: SnapshotBranch(target=hash_to_bytes(v), target_type=TargetType.REVISION) - for k, v in mapping.items() - } - expected_branches[b"HEAD"] = SnapshotBranch( - target=b"branch-tip/default", target_type=TargetType.ALIAS - ) - - expected_snapshot = Snapshot( - id=hash_to_bytes("cbc609dcdced34dbd9938fe81b555170f1abc96f"), - branches=expected_branches, - ) - - assert_last_visit_matches( - loader.storage, - repo_url, - status="full", - type="hg", - snapshot=expected_snapshot.id, - ) - check_snapshot(expected_snapshot, loader.storage) - - stats = get_stats(loader.storage) - expected_stats = { - "content": 2, - "directory": 3, - "origin": 1, - "origin_visit": 1, - "release": 0, - "revision": 58, - "skipped_content": 0, - "snapshot": 1, - } - assert stats == expected_stats - loader2 = HgLoaderFromDisk(swh_storage, url=repo_url) - - assert loader2.load() == {"status": "uneventful"} # nothing new happened - - stats2 = get_stats(loader2.storage) - expected_stats2 = expected_stats.copy() - expected_stats2["origin_visit"] = 2 # one new visit recorded - assert stats2 == expected_stats2 - assert_last_visit_matches( - loader2.storage, - repo_url, - status="full", - type="hg", - snapshot=expected_snapshot.id, - ) # but we got a snapshot nonetheless - - -# This test has as been adapted from the historical `HgBundle20Loader` tests -# to ensure compatibility of `HgLoaderFromDisk`. -# Hashes as been produced by copy pasting the result of the implementation -# to prevent regressions. 
-def test_loader_hg_new_visit_with_release(swh_storage, datadir, tmp_path): - """Eventful visit with release should yield 1 snapshot""" - - archive_name = "hello" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - loader = HgLoaderFromDisk(swh_storage, url=repo_url, visit_date=VISIT_DATE,) - - actual_load_status = loader.load() - assert actual_load_status == {"status": "eventful"} - - # then - stats = get_stats(loader.storage) - assert stats == { - "content": 3, - "directory": 3, - "origin": 1, - "origin_visit": 1, - "release": 1, - "revision": 3, - "skipped_content": 0, - "snapshot": 1, - } - - # cf. test_loader.org for explaining from where those hashes - tip_release = hash_to_bytes("515c4d72e089404356d0f4b39d60f948b8999140") - release = loader.storage.release_get([tip_release])[0] - assert release is not None - - tip_revision_default = hash_to_bytes("c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27") - revision = loader.storage.revision_get([tip_revision_default])[0] - assert revision is not None - - expected_snapshot = Snapshot( - id=hash_to_bytes("7ef082aa8b53136b1bed97f734504be32679bbec"), - branches={ - b"branch-tip/default": SnapshotBranch( - target=tip_revision_default, target_type=TargetType.REVISION, - ), - b"tags/0.1": SnapshotBranch( - target=tip_release, target_type=TargetType.RELEASE, - ), - b"HEAD": SnapshotBranch( - target=b"branch-tip/default", target_type=TargetType.ALIAS, - ), - }, - ) - - check_snapshot(expected_snapshot, loader.storage) - assert_last_visit_matches( - loader.storage, - repo_url, - type=RevisionType.MERCURIAL.value, - status="full", - snapshot=expected_snapshot.id, - ) - - -# This test has as been adapted from the historical `HgBundle20Loader` tests -# to ensure compatibility of `HgLoaderFromDisk`. -# Hashes as been produced by copy pasting the result of the implementation -# to prevent regressions. -def test_visit_repository_with_transplant_operations(swh_storage, datadir, tmp_path): - """Visit a mercurial repository visit transplant operations within should yield a - snapshot as well. 
- - """ - - archive_name = "transplant" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - loader = HgLoaderFromDisk(swh_storage, url=repo_url, visit_date=VISIT_DATE,) - - # load hg repository - actual_load_status = loader.load() - assert actual_load_status == {"status": "eventful"} - - # collect swh revisions - assert_last_visit_matches( - loader.storage, repo_url, type=RevisionType.MERCURIAL.value, status="full" - ) - - revisions = [] - snapshot = snapshot_get_latest(loader.storage, repo_url) - for branch in snapshot.branches.values(): - if branch.target_type.value != "revision": - continue - revisions.append(branch.target) - - # extract original changesets info and the transplant sources - hg_changesets = set() - transplant_sources = set() - for rev in loader.storage.revision_log(revisions): - extids = list( - loader.storage.extid_get_from_target(ObjectType.REVISION, [rev["id"]]) - ) - assert len(extids) == 1 - hg_changesets.add(hash_to_hex(extids[0].extid)) - for k, v in rev["extra_headers"]: - if k == b"transplant_source": - transplant_sources.add(v.decode("ascii")) - - # check extracted data are valid - assert len(hg_changesets) > 0 - assert len(transplant_sources) > 0 - assert transplant_sources <= hg_changesets - - -def _partial_copy_storage( - old_storage, origin_url: str, mechanism: str, copy_revisions: bool -): - """Create a new storage, and only copy ExtIDs or head revisions to it.""" - new_storage = get_storage(cls="memory") - snapshot = snapshot_get_latest(old_storage, origin_url) - assert snapshot - heads = [branch.target for branch in snapshot.branches.values()] - - if mechanism == "extid": - extids = old_storage.extid_get_from_target(ObjectType.REVISION, heads) - new_storage.extid_add(extids) - if copy_revisions: - # copy revisions, but erase their metadata to make sure the loader doesn't - # fallback to revision.metadata["nodeid"] - revisions = [ - attr.evolve(rev, metadata={}) - for rev in old_storage.revision_get(heads) - if rev - ] - new_storage.revision_add(revisions) - - else: - assert mechanism == "same storage" - return old_storage - - # copy origin, visit, status - new_storage.origin_add(old_storage.origin_get([origin_url])) - visit = old_storage.origin_visit_get_latest(origin_url) - new_storage.origin_visit_add([visit]) - statuses = old_storage.origin_visit_status_get(origin_url, visit.visit).results - new_storage.origin_visit_status_add(statuses) - new_storage.snapshot_add([snapshot]) - - return new_storage - - -def test_load_unchanged_repo_should_be_uneventful( - swh_storage, datadir, tmp_path, -): - """Checks the loader can find which revisions it already loaded, using ExtIDs.""" - archive_name = "hello" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - repo_path = repo_url.replace("file://", "") - - loader = HgLoaderFromDisk(swh_storage, repo_path) - - assert loader.load() == {"status": "eventful"} - assert get_stats(loader.storage) == { - "content": 3, - "directory": 3, - "origin": 1, - "origin_visit": 1, - "release": 1, - "revision": 3, - "skipped_content": 0, - "snapshot": 1, - } - visit_status = assert_last_visit_matches( - loader.storage, repo_path, type=RevisionType.MERCURIAL.value, status="full", - ) - assert visit_status.snapshot is not None - - # Create a new loader (to start with a clean slate, eg. 
remove the caches), - # with the new, partial, storage - loader2 = HgLoaderFromDisk(swh_storage, repo_path) - assert loader2.load() == {"status": "uneventful"} - - # Should have all the objects - assert get_stats(loader.storage) == { - "content": 3, - "directory": 3, - "origin": 1, - "origin_visit": 2, - "release": 1, - "revision": 3, - "skipped_content": 0, - "snapshot": 1, - } - visit_status2 = assert_last_visit_matches( - loader2.storage, repo_path, type=RevisionType.MERCURIAL.value, status="full", - ) - assert visit_status2.snapshot == visit_status.snapshot - - -def test_closed_branch_incremental(swh_storage, datadir, tmp_path): - """Test that a repository with a closed branch does not trip an incremental load""" - archive_name = "example" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - repo_path = repo_url.replace("file://", "") - - loader = HgLoaderFromDisk(swh_storage, repo_path) - - # Test 3 loads: full, and two incremental. - assert loader.load() == {"status": "eventful"} - expected_stats = { - "content": 7, - "directory": 16, - "origin": 1, - "origin_visit": 1, - "release": 0, - "revision": 9, - "skipped_content": 0, - "snapshot": 1, - } - assert get_stats(loader.storage) == expected_stats - assert loader.load() == {"status": "uneventful"} - assert get_stats(loader.storage) == {**expected_stats, "origin_visit": 1 + 1} - assert loader.load() == {"status": "uneventful"} - assert get_stats(loader.storage) == {**expected_stats, "origin_visit": 2 + 1} - - -def test_old_loader_new_loader(swh_storage, datadir, tmp_path): - archive_name = "example" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - repo_path = repo_url.replace("file://", "") - - old_loader = HgBundle20Loader(swh_storage, repo_path) - assert old_loader.load() == {"status": "eventful"} - - expected_stats = { - "content": 7, - "directory": 16, - "origin": 1, - "origin_visit": 1, - "release": 0, - "revision": 9, - "skipped_content": 0, - "snapshot": 1, - } - assert get_stats(old_loader.storage) == expected_stats - - # Will pick up more branches, hence a different snapshot - loader = HgLoaderFromDisk(swh_storage, repo_path) - res = loader.load() - new_expected_stats = { - **expected_stats, - "origin_visit": 2, - "snapshot": 2, - } - assert get_stats(loader.storage) == new_expected_stats - assert res == {"status": "eventful"} - - # Shouldn't pick up anything now - loader = HgLoaderFromDisk(swh_storage, repo_path) - assert loader.load() == {"status": "uneventful"} - - # Shouldn't pick up anything either after another load - loader = HgLoaderFromDisk(swh_storage, repo_path) - assert loader.load() == {"status": "uneventful"} - - -def test_load_unchanged_repo__dangling_extid(swh_storage, datadir, tmp_path): - """Checks the loader will load revisions targeted by an ExtID if the - revisions are missing from the storage""" - archive_name = "hello" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - repo_path = repo_url.replace("file://", "") - - loader = HgLoaderFromDisk(swh_storage, repo_path) - - assert loader.load() == {"status": "eventful"} - assert get_stats(loader.storage) == { - "content": 3, - "directory": 3, - "origin": 1, - "origin_visit": 1, - "release": 1, - "revision": 3, - "skipped_content": 0, - "snapshot": 1, - } - - old_storage = 
swh_storage - - # Create a new storage, and only copy ExtIDs or head revisions to it. - # This should be enough for the loader to know revisions were already loaded - new_storage = _partial_copy_storage( - old_storage, repo_path, mechanism="extid", copy_revisions=False - ) - - # Create a new loader (to start with a clean slate, eg. remove the caches), - # with the new, partial, storage - loader = HgLoaderFromDisk(new_storage, repo_path) - - assert get_stats(loader.storage) == { - "content": 0, - "directory": 0, - "origin": 1, - "origin_visit": 1, - "release": 0, - "revision": 0, - "skipped_content": 0, - "snapshot": 1, - } - - assert loader.load() == {"status": "eventful"} - - assert get_stats(loader.storage) == { - "content": 3, - "directory": 3, - "origin": 1, - "origin_visit": 2, - "release": 1, - "revision": 3, - "skipped_content": 0, - "snapshot": 1, - } - - -def test_missing_filelog_should_not_crash(swh_storage, datadir, tmp_path): - archive_name = "missing-filelog" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - directory = repo_url.replace("file://", "") - - loader = HgLoaderFromDisk( - storage=swh_storage, - url=repo_url, - directory=directory, # specify directory to avoid clone - visit_date=VISIT_DATE, - ) - - actual_load_status = loader.load() - assert actual_load_status == {"status": "eventful"} - - assert_last_visit_matches(swh_storage, repo_url, status="partial", type="hg") - - -def test_multiple_open_heads(swh_storage, datadir, tmp_path): - archive_name = "multiple-heads" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - loader = HgLoaderFromDisk(storage=swh_storage, url=repo_url,) - - actual_load_status = loader.load() - assert actual_load_status == {"status": "eventful"} - - assert_last_visit_matches(swh_storage, repo_url, status="full", type="hg") - - snapshot = snapshot_get_latest(swh_storage, repo_url) - expected_branches = [ - b"HEAD", - b"branch-heads/default/0", - b"branch-heads/default/1", - b"branch-tip/default", - ] - assert sorted(snapshot.branches.keys()) == expected_branches - - # Check that we don't load anything the second time - loader = HgLoaderFromDisk(storage=swh_storage, url=repo_url,) - - actual_load_status = loader.load() - - assert actual_load_status == {"status": "uneventful"} - - -def hg_strip(repo: str, revset: str) -> None: - """Removes `revset` and all of their descendants from the local repository.""" - # Previously called `hg strip`, it was renamed to `hg debugstrip` in Mercurial 5.7 - # because it's most likely not what most users want to do (they should use some kind - # of history-rewriting tool like `histedit` or `prune`). - # But here, it's exactly what we want to do. 
- subprocess.check_call(["hg", "debugstrip", revset], cwd=repo) - - -def test_load_repo_with_new_commits(swh_storage, datadir, tmp_path): - archive_name = "hello" - archive_path = Path(datadir, f"{archive_name}.tgz") - json_path = Path(datadir, f"{archive_name}.json") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - # first load with missing commits - hg_strip(repo_url.replace("file://", ""), "tip") - loader = HgLoaderFromDisk(swh_storage, repo_url) - assert loader.load() == {"status": "eventful"} - assert get_stats(loader.storage) == { - "content": 2, - "directory": 2, - "origin": 1, - "origin_visit": 1, - "release": 0, - "revision": 2, - "skipped_content": 0, - "snapshot": 1, - } - - # second load with all commits - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgLoaderFromDisk(swh_storage, repo_url) - checker = LoaderChecker(loader=loader, expected=ExpectedSwhids.load(json_path),) - - checker.check() - - assert get_stats(loader.storage) == { - "content": 3, - "directory": 3, - "origin": 1, - "origin_visit": 2, - "release": 1, - "revision": 3, - "skipped_content": 0, - "snapshot": 2, - } - - -def test_load_repo_check_extids_write_version(swh_storage, datadir, tmp_path): - """ExtIDs should be stored with a given version when loading is done""" - archive_name = "hello" - archive_path = Path(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - hg_strip(repo_url.replace("file://", ""), "tip") - loader = HgLoaderFromDisk(swh_storage, repo_url) - assert loader.load() == {"status": "eventful"} - - # Ensure we write ExtIDs to a specific version. - snapshot = snapshot_get_latest(swh_storage, repo_url) - - # First, filter out revisions from that snapshot - revision_ids = [ - branch.target - for branch in snapshot.branches.values() - if branch.target_type == TargetType.REVISION - ] - - assert len(revision_ids) > 0 - - # Those revisions should have their associated ExtID version set to EXTID_VERSION - extids = swh_storage.extid_get_from_target(ObjectType.REVISION, revision_ids) - - assert len(extids) == len(revision_ids) - for extid in extids: - assert extid.extid_version == EXTID_VERSION - - -def test_load_new_extid_should_be_eventful(swh_storage, datadir, tmp_path): - """Changing the extid version should make loaders ignore existing extids, - and load the repo again.""" - archive_name = "hello" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - repo_path = repo_url.replace("file://", "") - - with unittest.mock.patch("swh.loader.mercurial.from_disk.EXTID_VERSION", 0): - loader = HgLoaderFromDisk(swh_storage, repo_path) - assert loader.load() == {"status": "eventful"} - - loader = HgLoaderFromDisk(swh_storage, repo_path) - assert loader.load() == {"status": "eventful"} - - loader = HgLoaderFromDisk(swh_storage, repo_path) - assert loader.load() == {"status": "uneventful"} - - with unittest.mock.patch("swh.loader.mercurial.from_disk.EXTID_VERSION", 10000): - loader = HgLoaderFromDisk(swh_storage, repo_path) - assert loader.load() == {"status": "eventful"} - - loader = HgLoaderFromDisk(swh_storage, repo_path) - assert loader.load() == {"status": "uneventful"} - - -def test_loader_hg_extid_filtering(swh_storage, datadir, tmp_path): - """The first visit of a fork should filter already seen revisions (through extids) - - """ - archive_name = "the-sandbox" - 
archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - loader = HgLoaderFromDisk(swh_storage, url=repo_url) - - assert loader.load() == {"status": "eventful"} - stats = get_stats(loader.storage) - expected_stats = { - "content": 2, - "directory": 3, - "origin": 1, - "origin_visit": 1, - "release": 0, - "revision": 58, - "skipped_content": 0, - "snapshot": 1, - } - assert stats == expected_stats - - visit_status = assert_last_visit_matches( - loader.storage, repo_url, status="full", type="hg", - ) - - # Make a fork of the first repository we ingested - fork_url = prepare_repository_from_archive( - archive_path, "the-sandbox-reloaded", tmp_path - ) - loader2 = HgLoaderFromDisk( - swh_storage, url=fork_url, directory=str(tmp_path / archive_name) - ) - - assert loader2.load() == {"status": "uneventful"} - - stats = get_stats(loader.storage) - expected_stats2 = expected_stats.copy() - expected_stats2.update( - {"origin": 1 + 1, "origin_visit": 1 + 1,} - ) - assert stats == expected_stats2 - - visit_status2 = assert_last_visit_matches( - loader.storage, fork_url, status="full", type="hg", - ) - assert visit_status.snapshot is not None - assert visit_status2.snapshot == visit_status.snapshot - - -def test_loader_repository_with_bookmark_information(swh_storage, datadir, tmp_path): - """Repository with bookmark information should be ingested correctly - - """ - archive_name = "anomad-d" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - loader = HgLoaderFromDisk(swh_storage, url=repo_url) - - assert loader.load() == {"status": "eventful"} diff --git a/swh/loader/mercurial/tests/test_loader.py b/swh/loader/mercurial/tests/test_loader.py --- a/swh/loader/mercurial/tests/test_loader.py +++ b/swh/loader/mercurial/tests/test_loader.py @@ -1,15 +1,15 @@ -# Copyright (C) 2018-2021 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information - -import copy -import logging +from datetime import datetime +from hashlib import sha1 import os -import time +from pathlib import Path +import subprocess +import unittest -import hglib -from hglib.error import CommandError +import attr import pytest from swh.loader.mercurial.utils import parse_visit_date @@ -19,51 +19,150 @@ get_stats, prepare_repository_from_archive, ) +from swh.model.from_disk import Content, DentryPerms from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.identifiers import ObjectType from swh.model.model import RevisionType, Snapshot, SnapshotBranch, TargetType +from swh.storage import get_storage from swh.storage.algos.snapshot import snapshot_get_latest -from ..loader import CloneTimeoutError, HgArchiveBundle20Loader, HgBundle20Loader +from ..loader import EXTID_VERSION, HgDirectory, HgLoader +from .loader_checker import ExpectedSwhids, LoaderChecker VISIT_DATE = parse_visit_date("2016-05-03 15:16:32+00") assert VISIT_DATE is not None +def random_content() -> Content: + """Create minimal content object.""" + data = str(datetime.now()).encode() + return Content({"sha1_git": sha1(data).digest(), "perms": DentryPerms.content}) + + +def test_hg_directory_creates_missing_directories(): + directory = HgDirectory() + 
directory[b"path/to/some/content"] = random_content() + + +def test_hg_directory_get(): + content = random_content() + directory = HgDirectory() + + assert directory.get(b"path/to/content") is None + assert directory.get(b"path/to/content", content) == content + + directory[b"path/to/content"] = content + assert directory.get(b"path/to/content") == content + + +def test_hg_directory_deletes_empty_directories(): + directory = HgDirectory() + content = random_content() + directory[b"path/to/content"] = content + directory[b"path/to/some/deep/content"] = random_content() + + del directory[b"path/to/some/deep/content"] + + assert directory.get(b"path/to/some/deep") is None + assert directory.get(b"path/to/some") is None + assert directory.get(b"path/to/content") == content + + +def test_hg_directory_when_directory_replaces_file(): + directory = HgDirectory() + directory[b"path/to/some"] = random_content() + directory[b"path/to/some/content"] = random_content() + + +# These tests assert expectations on repository loading +# by reading expected values from associated json files +# produced by the `swh-hg-identify` command line utility. +# +# This approach has more granularity than the historical tests. +# Assertions will tell whether the error comes from the directories, +# revisions or releases rather than only checking the snapshot. +# +# With more work it should even be possible to know which part +# of an object is faulty. +@pytest.mark.parametrize( + "archive_name", ("hello", "transplant", "the-sandbox", "example") +) +def test_examples(swh_storage, datadir, tmp_path, archive_name): + archive_path = Path(datadir, f"{archive_name}.tgz") + json_path = Path(datadir, f"{archive_name}.json") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + + LoaderChecker( + loader=HgLoader(swh_storage, repo_url), expected=ExpectedSwhids.load(json_path), + ).check() + + +# This test has been adapted from the historical `HgBundle20Loader` tests +# to ensure compatibility of `HgLoader`. +# Hashes have been produced by copy-pasting the result of the implementation +# to prevent regressions.
def test_loader_hg_new_visit_no_release(swh_storage, datadir, tmp_path): """Eventful visit should yield 1 snapshot""" archive_name = "the-sandbox" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgBundle20Loader(swh_storage, repo_url) + loader = HgLoader(swh_storage, url=repo_url) assert loader.load() == {"status": "eventful"} - tip_revision_develop = "a9c4534552df370f43f0ef97146f393ef2f2a08c" - tip_revision_default = "70e750bb046101fdced06f428e73fee471509c56" + tips = { + b"branch-tip/default": "70e750bb046101fdced06f428e73fee471509c56", + b"branch-tip/develop": "a9c4534552df370f43f0ef97146f393ef2f2a08c", + } + closed = { + b"feature/fun_time": "4d640e8064fe69b4c851dfd43915c431e80c7497", + b"feature/green2_loader": "94be9abcf9558213ff301af0ecd8223451ce991d", + b"feature/greenloader": "9f82d95bd3edfb7f18b1a21d6171170395ea44ce", + b"feature/my_test": "dafa445964230e808148db043c126063ea1dc9b6", + b"feature/read2_loader": "9e912851eb64e3a1e08fbb587de7a4c897ce5a0a", + b"feature/readloader": "ddecbc16f4c916c39eacfcb2302e15a9e70a231e", + b"feature/red": "cb36b894129ca7910bb81c457c72d69d5ff111bc", + b"feature/split5_loader": "3ed4b85d30401fe32ae3b1d650f215a588293a9e", + b"feature/split_causing": "c346f6ff7f42f2a8ff867f92ab83a6721057d86c", + b"feature/split_loader": "5f4eba626c3f826820c4475d2d81410759ec911b", + b"feature/split_loader5": "5017ce0b285351da09a2029ea2cf544f79b593c7", + b"feature/split_loading": "4e2dc6d6073f0b6d348f84ded52f9143b10344b9", + b"feature/split_redload": "2d4a801c9a9645fcd3a9f4c06418d8393206b1f3", + b"feature/splitloading": "88b80615ed8561be74a700b92883ec0374ddacb0", + b"feature/test": "61d762d65afb3150e2653d6735068241779c1fcf", + b"feature/test_branch": "be44d5e6cc66580f59c108f8bff5911ee91a22e4", + b"feature/test_branching": "d2164061453ecb03d4347a05a77db83f706b8e15", + b"feature/test_dog": "2973e5dc9568ac491b198f6b7f10c44ddc04e0a3", + } + + mapping = {b"branch-closed-heads/%s/0" % b: n for b, n in closed.items()} + mapping.update(tips) + + expected_branches = { + k: SnapshotBranch(target=hash_to_bytes(v), target_type=TargetType.REVISION) + for k, v in mapping.items() + } + expected_branches[b"HEAD"] = SnapshotBranch( + target=b"branch-tip/default", target_type=TargetType.ALIAS + ) + expected_snapshot = Snapshot( - id=hash_to_bytes("3b8fe58e467deb7597b12a5fd3b2c096b8c02028"), - branches={ - b"develop": SnapshotBranch( - target=hash_to_bytes(tip_revision_develop), - target_type=TargetType.REVISION, - ), - b"default": SnapshotBranch( - target=hash_to_bytes(tip_revision_default), - target_type=TargetType.REVISION, - ), - b"HEAD": SnapshotBranch(target=b"develop", target_type=TargetType.ALIAS,), - }, + id=hash_to_bytes("cbc609dcdced34dbd9938fe81b555170f1abc96f"), + branches=expected_branches, ) assert_last_visit_matches( - swh_storage, repo_url, status="full", type="hg", snapshot=expected_snapshot.id, + loader.storage, + repo_url, + status="full", + type="hg", + snapshot=expected_snapshot.id, ) - check_snapshot(expected_snapshot, swh_storage) + check_snapshot(expected_snapshot, loader.storage) - stats = get_stats(swh_storage) - assert stats == { + stats = get_stats(loader.storage) + expected_stats = { "content": 2, "directory": 3, "origin": 1, @@ -73,44 +172,42 @@ "skipped_content": 0, "snapshot": 1, } + assert stats == expected_stats + loader2 = HgLoader(swh_storage, url=repo_url) - # Ensure archive loader yields the same snapshot - loader2 = HgArchiveBundle20Loader( - 
swh_storage, url=archive_path, archive_path=archive_path, visit_date=VISIT_DATE, - ) - - actual_load_status = loader2.load() - assert actual_load_status == {"status": "eventful"} + assert loader2.load() == {"status": "uneventful"} # nothing new happened stats2 = get_stats(loader2.storage) - expected_stats = copy.deepcopy(stats) - expected_stats["origin"] += 1 - expected_stats["origin_visit"] += 1 - assert stats2 == expected_stats - - # That visit yields the same snapshot + expected_stats2 = expected_stats.copy() + expected_stats2["origin_visit"] = 2 # one new visit recorded + assert stats2 == expected_stats2 assert_last_visit_matches( loader2.storage, - archive_path, + repo_url, status="full", type="hg", snapshot=expected_snapshot.id, - ) + ) # but we got a snapshot nonetheless +# This test has been adapted from the historical `HgBundle20Loader` tests +# to ensure compatibility of `HgLoader`. +# Hashes have been produced by copy-pasting the result of the implementation +# to prevent regressions. def test_loader_hg_new_visit_with_release(swh_storage, datadir, tmp_path): """Eventful visit with release should yield 1 snapshot""" + archive_name = "hello" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgBundle20Loader(swh_storage, url=repo_url, visit_date=VISIT_DATE,) + loader = HgLoader(swh_storage, url=repo_url, visit_date=VISIT_DATE,) actual_load_status = loader.load() assert actual_load_status == {"status": "eventful"} # then - stats = get_stats(swh_storage) + stats = get_stats(loader.storage) assert stats == { "content": 3, "directory": 3, @@ -124,74 +221,238 @@ # cf. test_loader.org for explaining from where those hashes tip_release = hash_to_bytes("515c4d72e089404356d0f4b39d60f948b8999140") - release = swh_storage.release_get([tip_release])[0] + release = loader.storage.release_get([tip_release])[0] assert release is not None tip_revision_default = hash_to_bytes("c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27") - revision = swh_storage.revision_get([tip_revision_default])[0] + revision = loader.storage.revision_get([tip_revision_default])[0] assert revision is not None expected_snapshot = Snapshot( - id=hash_to_bytes("d35668e02e2ba4321dc951cd308cf883786f918a"), + id=hash_to_bytes("7ef082aa8b53136b1bed97f734504be32679bbec"), branches={ - b"default": SnapshotBranch( + b"branch-tip/default": SnapshotBranch( target=tip_revision_default, target_type=TargetType.REVISION, ), - b"0.1": SnapshotBranch(target=tip_release, target_type=TargetType.RELEASE,), - b"HEAD": SnapshotBranch(target=b"default", target_type=TargetType.ALIAS,), + b"tags/0.1": SnapshotBranch( + target=tip_release, target_type=TargetType.RELEASE, + ), + b"HEAD": SnapshotBranch( + target=b"branch-tip/default", target_type=TargetType.ALIAS, + ), }, ) - check_snapshot(expected_snapshot, swh_storage) + check_snapshot(expected_snapshot, loader.storage) assert_last_visit_matches( - swh_storage, + loader.storage, repo_url, type=RevisionType.MERCURIAL.value, status="full", snapshot=expected_snapshot.id, ) - # Ensure archive loader yields the same snapshot - loader2 = HgArchiveBundle20Loader( - swh_storage, url=archive_path, archive_path=archive_path, visit_date=VISIT_DATE, - ) - actual_load_status = loader2.load() - assert actual_load_status == {"status": "eventful"} +# This test has been adapted from the historical `HgBundle20Loader` tests +# to ensure compatibility of `HgLoader`.
+# Hashes have been produced by copy-pasting the result of the implementation +# to prevent regressions. +def test_visit_repository_with_transplant_operations(swh_storage, datadir, tmp_path): + """Visiting a mercurial repository with transplant operations within should yield a + snapshot as well. - stats2 = get_stats(loader2.storage) - expected_stats = copy.deepcopy(stats) - expected_stats["origin"] += 1 - expected_stats["origin_visit"] += 1 - assert stats2 == expected_stats + """ - # That visit yields the same snapshot + archive_name = "transplant" + archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + + loader = HgLoader(swh_storage, url=repo_url, visit_date=VISIT_DATE,) + + # load hg repository + actual_load_status = loader.load() + assert actual_load_status == {"status": "eventful"} + + # collect swh revisions assert_last_visit_matches( - loader2.storage, - archive_path, - status="full", - type="hg", - snapshot=expected_snapshot.id, + loader.storage, repo_url, type=RevisionType.MERCURIAL.value, status="full" ) + revisions = [] + snapshot = snapshot_get_latest(loader.storage, repo_url) + for branch in snapshot.branches.values(): + if branch.target_type.value != "revision": + continue + revisions.append(branch.target) + + # extract original changesets info and the transplant sources + hg_changesets = set() + transplant_sources = set() + for rev in loader.storage.revision_log(revisions): + extids = list( + loader.storage.extid_get_from_target(ObjectType.REVISION, [rev["id"]]) + ) + assert len(extids) == 1 + hg_changesets.add(hash_to_hex(extids[0].extid)) + for k, v in rev["extra_headers"]: + if k == b"transplant_source": + transplant_sources.add(v.decode("ascii")) + + # check extracted data are valid + assert len(hg_changesets) > 0 + assert len(transplant_sources) > 0 + assert transplant_sources <= hg_changesets -def test_visit_with_archive_decompression_failure(swh_storage, mocker, datadir): - """Failure to decompress should fail early, no data is ingested""" - mock_patoo = mocker.patch("swh.loader.mercurial.archive_extract.patoolib") - mock_patoo.side_effect = ValueError +def _partial_copy_storage( + old_storage, origin_url: str, mechanism: str, copy_revisions: bool +): + """Create a new storage, and only copy ExtIDs or head revisions to it.""" + new_storage = get_storage(cls="memory") + snapshot = snapshot_get_latest(old_storage, origin_url) + assert snapshot + heads = [branch.target for branch in snapshot.branches.values()] + + if mechanism == "extid": + extids = old_storage.extid_get_from_target(ObjectType.REVISION, heads) + new_storage.extid_add(extids) + if copy_revisions: + # copy revisions, but erase their metadata to make sure the loader doesn't + # fallback to revision.metadata["nodeid"] + revisions = [ + attr.evolve(rev, metadata={}) + for rev in old_storage.revision_get(heads) + if rev + ] + new_storage.revision_add(revisions) + + else: + assert mechanism == "same storage" + return old_storage + + # copy origin, visit, status + new_storage.origin_add(old_storage.origin_get([origin_url])) + visit = old_storage.origin_visit_get_latest(origin_url) + new_storage.origin_visit_add([visit]) + statuses = old_storage.origin_visit_status_get(origin_url, visit.visit).results + new_storage.origin_visit_status_add(statuses) + new_storage.snapshot_add([snapshot]) + + return new_storage + + +def test_load_unchanged_repo_should_be_uneventful( + swh_storage, datadir, tmp_path, +): + """Checks the loader
can find which revisions it already loaded, using ExtIDs.""" archive_name = "hello" archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + repo_path = repo_url.replace("file://", "") - loader = HgArchiveBundle20Loader( - swh_storage, url=archive_path, visit_date=VISIT_DATE, + loader = HgLoader(swh_storage, repo_path) + + assert loader.load() == {"status": "eventful"} + assert get_stats(loader.storage) == { + "content": 3, + "directory": 3, + "origin": 1, + "origin_visit": 1, + "release": 1, + "revision": 3, + "skipped_content": 0, + "snapshot": 1, + } + visit_status = assert_last_visit_matches( + loader.storage, repo_path, type=RevisionType.MERCURIAL.value, status="full", ) + assert visit_status.snapshot is not None - actual_load_status = loader.load() - assert actual_load_status == {"status": "failed"} + # Create a new loader (to start with a clean slate, eg. remove the caches), + # with the new, partial, storage + loader2 = HgLoader(swh_storage, repo_path) + assert loader2.load() == {"status": "uneventful"} - stats = get_stats(swh_storage) - assert stats == { + # Should have all the objects + assert get_stats(loader.storage) == { + "content": 3, + "directory": 3, + "origin": 1, + "origin_visit": 2, + "release": 1, + "revision": 3, + "skipped_content": 0, + "snapshot": 1, + } + visit_status2 = assert_last_visit_matches( + loader2.storage, repo_path, type=RevisionType.MERCURIAL.value, status="full", + ) + assert visit_status2.snapshot == visit_status.snapshot + + +def test_closed_branch_incremental(swh_storage, datadir, tmp_path): + """Test that a repository with a closed branch does not trip an incremental load""" + archive_name = "example" + archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + repo_path = repo_url.replace("file://", "") + + loader = HgLoader(swh_storage, repo_path) + + # Test 3 loads: full, and two incremental. + assert loader.load() == {"status": "eventful"} + expected_stats = { + "content": 7, + "directory": 16, + "origin": 1, + "origin_visit": 1, + "release": 0, + "revision": 9, + "skipped_content": 0, + "snapshot": 1, + } + assert get_stats(loader.storage) == expected_stats + assert loader.load() == {"status": "uneventful"} + assert get_stats(loader.storage) == {**expected_stats, "origin_visit": 1 + 1} + assert loader.load() == {"status": "uneventful"} + assert get_stats(loader.storage) == {**expected_stats, "origin_visit": 2 + 1} + + +def test_load_unchanged_repo__dangling_extid(swh_storage, datadir, tmp_path): + """Checks the loader will load revisions targeted by an ExtID if the + revisions are missing from the storage""" + archive_name = "hello" + archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + repo_path = repo_url.replace("file://", "") + + loader = HgLoader(swh_storage, repo_path) + + assert loader.load() == {"status": "eventful"} + assert get_stats(loader.storage) == { + "content": 3, + "directory": 3, + "origin": 1, + "origin_visit": 1, + "release": 1, + "revision": 3, + "skipped_content": 0, + "snapshot": 1, + } + + old_storage = swh_storage + + # Create a new storage, and only copy ExtIDs or head revisions to it. 
+ # This should be enough for the loader to know revisions were already loaded + new_storage = _partial_copy_storage( + old_storage, repo_path, mechanism="extid", copy_revisions=False + ) + + # Create a new loader (to start with a clean slate, eg. remove the caches), + # with the new, partial, storage + loader = HgLoader(new_storage, repo_path) + + assert get_stats(loader.storage) == { "content": 0, "directory": 0, "origin": 1, @@ -199,172 +460,236 @@ "release": 0, "revision": 0, "skipped_content": 0, - "snapshot": 0, + "snapshot": 1, } - # That visit yields the same snapshot - assert_last_visit_matches( - swh_storage, archive_path, status="failed", type="hg", snapshot=None - ) + assert loader.load() == {"status": "eventful"} -def test_visit_error_with_snapshot_partial(swh_storage, datadir, tmp_path, mocker): - """Incomplete ingestion leads to a 'partial' ingestion status""" - mock = mocker.patch("swh.loader.mercurial.loader.HgBundle20Loader.store_metadata") - mock.side_effect = ValueError + assert get_stats(loader.storage) == { + "content": 3, + "directory": 3, + "origin": 1, + "origin_visit": 2, + "release": 1, + "revision": 3, + "skipped_content": 0, + "snapshot": 1, + } - archive_name = "the-sandbox" + +def test_missing_filelog_should_not_crash(swh_storage, datadir, tmp_path): + archive_name = "missing-filelog" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + directory = repo_url.replace("file://", "") - loader = HgBundle20Loader(swh_storage, repo_url) - - assert loader.load() == {"status": "failed"} - - assert_last_visit_matches( - swh_storage, - repo_url, - status="partial", - type="hg", - snapshot=hash_to_bytes("3b8fe58e467deb7597b12a5fd3b2c096b8c02028"), + loader = HgLoader( + storage=swh_storage, + url=repo_url, + directory=directory, # specify directory to avoid clone + visit_date=VISIT_DATE, ) + actual_load_status = loader.load() + assert actual_load_status == {"status": "eventful"} -@pytest.mark.parametrize( - "error_msg", - [ - b"does not appear to be an HG repository", - b"404: Not Found", - b"404: NOT FOUND", - b"Name or service not known", - ], -) -def test_visit_error_with_status_not_found( - swh_storage, datadir, tmp_path, mocker, error_msg -): - """Not reaching the repo leads to a 'not_found' ingestion status""" - mock = mocker.patch("hglib.clone") - mock.side_effect = CommandError((), 255, b"", error_msg) + assert_last_visit_matches(swh_storage, repo_url, status="partial", type="hg") - archive_name = "the-sandbox" + +def test_multiple_open_heads(swh_storage, datadir, tmp_path): + archive_name = "multiple-heads" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgBundle20Loader(swh_storage, repo_url) + loader = HgLoader(storage=swh_storage, url=repo_url,) - assert loader.load() == {"status": "uneventful"} + actual_load_status = loader.load() + assert actual_load_status == {"status": "eventful"} - assert_last_visit_matches( - swh_storage, repo_url, status="not_found", type="hg", snapshot=None, - ) + assert_last_visit_matches(swh_storage, repo_url, status="full", type="hg") + snapshot = snapshot_get_latest(swh_storage, repo_url) + expected_branches = [ + b"HEAD", + b"branch-heads/default/0", + b"branch-heads/default/1", + b"branch-tip/default", + ] + assert sorted(snapshot.branches.keys()) == expected_branches -def test_visit_error_with_clone_error(swh_storage, datadir, tmp_path, 
mocker): - """Testing failures other than 'not_found'""" + # Check that we don't load anything the second time + loader = HgLoader(storage=swh_storage, url=repo_url,) - mock = mocker.patch("hglib.clone") - mock.side_effect = CommandError((), 255, b"", b"out of disk space") + actual_load_status = loader.load() - archive_name = "the-sandbox" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") + assert actual_load_status == {"status": "uneventful"} + + +def hg_strip(repo: str, revset: str) -> None: + """Removes `revset` and all of their descendants from the local repository.""" + # Previously called `hg strip`, it was renamed to `hg debugstrip` in Mercurial 5.7 + # because it's most likely not what most users want to do (they should use some kind + # of history-rewriting tool like `histedit` or `prune`). + # But here, it's exactly what we want to do. + subprocess.check_call(["hg", "debugstrip", revset], cwd=repo) + + +def test_load_repo_with_new_commits(swh_storage, datadir, tmp_path): + archive_name = "hello" + archive_path = Path(datadir, f"{archive_name}.tgz") + json_path = Path(datadir, f"{archive_name}.json") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgBundle20Loader(swh_storage, repo_url) + # first load with missing commits + hg_strip(repo_url.replace("file://", ""), "tip") + loader = HgLoader(swh_storage, repo_url) + assert loader.load() == {"status": "eventful"} + assert get_stats(loader.storage) == { + "content": 2, + "directory": 2, + "origin": 1, + "origin_visit": 1, + "release": 0, + "revision": 2, + "skipped_content": 0, + "snapshot": 1, + } - assert loader.load() == {"status": "failed"} + # second load with all commits + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) + loader = HgLoader(swh_storage, repo_url) + checker = LoaderChecker(loader=loader, expected=ExpectedSwhids.load(json_path),) - assert_last_visit_matches( - swh_storage, repo_url, status="failed", type="hg", snapshot=None, - ) + checker.check() + assert get_stats(loader.storage) == { + "content": 3, + "directory": 3, + "origin": 1, + "origin_visit": 2, + "release": 1, + "revision": 3, + "skipped_content": 0, + "snapshot": 2, + } -def test_visit_repository_with_transplant_operations(swh_storage, datadir, tmp_path): - """Visit a mercurial repository visit transplant operations within should yield a - snapshot as well. - """ +def test_load_repo_check_extids_write_version(swh_storage, datadir, tmp_path): + """ExtIDs should be stored with a given version when loading is done""" + archive_name = "hello" + archive_path = Path(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - archive_name = "transplant" + hg_strip(repo_url.replace("file://", ""), "tip") + loader = HgLoader(swh_storage, repo_url) + assert loader.load() == {"status": "eventful"} + + # Ensure we write ExtIDs to a specific version. 
+ snapshot = snapshot_get_latest(swh_storage, repo_url) + + # First, filter out revisions from that snapshot + revision_ids = [ + branch.target + for branch in snapshot.branches.values() + if branch.target_type == TargetType.REVISION + ] + + assert len(revision_ids) > 0 + + # Those revisions should have their associated ExtID version set to EXTID_VERSION + extids = swh_storage.extid_get_from_target(ObjectType.REVISION, revision_ids) + + assert len(extids) == len(revision_ids) + for extid in extids: + assert extid.extid_version == EXTID_VERSION + + +def test_load_new_extid_should_be_eventful(swh_storage, datadir, tmp_path): + """Changing the extid version should make loaders ignore existing extids, + and load the repo again.""" + archive_name = "hello" archive_path = os.path.join(datadir, f"{archive_name}.tgz") repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgBundle20Loader(swh_storage, url=repo_url, visit_date=VISIT_DATE,) + repo_path = repo_url.replace("file://", "") - # load hg repository - actual_load_status = loader.load() - assert actual_load_status == {"status": "eventful"} + with unittest.mock.patch("swh.loader.mercurial.loader.EXTID_VERSION", 0): + loader = HgLoader(swh_storage, repo_path) + assert loader.load() == {"status": "eventful"} - # collect swh revisions - assert_last_visit_matches( - swh_storage, repo_url, type=RevisionType.MERCURIAL.value, status="full" - ) - - revisions = [] - snapshot = snapshot_get_latest(swh_storage, repo_url) - for branch in snapshot.branches.values(): - if branch.target_type.value != "revision": - continue - revisions.append(branch.target) + loader = HgLoader(swh_storage, repo_path) + assert loader.load() == {"status": "eventful"} - # extract original changesets info and the transplant sources - hg_changesets = set() - transplant_sources = set() - for rev in swh_storage.revision_log(revisions): - extids = list( - loader.storage.extid_get_from_target(ObjectType.REVISION, [rev["id"]]) - ) - assert len(extids) == 1 - hg_changesets.add(hash_to_hex(extids[0].extid)) - for k, v in rev["extra_headers"]: - if k == b"transplant_source": - transplant_sources.add(v.decode("ascii")) + loader = HgLoader(swh_storage, repo_path) + assert loader.load() == {"status": "uneventful"} - # check extracted data are valid - assert len(hg_changesets) > 0 - assert len(transplant_sources) > 0 - assert transplant_sources.issubset(hg_changesets) + with unittest.mock.patch("swh.loader.mercurial.loader.EXTID_VERSION", 10000): + loader = HgLoader(swh_storage, repo_path) + assert loader.load() == {"status": "eventful"} + loader = HgLoader(swh_storage, repo_path) + assert loader.load() == {"status": "uneventful"} -def test_clone_with_timeout_timeout(caplog, tmp_path, monkeypatch): - log = logging.getLogger("test_clone_with_timeout") - def clone_timeout(source, dest, *args, **kwargs): - time.sleep(60) +def test_loader_hg_extid_filtering(swh_storage, datadir, tmp_path): + """The first visit of a fork should filter already seen revisions (through extids) - monkeypatch.setattr(hglib, "clone", clone_timeout) + """ + archive_name = "the-sandbox" + archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - with pytest.raises(CloneTimeoutError): - HgBundle20Loader.clone_with_timeout( - log, "https://www.mercurial-scm.org/repo/hello", tmp_path, 1 - ) + loader = HgLoader(swh_storage, url=repo_url) - for record in caplog.records: - assert record.levelname == 
"WARNING" - assert "https://www.mercurial-scm.org/repo/hello" in record.getMessage() - assert record.args == ("https://www.mercurial-scm.org/repo/hello", 1) + assert loader.load() == {"status": "eventful"} + stats = get_stats(loader.storage) + expected_stats = { + "content": 2, + "directory": 3, + "origin": 1, + "origin_visit": 1, + "release": 0, + "revision": 58, + "skipped_content": 0, + "snapshot": 1, + } + assert stats == expected_stats + visit_status = assert_last_visit_matches( + loader.storage, repo_url, status="full", type="hg", + ) -def test_clone_with_timeout_returns(caplog, tmp_path, monkeypatch): - log = logging.getLogger("test_clone_with_timeout") + # Make a fork of the first repository we ingested + fork_url = prepare_repository_from_archive( + archive_path, "the-sandbox-reloaded", tmp_path + ) + loader2 = HgLoader( + swh_storage, url=fork_url, directory=str(tmp_path / archive_name) + ) - def clone_return(source, dest, *args, **kwargs): - return (source, dest) + assert loader2.load() == {"status": "uneventful"} - monkeypatch.setattr(hglib, "clone", clone_return) + stats = get_stats(loader.storage) + expected_stats2 = expected_stats.copy() + expected_stats2.update( + {"origin": 1 + 1, "origin_visit": 1 + 1,} + ) + assert stats == expected_stats2 - assert HgBundle20Loader.clone_with_timeout( - log, "https://www.mercurial-scm.org/repo/hello", tmp_path, 1 - ) == ("https://www.mercurial-scm.org/repo/hello", tmp_path) + visit_status2 = assert_last_visit_matches( + loader.storage, fork_url, status="full", type="hg", + ) + assert visit_status.snapshot is not None + assert visit_status2.snapshot == visit_status.snapshot -def test_clone_with_timeout_exception(caplog, tmp_path, monkeypatch): - log = logging.getLogger("test_clone_with_timeout") +def test_loader_repository_with_bookmark_information(swh_storage, datadir, tmp_path): + """Repository with bookmark information should be ingested correctly - def clone_return(source, dest, *args, **kwargs): - raise ValueError("Test exception") + """ + archive_name = "anomad-d" + archive_path = os.path.join(datadir, f"{archive_name}.tgz") + repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - monkeypatch.setattr(hglib, "clone", clone_return) + loader = HgLoader(swh_storage, url=repo_url) - with pytest.raises(ValueError) as excinfo: - HgBundle20Loader.clone_with_timeout( - log, "https://www.mercurial-scm.org/repo/hello", tmp_path, 1 - ) - assert "Test exception" in excinfo.value.args[0] + assert loader.load() == {"status": "eventful"} diff --git a/swh/loader/mercurial/tests/test_tasks.py b/swh/loader/mercurial/tests/test_tasks.py --- a/swh/loader/mercurial/tests/test_tasks.py +++ b/swh/loader/mercurial/tests/test_tasks.py @@ -1,4 +1,4 @@ -# Copyright (C) 2018-2020 The Software Heritage developers +# Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -7,7 +7,7 @@ def test_loader( mocker, swh_config, swh_scheduler_celery_app, swh_scheduler_celery_worker ): - mock_loader = mocker.patch("swh.loader.mercurial.from_disk.HgLoaderFromDisk.load") + mock_loader = mocker.patch("swh.loader.mercurial.loader.HgLoader.load") mock_loader.return_value = {"status": "eventful"} res = swh_scheduler_celery_app.send_task( @@ -26,9 +26,7 @@ def test_archive_loader( mocker, swh_config, swh_scheduler_celery_app, swh_scheduler_celery_worker ): - 
mock_loader = mocker.patch( - "swh.loader.mercurial.from_disk.HgArchiveLoaderFromDisk.load" - ) + mock_loader = mocker.patch("swh.loader.mercurial.loader.HgArchiveLoader.load") mock_loader.return_value = {"status": "uneventful"} res = swh_scheduler_celery_app.send_task( diff --git a/swh/loader/mercurial/tests/test_tasks_from_disk.py b/swh/loader/mercurial/tests/test_tasks_from_disk.py deleted file mode 100644 --- a/swh/loader/mercurial/tests/test_tasks_from_disk.py +++ /dev/null @@ -1,47 +0,0 @@ -# Copyright (C) 2018-2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - - -def test_loader( - mocker, swh_config, swh_scheduler_celery_app, swh_scheduler_celery_worker -): - mock_loader = mocker.patch("swh.loader.mercurial.from_disk.HgLoaderFromDisk.load") - mock_loader.return_value = {"status": "eventful"} - - res = swh_scheduler_celery_app.send_task( - "swh.loader.mercurial.tasks_from_disk.LoadMercurialFromDisk", - kwargs={"url": "origin_url", "directory": "/some/repo", "visit_date": "now",}, - ) - - assert res - res.wait() - assert res.successful() - - assert res.result == {"status": "eventful"} - mock_loader.assert_called_once_with() - - -def test_archive_loader( - mocker, swh_config, swh_scheduler_celery_app, swh_scheduler_celery_worker -): - mock_loader = mocker.patch( - "swh.loader.mercurial.from_disk.HgArchiveLoaderFromDisk.load" - ) - mock_loader.return_value = {"status": "uneventful"} - - res = swh_scheduler_celery_app.send_task( - "swh.loader.mercurial.tasks_from_disk.LoadArchiveMercurialFromDisk", - kwargs={ - "url": "another_url", - "archive_path": "/some/tar.tgz", - "visit_date": "now", - }, - ) - assert res - res.wait() - assert res.successful() - - assert res.result == {"status": "uneventful"} - mock_loader.assert_called_once_with()
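
The rewritten tests above drive `HgLoader` exclusively through pytest fixtures. For readers who want to reproduce a load outside the test harness, here is a minimal sketch built only from calls that appear in this diff; the repository path is hypothetical and the in-memory storage stands in for the `swh_storage` fixture used by the tests.

```python
# Minimal sketch (not part of the test suite): load a local Mercurial clone
# with the new HgLoader entry point and inspect the resulting snapshot.
# "/tmp/hello-repo" is a hypothetical path to an existing hg repository.
from swh.loader.mercurial.loader import HgLoader
from swh.storage import get_storage
from swh.storage.algos.snapshot import snapshot_get_latest

storage = get_storage(cls="memory")
repo_url = "file:///tmp/hello-repo"  # hypothetical local clone

loader = HgLoader(storage, url=repo_url)
result = loader.load()  # e.g. {"status": "eventful"} on a first visit
print(result)

snapshot = snapshot_get_latest(storage, repo_url)
if snapshot is not None:
    # Branch names follow the scheme asserted in the tests above:
    # b"branch-tip/<name>", b"tags/<tag>", b"branch-closed-heads/<name>/<n>"
    print(sorted(snapshot.branches))
```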
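
Several of the new tests (`test_load_unchanged_repo_should_be_uneventful`, `test_load_unchanged_repo__dangling_extid`, `test_load_new_extid_should_be_eventful`) rely on ExtIDs to decide whether a revision was already loaded. Continuing the sketch above, this is roughly how those ExtIDs can be inspected; nothing here is specific to a real archive.

```python
# Sketch of the ExtID lookup the incremental-load tests rely on: each loaded
# revision gets an ExtID mapping the Mercurial changeset id to the SWH
# revision, tagged with EXTID_VERSION. `storage` and `snapshot` come from the
# previous sketch.
from swh.loader.mercurial.loader import EXTID_VERSION
from swh.model.hashutil import hash_to_hex
from swh.model.identifiers import ObjectType
from swh.model.model import TargetType

assert snapshot is not None  # continuing from the previous sketch

revision_ids = [
    branch.target
    for branch in snapshot.branches.values()
    if branch.target_type == TargetType.REVISION
]
extids = storage.extid_get_from_target(ObjectType.REVISION, revision_ids)
for extid in extids:
    assert extid.extid_version == EXTID_VERSION
    print(hash_to_hex(extid.extid))  # the original hg changeset id
```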