diff --git a/swh/web/tests/data.py b/swh/web/tests/data.py
index 873205b3..0dc59c76 100644
--- a/swh/web/tests/data.py
+++ b/swh/web/tests/data.py
@@ -1,369 +1,367 @@
 # Copyright (C) 2018-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from copy import deepcopy
 from datetime import timedelta
 import os
 import random
 import time
 from typing import Dict, List, Optional, Set
 
+from swh.core.config import merge_configs
 from swh.indexer.ctags import CtagsIndexer
 from swh.indexer.fossology_license import FossologyLicenseIndexer
 from swh.indexer.mimetype import MimetypeIndexer
 from swh.indexer.storage import get_indexer_storage
 from swh.loader.git.from_disk import GitLoaderFromArchive
 from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
 from swh.model.model import Content, Directory, Origin, OriginVisit, OriginVisitStatus
 from swh.search import get_search
 from swh.storage.algos.dir_iterators import dir_iterator
 from swh.storage.algos.snapshot import snapshot_get_latest
 from swh.storage.interface import Sha1
 from swh.storage.utils import now
 from swh.web import config
 from swh.web.browse.utils import (
     _re_encode_content,
     get_mimetype_and_encoding_for_content,
     prepare_content_for_display,
 )
 from swh.web.common import service
 
 # Module used to initialize data that will be provided as tests input
 
 # Configuration for git loader
 _TEST_LOADER_CONFIG = {
     "storage": {"cls": "memory",},
     "save_data": False,
     "max_content_size": 100 * 1024 * 1024,
 }
 
 # Base content indexer configuration
 _TEST_INDEXER_BASE_CONFIG = {
     "storage": {"cls": "memory"},
     "objstorage": {"cls": "memory", "args": {},},
     "indexer_storage": {"cls": "memory", "args": {},},
 }
 
 
 def random_sha1():
     return hash_to_hex(bytes(random.randint(0, 255) for _ in range(20)))
 
 
 def random_sha256():
     return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32)))
 
 
 def random_blake2s256():
     return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32)))
 
 
 def random_content():
     return {
         "sha1": random_sha1(),
         "sha1_git": random_sha1(),
         "sha256": random_sha256(),
         "blake2s256": random_blake2s256(),
     }
 
 
-# MimetypeIndexer with custom configuration for tests
-class _MimetypeIndexer(MimetypeIndexer):
-    def parse_config_file(self, *args, **kwargs):
-        return {
-            **_TEST_INDEXER_BASE_CONFIG,
-            "tools": {
-                "name": "file",
-                "version": "1:5.30-1+deb9u1",
-                "configuration": {"type": "library", "debian-package": "python3-magic"},
-            },
+_TEST_MIMETYPE_INDEXER_CONFIG = merge_configs(
+    _TEST_INDEXER_BASE_CONFIG,
+    {
+        "tools": {
+            "name": "file",
+            "version": "1:5.30-1+deb9u1",
+            "configuration": {"type": "library", "debian-package": "python3-magic"},
         }
+    },
+)
 
 
-# FossologyLicenseIndexer with custom configuration for tests
-class _FossologyLicenseIndexer(FossologyLicenseIndexer):
-    def parse_config_file(self, *args, **kwargs):
-        return {
-            **_TEST_INDEXER_BASE_CONFIG,
-            "workdir": "/tmp/swh/indexer.fossology.license",
-            "tools": {
-                "name": "nomos",
-                "version": "3.1.0rc2-31-ga2cbb8c",
-                "configuration": {"command_line": "nomossa ",},
-            },
-        }
+_TEST_LICENSE_INDEXER_CONFIG = merge_configs(
+    _TEST_INDEXER_BASE_CONFIG,
+    {
+        "workdir": "/tmp/swh/indexer.fossology.license",
+        "tools": {
+            "name": "nomos",
+            "version": "3.1.0rc2-31-ga2cbb8c",
+            "configuration": {"command_line": "nomossa ",},
+        },
+    },
+)
 
 
-# CtagsIndexer with custom configuration for tests
-class _CtagsIndexer(CtagsIndexer):
-    def parse_config_file(self, *args, **kwargs):
-        return {
-            **_TEST_INDEXER_BASE_CONFIG,
-            "workdir": "/tmp/swh/indexer.ctags",
-            "languages": {"c": "c"},
-            "tools": {
-                "name": "universal-ctags",
-                "version": "~git7859817b",
-                "configuration": {
-                    "command_line": """ctags --fields=+lnz --sort=no --links=no """
-                    """--output-format=json """
-                },
+_TEST_CTAGS_INDEXER_CONFIG = merge_configs(
+    _TEST_INDEXER_BASE_CONFIG,
+    {
+        "workdir": "/tmp/swh/indexer.ctags",
+        "languages": {"c": "c"},
+        "tools": {
+            "name": "universal-ctags",
+            "version": "~git7859817b",
+            "configuration": {
+                "command_line": """ctags --fields=+lnz --sort=no --links=no """
+                """--output-format=json """
             },
-        }
+        },
+    },
+)
 
 
 # Lightweight git repositories that will be loaded to generate
 # input data for tests
 _TEST_ORIGINS = [
     {
         "type": "git",
         "url": "https://github.com/wcoder/highlightjs-line-numbers.js",
         "archives": [
             "highlightjs-line-numbers.js.zip",
             "highlightjs-line-numbers.js_visit2.zip",
         ],
     },
     {
         "type": "git",
         "url": "https://github.com/memononen/libtess2",
         "archives": ["libtess2.zip"],
     },
     {
         "type": "git",
         "url": "repo_with_submodules",
         "archives": ["repo_with_submodules.tgz"],
     },
 ]
 
 _contents = {}
 
 
 def _add_extra_contents(storage, contents):
     pbm_image_data = b"""P1
 # PBM example
 24 7
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 1 1 1 1 0 0 1 1 1 1 0 0 1 1 1 1 0 0 1 1 1 1 0
 0 1 0 0 0 0 0 1 0 0 0 0 0 1 0 0 0 0 0 1 0 0 1 0
 0 1 1 1 0 0 0 1 1 1 0 0 0 1 1 1 0 0 0 1 1 1 1 0
 0 1 0 0 0 0 0 1 0 0 0 0 0 1 0 0 0 0 0 1 0 0 0 0
 0 1 0 0 0 0 0 1 1 1 1 0 0 1 1 1 1 0 0 1 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0"""
 
     # add file with mimetype image/x-portable-bitmap in the archive content
     pbm_content = Content.from_data(pbm_image_data)
     storage.content_add([pbm_content])
     contents.add(pbm_content.sha1)
 
 
 # Tests data initialization
 def _init_tests_data():
     # To hold reference to the memory storage
     storage = None
 
     # Create search instance
     search = get_search("memory")
     search.initialize()
     search.origin_update({"url": origin["url"]} for origin in _TEST_ORIGINS)
 
     # Load git repositories from archives
     for origin in _TEST_ORIGINS:
         for i, archive in enumerate(origin["archives"]):
             if i > 0:
                 # ensure visit dates will be different when simulating
                 # multiple visits of an origin
                 time.sleep(1)
             origin_repo_archive = os.path.join(
                 os.path.dirname(__file__), "resources/repos/%s" % archive
             )
             loader = GitLoaderFromArchive(
                 origin["url"],
                 archive_path=origin_repo_archive,
                 config=_TEST_LOADER_CONFIG,
             )
             if storage is None:
                 storage = loader.storage
             else:
                 loader.storage = storage
             loader.load()
 
         ori = storage.origin_get([origin["url"]])[0]
         origin.update(ori.to_dict())  # add an 'id' key if enabled
         search.origin_update([{"url": origin["url"], "has_visits": True}])
 
     for i in range(250):
         url = "https://many.origins/%d" % (i + 1)
         # storage.origin_add([{'url': url}])
         storage.origin_add([Origin(url=url)])
         search.origin_update([{"url": url, "has_visits": True}])
         date = now()
         visit = OriginVisit(origin=url, date=date, type="tar")
         visit = storage.origin_visit_add([visit])[0]
         visit_status = OriginVisitStatus(
             origin=url,
             visit=visit.visit,
             date=date + timedelta(minutes=1),
             status="full",
             snapshot=hash_to_bytes("1a8893e6a86f444e8be8e7bda6cb34fb1735a00e"),
         )
         storage.origin_visit_status_add([visit_status])
 
     sha1s: Set[Sha1] = set()
     directories = set()
     revisions = set()
     releases = set()
     snapshots = set()
 
     content_path = {}
 
     # Get all objects loaded into the test archive
     for origin in _TEST_ORIGINS:
         snp = snapshot_get_latest(storage, origin["url"])
         snapshots.add(hash_to_hex(snp.id))
         for branch_name, branch_data in snp.branches.items():
             target_type = branch_data.target_type.value
             if target_type == "revision":
                 revisions.add(branch_data.target)
             elif target_type == "release":
                 release = storage.release_get([branch_data.target])[0]
                 revisions.add(release.target)
                 releases.add(hash_to_hex(branch_data.target))
 
     for rev_log in storage.revision_shortlog(set(revisions)):
         rev_id = rev_log[0]
         revisions.add(rev_id)
 
     for rev in storage.revision_get(revisions):
         if rev is None:
             continue
         dir_id = rev.directory
         directories.add(hash_to_hex(dir_id))
         for entry in dir_iterator(storage, dir_id):
             if entry["type"] == "file":
                 sha1s.add(entry["sha1"])
                 content_path[entry["sha1"]] = "/".join(
                     [hash_to_hex(dir_id), entry["path"].decode("utf-8")]
                 )
             elif entry["type"] == "dir":
                 directories.add(hash_to_hex(entry["target"]))
 
     _add_extra_contents(storage, sha1s)
 
     # Get all checksums for each content
     result: List[Optional[Content]] = storage.content_get(list(sha1s))
 
     contents: List[Dict] = []
     for content in result:
         assert content is not None
         sha1 = hash_to_hex(content.sha1)
         content_metadata = {
             algo: hash_to_hex(getattr(content, algo)) for algo in DEFAULT_ALGORITHMS
         }
 
         path = ""
         if content.sha1 in content_path:
             path = content_path[content.sha1]
 
         cnt_data = storage.content_get_data(content.sha1)
         assert cnt_data is not None
         mimetype, encoding = get_mimetype_and_encoding_for_content(cnt_data)
         _, _, cnt_data = _re_encode_content(mimetype, encoding, cnt_data)
         content_display_data = prepare_content_for_display(cnt_data, mimetype, path)
 
         content_metadata.update(
             {
                 "path": path,
                 "mimetype": mimetype,
                 "encoding": encoding,
                 "hljs_language": content_display_data["language"],
                 "data": content_display_data["content_data"],
             }
         )
         _contents[sha1] = content_metadata
         contents.append(content_metadata)
 
     # Create indexer storage instance that will be shared by indexers
     idx_storage = get_indexer_storage("memory", {})
 
     # Add the empty directory to the test archive
     storage.directory_add([Directory(entries=())])
 
     # Return tests data
     return {
         "search": search,
         "storage": storage,
         "idx_storage": idx_storage,
         "origins": _TEST_ORIGINS,
         "contents": contents,
         "directories": list(directories),
         "releases": list(releases),
         "revisions": list(map(hash_to_hex, revisions)),
         "snapshots": list(snapshots),
         "generated_checksums": set(),
     }
 
 
 def _init_indexers(tests_data):
     # Instantiate content indexers that will be used in tests
     # and force them to use the memory storages
     indexers = {}
-    for idx_name, idx_class in (
-        ("mimetype_indexer", _MimetypeIndexer),
-        ("license_indexer", _FossologyLicenseIndexer),
-        ("ctags_indexer", _CtagsIndexer),
+    for idx_name, idx_class, idx_config in (
+        ("mimetype_indexer", MimetypeIndexer, _TEST_MIMETYPE_INDEXER_CONFIG),
+        ("license_indexer", FossologyLicenseIndexer, _TEST_LICENSE_INDEXER_CONFIG),
+        ("ctags_indexer", CtagsIndexer, _TEST_CTAGS_INDEXER_CONFIG),
     ):
-        idx = idx_class()
+        idx = idx_class(config=idx_config)
         idx.storage = tests_data["storage"]
         idx.objstorage = tests_data["storage"].objstorage
         idx.idx_storage = tests_data["idx_storage"]
         idx.register_tools(idx.config["tools"])
         indexers[idx_name] = idx
     return indexers
 
 
 def get_content(content_sha1):
     return _contents.get(content_sha1)
 
 
 _tests_data = None
 _current_tests_data = None
 _indexer_loggers = {}
 
 
 def get_tests_data(reset=False):
     """
     Initialize tests data and return them in a dict.
     """
     global _tests_data, _current_tests_data
     if _tests_data is None:
         _tests_data = _init_tests_data()
         indexers = _init_indexers(_tests_data)
         for (name, idx) in indexers.items():
             # pytest makes the loggers use a temporary file; and deepcopy
             # requires serializability. So we remove them, and add them
             # back after the copy.
             _indexer_loggers[name] = idx.log
             del idx.log
         _tests_data.update(indexers)
     if reset or _current_tests_data is None:
         _current_tests_data = deepcopy(_tests_data)
         for (name, logger) in _indexer_loggers.items():
             _current_tests_data[name].log = logger
     return _current_tests_data
 
 
 def override_storages(storage, idx_storage, search):
     """
     Helper function to replace the storages from which archive data are
     fetched.
     """
     swh_config = config.get_config()
     swh_config.update(
         {"storage": storage, "indexer_storage": idx_storage, "search": search,}
     )
     service.storage = storage
     service.idx_storage = idx_storage
     service.search = search