diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 380c658..69b3349 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,46 +1,40 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v2.4.0 hooks: - id: trailing-whitespace - id: flake8 - id: check-json - id: check-yaml - repo: https://github.com/codespell-project/codespell rev: v1.16.0 hooks: - id: codespell - repo: local hooks: - id: mypy name: mypy entry: mypy args: [swh] pass_filenames: false language: system types: [python] +- repo: https://github.com/python/black + rev: 19.10b0 + hooks: + - id: black + # unfortunately, we are far from being able to enable this... # - repo: https://github.com/PyCQA/pydocstyle.git # rev: 4.0.0 # hooks: # - id: pydocstyle # name: pydocstyle # description: pydocstyle is a static analysis tool for checking compliance with Python docstring conventions. # entry: pydocstyle --convention=google # language: python # types: [python] -# black requires py3.6+ -#- repo: https://github.com/python/black -# rev: 19.3b0 -# hooks: -# - id: black -# language_version: python3 -#- repo: https://github.com/asottile/blacken-docs -# rev: v1.0.0-1 -# hooks: -# - id: blacken-docs -# additional_dependencies: [black==19.3b0] diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..8d79b7e --- /dev/null +++ b/setup.cfg @@ -0,0 +1,6 @@ +[flake8] +# E203: whitespaces before ':' +# E231: missing whitespace after ',' +# W503: line break before binary operator +ignore = E203,E231,W503 +max-line-length = 88 diff --git a/setup.py b/setup.py index d5669f1..30b6ed9 100755 --- a/setup.py +++ b/setup.py @@ -1,70 +1,69 @@ #!/usr/bin/env python3 # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from setuptools import setup, find_packages from os import path from io import open here = path.abspath(path.dirname(__file__)) # Get the long description from the README file -with open(path.join(here, 'README.md'), encoding='utf-8') as f: +with open(path.join(here, "README.md"), encoding="utf-8") as f: long_description = f.read() def parse_requirements(name=None): if name: - reqf = 'requirements-%s.txt' % name + reqf = "requirements-%s.txt" % name else: - reqf = 'requirements.txt' + reqf = "requirements.txt" requirements = [] if not path.exists(reqf): return requirements with open(reqf) as f: for line in f.readlines(): line = line.strip() - if not line or line.startswith('#'): + if not line or line.startswith("#"): continue requirements.append(line) return requirements setup( - name='swh.loader.mercurial', - description='Software Heritage Mercurial Loader', + name="swh.loader.mercurial", + description="Software Heritage Mercurial Loader", long_description=long_description, - long_description_content_type='text/markdown', - author='Software Heritage developers', - author_email='swh-devel@inria.fr', - url='https://forge.softwareheritage.org/diffusion/DLDHG/', + long_description_content_type="text/markdown", + author="Software Heritage developers", + author_email="swh-devel@inria.fr", + url="https://forge.softwareheritage.org/diffusion/DLDHG/", packages=find_packages(), scripts=[], - install_requires=parse_requirements() + parse_requirements('swh'), - setup_requires=['vcversioner'], - extras_require={'testing': parse_requirements('test')}, + install_requires=parse_requirements() + parse_requirements("swh"), + setup_requires=["vcversioner"], + extras_require={"testing": parse_requirements("test")}, vcversioner={}, include_package_data=True, - entry_points=''' + entry_points=""" [swh.workers] loader.mercurial=swh.loader.mercurial:register - ''', + """, classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 4 - Beta", ], project_urls={ - 'Bug Reports': 'https://forge.softwareheritage.org/maniphest', - 'Funding': 'https://www.softwareheritage.org/donate', - 'Source': ( - 'https://forge.softwareheritage.org/source/swh-loader-mercurial'), + "Bug Reports": "https://forge.softwareheritage.org/maniphest", + "Funding": "https://www.softwareheritage.org/donate", + "Source": ("https://forge.softwareheritage.org/source/swh-loader-mercurial"), }, ) diff --git a/swh/loader/mercurial/__init__.py b/swh/loader/mercurial/__init__.py index b8234c8..7f81f85 100644 --- a/swh/loader/mercurial/__init__.py +++ b/swh/loader/mercurial/__init__.py @@ -1,16 +1,17 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Mapping def register() -> Mapping[str, Any]: """Register the current worker module's definition""" from .loader import HgBundle20Loader + return { - 'task_modules': [f'{__name__}.tasks'], - 'loader': HgBundle20Loader, + "task_modules": [f"{__name__}.tasks"], + "loader": HgBundle20Loader, } diff --git a/swh/loader/mercurial/archive_extract.py b/swh/loader/mercurial/archive_extract.py index 1cbe9b0..47fa999 100644 --- a/swh/loader/mercurial/archive_extract.py +++ b/swh/loader/mercurial/archive_extract.py @@ -1,54 +1,56 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import tempfile import patoolib import shutil -def tmp_extract(archive, dir=None, prefix=None, suffix=None, log=None, - source=None): +def tmp_extract(archive, dir=None, prefix=None, suffix=None, log=None, source=None): """Extract an archive to a temporary location with optional logs. Args: archive (string): Absolute path of the archive to be extracted prefix (string): Optional modifier to the temporary storage directory name. (I guess in case something goes wrong and you want to go look?) log (python logging instance): Optional for recording extractions. source (string): Optional source URL of the archive for adding to log messages. Returns: A context manager for a temporary directory that automatically removes itself. See: help(tempfile.TemporaryDirectory) """ - logstr = 'From %s - ' % source if log and source else '' + logstr = "From %s - " % source if log and source else "" if dir and not os.path.exists(dir): os.makedirs(dir, exist_ok=True) archive_base = os.path.basename(archive) - if archive_base[0] == '.': - package = '.' + archive_base.split('.')[1] + if archive_base[0] == ".": + package = "." + archive_base.split(".")[1] else: - package = archive_base.split('.')[0] + package = archive_base.split(".")[0] tmpdir = tempfile.mkdtemp(dir=dir, prefix=prefix, suffix=suffix) repo_path = os.path.join(tmpdir, package) try: patoolib.extract_archive(archive, interactive=False, outdir=tmpdir) except Exception as e: if os.path.exists(tmpdir): shutil.rmtree(tmpdir) - msg = '%sFailed to uncompress archive %s at %s - %s' % ( - logstr, archive_base, repo_path, e) + msg = "%sFailed to uncompress archive %s at %s - %s" % ( + logstr, + archive_base, + repo_path, + e, + ) raise ValueError(msg) if log: - log.info('%sUncompressing archive %s at %s' % ( - logstr, archive_base, repo_path)) + log.info("%sUncompressing archive %s at %s" % (logstr, archive_base, repo_path)) return tmpdir diff --git a/swh/loader/mercurial/bundle20_reader.py b/swh/loader/mercurial/bundle20_reader.py index 10b96a3..a69b63b 100644 --- a/swh/loader/mercurial/bundle20_reader.py +++ b/swh/loader/mercurial/bundle20_reader.py @@ -1,624 +1,622 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """This document contains code for extracting all of the data from Mercurial version 2 bundle file. It is referenced by bundle20_loader.py """ # ============================================================================= # ============================================================================= # BACKGROUND # ============================================================================= # ============================================================================= # # https://www.mercurial-scm.org/wiki/BundleFormat says: # "The new bundle format design is described on the BundleFormat2 page." # # https://www.mercurial-scm.org/wiki/BundleFormat2#Format_of_the_Bundle2_Container says: # noqa # "The latest description of the binary format can be found as comment in the # Mercurial source code." # # https://www.mercurial-scm.org/repo/hg/file/default/mercurial/help/internals/bundles.txt says: # noqa # "The 'HG20' format is not yet documented here. See the inline comments in # 'mercurial/exchange.py' for now." # # ----------------------------------------------------------------------------- # Avi says: # ----------------------------------------------------------------------------- # # All of the above official(?) statements seem to be quite wrong. # # The mercurial-scm wiki is a cluster#@*& of missing pages, bad links, wrong # information, obsolete information, undecipherable names, and half-started # leavings that only sort of look like content. I don't understand who or what # it's there for. I think that means it's not there for me? # # https://www.mercurial-scm.org/wiki/BundleFormat2#New_header is wrong and # bizarre, and there isn't any other information on the page. # # # https://www.mercurial-scm.org/repo/hg/file/de86a6872d06/mercurial/help/internals/changegroups.txt # noqa # (`hg help internals.changegroups`) is very close to what we need. # It is accurate, current, and thorough. # It describes much of the internal structure, which is super helpful if you # know in advance which info can be trusted, but it doesn't describe any of the # file-level details, including the file headers and that the entire bundle # is broken into overlaid 4KB chunks starting from just after the bundle # header, nor does it describe what any of the component elements are used for, # nor does it explain the meta-message segment in the blob deltas, nor does it # explain the file flags occasionally appended to manifest file hashes. Also it # says: "The [delta data] format is described more fully in 'hg help # internals.bdiff'", which is also wrong. As far as I can tell, that # file has never existed. # # It does however have one potentially extremely useful note buried in the # middle that, in hindsight, could have significant implications for complexity # and performance in future Mercurial loading work. # # It says: "In version 1, the delta is always applied against the previous node # from the changegroup or the first parent if this is the first entry in the # changegroup." # # If the next version of HG support for SWH can reliably get version 1 data, # then it could be implemented entirely without worrying about ballooning # memory utilization, which would shrink the code significantly and probably be # faster too. So maybe HG10 bundles instead of HG20 bundles are superior for # this task? But then I read that servers can optionally disable serving # version 1 content, and I like to think that this loader could eventually # be applied directly to a network stream without an intermediate phase for # cloning and local bundling, so...It seemed like a good idea at the time? # # ----------------------------------------------------------------------------- # Other notes and thoughts: # ----------------------------------------------------------------------------- # 1) # This is a relatively minor detail, but # Mercurial nodes are not content-addressable like Git's are. # # https://www.mercurial-scm.org/wiki/Nodeid explains: "If you modify a file, # commit the change, and then modify it to restore the original contents, the # contents are the same but the history is different, so the file will get a # new nodeid. This history-sensitivity is obtained by calculating the nodeid # from the concatenation of the parent nodeids with the file's contents..." # # The result is that we always have to collect and hash everything at least # once in order to know if we've seen something like it before, because nothing # tells us that the node we're looking at is unique. We can use node ids for # linking disparate elements together (e.g. commit to manifest) but not for # determining whether two elements in the same group are identical in all but # descendency. So there's no way to save time on duplicate hashing. Well... # there is the copied_file blob metadata, but, lol. # # 2) # Most of the code complexity is due to dealing with 'version 2' changegroups, # for which we need to keep track of the entire history of all updates made # to a given file or working directory tree structure, because a revision # delta could be applied over any of the prior revisions all the way back to # rev 0, according to whenever files were branched/merged/uncommitted/etc. For # very large repositories with a lot of churn, this can quickly expand to # require multiple gigabytes of space, possibly exceeding RAM availability if # one desires to keep as much data resident in memory as possible to boost # performance. mozilla-unified, for instance, produces some 2 million+ blobs # (1.6 million+ unique). Nested umpteen subdirectory levels deep, those blobs # balloon into a quantity of directory subtrees that rapidly exceeds an 8GB RAM # laptop's ability to keep them all active without a good amount of care and # pruning. The code here tries to strike a balance between memory utilization # and performance. # # This problem is also referenced in the last paragraph of the previous # section, where potentially this problem wouldn't exist for 'version 1' data # if we can reliably get it. Can we? Either that or not use bundles at all, # which has other costs. # # 3) # If the list of changed files stored by the changesets had indicated which # of those changed files were added or modified and which ones were removed, # this code could be much faster. Right now we have to perform a huge number of # substring replacements (see the apply_revdata method) to produce a complete # file manifest for each commit (as a string!!!) in order to know how to get # the set of removed files from the next delta. We can intuit from every # manifest delta which files were modified or added, but I believe there's no # way to intuit which files were removed without actually having the complete # prior state and without the list of removals being explicitly given. If you # have an explicit list of all the files that were removed for any given commit # changegroup, and you combine that with the delta updates in the manifest # changegroups which detail the set of files that have been added or modified, # then you wouldn't even have to apply any of the string deltas to get a # complete understanding of the set of differences between one manifest and the # next. Not having this effective speed boost is rather unfortunate; it would # require only one extra stored byte per commit to differentiate removals and # would make extracting bundles lightning fast. # ============================================================================ ## import itertools import struct from binascii import unhexlify from collections import OrderedDict from datetime import datetime from .chunked_reader import ChunkedFileReader from .objects import SelectiveCache def unpack(fmt_str, source): """Utility function for fetching the right number of bytes from a stream to satisfy a struct.unpack pattern. args: fmt_str: a struct.unpack string pattern (e.g. '>I' for 4 bytes big-endian) source: any IO object that has a read() method which returns an appropriate sequence of bytes """ ret = struct.unpack(fmt_str, source.read(struct.calcsize(fmt_str))) if len(ret) == 1: return ret[0] return ret class Bundle20Reader(object): """Parser for extracting data from Mercurial Bundle20 files. NOTE: Currently only works on uncompressed HG20 bundles, but checking for COMPRESSION=<2chars> and loading the appropriate stream decompressor at that point would be trivial to add if necessary. args: bundlefile (str): name of the binary repository bundle file cache_filename (str): path to the disk cache used (transited to the SelectiveCache instance) cache_size (int): tuning parameter for the upper RAM limit used by historical data caches. The default is defined in the SelectiveCache class. """ - NAUGHT_NODE = b'\x00' * 20 + + NAUGHT_NODE = b"\x00" * 20 def __init__(self, bundlefile, cache_filename, cache_size=None): self.bundlefile = bundlefile self.cache_filename = cache_filename - bfile = open(bundlefile, 'rb', buffering=200*1024*1024) + bfile = open(bundlefile, "rb", buffering=200 * 1024 * 1024) btype = bfile.read(4) # 'HG20' - if btype != b'HG20': - raise Exception(bundlefile, - b'Not an HG20 bundle. First 4 bytes:' + btype) + if btype != b"HG20": + raise Exception(bundlefile, b"Not an HG20 bundle. First 4 bytes:" + btype) bfile.read(4) # '\x00\x00\x00\x00' self.params = self.read_bundle_header(bfile) # print('PARAMETERS', self.params) - self.num_commits = self.params[b'nbchanges'] + self.num_commits = self.params[b"nbchanges"] self.filereader = ChunkedFileReader(bfile) self.cache_size = cache_size self.blobs_offset = None self.changes_offset = self.filereader.tell() self.changes_next_offset = None self.manifests_offset = None self.manifests_next_offset = None self.id_to_info = {} def read_bundle_header(self, bfile): """Parse the file header which describes the format and parameters. See the structure diagram at the top of the file for more insight. args: bfile: bundle file handle with the cursor at the start offset of the content header (the 9th byte in the file) returns: dict of decoded bundle parameters """ - unpack('>I', bfile) # header length - chg_len = unpack('>B', bfile) # len('CHANGEGROUP') == 11 + unpack(">I", bfile) # header length + chg_len = unpack(">B", bfile) # len('CHANGEGROUP') == 11 bfile.read(chg_len) # should say 'CHANGEGROUP' - unpack('>I', bfile) # probably \x00\x00\x00\x00 + unpack(">I", bfile) # probably \x00\x00\x00\x00 - n_mandatory, n_advisory = unpack('>BB', bfile) # parameter counts + n_mandatory, n_advisory = unpack(">BB", bfile) # parameter counts mandatory_params = [ - (key_len, val_len) - for key_len, val_len - in [unpack('>BB', bfile) for i in range(n_mandatory)] - ] + (key_len, val_len) + for key_len, val_len in [unpack(">BB", bfile) for i in range(n_mandatory)] + ] advisory_params = [ - (key_len, val_len) - for key_len, val_len - in [unpack('>BB', bfile) for i in range(n_advisory)] - ] + (key_len, val_len) + for key_len, val_len in [unpack(">BB", bfile) for i in range(n_advisory)] + ] params = {} - for key_len, val_len in mandatory_params+advisory_params: - key = unpack('>%ds' % key_len, bfile) - val = int(unpack('>%ds' % val_len, bfile)) + for key_len, val_len in mandatory_params + advisory_params: + key = unpack(">%ds" % key_len, bfile) + val = int(unpack(">%ds" % val_len, bfile)) params[key] = val return params def revdata_iterator(self, bytes_to_read): """A chunk's revdata section is a series of start/end/length/data_delta content updates called RevDiffs that indicate components of a text diff applied to the node's basenode. The sum length of all diffs is the length indicated at the beginning of the chunk at the start of the header. See the structure diagram at the top of the file for more insight. args: bytes_to_read: int total number of bytes in the chunk's revdata yields: (int, int, read iterator) representing a single text diff component """ while bytes_to_read > 0: - start_offset = unpack('>I', self.filereader) - end_offset = unpack('>I', self.filereader) - blocklen = unpack('>I', self.filereader) + start_offset = unpack(">I", self.filereader) + end_offset = unpack(">I", self.filereader) + blocklen = unpack(">I", self.filereader) delta_it = self.filereader.read_iterator(blocklen) - bytes_to_read -= (12 + blocklen) + bytes_to_read -= 12 + blocklen yield (start_offset, end_offset, delta_it) # RevDiff def read_chunk_header(self): """The header of a RevChunk describes the id ('node') for the current change, the commit id ('linknode') associated with this change, the parental heritage ('p1' and 'p2'), and the node to which the revdata updates will apply ('basenode'). 'linknode' is the same as 'node' when reading the commit log because any commit is already itself. 'basenode' for a changeset will be NAUGHT_NODE, because changeset chunks include complete information and not diffs. See the structure diagram at the top of the file for more insight. returns: dict of the next delta header """ header = self.filereader.read(100) header = { - 'node': header[0:20], - 'p1': header[20:40], - 'p2': header[40:60], - 'basenode': header[60:80], - 'linknode': header[80:100] + "node": header[0:20], + "p1": header[20:40], + "p2": header[40:60], + "basenode": header[60:80], + "linknode": header[80:100], } return header def read_revchunk(self): """Fetch a complete RevChunk. A RevChunk contains the collection of line changes made in a particular update. header['node'] identifies which update. Commits, manifests, and files all have these. Each chunk contains an indicator of the whole chunk size, an update header, and then the body of the update as a series of text diff components. See the structure diagram at the top of the file for more insight. returns: tuple(dict, iterator) of (header, chunk data) if there is another chunk in the group, else None """ - size = unpack('>I', self.filereader) - 104 + size = unpack(">I", self.filereader) - 104 if size >= 0: header = self.read_chunk_header() return (header, self.revdata_iterator(size)) else: return None # NullChunk def extract_commit_metadata(self, data): """Converts the binary commit metadata format into a dict. args: data: bytestring of encoded commit information returns: dict of decoded commit information """ - parts, message = data.split(b'\n\n', 1) - parts = parts.split(b'\n') + parts, message = data.split(b"\n\n", 1) + parts = parts.split(b"\n") commit = {} - commit['message'] = message - commit['manifest'] = unhexlify(parts[0]) - commit['user'] = parts[1] - tstamp, tz, *extra = parts[2].split(b' ') - commit['time'] = datetime.fromtimestamp(float(tstamp)) - commit['time_offset_seconds'] = int(tz) + commit["message"] = message + commit["manifest"] = unhexlify(parts[0]) + commit["user"] = parts[1] + tstamp, tz, *extra = parts[2].split(b" ") + commit["time"] = datetime.fromtimestamp(float(tstamp)) + commit["time_offset_seconds"] = int(tz) if extra: - commit['extra'] = b' '.join(extra) - commit['changed_files'] = parts[3:] + commit["extra"] = b" ".join(extra) + commit["changed_files"] = parts[3:] return commit def skip_sections(self, num_sections=1): """Skip past sections quickly. args: num_sections: int number of sections to skip """ for i in range(num_sections): - size = unpack('>I', self.filereader) + size = unpack(">I", self.filereader) while size >= 104: self.filereader.seek(size - 4, from_current=True) - size = unpack('>I', self.filereader) + size = unpack(">I", self.filereader) def apply_revdata(self, revdata_it, prev_state): """Compose the complete text body for a change from component deltas. args: revdata_it: output from the revdata_iterator method prev_state: bytestring the base complete text on which the new deltas will be applied returns: (bytestring, list, list) the new complete string and lists of added and removed components (used in manifest processing) """ state = [] added = [] removed = [] next_start = 0 for delta_start, delta_end, rev_diff_it in revdata_it: removed.append(prev_state[delta_start:delta_end]) - added.append(b''.join(rev_diff_it)) + added.append(b"".join(rev_diff_it)) state.append(prev_state[next_start:delta_start]) state.append(added[-1]) next_start = delta_end state.append(prev_state[next_start:]) - state = b''.join(state) + state = b"".join(state) return (state, added, removed) def skim_headers(self): """Get all header data from a change group but bypass processing of the contained delta components. yields: output of read_chunk_header method for all chunks in the group """ - size = unpack('>I', self.filereader) - 104 + size = unpack(">I", self.filereader) - 104 while size >= 0: header = self.read_chunk_header() self.filereader.seek(size, from_current=True) yield header - size = unpack('>I', self.filereader) - 104 + size = unpack(">I", self.filereader) - 104 def group_iterator(self): """Bundle sections are called groups. These are composed of one or more revision chunks of delta components. Iterate over all the chunks in a group and hand each one back. yields: see output of read_revchunk method """ revchunk = self.read_revchunk() while revchunk: # A group is terminated by a NullChunk yield revchunk # (header, revdata_iterator) revchunk = self.read_revchunk() def yield_group_objects(self, cache_hints=None, group_offset=None): """Bundles are sectioned into groups: the log of all commits, the log of all manifest changes, and a series of logs of blob changes (one for each file). All groups are structured the same way, as a series of revisions each with a series of delta components. Iterate over the current group and return the completed object data for the current update by applying all of the internal delta components to each prior revision. args: cache_hints: see build_cache_hints (this will be built automatically if not pre-built and passed in) group_offset: int file position of the start of the desired group yields: (dict, bytestring, list, list) the output from read_chunk_header followed by the output from apply_revdata """ if group_offset is not None: self.filereader.seek(group_offset) if cache_hints is None: cache_hints = self.build_cache_hints() - data_cache = SelectiveCache(max_size=self.cache_size, - cache_hints=cache_hints, - filename=self.cache_filename) + data_cache = SelectiveCache( + max_size=self.cache_size, + cache_hints=cache_hints, + filename=self.cache_filename, + ) # Loop over all revisions in the group - data = b'' + data = b"" for header, revdata_it in self.group_iterator(): - node = header['node'] - basenode = header['basenode'] + node = header["node"] + basenode = header["basenode"] - data = data_cache.fetch(basenode) or b'' + data = data_cache.fetch(basenode) or b"" data, added, removed = self.apply_revdata(revdata_it, data) data_cache.store(node, data) yield (header, data, added, removed) # each RevChunk def extract_meta_from_blob(self, data): """File revision data sometimes begins with a metadata section of dubious value. Strip it off and maybe decode it. It seems to be mostly useless. Why indicate that a file node is a copy of another node? You can already get that information from the delta header. args: data: bytestring of one revision of a file, possibly with metadata embedded at the start returns: (bytestring, dict) of (the blob data, the meta information) """ meta = {} - if data.startswith(b'\x01\n'): - empty, metainfo, data = data.split(b'\x01\n', 2) - metainfo = b'\x01\n' + metainfo + b'\x01\n' - if metainfo.startswith(b'copy:'): + if data.startswith(b"\x01\n"): + empty, metainfo, data = data.split(b"\x01\n", 2) + metainfo = b"\x01\n" + metainfo + b"\x01\n" + if metainfo.startswith(b"copy:"): # direct file copy (?) - copyinfo = metainfo.split(b'\n') - meta['copied_file'] = copyinfo[0][6:] - meta['copied_rev'] = copyinfo[1][9:] - elif metainfo.startswith(b'censored:'): + copyinfo = metainfo.split(b"\n") + meta["copied_file"] = copyinfo[0][6:] + meta["copied_rev"] = copyinfo[1][9:] + elif metainfo.startswith(b"censored:"): # censored revision deltas must be full-replacements (?) - meta['censored'] = metainfo + meta["censored"] = metainfo else: # no idea - meta['text'] = metainfo + meta["text"] = metainfo return data, meta def seek_changelog(self): """Seek to the beginning of the change logs section. """ self.filereader.seek(self.changes_offset) def seek_manifests(self): """Seek to the beginning of the manifests section. """ if self.manifests_offset is None: self.seek_changelog() self.skip_sections(1) # skip past commits self.manifests_offset = self.filereader.tell() else: self.filereader.seek(self.manifests_offset) def seek_filelist(self): """Seek to the beginning of the file changes section. """ if self.blobs_offset is None: self.seek_manifests() self.skip_sections(1) # skip past manifests self.blobs_offset = self.filereader.tell() else: self.filereader.seek(self.blobs_offset) def yield_all_blobs(self): """Gets blob data from the bundle. yields: (bytestring, (bytestring, int, dict)) of (blob data, (file name, start offset of the file within the bundle, node header)) """ self.seek_filelist() # Loop through all files that have commits - size = unpack('>I', self.filereader) + size = unpack(">I", self.filereader) while size > 0: - file_name = self.filereader.read(size-4) + file_name = self.filereader.read(size - 4) file_start_offset = self.filereader.tell() # get all of the blobs for each file for header, data, *_ in self.yield_group_objects(): blob, meta = self.extract_meta_from_blob(data) yield blob, (file_name, file_start_offset, header) - size = unpack('>I', self.filereader) + size = unpack(">I", self.filereader) def yield_all_changesets(self): """Gets commit data from the bundle. yields: (dict, dict) of (read_chunk_header output, extract_commit_metadata output) """ self.seek_changelog() for header, data, *_ in self.yield_group_objects(): changeset = self.extract_commit_metadata(data) yield (header, changeset) def yield_all_manifest_deltas(self, cache_hints=None): """Gets manifest data from the bundle. In order to process the manifests in a reasonable amount of time, we want to use only the deltas and not the entire manifest at each change, because if we're processing them in sequential order (we are) then we already have the previous state so we only need the changes. args: cache_hints: see build_cache_hints method yields: (dict, dict, dict) of (read_chunk_header output, extract_manifest_elements output on added/modified files, extract_manifest_elements on removed files) """ self.seek_manifests() for header, data, added, removed in self.yield_group_objects( cache_hints=cache_hints ): added = self.extract_manifest_elements(added) removed = self.extract_manifest_elements(removed) yield (header, added, removed) def build_manifest_hints(self): """Just a minor abstraction shortcut for the build_cache_hints method. returns: see build_cache_hints method """ self.seek_manifests() return self.build_cache_hints() def build_cache_hints(self): """The SelectiveCache class that we use in building nodes can accept a set of key counters that makes its memory usage much more efficient. returns: dict of key=a node id, value=the number of times we will need data from that node when building subsequent nodes """ cur_pos = self.filereader.tell() hints = OrderedDict() prev_node = None for header in self.skim_headers(): - basenode = header['basenode'] + basenode = header["basenode"] if (basenode != self.NAUGHT_NODE) and (basenode != prev_node): # If the base isn't immediately prior, then cache it once more. hints[basenode] = hints.get(basenode, 0) + 1 - prev_node = header['node'] + prev_node = header["node"] self.filereader.seek(cur_pos) return hints def extract_manifest_elements(self, data): """Parses data that looks like a manifest. In practice we only pass in the bits extracted from the application of a manifest delta describing which files were added/modified or which ones were removed. args: data: either a string or a list of strings that, when joined, embodies the composition of a manifest. This takes the form of repetitions of (without the brackets):: b'\x00[flag]\\n' ...repeat... where ``[flag]`` may or may not be there depending on whether the file is specially flagged as executable or something returns: dict: ``{file_path: (file_node, permissions), ...}`` where permissions is given according to the flag that optionally exists in the data """ elements = {} if isinstance(data, str): - data = data.split(b'\n') + data = data.split(b"\n") else: - data = itertools.chain.from_iterable( - [chunk.split(b'\n') for chunk in data] - ) + data = itertools.chain.from_iterable([chunk.split(b"\n") for chunk in data]) for line in data: - if line != b'': - f = line.split(b'\x00') + if line != b"": + f = line.split(b"\x00") node = f[1] flag_bytes = node[40:] elements[f[0]] = ( unhexlify(node[:40]), - b'l' in flag_bytes, - b'755' if (b'x' in flag_bytes) else b'644' + b"l" in flag_bytes, + b"755" if (b"x" in flag_bytes) else b"644", ) return elements diff --git a/swh/loader/mercurial/chunked_reader.py b/swh/loader/mercurial/chunked_reader.py index 7228d77..c028447 100644 --- a/swh/loader/mercurial/chunked_reader.py +++ b/swh/loader/mercurial/chunked_reader.py @@ -1,106 +1,105 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import struct class ChunkedFileReader(object): """A binary stream reader that gives seamless read access to Mercurial's bundle2 HG20 format which is partitioned for some reason at the file level into chunks of [4Bytes:, Bytes:] as if it were encoding transport packets. args: file: rb file handle pre-aligned to the start of the chunked portion size_unpack_fmt: struct format string for unpacking the next chunk size """ - def __init__(self, file, size_unpack_fmt='>I'): + + def __init__(self, file, size_unpack_fmt=">I"): self._file = file self._size_pattern = size_unpack_fmt self._size_bytes = struct.calcsize(size_unpack_fmt) self._bytes_per_chunk = self._chunk_size(True) self._chunk_bytes_left = self._bytes_per_chunk self._offset = self._file.tell() # find the file size self._file.seek(0, 2) # seek to end self._size = self._file.tell() self._file.seek(self._offset, 0) # seek back to original position def _chunk_size(self, first_time=False): """Unpack the next bytes from the file to get the next file chunk size. """ - size = struct.unpack(self._size_pattern, - self._file.read(self._size_bytes))[0] + size = struct.unpack(self._size_pattern, self._file.read(self._size_bytes))[0] return size def size(self): """Returns the file size in bytes. """ return self._size def read(self, bytes_to_read): """Return N bytes from the file as a single block. args: bytes_to_read: int number of bytes of content """ - return b''.join(self.read_iterator(bytes_to_read)) + return b"".join(self.read_iterator(bytes_to_read)) def read_iterator(self, bytes_to_read): """Return a generator that yields N bytes from the file one file chunk at a time. args: bytes_to_read: int number of bytes of content """ while bytes_to_read > self._chunk_bytes_left: yield self._file.read(self._chunk_bytes_left) bytes_to_read -= self._chunk_bytes_left self._chunk_bytes_left = self._chunk_size() self._chunk_bytes_left -= bytes_to_read yield self._file.read(bytes_to_read) def seek(self, new_pos=None, from_current=False): """Wraps the underlying file seek, additionally updating the chunk_bytes_left counter appropriately so that we can start reading from the new location. args: new_pos: new cursor byte position from_current: if True, it treats new_pos as an offset from the current cursor position, bypassing any chunk boundaries as if they weren't there. This should give the same end position as a read except without the reading data part. """ if from_current: - new_pos = new_pos or 0 # None -> current + new_pos = new_pos or 0 # None -> current if new_pos <= self._chunk_bytes_left: new_pos += self._file.tell() else: new_pos += ( self._file.tell() + self._size_bytes + ( ( (new_pos - self._chunk_bytes_left - 1) # aliasing // self._bytes_per_chunk ) * self._size_bytes ) ) else: - new_pos = new_pos or self._offset # None -> start position + new_pos = new_pos or self._offset # None -> start position self._chunk_bytes_left = self._bytes_per_chunk - ( - (new_pos - self._offset) - % (self._bytes_per_chunk + self._size_bytes) + (new_pos - self._offset) % (self._bytes_per_chunk + self._size_bytes) ) self._file.seek(new_pos) def __getattr__(self, item): """Forward other calls to the underlying file object. """ return getattr(self._file, item) diff --git a/swh/loader/mercurial/cli.py b/swh/loader/mercurial/cli.py index 0d32e19..20fc2b2 100644 --- a/swh/loader/mercurial/cli.py +++ b/swh/loader/mercurial/cli.py @@ -1,51 +1,59 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import datetime import logging from itertools import chain -LOGLEVELS = list(chain.from_iterable((logging._levelToName[lvl], - logging._levelToName[lvl].lower()) - for lvl in sorted(logging._levelToName.keys()))) +LOGLEVELS = list( + chain.from_iterable( + (logging._levelToName[lvl], logging._levelToName[lvl].lower()) + for lvl in sorted(logging._levelToName.keys()) + ) +) @click.command() -@click.argument('origin-url') -@click.option('--hg-directory', '-d', - help=('Path to the hg (local) directory to load from. ' - 'If unset, the hg repo will be cloned from the ' - 'given (origin) url.')) -@click.option('--hg-archive', '-a', - help=('Path to the hg archive file to load from.')) -@click.option('--visit-date', '-D', help='Visit date (defaults to now).') -@click.option('--log-level', '-l', - type=click.Choice(LOGLEVELS), - help='Log level.') -def main(origin_url, hg_directory=None, - hg_archive=None, visit_date=None, log_level=None): +@click.argument("origin-url") +@click.option( + "--hg-directory", + "-d", + help=( + "Path to the hg (local) directory to load from. " + "If unset, the hg repo will be cloned from the " + "given (origin) url." + ), +) +@click.option("--hg-archive", "-a", help=("Path to the hg archive file to load from.")) +@click.option("--visit-date", "-D", help="Visit date (defaults to now).") +@click.option("--log-level", "-l", type=click.Choice(LOGLEVELS), help="Log level.") +def main( + origin_url, hg_directory=None, hg_archive=None, visit_date=None, log_level=None +): logging.basicConfig( - level=(log_level or 'DEBUG').upper(), - format='%(asctime)s %(process)d %(message)s') + level=(log_level or "DEBUG").upper(), + format="%(asctime)s %(process)d %(message)s", + ) if not visit_date: visit_date = datetime.datetime.now(tz=datetime.timezone.utc) - kwargs = {'visit_date': visit_date, - 'origin_url': origin_url} + kwargs = {"visit_date": visit_date, "origin_url": origin_url} if hg_archive: from .loader import HgArchiveBundle20Loader as HgLoader - kwargs['archive_path'] = hg_archive + + kwargs["archive_path"] = hg_archive else: from .loader import HgBundle20Loader as HgLoader - kwargs['directory'] = hg_directory + + kwargs["directory"] = hg_directory return HgLoader().load(**kwargs) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/swh/loader/mercurial/converters.py b/swh/loader/mercurial/converters.py index 09bc7a8..a57e90c 100644 --- a/swh/loader/mercurial/converters.py +++ b/swh/loader/mercurial/converters.py @@ -1,42 +1,42 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -PRIMARY_ALGO = 'sha1_git' +PRIMARY_ALGO = "sha1_git" def parse_author(name_email): """Parse an author line""" if name_email is None: return None try: - open_bracket = name_email.index(b'<') + open_bracket = name_email.index(b"<") except ValueError: name = email = None else: raw_name = name_email[:open_bracket] - raw_email = name_email[open_bracket+1:] + raw_email = name_email[open_bracket + 1 :] if not raw_name: name = None - elif raw_name.endswith(b' '): + elif raw_name.endswith(b" "): name = raw_name[:-1] else: name = raw_name try: - close_bracket = raw_email.index(b'>') + close_bracket = raw_email.index(b">") except ValueError: email = None else: email = raw_email[:close_bracket] return { - 'name': name, - 'email': email, - 'fullname': name_email, + "name": name, + "email": email, + "fullname": name_email, } diff --git a/swh/loader/mercurial/loader.py b/swh/loader/mercurial/loader.py index dc043b0..7d591bc 100644 --- a/swh/loader/mercurial/loader.py +++ b/swh/loader/mercurial/loader.py @@ -1,609 +1,639 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """This document contains a SWH loader for ingesting repository data from Mercurial version 2 bundle files. """ # NOTE: The code here does expensive work twice in places because of the # intermediate need to check for what is missing before sending to the database # and the desire to not juggle very large amounts of data. # TODO: Decide whether to also serialize to disk and read back more quickly # from there. Maybe only for very large repos and fast drives. # - Avi import datetime import hglib import os from queue import Empty import random import re import time from dateutil import parser from shutil import rmtree from tempfile import mkdtemp from typing import Dict, Iterable, Optional import billiard from swh.model import identifiers from swh.model.model import ( - BaseContent, Content, Directory, ObjectType, Origin, Person, - Release, Revision, RevisionType, SkippedContent, Snapshot, SnapshotBranch, - TargetType, TimestampWithTimezone + BaseContent, + Content, + Directory, + ObjectType, + Origin, + Person, + Release, + Revision, + RevisionType, + SkippedContent, + Snapshot, + SnapshotBranch, + TargetType, + TimestampWithTimezone, ) from swh.model.hashutil import ( - MultiHash, hash_to_hex, hash_to_bytehex, hash_to_bytes, - DEFAULT_ALGORITHMS + MultiHash, + hash_to_hex, + hash_to_bytehex, + hash_to_bytes, + DEFAULT_ALGORITHMS, ) from swh.loader.core.loader import DVCSLoader from swh.loader.core.utils import clean_dangling_folders from . import converters from .archive_extract import tmp_extract from .bundle20_reader import Bundle20Reader from .converters import PRIMARY_ALGO as ALGO from .objects import SelectiveCache, SimpleTree -TAG_PATTERN = re.compile('[0-9A-Fa-f]{40}') +TAG_PATTERN = re.compile("[0-9A-Fa-f]{40}") -TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.mercurial.' +TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.mercurial." -HEAD_POINTER_NAME = b'tip' +HEAD_POINTER_NAME = b"tip" class CloneTimeoutError(Exception): pass class HgBundle20Loader(DVCSLoader): """Mercurial loader able to deal with remote or local repository. """ - CONFIG_BASE_FILENAME = 'loader/mercurial' + + CONFIG_BASE_FILENAME = "loader/mercurial" ADDITIONAL_CONFIG = { - 'bundle_filename': ('str', 'HG20_none_bundle'), - 'reduce_effort': ('bool', False), - 'temp_directory': ('str', '/tmp'), - 'cache1_size': ('int', 800*1024*1024), - 'cache2_size': ('int', 800*1024*1024), - 'clone_timeout_seconds': ('int', 7200), + "bundle_filename": ("str", "HG20_none_bundle"), + "reduce_effort": ("bool", False), + "temp_directory": ("str", "/tmp"), + "cache1_size": ("int", 800 * 1024 * 1024), + "cache2_size": ("int", 800 * 1024 * 1024), + "clone_timeout_seconds": ("int", 7200), } - visit_type = 'hg' + visit_type = "hg" - def __init__(self, url, visit_date=None, directory=None, - logging_class='swh.loader.mercurial.Bundle20Loader'): + def __init__( + self, + url, + visit_date=None, + directory=None, + logging_class="swh.loader.mercurial.Bundle20Loader", + ): super().__init__(logging_class=logging_class) self.origin_url = url self.visit_date = visit_date self.directory = directory - self.bundle_filename = self.config['bundle_filename'] - self.reduce_effort_flag = self.config['reduce_effort'] + self.bundle_filename = self.config["bundle_filename"] + self.reduce_effort_flag = self.config["reduce_effort"] self.empty_repository = None - self.temp_directory = self.config['temp_directory'] - self.cache1_size = self.config['cache1_size'] - self.cache2_size = self.config['cache2_size'] - self.clone_timeout = self.config['clone_timeout_seconds'] + self.temp_directory = self.config["temp_directory"] + self.cache1_size = self.config["cache1_size"] + self.cache2_size = self.config["cache2_size"] + self.clone_timeout = self.config["clone_timeout_seconds"] self.working_directory = None self.bundle_path = None self.heads = {} self.releases = {} def pre_cleanup(self): """Cleanup potential dangling files from prior runs (e.g. OOM killed tasks) """ - clean_dangling_folders(self.temp_directory, - pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, - log=self.log) + clean_dangling_folders( + self.temp_directory, + pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, + log=self.log, + ) def cleanup(self): """Clean temporary working directory """ if self.bundle_path and os.path.exists(self.bundle_path): - self.log.debug('Cleanup up working bundle %s' % self.bundle_path) + self.log.debug("Cleanup up working bundle %s" % self.bundle_path) os.unlink(self.bundle_path) if self.working_directory and os.path.exists(self.working_directory): - self.log.debug('Cleanup up working directory %s' % ( - self.working_directory, )) + self.log.debug( + "Cleanup up working directory %s" % (self.working_directory,) + ) rmtree(self.working_directory) def get_heads(self, repo): """Read the closed branches heads (branch, bookmarks) and returns a dict with key the branch_name (bytes) and values the tuple (pointer nature (bytes), mercurial's node id (bytes)). Those needs conversion to swh-ids. This is taken care of in get_revisions. """ b = {} for _, node_hash_id, pointer_nature, branch_name, *_ in repo.heads(): - b[branch_name] = ( - pointer_nature, hash_to_bytes(node_hash_id.decode())) + b[branch_name] = (pointer_nature, hash_to_bytes(node_hash_id.decode())) bookmarks = repo.bookmarks() if bookmarks and bookmarks[0]: for bookmark_name, _, target_short in bookmarks[0]: target = repo[target_short].node() b[bookmark_name] = (None, hash_to_bytes(target.decode())) return b def prepare_origin_visit(self, *args, **kwargs): self.origin = Origin(url=self.origin_url) visit_date = self.visit_date if isinstance(visit_date, str): # visit_date can be string or datetime visit_date = parser.parse(visit_date) self.visit_date = visit_date self.last_visit = self.storage.origin_visit_get_latest( - self.origin_url, require_snapshot=True) + self.origin_url, require_snapshot=True + ) @staticmethod def clone_with_timeout(log, origin, destination, timeout): queue = billiard.Queue() start = time.monotonic() def do_clone(queue, origin, destination): try: result = hglib.clone(source=origin, dest=destination) except BaseException as e: queue.put(e) else: queue.put(result) - process = billiard.Process(target=do_clone, - args=(queue, origin, destination)) + process = billiard.Process(target=do_clone, args=(queue, origin, destination)) process.start() while True: try: result = queue.get(timeout=0.1) break except Empty: duration = time.monotonic() - start if timeout and duration > timeout: - log.warning('Timeout cloning `%s` within %s seconds', - origin, timeout) + log.warning( + "Timeout cloning `%s` within %s seconds", origin, timeout + ) process.terminate() process.join() raise CloneTimeoutError(origin, timeout) continue process.join() if isinstance(result, Exception): raise result from None return result def prepare(self, *args, **kwargs): """Prepare the necessary steps to load an actual remote or local repository. To load a local repository, pass the optional directory parameter as filled with a path to a real local folder. To load a remote repository, pass the optional directory parameter as None. Args: origin_url (str): Origin url to load visit_date (str/datetime): Date of the visit directory (str/None): The local directory to load """ self.branches = {} self.tags = [] self.releases = {} self.node_2_rev = {} self.heads = {} directory = self.directory if not directory: # remote repository self.working_directory = mkdtemp( prefix=TEMPORARY_DIR_PREFIX_PATTERN, - suffix='-%s' % os.getpid(), - dir=self.temp_directory) + suffix="-%s" % os.getpid(), + dir=self.temp_directory, + ) os.makedirs(self.working_directory, exist_ok=True) self.hgdir = self.working_directory - self.log.debug('Cloning %s to %s with timeout %s seconds', - self.origin_url, self.hgdir, self.clone_timeout) + self.log.debug( + "Cloning %s to %s with timeout %s seconds", + self.origin_url, + self.hgdir, + self.clone_timeout, + ) - self.clone_with_timeout(self.log, self.origin_url, self.hgdir, - self.clone_timeout) + self.clone_with_timeout( + self.log, self.origin_url, self.hgdir, self.clone_timeout + ) else: # local repository self.working_directory = None self.hgdir = directory self.bundle_path = os.path.join(self.hgdir, self.bundle_filename) - self.log.debug('Bundling at %s' % self.bundle_path) + self.log.debug("Bundling at %s" % self.bundle_path) with hglib.open(self.hgdir) as repo: self.heads = self.get_heads(repo) - repo.bundle(bytes(self.bundle_path, 'utf-8'), - all=True, - type=b'none-v2') + repo.bundle(bytes(self.bundle_path, "utf-8"), all=True, type=b"none-v2") self.cache_filename1 = os.path.join( - self.hgdir, 'swh-cache-1-%s' % ( - hex(random.randint(0, 0xffffff))[2:], )) + self.hgdir, "swh-cache-1-%s" % (hex(random.randint(0, 0xFFFFFF))[2:],) + ) self.cache_filename2 = os.path.join( - self.hgdir, 'swh-cache-2-%s' % ( - hex(random.randint(0, 0xffffff))[2:], )) + self.hgdir, "swh-cache-2-%s" % (hex(random.randint(0, 0xFFFFFF))[2:],) + ) try: - self.br = Bundle20Reader(bundlefile=self.bundle_path, - cache_filename=self.cache_filename1, - cache_size=self.cache1_size) + self.br = Bundle20Reader( + bundlefile=self.bundle_path, + cache_filename=self.cache_filename1, + cache_size=self.cache1_size, + ) except FileNotFoundError: # Empty repository! Still a successful visit targeting an # empty snapshot - self.log.warn('%s is an empty repository!' % self.hgdir) + self.log.warn("%s is an empty repository!" % self.hgdir) self.empty_repository = True else: self.reduce_effort = set() if self.reduce_effort_flag: now = datetime.datetime.now(tz=datetime.timezone.utc) if (now - self.visit_date).days > 1: # Assuming that self.visit_date would be today for # a new visit, treat older visit dates as # indication of wanting to skip some processing # effort. for header, commit in self.br.yield_all_changesets(): - ts = commit['time'].timestamp() + ts = commit["time"].timestamp() if ts < self.visit_date.timestamp(): - self.reduce_effort.add(header['node']) + self.reduce_effort.add(header["node"]) def has_contents(self): return not self.empty_repository def has_directories(self): return not self.empty_repository def has_revisions(self): return not self.empty_repository def has_releases(self): return not self.empty_repository def fetch_data(self): """Fetch the data from the data source.""" pass def get_contents(self) -> Iterable[BaseContent]: """Get the contents that need to be loaded.""" # NOTE: This method generates blobs twice to reduce memory usage # without generating disk writes. self.file_node_to_hash = {} hash_to_info = {} self.num_contents = 0 contents: Dict[bytes, BaseContent] = {} missing_contents = set() for blob, node_info in self.br.yield_all_blobs(): self.num_contents += 1 file_name = node_info[0] header = node_info[2] length = len(blob) - if header['linknode'] in self.reduce_effort: + if header["linknode"] in self.reduce_effort: algorithms = set([ALGO]) else: algorithms = DEFAULT_ALGORITHMS h = MultiHash.from_data(blob, hash_names=algorithms) content = h.digest() - content['length'] = length + content["length"] = length blob_hash = content[ALGO] - self.file_node_to_hash[header['node']] = blob_hash + self.file_node_to_hash[header["node"]] = blob_hash - if header['linknode'] in self.reduce_effort: + if header["linknode"] in self.reduce_effort: continue hash_to_info[blob_hash] = node_info - if (self.max_content_size is not None - and length >= self.max_content_size): + if self.max_content_size is not None and length >= self.max_content_size: contents[blob_hash] = SkippedContent( - status='absent', reason='Content too large', **content) + status="absent", reason="Content too large", **content + ) else: - contents[blob_hash] = Content( - data=blob, status='visible', **content) + contents[blob_hash] = Content(data=blob, status="visible", **content) - if file_name == b'.hgtags': + if file_name == b".hgtags": # https://www.mercurial-scm.org/wiki/GitConcepts#Tag_model # overwrite until the last one - self.tags = (t for t in blob.split(b'\n') if t != b'') + self.tags = (t for t in blob.split(b"\n") if t != b"") if contents: missing_contents = set( self.storage.content_missing( - map(lambda c: c.to_dict(), contents.values()), - key_hash=ALGO + map(lambda c: c.to_dict(), contents.values()), key_hash=ALGO ) ) # Clusters needed blobs by file offset and then only fetches the # groups at the needed offsets. focs: Dict[int, Dict[bytes, bytes]] = {} # "file/offset/contents" for blob_hash in missing_contents: _, file_offset, header = hash_to_info[blob_hash] focs.setdefault(file_offset, {}) - focs[file_offset][header['node']] = blob_hash + focs[file_offset][header["node"]] = blob_hash for offset, node_hashes in sorted(focs.items()): - for header, data, *_ in self.br.yield_group_objects( - group_offset=offset - ): - node = header['node'] + for header, data, *_ in self.br.yield_group_objects(group_offset=offset): + node = header["node"] if node in node_hashes: blob, meta = self.br.extract_meta_from_blob(data) content = contents.pop(node_hashes[node], None) if content: - if (self.max_content_size is not None - and len(blob) >= self.max_content_size): + if ( + self.max_content_size is not None + and len(blob) >= self.max_content_size + ): yield SkippedContent.from_data( - blob, reason='Content too large') + blob, reason="Content too large" + ) else: yield Content.from_data(blob) def load_directories(self): """This is where the work is done to convert manifest deltas from the repository bundle into SWH directories. """ self.mnode_to_tree_id = {} cache_hints = self.br.build_manifest_hints() def tree_size(t): return t.size() - self.trees = SelectiveCache(cache_hints=cache_hints, - size_function=tree_size, - filename=self.cache_filename2, - max_size=self.cache2_size) + self.trees = SelectiveCache( + cache_hints=cache_hints, + size_function=tree_size, + filename=self.cache_filename2, + max_size=self.cache2_size, + ) tree = SimpleTree() - for header, added, removed in self.br.yield_all_manifest_deltas( - cache_hints - ): - node = header['node'] - basenode = header['basenode'] + for header, added, removed in self.br.yield_all_manifest_deltas(cache_hints): + node = header["node"] + basenode = header["basenode"] tree = self.trees.fetch(basenode) or tree # working tree for path in removed.keys(): tree = tree.remove_tree_node_for_path(path) for path, info in added.items(): file_node, is_symlink, perms_code = info tree = tree.add_blob( - path, - self.file_node_to_hash[file_node], - is_symlink, - perms_code + path, self.file_node_to_hash[file_node], is_symlink, perms_code ) - if header['linknode'] in self.reduce_effort: + if header["linknode"] in self.reduce_effort: self.trees.store(node, tree) else: new_dirs = [] self.mnode_to_tree_id[node] = tree.hash_changed(new_dirs) self.trees.store(node, tree) yield header, tree, new_dirs def get_directories(self) -> Iterable[Directory]: """Compute directories to load """ dirs = {} self.num_directories = 0 for _, _, new_dirs in self.load_directories(): for d in new_dirs: self.num_directories += 1 - dirs[d['id']] = Directory.from_dict(d) + dirs[d["id"]] = Directory.from_dict(d) missing_dirs = list(dirs.keys()) if missing_dirs: missing_dirs = self.storage.directory_missing(missing_dirs) for _id in missing_dirs: yield dirs[_id] dirs = {} def get_revisions(self) -> Iterable[Revision]: """Compute revisions to load """ revisions = {} self.num_revisions = 0 for header, commit in self.br.yield_all_changesets(): - if header['node'] in self.reduce_effort: + if header["node"] in self.reduce_effort: continue self.num_revisions += 1 - date_dict = identifiers.normalize_timestamp( - int(commit['time'].timestamp()) - ) - author_dict = converters.parse_author(commit['user']) - if commit['manifest'] == Bundle20Reader.NAUGHT_NODE: + date_dict = identifiers.normalize_timestamp(int(commit["time"].timestamp())) + author_dict = converters.parse_author(commit["user"]) + if commit["manifest"] == Bundle20Reader.NAUGHT_NODE: directory_id = SimpleTree().hash_changed() else: - directory_id = self.mnode_to_tree_id[commit['manifest']] + directory_id = self.mnode_to_tree_id[commit["manifest"]] extra_meta = [] - extra = commit.get('extra') + extra = commit.get("extra") if extra: - for e in extra.split(b'\x00'): - k, v = e.split(b':', 1) - k = k.decode('utf-8') + for e in extra.split(b"\x00"): + k, v = e.split(b":", 1) + k = k.decode("utf-8") # transplant_source stores binary reference to a changeset # prefer to dump hexadecimal one in the revision metadata - if k == 'transplant_source': + if k == "transplant_source": v = hash_to_bytehex(v) extra_meta.append([k, v]) parents = [] - p1 = self.node_2_rev.get(header['p1']) - p2 = self.node_2_rev.get(header['p2']) + p1 = self.node_2_rev.get(header["p1"]) + p2 = self.node_2_rev.get(header["p2"]) if p1: parents.append(p1) if p2: parents.append(p2) revision = Revision( author=Person.from_dict(author_dict), date=TimestampWithTimezone.from_dict(date_dict), committer=Person.from_dict(author_dict), committer_date=TimestampWithTimezone.from_dict(date_dict), type=RevisionType.MERCURIAL, directory=directory_id, - message=commit['message'], + message=commit["message"], metadata={ - 'node': hash_to_hex(header['node']), - 'extra_headers': [ - ['time_offset_seconds', - str(commit['time_offset_seconds']).encode('utf-8')], - ] + extra_meta + "node": hash_to_hex(header["node"]), + "extra_headers": [ + [ + "time_offset_seconds", + str(commit["time_offset_seconds"]).encode("utf-8"), + ], + ] + + extra_meta, }, synthetic=False, parents=parents, ) - self.node_2_rev[header['node']] = revision.id + self.node_2_rev[header["node"]] = revision.id revisions[revision.id] = revision # Converts heads to use swh ids self.heads = { branch_name: (pointer_nature, self.node_2_rev[node_id]) for branch_name, (pointer_nature, node_id) in self.heads.items() } missing_revs = set(revisions.keys()) if missing_revs: - missing_revs = set( - self.storage.revision_missing(missing_revs) - ) + missing_revs = set(self.storage.revision_missing(missing_revs)) for rev in missing_revs: yield revisions[rev] self.mnode_to_tree_id = None - def _read_tag(self, tag, split_byte=b' '): + def _read_tag(self, tag, split_byte=b" "): node, *name = tag.split(split_byte) name = split_byte.join(name) return node, name def get_releases(self) -> Iterable[Release]: """Get the releases that need to be loaded.""" self.num_releases = 0 releases = {} missing_releases = set() for t in self.tags: self.num_releases += 1 node, name = self._read_tag(t) node = node.decode() node_bytes = hash_to_bytes(node) if not TAG_PATTERN.match(node): - self.log.warn('Wrong pattern (%s) found in tags. Skipping' % ( - node, )) + self.log.warn("Wrong pattern (%s) found in tags. Skipping" % (node,)) continue if node_bytes not in self.node_2_rev: - self.log.warn('No matching revision for tag %s ' - '(hg changeset: %s). Skipping' % - (name.decode(), node)) + self.log.warn( + "No matching revision for tag %s " + "(hg changeset: %s). Skipping" % (name.decode(), node) + ) continue tgt_rev = self.node_2_rev[node_bytes] release = Release( name=name, target=tgt_rev, target_type=ObjectType.REVISION, message=None, metadata=None, synthetic=False, - author=Person(name=None, email=None, fullname=b''), + author=Person(name=None, email=None, fullname=b""), date=None, ) missing_releases.add(release.id) releases[release.id] = release self.releases[name] = release.id if missing_releases: - missing_releases = set( - self.storage.release_missing(missing_releases)) + missing_releases = set(self.storage.release_missing(missing_releases)) for _id in missing_releases: yield releases[_id] def get_snapshot(self) -> Snapshot: """Get the snapshot that need to be loaded.""" branches: Dict[bytes, Optional[SnapshotBranch]] = {} for name, (pointer_nature, target) in self.heads.items(): branches[name] = SnapshotBranch( - target=target, target_type=TargetType.REVISION) + target=target, target_type=TargetType.REVISION + ) if pointer_nature == HEAD_POINTER_NAME: - branches[b'HEAD'] = SnapshotBranch( - target=name, target_type=TargetType.ALIAS) + branches[b"HEAD"] = SnapshotBranch( + target=name, target_type=TargetType.ALIAS + ) for name, target in self.releases.items(): branches[name] = SnapshotBranch( - target=target, target_type=TargetType.RELEASE) + target=target, target_type=TargetType.RELEASE + ) self.snapshot = Snapshot(branches=branches) return self.snapshot def get_fetch_history_result(self): """Return the data to store in fetch_history.""" return { - 'contents': self.num_contents, - 'directories': self.num_directories, - 'revisions': self.num_revisions, - 'releases': self.num_releases, + "contents": self.num_contents, + "directories": self.num_directories, + "revisions": self.num_revisions, + "releases": self.num_releases, } def load_status(self): snapshot = self.get_snapshot() - load_status = 'eventful' - if (self.last_visit is not None and - self.last_visit['snapshot'] == snapshot.id): - load_status = 'uneventful' + load_status = "eventful" + if self.last_visit is not None and self.last_visit["snapshot"] == snapshot.id: + load_status = "uneventful" return { - 'status': load_status, + "status": load_status, } class HgArchiveBundle20Loader(HgBundle20Loader): """Mercurial loader for repository wrapped within archives. """ + def __init__(self, url, visit_date=None, archive_path=None): super().__init__( - url, visit_date=visit_date, - logging_class='swh.loader.mercurial.HgArchiveBundle20Loader') + url, + visit_date=visit_date, + logging_class="swh.loader.mercurial.HgArchiveBundle20Loader", + ) self.temp_dir = None self.archive_path = archive_path def prepare(self, *args, **kwargs): - self.temp_dir = tmp_extract(archive=self.archive_path, - dir=self.temp_directory, - prefix=TEMPORARY_DIR_PREFIX_PATTERN, - suffix='.dump-%s' % os.getpid(), - log=self.log, - source=self.origin_url) + self.temp_dir = tmp_extract( + archive=self.archive_path, + dir=self.temp_directory, + prefix=TEMPORARY_DIR_PREFIX_PATTERN, + suffix=".dump-%s" % os.getpid(), + log=self.log, + source=self.origin_url, + ) repo_name = os.listdir(self.temp_dir)[0] self.directory = os.path.join(self.temp_dir, repo_name) super().prepare(*args, **kwargs) def cleanup(self): if self.temp_dir and os.path.exists(self.temp_dir): rmtree(self.temp_dir) super().cleanup() diff --git a/swh/loader/mercurial/objects.py b/swh/loader/mercurial/objects.py index fc86133..59a33af 100644 --- a/swh/loader/mercurial/objects.py +++ b/swh/loader/mercurial/objects.py @@ -1,420 +1,413 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """This document contains various helper classes used in converting Mercurial bundle files into SWH Contents, Directories, etc. """ import binascii import copy import os import sys import pickle import sqlite3 import zlib from collections import OrderedDict from sqlitedict import SqliteDict from swh.model import identifiers -OS_PATH_SEP = os.path.sep.encode('utf-8') +OS_PATH_SEP = os.path.sep.encode("utf-8") def _encode(obj): - return sqlite3.Binary(zlib.compress( - pickle.dumps(obj, pickle.HIGHEST_PROTOCOL))) + return sqlite3.Binary(zlib.compress(pickle.dumps(obj, pickle.HIGHEST_PROTOCOL))) def _decode(obj): - return pickle.loads( - zlib.decompress(bytes(obj))) + return pickle.loads(zlib.decompress(bytes(obj))) class SimpleBlob: """Stores basic metadata of a blob object.when constructing deep trees from commit file manifests. args: file_hash: unique hash of the file contents is_symlink: (bool) is this file a symlink? file_perms: (string) 3 digit permission code as a string or bytestring, e.g. '755' or b'755' """ - kind = 'file' + kind = "file" def __init__(self, file_hash, is_symlink, file_perms): self.hash = file_hash self.perms = 0o100000 + int(file_perms, 8) if is_symlink: self.perms += 0o020000 def __str__(self): - return ('SimpleBlob: ' + str(self.hash) + ' -- ' + str(self.perms)) + return "SimpleBlob: " + str(self.hash) + " -- " + str(self.perms) def __eq__(self, other): - return ((self.perms == other.perms) and (self.hash == other.hash)) + return (self.perms == other.perms) and (self.hash == other.hash) def size(self): """Return the size in byte.""" return sys.getsizeof(self) + sys.getsizeof(self.__dict__) class SimpleTree(dict): """ Stores data for a nested directory object. Uses shallow cloning to stay compact after forking and change monitoring for efficient re-hashing. """ - kind = 'dir' + kind = "dir" perms = 0o040000 def __init__(self): self.hash = None self._size = None def __eq__(self, other): - return ((self.hash == other.hash) and (self.items() == other.items())) + return (self.hash == other.hash) and (self.items() == other.items()) def _new_tree_node(self, path): """Deeply nests SimpleTrees according to a given subdirectory path and returns a reference to the deepest one. args: path: bytestring containing a relative path from self to a deep subdirectory. e.g. b'foodir/bardir/bazdir' returns: the new node """ node = self for d in path.split(OS_PATH_SEP): if node.get(d): if node[d].hash is not None: node[d] = copy.copy(node[d]) node[d].hash = None node[d]._size = None else: node[d] = SimpleTree() node = node[d] return node def remove_tree_node_for_path(self, path): """Deletes a SimpleBlob or SimpleTree from inside nested SimpleTrees according to the given relative file path, and then recursively removes any newly depopulated SimpleTrees. It keeps the old history by doing a shallow clone before any change. args: path: bytestring containing a relative path from self to a nested file or directory. e.g. b'foodir/bardir/bazdir/quxfile.txt' returns: the new root node """ node = self if node.hash is not None: node = copy.copy(node) node.hash = None node._size = None first, sep, rest = path.partition(OS_PATH_SEP) if rest: node[first] = node[first].remove_tree_node_for_path(rest) if len(node[first]) == 0: del node[first] else: del node[first] return node def add_blob(self, file_path, file_hash, is_symlink, file_perms): """Shallow clones the root node and then deeply nests a SimpleBlob inside nested SimpleTrees according to the given file path, shallow cloning all all intermediate nodes and marking them as changed and in need of new hashes. args: file_path: bytestring containing the relative path from self to a nested file file_hash: primary identifying hash computed from the blob contents is_symlink: True/False whether this item is a symbolic link file_perms: int or string representation of file permissions returns: the new root node """ root = self if root.hash is not None: root = copy.copy(root) root.hash = None root._size = None node = root fdir, fbase = os.path.split(file_path) if fdir: node = root._new_tree_node(fdir) node[fbase] = SimpleBlob(file_hash, is_symlink, file_perms) return root def yield_swh_directories(self): """Converts nested SimpleTrees into a stream of SWH Directories. yields: an SWH Directory for every node in the tree """ for k, v in sorted(self.items()): if isinstance(v, SimpleTree): yield from v.yield_swh_directories() yield { - 'id': self.hash, - 'entries': [ - { - 'name': k, - 'perms': v.perms, - 'type': v.kind, - 'target': v.hash - } + "id": self.hash, + "entries": [ + {"name": k, "perms": v.perms, "type": v.kind, "target": v.hash} for k, v in sorted(self.items()) - ] + ], } def hash_changed(self, new_dirs=None): """Computes and sets primary identifier hashes for unhashed subtrees. args: new_dirs (optional): an empty list to be populated with the SWH Directories for all of the new (not previously hashed) nodes returns: the top level hash of the whole tree """ if self.hash is None: directory = { - 'entries': [ + "entries": [ { - 'name': k, - 'perms': v.perms, - 'type': v.kind, - 'target': (v.hash if v.hash is not None - else v.hash_changed(new_dirs)) + "name": k, + "perms": v.perms, + "type": v.kind, + "target": ( + v.hash if v.hash is not None else v.hash_changed(new_dirs) + ), } for k, v in sorted(self.items()) ] } - self.hash = binascii.unhexlify( - identifiers.directory_identifier(directory) - ) - directory['id'] = self.hash + self.hash = binascii.unhexlify(identifiers.directory_identifier(directory)) + directory["id"] = self.hash if new_dirs is not None: new_dirs.append(directory) return self.hash def flatten(self, _curpath=None, _files=None): """Converts nested sub-SimpleTrees and SimpleBlobs into a list of file paths. Useful for counting the number of files in a manifest. returns: a flat list of all of the contained file paths """ - _curpath = _curpath or b'' + _curpath = _curpath or b"" _files = _files or {} for k, v in sorted(self.items()): p = os.path.join(_curpath, k) if isinstance(v, SimpleBlob): _files[p] = (v.hash, v.perms) else: v.flatten(p, _files) return _files def size(self): """Return the (approximate?) memory utilization in bytes of the nested structure. """ if self._size is None: self._size = ( - sys.getsizeof(self) + sys.getsizeof(self.__dict__) - + sum([ - sys.getsizeof(k)+v.size() - for k, v in self.items() - ]) + sys.getsizeof(self) + + sys.getsizeof(self.__dict__) + + sum([sys.getsizeof(k) + v.size() for k, v in self.items()]) ) return self._size class SelectiveCache(OrderedDict): """Special cache for storing past data upon which new data is known to be dependent. Optional hinting of how many instances of which keys will be needed down the line makes utilization more efficient. And, because the distance between related data can be arbitrarily long and the data fragments can be arbitrarily large, a disk-based secondary storage is used if the primary RAM-based storage area is filled to the designated capacity. Storage is occupied in three phases: 1) The most recent key/value pair is always held, regardless of other factors, until the next entry replaces it. 2) Stored key/value pairs are pushed into a randomly accessible expanding buffer in memory with a stored size function, maximum size value, and special hinting about which keys to store for how long optionally declared at instantiation. 3) The in-memory buffer pickles into a randomly accessible disk-backed secondary buffer when it becomes full. Occupied space is calculated by default as whatever the len() function returns on the values being stored. This can be changed by passing in a new size_function at instantiation. The cache_hints parameter is a dict of key/int pairs recording how many subsequent fetches that particular key's value should stay in storage for before being erased. If you provide a set of hints and then try to store a key that is not in that set of hints, the cache will store it only while it is the most recent entry, and will bypass storage phases 2 and 3. """ - DEFAULT_SIZE = 800*1024*1024 # bytes or whatever - def __init__(self, max_size=None, cache_hints=None, - size_function=None, filename=None): + DEFAULT_SIZE = 800 * 1024 * 1024 # bytes or whatever + + def __init__( + self, max_size=None, cache_hints=None, size_function=None, filename=None + ): """ args: max_size: integer value indicating the maximum size of the part of storage held in memory cache_hints: dict of key/int pairs as described in the class description size_function: callback function that accepts one parameter and returns one int, which should probably be the calculated size of the parameter """ self._max_size = max_size or SelectiveCache.DEFAULT_SIZE self._disk = None if size_function is None: self._size_function = sys.getsizeof else: self._size_function = size_function self._latest = None self._cache_size = 0 self._cache_hints = copy.copy(cache_hints) or None self.filename = filename def store(self, key, data): """Primary method for putting data into the cache. args: key: any hashable value data: any python object (preferably one that is measurable) """ self._latest = (key, data) if (self._cache_hints is not None) and (key not in self._cache_hints): return # cache the completed data... self._cache_size += self._size_function(data) + 53 # ...but limit memory expenditure for the cache by offloading to disk should_commit = False - while ( - self._cache_size > self._max_size - and len(self) > 0 - ): + while self._cache_size > self._max_size and len(self) > 0: should_commit = True k, v = self.popitem(last=False) self._cache_size -= self._size_function(v) - 53 self._diskstore(k, v) if should_commit: self._disk.commit(blocking=False) self[key] = data def _diskstore(self, key, value): if self._disk is None: self._disk = SqliteDict( - autocommit=False, journal_mode='OFF', - filename=self.filename, tablename='swh', - encode=_encode, decode=_decode) + autocommit=False, + journal_mode="OFF", + filename=self.filename, + tablename="swh", + encode=_encode, + decode=_decode, + ) self._disk.in_temp = True # necessary to force the disk clean up self._disk[key] = value def has(self, key): """Tests whether the data for the provided key is being stored. args: key: the key of the data whose storage membership property you wish to discover returns: True or False """ return ( (self._latest and self._latest[0] == key) or (key in self) or (self._disk and (key in self._disk)) ) def fetch(self, key): """Pulls a value out of storage and decrements the hint counter for the given key. args: key: the key of the data that you want to retrieve returns: the retrieved value or None """ retval = None if self._latest and self._latest[0] == key: retval = self._latest[1] if retval is None: retval = self.get(key) if (retval is None) and self._disk: self._disk.commit(blocking=False) retval = self._disk.get(key) or None self.dereference(key) return retval def dereference(self, key): """Remove one instance of expected future retrieval of the data for the given key. This is called automatically by fetch requests that aren't satisfied by phase 1 of storage. args: the key of the data for which the future retrievals hint is to be decremented """ newref = self._cache_hints and self._cache_hints.get(key) if newref: newref -= 1 if newref == 0: del self._cache_hints[key] if key in self: item = self[key] self._cache_size -= self._size_function(item) del self[key] else: if self._disk: del self._disk[key] else: self._cache_hints[key] = newref def keys(self): yield from self.keys() if self._disk: yield from self._disk.keys() def values(self): yield from self.values() if self._disk: yield from self._disk.values() def items(self): yield from self.items() if self._disk: yield from self._disk.items() diff --git a/swh/loader/mercurial/tasks.py b/swh/loader/mercurial/tasks.py index b269315..6b07dca 100644 --- a/swh/loader/mercurial/tasks.py +++ b/swh/loader/mercurial/tasks.py @@ -1,33 +1,33 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from celery import shared_task from .loader import HgBundle20Loader, HgArchiveBundle20Loader -@shared_task(name=__name__ + '.LoadMercurial') +@shared_task(name=__name__ + ".LoadMercurial") def load_hg(*, url, directory=None, visit_date=None): """Mercurial repository loading Import a mercurial tarball into swh. Args: see :func:`DepositLoader.load`. """ - loader = HgBundle20Loader( - url, directory=directory, visit_date=visit_date) + loader = HgBundle20Loader(url, directory=directory, visit_date=visit_date) return loader.load() -@shared_task(name=__name__ + '.LoadArchiveMercurial') +@shared_task(name=__name__ + ".LoadArchiveMercurial") def load_hg_from_archive(*, url, archive_path=None, visit_date=None): """Import a mercurial tarball into swh. Args: see :func:`DepositLoader.load`. """ loader = HgArchiveBundle20Loader( - url, archive_path=archive_path, visit_date=visit_date) + url, archive_path=archive_path, visit_date=visit_date + ) return loader.load() diff --git a/swh/loader/mercurial/tests/common.py b/swh/loader/mercurial/tests/common.py index b0f0247..8da06d9 100644 --- a/swh/loader/mercurial/tests/common.py +++ b/swh/loader/mercurial/tests/common.py @@ -1,55 +1,50 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.loader.mercurial.loader import ( - HgBundle20Loader, HgArchiveBundle20Loader -) +from swh.loader.mercurial.loader import HgBundle20Loader, HgArchiveBundle20Loader _LOADER_TEST_CONFIG = { - 'bundle_filename': 'HG20_none_bundle', - 'cache1_size': 838860800, - 'cache2_size': 838860800, - 'clone_timeout_seconds': 2 * 3600, - 'log_db': 'dbname=softwareheritage-log', - 'reduce_effort': False, - 'save_data': False, - 'save_data_path': '', - 'max_content_size': 104857600, - 'storage': { - 'cls': 'memory', - }, - 'temp_directory': '/tmp/swh.loader.mercurial' + "bundle_filename": "HG20_none_bundle", + "cache1_size": 838860800, + "cache2_size": 838860800, + "clone_timeout_seconds": 2 * 3600, + "log_db": "dbname=softwareheritage-log", + "reduce_effort": False, + "save_data": False, + "save_data_path": "", + "max_content_size": 104857600, + "storage": {"cls": "memory",}, + "temp_directory": "/tmp/swh.loader.mercurial", } class BaseHgLoaderMemoryStorage: """The base mercurial loader to test. Mixin behavior changed to: - use an in-memory storage - not use the default configuration loading mechanism At the end of the tests, you can make sure you have the rights objects. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.origin_id = 1 self.visit = 1 def parse_config_file(self, *args, **kwargs): return _LOADER_TEST_CONFIG class HgLoaderMemoryStorage(BaseHgLoaderMemoryStorage, HgBundle20Loader): pass -class HgArchiveLoaderMemoryStorage(BaseHgLoaderMemoryStorage, - HgArchiveBundle20Loader): +class HgArchiveLoaderMemoryStorage(BaseHgLoaderMemoryStorage, HgArchiveBundle20Loader): pass diff --git a/swh/loader/mercurial/tests/conftest.py b/swh/loader/mercurial/tests/conftest.py index 3891ac4..b30c4ab 100644 --- a/swh/loader/mercurial/tests/conftest.py +++ b/swh/loader/mercurial/tests/conftest.py @@ -1,10 +1,10 @@ import pytest from swh.scheduler.tests.conftest import * # noqa -@pytest.fixture(scope='session') # type: ignore # expected redefinition +@pytest.fixture(scope="session") # type: ignore # expected redefinition def celery_includes(): return [ - 'swh.loader.mercurial.tasks', + "swh.loader.mercurial.tasks", ] diff --git a/swh/loader/mercurial/tests/test_converters.py b/swh/loader/mercurial/tests/test_converters.py index b3d50e7..ed484f2 100644 --- a/swh/loader/mercurial/tests/test_converters.py +++ b/swh/loader/mercurial/tests/test_converters.py @@ -1,76 +1,67 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from swh.loader.mercurial import converters class TestParseAuthorConverters(unittest.TestCase): def test_parse_author_no_email(self): self.assertIsNone(converters.parse_author(None)) def test_parse_author_no_bracket(self): - actual_author = converters.parse_author(b'someone') + actual_author = converters.parse_author(b"someone") - self.assertEqual(actual_author, { - 'name': None, - 'email': None, - 'fullname': b'someone' - }) + self.assertEqual( + actual_author, {"name": None, "email": None, "fullname": b"someone"} + ) def test_parse_author_2(self): - actual_author = converters.parse_author(b'something wicked') + actual_author = converters.parse_author(b"something >wicked") - self.assertEqual(actual_author, { - 'name': None, - 'email': None, - 'fullname': b'something >wicked' - }) + self.assertEqual( + actual_author, + {"name": None, "email": None, "fullname": b"something >wicked"}, + ) def test_parse_author_4(self): - actual_author = converters.parse_author(b'something <') + actual_author = converters.parse_author(b"something <") - self.assertEqual(actual_author, { - 'name': b'something', - 'email': None, - 'fullname': b'something <' - }) + self.assertEqual( + actual_author, + {"name": b"something", "email": None, "fullname": b"something <"}, + ) def test_parse_author_5(self): - actual_author = converters.parse_author(b'') + actual_author = converters.parse_author(b"") - self.assertEqual(actual_author, { - 'name': None, - 'email': b'only', - 'fullname': b'' - }) + self.assertEqual( + actual_author, {"name": None, "email": b"only", "fullname": b""} + ) def test_parse_author_6(self): - actual_author = converters.parse_author(b' ') + actual_author = converters.parse_author(b" ") - self.assertEqual(actual_author, { - 'name': b' ', - 'email': b'something', - 'fullname': b' ' - }) + self.assertEqual( + actual_author, + {"name": b" ", "email": b"something", "fullname": b" "}, + ) def test_parse_author_normal(self): - actual_author = converters.parse_author(b'someone ') + actual_author = converters.parse_author(b"someone ") - self.assertEqual(actual_author, { - 'name': b'someone', - 'email': b'awesome', - 'fullname': b'someone ' - }) + self.assertEqual( + actual_author, + {"name": b"someone", "email": b"awesome", "fullname": b"someone "}, + ) diff --git a/swh/loader/mercurial/tests/test_loader.py b/swh/loader/mercurial/tests/test_loader.py index 95d4dd3..3e9f4e2 100644 --- a/swh/loader/mercurial/tests/test_loader.py +++ b/swh/loader/mercurial/tests/test_loader.py @@ -1,360 +1,354 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os import time from unittest.mock import patch import hglib import pytest from swh.loader.core.tests import BaseLoaderTest from swh.storage.algos.snapshot import snapshot_get_all_branches from .common import HgLoaderMemoryStorage, HgArchiveLoaderMemoryStorage from ..loader import HgBundle20Loader, CloneTimeoutError class BaseHgLoaderTest(BaseLoaderTest): """Mixin base loader test to prepare the mercurial repository to uncompress, load and test the results. """ - def setUp(self, archive_name='the-sandbox.tgz', filename='the-sandbox', - uncompress_archive=True): - super().setUp(archive_name=archive_name, filename=filename, - prefix_tmp_folder_name='swh.loader.mercurial.', - start_path=os.path.dirname(__file__), - uncompress_archive=uncompress_archive) + + def setUp( + self, + archive_name="the-sandbox.tgz", + filename="the-sandbox", + uncompress_archive=True, + ): + super().setUp( + archive_name=archive_name, + filename=filename, + prefix_tmp_folder_name="swh.loader.mercurial.", + start_path=os.path.dirname(__file__), + uncompress_archive=uncompress_archive, + ) class WithoutReleaseLoaderTest(BaseHgLoaderTest): """Load a mercurial repository without release """ + def setUp(self, *args, **kwargs): super().setUp(*args, **kwargs) self.loader = HgLoaderMemoryStorage( url=self.repo_url, - visit_date='2016-05-03 15:16:32+00', - directory=self.destination_path) + visit_date="2016-05-03 15:16:32+00", + directory=self.destination_path, + ) self.storage = self.loader.storage def test_load(self): """Load a repository with multiple branches results in 1 snapshot Another visit with no change in between result in uneventful visit """ # when self.loader.load() # then self.assertCountContents(2) self.assertCountDirectories(3) self.assertCountReleases(0) self.assertCountRevisions(58) - tip_revision_develop = 'a9c4534552df370f43f0ef97146f393ef2f2a08c' - tip_revision_default = '70e750bb046101fdced06f428e73fee471509c56' + tip_revision_develop = "a9c4534552df370f43f0ef97146f393ef2f2a08c" + tip_revision_default = "70e750bb046101fdced06f428e73fee471509c56" # same from rev 3 onward - directory_hash = '180bd57623a7c2c47a8c43514a5f4d903503d0aa' + directory_hash = "180bd57623a7c2c47a8c43514a5f4d903503d0aa" # cf. test_loader.org for explaining from where those hashes # come from expected_revisions = { # revision hash | directory hash # noqa - 'aafb69fd7496ca617f741d38c40808ff2382aabe': 'e2e117569b086ceabeeedee4acd95f35298d4553', # noqa - 'b6932cb7f59e746899e4804f3d496126d1343615': '9cd8160c67ac4b0bc97e2e2cd918a580425167d3', # noqa + "aafb69fd7496ca617f741d38c40808ff2382aabe": "e2e117569b086ceabeeedee4acd95f35298d4553", # noqa + "b6932cb7f59e746899e4804f3d496126d1343615": "9cd8160c67ac4b0bc97e2e2cd918a580425167d3", # noqa tip_revision_default: directory_hash, - '18012a93d5aadc331c468dac84b524430f4abc19': directory_hash, - 'bec4c0a31b0b2502f44f34aeb9827cd090cca621': directory_hash, - '5f4eba626c3f826820c4475d2d81410759ec911b': directory_hash, - 'dcba06661c607fe55ec67b1712d153b69f65e38c': directory_hash, - 'c77e776d22548d47a8d96463a3556172776cd59b': directory_hash, - '61d762d65afb3150e2653d6735068241779c1fcf': directory_hash, - '40def747398c76ceec1bd248e3a6cb2a52e22dc5': directory_hash, - '6910964416438ca8d1698f6295871d727c4d4851': directory_hash, - 'be44d5e6cc66580f59c108f8bff5911ee91a22e4': directory_hash, - 'c4a95d5097519dedac437fddf0ef775136081241': directory_hash, - '32eb0354a660128e205bf7c3a84b46040ef70d92': directory_hash, - 'dafa445964230e808148db043c126063ea1dc9b6': directory_hash, - 'a41e2a548ba51ee47f22baad8e88994853d3e2f5': directory_hash, - 'dc3e3ab7fe257d04769528e5e17ad9f1acb44659': directory_hash, - 'd2164061453ecb03d4347a05a77db83f706b8e15': directory_hash, - '34192ceef239b8b72141efcc58b1d7f1676a18c9': directory_hash, - '2652147529269778757d96e09aaf081695548218': directory_hash, - '4d640e8064fe69b4c851dfd43915c431e80c7497': directory_hash, - 'c313df50bfcaa773dcbe038d00f8bd770ba997f8': directory_hash, - '769db00b34b9e085dc699c8f1550c95793d0e904': directory_hash, - '2973e5dc9568ac491b198f6b7f10c44ddc04e0a3': directory_hash, - 'be34b8c7857a6c04e41cc06b26338d8e59cb2601': directory_hash, - '24f45e41637240b7f9e16d2791b5eacb4a406d0f': directory_hash, - '62ff4741eac1821190f6c2cdab7c8a9d7db64ad0': directory_hash, - 'c346f6ff7f42f2a8ff867f92ab83a6721057d86c': directory_hash, - 'f2afbb94b319ef5d60823859875284afb95dcc18': directory_hash, - '4e2dc6d6073f0b6d348f84ded52f9143b10344b9': directory_hash, - '31cd7c5f669868651c57e3a2ba25ac45f76fa5cf': directory_hash, - '25f5b27dfa5ed15d336188ef46bef743d88327d4': directory_hash, - '88b80615ed8561be74a700b92883ec0374ddacb0': directory_hash, - '5ee9ea92ed8cc1737b7670e39dab6081c64f2598': directory_hash, - 'dcddcc32740d2de0e1403e21a5c4ed837b352992': directory_hash, - '74335db9f45a5d1c8133ff7a7db5ed7a8d4a197b': directory_hash, - 'cb36b894129ca7910bb81c457c72d69d5ff111bc': directory_hash, - 'caef0cb155eb6c55215aa59aabe04a9c702bbe6a': directory_hash, - '5017ce0b285351da09a2029ea2cf544f79b593c7': directory_hash, - '17a62618eb6e91a1d5d8e1246ccedae020d3b222': directory_hash, - 'a1f000fb8216838aa2a120738cc6c7fef2d1b4d8': directory_hash, - '9f82d95bd3edfb7f18b1a21d6171170395ea44ce': directory_hash, - 'a701d39a17a9f48c61a06eee08bd9ac0b8e3838b': directory_hash, - '4ef794980f820d44be94b2f0d53eb34d4241638c': directory_hash, - 'ddecbc16f4c916c39eacfcb2302e15a9e70a231e': directory_hash, - '3565e7d385af0745ec208d719e469c2f58be8e94': directory_hash, - 'c875bad563a73a25c5f3379828b161b1441a7c5d': directory_hash, - '94be9abcf9558213ff301af0ecd8223451ce991d': directory_hash, - '1ee770fd10ea2d8c4f6e68a1dbe79378a86611e0': directory_hash, - '553b09724bd30d9691b290e157b27a73e2d3e537': directory_hash, - '9e912851eb64e3a1e08fbb587de7a4c897ce5a0a': directory_hash, - '9c9e0ff08f215a5a5845ce3dbfc5b48c8050bdaf': directory_hash, - 'db9e625ba90056304897a94c92e5d27bc60f112d': directory_hash, - '2d4a801c9a9645fcd3a9f4c06418d8393206b1f3': directory_hash, - 'e874cd5967efb1f45282e9f5ce87cc68a898a6d0': directory_hash, - 'e326a7bbb5bc00f1d8cacd6108869dedef15569c': directory_hash, - '3ed4b85d30401fe32ae3b1d650f215a588293a9e': directory_hash, + "18012a93d5aadc331c468dac84b524430f4abc19": directory_hash, + "bec4c0a31b0b2502f44f34aeb9827cd090cca621": directory_hash, + "5f4eba626c3f826820c4475d2d81410759ec911b": directory_hash, + "dcba06661c607fe55ec67b1712d153b69f65e38c": directory_hash, + "c77e776d22548d47a8d96463a3556172776cd59b": directory_hash, + "61d762d65afb3150e2653d6735068241779c1fcf": directory_hash, + "40def747398c76ceec1bd248e3a6cb2a52e22dc5": directory_hash, + "6910964416438ca8d1698f6295871d727c4d4851": directory_hash, + "be44d5e6cc66580f59c108f8bff5911ee91a22e4": directory_hash, + "c4a95d5097519dedac437fddf0ef775136081241": directory_hash, + "32eb0354a660128e205bf7c3a84b46040ef70d92": directory_hash, + "dafa445964230e808148db043c126063ea1dc9b6": directory_hash, + "a41e2a548ba51ee47f22baad8e88994853d3e2f5": directory_hash, + "dc3e3ab7fe257d04769528e5e17ad9f1acb44659": directory_hash, + "d2164061453ecb03d4347a05a77db83f706b8e15": directory_hash, + "34192ceef239b8b72141efcc58b1d7f1676a18c9": directory_hash, + "2652147529269778757d96e09aaf081695548218": directory_hash, + "4d640e8064fe69b4c851dfd43915c431e80c7497": directory_hash, + "c313df50bfcaa773dcbe038d00f8bd770ba997f8": directory_hash, + "769db00b34b9e085dc699c8f1550c95793d0e904": directory_hash, + "2973e5dc9568ac491b198f6b7f10c44ddc04e0a3": directory_hash, + "be34b8c7857a6c04e41cc06b26338d8e59cb2601": directory_hash, + "24f45e41637240b7f9e16d2791b5eacb4a406d0f": directory_hash, + "62ff4741eac1821190f6c2cdab7c8a9d7db64ad0": directory_hash, + "c346f6ff7f42f2a8ff867f92ab83a6721057d86c": directory_hash, + "f2afbb94b319ef5d60823859875284afb95dcc18": directory_hash, + "4e2dc6d6073f0b6d348f84ded52f9143b10344b9": directory_hash, + "31cd7c5f669868651c57e3a2ba25ac45f76fa5cf": directory_hash, + "25f5b27dfa5ed15d336188ef46bef743d88327d4": directory_hash, + "88b80615ed8561be74a700b92883ec0374ddacb0": directory_hash, + "5ee9ea92ed8cc1737b7670e39dab6081c64f2598": directory_hash, + "dcddcc32740d2de0e1403e21a5c4ed837b352992": directory_hash, + "74335db9f45a5d1c8133ff7a7db5ed7a8d4a197b": directory_hash, + "cb36b894129ca7910bb81c457c72d69d5ff111bc": directory_hash, + "caef0cb155eb6c55215aa59aabe04a9c702bbe6a": directory_hash, + "5017ce0b285351da09a2029ea2cf544f79b593c7": directory_hash, + "17a62618eb6e91a1d5d8e1246ccedae020d3b222": directory_hash, + "a1f000fb8216838aa2a120738cc6c7fef2d1b4d8": directory_hash, + "9f82d95bd3edfb7f18b1a21d6171170395ea44ce": directory_hash, + "a701d39a17a9f48c61a06eee08bd9ac0b8e3838b": directory_hash, + "4ef794980f820d44be94b2f0d53eb34d4241638c": directory_hash, + "ddecbc16f4c916c39eacfcb2302e15a9e70a231e": directory_hash, + "3565e7d385af0745ec208d719e469c2f58be8e94": directory_hash, + "c875bad563a73a25c5f3379828b161b1441a7c5d": directory_hash, + "94be9abcf9558213ff301af0ecd8223451ce991d": directory_hash, + "1ee770fd10ea2d8c4f6e68a1dbe79378a86611e0": directory_hash, + "553b09724bd30d9691b290e157b27a73e2d3e537": directory_hash, + "9e912851eb64e3a1e08fbb587de7a4c897ce5a0a": directory_hash, + "9c9e0ff08f215a5a5845ce3dbfc5b48c8050bdaf": directory_hash, + "db9e625ba90056304897a94c92e5d27bc60f112d": directory_hash, + "2d4a801c9a9645fcd3a9f4c06418d8393206b1f3": directory_hash, + "e874cd5967efb1f45282e9f5ce87cc68a898a6d0": directory_hash, + "e326a7bbb5bc00f1d8cacd6108869dedef15569c": directory_hash, + "3ed4b85d30401fe32ae3b1d650f215a588293a9e": directory_hash, tip_revision_develop: directory_hash, } self.assertRevisionsContain(expected_revisions) self.assertCountSnapshots(1) expected_snapshot = { - 'id': '3b8fe58e467deb7597b12a5fd3b2c096b8c02028', - 'branches': { - 'develop': { - 'target': tip_revision_develop, - 'target_type': 'revision' - }, - 'default': { - 'target': tip_revision_default, - 'target_type': 'revision' - }, - 'HEAD': { - 'target': 'develop', - 'target_type': 'alias', - } - } + "id": "3b8fe58e467deb7597b12a5fd3b2c096b8c02028", + "branches": { + "develop": {"target": tip_revision_develop, "target_type": "revision"}, + "default": {"target": tip_revision_default, "target_type": "revision"}, + "HEAD": {"target": "develop", "target_type": "alias",}, + }, } self.assertSnapshotEqual(expected_snapshot) - self.assertEqual(self.loader.load_status(), {'status': 'eventful'}) - self.assertEqual(self.loader.visit_status(), 'full') + self.assertEqual(self.loader.load_status(), {"status": "eventful"}) + self.assertEqual(self.loader.visit_status(), "full") # second visit with no changes in the mercurial repository # since the first one self.loader.load() - self.assertEqual(self.loader.load_status(), {'status': 'uneventful'}) - self.assertEqual(self.loader.visit_status(), 'full') + self.assertEqual(self.loader.load_status(), {"status": "uneventful"}) + self.assertEqual(self.loader.visit_status(), "full") class CommonHgLoaderData: def assert_data_ok(self): # then self.assertCountContents(3) self.assertCountDirectories(3) self.assertCountReleases(1) self.assertCountRevisions(3) - tip_release = '515c4d72e089404356d0f4b39d60f948b8999140' + tip_release = "515c4d72e089404356d0f4b39d60f948b8999140" self.assertReleasesContain([tip_release]) - tip_revision_default = 'c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27' + tip_revision_default = "c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27" # cf. test_loader.org for explaining from where those hashes # come from expected_revisions = { # revision hash | directory hash # noqa - '93b48d515580522a05f389bec93227fc8e43d940': '43d727f2f3f2f7cb3b098ddad1d7038464a4cee2', # noqa - '8dd3db5d5519e4947f035d141581d304565372d2': 'b3f85f210ff86d334575f64cb01c5bf49895b63e', # noqa - tip_revision_default: '8f2be433c945384c85920a8e60f2a68d2c0f20fb', + "93b48d515580522a05f389bec93227fc8e43d940": "43d727f2f3f2f7cb3b098ddad1d7038464a4cee2", # noqa + "8dd3db5d5519e4947f035d141581d304565372d2": "b3f85f210ff86d334575f64cb01c5bf49895b63e", # noqa + tip_revision_default: "8f2be433c945384c85920a8e60f2a68d2c0f20fb", } self.assertRevisionsContain(expected_revisions) self.assertCountSnapshots(1) expected_snapshot = { - 'id': 'd35668e02e2ba4321dc951cd308cf883786f918a', - 'branches': { - 'default': { - 'target': tip_revision_default, - 'target_type': 'revision' - }, - '0.1': { - 'target': tip_release, - 'target_type': 'release' - }, - 'HEAD': { - 'target': 'default', - 'target_type': 'alias', - } - } + "id": "d35668e02e2ba4321dc951cd308cf883786f918a", + "branches": { + "default": {"target": tip_revision_default, "target_type": "revision"}, + "0.1": {"target": tip_release, "target_type": "release"}, + "HEAD": {"target": "default", "target_type": "alias",}, + }, } self.assertSnapshotEqual(expected_snapshot) - self.assertEqual(self.loader.load_status(), {'status': 'eventful'}) - self.assertEqual(self.loader.visit_status(), 'full') + self.assertEqual(self.loader.load_status(), {"status": "eventful"}) + self.assertEqual(self.loader.visit_status(), "full") class WithReleaseLoaderTest(BaseHgLoaderTest, CommonHgLoaderData): """Load a mercurial repository with release """ + def setUp(self): - super().setUp(archive_name='hello.tgz', filename='hello') + super().setUp(archive_name="hello.tgz", filename="hello") self.loader = HgLoaderMemoryStorage( url=self.repo_url, - visit_date='2016-05-03 15:16:32+00', - directory=self.destination_path + visit_date="2016-05-03 15:16:32+00", + directory=self.destination_path, ) self.storage = self.loader.storage def test_load(self): """Load a repository with tags results in 1 snapshot """ # when self.loader.load() self.assert_data_ok() class ArchiveLoaderTest(BaseHgLoaderTest, CommonHgLoaderData): """Load a mercurial repository archive with release """ + def setUp(self): - super().setUp(archive_name='hello.tgz', filename='hello', - uncompress_archive=False) + super().setUp( + archive_name="hello.tgz", filename="hello", uncompress_archive=False + ) self.loader = HgArchiveLoaderMemoryStorage( url=self.repo_url, - visit_date='2016-05-03 15:16:32+00', - archive_path=self.destination_path + visit_date="2016-05-03 15:16:32+00", + archive_path=self.destination_path, ) self.storage = self.loader.storage def test_load(self): """Load a mercurial repository archive with tags results in 1 snapshot """ # when self.loader.load() self.assert_data_ok() - @patch('swh.loader.mercurial.archive_extract.patoolib') + @patch("swh.loader.mercurial.archive_extract.patoolib") def test_load_with_failure(self, mock_patoo): mock_patoo.side_effect = ValueError # when r = self.loader.load() - self.assertEqual(r, {'status': 'failed'}) + self.assertEqual(r, {"status": "failed"}) self.assertCountContents(0) self.assertCountDirectories(0) self.assertCountRevisions(0) self.assertCountReleases(0) self.assertCountSnapshots(0) class WithTransplantLoaderTest(BaseHgLoaderTest): """Load a mercurial repository where transplant operations have been used. """ + def setUp(self): - super().setUp(archive_name='transplant.tgz', filename='transplant') + super().setUp(archive_name="transplant.tgz", filename="transplant") self.loader = HgLoaderMemoryStorage( url=self.repo_url, - visit_date='2019-05-23 12:06:00+00', - directory=self.destination_path + visit_date="2019-05-23 12:06:00+00", + directory=self.destination_path, ) self.storage = self.loader.storage def test_load(self): # load hg repository self.loader.load() # collect swh revisions - origin_url = self.storage.origin_get([ - {'type': 'hg', 'url': self.repo_url}])[0]['url'] - visit = self.storage.origin_visit_get_latest( - origin_url, require_snapshot=True) + origin_url = self.storage.origin_get([{"type": "hg", "url": self.repo_url}])[0][ + "url" + ] + visit = self.storage.origin_visit_get_latest(origin_url, require_snapshot=True) revisions = [] - snapshot = snapshot_get_all_branches(self.storage, visit['snapshot']) - for branch in snapshot['branches'].values(): - if branch['target_type'] != 'revision': + snapshot = snapshot_get_all_branches(self.storage, visit["snapshot"]) + for branch in snapshot["branches"].values(): + if branch["target_type"] != "revision": continue - revisions.append(branch['target']) + revisions.append(branch["target"]) # extract original changesets info and the transplant sources hg_changesets = set() transplant_sources = set() for rev in self.storage.revision_log(revisions): - hg_changesets.add(rev['metadata']['node']) - for k, v in rev['metadata']['extra_headers']: - if k == 'transplant_source': - transplant_sources.add(v.decode('ascii')) + hg_changesets.add(rev["metadata"]["node"]) + for k, v in rev["metadata"]["extra_headers"]: + if k == "transplant_source": + transplant_sources.add(v.decode("ascii")) # check extracted data are valid self.assertTrue(len(hg_changesets) > 0) self.assertTrue(len(transplant_sources) > 0) self.assertTrue(transplant_sources.issubset(hg_changesets)) def test_clone_with_timeout_timeout(caplog, tmp_path, monkeypatch): - log = logging.getLogger('test_clone_with_timeout') + log = logging.getLogger("test_clone_with_timeout") def clone_timeout(source, dest): time.sleep(60) monkeypatch.setattr(hglib, "clone", clone_timeout) with pytest.raises(CloneTimeoutError): HgBundle20Loader.clone_with_timeout( - log, 'https://www.mercurial-scm.org/repo/hello', tmp_path, 1 + log, "https://www.mercurial-scm.org/repo/hello", tmp_path, 1 ) for record in caplog.records: - assert record.levelname == 'WARNING' - assert ( - 'https://www.mercurial-scm.org/repo/hello' in record.getMessage() - ) - assert record.args == ('https://www.mercurial-scm.org/repo/hello', 1) + assert record.levelname == "WARNING" + assert "https://www.mercurial-scm.org/repo/hello" in record.getMessage() + assert record.args == ("https://www.mercurial-scm.org/repo/hello", 1) def test_clone_with_timeout_returns(caplog, tmp_path, monkeypatch): - log = logging.getLogger('test_clone_with_timeout') + log = logging.getLogger("test_clone_with_timeout") def clone_return(source, dest): return (source, dest) monkeypatch.setattr(hglib, "clone", clone_return) assert HgBundle20Loader.clone_with_timeout( - log, 'https://www.mercurial-scm.org/repo/hello', tmp_path, 1 - ) == ('https://www.mercurial-scm.org/repo/hello', tmp_path) + log, "https://www.mercurial-scm.org/repo/hello", tmp_path, 1 + ) == ("https://www.mercurial-scm.org/repo/hello", tmp_path) def test_clone_with_timeout_exception(caplog, tmp_path, monkeypatch): - log = logging.getLogger('test_clone_with_timeout') + log = logging.getLogger("test_clone_with_timeout") def clone_return(source, dest): - raise ValueError('Test exception') + raise ValueError("Test exception") monkeypatch.setattr(hglib, "clone", clone_return) with pytest.raises(ValueError) as excinfo: HgBundle20Loader.clone_with_timeout( - log, 'https://www.mercurial-scm.org/repo/hello', tmp_path, 1 + log, "https://www.mercurial-scm.org/repo/hello", tmp_path, 1 ) - assert 'Test exception' in excinfo.value.args[0] + assert "Test exception" in excinfo.value.args[0] diff --git a/swh/loader/mercurial/tests/test_tasks.py b/swh/loader/mercurial/tests/test_tasks.py index 0c44cd3..e4cfd1b 100644 --- a/swh/loader/mercurial/tests/test_tasks.py +++ b/swh/loader/mercurial/tests/test_tasks.py @@ -1,45 +1,43 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information def test_loader(mocker, swh_app, celery_session_worker): - mock_loader = mocker.patch( - 'swh.loader.mercurial.loader.HgBundle20Loader.load') - mock_loader.return_value = {'status': 'eventful'} + mock_loader = mocker.patch("swh.loader.mercurial.loader.HgBundle20Loader.load") + mock_loader.return_value = {"status": "eventful"} res = swh_app.send_task( - 'swh.loader.mercurial.tasks.LoadMercurial', - kwargs={ - 'url': 'origin_url', - 'directory': '/some/repo', - 'visit_date': 'now', - }) + "swh.loader.mercurial.tasks.LoadMercurial", + kwargs={"url": "origin_url", "directory": "/some/repo", "visit_date": "now",}, + ) assert res res.wait() assert res.successful() - assert res.result == {'status': 'eventful'} + assert res.result == {"status": "eventful"} mock_loader.assert_called_once_with() def test_archive_loader(mocker, swh_app, celery_session_worker): mock_loader = mocker.patch( - 'swh.loader.mercurial.loader.HgArchiveBundle20Loader.load') - mock_loader.return_value = {'status': 'uneventful'} + "swh.loader.mercurial.loader.HgArchiveBundle20Loader.load" + ) + mock_loader.return_value = {"status": "uneventful"} res = swh_app.send_task( - 'swh.loader.mercurial.tasks.LoadArchiveMercurial', + "swh.loader.mercurial.tasks.LoadArchiveMercurial", kwargs={ - 'url': 'another_url', - 'archive_path': '/some/tar.tgz', - 'visit_date': 'now', - }) + "url": "another_url", + "archive_path": "/some/tar.tgz", + "visit_date": "now", + }, + ) assert res res.wait() assert res.successful() - assert res.result == {'status': 'uneventful'} + assert res.result == {"status": "uneventful"} mock_loader.assert_called_once_with() diff --git a/tox.ini b/tox.ini index fb3c99d..a1387d5 100644 --- a/tox.ini +++ b/tox.ini @@ -1,27 +1,34 @@ [tox] -envlist=flake8,mypy,py3 +envlist=black,flake8,mypy,py3 [testenv] extras = testing deps = pytest-cov commands = pytest --cov={envsitepackagesdir}/swh/loader/mercurial \ {envsitepackagesdir}/swh/loader/mercurial \ --cov-branch {posargs} +[testenv:black] +skip_install = true +deps = + black +commands = + {envpython} -m black --check swh + [testenv:flake8] skip_install = true deps = flake8 commands = {envpython} -m flake8 [testenv:mypy] extras = testing deps = mypy commands = mypy swh