diff --git a/README.md b/README.md --- a/README.md +++ b/README.md @@ -6,9 +6,6 @@ before. The main entry points are: -- :class:`swh.loader.mercurial.loader.HgBundle20Loader` which reads and loads a remote - repository (or local). - - :class:`swh.loader.mercurial.from_disk.HgLoaderFromDisk` which reads and loads a local repository into an SWH archive. @@ -30,6 +27,5 @@ ## Basic use ``` bash -swh loader --C /tmp/mercurial.yml run mercurial https://www.mercurial-scm.org/repo/hello +swh loader --C /tmp/mercurial.yml run mercurial_from_disk https://www.mercurial-scm.org/repo/hello ``` -or `mercurial_from_disk` diff --git a/conftest.py b/conftest.py --- a/conftest.py +++ b/conftest.py @@ -15,6 +15,5 @@ @pytest.fixture(scope="session") def swh_scheduler_celery_includes(swh_scheduler_celery_includes): return swh_scheduler_celery_includes + [ - "swh.loader.mercurial.tasks", "swh.loader.mercurial.tasks_from_disk", ] diff --git a/setup.py b/setup.py --- a/setup.py +++ b/setup.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright (C) 2015-2020 The Software Heritage developers +# Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -53,7 +53,6 @@ include_package_data=True, entry_points=""" [swh.workers] - loader.mercurial=swh.loader.mercurial:register loader.mercurial_from_disk=swh.loader.mercurial:register_from_disk [console_scripts] swh-hg-identify=swh.loader.mercurial.identify:main diff --git a/swh/loader/mercurial/__init__.py b/swh/loader/mercurial/__init__.py --- a/swh/loader/mercurial/__init__.py +++ b/swh/loader/mercurial/__init__.py @@ -1,4 +1,4 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -7,16 +7,6 @@ from typing import Any, Mapping -def register() -> Mapping[str, Any]: - """Register the current worker module's definition""" - from .loader import HgBundle20Loader - - return { - "task_modules": [f"{__name__}.tasks"], - "loader": HgBundle20Loader, - } - - def register_from_disk() -> Mapping[str, Any]: """Register the current worker module's definition""" from .from_disk import HgLoaderFromDisk diff --git a/swh/loader/mercurial/bundle20_reader.py b/swh/loader/mercurial/bundle20_reader.py deleted file mode 100644 --- a/swh/loader/mercurial/bundle20_reader.py +++ /dev/null @@ -1,621 +0,0 @@ -# Copyright (C) 2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -"""This document contains code for extracting all of the data from -Mercurial version 2 bundle file. 
It is referenced by -bundle20_loader.py - -""" - -# ============================================================================= -# ============================================================================= -# BACKGROUND -# ============================================================================= -# ============================================================================= -# -# https://www.mercurial-scm.org/wiki/BundleFormat says: -# "The new bundle format design is described on the BundleFormat2 page." -# -# https://www.mercurial-scm.org/wiki/BundleFormat2#Format_of_the_Bundle2_Container says: # noqa -# "The latest description of the binary format can be found as comment in the -# Mercurial source code." -# -# https://www.mercurial-scm.org/repo/hg/file/default/mercurial/help/internals/bundles.txt says: # noqa -# "The 'HG20' format is not yet documented here. See the inline comments in -# 'mercurial/exchange.py' for now." -# -# ----------------------------------------------------------------------------- -# Avi says: -# ----------------------------------------------------------------------------- -# -# All of the above official(?) statements seem to be quite wrong. -# -# The mercurial-scm wiki is a cluster#@*& of missing pages, bad links, wrong -# information, obsolete information, undecipherable names, and half-started -# leavings that only sort of look like content. I don't understand who or what -# it's there for. I think that means it's not there for me? -# -# https://www.mercurial-scm.org/wiki/BundleFormat2#New_header is wrong and -# bizarre, and there isn't any other information on the page. -# -# -# https://www.mercurial-scm.org/repo/hg/file/de86a6872d06/mercurial/help/internals/changegroups.txt # noqa -# (`hg help internals.changegroups`) is very close to what we need. -# It is accurate, current, and thorough. -# It describes much of the internal structure, which is super helpful if you -# know in advance which info can be trusted, but it doesn't describe any of the -# file-level details, including the file headers and that the entire bundle -# is broken into overlaid 4KB chunks starting from just after the bundle -# header, nor does it describe what any of the component elements are used for, -# nor does it explain the meta-message segment in the blob deltas, nor does it -# explain the file flags occasionally appended to manifest file hashes. Also it -# says: "The [delta data] format is described more fully in 'hg help -# internals.bdiff'", which is also wrong. As far as I can tell, that -# file has never existed. -# -# It does however have one potentially extremely useful note buried in the -# middle that, in hindsight, could have significant implications for complexity -# and performance in future Mercurial loading work. -# -# It says: "In version 1, the delta is always applied against the previous node -# from the changegroup or the first parent if this is the first entry in the -# changegroup." -# -# If the next version of HG support for SWH can reliably get version 1 data, -# then it could be implemented entirely without worrying about ballooning -# memory utilization, which would shrink the code significantly and probably be -# faster too. So maybe HG10 bundles instead of HG20 bundles are superior for -# this task? 
But then I read that servers can optionally disable serving -# version 1 content, and I like to think that this loader could eventually -# be applied directly to a network stream without an intermediate phase for -# cloning and local bundling, so...It seemed like a good idea at the time? -# -# ----------------------------------------------------------------------------- -# Other notes and thoughts: -# ----------------------------------------------------------------------------- -# 1) -# This is a relatively minor detail, but -# Mercurial nodes are not content-addressable like Git's are. -# -# https://www.mercurial-scm.org/wiki/Nodeid explains: "If you modify a file, -# commit the change, and then modify it to restore the original contents, the -# contents are the same but the history is different, so the file will get a -# new nodeid. This history-sensitivity is obtained by calculating the nodeid -# from the concatenation of the parent nodeids with the file's contents..." -# -# The result is that we always have to collect and hash everything at least -# once in order to know if we've seen something like it before, because nothing -# tells us that the node we're looking at is unique. We can use node ids for -# linking disparate elements together (e.g. commit to manifest) but not for -# determining whether two elements in the same group are identical in all but -# descendency. So there's no way to save time on duplicate hashing. Well... -# there is the copied_file blob metadata, but, lol. -# -# 2) -# Most of the code complexity is due to dealing with 'version 2' changegroups, -# for which we need to keep track of the entire history of all updates made -# to a given file or working directory tree structure, because a revision -# delta could be applied over any of the prior revisions all the way back to -# rev 0, according to whenever files were branched/merged/uncommitted/etc. For -# very large repositories with a lot of churn, this can quickly expand to -# require multiple gigabytes of space, possibly exceeding RAM availability if -# one desires to keep as much data resident in memory as possible to boost -# performance. mozilla-unified, for instance, produces some 2 million+ blobs -# (1.6 million+ unique). Nested umpteen subdirectory levels deep, those blobs -# balloon into a quantity of directory subtrees that rapidly exceeds an 8GB RAM -# laptop's ability to keep them all active without a good amount of care and -# pruning. The code here tries to strike a balance between memory utilization -# and performance. -# -# This problem is also referenced in the last paragraph of the previous -# section, where potentially this problem wouldn't exist for 'version 1' data -# if we can reliably get it. Can we? Either that or not use bundles at all, -# which has other costs. -# -# 3) -# If the list of changed files stored by the changesets had indicated which -# of those changed files were added or modified and which ones were removed, -# this code could be much faster. Right now we have to perform a huge number of -# substring replacements (see the apply_revdata method) to produce a complete -# file manifest for each commit (as a string!!!) in order to know how to get -# the set of removed files from the next delta. We can intuit from every -# manifest delta which files were modified or added, but I believe there's no -# way to intuit which files were removed without actually having the complete -# prior state and without the list of removals being explicitly given. 
If you -# have an explicit list of all the files that were removed for any given commit -# changegroup, and you combine that with the delta updates in the manifest -# changegroups which detail the set of files that have been added or modified, -# then you wouldn't even have to apply any of the string deltas to get a -# complete understanding of the set of differences between one manifest and the -# next. Not having this effective speed boost is rather unfortunate; it would -# require only one extra stored byte per commit to differentiate removals and -# would make extracting bundles lightning fast. -# ============================================================================ -## - -from binascii import unhexlify -from collections import OrderedDict -from datetime import datetime -import itertools -import struct - -from .chunked_reader import ChunkedFileReader -from .objects import SelectiveCache - - -def unpack(fmt_str, source): - """Utility function for fetching the right number of bytes from a stream to - satisfy a struct.unpack pattern. - - args: - fmt_str: a struct.unpack string pattern - (e.g. '>I' for 4 bytes big-endian) - source: any IO object that has a read() method which - returns an appropriate sequence of bytes - """ - ret = struct.unpack(fmt_str, source.read(struct.calcsize(fmt_str))) - if len(ret) == 1: - return ret[0] - return ret - - -class Bundle20Reader(object): - """Parser for extracting data from Mercurial Bundle20 files. - NOTE: Currently only works on uncompressed HG20 bundles, but checking for - COMPRESSION=<2chars> and loading the appropriate stream decompressor - at that point would be trivial to add if necessary. - - args: - bundlefile (str): name of the binary repository bundle file - cache_filename (str): path to the disk cache used (transited - to the SelectiveCache instance) - cache_size (int): tuning parameter for the upper RAM limit used by - historical data caches. The default is defined in the - SelectiveCache class. - - """ - - NAUGHT_NODE = b"\x00" * 20 - - def __init__(self, bundlefile, cache_filename, cache_size=None): - self.bundlefile = bundlefile - self.cache_filename = cache_filename - bfile = open(bundlefile, "rb", buffering=200 * 1024 * 1024) - - btype = bfile.read(4) # 'HG20' - if btype != b"HG20": - raise Exception(bundlefile, b"Not an HG20 bundle. First 4 bytes:" + btype) - bfile.read(4) # '\x00\x00\x00\x00' - - self.params = self.read_bundle_header(bfile) - # print('PARAMETERS', self.params) - self.num_commits = self.params[b"nbchanges"] - - self.filereader = ChunkedFileReader(bfile) - - self.cache_size = cache_size - self.blobs_offset = None - self.changes_offset = self.filereader.tell() - self.changes_next_offset = None - self.manifests_offset = None - self.manifests_next_offset = None - self.id_to_info = {} - - def read_bundle_header(self, bfile): - """Parse the file header which describes the format and parameters. - See the structure diagram at the top of the file for more insight. 
- - args: - bfile: bundle file handle with the cursor at the start offset of - the content header (the 9th byte in the file) - - returns: - dict of decoded bundle parameters - """ - unpack(">I", bfile) # header length - chg_len = unpack(">B", bfile) # len('CHANGEGROUP') == 11 - bfile.read(chg_len) # should say 'CHANGEGROUP' - unpack(">I", bfile) # probably \x00\x00\x00\x00 - - n_mandatory, n_advisory = unpack(">BB", bfile) # parameter counts - mandatory_params = [ - (key_len, val_len) - for key_len, val_len in [unpack(">BB", bfile) for i in range(n_mandatory)] - ] - advisory_params = [ - (key_len, val_len) - for key_len, val_len in [unpack(">BB", bfile) for i in range(n_advisory)] - ] - params = {} - - for key_len, val_len in mandatory_params + advisory_params: - key = unpack(">%ds" % key_len, bfile) - val = int(unpack(">%ds" % val_len, bfile)) - params[key] = val - - return params - - def revdata_iterator(self, bytes_to_read): - """A chunk's revdata section is a series of start/end/length/data_delta - content updates called RevDiffs that indicate components of a text diff - applied to the node's basenode. The sum length of all diffs is the - length indicated at the beginning of the chunk at the start of the - header. - See the structure diagram at the top of the file for more insight. - - args: - bytes_to_read: int total number of bytes in the chunk's revdata - yields: - (int, int, read iterator) representing a single text diff component - """ - while bytes_to_read > 0: - start_offset = unpack(">I", self.filereader) - end_offset = unpack(">I", self.filereader) - blocklen = unpack(">I", self.filereader) - delta_it = self.filereader.read_iterator(blocklen) - bytes_to_read -= 12 + blocklen - yield (start_offset, end_offset, delta_it) # RevDiff - - def read_chunk_header(self): - """The header of a RevChunk describes the id ('node') for the current - change, the commit id ('linknode') associated with this change, - the parental heritage ('p1' and 'p2'), and the node to which the - revdata updates will apply ('basenode'). 'linknode' is the same as - 'node' when reading the commit log because any commit is already - itself. 'basenode' for a changeset will be NAUGHT_NODE, because - changeset chunks include complete information and not diffs. - See the structure diagram at the top of the file for more insight. - - returns: - dict of the next delta header - """ - header = self.filereader.read(100) - header = { - "node": header[0:20], - "p1": header[20:40], - "p2": header[40:60], - "basenode": header[60:80], - "linknode": header[80:100], - } - return header - - def read_revchunk(self): - """Fetch a complete RevChunk. - A RevChunk contains the collection of line changes made in a particular - update. header['node'] identifies which update. Commits, manifests, and - files all have these. Each chunk contains an indicator of the whole - chunk size, an update header, and then the body of the update as a - series of text diff components. - See the structure diagram at the top of the file for more insight. - - returns: - tuple(dict, iterator) of (header, chunk data) if there is another - chunk in the group, else None - """ - size = unpack(">I", self.filereader) - 104 - if size >= 0: - header = self.read_chunk_header() - return (header, self.revdata_iterator(size)) - else: - return None # NullChunk - - def extract_commit_metadata(self, data): - """Converts the binary commit metadata format into a dict. 
- - args: - data: bytestring of encoded commit information - - returns: - dict of decoded commit information - """ - parts, message = data.split(b"\n\n", 1) - parts = parts.split(b"\n") - commit = {} - commit["message"] = message - commit["manifest"] = unhexlify(parts[0]) - commit["user"] = parts[1] - tstamp, tz, *extra = parts[2].split(b" ") - commit["time"] = datetime.fromtimestamp(float(tstamp)) - commit["time_offset_seconds"] = int(tz) - if extra: - commit["extra"] = b" ".join(extra) - commit["changed_files"] = parts[3:] - return commit - - def skip_sections(self, num_sections=1): - """Skip past sections quickly. - - args: - num_sections: int number of sections to skip - """ - for i in range(num_sections): - size = unpack(">I", self.filereader) - while size >= 104: - self.filereader.seek(size - 4, from_current=True) - size = unpack(">I", self.filereader) - - def apply_revdata(self, revdata_it, prev_state): - """Compose the complete text body for a change from component deltas. - - args: - revdata_it: output from the revdata_iterator method - prev_state: bytestring the base complete text on which the new - deltas will be applied - returns: - (bytestring, list, list) the new complete string and lists of added - and removed components (used in manifest processing) - """ - state = [] - added = [] - removed = [] - next_start = 0 - - for delta_start, delta_end, rev_diff_it in revdata_it: - removed.append(prev_state[delta_start:delta_end]) - added.append(b"".join(rev_diff_it)) - - state.append(prev_state[next_start:delta_start]) - state.append(added[-1]) - next_start = delta_end - - state.append(prev_state[next_start:]) - state = b"".join(state) - - return (state, added, removed) - - def skim_headers(self): - """Get all header data from a change group but bypass processing of the - contained delta components. - - yields: - output of read_chunk_header method for all chunks in the group - """ - size = unpack(">I", self.filereader) - 104 - while size >= 0: - header = self.read_chunk_header() - self.filereader.seek(size, from_current=True) - yield header - size = unpack(">I", self.filereader) - 104 - - def group_iterator(self): - """Bundle sections are called groups. These are composed of one or more - revision chunks of delta components. Iterate over all the chunks in a - group and hand each one back. - - yields: - see output of read_revchunk method - """ - revchunk = self.read_revchunk() - while revchunk: # A group is terminated by a NullChunk - yield revchunk # (header, revdata_iterator) - revchunk = self.read_revchunk() - - def yield_group_objects(self, cache_hints=None, group_offset=None): - """Bundles are sectioned into groups: the log of all commits, the log - of all manifest changes, and a series of logs of blob changes (one for - each file). All groups are structured the same way, as a series of - revisions each with a series of delta components. Iterate over the - current group and return the completed object data for the current - update by applying all of the internal delta components to each prior - revision. 
- - args: - cache_hints: see build_cache_hints (this will be built - automatically if not pre-built and passed in) - group_offset: int file position of the start of the desired group - - yields: - (dict, bytestring, list, list) the output from read_chunk_header - followed by the output from apply_revdata - """ - if group_offset is not None: - self.filereader.seek(group_offset) - - if cache_hints is None: - cache_hints = self.build_cache_hints() - - data_cache = SelectiveCache( - max_size=self.cache_size, - cache_hints=cache_hints, - filename=self.cache_filename, - ) - - # Loop over all revisions in the group - data = b"" - for header, revdata_it in self.group_iterator(): - node = header["node"] - basenode = header["basenode"] - - data = data_cache.fetch(basenode) or b"" - - data, added, removed = self.apply_revdata(revdata_it, data) - - data_cache.store(node, data) - - yield (header, data, added, removed) # each RevChunk - - def extract_meta_from_blob(self, data): - """File revision data sometimes begins with a metadata section of - dubious value. Strip it off and maybe decode it. It seems to be mostly - useless. Why indicate that a file node is a copy of another node? You - can already get that information from the delta header. - - args: - data: bytestring of one revision of a file, possibly with metadata - embedded at the start - - returns: - (bytestring, dict) of (the blob data, the meta information) - """ - meta = {} - if data.startswith(b"\x01\n"): - empty, metainfo, data = data.split(b"\x01\n", 2) - metainfo = b"\x01\n" + metainfo + b"\x01\n" - if metainfo.startswith(b"copy:"): - # direct file copy (?) - copyinfo = metainfo.split(b"\n") - meta["copied_file"] = copyinfo[0][6:] - meta["copied_rev"] = copyinfo[1][9:] - elif metainfo.startswith(b"censored:"): - # censored revision deltas must be full-replacements (?) - meta["censored"] = metainfo - else: - # no idea - meta["text"] = metainfo - return data, meta - - def seek_changelog(self): - """Seek to the beginning of the change logs section. - """ - self.filereader.seek(self.changes_offset) - - def seek_manifests(self): - """Seek to the beginning of the manifests section. - """ - if self.manifests_offset is None: - self.seek_changelog() - self.skip_sections(1) # skip past commits - self.manifests_offset = self.filereader.tell() - else: - self.filereader.seek(self.manifests_offset) - - def seek_filelist(self): - """Seek to the beginning of the file changes section. - """ - if self.blobs_offset is None: - self.seek_manifests() - self.skip_sections(1) # skip past manifests - self.blobs_offset = self.filereader.tell() - else: - self.filereader.seek(self.blobs_offset) - - def yield_all_blobs(self): - """Gets blob data from the bundle. - - yields: - (bytestring, (bytestring, int, dict)) of - (blob data, (file name, start offset of the file within the - bundle, node header)) - """ - self.seek_filelist() - - # Loop through all files that have commits - size = unpack(">I", self.filereader) - while size > 0: - file_name = self.filereader.read(size - 4) - file_start_offset = self.filereader.tell() - # get all of the blobs for each file - for header, data, *_ in self.yield_group_objects(): - blob, meta = self.extract_meta_from_blob(data) - yield blob, (file_name, file_start_offset, header) - size = unpack(">I", self.filereader) - - def yield_all_changesets(self): - """Gets commit data from the bundle. 
- - yields: - (dict, dict) of (read_chunk_header output, - extract_commit_metadata output) - """ - self.seek_changelog() - for header, data, *_ in self.yield_group_objects(): - changeset = self.extract_commit_metadata(data) - yield (header, changeset) - - def yield_all_manifest_deltas(self, cache_hints=None): - """Gets manifest data from the bundle. - In order to process the manifests in a reasonable amount of time, we - want to use only the deltas and not the entire manifest at each change, - because if we're processing them in sequential order (we are) then we - already have the previous state so we only need the changes. - - args: - cache_hints: see build_cache_hints method - - yields: - (dict, dict, dict) of (read_chunk_header output, - extract_manifest_elements output on added/modified files, - extract_manifest_elements on removed files) - """ - self.seek_manifests() - for header, data, added, removed in self.yield_group_objects( - cache_hints=cache_hints - ): - added = self.extract_manifest_elements(added) - removed = self.extract_manifest_elements(removed) - yield (header, added, removed) - - def build_manifest_hints(self): - """Just a minor abstraction shortcut for the build_cache_hints method. - - returns: - see build_cache_hints method - """ - self.seek_manifests() - return self.build_cache_hints() - - def build_cache_hints(self): - """The SelectiveCache class that we use in building nodes can accept a - set of key counters that makes its memory usage much more efficient. - - returns: - dict of key=a node id, value=the number of times we - will need data from that node when building subsequent nodes - """ - cur_pos = self.filereader.tell() - hints = OrderedDict() - prev_node = None - for header in self.skim_headers(): - basenode = header["basenode"] - if (basenode != self.NAUGHT_NODE) and (basenode != prev_node): - # If the base isn't immediately prior, then cache it once more. - hints[basenode] = hints.get(basenode, 0) + 1 - prev_node = header["node"] - self.filereader.seek(cur_pos) - return hints - - def extract_manifest_elements(self, data): - """Parses data that looks like a manifest. In practice we only pass in - the bits extracted from the application of a manifest delta describing - which files were added/modified or which ones were removed. - - args: - data: either a string or a list of strings that, when joined, - embodies the composition of a manifest. - - This takes the form - of repetitions of (without the brackets):: - - b'\x00[flag]\\n' ...repeat... 
- - where ``[flag]`` may or may not be there depending on - whether the file is specially flagged as executable or - something - - returns: - dict: ``{file_path: (file_node, permissions), ...}`` where - permissions is given according to the flag that optionally exists - in the data - """ - elements = {} - if isinstance(data, str): - data = data.split(b"\n") - else: - data = itertools.chain.from_iterable([chunk.split(b"\n") for chunk in data]) - - for line in data: - if line != b"": - f = line.split(b"\x00") - - node = f[1] - flag_bytes = node[40:] - - elements[f[0]] = ( - unhexlify(node[:40]), - b"l" in flag_bytes, - b"755" if (b"x" in flag_bytes) else b"644", - ) - - return elements diff --git a/swh/loader/mercurial/chunked_reader.py b/swh/loader/mercurial/chunked_reader.py deleted file mode 100644 --- a/swh/loader/mercurial/chunked_reader.py +++ /dev/null @@ -1,105 +0,0 @@ -# Copyright (C) 2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import struct - - -class ChunkedFileReader(object): - """A binary stream reader that gives seamless read access to Mercurial's - bundle2 HG20 format which is partitioned for some reason at the file level - into chunks of [4Bytes:, Bytes:] as if it were - encoding transport packets. - - args: - file: rb file handle pre-aligned to the start of the chunked portion - size_unpack_fmt: struct format string for unpacking the next chunk size - """ - - def __init__(self, file, size_unpack_fmt=">I"): - self._file = file - self._size_pattern = size_unpack_fmt - self._size_bytes = struct.calcsize(size_unpack_fmt) - self._bytes_per_chunk = self._chunk_size(True) - self._chunk_bytes_left = self._bytes_per_chunk - self._offset = self._file.tell() - # find the file size - self._file.seek(0, 2) # seek to end - self._size = self._file.tell() - self._file.seek(self._offset, 0) # seek back to original position - - def _chunk_size(self, first_time=False): - """Unpack the next bytes from the file to get the next file chunk size. - """ - size = struct.unpack(self._size_pattern, self._file.read(self._size_bytes))[0] - return size - - def size(self): - """Returns the file size in bytes. - """ - return self._size - - def read(self, bytes_to_read): - """Return N bytes from the file as a single block. - - args: - bytes_to_read: int number of bytes of content - """ - return b"".join(self.read_iterator(bytes_to_read)) - - def read_iterator(self, bytes_to_read): - """Return a generator that yields N bytes from the file one file chunk - at a time. - - args: - bytes_to_read: int number of bytes of content - """ - while bytes_to_read > self._chunk_bytes_left: - yield self._file.read(self._chunk_bytes_left) - bytes_to_read -= self._chunk_bytes_left - self._chunk_bytes_left = self._chunk_size() - self._chunk_bytes_left -= bytes_to_read - yield self._file.read(bytes_to_read) - - def seek(self, new_pos=None, from_current=False): - """Wraps the underlying file seek, additionally updating the - chunk_bytes_left counter appropriately so that we can start reading - from the new location. - - args: - - new_pos: new cursor byte position - from_current: if True, it treats new_pos as an offset from the - current cursor position, bypassing any chunk boundaries as if - they weren't there. This should give the same end position as a - read except without the reading data part. 
- """ - if from_current: - new_pos = new_pos or 0 # None -> current - if new_pos <= self._chunk_bytes_left: - new_pos += self._file.tell() - else: - new_pos += ( - self._file.tell() - + self._size_bytes - + ( - ( - (new_pos - self._chunk_bytes_left - 1) # aliasing - // self._bytes_per_chunk - ) - * self._size_bytes - ) - ) - else: - new_pos = new_pos or self._offset # None -> start position - - self._chunk_bytes_left = self._bytes_per_chunk - ( - (new_pos - self._offset) % (self._bytes_per_chunk + self._size_bytes) - ) - self._file.seek(new_pos) - - def __getattr__(self, item): - """Forward other calls to the underlying file object. - """ - return getattr(self._file, item) diff --git a/swh/loader/mercurial/cli.py b/swh/loader/mercurial/cli.py deleted file mode 100644 --- a/swh/loader/mercurial/cli.py +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright (C) 2018-2021 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from itertools import chain -import logging - -import click -from deprecated import deprecated - -from swh.loader.mercurial.utils import parse_visit_date - -LOGLEVELS = list( - chain.from_iterable( - (logging._levelToName[lvl], logging._levelToName[lvl].lower()) - for lvl in sorted(logging._levelToName.keys()) - ) -) - - -@click.command() -@click.argument("origin-url") -@click.option( - "--hg-directory", - "-d", - help=( - "Path to the hg (local) directory to load from. " - "If unset, the hg repo will be cloned from the " - "given (origin) url." - ), -) -@click.option("--hg-archive", "-a", help=("Path to the hg archive file to load from.")) -@click.option("--visit-date", "-D", help="Visit date (defaults to now).") -@click.option("--log-level", "-l", type=click.Choice(LOGLEVELS), help="Log level.") -@deprecated( - version="0.4.0", reason="Use `swh loader run mercurial|mercurial_from_disk` instead" -) -def main( - origin_url, hg_directory=None, hg_archive=None, visit_date=None, log_level=None -): - from swh.storage import get_storage - - logging.basicConfig( - level=(log_level or "DEBUG").upper(), - format="%(asctime)s %(process)d %(message)s", - ) - - visit_date = parse_visit_date(visit_date or "now") - - kwargs = {"visit_date": visit_date, "url": origin_url} - if hg_archive: - from .loader import HgArchiveBundle20Loader as HgLoader - - kwargs["archive_path"] = hg_archive - else: - from .loader import HgBundle20Loader as HgLoader - - kwargs["directory"] = hg_directory - - storage = get_storage(cls="memory") - return HgLoader(storage, **kwargs).load() - - -if __name__ == "__main__": - main() diff --git a/swh/loader/mercurial/loader.py b/swh/loader/mercurial/loader.py deleted file mode 100644 --- a/swh/loader/mercurial/loader.py +++ /dev/null @@ -1,692 +0,0 @@ -# Copyright (C) 2017-2021 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -"""This document contains a SWH loader for ingesting repository data -from Mercurial version 2 bundle files. - -""" - -# NOTE: The code here does expensive work twice in places because of the -# intermediate need to check for what is missing before sending to the database -# and the desire to not juggle very large amounts of data. 
- -# TODO: Decide whether to also serialize to disk and read back more quickly -# from there. Maybe only for very large repos and fast drives. -# - Avi - - -import datetime -import os -from queue import Empty -import random -import re -from shutil import rmtree -from tempfile import mkdtemp -import time -from typing import Any, Dict, Iterable, List, Optional - -import billiard -import hglib -from hglib.error import CommandError - -from swh.loader.core.loader import DVCSLoader -from swh.loader.core.utils import clean_dangling_folders -from swh.loader.exception import NotFound -from swh.loader.mercurial.utils import get_minimum_env -from swh.model import identifiers -from swh.model.hashutil import ( - DEFAULT_ALGORITHMS, - MultiHash, - hash_to_bytehex, - hash_to_bytes, -) -from swh.model.model import ( - BaseContent, - Content, - Directory, - ExtID, - ObjectType, - Origin, - Person, - Release, - Revision, - RevisionType, - Sha1Git, - SkippedContent, - Snapshot, - SnapshotBranch, - TargetType, - TimestampWithTimezone, -) -from swh.storage.algos.origin import origin_get_latest_visit_status -from swh.storage.interface import StorageInterface - -from . import converters -from .archive_extract import tmp_extract -from .bundle20_reader import Bundle20Reader -from .converters import PRIMARY_ALGO as ALGO -from .objects import SelectiveCache, SimpleTree - -TAG_PATTERN = re.compile("[0-9A-Fa-f]{40}") - -TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.mercurial." - -HEAD_POINTER_NAME = b"tip" - -EXTID_TYPE = "hg-nodeid" - - -class CommandErrorWrapper(Exception): - """This exception is raised in place of a 'CommandError' - exception (raised by the underlying hglib library) - - This is needed because billiard.Queue is serializing the - queued object and as CommandError doesn't have a constructor without - parameters, the deserialization is failing - """ - - def __init__(self, err: Optional[bytes]): - self.err = err - - -class CloneTimeoutError(Exception): - pass - - -class HgBundle20Loader(DVCSLoader): - """Mercurial loader able to deal with remote or local repository. - - """ - - visit_type = "hg" - - def __init__( - self, - storage: StorageInterface, - url: str, - visit_date: Optional[datetime.datetime] = None, - directory: Optional[str] = None, - logging_class="swh.loader.mercurial.Bundle20Loader", - bundle_filename: Optional[str] = "HG20_none_bundle", - reduce_effort: bool = False, - temp_directory: str = "/tmp", - cache1_size: int = 800 * 1024 * 1024, - cache2_size: int = 800 * 1024 * 1024, - clone_timeout_seconds: int = 7200, - save_data_path: Optional[str] = None, - max_content_size: Optional[int] = None, - ): - super().__init__( - storage=storage, - logging_class=logging_class, - save_data_path=save_data_path, - max_content_size=max_content_size, - ) - self.origin_url = url - self.visit_date = visit_date - self.directory = directory - self.bundle_filename = bundle_filename - self.reduce_effort_flag = reduce_effort - self.empty_repository = None - self.temp_directory = temp_directory - self.cache1_size = cache1_size - self.cache2_size = cache2_size - self.clone_timeout = clone_timeout_seconds - self.working_directory = None - self.bundle_path = None - self.heads: Dict[bytes, Any] = {} - self.releases: Dict[bytes, Any] = {} - self.last_snapshot_id: Optional[bytes] = None - self.old_environ = os.environ.copy() - os.environ.clear() - os.environ.update(get_minimum_env()) - - def pre_cleanup(self): - """Cleanup potential dangling files from prior runs (e.g. 
OOM killed - tasks) - - """ - clean_dangling_folders( - self.temp_directory, - pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, - log=self.log, - ) - - def cleanup(self): - """Clean temporary working directory - - """ - os.environ.clear() - os.environ.update(self.old_environ) - if self.bundle_path and os.path.exists(self.bundle_path): - self.log.debug("Cleanup up working bundle %s" % self.bundle_path) - os.unlink(self.bundle_path) - if self.working_directory and os.path.exists(self.working_directory): - self.log.debug( - "Cleanup up working directory %s" % (self.working_directory,) - ) - rmtree(self.working_directory) - - def get_heads(self, repo): - """Read the closed branches heads (branch, bookmarks) and returns a - dict with key the branch_name (bytes) and values the tuple - (pointer nature (bytes), mercurial's node id - (bytes)). Those needs conversion to swh-ids. This is taken - care of in get_revisions. - - """ - b = {} - for _, node_hash_id, pointer_nature, branch_name, *_ in repo.heads(): - b[branch_name] = (pointer_nature, hash_to_bytes(node_hash_id.decode())) - - bookmarks = repo.bookmarks() - if bookmarks and bookmarks[0]: - for bookmark_name, _, target_short in bookmarks[0]: - target = repo[target_short].node() - b[bookmark_name] = (None, hash_to_bytes(target.decode())) - - return b - - def prepare_origin_visit(self) -> None: - self.origin = Origin(url=self.origin_url) - visit_status = origin_get_latest_visit_status( - self.storage, self.origin_url, require_snapshot=True - ) - self.last_snapshot_id = None if visit_status is None else visit_status.snapshot - - @staticmethod - def clone_with_timeout(log, origin, destination, timeout): - queue = billiard.Queue() - start = time.monotonic() - - def do_clone(queue, origin, destination): - try: - result = hglib.clone(source=origin, dest=destination, noupdate=True) - except CommandError as e: - # the queued object need an empty constructor to be deserialized later - queue.put(CommandErrorWrapper(e.err)) - except BaseException as e: - queue.put(e) - else: - queue.put(result) - - process = billiard.Process(target=do_clone, args=(queue, origin, destination)) - process.start() - - while True: - try: - result = queue.get(timeout=0.1) - break - except Empty: - duration = time.monotonic() - start - if timeout and duration > timeout: - log.warning( - "Timeout cloning `%s` within %s seconds", origin, timeout - ) - process.terminate() - process.join() - raise CloneTimeoutError(origin, timeout) - continue - - process.join() - - if isinstance(result, Exception): - raise result from None - - return result - - def prepare(self): - """Prepare the necessary steps to load an actual remote or local - repository. - - To load a local repository, pass the optional directory - parameter as filled with a path to a real local folder. - - To load a remote repository, pass the optional directory - parameter as None. 
- - Args: - origin_url (str): Origin url to load - visit_date (str/datetime): Date of the visit - directory (str/None): The local directory to load - - """ - self.branches = {} - self.tags = [] - self.releases = {} - self.node_2_rev = {} - self.heads = {} - self.extids = [] - - directory = self.directory - - if not directory: # remote repository - self.working_directory = mkdtemp( - prefix=TEMPORARY_DIR_PREFIX_PATTERN, - suffix="-%s" % os.getpid(), - dir=self.temp_directory, - ) - os.makedirs(self.working_directory, exist_ok=True) - self.hgdir = self.working_directory - - self.log.debug( - "Cloning %s to %s with timeout %s seconds", - self.origin_url, - self.hgdir, - self.clone_timeout, - ) - - try: - self.clone_with_timeout( - self.log, self.origin_url, self.hgdir, self.clone_timeout - ) - except CommandErrorWrapper as e: - for msg in [ - b"does not appear to be an hg repository", - b"404: not found", - b"name or service not known", - ]: - if msg in e.err.lower(): - raise NotFound(e.args[0]) from None - raise e - - else: # local repository - self.working_directory = None - self.hgdir = directory - - self.bundle_path = os.path.join(self.hgdir, self.bundle_filename) - self.log.debug("Bundling at %s" % self.bundle_path) - - with hglib.open(self.hgdir) as repo: - self.heads = self.get_heads(repo) - repo.bundle(bytes(self.bundle_path, "utf-8"), all=True, type=b"none-v2") - - self.cache_filename1 = os.path.join( - self.hgdir, "swh-cache-1-%s" % (hex(random.randint(0, 0xFFFFFF))[2:],) - ) - self.cache_filename2 = os.path.join( - self.hgdir, "swh-cache-2-%s" % (hex(random.randint(0, 0xFFFFFF))[2:],) - ) - - try: - self.br = Bundle20Reader( - bundlefile=self.bundle_path, - cache_filename=self.cache_filename1, - cache_size=self.cache1_size, - ) - except FileNotFoundError: - # Empty repository! Still a successful visit targeting an - # empty snapshot - self.log.warn("%s is an empty repository!" % self.hgdir) - self.empty_repository = True - else: - self.reduce_effort = set() - if self.reduce_effort_flag: - now = datetime.datetime.now(tz=datetime.timezone.utc) - if (now - self.visit_date).days > 1: - # Assuming that self.visit_date would be today for - # a new visit, treat older visit dates as - # indication of wanting to skip some processing - # effort. - for header, commit in self.br.yield_all_changesets(): - ts = commit["time"].timestamp() - if ts < self.visit_date.timestamp(): - self.reduce_effort.add(header["node"]) - - def has_contents(self): - return not self.empty_repository - - def has_directories(self): - return not self.empty_repository - - def has_revisions(self): - return not self.empty_repository - - def has_releases(self): - return not self.empty_repository - - def fetch_data(self): - """Fetch the data from the data source.""" - pass - - def get_contents(self) -> Iterable[BaseContent]: - """Get the contents that need to be loaded.""" - - # NOTE: This method generates blobs twice to reduce memory usage - # without generating disk writes. 
- self.file_node_to_hash = {} - hash_to_info = {} - self.num_contents = 0 - contents: Dict[bytes, BaseContent] = {} - missing_contents = set() - - for blob, node_info in self.br.yield_all_blobs(): - self.num_contents += 1 - file_name = node_info[0] - header = node_info[2] - - length = len(blob) - if header["linknode"] in self.reduce_effort: - algorithms = set([ALGO]) - else: - algorithms = DEFAULT_ALGORITHMS - h = MultiHash.from_data(blob, hash_names=algorithms) - content = h.digest() - content["length"] = length - blob_hash = content[ALGO] - self.file_node_to_hash[header["node"]] = blob_hash - - if header["linknode"] in self.reduce_effort: - continue - - hash_to_info[blob_hash] = node_info - if self.max_content_size is not None and length >= self.max_content_size: - contents[blob_hash] = SkippedContent( - status="absent", reason="Content too large", **content - ) - else: - contents[blob_hash] = Content(data=blob, status="visible", **content) - - if file_name == b".hgtags": - # https://www.mercurial-scm.org/wiki/GitConcepts#Tag_model - # overwrite until the last one - self.tags = (t for t in blob.split(b"\n") if t != b"") - - if contents: - missing_contents = set( - self.storage.content_missing( - [c.to_dict() for c in contents.values()], key_hash=ALGO - ) - ) - - # Clusters needed blobs by file offset and then only fetches the - # groups at the needed offsets. - focs: Dict[int, Dict[bytes, bytes]] = {} # "file/offset/contents" - for blob_hash in missing_contents: - _, file_offset, header = hash_to_info[blob_hash] - focs.setdefault(file_offset, {}) - focs[file_offset][header["node"]] = blob_hash - - for offset, node_hashes in sorted(focs.items()): - for header, data, *_ in self.br.yield_group_objects(group_offset=offset): - node = header["node"] - if node in node_hashes: - blob, meta = self.br.extract_meta_from_blob(data) - content = contents.pop(node_hashes[node], None) - if content: - if ( - self.max_content_size is not None - and len(blob) >= self.max_content_size - ): - yield SkippedContent.from_data( - blob, reason="Content too large" - ) - else: - yield Content.from_data(blob) - - def load_directories(self): - """This is where the work is done to convert manifest deltas from the - repository bundle into SWH directories. 
- - """ - self.mnode_to_tree_id = {} - cache_hints = self.br.build_manifest_hints() - - def tree_size(t): - return t.size() - - self.trees = SelectiveCache( - cache_hints=cache_hints, - size_function=tree_size, - filename=self.cache_filename2, - max_size=self.cache2_size, - ) - - tree = SimpleTree() - for header, added, removed in self.br.yield_all_manifest_deltas(cache_hints): - node = header["node"] - basenode = header["basenode"] - tree = self.trees.fetch(basenode) or tree # working tree - - for path in removed.keys(): - tree = tree.remove_tree_node_for_path(path) - for path, info in added.items(): - file_node, is_symlink, perms_code = info - tree = tree.add_blob( - path, self.file_node_to_hash[file_node], is_symlink, perms_code - ) - - if header["linknode"] in self.reduce_effort: - self.trees.store(node, tree) - else: - new_dirs = [] - self.mnode_to_tree_id[node] = tree.hash_changed(new_dirs) - self.trees.store(node, tree) - yield header, tree, new_dirs - - def get_directories(self) -> Iterable[Directory]: - """Compute directories to load - - """ - dirs: Dict[Sha1Git, Directory] = {} - self.num_directories = 0 - for _, _, new_dirs in self.load_directories(): - for d in new_dirs: - self.num_directories += 1 - dirs[d["id"]] = Directory.from_dict(d) - - missing_dirs: List[Sha1Git] = list(dirs.keys()) - if missing_dirs: - missing_dirs = list(self.storage.directory_missing(missing_dirs)) - - for _id in missing_dirs: - yield dirs[_id] - - def get_revisions(self) -> Iterable[Revision]: - """Compute revisions to load - - """ - revisions = {} - self.num_revisions = 0 - for header, commit in self.br.yield_all_changesets(): - if header["node"] in self.reduce_effort: - continue - - self.num_revisions += 1 - date_dict = identifiers.normalize_timestamp(int(commit["time"].timestamp())) - author_dict = converters.parse_author(commit["user"]) - if commit["manifest"] == Bundle20Reader.NAUGHT_NODE: - directory_id = SimpleTree().hash_changed() - else: - directory_id = self.mnode_to_tree_id[commit["manifest"]] - - extra_headers = [ - ( - b"time_offset_seconds", - str(commit["time_offset_seconds"]).encode("utf-8"), - ) - ] - extra = commit.get("extra") - if extra: - for e in extra.split(b"\x00"): - k, v = e.split(b":", 1) - # transplant_source stores binary reference to a changeset - # prefer to dump hexadecimal one in the revision metadata - if k == b"transplant_source": - v = hash_to_bytehex(v) - extra_headers.append((k, v)) - - parents = [] - p1 = self.node_2_rev.get(header["p1"]) - p2 = self.node_2_rev.get(header["p2"]) - if p1: - parents.append(p1) - if p2: - parents.append(p2) - - revision = Revision( - author=Person.from_dict(author_dict), - date=TimestampWithTimezone.from_dict(date_dict), - committer=Person.from_dict(author_dict), - committer_date=TimestampWithTimezone.from_dict(date_dict), - type=RevisionType.MERCURIAL, - directory=directory_id, - message=commit["message"], - extra_headers=tuple(extra_headers), - synthetic=False, - parents=tuple(parents), - ) - - self.node_2_rev[header["node"]] = revision.id - revisions[revision.id] = revision - - revision_swhid = identifiers.CoreSWHID( - object_type=identifiers.ObjectType.REVISION, object_id=revision.id, - ) - self.extids.append( - ExtID( - extid_type=EXTID_TYPE, extid=header["node"], target=revision_swhid - ) - ) - - # Converts heads to use swh ids - self.heads = { - branch_name: (pointer_nature, self.node_2_rev[node_id]) - for branch_name, (pointer_nature, node_id) in self.heads.items() - } - - missing_revs = set(revisions.keys()) - if 
missing_revs: - missing_revs = set(self.storage.revision_missing(list(missing_revs))) - - for rev in missing_revs: - yield revisions[rev] - self.mnode_to_tree_id = None - - def _read_tag(self, tag, split_byte=b" "): - node, *name = tag.split(split_byte) - name = split_byte.join(name) - return node, name - - def get_releases(self) -> Iterable[Release]: - """Get the releases that need to be loaded.""" - self.num_releases = 0 - releases = {} - missing_releases = set() - for t in self.tags: - self.num_releases += 1 - node, name = self._read_tag(t) - node = node.decode() - node_bytes = hash_to_bytes(node) - if not TAG_PATTERN.match(node): - self.log.warn("Wrong pattern (%s) found in tags. Skipping" % (node,)) - continue - if node_bytes not in self.node_2_rev: - self.log.warn( - "No matching revision for tag %s " - "(hg changeset: %s). Skipping" % (name.decode(), node) - ) - continue - tgt_rev = self.node_2_rev[node_bytes] - release = Release( - name=name, - target=tgt_rev, - target_type=ObjectType.REVISION, - message=None, - metadata=None, - synthetic=False, - author=Person(name=None, email=None, fullname=b""), - date=None, - ) - missing_releases.add(release.id) - releases[release.id] = release - self.releases[name] = release.id - - if missing_releases: - missing_releases = set(self.storage.release_missing(list(missing_releases))) - - for _id in missing_releases: - yield releases[_id] - - def get_snapshot(self) -> Snapshot: - """Get the snapshot that need to be loaded.""" - branches: Dict[bytes, Optional[SnapshotBranch]] = {} - for name, (pointer_nature, target) in self.heads.items(): - branches[name] = SnapshotBranch( - target=target, target_type=TargetType.REVISION - ) - if pointer_nature == HEAD_POINTER_NAME: - branches[b"HEAD"] = SnapshotBranch( - target=name, target_type=TargetType.ALIAS - ) - for name, target in self.releases.items(): - branches[name] = SnapshotBranch( - target=target, target_type=TargetType.RELEASE - ) - - self.snapshot = Snapshot(branches=branches) - return self.snapshot - - def store_data(self) -> None: - super().store_data() - self.storage.extid_add(self.extids) - - def get_fetch_history_result(self): - """Return the data to store in fetch_history.""" - return { - "contents": self.num_contents, - "directories": self.num_directories, - "revisions": self.num_revisions, - "releases": self.num_releases, - } - - def load_status(self): - snapshot = self.get_snapshot() - load_status = "eventful" - if self.last_snapshot_id is not None and self.last_snapshot_id == snapshot.id: - load_status = "uneventful" - return { - "status": load_status, - } - - -class HgArchiveBundle20Loader(HgBundle20Loader): - """Mercurial loader for repository wrapped within archives. 
- - """ - - def __init__( - self, - storage: StorageInterface, - url: str, - visit_date: Optional[datetime.datetime] = None, - archive_path=None, - temp_directory: str = "/tmp", - max_content_size: Optional[int] = None, - ): - super().__init__( - storage=storage, - url=url, - visit_date=visit_date, - logging_class="swh.loader.mercurial.HgArchiveBundle20Loader", - temp_directory=temp_directory, - max_content_size=max_content_size, - ) - self.archive_extract_temp_dir = None - self.archive_path = archive_path - - def prepare(self): - self.archive_extract_temp_dir = tmp_extract( - archive=self.archive_path, - dir=self.temp_directory, - prefix=TEMPORARY_DIR_PREFIX_PATTERN, - suffix=".dump-%s" % os.getpid(), - log=self.log, - source=self.origin_url, - ) - - repo_name = os.listdir(self.archive_extract_temp_dir)[0] - self.directory = os.path.join(self.archive_extract_temp_dir, repo_name) - super().prepare() diff --git a/swh/loader/mercurial/tasks.py b/swh/loader/mercurial/tasks.py deleted file mode 100644 --- a/swh/loader/mercurial/tasks.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright (C) 2017-2021 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from typing import Optional - -from celery import shared_task - -from swh.loader.mercurial.utils import parse_visit_date - -from .from_disk import HgArchiveLoaderFromDisk, HgLoaderFromDisk - - -@shared_task(name=__name__ + ".LoadMercurial") -def load_hg( - *, url: str, directory: Optional[str] = None, visit_date: Optional[str] = None -): - """Mercurial repository loading - - Import a mercurial tarball into swh. - - Args: see :func:`HgLoaderFromDisk.load`. - - """ - - loader = HgLoaderFromDisk.from_configfile( - url=url, directory=directory, visit_date=parse_visit_date(visit_date) - ) - return loader.load() - - -@shared_task(name=__name__ + ".LoadArchiveMercurial") -def load_hg_from_archive( - *, url: str, archive_path: Optional[str] = None, visit_date: Optional[str] = None -): - """Import a mercurial tarball into swh. - - Args: see :func:`HgArchiveLoaderFromDisk.load`. 
- """ - loader = HgArchiveLoaderFromDisk.from_configfile( - url=url, archive_path=archive_path, visit_date=parse_visit_date(visit_date) - ) - return loader.load() diff --git a/swh/loader/mercurial/tests/test_from_disk.py b/swh/loader/mercurial/tests/test_from_disk.py --- a/swh/loader/mercurial/tests/test_from_disk.py +++ b/swh/loader/mercurial/tests/test_from_disk.py @@ -12,7 +12,6 @@ import attr import pytest -from swh.loader.mercurial.loader import HgBundle20Loader from swh.loader.mercurial.utils import parse_visit_date from swh.loader.tests import ( assert_last_visit_matches, @@ -420,47 +419,6 @@ assert get_stats(loader.storage) == {**expected_stats, "origin_visit": 2 + 1} -def test_old_loader_new_loader(swh_storage, datadir, tmp_path): - archive_name = "example" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - repo_path = repo_url.replace("file://", "") - - old_loader = HgBundle20Loader(swh_storage, repo_path) - assert old_loader.load() == {"status": "eventful"} - - expected_stats = { - "content": 7, - "directory": 16, - "origin": 1, - "origin_visit": 1, - "release": 0, - "revision": 9, - "skipped_content": 0, - "snapshot": 1, - } - assert get_stats(old_loader.storage) == expected_stats - - # Will pick up more branches, hence a different snapshot - loader = HgLoaderFromDisk(swh_storage, repo_path) - res = loader.load() - new_expected_stats = { - **expected_stats, - "origin_visit": 2, - "snapshot": 2, - } - assert get_stats(loader.storage) == new_expected_stats - assert res == {"status": "eventful"} - - # Shouldn't pick up anything now - loader = HgLoaderFromDisk(swh_storage, repo_path) - assert loader.load() == {"status": "uneventful"} - - # Shouldn't pick up anything either after another load - loader = HgLoaderFromDisk(swh_storage, repo_path) - assert loader.load() == {"status": "uneventful"} - - def test_load_unchanged_repo__dangling_extid(swh_storage, datadir, tmp_path): """Checks the loader will load revisions targeted by an ExtID if the revisions are missing from the storage""" diff --git a/swh/loader/mercurial/tests/test_loader.py b/swh/loader/mercurial/tests/test_loader.py deleted file mode 100644 --- a/swh/loader/mercurial/tests/test_loader.py +++ /dev/null @@ -1,370 +0,0 @@ -# Copyright (C) 2018-2021 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import copy -import logging -import os -import time - -import hglib -from hglib.error import CommandError -import pytest - -from swh.loader.mercurial.utils import parse_visit_date -from swh.loader.tests import ( - assert_last_visit_matches, - check_snapshot, - get_stats, - prepare_repository_from_archive, -) -from swh.model.hashutil import hash_to_bytes, hash_to_hex -from swh.model.identifiers import ObjectType -from swh.model.model import RevisionType, Snapshot, SnapshotBranch, TargetType -from swh.storage.algos.snapshot import snapshot_get_latest - -from ..loader import CloneTimeoutError, HgArchiveBundle20Loader, HgBundle20Loader - -VISIT_DATE = parse_visit_date("2016-05-03 15:16:32+00") -assert VISIT_DATE is not None - - -def test_loader_hg_new_visit_no_release(swh_storage, datadir, tmp_path): - """Eventful visit should yield 1 snapshot""" - archive_name = "the-sandbox" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = 
prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - loader = HgBundle20Loader(swh_storage, repo_url) - - assert loader.load() == {"status": "eventful"} - - tip_revision_develop = "a9c4534552df370f43f0ef97146f393ef2f2a08c" - tip_revision_default = "70e750bb046101fdced06f428e73fee471509c56" - expected_snapshot = Snapshot( - id=hash_to_bytes("3b8fe58e467deb7597b12a5fd3b2c096b8c02028"), - branches={ - b"develop": SnapshotBranch( - target=hash_to_bytes(tip_revision_develop), - target_type=TargetType.REVISION, - ), - b"default": SnapshotBranch( - target=hash_to_bytes(tip_revision_default), - target_type=TargetType.REVISION, - ), - b"HEAD": SnapshotBranch(target=b"develop", target_type=TargetType.ALIAS,), - }, - ) - - assert_last_visit_matches( - swh_storage, repo_url, status="full", type="hg", snapshot=expected_snapshot.id, - ) - check_snapshot(expected_snapshot, swh_storage) - - stats = get_stats(swh_storage) - assert stats == { - "content": 2, - "directory": 3, - "origin": 1, - "origin_visit": 1, - "release": 0, - "revision": 58, - "skipped_content": 0, - "snapshot": 1, - } - - # Ensure archive loader yields the same snapshot - loader2 = HgArchiveBundle20Loader( - swh_storage, url=archive_path, archive_path=archive_path, visit_date=VISIT_DATE, - ) - - actual_load_status = loader2.load() - assert actual_load_status == {"status": "eventful"} - - stats2 = get_stats(loader2.storage) - expected_stats = copy.deepcopy(stats) - expected_stats["origin"] += 1 - expected_stats["origin_visit"] += 1 - assert stats2 == expected_stats - - # That visit yields the same snapshot - assert_last_visit_matches( - loader2.storage, - archive_path, - status="full", - type="hg", - snapshot=expected_snapshot.id, - ) - - -def test_loader_hg_new_visit_with_release(swh_storage, datadir, tmp_path): - """Eventful visit with release should yield 1 snapshot""" - archive_name = "hello" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - loader = HgBundle20Loader(swh_storage, url=repo_url, visit_date=VISIT_DATE,) - - actual_load_status = loader.load() - assert actual_load_status == {"status": "eventful"} - - # then - stats = get_stats(swh_storage) - assert stats == { - "content": 3, - "directory": 3, - "origin": 1, - "origin_visit": 1, - "release": 1, - "revision": 3, - "skipped_content": 0, - "snapshot": 1, - } - - # cf. 
-    tip_release = hash_to_bytes("515c4d72e089404356d0f4b39d60f948b8999140")
-    release = swh_storage.release_get([tip_release])[0]
-    assert release is not None
-
-    tip_revision_default = hash_to_bytes("c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27")
-    revision = swh_storage.revision_get([tip_revision_default])[0]
-    assert revision is not None
-
-    expected_snapshot = Snapshot(
-        id=hash_to_bytes("d35668e02e2ba4321dc951cd308cf883786f918a"),
-        branches={
-            b"default": SnapshotBranch(
-                target=tip_revision_default, target_type=TargetType.REVISION,
-            ),
-            b"0.1": SnapshotBranch(target=tip_release, target_type=TargetType.RELEASE,),
-            b"HEAD": SnapshotBranch(target=b"default", target_type=TargetType.ALIAS,),
-        },
-    )
-
-    check_snapshot(expected_snapshot, swh_storage)
-    assert_last_visit_matches(
-        swh_storage,
-        repo_url,
-        type=RevisionType.MERCURIAL.value,
-        status="full",
-        snapshot=expected_snapshot.id,
-    )
-
-    # Ensure archive loader yields the same snapshot
-    loader2 = HgArchiveBundle20Loader(
-        swh_storage, url=archive_path, archive_path=archive_path, visit_date=VISIT_DATE,
-    )
-
-    actual_load_status = loader2.load()
-    assert actual_load_status == {"status": "eventful"}
-
-    stats2 = get_stats(loader2.storage)
-    expected_stats = copy.deepcopy(stats)
-    expected_stats["origin"] += 1
-    expected_stats["origin_visit"] += 1
-    assert stats2 == expected_stats
-
-    # That visit yields the same snapshot
-    assert_last_visit_matches(
-        loader2.storage,
-        archive_path,
-        status="full",
-        type="hg",
-        snapshot=expected_snapshot.id,
-    )
-
-
-def test_visit_with_archive_decompression_failure(swh_storage, mocker, datadir):
-    """Failure to decompress should fail early, no data is ingested"""
-    mock_patoo = mocker.patch("swh.loader.mercurial.archive_extract.patoolib")
-    mock_patoo.side_effect = ValueError
-
-    archive_name = "hello"
-    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
-
-    loader = HgArchiveBundle20Loader(
-        swh_storage, url=archive_path, visit_date=VISIT_DATE,
-    )
-
-    actual_load_status = loader.load()
-    assert actual_load_status == {"status": "failed"}
-
-    stats = get_stats(swh_storage)
-    assert stats == {
-        "content": 0,
-        "directory": 0,
-        "origin": 1,
-        "origin_visit": 1,
-        "release": 0,
-        "revision": 0,
-        "skipped_content": 0,
-        "snapshot": 0,
-    }
-    # That visit yields the same snapshot
-    assert_last_visit_matches(
-        swh_storage, archive_path, status="failed", type="hg", snapshot=None
-    )
-
-
-def test_visit_error_with_snapshot_partial(swh_storage, datadir, tmp_path, mocker):
-    """Incomplete ingestion leads to a 'partial' ingestion status"""
-    mock = mocker.patch("swh.loader.mercurial.loader.HgBundle20Loader.store_metadata")
-    mock.side_effect = ValueError
-
-    archive_name = "the-sandbox"
-    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
-    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
-
-    loader = HgBundle20Loader(swh_storage, repo_url)
-
-    assert loader.load() == {"status": "failed"}
-
-    assert_last_visit_matches(
-        swh_storage,
-        repo_url,
-        status="partial",
-        type="hg",
-        snapshot=hash_to_bytes("3b8fe58e467deb7597b12a5fd3b2c096b8c02028"),
-    )
-
-
-@pytest.mark.parametrize(
-    "error_msg",
-    [
-        b"does not appear to be an HG repository",
-        b"404: Not Found",
-        b"404: NOT FOUND",
-        b"Name or service not known",
-    ],
-)
-def test_visit_error_with_status_not_found(
-    swh_storage, datadir, tmp_path, mocker, error_msg
-):
-    """Not reaching the repo leads to a 'not_found' ingestion status"""
- mock = mocker.patch("hglib.clone") - mock.side_effect = CommandError((), 255, b"", error_msg) - - archive_name = "the-sandbox" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - loader = HgBundle20Loader(swh_storage, repo_url) - - assert loader.load() == {"status": "uneventful"} - - assert_last_visit_matches( - swh_storage, repo_url, status="not_found", type="hg", snapshot=None, - ) - - -def test_visit_error_with_clone_error(swh_storage, datadir, tmp_path, mocker): - """Testing failures other than 'not_found'""" - - mock = mocker.patch("hglib.clone") - mock.side_effect = CommandError((), 255, b"", b"out of disk space") - - archive_name = "the-sandbox" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - - loader = HgBundle20Loader(swh_storage, repo_url) - - assert loader.load() == {"status": "failed"} - - assert_last_visit_matches( - swh_storage, repo_url, status="failed", type="hg", snapshot=None, - ) - - -def test_visit_repository_with_transplant_operations(swh_storage, datadir, tmp_path): - """Visit a mercurial repository visit transplant operations within should yield a - snapshot as well. - - """ - - archive_name = "transplant" - archive_path = os.path.join(datadir, f"{archive_name}.tgz") - repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) - loader = HgBundle20Loader(swh_storage, url=repo_url, visit_date=VISIT_DATE,) - - # load hg repository - actual_load_status = loader.load() - assert actual_load_status == {"status": "eventful"} - - # collect swh revisions - assert_last_visit_matches( - swh_storage, repo_url, type=RevisionType.MERCURIAL.value, status="full" - ) - - revisions = [] - snapshot = snapshot_get_latest(swh_storage, repo_url) - for branch in snapshot.branches.values(): - if branch.target_type.value != "revision": - continue - revisions.append(branch.target) - - # extract original changesets info and the transplant sources - hg_changesets = set() - transplant_sources = set() - for rev in swh_storage.revision_log(revisions): - extids = list( - loader.storage.extid_get_from_target(ObjectType.REVISION, [rev["id"]]) - ) - assert len(extids) == 1 - hg_changesets.add(hash_to_hex(extids[0].extid)) - for k, v in rev["extra_headers"]: - if k == b"transplant_source": - transplant_sources.add(v.decode("ascii")) - - # check extracted data are valid - assert len(hg_changesets) > 0 - assert len(transplant_sources) > 0 - assert transplant_sources.issubset(hg_changesets) - - -def test_clone_with_timeout_timeout(caplog, tmp_path, monkeypatch): - log = logging.getLogger("test_clone_with_timeout") - - def clone_timeout(source, dest, *args, **kwargs): - time.sleep(60) - - monkeypatch.setattr(hglib, "clone", clone_timeout) - - with pytest.raises(CloneTimeoutError): - HgBundle20Loader.clone_with_timeout( - log, "https://www.mercurial-scm.org/repo/hello", tmp_path, 1 - ) - - for record in caplog.records: - assert record.levelname == "WARNING" - assert "https://www.mercurial-scm.org/repo/hello" in record.getMessage() - assert record.args == ("https://www.mercurial-scm.org/repo/hello", 1) - - -def test_clone_with_timeout_returns(caplog, tmp_path, monkeypatch): - log = logging.getLogger("test_clone_with_timeout") - - def clone_return(source, dest, *args, **kwargs): - return (source, dest) - - monkeypatch.setattr(hglib, "clone", clone_return) - - assert 
-        log, "https://www.mercurial-scm.org/repo/hello", tmp_path, 1
-    ) == ("https://www.mercurial-scm.org/repo/hello", tmp_path)
-
-
-def test_clone_with_timeout_exception(caplog, tmp_path, monkeypatch):
-    log = logging.getLogger("test_clone_with_timeout")
-
-    def clone_return(source, dest, *args, **kwargs):
-        raise ValueError("Test exception")
-
-    monkeypatch.setattr(hglib, "clone", clone_return)
-
-    with pytest.raises(ValueError) as excinfo:
-        HgBundle20Loader.clone_with_timeout(
-            log, "https://www.mercurial-scm.org/repo/hello", tmp_path, 1
-        )
-    assert "Test exception" in excinfo.value.args[0]
diff --git a/swh/loader/mercurial/tests/test_tasks.py b/swh/loader/mercurial/tests/test_tasks.py
deleted file mode 100644
--- a/swh/loader/mercurial/tests/test_tasks.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# Copyright (C) 2018-2020 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-
-def test_loader(
-    mocker, swh_config, swh_scheduler_celery_app, swh_scheduler_celery_worker
-):
-    mock_loader = mocker.patch("swh.loader.mercurial.from_disk.HgLoaderFromDisk.load")
-    mock_loader.return_value = {"status": "eventful"}
-
-    res = swh_scheduler_celery_app.send_task(
-        "swh.loader.mercurial.tasks.LoadMercurial",
-        kwargs={"url": "origin_url", "directory": "/some/repo", "visit_date": "now",},
-    )
-
-    assert res
-    res.wait()
-    assert res.successful()
-
-    assert res.result == {"status": "eventful"}
-    mock_loader.assert_called_once_with()
-
-
-def test_archive_loader(
-    mocker, swh_config, swh_scheduler_celery_app, swh_scheduler_celery_worker
-):
-    mock_loader = mocker.patch(
-        "swh.loader.mercurial.from_disk.HgArchiveLoaderFromDisk.load"
-    )
-    mock_loader.return_value = {"status": "uneventful"}
-
-    res = swh_scheduler_celery_app.send_task(
-        "swh.loader.mercurial.tasks.LoadArchiveMercurial",
-        kwargs={
-            "url": "another_url",
-            "archive_path": "/some/tar.tgz",
-            "visit_date": "now",
-        },
-    )
-    assert res
-    res.wait()
-    assert res.successful()
-
-    assert res.result == {"status": "uneventful"}
-    mock_loader.assert_called_once_with()