# diff --git a/swh/web/tests/conftest.py b/swh/web/tests/conftest.py
# index 44b8646d..106e8f48 100644
# (reformatted: the code below is the post-patch state of the file,
# hunk @@ -1,307 +1,309 @@)

# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import inspect
import json
import os
import shutil
import sys

from subprocess import run, PIPE

import pytest

from django.core.cache import cache
from hypothesis import settings, HealthCheck
from rest_framework.test import APIClient, APIRequestFactory

from swh.model.hashutil import ALGORITHMS, hash_to_bytes
from swh.web.common import converters
from swh.web.tests.data import get_tests_data, override_storages

# Used to skip some tests
# (the ctags binary is only executed when shutil.which found it)
ctags_json_missing = (
    shutil.which('ctags') is None or
    b'+json' not in run(['ctags', '--version'], stdout=PIPE).stdout
)

fossology_missing = shutil.which('nomossa') is None

# Register some hypothesis profiles
settings.register_profile('default', settings())

settings.register_profile(
    'swh-web',
    settings(deadline=None,
             suppress_health_check=[HealthCheck.too_slow,
                                    HealthCheck.filter_too_much]))

settings.register_profile(
    'swh-web-fast',
    settings(deadline=None, max_examples=1,
             suppress_health_check=[HealthCheck.too_slow,
                                    HealthCheck.filter_too_much]))


def pytest_configure(config):
    """
    Generate a dummy webpack-stats.json file when none exists.

    Args:
        config: object passed by pytest to the hook (unused here)
    """
    # Small hack in order to be able to run the unit tests
    # without static assets generated by webpack.
    # Those assets are not really needed for the Python tests
    # but the django templates will fail to load due to missing
    # generated file webpack-stats.json describing the js and css
    # files to include.
    # So generate a dummy webpack-stats.json file to overcome
    # that issue.
    test_dir = os.path.dirname(__file__)
    # location of the static folder when running tests through tox
    static_dir = os.path.join(sys.prefix, 'share/swh/web/static')

    if not os.path.exists(static_dir):
        # location of the static folder when running tests locally with pytest
        static_dir = os.path.join(test_dir, '../../../static')
    webpack_stats = os.path.join(static_dir, 'webpack-stats.json')
    if os.path.exists(webpack_stats):
        # a stats file is already there, nothing to generate
        return
    bundles_dir = os.path.join(test_dir, '../assets/src/bundles')
    # only the immediate sub-directories are needed: one per bundle
    _, dirs, _ = next(os.walk(bundles_dir))

    mock_webpack_stats = {
        'status': 'done',
        'publicPath': '/static',
        'chunks': {}
    }
    for bundle in dirs:
        asset = 'js/%s.js' % bundle
        mock_webpack_stats['chunks'][bundle] = [{
            'name': asset,
            'publicPath': '/static/%s' % asset,
            'path': os.path.join(static_dir, asset)
        }]

    with open(webpack_stats, 'w') as outfile:
        json.dump(mock_webpack_stats, outfile)


# Clear Django cache before each test
@pytest.fixture(autouse=True)
def django_cache_cleared():
    cache.clear()


# Alias rf fixture from pytest-django
@pytest.fixture
def request_factory(rf):
    return rf


# Fixture to get test client from Django REST Framework
@pytest.fixture(scope='module')
def api_client():
    return APIClient()


# Fixture to get API request factory from Django REST Framework
@pytest.fixture(scope='module')
def api_request_factory():
    return APIRequestFactory()


# Initialize tests data
@pytest.fixture(autouse=True)
def tests_data():
    data = get_tests_data(reset=True)
    # Update swh-web configuration to use the in-memory storages
    # instantiated in the tests.data module
    override_storages(data['storage'], data['idx_storage'], data['search'])
    return data


# Fixture to manipulate data from a sample archive used in the tests
@pytest.fixture
def archive_data(tests_data):
    return _ArchiveData(tests_data)


# Fixture to manipulate indexer data from a sample archive used in the tests
@pytest.fixture
def indexer_data(tests_data):
    return _IndexerData(tests_data)


# Custom data directory for requests_mock
@pytest.fixture
def datadir():
    return os.path.join(os.path.abspath(os.path.dirname(__file__)),
                        'resources')


class _ArchiveData:
    """
    Helper class to manage data from a sample test archive.

    It is initialized with a reference to an in-memory storage
    containing raw tests data.

    It is basically a proxy to Storage interface but it overrides some methods
    to retrieve those tests data in a json serializable format in order to ease
    tests implementation.
    """

    def __init__(self, tests_data):
        """
        Args:
            tests_data (dict): tests data, its 'storage' entry is the
                storage instance to wrap
        """
        self.storage = tests_data['storage']

        def _call_storage_method(method):
            def call_storage_method(*args, **kwargs):
                return method(*args, **kwargs)

            return call_storage_method

        # Forward calls to non overridden Storage methods to wrapped
        # storage instance
        for method_name, method in inspect.getmembers(
                self.storage, predicate=inspect.ismethod):
            if (not hasattr(self, method_name) and
                    not method_name.startswith('_')):
                setattr(self, method_name, _call_storage_method(method))

    def content_find(self, content):
        """
        Find a content from a dict of hex checksums keyed by algorithm name.

        Returns the first match converted with converters.from_content,
        or the falsy storage result when nothing matched.
        """
        cnt_ids_bytes = {algo_hash: hash_to_bytes(content[algo_hash])
                         for algo_hash in ALGORITHMS
                         if content.get(algo_hash)}
        cnt = self.storage.content_find(cnt_ids_bytes)
        return converters.from_content(cnt[0]) if cnt else cnt

    def content_get_metadata(self, cnt_id):
        """
        Get the metadata of the content identified by hex checksum cnt_id.

        The storage result is indexed by the binary checksum and each value
        is a sequence of contents: the first one is kept, None is used when
        there is none.
        """
        cnt_id_bytes = hash_to_bytes(cnt_id)
        metadata = self.storage.content_get_metadata([cnt_id_bytes])
        # .get: do not raise KeyError if the storage omits an unknown id
        contents = metadata.get(cnt_id_bytes)
        content = contents[0] if contents else None
        return converters.from_swh(content,
                                   hashess={'sha1', 'sha1_git', 'sha256',
                                            'blake2s256'})

    def content_get(self, cnt_id):
        """
        Get the content identified by hex checksum cnt_id, converted
        with converters.from_content.
        """
        cnt_id_bytes = hash_to_bytes(cnt_id)
        cnt = next(self.storage.content_get([cnt_id_bytes]))
        return converters.from_content(cnt)

    def directory_get(self, dir_id):
        """
        Return a dict with the directory id and its listed entries.
        """
        return {
            'id': dir_id,
            'content': self.directory_ls(dir_id)
        }

    def directory_ls(self, dir_id):
        """
        List the entries of directory dir_id (hex), each converted
        with converters.from_directory_entry.
        """
        cnt_id_bytes = hash_to_bytes(dir_id)
        dir_content = map(converters.from_directory_entry,
                          self.storage.directory_ls(cnt_id_bytes))
        return list(dir_content)

    def release_get(self, rel_id):
        """
        Get release rel_id (hex), converted with converters.from_release.
        """
        rel_id_bytes = hash_to_bytes(rel_id)
        rel_data = next(self.storage.release_get([rel_id_bytes]))
        return converters.from_release(rel_data)

    def revision_get(self, rev_id):
        """
        Get revision rev_id (hex), converted with converters.from_revision.
        """
        rev_id_bytes = hash_to_bytes(rev_id)
        rev_data = next(self.storage.revision_get([rev_id_bytes]))
        return converters.from_revision(rev_data)

    def revision_log(self, rev_id, limit=None):
        """
        Get the log of revision rev_id (hex) as a list of converted
        revisions, limit being forwarded to the storage.
        """
        rev_id_bytes = hash_to_bytes(rev_id)
        return list(map(converters.from_revision,
                        self.storage.revision_log([rev_id_bytes],
                                                  limit=limit)))

    def snapshot_get_latest(self, origin_url):
        """
        Get the latest snapshot of an origin, converted with
        converters.from_snapshot.
        """
        snp = self.storage.snapshot_get_latest(origin_url)
        return converters.from_snapshot(snp)

    def origin_get(self, origin_info):
        """
        Get an origin, converted with converters.from_origin.
        """
        origin = self.storage.origin_get(origin_info)
        return converters.from_origin(origin)

    def origin_visit_get(self, origin_url):
        """
        Get the visits of an origin as a list of converted visits.
        """
        visits = self.storage.origin_visit_get(origin_url)
        return list(map(converters.from_origin_visit, visits))

    def origin_visit_get_by(self, origin_url, visit_id):
        """
        Get a specific visit of an origin, converted with
        converters.from_origin_visit.
        """
        visit = self.storage.origin_visit_get_by(origin_url, visit_id)
        return converters.from_origin_visit(visit)

    def snapshot_get(self, snapshot_id):
        """
        Get snapshot snapshot_id (hex), converted with
        converters.from_snapshot.
        """
        snp = self.storage.snapshot_get(hash_to_bytes(snapshot_id))
        return converters.from_snapshot(snp)

    def snapshot_get_branches(self, snapshot_id, branches_from='',
                              branches_count=1000, target_types=None):
        """
        Get branches of snapshot snapshot_id (hex), converted with
        converters.from_snapshot; branches_from is encoded to bytes
        before being forwarded to the storage.
        """
        snp = self.storage.snapshot_get_branches(
            hash_to_bytes(snapshot_id), branches_from.encode(),
            branches_count, target_types)
        return converters.from_snapshot(snp)

    def snapshot_get_head(self, snapshot):
        """
        Return the target of the HEAD branch of a converted snapshot,
        following one level of alias.
        """
        if snapshot['branches']['HEAD']['target_type'] == 'alias':
            target = snapshot['branches']['HEAD']['target']
            head = snapshot['branches'][target]['target']
        else:
            head = snapshot['branches']['HEAD']['target']
        return head


class _IndexerData:
    """
    Helper class to manage indexer tests data

    It is initialized with a reference to an in-memory indexer storage
    containing raw tests data.

    It also defines class methods to retrieve those tests data in
    a json serializable format in order to ease tests implementation.
    """

    def __init__(self, tests_data):
        """
        Args:
            tests_data (dict): tests data providing the 'idx_storage',
                'mimetype_indexer', 'license_indexer' and 'ctags_indexer'
                entries
        """
        self.idx_storage = tests_data['idx_storage']
        self.mimetype_indexer = tests_data['mimetype_indexer']
        self.license_indexer = tests_data['license_indexer']
        self.ctags_indexer = tests_data['ctags_indexer']

    def content_add_mimetype(self, cnt_id):
        """
        Run the mimetype indexer on content cnt_id (hex).
        """
        self.mimetype_indexer.run([hash_to_bytes(cnt_id)],
                                  'update-dups')

    def content_get_mimetype(self, cnt_id):
        """
        Get the indexed mimetype of content cnt_id (hex), converted
        with converters.from_filetype.
        """
        mimetype = next(self.idx_storage.content_mimetype_get(
            [hash_to_bytes(cnt_id)]))
        return converters.from_filetype(mimetype)

    def content_add_language(self, cnt_id):
        """
        Not supported: always raises NotImplementedError.
        """
        # No language indexer is set up in __init__, so there is nothing
        # to run here.
        raise NotImplementedError('Language indexer is disabled.')

    def content_get_language(self, cnt_id):
        """
        Get the indexed language of content cnt_id (hex).
        """
        lang = next(self.idx_storage.content_language_get(
            [hash_to_bytes(cnt_id)]))
        return converters.from_swh(lang, hashess={'id'})

    def content_add_license(self, cnt_id):
        """
        Run the license indexer on content cnt_id (hex).
        """
        self.license_indexer.run([hash_to_bytes(cnt_id)],
                                 'update-dups')

    def content_get_license(self, cnt_id):
        """
        Get the indexed license facts of content cnt_id (hex) as a dict
        with 'id' and 'facts' keys.
        """
        cnt_id_bytes = hash_to_bytes(cnt_id)
        lic = next(self.idx_storage.content_fossology_license_get(
            [cnt_id_bytes]))
        return converters.from_swh({'id': cnt_id_bytes,
                                    'facts': lic[cnt_id_bytes]},
                                   hashess={'id'})

    def content_add_ctags(self, cnt_id):
        """
        Run the ctags indexer on content cnt_id (hex).
        """
        self.ctags_indexer.run([hash_to_bytes(cnt_id)],
                               'update-dups')

    def content_get_ctags(self, cnt_id):
        """
        Yield the indexed ctags of content cnt_id (hex), one converted
        dict at a time.
        """
        cnt_id_bytes = hash_to_bytes(cnt_id)
        ctags = self.idx_storage.content_ctags_get([cnt_id_bytes])
        for ctag in ctags:
            yield converters.from_swh(ctag, hashess={'id'})


# diff --git a/swh/web/tests/data.py b/swh/web/tests/data.py
# index 3dcc4769..9370b1e7 100644
# (reformatted: the code below is the post-patch state of the file,
# hunk @@ -1,479 +1,481 @@)

# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See
# top-level LICENSE file for more information

import os
import random

from copy import deepcopy
from typing import Dict

from rest_framework.decorators import api_view
from rest_framework.response import Response

from swh.indexer.fossology_license import FossologyLicenseIndexer
from swh.indexer.mimetype import MimetypeIndexer
from swh.indexer.ctags import CtagsIndexer
from swh.indexer.storage import get_indexer_storage
from swh.model.from_disk import Directory
from swh.model.hashutil import hash_to_hex, hash_to_bytes, DEFAULT_ALGORITHMS
from swh.model.identifiers import directory_identifier
from swh.loader.git.from_disk import GitLoaderFromArchive
from swh.search import get_search
from swh.storage.algos.dir_iterators import dir_iterator
from swh.web import config
from swh.web.browse.utils import (
    get_mimetype_and_encoding_for_content, prepare_content_for_display
)
from swh.web.common import service
from swh.web.common.highlightjs import get_hljs_language_from_filename

# Module used to initialize data that will be provided as tests input

# Configuration for git loader
_TEST_LOADER_CONFIG = {
    'storage': {
        'cls': 'memory'
    },
    'save_data': False,
    'max_content_size': 100 * 1024 * 1024,
}

# Base content indexer configuration
_TEST_INDEXER_BASE_CONFIG = {
    'storage': {
        'cls': 'memory'
    },
    'objstorage': {
        'cls': 'memory',
        'args': {},
    },
    'indexer_storage': {
        'cls': 'memory',
        'args': {},
    }
}


def _random_hex(length):
    """
    Return the hex representation of length random bytes.
    """
    return hash_to_hex(bytes(random.randint(0, 255) for _ in range(length)))


def random_sha1():
    """
    Return a random hex string of 20 bytes.
    """
    return _random_hex(20)


def random_sha256():
    """
    Return a random hex string of 32 bytes.
    """
    return _random_hex(32)


def random_blake2s256():
    """
    Return a random hex string of 32 bytes.
    """
    return _random_hex(32)


def random_content():
    """
    Return a dict of random hex checksums keyed by 'sha1', 'sha1_git',
    'sha256' and 'blake2s256'.
    """
    return {
        'sha1': random_sha1(),
        'sha1_git': random_sha1(),
        'sha256': random_sha256(),
        'blake2s256': random_blake2s256(),
    }


# MimetypeIndexer with custom configuration for tests
class _MimetypeIndexer(MimetypeIndexer):
    def parse_config_file(self, *args, **kwargs):
        """
        Return a hard-coded configuration, arguments are ignored.
        """
        return {
            **_TEST_INDEXER_BASE_CONFIG,
            'tools': {
                'name': 'file',
                'version': '1:5.30-1+deb9u1',
                'configuration': {
                    "type": "library",
                    "debian-package": "python3-magic"
                }
            }
        }


# FossologyLicenseIndexer with custom configuration for tests
class _FossologyLicenseIndexer(FossologyLicenseIndexer):
    def parse_config_file(self, *args, **kwargs):
        """
        Return a hard-coded configuration, arguments are ignored.
        """
        return {
            **_TEST_INDEXER_BASE_CONFIG,
            'workdir': '/tmp/swh/indexer.fossology.license',
            'tools': {
                'name': 'nomos',
                'version': '3.1.0rc2-31-ga2cbb8c',
                'configuration': {
                    # NOTE(review): the command line ends with a bare
                    # trailing space; a placeholder may have been lost
                    # there - confirm against the indexer documentation
                    'command_line': 'nomossa ',
                },
            }
        }


# CtagsIndexer with custom configuration for tests
class _CtagsIndexer(CtagsIndexer):
    def parse_config_file(self, *args, **kwargs):
        """
        Return a hard-coded configuration, arguments are ignored.
        """
        return {
            **_TEST_INDEXER_BASE_CONFIG,
            'workdir': '/tmp/swh/indexer.ctags',
            'languages': {'c': 'c'},
            'tools': {
                'name': 'universal-ctags',
                'version': '~git7859817b',
                'configuration': {
                    # NOTE(review): same trailing space as in the nomossa
                    # command line above - confirm nothing is missing
                    'command_line': '''ctags --fields=+lnz --sort=no --links=no ''' # noqa
                                    '''--output-format=json '''
                },
            }
        }


# Lightweight git repositories that will be loaded to generate
# input data for tests
_TEST_ORIGINS = [
    {
        'type': 'git',
        'url': 'https://github.com/wcoder/highlightjs-line-numbers.js',
        'archives': ['highlightjs-line-numbers.js.zip',
                     'highlightjs-line-numbers.js_visit2.zip'],
        'visit_date': ['Dec 1 2018, 01:00 UTC',
                       'Jan 20 2019, 15:00 UTC']
    },
    {
        'type': 'git',
        'url': 'https://github.com/memononen/libtess2',
        'archives': ['libtess2.zip'],
        'visit_date': ['May 25 2018, 01:00 UTC']
    },
    {
        'type': 'git',
        'url': 'repo_with_submodules',
        'archives': ['repo_with_submodules.tgz'],
        'visit_date': ['Jan 1 2019, 01:00 UTC']
    }
]

# Content data indexed by hex sha1, filled by _init_tests_data
_contents = {}


# Tests data initialization
def _init_tests_data():
    """
    Load the test origins into an in-memory storage and collect the
    identifiers of the loaded objects.

    Returns:
        dict: the search, storage and indexer storage instances, the
        origins and the lists of loaded contents, directories, releases,
        revisions and snapshots
    """
    # To hold reference to the memory storage
    storage = None

    # Create search instance
    search = get_search('memory', {})
    search.initialize()
    search.origin_update({'url': origin['url']} for origin in _TEST_ORIGINS)

    # Load git repositories from archives
    for origin in _TEST_ORIGINS:
        for i, archive in enumerate(origin['archives']):
            origin_repo_archive = \
                os.path.join(os.path.dirname(__file__),
                             'resources/repos/%s' % archive)
            loader = GitLoaderFromArchive(origin['url'],
                                          archive_path=origin_repo_archive,
                                          config=_TEST_LOADER_CONFIG,
                                          visit_date=origin['visit_date'][i])
            # every loader shares the storage created by the first one
            if storage is None:
                storage = loader.storage
            else:
                loader.storage = storage
            loader.load()

        origin.update(storage.origin_get(origin))  # add an 'id' key if enabled
        search.origin_update([{'url': origin['url'], 'has_visits': True}])

    for i in range(250):
        url = 'https://many.origins/%d' % (i+1)
        storage.origin_add([{'url': url}])
        search.origin_update([{'url': url, 'has_visits': True}])
        visit = storage.origin_visit_add(url, '2019-12-03 13:55:05', 'tar')
        storage.origin_visit_update(
            url, visit['visit'],
            snapshot='1a8893e6a86f444e8be8e7bda6cb34fb1735a00e')

    content_sha1s = set()
    directories = set()
    revisions = set()
    releases = set()
    snapshots = set()

    content_path = {}

    # Get all objects loaded into the test archive
    for origin in _TEST_ORIGINS:
        snp = storage.snapshot_get_latest(origin['url'])
        snapshots.add(hash_to_hex(snp['id']))
        for branch_name, branch_data in snp['branches'].items():
            if branch_data['target_type'] == 'revision':
                revisions.add(branch_data['target'])
            elif branch_data['target_type'] == 'release':
                release = next(storage.release_get([branch_data['target']]))
                revisions.add(release['target'])
                releases.add(hash_to_hex(branch_data['target']))

        # iterate on a copy as the revisions set grows in the loop
        for rev_log in storage.revision_shortlog(set(revisions)):
            rev_id = rev_log[0]
            revisions.add(rev_id)

        for rev in storage.revision_get(revisions):
            dir_id = rev['directory']
            directories.add(hash_to_hex(dir_id))
            for entry in dir_iterator(storage, dir_id):
                content_path[entry['sha1']] = '/'.join(
                    [hash_to_hex(dir_id), entry['path'].decode('utf-8')])
                if entry['type'] == 'file':
                    content_sha1s.add(entry['sha1'])
                elif entry['type'] == 'dir':
                    directories.add(hash_to_hex(entry['target']))

    # Get all checksums for each content
    result = storage.content_get_metadata(content_sha1s)
    contents = []
    for sha1, contents_metadata in result.items():
        if not contents_metadata:
            continue
        # path, raw data and display data only depend on the sha1:
        # compute them once instead of once per metadata entry
        path = content_path[sha1]
        cnt = next(storage.content_get([sha1]))
        mimetype, encoding = get_mimetype_and_encoding_for_content(
            cnt['data'])
        content_display_data = prepare_content_for_display(
            cnt['data'], mimetype, path)
        for content_metadata in contents_metadata:
            content = {
                algo: hash_to_hex(content_metadata[algo])
                for algo in DEFAULT_ALGORITHMS
            }
            content['path'] = path
            content['mimetype'] = mimetype
            content['encoding'] = encoding
            content['hljs_language'] = content_display_data['language']
            content['data'] = content_display_data['content_data']
            contents.append(content)
            _contents[content['sha1']] = content

    # Create indexer storage instance that will be shared by indexers
    idx_storage = get_indexer_storage('memory', {})

    # Add the empty directory to the test archive
    empty_dir_id = directory_identifier({'entries': []})
    empty_dir_id_bin = hash_to_bytes(empty_dir_id)
    storage.directory_add([{'id': empty_dir_id_bin, 'entries': []}])

    # Return tests data
    return {
        'search': search,
        'storage': storage,
        'idx_storage': idx_storage,
        'origins': _TEST_ORIGINS,
        'contents': contents,
        'directories': list(directories),
        'releases': list(releases),
        'revisions': list(map(hash_to_hex, revisions)),
        'snapshots': list(snapshots),
        'generated_checksums': set(),
    }


def _init_indexers(tests_data):
    """
    Instantiate the content indexers and plug them on the storages
    found in tests_data.

    Returns:
        dict: indexer instances keyed by 'mimetype_indexer',
        'license_indexer' and 'ctags_indexer'
    """
    # Instantiate content indexers that will be used in tests
    # and force them to use the memory storages
    indexers = {}
    for idx_name, idx_class in (('mimetype_indexer', _MimetypeIndexer),
                                ('license_indexer', _FossologyLicenseIndexer),
                                ('ctags_indexer', _CtagsIndexer)):
        idx = idx_class()
        idx.storage = tests_data['storage']
        idx.objstorage = tests_data['storage'].objstorage
        idx.idx_storage = tests_data['idx_storage']
        idx.register_tools(idx.config['tools'])
        indexers[idx_name] = idx

    return indexers


def get_content(content_sha1):
    """
    Return the content data registered for hex sha1 content_sha1,
    or None when there is none.
    """
    return _contents.get(content_sha1)


_tests_data = None
_current_tests_data = None
_indexer_loggers = {}


def get_tests_data(reset=False):
    """
    Initialize tests data and return them in a dict.

    Args:
        reset (bool): whether to return a fresh copy of the initial
            tests data instead of the current one
    """
    global _tests_data, _current_tests_data
    if _tests_data is None:
        _tests_data = _init_tests_data()
        indexers = _init_indexers(_tests_data)
        for (name, idx) in indexers.items():
            # pytest makes the loggers use a temporary file; and deepcopy
            # requires serializability. So we remove them, and add them
            # back after the copy.
            _indexer_loggers[name] = idx.log
            del idx.log
        _tests_data.update(indexers)
    if reset or _current_tests_data is None:
        _current_tests_data = deepcopy(_tests_data)
        for (name, logger) in _indexer_loggers.items():
            _current_tests_data[name].log = logger
    return _current_tests_data


def override_storages(storage, idx_storage, search):
    """
    Helper function to replace the storages from which archive data
    are fetched.
    """
    swh_config = config.get_config()
    swh_config.update({
        'storage': storage,
        'indexer_storage': idx_storage,
        'search': search,
    })

    service.storage = storage
    service.idx_storage = idx_storage
    service.search = search


# Implement some special endpoints used to provide input tests data
# when executing end to end tests with cypress

_content_code_data_exts = {}  # type: Dict[str, Dict[str, str]]
_content_code_data_filenames = {}  # type: Dict[str, Dict[str, str]]
_content_other_data_exts = {}  # type: Dict[str, Dict[str, str]]


def _init_content_tests_data(data_path, data_dict, ext_key):
    """
    Helper function to read the content of a directory, store it
    into a test archive and add some files metadata (sha1 and/or
    expected programming language) in a dict.

    Args:
        data_path (str): path to a directory relative to the tests
            folder of swh-web
        data_dict (dict): the dict that will store files metadata
        ext_key (bool): whether to use file extensions or filenames
            as dict keys
    """
    test_contents_dir = os.path.join(
        os.path.dirname(__file__), data_path).encode('utf-8')
    directory = Directory.from_disk(path=test_contents_dir, data=True,
                                    save_path=True)
    objects = directory.collect()
    for c in objects['content'].values():
        c['status'] = 'visible'
        sha1 = hash_to_hex(c['sha1'])
        if ext_key:
            key = c['path'].decode('utf-8').split('.')[-1]
            filename = 'test.' + key
        else:
            filename = c['path'].decode('utf-8').split('/')[-1]
            key = filename
        language = get_hljs_language_from_filename(filename)
        data_dict[key] = {'sha1': sha1,
                          'language': language}
        # those keys are dropped before the contents are added to storage
        del c['path']
        del c['perms']
    storage = get_tests_data()['storage']
    storage.content_add(objects['content'].values())


def _init_content_code_data_exts():
    """
    Fill a global dictionary which maps source file extension to
    a code content example.
    """
    _init_content_tests_data('resources/contents/code/extensions',
                             _content_code_data_exts, True)


def _init_content_other_data_exts():
    """
    Fill a global dictionary which maps a file extension to
    a content example.
    """
    _init_content_tests_data('resources/contents/other/extensions',
                             _content_other_data_exts, True)


def _init_content_code_data_filenames():
    """
    Fill a global dictionary which maps a filename to
    a content example.
    """
    _init_content_tests_data('resources/contents/code/filenames',
                             _content_code_data_filenames, False)


if config.get_config()['e2e_tests_mode']:
    _init_content_code_data_exts()
    _init_content_other_data_exts()
    _init_content_code_data_filenames()


def _content_data_response(data_dict, key):
    """
    Build the JSON response for a lookup of key in data_dict:
    the stored metadata with status 200 when the key is present,
    None with status 404 otherwise.
    """
    found = key in data_dict
    return Response(data_dict[key] if found else None,
                    status=200 if found else 404,
                    content_type='application/json')


@api_view(['GET'])
def get_content_code_data_all_exts(request):
    """
    Endpoint implementation returning a list of all source file
    extensions to test for highlighting using cypress.
    """
    return Response(sorted(_content_code_data_exts.keys()),
                    status=200, content_type='application/json')


@api_view(['GET'])
def get_content_code_data_by_ext(request, ext):
    """
    Endpoint implementation returning metadata of a code content example
    based on the source file extension.
    """
    return _content_data_response(_content_code_data_exts, ext)


@api_view(['GET'])
def get_content_other_data_by_ext(request, ext):
    """
    Endpoint implementation returning metadata of a content example
    based on the file extension.
    """
    # NOTE(review): the data are re-read and re-added to the storage on
    # every request, unlike in the other endpoints - confirm this is
    # intended
    _init_content_other_data_exts()
    data = None
    status = 404
    if ext in _content_other_data_exts:
        data = _content_other_data_exts[ext]
        status = 200
    return Response(data, status=status, content_type='application/json')


@api_view(['GET'])
def get_content_code_data_all_filenames(request):
    """
    Endpoint implementation returning a list of all source filenames
    to test for highlighting using cypress.
    """
    return Response(sorted(_content_code_data_filenames.keys()),
                    status=200, content_type='application/json')


@api_view(['GET'])
def get_content_code_data_by_filename(request, filename):
    """
    Endpoint implementation returning metadata of a code content example
    based on the source filename.
    """
    return _content_data_response(_content_code_data_filenames, filename)