diff --git a/swh/web/api/views/identifiers.py b/swh/web/api/views/identifiers.py
index 4cef36ec..cc8c6567 100644
--- a/swh/web/api/views/identifiers.py
+++ b/swh/web/api/views/identifiers.py
@@ -1,63 +1,101 @@
 # Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

-from swh.web.common import service
-from swh.web.common.utils import resolve_swh_persistent_id
+from swh.web.common import service, utils
+from swh.web.common.utils import (
+    resolve_swh_persistent_id,
+    get_persistent_identifier
+)
 from swh.web.api.apidoc import api_doc, format_docstring
 from swh.web.api.apiurls import api_route


 @api_route(r'/resolve/(?P<swh_id>.*)/', 'api-1-resolve-swh-pid')
 @api_doc('/resolve/')
 @format_docstring()
 def api_resolve_swh_pid(request, swh_id):
     """
     .. http:get:: /api/1/resolve/(swh_id)/

         Resolve a Software Heritage persistent identifier.

         Try to resolve a provided `persistent identifier
         <https://docs.softwareheritage.org/devel/swh-model/persistent-identifiers.html>`_
         into an url for browsing the pointed archive object. If the provided
         identifier is valid, the existence of the object in the archive
         will also be checked.

         :param string swh_id: a Software Heritage persistent identifier

         :>json string browse_url: the url for browsing the pointed object
         :>json object metadata: object holding optional parts of the
             persistent identifier
         :>json string namespace: the persistent identifier namespace
         :>json string object_id: the hash identifier of the pointed object
         :>json string object_type: the type of the pointed object
         :>json number scheme_version: the scheme version of the
             persistent identifier

         {common_headers}

         **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`,
         :http:method:`options`

         :statuscode 200: no error
         :statuscode 400: an invalid persistent identifier has been provided
         :statuscode 404: the pointed object does not exist in the archive

         **Example:**

         .. parsed-literal::

             :swh_web_api:`resolve/swh:1:rev:96db9023b881d7cd9f379b0c154650d6c108e9a3;origin=https://github.com/openssl/openssl/`

     """  # noqa
     # try to resolve the provided pid
     swh_id_resolved = resolve_swh_persistent_id(swh_id)
     # id is well-formed, now check that the pointed
     # object is present in the archive, NotFoundExc
     # will be raised otherwise
     swh_id_parsed = swh_id_resolved['swh_id_parsed']
     object_type = swh_id_parsed.object_type
     object_id = swh_id_parsed.object_id
     service.lookup_object(object_type, object_id)
     # id is well-formed and the pointed object exists
     swh_id_data = swh_id_parsed._asdict()
     swh_id_data['browse_url'] = request.build_absolute_uri(
         swh_id_resolved['browse_url'])
     return swh_id_data
+
+
+@api_route(r'/known/',
+           'api-1-swh-pid-known', methods=['POST'])
+@api_doc('/known/', noargs=True, tags=['hidden'])
+@format_docstring()
+def api_swh_pid_known(request):
+    """
+    .. http:post:: /api/1/known/
+
+        Check if a list of Software Heritage persistent identifiers is
+        present in the archive, based on their id (sha1_git).
+
+        Returns:
+            A dictionary with:
+                keys(str): Persistent identifier
+                values(dict): A dictionary containing the key 'known'
(true if + the pid is present, False otherwise) + + """ + persistent_ids = [get_persistent_identifier(pid) + for pid in request.data] + + response = {str(pid): {'known': False} for pid in persistent_ids} + + # group pids by their type + pids_by_type = utils.group_swh_persistent_identifiers(persistent_ids) + # search for hashes not present in the storage + missing_hashes = service.lookup_missing_hashes(pids_by_type) + + for pid in persistent_ids: + if pid.object_id not in missing_hashes: + response[str(pid)]['known'] = True + + return response diff --git a/swh/web/common/service.py b/swh/web/common/service.py index 087bbe1f..9e5c0f37 100644 --- a/swh/web/common/service.py +++ b/swh/web/common/service.py @@ -1,1205 +1,1205 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import itertools import os import re from collections import defaultdict -from typing import Any, Dict +from typing import Any, Dict, List, Set from swh.model import hashutil from swh.storage.algos import diff, revisions_walker from swh.model.identifiers import ( CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT ) from swh.web.common import converters from swh.web.common import query from swh.web.common.exc import BadInputExc, NotFoundExc from swh.web.common.origin_visits import get_origin_visit from swh.web import config search = config.search() storage = config.storage() vault = config.vault() idx_storage = config.indexer_storage() MAX_LIMIT = 50 # Top limit the users can ask for def _first_element(l): """Returns the first element in the provided list or None if it is empty or None""" return next(iter(l or []), None) def lookup_multiple_hashes(hashes): """Lookup the passed hashes in a single DB connection, using batch processing. Args: An array of {filename: X, sha1: Y}, string X, hex sha1 string Y. Returns: The same array with elements updated with elem['found'] = true if the hash is present in storage, elem['found'] = false if not. """ hashlist = [hashutil.hash_to_bytes(elem['sha1']) for elem in hashes] content_missing = storage.content_missing_per_sha1(hashlist) missing = [hashutil.hash_to_hex(x) for x in content_missing] for x in hashes: x.update({'found': True}) for h in hashes: if h['sha1'] in missing: h['found'] = False return hashes def lookup_expression(expression, last_sha1, per_page): """Lookup expression in raw content. Args: expression (str): An expression to lookup through raw indexed content last_sha1 (str): Last sha1 seen per_page (int): Number of results per page Yields: ctags whose content match the expression """ limit = min(per_page, MAX_LIMIT) ctags = idx_storage.content_ctags_search(expression, last_sha1=last_sha1, limit=limit) for ctag in ctags: ctag = converters.from_swh(ctag, hashess={'id'}) ctag['sha1'] = ctag['id'] ctag.pop('id') yield ctag def lookup_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found containing the hash info if the hash is present, None if not. 
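    A minimal usage sketch (the checksum below is a made-up placeholder,
    not a real archive object)::

        lookup_hash('sha1_git:456caf10e9535160d90e874b45aa426de762f19f')
        # -> {'found': <content info or None>, 'algo': 'sha1_git'}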
""" algo, hash = query.parse_hash(q) found = _first_element(storage.content_find({algo: hash})) return {'found': converters.from_content(found), 'algo': algo} def search_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found to True or False, according to whether the checksum is present or not """ algo, hash = query.parse_hash(q) found = _first_element(storage.content_find({algo: hash})) return {'found': found is not None} def _lookup_content_sha1(q): """Given a possible input, query for the content's sha1. Args: q: query string of the form Returns: binary sha1 if found or None """ algo, hash = query.parse_hash(q) if algo != 'sha1': hashes = _first_element(storage.content_find({algo: hash})) if not hashes: return None return hashes['sha1'] return hash def lookup_content_ctags(q): """Return ctags information from a specified content. Args: q: query string of the form Yields: ctags information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None ctags = list(idx_storage.content_ctags_get([sha1])) if not ctags: return None for ctag in ctags: yield converters.from_swh(ctag, hashess={'id'}) def lookup_content_filetype(q): """Return filetype information from a specified content. Args: q: query string of the form Yields: filetype information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None filetype = _first_element(list(idx_storage.content_mimetype_get([sha1]))) if not filetype: return None return converters.from_filetype(filetype) def lookup_content_language(q): """Return language information from a specified content. Args: q: query string of the form Yields: language information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None lang = _first_element(list(idx_storage.content_language_get([sha1]))) if not lang: return None return converters.from_swh(lang, hashess={'id'}) def lookup_content_license(q): """Return license information from a specified content. Args: q: query string of the form Yields: license information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None lic = _first_element(idx_storage.content_fossology_license_get([sha1])) if not lic: return None return converters.from_swh({'id': sha1, 'facts': lic[sha1]}, hashess={'id'}) def lookup_origin(origin): """Return information about the origin matching dict origin. Args: origin: origin's dict with 'url' key Returns: origin information as dict. """ origin_info = storage.origin_get(origin) if not origin_info: msg = 'Origin with url %s not found!' % origin['url'] raise NotFoundExc(msg) return converters.from_origin(origin_info) def lookup_origins(origin_from=1, origin_count=100): """Get list of archived software origins in a paginated way. Origins are sorted by id before returning them Args: origin_from (int): The minimum id of the origins to return origin_count (int): The maximum number of origins to return Yields: origins information as dicts """ origins = storage.origin_get_range(origin_from, origin_count) return map(converters.from_origin, origins) def search_origin(url_pattern, limit=50, with_visit=False, page_token=None): """Search for origins whose urls contain a provided string pattern or match a provided regular expression. 
Args: url_pattern: the string pattern to search for in origin urls limit: the maximum number of found origins to return page_token: opaque string used to get the next results of a search Returns: list of origin information as dict. """ if search: results = search.origin_search(url_pattern=url_pattern, count=limit, page_token=page_token, with_visit=with_visit) origins = list(map(converters.from_origin, results['results'])) return (origins, results['next_page_token']) else: # Fallback to swh-storage if swh-search is not configured offset = int(page_token) if page_token else 0 regexp = True search_words = [re.escape(word) for word in url_pattern.split()] if len(search_words) >= 7: url_pattern = '.*'.join(search_words) else: pattern_parts = [] for permut in itertools.permutations(search_words): pattern_parts.append('.*'.join(permut)) url_pattern = '|'.join(pattern_parts) origins = storage.origin_search(url_pattern, offset, limit, regexp, with_visit) origins = list(map(converters.from_origin, origins)) if len(origins) >= limit: page_token = str(offset + len(origins)) else: page_token = None return (origins, page_token) def search_origin_metadata(fulltext, limit=50): """Search for origins whose metadata match a provided string pattern. Args: fulltext: the string pattern to search for in origin metadata offset: number of found origins to skip before returning results limit: the maximum number of found origins to return Returns: list of origin metadata as dict. """ matches = idx_storage.origin_intrinsic_metadata_search_fulltext( conjunction=[fulltext], limit=limit) results = [] for match in matches: match['from_revision'] = hashutil.hash_to_hex(match['from_revision']) origin = storage.origin_get({'url': match['id']}) del match['id'] result = converters.from_origin(origin) if result: result['metadata'] = match results.append(result) return results def lookup_origin_intrinsic_metadata(origin_dict): """Return intrinsic metadata for origin whose origin matches given origin. Args: origin_dict: origin's dict with keys ('type' AND 'url') Returns: origin metadata. """ origin_info = storage.origin_get(origin_dict) if not origin_info: msg = 'Origin with url %s not found!' % origin_dict['url'] raise NotFoundExc(msg) origins = [origin_info['url']] match = _first_element( idx_storage.origin_intrinsic_metadata_get(origins)) result = {} if match: result = match['metadata'] return result def _to_sha1_bin(sha1_hex): _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_hex, ['sha1'], # HACK: sha1_git really 'Only sha1_git is supported.') return sha1_git_bin def _check_directory_exists(sha1_git, sha1_git_bin): if len(list(storage.directory_missing([sha1_git_bin]))): raise NotFoundExc('Directory with sha1_git %s not found' % sha1_git) def lookup_directory(sha1_git): """Return information about the directory with id sha1_git. Args: sha1_git as string Returns: directory information as dict. """ empty_dir_sha1 = '4b825dc642cb6eb9a060e54bf8d69288fbee4904' if sha1_git == empty_dir_sha1: return [] sha1_git_bin = _to_sha1_bin(sha1_git) _check_directory_exists(sha1_git, sha1_git_bin) directory_entries = storage.directory_ls(sha1_git_bin) return map(converters.from_directory_entry, directory_entries) def lookup_directory_with_path(sha1_git, path_string): """Return directory information for entry with path path_string w.r.t. 
root directory pointed by directory_sha1_git Args: - directory_sha1_git: sha1_git corresponding to the directory to which we append paths to (hopefully) find the entry - the relative path to the entry starting from the directory pointed by directory_sha1_git Raises: NotFoundExc if the directory entry is not found """ sha1_git_bin = _to_sha1_bin(sha1_git) _check_directory_exists(sha1_git, sha1_git_bin) paths = path_string.strip(os.path.sep).split(os.path.sep) queried_dir = storage.directory_entry_get_by_path( sha1_git_bin, list(map(lambda p: p.encode('utf-8'), paths))) if not queried_dir: raise NotFoundExc(('Directory entry with path %s from %s not found') % (path_string, sha1_git)) return converters.from_directory_entry(queried_dir) def lookup_release(release_sha1_git): """Return information about the release with sha1 release_sha1_git. Args: release_sha1_git: The release's sha1 as hexadecimal Returns: Release information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ sha1_git_bin = _to_sha1_bin(release_sha1_git) release = _first_element(storage.release_get([sha1_git_bin])) if not release: raise NotFoundExc('Release with sha1_git %s not found.' % release_sha1_git) return converters.from_release(release) def lookup_release_multiple(sha1_git_list): """Return information about the revisions identified with their sha1_git identifiers. Args: sha1_git_list: A list of revision sha1_git identifiers Returns: Release information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ sha1_bin_list = (_to_sha1_bin(sha1_git) for sha1_git in sha1_git_list) releases = storage.release_get(sha1_bin_list) or [] return (converters.from_release(r) for r in releases) def lookup_revision(rev_sha1_git): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. NotFoundExc if there is no revision with the provided sha1_git. """ sha1_git_bin = _to_sha1_bin(rev_sha1_git) revision = _first_element(storage.revision_get([sha1_git_bin])) if not revision: raise NotFoundExc('Revision with sha1_git %s not found.' % rev_sha1_git) return converters.from_revision(revision) def lookup_revision_multiple(sha1_git_list): """Return information about the revisions identified with their sha1_git identifiers. Args: sha1_git_list: A list of revision sha1_git identifiers Returns: Generator of revisions information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ sha1_bin_list = (_to_sha1_bin(sha1_git) for sha1_git in sha1_git_list) revisions = storage.revision_get(sha1_bin_list) or [] return (converters.from_revision(r) for r in revisions) def lookup_revision_message(rev_sha1_git): """Return the raw message of the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Decoded revision message as dict {'message': } Raises: ValueError if the identifier provided is not of sha1 nature. NotFoundExc if the revision is not found, or if it has no message """ sha1_git_bin = _to_sha1_bin(rev_sha1_git) revision = _first_element(storage.revision_get([sha1_git_bin])) if not revision: raise NotFoundExc('Revision with sha1_git %s not found.' % rev_sha1_git) if 'message' not in revision: raise NotFoundExc('No message for revision with sha1_git %s.' 
% rev_sha1_git) res = {'message': revision['message']} return res def _lookup_revision_id_by(origin, branch_name, timestamp): def _get_snapshot_branch(snapshot, branch_name): snapshot = lookup_snapshot(visit['snapshot'], branches_from=branch_name, branches_count=10) branch = None if branch_name in snapshot['branches']: branch = snapshot['branches'][branch_name] return branch if isinstance(origin, int): origin = {'id': origin} elif isinstance(origin, str): origin = {'url': origin} else: raise TypeError('"origin" must be an int or a string.') visit = get_origin_visit(origin, visit_ts=timestamp) branch = _get_snapshot_branch(visit['snapshot'], branch_name) rev_id = None if branch and branch['target_type'] == 'revision': rev_id = branch['target'] elif branch and branch['target_type'] == 'alias': branch = _get_snapshot_branch(visit['snapshot'], branch['target']) if branch and branch['target_type'] == 'revision': rev_id = branch['target'] if not rev_id: raise NotFoundExc('Revision for origin %s and branch %s not found.' % (origin.get('url'), branch_name)) return rev_id def lookup_revision_by(origin, branch_name='HEAD', timestamp=None): """Lookup revision by origin, snapshot branch name and visit timestamp. If branch_name is not provided, lookup using 'HEAD' as default. If timestamp is not provided, use the most recent. Args: origin (Union[int,str]): origin of the revision branch_name (str): snapshot branch name timestamp (str/int): origin visit time frame Returns: dict: The revision matching the criterions Raises: NotFoundExc if no revision corresponds to the criterion """ rev_id = _lookup_revision_id_by(origin, branch_name, timestamp) return lookup_revision(rev_id) def lookup_revision_log(rev_sha1_git, limit): """Lookup revision log by revision id. Args: rev_sha1_git (str): The revision's sha1 as hexadecimal limit (int): the maximum number of revisions returned Returns: list: Revision log as list of revision dicts Raises: ValueError: if the identifier provided is not of sha1 nature. NotFoundExc: if there is no revision with the provided sha1_git. """ lookup_revision(rev_sha1_git) sha1_git_bin = _to_sha1_bin(rev_sha1_git) revision_entries = storage.revision_log([sha1_git_bin], limit) return map(converters.from_revision, revision_entries) def lookup_revision_log_by(origin, branch_name, timestamp, limit): """Lookup revision by origin, snapshot branch name and visit timestamp. Args: origin (Union[int,str]): origin of the revision branch_name (str): snapshot branch timestamp (str/int): origin visit time frame limit (int): the maximum number of revisions returned Returns: list: Revision log as list of revision dicts Raises: NotFoundExc: if no revision corresponds to the criterion """ rev_id = _lookup_revision_id_by(origin, branch_name, timestamp) return lookup_revision_log(rev_id, limit) def lookup_revision_with_context_by(origin, branch_name, timestamp, sha1_git, limit=100): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. sha1_git_root being resolved through the lookup of a revision by origin, branch_name and ts. In other words, sha1_git is an ancestor of sha1_git_root. Args: - origin: origin of the revision. - branch_name: revision's branch. - timestamp: revision's time frame. - sha1_git: one of sha1_git_root's ancestors. - limit: limit the lookup to 100 revisions back. Returns: Pair of (root_revision, revision). 
Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: - BadInputExc in case of unknown algo_hash or bad hash. - NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root. """ rev_root_id = _lookup_revision_id_by(origin, branch_name, timestamp) rev_root_id_bin = hashutil.hash_to_bytes(rev_root_id) rev_root = _first_element(storage.revision_get([rev_root_id_bin])) return (converters.from_revision(rev_root), lookup_revision_with_context(rev_root, sha1_git, limit)) def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. In other words, sha1_git is an ancestor of sha1_git_root. Args: sha1_git_root: latest revision. The type is either a sha1 (as an hex string) or a non converted dict. sha1_git: one of sha1_git_root's ancestors limit: limit the lookup to 100 revisions back Returns: Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root """ sha1_git_bin = _to_sha1_bin(sha1_git) revision = _first_element(storage.revision_get([sha1_git_bin])) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) if isinstance(sha1_git_root, str): sha1_git_root_bin = _to_sha1_bin(sha1_git_root) revision_root = _first_element(storage.revision_get([sha1_git_root_bin])) # noqa if not revision_root: raise NotFoundExc('Revision root %s not found' % sha1_git_root) else: sha1_git_root_bin = sha1_git_root['id'] revision_log = storage.revision_log([sha1_git_root_bin], limit) parents = {} children = defaultdict(list) for rev in revision_log: rev_id = rev['id'] parents[rev_id] = [] for parent_id in rev['parents']: parents[rev_id].append(parent_id) children[parent_id].append(rev_id) if revision['id'] not in parents: raise NotFoundExc('Revision %s is not an ancestor of %s' % (sha1_git, sha1_git_root)) revision['children'] = children[revision['id']] return converters.from_revision(revision) def lookup_directory_with_revision(sha1_git, dir_path=None, with_data=False): """Return information on directory pointed by revision with sha1_git. If dir_path is not provided, display top level directory. Otherwise, display the directory pointed by dir_path (if it exists). Args: sha1_git: revision's hash. dir_path: optional directory pointed to by that revision. with_data: boolean that indicates to retrieve the raw data if the path resolves to a content. Default to False (for the api) Returns: Information on the directory pointed to by that revision. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc either if the revision is not found or the path referenced does not exist. NotImplementedError in case of dir_path exists but do not reference a type 'dir' or 'file'. 
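    Example (illustrative; the revision hash and path are placeholders)::

        lookup_directory_with_revision(
            '7026b7c1a2af56521e951c01ed20f255fa054238', dir_path='docs')
        # -> {'type': 'dir', 'path': 'docs',
        #     'revision': '7026b7c1a2af56521e951c01ed20f255fa054238',
        #     'content': [<converted directory entries>]}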
""" sha1_git_bin = _to_sha1_bin(sha1_git) revision = _first_element(storage.revision_get([sha1_git_bin])) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) dir_sha1_git_bin = revision['directory'] if dir_path: paths = dir_path.strip(os.path.sep).split(os.path.sep) entity = storage.directory_entry_get_by_path( dir_sha1_git_bin, list(map(lambda p: p.encode('utf-8'), paths))) if not entity: raise NotFoundExc( "Directory or File '%s' pointed to by revision %s not found" % (dir_path, sha1_git)) else: entity = {'type': 'dir', 'target': dir_sha1_git_bin} if entity['type'] == 'dir': directory_entries = storage.directory_ls(entity['target']) or [] return {'type': 'dir', 'path': '.' if not dir_path else dir_path, 'revision': sha1_git, 'content': list(map(converters.from_directory_entry, directory_entries))} elif entity['type'] == 'file': # content content = _first_element( storage.content_find({'sha1_git': entity['target']})) if not content: raise NotFoundExc('Content not found for revision %s' % sha1_git) if with_data: c = _first_element(storage.content_get([content['sha1']])) content['data'] = c['data'] return {'type': 'file', 'path': '.' if not dir_path else dir_path, 'revision': sha1_git, 'content': converters.from_content(content)} elif entity['type'] == 'rev': # revision revision = next(storage.revision_get([entity['target']])) return {'type': 'rev', 'path': '.' if not dir_path else dir_path, 'revision': sha1_git, 'content': converters.from_revision(revision)} else: raise NotImplementedError('Entity of type %s not implemented.' % entity['type']) def lookup_content(q): """Lookup the content designed by q. Args: q: The release's sha1 as hexadecimal Raises: NotFoundExc if the requested content is not found """ algo, hash = query.parse_hash(q) c = _first_element(storage.content_find({algo: hash})) if not c: raise NotFoundExc('Content with %s checksum equals to %s not found!' % (algo, hashutil.hash_to_hex(hash))) return converters.from_content(c) def lookup_content_raw(q): """Lookup the content defined by q. Args: q: query string of the form Returns: dict with 'sha1' and 'data' keys. data representing its raw data decoded. Raises: NotFoundExc if the requested content is not found or if the content bytes are not available in the storage """ c = lookup_content(q) content_sha1_bytes = hashutil.hash_to_bytes(c['checksums']['sha1']) content = _first_element(storage.content_get([content_sha1_bytes])) if not content: algo, hash = query.parse_hash(q) raise NotFoundExc('Bytes of content with %s checksum equals to %s ' 'are not available!' % (algo, hashutil.hash_to_hex(hash))) return converters.from_content(content) def stat_counters(): """Return the stat counters for Software Heritage Returns: A dict mapping textual labels to integer values. """ return storage.stat_counters() def _lookup_origin_visits(origin_url, last_visit=None, limit=10): """Yields the origin origins' visits. Args: origin_url (str): origin to list visits for last_visit (int): last visit to lookup from limit (int): Number of elements max to display Yields: Dictionaries of origin_visit for that origin """ limit = min(limit, MAX_LIMIT) for visit in storage.origin_visit_get( origin_url, last_visit=last_visit, limit=limit): visit['origin'] = origin_url yield visit def lookup_origin_visits(origin, last_visit=None, per_page=10): """Yields the origin origins' visits. 
Args: origin: origin to list visits for Yields: Dictionaries of origin_visit for that origin """ visits = _lookup_origin_visits(origin, last_visit=last_visit, limit=per_page) for visit in visits: yield converters.from_origin_visit(visit) def lookup_origin_visit_latest(origin_url, require_snapshot): """Return the origin's latest visit Args: origin_url (str): origin to list visits for require_snapshot (bool): filter out origins without a snapshot Returns: dict: The origin_visit concerned """ visit = storage.origin_visit_get_latest( origin_url, require_snapshot=require_snapshot) return converters.from_origin_visit(visit) def lookup_origin_visit(origin_url, visit_id): """Return information about visit visit_id with origin origin. Args: origin (str): origin concerned by the visit visit_id: the visit identifier to lookup Yields: The dict origin_visit concerned """ visit = storage.origin_visit_get_by(origin_url, visit_id) if not visit: raise NotFoundExc('Origin %s or its visit ' 'with id %s not found!' % (origin_url, visit_id)) visit['origin'] = origin_url return converters.from_origin_visit(visit) def lookup_snapshot_sizes(snapshot_id): """Count the number of branches in the snapshot with the given id Args: snapshot_id (str): sha1 identifier of the snapshot Returns: dict: A dict whose keys are the target types of branches and values their corresponding amount """ snapshot_id_bin = _to_sha1_bin(snapshot_id) snapshot_sizes = storage.snapshot_count_branches(snapshot_id_bin) if 'revision' not in snapshot_sizes: snapshot_sizes['revision'] = 0 if 'release' not in snapshot_sizes: snapshot_sizes['release'] = 0 # adjust revision / release count for display if aliases are defined if 'alias' in snapshot_sizes: aliases = lookup_snapshot(snapshot_id, branches_count=snapshot_sizes['alias'], target_types=['alias']) for alias in aliases['branches'].values(): if lookup_snapshot(snapshot_id, branches_from=alias['target'], branches_count=1, target_types=['revision']): snapshot_sizes['revision'] += 1 else: snapshot_sizes['release'] += 1 del snapshot_sizes['alias'] return snapshot_sizes def lookup_snapshot(snapshot_id, branches_from='', branches_count=1000, target_types=None): """Return information about a snapshot, aka the list of named branches found during a specific visit of an origin. Args: snapshot_id (str): sha1 identifier of the snapshot branches_from (str): optional parameter used to skip branches whose name is lesser than it before returning them branches_count (int): optional parameter used to restrain the amount of returned branches target_types (list): optional parameter used to filter the target types of branch to return (possible values that can be contained in that list are `'content', 'directory', 'revision', 'release', 'snapshot', 'alias'`) Returns: A dict filled with the snapshot content. """ snapshot_id_bin = _to_sha1_bin(snapshot_id) snapshot = storage.snapshot_get_branches(snapshot_id_bin, branches_from.encode(), branches_count, target_types) if not snapshot: raise NotFoundExc('Snapshot with id %s not found!' % snapshot_id) return converters.from_snapshot(snapshot) def lookup_latest_origin_snapshot(origin, allowed_statuses=None): """Return information about the latest snapshot of an origin. .. warning:: At most 1000 branches contained in the snapshot will be returned for performance reasons. Args: origin: URL or integer identifier of the origin allowed_statuses: list of visit statuses considered to find the latest snapshot for the visit. 
For instance, ``allowed_statuses=['full']`` will only consider visits that have successfully run to completion. Returns: A dict filled with the snapshot content. """ snapshot = storage.snapshot_get_latest(origin, allowed_statuses) return converters.from_snapshot(snapshot) def lookup_revision_through(revision, limit=100): """Retrieve a revision from the criterion stored in revision dictionary. Args: revision: Dictionary of criterion to lookup the revision with. Here are the supported combination of possible values: - origin_url, branch_name, ts, sha1_git - origin_url, branch_name, ts - sha1_git_root, sha1_git - sha1_git Returns: None if the revision is not found or the actual revision. """ if ( 'origin_url' in revision and 'branch_name' in revision and 'ts' in revision and 'sha1_git' in revision): return lookup_revision_with_context_by(revision['origin_url'], revision['branch_name'], revision['ts'], revision['sha1_git'], limit) if ( 'origin_url' in revision and 'branch_name' in revision and 'ts' in revision): return lookup_revision_by(revision['origin_url'], revision['branch_name'], revision['ts']) if ( 'sha1_git_root' in revision and 'sha1_git' in revision): return lookup_revision_with_context(revision['sha1_git_root'], revision['sha1_git'], limit) if 'sha1_git' in revision: return lookup_revision(revision['sha1_git']) # this should not happen raise NotImplementedError('Should not happen!') def lookup_directory_through_revision(revision, path=None, limit=100, with_data=False): """Retrieve the directory information from the revision. Args: revision: dictionary of criterion representing a revision to lookup path: directory's path to lookup. limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of. with_data: indicate to retrieve the content's raw data if path resolves to a content. Returns: The directory pointing to by the revision criterions at path. """ rev = lookup_revision_through(revision, limit) if not rev: raise NotFoundExc('Revision with criterion %s not found!' % revision) return (rev['id'], lookup_directory_with_revision(rev['id'], path, with_data)) def vault_cook(obj_type, obj_id, email=None): """Cook a vault bundle. """ return vault.cook(obj_type, obj_id, email=email) def vault_fetch(obj_type, obj_id): """Fetch a vault bundle. """ return vault.fetch(obj_type, obj_id) def vault_progress(obj_type, obj_id): """Get the current progress of a vault bundle. """ return vault.progress(obj_type, obj_id) def diff_revision(rev_id): """Get the list of file changes (insertion / deletion / modification / renaming) for a particular revision. """ rev_sha1_git_bin = _to_sha1_bin(rev_id) changes = diff.diff_revision(storage, rev_sha1_git_bin, track_renaming=True) for change in changes: change['from'] = converters.from_directory_entry(change['from']) change['to'] = converters.from_directory_entry(change['to']) if change['from_path']: change['from_path'] = change['from_path'].decode('utf-8') if change['to_path']: change['to_path'] = change['to_path'].decode('utf-8') return changes class _RevisionsWalkerProxy(object): """ Proxy class wrapping a revisions walker iterator from swh-storage and performing needed conversions. 
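    Example (illustrative; ``rev_start_hex`` stands for the hexadecimal
    identifier of an existing revision)::

        walker = _RevisionsWalkerProxy('bfs', rev_start_hex)
        first_revision = next(walker)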
""" def __init__(self, rev_walker_type, rev_start, *args, **kwargs): rev_start_bin = hashutil.hash_to_bytes(rev_start) self.revisions_walker = \ revisions_walker.get_revisions_walker(rev_walker_type, storage, rev_start_bin, *args, **kwargs) def export_state(self): return self.revisions_walker.export_state() def __next__(self): return converters.from_revision(next(self.revisions_walker)) def __iter__(self): return self def get_revisions_walker(rev_walker_type, rev_start, *args, **kwargs): """ Utility function to instantiate a revisions walker of a given type, see :mod:`swh.storage.algos.revisions_walker`. Args: rev_walker_type (str): the type of revisions walker to return, possible values are: ``committer_date``, ``dfs``, ``dfs_post``, ``bfs`` and ``path`` rev_start (str): hexadecimal representation of a revision identifier args (list): position arguments to pass to the revisions walker constructor kwargs (dict): keyword arguments to pass to the revisions walker constructor """ # first check if the provided revision is valid lookup_revision(rev_start) return _RevisionsWalkerProxy(rev_walker_type, rev_start, *args, **kwargs) def lookup_object(object_type: str, object_id: str) -> Dict[str, Any]: """ Utility function for looking up an object in the archive by its type and id. Args: object_type (str): the type of object to lookup, either *content*, *directory*, *release*, *revision* or *snapshot* object_id (str): the *sha1_git* checksum identifier in hexadecimal form of the object to lookup Returns: Dict[str, Any]: A dictionary describing the object or a list of dictionary for the directory object type. Raises: NotFoundExc: if the object could not be found in the archive BadInputExc: if the object identifier is invalid """ if object_type == CONTENT: return lookup_content(f'sha1_git:{object_id}') elif object_type == DIRECTORY: return { 'id': object_id, 'content': list(lookup_directory(object_id)) } elif object_type == RELEASE: return lookup_release(object_id) elif object_type == REVISION: return lookup_revision(object_id) elif object_type == SNAPSHOT: return lookup_snapshot(object_id) raise BadInputExc(('Invalid swh object type! Valid types are ' f'{CONTENT}, {DIRECTORY}, {RELEASE} ' f'{REVISION} or {SNAPSHOT}.')) -def lookup_missing_hashes(grouped_pids): +def lookup_missing_hashes(grouped_pids: Dict[str, List[bytes]]) -> Set[str]: """Lookup missing Software Heritage persistent identifier hash, using batch processing. 
     Args:
         A dictionary with:
         keys: persistent identifier type
         values: list(bytes) persistent identifier hash

     Returns:
         A set(hexadecimal) of the hashes not found in the storage
     """
     missing_hashes = []

     for obj_type, obj_ids in grouped_pids.items():
         if obj_type == CONTENT:
             missing_hashes.append(
                 storage.content_missing_per_sha1_git(obj_ids))
-        if obj_type == DIRECTORY:
+        elif obj_type == DIRECTORY:
             missing_hashes.append(storage.directory_missing(obj_ids))
-        if obj_type == REVISION:
+        elif obj_type == REVISION:
             missing_hashes.append(storage.revision_missing(obj_ids))
-        if obj_type == RELEASE:
-            missing_hashes.append(storage.directory_missing(obj_ids))
-        if obj_type == SNAPSHOT:
-            missing_hashes.append(storage.directory_missing(obj_ids))
+        elif obj_type == RELEASE:
+            missing_hashes.append(storage.release_missing(obj_ids))
+        elif obj_type == SNAPSHOT:
+            missing_hashes.append(storage.snapshot_missing(obj_ids))

     missing = set(map(lambda x: hashutil.hash_to_hex(x),
                       itertools.chain(*missing_hashes)))

     return missing
diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py
index 5d0bf5f9..bf0cc8c6 100644
--- a/swh/web/common/utils.py
+++ b/swh/web/common/utils.py
@@ -1,427 +1,439 @@
 # Copyright (C) 2017-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import docutils.parsers.rst
 import docutils.utils
 import re

 from datetime import datetime, timezone
 from dateutil import parser as date_parser
 from dateutil import tz

 from typing import Optional, Dict, Any

 from django.urls import reverse as django_reverse
 from django.http import QueryDict, HttpRequest

 from prometheus_client.registry import CollectorRegistry

 from rest_framework.authentication import SessionAuthentication

 from swh.model.exceptions import ValidationError
 from swh.model.hashutil import hash_to_bytes
 from swh.model.identifiers import (
     persistent_identifier, parse_persistent_identifier,
     CONTENT, DIRECTORY, ORIGIN, RELEASE, REVISION, SNAPSHOT
 )

 from swh.web.common.exc import BadInputExc
 from swh.web.config import get_config

 SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True)

 swh_object_icons = {
     'branch': 'fa fa-code-fork',
     'branches': 'fa fa-code-fork',
     'content': 'fa fa-file-text',
     'directory': 'fa fa-folder',
     'person': 'fa fa-user',
     'revisions history': 'fa fa-history',
     'release': 'fa fa-tag',
     'releases': 'fa fa-tag',
     'revision': 'octicon-git-commit',
     'snapshot': 'fa fa-camera',
     'visits': 'fa fa-calendar',
 }


 def reverse(viewname: str,
             url_args: Optional[Dict[str, Any]] = None,
             query_params: Optional[Dict[str, Any]] = None,
             current_app: Optional[str] = None,
             urlconf: Optional[str] = None,
             request: Optional[HttpRequest] = None) -> str:
     """An override of django reverse function supporting query parameters.
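    Example (illustrative values; the hash and origin URL are placeholders)::

        reverse('browse-revision',
                url_args={'sha1_git': '7026b7c1a2af56521e951c01ed20f255fa054238'},
                query_params={'origin': 'https://github.com/user/repo'})
        # -> '/browse/revision/<sha1_git>/?origin=<origin url>'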
Args: viewname: the name of the django view from which to compute a url url_args: dictionary of url arguments indexed by their names query_params: dictionary of query parameters to append to the reversed url current_app: the name of the django app tighten to the view urlconf: url configuration module request: build an absolute URI if provided Returns: str: the url of the requested view with processed arguments and query parameters """ if url_args: url_args = {k: v for k, v in url_args.items() if v is not None} url = django_reverse(viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app) if query_params: query_params = {k: v for k, v in query_params.items() if v} if query_params and len(query_params) > 0: query_dict = QueryDict('', mutable=True) for k in sorted(query_params.keys()): query_dict[k] = query_params[k] url += ('?' + query_dict.urlencode(safe='/;:')) if request is not None: url = request.build_absolute_uri(url) return url def datetime_to_utc(date): """Returns datetime in UTC without timezone info Args: date (datetime.datetime): input datetime with timezone info Returns: datetime.datetime: datetime in UTC without timezone info """ if date.tzinfo: return date.astimezone(tz.gettz('UTC')).replace(tzinfo=timezone.utc) else: return date def parse_timestamp(timestamp): """Given a time or timestamp (as string), parse the result as UTC datetime. Returns: datetime.datetime: a timezone-aware datetime representing the parsed value or None if the parsing fails. Samples: - 2016-01-12 - 2016-01-12T09:19:12+0100 - Today is January 1, 2047 at 8:21:00AM - 1452591542 """ if not timestamp: return None try: date = date_parser.parse(timestamp, ignoretz=False, fuzzy=True) return datetime_to_utc(date) except Exception: try: return datetime.utcfromtimestamp(float(timestamp)).replace( tzinfo=timezone.utc) except (ValueError, OverflowError) as e: raise BadInputExc(e) def shorten_path(path): """Shorten the given path: for each hash present, only return the first 8 characters followed by an ellipsis""" sha256_re = r'([0-9a-f]{8})[0-9a-z]{56}' sha1_re = r'([0-9a-f]{8})[0-9a-f]{32}' ret = re.sub(sha256_re, r'\1...', path) return re.sub(sha1_re, r'\1...', ret) def format_utc_iso_date(iso_date, fmt='%d %B %Y, %H:%M UTC'): """Turns a string representation of an ISO 8601 date string to UTC and format it into a more human readable one. For instance, from the following input string: '2017-05-04T13:27:13+02:00' the following one is returned: '04 May 2017, 11:27 UTC'. Custom format string may also be provided as parameter Args: iso_date (str): a string representation of an ISO 8601 date fmt (str): optional date formatting string Returns: str: a formatted string representation of the input iso date """ if not iso_date: return iso_date date = parse_timestamp(iso_date) return date.strftime(fmt) def gen_path_info(path): """Function to generate path data navigation for use with a breadcrumb in the swh web ui. For instance, from a path /folder1/folder2/folder3, it returns the following list:: [{'name': 'folder1', 'path': 'folder1'}, {'name': 'folder2', 'path': 'folder1/folder2'}, {'name': 'folder3', 'path': 'folder1/folder2/folder3'}] Args: path: a filesystem path Returns: list: a list of path data for navigation as illustrated above. 
""" path_info = [] if path: sub_paths = path.strip('/').split('/') path_from_root = '' for p in sub_paths: path_from_root += '/' + p path_info.append({'name': p, 'path': path_from_root.strip('/')}) return path_info def get_swh_persistent_id(object_type, object_id, scheme_version=1): """ Returns the persistent identifier for a swh object based on: * the object type * the object id * the swh identifiers scheme version Args: object_type (str): the swh object type (content/directory/release/revision/snapshot) object_id (str): the swh object id (hexadecimal representation of its hash value) scheme_version (int): the scheme version of the swh persistent identifiers Returns: str: the swh object persistent identifier Raises: BadInputExc: if the provided parameters do not enable to generate a valid identifier """ try: swh_id = persistent_identifier(object_type, object_id, scheme_version) except ValidationError as e: raise BadInputExc('Invalid object (%s) for swh persistent id. %s' % (object_id, e)) else: return swh_id def resolve_swh_persistent_id(swh_id, query_params=None): """ Try to resolve a Software Heritage persistent id into an url for browsing the pointed object. Args: swh_id (str): a Software Heritage persistent identifier query_params (django.http.QueryDict): optional dict filled with query parameters to append to the browse url Returns: dict: a dict with the following keys: * **swh_id_parsed (swh.model.identifiers.PersistentId)**: the parsed identifier * **browse_url (str)**: the url for browsing the pointed object - - Raises: - BadInputExc: if the provided identifier can not be parsed """ - try: - swh_id_parsed = parse_persistent_identifier(swh_id) - object_type = swh_id_parsed.object_type - object_id = swh_id_parsed.object_id - browse_url = None - query_dict = QueryDict('', mutable=True) - if query_params and len(query_params) > 0: - for k in sorted(query_params.keys()): - query_dict[k] = query_params[k] - if 'origin' in swh_id_parsed.metadata: - query_dict['origin'] = swh_id_parsed.metadata['origin'] - if object_type == CONTENT: - query_string = 'sha1_git:' + object_id - fragment = '' - if 'lines' in swh_id_parsed.metadata: - lines = swh_id_parsed.metadata['lines'].split('-') - fragment += '#L' + lines[0] - if len(lines) > 1: - fragment += '-L' + lines[1] - browse_url = reverse('browse-content', - url_args={'query_string': query_string}, - query_params=query_dict) + fragment - elif object_type == DIRECTORY: - browse_url = reverse('browse-directory', - url_args={'sha1_git': object_id}, - query_params=query_dict) - elif object_type == RELEASE: - browse_url = reverse('browse-release', - url_args={'sha1_git': object_id}, - query_params=query_dict) - elif object_type == REVISION: - browse_url = reverse('browse-revision', - url_args={'sha1_git': object_id}, - query_params=query_dict) - elif object_type == SNAPSHOT: - browse_url = reverse('browse-snapshot', - url_args={'snapshot_id': object_id}, - query_params=query_dict) - elif object_type == ORIGIN: - raise BadInputExc(('Origin PIDs (Persistent Identifiers) are not ' - 'publicly resolvable because they are for ' - 'internal usage only')) - except ValidationError as ve: - raise BadInputExc('Error when parsing identifier. 
%s' % - ' '.join(ve.messages)) - else: - return {'swh_id_parsed': swh_id_parsed, - 'browse_url': browse_url} + swh_id_parsed = get_persistent_identifier(swh_id) + object_type = swh_id_parsed.object_type + object_id = swh_id_parsed.object_id + browse_url = None + query_dict = QueryDict('', mutable=True) + if query_params and len(query_params) > 0: + for k in sorted(query_params.keys()): + query_dict[k] = query_params[k] + if 'origin' in swh_id_parsed.metadata: + query_dict['origin'] = swh_id_parsed.metadata['origin'] + if object_type == CONTENT: + query_string = 'sha1_git:' + object_id + fragment = '' + if 'lines' in swh_id_parsed.metadata: + lines = swh_id_parsed.metadata['lines'].split('-') + fragment += '#L' + lines[0] + if len(lines) > 1: + fragment += '-L' + lines[1] + browse_url = reverse('browse-content', + url_args={'query_string': query_string}, + query_params=query_dict) + fragment + elif object_type == DIRECTORY: + browse_url = reverse('browse-directory', + url_args={'sha1_git': object_id}, + query_params=query_dict) + elif object_type == RELEASE: + browse_url = reverse('browse-release', + url_args={'sha1_git': object_id}, + query_params=query_dict) + elif object_type == REVISION: + browse_url = reverse('browse-revision', + url_args={'sha1_git': object_id}, + query_params=query_dict) + elif object_type == SNAPSHOT: + browse_url = reverse('browse-snapshot', + url_args={'snapshot_id': object_id}, + query_params=query_dict) + elif object_type == ORIGIN: + raise BadInputExc(('Origin PIDs (Persistent Identifiers) are not ' + 'publicly resolvable because they are for ' + 'internal usage only')) + + return {'swh_id_parsed': swh_id_parsed, + 'browse_url': browse_url} def parse_rst(text, report_level=2): """ Parse a reStructuredText string with docutils. Args: text (str): string with reStructuredText markups in it report_level (int): level of docutils report messages to print (1 info 2 warning 3 error 4 severe 5 none) Returns: docutils.nodes.document: a parsed docutils document """ parser = docutils.parsers.rst.Parser() components = (docutils.parsers.rst.Parser,) settings = docutils.frontend.OptionParser( components=components).get_default_values() settings.report_level = report_level document = docutils.utils.new_document('rst-doc', settings=settings) parser.parse(text, document) return document def get_client_ip(request): """ Return the client IP address from an incoming HTTP request. Args: request (django.http.HttpRequest): the incoming HTTP request Returns: str: The client IP address """ x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') if x_forwarded_for: ip = x_forwarded_for.split(',')[0] else: ip = request.META.get('REMOTE_ADDR') return ip def context_processor(request): """ Django context processor used to inject variables in all swh-web templates. """ return { 'swh_object_icons': swh_object_icons, 'available_languages': None, 'swh_client_config': get_config()['client_config'], } class EnforceCSRFAuthentication(SessionAuthentication): """ Helper class to enforce CSRF validation on a DRF view when a user is not authenticated. """ def authenticate(self, request): user = getattr(request._request, 'user', None) self.enforce_csrf(request) return (user, None) def resolve_branch_alias(snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]] ) -> Optional[Dict[str, Any]]: """ Resolve branch alias in snapshot content. Args: snapshot: a full snapshot content branch: a branch alias contained in the snapshot Returns: The real snapshot branch that got aliased. 
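    Example (illustrative)::

        branch = snapshot['branches'].get('HEAD')
        if branch and branch['target_type'] == 'alias':
            branch = resolve_branch_alias(snapshot, branch)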
""" while branch and branch['target_type'] == 'alias': if branch['target'] in snapshot['branches']: branch = snapshot['branches'][branch['target']] else: from swh.web.common import service snp = service.lookup_snapshot( snapshot['id'], branches_from=branch['target'], branches_count=1) if snp and branch['target'] in snp['branches']: branch = snp['branches'][branch['target']] else: branch = None return branch +def get_persistent_identifier(persistent_id): + """Check if a persistent identifier is valid. + + Args: + persistent_id: A string representing a Software Heritage + persistent identifier. + + Raises: + BadInputExc: if the provided persistent identifier can + not be parsed. + + Return: + A persistent identifier object. + """ + try: + pid_object = parse_persistent_identifier(persistent_id) + except ValidationError as ve: + raise BadInputExc('Error when parsing identifier: %s' % + ' '.join(ve.messages)) + else: + return pid_object + + def group_swh_persistent_identifiers(persistent_ids): """ Groups many Software Heritage persistent identifiers into a dictionary depending on their type. Args: persistent_ids (list): a list of Software Heritage persistent - identifier + identifier objects Returns: A dictionary with: - keys: persistent identifier types - values: list(bytes) persistent identifiers id + keys: persistent identifier types + values: list(bytes) persistent identifiers id Raises: - BadInputExc: if one of the provided identifier is not valid + BadInputExc: if one of the provided persistent identifier can + not be parsed. """ pids_by_type = { CONTENT: [], DIRECTORY: [], REVISION: [], RELEASE: [], SNAPSHOT: [] } - try: - for pid in persistent_ids: - parsed_pid = parse_persistent_identifier(pid) - obj_id = parsed_pid.object_id - obj_type = parsed_pid.object_type - pids_by_type[obj_type].append(hash_to_bytes(obj_id)) - except ValidationError as v: - raise BadInputExc('Error when parsing identifier: %s' % - ' '.join(v.messages)) + for pid in persistent_ids: + obj_id = pid.object_id + obj_type = pid.object_type + pids_by_type[obj_type].append(hash_to_bytes(obj_id)) return pids_by_type diff --git a/swh/web/tests/api/views/test_identifiers.py b/swh/web/tests/api/views/test_identifiers.py index 56230f49..5d6edb0e 100644 --- a/swh/web/tests/api/views/test_identifiers.py +++ b/swh/web/tests/api/views/test_identifiers.py @@ -1,97 +1,142 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from hypothesis import given from swh.model.identifiers import ( CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT ) from swh.web.common.utils import reverse +from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import ( content, directory, origin, release, revision, snapshot, unknown_content, unknown_directory, unknown_release, unknown_revision, unknown_snapshot ) @given(origin(), content(), directory(), release(), revision(), snapshot()) def test_swh_id_resolve_success(api_client, origin, content, directory, release, revision, snapshot): for obj_type_short, obj_type, obj_id in ( ('cnt', CONTENT, content['sha1_git']), ('dir', DIRECTORY, directory), ('rel', RELEASE, release), ('rev', REVISION, revision), ('snp', SNAPSHOT, snapshot)): swh_id = 'swh:1:%s:%s;origin=%s' % (obj_type_short, obj_id, origin['url']) url = reverse('api-1-resolve-swh-pid', url_args={'swh_id': swh_id}) resp = 
api_client.get(url) if obj_type == CONTENT: url_args = {'query_string': 'sha1_git:%s' % obj_id} elif obj_type == SNAPSHOT: url_args = {'snapshot_id': obj_id} else: url_args = {'sha1_git': obj_id} browse_rev_url = reverse('browse-%s' % obj_type, url_args=url_args, query_params={'origin': origin['url']}, request=resp.wsgi_request) expected_result = { 'browse_url': browse_rev_url, 'metadata': {'origin': origin['url']}, 'namespace': 'swh', 'object_id': obj_id, 'object_type': obj_type, 'scheme_version': 1 } assert resp.status_code == 200, resp.data assert resp.data == expected_result def test_swh_id_resolve_invalid(api_client): rev_id_invalid = '96db9023b8_foo_50d6c108e9a3' swh_id = 'swh:1:rev:%s' % rev_id_invalid url = reverse('api-1-resolve-swh-pid', url_args={'swh_id': swh_id}) resp = api_client.get(url) assert resp.status_code == 400, resp.data @given(unknown_content(), unknown_directory(), unknown_release(), unknown_revision(), unknown_snapshot()) def test_swh_id_resolve_not_found(api_client, unknown_content, unknown_directory, unknown_release, unknown_revision, unknown_snapshot): for obj_type_short, obj_id in (('cnt', unknown_content['sha1_git']), ('dir', unknown_directory), ('rel', unknown_release), ('rev', unknown_revision), ('snp', unknown_snapshot)): swh_id = 'swh:1:%s:%s' % (obj_type_short, obj_id) url = reverse('api-1-resolve-swh-pid', url_args={'swh_id': swh_id}) resp = api_client.get(url) assert resp.status_code == 404, resp.data def test_swh_origin_id_not_resolvable(api_client): ori_pid = 'swh:1:ori:8068d0075010b590762c6cb5682ed53cb3c13deb' url = reverse('api-1-resolve-swh-pid', url_args={'swh_id': ori_pid}) resp = api_client.get(url) assert resp.status_code == 400, resp.data + + +@given(content(), directory()) +def test_api_known_swhpid_some_present(api_client, content, directory): + content_ = 'swh:1:cnt:%s' % content['sha1_git'] + directory_ = 'swh:1:dir:%s' % directory + unknown_revision_ = 'swh:1:rev:%s' % random_sha1() + unknown_release_ = 'swh:1:rel:%s' % random_sha1() + unknown_snapshot_ = 'swh:1:snp:%s' % random_sha1() + + input_pids = [content_, directory_, unknown_revision_, + unknown_release_, unknown_snapshot_] + + url = reverse('api-1-swh-pid-known') + + resp = api_client.post(url, data=input_pids, format='json', + HTTP_ACCEPT='application/json') + + assert resp.status_code == 200, resp.data + assert resp['Content-Type'] == 'application/json' + assert resp.data == { + content_: {'known': True}, + directory_: {'known': True}, + unknown_revision_: {'known': False}, + unknown_release_: {'known': False}, + unknown_snapshot_: {'known': False} + } + + +def test_api_known_invalid_swhpid(api_client): + invalid_pid_sha1 = ['swh:1:cnt:8068d0075010b590762c6cb5682ed53cb3c13de;'] + invalid_pid_type = ['swh:1:cnn:8068d0075010b590762c6cb5682ed53cb3c13deb'] + + url = reverse('api-1-swh-pid-known') + + resp = api_client.post(url, data=invalid_pid_sha1, format='json', + HTTP_ACCEPT='application/json') + + assert resp.status_code == 400, resp.data + + resp2 = api_client.post(url, data=invalid_pid_type, format='json', + HTTP_ACCEPT='application/json') + + assert resp2.status_code == 400, resp.data diff --git a/swh/web/tests/common/test_service.py b/swh/web/tests/common/test_service.py index b772654c..a4b7c2e8 100644 --- a/swh/web/tests/common/test_service.py +++ b/swh/web/tests/common/test_service.py @@ -1,899 +1,918 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public 
License version 3, or any later version # See top-level LICENSE file for more information import itertools import pytest import random from collections import defaultdict from hypothesis import given from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.from_disk import DentryPerms from swh.model.identifiers import ( CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT ) from swh.web.common import service from swh.web.common.exc import BadInputExc, NotFoundExc from swh.web.tests.data import random_sha1, random_content from swh.web.tests.strategies import ( content, unknown_content, contents, unknown_contents, contents_with_ctags, origin, new_origin, visit_dates, directory, unknown_directory, release, unknown_release, revision, unknown_revision, revisions, ancestor_revisions, non_ancestor_revisions, invalid_sha1, sha256, revision_with_submodules, empty_directory, new_revision, snapshot, unknown_snapshot ) from swh.web.tests.conftest import ctags_json_missing, fossology_missing @given(contents()) def test_lookup_multiple_hashes_all_present(contents): input_data = [] expected_output = [] for cnt in contents: input_data.append({'sha1': cnt['sha1']}) expected_output.append({'sha1': cnt['sha1'], 'found': True}) assert service.lookup_multiple_hashes(input_data) == expected_output @given(contents(), unknown_contents()) def test_lookup_multiple_hashes_some_missing(contents, unknown_contents): input_contents = list(itertools.chain(contents, unknown_contents)) random.shuffle(input_contents) input_data = [] expected_output = [] for cnt in input_contents: input_data.append({'sha1': cnt['sha1']}) expected_output.append({'sha1': cnt['sha1'], 'found': cnt in contents}) assert service.lookup_multiple_hashes(input_data) == expected_output def test_lookup_hash_does_not_exist(): unknown_content_ = random_content() actual_lookup = service.lookup_hash('sha1_git:%s' % unknown_content_['sha1_git']) assert actual_lookup == {'found': None, 'algo': 'sha1_git'} @given(content()) def test_lookup_hash_exist(archive_data, content): actual_lookup = service.lookup_hash('sha1:%s' % content['sha1']) content_metadata = archive_data.content_get_metadata(content['sha1']) assert {'found': content_metadata, 'algo': 'sha1'} == actual_lookup def test_search_hash_does_not_exist(): unknown_content_ = random_content() actual_lookup = service.search_hash('sha1_git:%s' % unknown_content_['sha1_git']) assert {'found': False} == actual_lookup @given(content()) def test_search_hash_exist(content): actual_lookup = service.search_hash('sha1:%s' % content['sha1']) assert {'found': True} == actual_lookup @pytest.mark.skipif(ctags_json_missing, reason="requires ctags with json output support") @given(contents_with_ctags()) def test_lookup_content_ctags(indexer_data, contents_with_ctags): content_sha1 = random.choice(contents_with_ctags['sha1s']) indexer_data.content_add_ctags(content_sha1) actual_ctags = list(service.lookup_content_ctags('sha1:%s' % content_sha1)) expected_data = list(indexer_data.content_get_ctags(content_sha1)) for ctag in expected_data: ctag['id'] = content_sha1 assert actual_ctags == expected_data def test_lookup_content_ctags_no_hash(): unknown_content_ = random_content() actual_ctags = list(service.lookup_content_ctags('sha1:%s' % unknown_content_['sha1'])) assert actual_ctags == [] @given(content()) def test_lookup_content_filetype(indexer_data, content): indexer_data.content_add_mimetype(content['sha1']) actual_filetype = service.lookup_content_filetype(content['sha1']) expected_filetype = 
    expected_filetype = indexer_data.content_get_mimetype(content['sha1'])
    assert actual_filetype == expected_filetype


@pytest.mark.skip  # Language indexer is disabled.
@given(content())
def test_lookup_content_language(indexer_data, content):
    indexer_data.content_add_language(content['sha1'])
    actual_language = service.lookup_content_language(content['sha1'])

    expected_language = indexer_data.content_get_language(content['sha1'])
    assert actual_language == expected_language


@given(contents_with_ctags())
def test_lookup_expression(indexer_data, contents_with_ctags):
    per_page = 10
    expected_ctags = []

    for content_sha1 in contents_with_ctags['sha1s']:
        if len(expected_ctags) == per_page:
            break
        indexer_data.content_add_ctags(content_sha1)
        for ctag in indexer_data.content_get_ctags(content_sha1):
            if len(expected_ctags) == per_page:
                break
            if ctag['name'] == contents_with_ctags['symbol_name']:
                del ctag['id']
                ctag['sha1'] = content_sha1
                expected_ctags.append(ctag)

    actual_ctags = list(
        service.lookup_expression(contents_with_ctags['symbol_name'],
                                  last_sha1=None, per_page=10))

    assert actual_ctags == expected_ctags


def test_lookup_expression_no_result():
    expected_ctags = []

    actual_ctags = list(
        service.lookup_expression('barfoo', last_sha1=None, per_page=10))

    assert actual_ctags == expected_ctags


@pytest.mark.skipif(fossology_missing,
                    reason="requires fossology-nomossa installed")
@given(content())
def test_lookup_content_license(indexer_data, content):
    indexer_data.content_add_license(content['sha1'])
    actual_license = service.lookup_content_license(content['sha1'])

    expected_license = indexer_data.content_get_license(content['sha1'])
    assert actual_license == expected_license


def test_stat_counters(archive_data):
    actual_stats = service.stat_counters()
    assert actual_stats == archive_data.stat_counters()


@given(new_origin(), visit_dates())
def test_lookup_origin_visits(archive_data, new_origin, visit_dates):
    archive_data.origin_add_one(new_origin)
    for ts in visit_dates:
        archive_data.origin_visit_add(new_origin['url'], ts, type='git')

    actual_origin_visits = list(
        service.lookup_origin_visits(new_origin['url'], per_page=100))

    expected_visits = archive_data.origin_visit_get(new_origin['url'])
    for expected_visit in expected_visits:
        expected_visit['origin'] = new_origin['url']

    assert actual_origin_visits == expected_visits


@given(new_origin(), visit_dates())
def test_lookup_origin_visit(archive_data, new_origin, visit_dates):
    archive_data.origin_add_one(new_origin)
    visits = []
    for ts in visit_dates:
        visits.append(archive_data.origin_visit_add(
            new_origin['url'], ts, type='git'))

    visit = random.choice(visits)['visit']
    actual_origin_visit = service.lookup_origin_visit(
        new_origin['url'], visit)

    expected_visit = dict(archive_data.origin_visit_get_by(
        new_origin['url'], visit))

    assert actual_origin_visit == expected_visit


@given(new_origin())
def test_lookup_origin(archive_data, new_origin):
    archive_data.origin_add_one(new_origin)

    actual_origin = service.lookup_origin({'url': new_origin['url']})
    expected_origin = archive_data.origin_get({'url': new_origin['url']})

    assert actual_origin == expected_origin


@given(invalid_sha1())
def test_lookup_release_ko_id_checksum_not_a_sha1(invalid_sha1):
    with pytest.raises(BadInputExc) as e:
        service.lookup_release(invalid_sha1)
    assert e.match('Invalid checksum')


@given(sha256())
def test_lookup_release_ko_id_checksum_too_long(sha256):
    with pytest.raises(BadInputExc) as e:
        service.lookup_release(sha256)
    assert e.match('Only sha1_git is supported.')


@given(directory())
def test_lookup_directory_with_path_not_found(directory):
    path = 'some/invalid/path/here'
    with pytest.raises(NotFoundExc) as e:
        service.lookup_directory_with_path(directory, path)
    assert e.match('Directory entry with path %s from %s not found' %
                   (path, directory))


@given(directory())
def test_lookup_directory_with_path_found(archive_data, directory):
    directory_content = archive_data.directory_ls(directory)
    directory_entry = random.choice(directory_content)
    path = directory_entry['name']

    actual_result = service.lookup_directory_with_path(directory, path)

    assert actual_result == directory_entry


@given(release())
def test_lookup_release(archive_data, release):
    actual_release = service.lookup_release(release)

    assert actual_release == archive_data.release_get(release)


@given(revision(), invalid_sha1(), sha256())
def test_lookup_revision_with_context_ko_not_a_sha1(revision, invalid_sha1,
                                                    sha256):
    sha1_git_root = revision
    sha1_git = invalid_sha1

    with pytest.raises(BadInputExc) as e:
        service.lookup_revision_with_context(sha1_git_root, sha1_git)
    assert e.match('Invalid checksum query string')

    sha1_git = sha256
    with pytest.raises(BadInputExc) as e:
        service.lookup_revision_with_context(sha1_git_root, sha1_git)
    assert e.match('Only sha1_git is supported')


@given(revision(), unknown_revision())
def test_lookup_revision_with_context_ko_sha1_git_does_not_exist(
        revision, unknown_revision):
    sha1_git_root = revision
    sha1_git = unknown_revision

    with pytest.raises(NotFoundExc) as e:
        service.lookup_revision_with_context(sha1_git_root, sha1_git)
    assert e.match('Revision %s not found' % sha1_git)


@given(revision(), unknown_revision())
def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist(
        revision, unknown_revision):
    sha1_git_root = unknown_revision
    sha1_git = revision

    with pytest.raises(NotFoundExc) as e:
        service.lookup_revision_with_context(sha1_git_root, sha1_git)
    assert e.match('Revision root %s not found' % sha1_git_root)


@given(ancestor_revisions())
def test_lookup_revision_with_context(archive_data, ancestor_revisions):
    sha1_git = ancestor_revisions['sha1_git']
    root_sha1_git = ancestor_revisions['sha1_git_root']
    for sha1_git_root in (root_sha1_git,
                          {'id': hash_to_bytes(root_sha1_git)}):
        actual_revision = service.lookup_revision_with_context(
            sha1_git_root, sha1_git)

        children = []
        for rev in archive_data.revision_log(root_sha1_git):
            for p_rev in rev['parents']:
                p_rev_hex = hash_to_hex(p_rev)
                if p_rev_hex == sha1_git:
                    children.append(rev['id'])

        expected_revision = archive_data.revision_get(sha1_git)
        expected_revision['children'] = children

        assert actual_revision == expected_revision


@given(non_ancestor_revisions())
def test_lookup_revision_with_context_ko(non_ancestor_revisions):
    sha1_git = non_ancestor_revisions['sha1_git']
    root_sha1_git = non_ancestor_revisions['sha1_git_root']

    with pytest.raises(NotFoundExc) as e:
        service.lookup_revision_with_context(root_sha1_git, sha1_git)
    assert e.match('Revision %s is not an ancestor of %s' %
                   (sha1_git, root_sha1_git))


def test_lookup_directory_with_revision_not_found():
    unknown_revision_ = random_sha1()

    with pytest.raises(NotFoundExc) as e:
        service.lookup_directory_with_revision(unknown_revision_)
    assert e.match('Revision %s not found' % unknown_revision_)


def test_lookup_directory_with_revision_unknown_content(archive_data):
    unknown_content_ = random_content()
    unknown_revision_ = random_sha1()
    unknown_directory_ = random_sha1()
    dir_path = 'README.md'

    # Create a revision that points to a directory,
    # which in turn points to unknown content
    revision = {
        'author': {
            'name': b'abcd',
            'email': b'abcd@company.org',
            'fullname': b'abcd abcd'
        },
        'committer': {
            'email': b'aaaa@company.org',
            'fullname': b'aaaa aaa',
            'name': b'aaa'
        },
        'committer_date': {
            'negative_utc': False,
            'offset': 0,
            'timestamp': 1437511651
        },
        'date': {
            'negative_utc': False,
            'offset': 0,
            'timestamp': 1437511651
        },
        'message': b'bleh',
        'metadata': [],
        'parents': [],
        'synthetic': False,
        'type': 'git',
        'id': hash_to_bytes(unknown_revision_),
        'directory': hash_to_bytes(unknown_directory_)
    }

    # A directory that points to unknown content
    dir = {
        'id': hash_to_bytes(unknown_directory_),
        'entries': [{
            'name': bytes(dir_path.encode('utf-8')),
            'type': 'file',
            'target': hash_to_bytes(unknown_content_['sha1_git']),
            'perms': DentryPerms.content
        }]
    }

    # Add the directory and revision to the in-memory test storage
    archive_data.directory_add([dir])
    archive_data.revision_add([revision])

    with pytest.raises(NotFoundExc) as e:
        service.lookup_directory_with_revision(unknown_revision_, dir_path)
    assert e.match('Content not found for revision %s' % unknown_revision_)


@given(revision())
def test_lookup_directory_with_revision_ko_path_to_nowhere(revision):
    invalid_path = 'path/to/something/unknown'
    with pytest.raises(NotFoundExc) as e:
        service.lookup_directory_with_revision(revision, invalid_path)
    assert e.match('Directory or File')
    assert e.match(invalid_path)
    assert e.match('revision %s' % revision)
    assert e.match('not found')


@given(revision_with_submodules())
def test_lookup_directory_with_revision_submodules(archive_data,
                                                   revision_with_submodules):
    rev_sha1_git = revision_with_submodules['rev_sha1_git']
    rev_dir_path = revision_with_submodules['rev_dir_rev_path']

    actual_data = service.lookup_directory_with_revision(
        rev_sha1_git, rev_dir_path)

    revision = archive_data.revision_get(
        revision_with_submodules['rev_sha1_git'])
    directory = archive_data.directory_ls(revision['directory'])
    rev_entry = next(e for e in directory if e['name'] == rev_dir_path)

    expected_data = {
        'content': archive_data.revision_get(rev_entry['target']),
        'path': rev_dir_path,
        'revision': rev_sha1_git,
        'type': 'rev'
    }

    assert actual_data == expected_data


@given(revision())
def test_lookup_directory_with_revision_without_path(archive_data, revision):
    actual_directory_entries = \
        service.lookup_directory_with_revision(revision)

    revision_data = archive_data.revision_get(revision)
    expected_directory_entries = archive_data.directory_ls(
        revision_data['directory'])

    assert actual_directory_entries['type'] == 'dir'
    assert actual_directory_entries['content'] == expected_directory_entries


@given(revision())
def test_lookup_directory_with_revision_with_path(archive_data, revision):
    rev_data = archive_data.revision_get(revision)
    dir_entries = [e for e in archive_data.directory_ls(rev_data['directory'])
                   if e['type'] in ('file', 'dir')]
    expected_dir_entry = random.choice(dir_entries)

    actual_dir_entry = service.lookup_directory_with_revision(
        revision, expected_dir_entry['name'])

    assert actual_dir_entry['type'] == expected_dir_entry['type']
    assert actual_dir_entry['revision'] == revision
    assert actual_dir_entry['path'] == expected_dir_entry['name']
    if actual_dir_entry['type'] == 'file':
        del actual_dir_entry['content']['checksums']['blake2s256']
        for key in ('checksums', 'status', 'length'):
            assert actual_dir_entry['content'][key] == expected_dir_entry[key]
    else:
        sub_dir_entries = archive_data.directory_ls(
            expected_dir_entry['target'])
        assert actual_dir_entry['content'] == sub_dir_entries


@given(revision())
def test_lookup_directory_with_revision_with_path_to_file_and_data(
        archive_data, revision):
    rev_data = archive_data.revision_get(revision)
    dir_entries = [e for e in archive_data.directory_ls(rev_data['directory'])
                   if e['type'] == 'file']
    expected_dir_entry = random.choice(dir_entries)
    expected_data = archive_data.content_get(
        expected_dir_entry['checksums']['sha1'])

    actual_dir_entry = service.lookup_directory_with_revision(
        revision, expected_dir_entry['name'], with_data=True)

    assert actual_dir_entry['type'] == expected_dir_entry['type']
    assert actual_dir_entry['revision'] == revision
    assert actual_dir_entry['path'] == expected_dir_entry['name']
    del actual_dir_entry['content']['checksums']['blake2s256']
    for key in ('checksums', 'status', 'length'):
        assert actual_dir_entry['content'][key] == expected_dir_entry[key]
    assert actual_dir_entry['content']['data'] == expected_data['data']


@given(revision())
def test_lookup_revision(archive_data, revision):
    actual_revision = service.lookup_revision(revision)
    assert actual_revision == archive_data.revision_get(revision)


@given(new_revision())
def test_lookup_revision_invalid_msg(archive_data, new_revision):
    new_revision['message'] = b'elegant fix for bug \xff'
    archive_data.revision_add([new_revision])

    revision = service.lookup_revision(hash_to_hex(new_revision['id']))
    assert revision['message'] is None
    assert revision['message_decoding_failed'] is True


@given(new_revision())
def test_lookup_revision_msg_ok(archive_data, new_revision):
    archive_data.revision_add([new_revision])

    revision_message = service.lookup_revision_message(
        hash_to_hex(new_revision['id']))

    assert revision_message == {'message': new_revision['message']}


def test_lookup_revision_msg_no_rev():
    unknown_revision_ = random_sha1()

    with pytest.raises(NotFoundExc) as e:
        service.lookup_revision_message(unknown_revision_)
    assert e.match('Revision with sha1_git %s not found.' %
                   unknown_revision_)


@given(revisions())
def test_lookup_revision_multiple(archive_data, revisions):
    actual_revisions = list(service.lookup_revision_multiple(revisions))

    expected_revisions = []
    for rev in revisions:
        expected_revisions.append(archive_data.revision_get(rev))

    assert actual_revisions == expected_revisions


def test_lookup_revision_multiple_none_found():
    unknown_revisions_ = [random_sha1(), random_sha1(), random_sha1()]

    actual_revisions = list(
        service.lookup_revision_multiple(unknown_revisions_))

    assert actual_revisions == [None] * len(unknown_revisions_)


@given(revision())
def test_lookup_revision_log(archive_data, revision):
    actual_revision_log = list(service.lookup_revision_log(revision,
                                                           limit=25))
    expected_revision_log = archive_data.revision_log(revision, limit=25)

    assert actual_revision_log == expected_revision_log


def _get_origin_branches(archive_data, origin):
    origin_visit = archive_data.origin_visit_get(origin['url'])[-1]
    snapshot = archive_data.snapshot_get(origin_visit['snapshot'])
    branches = {k: v for (k, v) in snapshot['branches'].items()
                if v['target_type'] == 'revision'}
    return branches


@given(origin())
def test_lookup_revision_log_by(archive_data, origin):
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches.keys()))

    actual_log = list(
        service.lookup_revision_log_by(origin['url'], branch_name, None,
                                       limit=25))

    expected_log = archive_data.revision_log(
        branches[branch_name]['target'], limit=25)

    assert actual_log == expected_log


@given(origin())
def test_lookup_revision_log_by_notfound(origin):
    with pytest.raises(NotFoundExc):
        service.lookup_revision_log_by(
            origin['url'], 'unknown_branch_name', None, limit=100)


def test_lookup_content_raw_not_found():
    unknown_content_ = random_content()

    with pytest.raises(NotFoundExc) as e:
        service.lookup_content_raw('sha1:' + unknown_content_['sha1'])
    assert e.match('Content with %s checksum equals to %s not found!' %
                   ('sha1', unknown_content_['sha1']))


@given(content())
def test_lookup_content_raw(archive_data, content):
    actual_content = service.lookup_content_raw(
        'sha256:%s' % content['sha256'])

    expected_content = archive_data.content_get(content['sha1'])

    assert actual_content == expected_content


def test_lookup_content_not_found():
    unknown_content_ = random_content()

    with pytest.raises(NotFoundExc) as e:
        service.lookup_content('sha1:%s' % unknown_content_['sha1'])
    assert e.match('Content with %s checksum equals to %s not found!' %
                   ('sha1', unknown_content_['sha1']))


@given(content())
def test_lookup_content_with_sha1(archive_data, content):
    actual_content = service.lookup_content('sha1:%s' % content['sha1'])

    expected_content = archive_data.content_get_metadata(content['sha1'])

    assert actual_content == expected_content


@given(content())
def test_lookup_content_with_sha256(archive_data, content):
    actual_content = service.lookup_content('sha256:%s' % content['sha256'])

    expected_content = archive_data.content_get_metadata(content['sha1'])

    assert actual_content == expected_content


def test_lookup_directory_bad_checksum():
    with pytest.raises(BadInputExc):
        service.lookup_directory('directory_id')


def test_lookup_directory_not_found():
    unknown_directory_ = random_sha1()

    with pytest.raises(NotFoundExc) as e:
        service.lookup_directory(unknown_directory_)
    assert e.match('Directory with sha1_git %s not found' %
                   unknown_directory_)


@given(directory())
def test_lookup_directory(archive_data, directory):
    actual_directory_ls = list(service.lookup_directory(directory))

    expected_directory_ls = archive_data.directory_ls(directory)

    assert actual_directory_ls == expected_directory_ls


@given(empty_directory())
def test_lookup_directory_empty(empty_directory):
    actual_directory_ls = list(service.lookup_directory(empty_directory))

    assert actual_directory_ls == []


@given(origin())
def test_lookup_revision_by_nothing_found(origin):
    with pytest.raises(NotFoundExc):
        service.lookup_revision_by(origin['url'], 'invalid-branch-name')


@given(origin())
def test_lookup_revision_by(archive_data, origin):
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches.keys()))

    actual_revision = service.lookup_revision_by(origin['url'], branch_name)

    expected_revision = archive_data.revision_get(
        branches[branch_name]['target'])

    assert actual_revision == expected_revision


@given(origin(), revision())
def test_lookup_revision_with_context_by_ko(origin, revision):
    with pytest.raises(NotFoundExc):
        service.lookup_revision_with_context_by(origin['url'],
                                                'invalid-branch-name',
                                                None, revision)


@given(origin())
def test_lookup_revision_with_context_by(archive_data, origin):
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches.keys()))

    root_rev = branches[branch_name]['target']
    root_rev_log = archive_data.revision_log(root_rev)

    children = defaultdict(list)
    for rev in root_rev_log:
        for rev_p in rev['parents']:
            children[rev_p].append(rev['id'])

    rev = root_rev_log[-1]['id']

    actual_root_rev, actual_rev = service.lookup_revision_with_context_by(
        origin['url'], branch_name, None, rev)

    expected_root_rev = archive_data.revision_get(root_rev)
    expected_rev = archive_data.revision_get(rev)
    expected_rev['children'] = children[rev]

    assert actual_root_rev == expected_root_rev
    assert actual_rev == expected_rev


def test_lookup_revision_through_ko_not_implemented():
    with pytest.raises(NotImplementedError):
        service.lookup_revision_through({'something-unknown': 10})


@given(origin())
def test_lookup_revision_through_with_context_by(archive_data, origin):
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches.keys()))

    root_rev = branches[branch_name]['target']
    root_rev_log = archive_data.revision_log(root_rev)
    rev = root_rev_log[-1]['id']

    assert service.lookup_revision_through({
        'origin_url': origin['url'],
        'branch_name': branch_name,
        'ts': None,
        'sha1_git': rev
    }) == service.lookup_revision_with_context_by(origin['url'], branch_name,
                                                  None, rev)


@given(origin())
def test_lookup_revision_through_with_revision_by(archive_data, origin):
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches.keys()))

    assert service.lookup_revision_through({
        'origin_url': origin['url'],
        'branch_name': branch_name,
        'ts': None,
    }) == service.lookup_revision_by(origin['url'], branch_name, None)


@given(ancestor_revisions())
def test_lookup_revision_through_with_context(ancestor_revisions):
    sha1_git = ancestor_revisions['sha1_git']
    sha1_git_root = ancestor_revisions['sha1_git_root']

    assert service.lookup_revision_through({
        'sha1_git_root': sha1_git_root,
        'sha1_git': sha1_git,
    }) == service.lookup_revision_with_context(sha1_git_root, sha1_git)


@given(revision())
def test_lookup_revision_through_with_revision(revision):
    assert service.lookup_revision_through({
        'sha1_git': revision
    }) == service.lookup_revision(revision)


@given(revision())
def test_lookup_directory_through_revision_ko_not_found(revision):
    with pytest.raises(NotFoundExc):
        service.lookup_directory_through_revision(
            {'sha1_git': revision}, 'some/invalid/path')


@given(revision())
def test_lookup_directory_through_revision_ok(archive_data, revision):
    rev_data = archive_data.revision_get(revision)
    dir_entries = [e for e in archive_data.directory_ls(rev_data['directory'])
                   if e['type'] == 'file']
    dir_entry = random.choice(dir_entries)

    assert service.lookup_directory_through_revision(
        {'sha1_git': revision}, dir_entry['name']
    ) == (revision,
          service.lookup_directory_with_revision(revision,
                                                 dir_entry['name']))


@given(revision())
def test_lookup_directory_through_revision_ok_with_data(
        archive_data, revision):
    rev_data = archive_data.revision_get(revision)
    dir_entries = [e for e in archive_data.directory_ls(rev_data['directory'])
                   if e['type'] == 'file']
    dir_entry = random.choice(dir_entries)

    assert service.lookup_directory_through_revision(
        {'sha1_git': revision}, dir_entry['name'], with_data=True
    ) == (revision,
          service.lookup_directory_with_revision(revision,
                                                 dir_entry['name'],
                                                 with_data=True))


@given(content(), directory(), release(), revision(), snapshot())
def test_lookup_known_objects(archive_data, content, directory, release,
                              revision, snapshot):
    expected = archive_data.content_find(content)
    assert service.lookup_object(CONTENT, content['sha1_git']) == expected

    expected = archive_data.directory_get(directory)
    assert service.lookup_object(DIRECTORY, directory) == expected

    expected = archive_data.release_get(release)
    assert service.lookup_object(RELEASE, release) == expected

    expected = archive_data.revision_get(revision)
    assert service.lookup_object(REVISION, revision) == expected

    expected = archive_data.snapshot_get(snapshot)
    assert service.lookup_object(SNAPSHOT, snapshot) == expected


@given(unknown_content(), unknown_directory(), unknown_release(),
       unknown_revision(), unknown_snapshot())
def test_lookup_unknown_objects(unknown_content, unknown_directory,
                                unknown_release, unknown_revision,
                                unknown_snapshot):
    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(CONTENT, unknown_content['sha1_git'])
    assert e.match(r'Content.*not found')

    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(DIRECTORY, unknown_directory)
    assert e.match(r'Directory.*not found')

    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(RELEASE, unknown_release)
    assert e.match(r'Release.*not found')

    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(REVISION, unknown_revision)
    assert e.match(r'Revision.*not found')

    with pytest.raises(NotFoundExc) as e:
        service.lookup_object(SNAPSHOT, unknown_snapshot)
    assert e.match(r'Snapshot.*not found')


@given(invalid_sha1())
def test_lookup_invalid_objects(invalid_sha1):
    with pytest.raises(BadInputExc) as e:
        service.lookup_object('foo', invalid_sha1)
    assert e.match('Invalid swh object type')

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(CONTENT, invalid_sha1)
    assert e.match('Invalid hash')

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(DIRECTORY, invalid_sha1)
    assert e.match('Invalid checksum')

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(RELEASE, invalid_sha1)
    assert e.match('Invalid checksum')

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(REVISION, invalid_sha1)
    assert e.match('Invalid checksum')

    with pytest.raises(BadInputExc) as e:
        service.lookup_object(SNAPSHOT, invalid_sha1)
    assert e.match('Invalid checksum')


-def test_lookup_missing_hashes():
+def test_lookup_missing_hashes_non_present():
    missing_cnt = random_sha1()
    missing_dir = random_sha1()
    missing_rev = random_sha1()
    missing_rel = random_sha1()
    missing_snp = random_sha1()

    grouped_pids = {
        CONTENT: [hash_to_bytes(missing_cnt)],
        DIRECTORY: [hash_to_bytes(missing_dir)],
        REVISION: [hash_to_bytes(missing_rev)],
        RELEASE: [hash_to_bytes(missing_rel)],
        SNAPSHOT: [hash_to_bytes(missing_snp)],
    }

    actual_result = service.lookup_missing_hashes(grouped_pids)

    assert actual_result == {missing_cnt, missing_dir, missing_rev,
                             missing_rel, missing_snp}
+
+
+@given(content(), directory())
+def test_lookup_missing_hashes_some_present(archive_data, content, directory):
+    missing_rev = random_sha1()
+    missing_rel = random_sha1()
+    missing_snp = random_sha1()
+
+    grouped_pids = {
+        CONTENT: [hash_to_bytes(content['sha1_git'])],
+        DIRECTORY: [hash_to_bytes(directory)],
+        REVISION: [hash_to_bytes(missing_rev)],
+        RELEASE: [hash_to_bytes(missing_rel)],
+        SNAPSHOT: [hash_to_bytes(missing_snp)],
+    }
+
+    actual_result = service.lookup_missing_hashes(grouped_pids)
+
+    assert actual_result == {missing_rev, missing_rel, missing_snp}
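Note: the two tests above pin down the contract of service.lookup_missing_hashes: it takes hashes grouped by object type (as bytes) and returns the set of hex hashes absent from the archive. A minimal sketch of one way to satisfy that contract; the per-type *_missing storage methods named here are assumptions, not taken from this diff.

    import itertools
    from typing import Dict, List, Set

    from swh.model import hashutil
    from swh.model.identifiers import (
        CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
    )

    def lookup_missing_hashes_sketch(grouped_pids: Dict[str, List[bytes]],
                                     storage) -> Set[str]:
        """Return the hex hashes from grouped_pids not present in storage."""
        # Assumed storage API: one "missing" query per object type, each
        # yielding the subset of the given ids that the archive lacks.
        missing_per_type = {
            CONTENT: storage.content_missing_per_sha1,
            DIRECTORY: storage.directory_missing,
            REVISION: storage.revision_missing,
            RELEASE: storage.release_missing,
            SNAPSHOT: storage.snapshot_missing,
        }
        missing = (missing_per_type[obj_type](obj_ids)
                   for obj_type, obj_ids in grouped_pids.items())
        return {hashutil.hash_to_hex(h) for h in itertools.chain(*missing)}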
diff --git a/swh/web/tests/common/test_utils.py b/swh/web/tests/common/test_utils.py
index 6c2ea949..b1ca03f1 100644
--- a/swh/web/tests/common/test_utils.py
+++ b/swh/web/tests/common/test_utils.py
@@ -1,126 +1,114 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime

import pytest

from swh.web.common import utils
from swh.web.common.exc import BadInputExc


def test_shorten_path_noop():
    noops = [
        '/api/',
        '/browse/',
        '/content/symbol/foobar/'
    ]

    for noop in noops:
        assert utils.shorten_path(noop) == noop


def test_shorten_path_sha1():
    sha1 = 'aafb16d69fd30ff58afdd69036a26047f3aebdc6'
    short_sha1 = sha1[:8] + '...'

    templates = [
        '/api/1/content/sha1:%s/',
        '/api/1/content/sha1_git:%s/',
        '/api/1/directory/%s/',
        '/api/1/content/sha1:%s/ctags/',
    ]

    for template in templates:
        assert utils.shorten_path(template % sha1) == template % short_sha1


def test_shorten_path_sha256():
    sha256 = ('aafb16d69fd30ff58afdd69036a26047'
              '213add102934013a014dfca031c41aef')
    short_sha256 = sha256[:8] + '...'

    templates = [
        '/api/1/content/sha256:%s/',
        '/api/1/directory/%s/',
        '/api/1/content/sha256:%s/filetype/',
    ]

    for template in templates:
        assert (utils.shorten_path(template % sha256) ==
                template % short_sha256)


def test_parse_timestamp():
    input_timestamps = [
        None,
        '2016-01-12',
        '2016-01-12T09:19:12+0100',
        'Today is January 1, 2047 at 8:21:00AM',
        '1452591542',
    ]

    output_dates = [
        None,
        datetime.datetime(2016, 1, 12, 0, 0),
        datetime.datetime(2016, 1, 12, 8, 19, 12,
                          tzinfo=datetime.timezone.utc),
        datetime.datetime(2047, 1, 1, 8, 21),
        datetime.datetime(2016, 1, 12, 9, 39, 2,
                          tzinfo=datetime.timezone.utc),
    ]

    for ts, exp_date in zip(input_timestamps, output_dates):
        assert utils.parse_timestamp(ts) == exp_date


def test_format_utc_iso_date():
    assert (utils.format_utc_iso_date('2017-05-04T13:27:13+02:00') ==
            '04 May 2017, 11:27 UTC')


def test_gen_path_info():
    input_path = '/home/user/swh-environment/swh-web/'
    expected_result = [
        {'name': 'home', 'path': 'home'},
        {'name': 'user', 'path': 'home/user'},
        {'name': 'swh-environment', 'path': 'home/user/swh-environment'},
        {'name': 'swh-web', 'path': 'home/user/swh-environment/swh-web'}
    ]
    path_info = utils.gen_path_info(input_path)
    assert path_info == expected_result

    input_path = 'home/user/swh-environment/swh-web'
    path_info = utils.gen_path_info(input_path)
    assert path_info == expected_result


def test_get_swh_persistent_id():
    swh_object_type = 'content'
    sha1_git = 'aafb16d69fd30ff58afdd69036a26047f3aebdc6'

    expected_swh_id = 'swh:1:cnt:' + sha1_git

    assert (utils.get_swh_persistent_id(swh_object_type, sha1_git) ==
            expected_swh_id)

    with pytest.raises(BadInputExc) as e:
        utils.get_swh_persistent_id('foo', sha1_git)
    assert e.match('Invalid object')

    with pytest.raises(BadInputExc) as e:
        utils.get_swh_persistent_id(swh_object_type, 'not a valid id')
    assert e.match('Invalid object')
-
-
-def test_group_swh_persistent_identifiers_bad_input():
-    sha1_git = 'aafb16d69fd30ff58afdd69036a26047f3aebdc6'
-    invalid_pid_sha1 = ['swh:1:cnt:aafb16d69fd30ff58afdd69036a26047f3aebdc;']
-    invalid_pid_type = ['swh:1:dri:%s' % sha1_git]
-
-    with pytest.raises(BadInputExc):
-        utils.group_swh_persistent_identifiers(invalid_pid_sha1)
-
-    with pytest.raises(BadInputExc):
-        utils.group_swh_persistent_identifiers(invalid_pid_type)
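Note: the removed test targeted utils.group_swh_persistent_identifiers, the grouping helper exercised elsewhere in this diff. As a hypothetical stand-in (not the diff's actual implementation), the grouping step might look like the sketch below, assuming each parsed identifier exposes object_type and object_id (hex) attributes as in swh.model.identifiers.

    from collections import defaultdict
    from typing import Dict, List

    from swh.model.hashutil import hash_to_bytes

    def group_pids_by_type(persistent_ids) -> Dict[str, List[bytes]]:
        """Group parsed persistent identifiers by their object type."""
        pids_by_type = defaultdict(list)
        for pid in persistent_ids:
            # object_id is hex; storage-side "missing" lookups expect bytes
            pids_by_type[pid.object_type].append(hash_to_bytes(pid.object_id))
        return dict(pids_by_type)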