diff --git a/swh/web/common/identifiers.py b/swh/web/common/identifiers.py index c56c0b95..3679958e 100644 --- a/swh/web/common/identifiers.py +++ b/swh/web/common/identifiers.py @@ -1,184 +1,180 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict, Iterable, List, Optional from typing_extensions import TypedDict from django.http import QueryDict from swh.model.exceptions import ValidationError from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import ( persistent_identifier, parse_persistent_identifier, CONTENT, DIRECTORY, ORIGIN, RELEASE, REVISION, SNAPSHOT, PersistentId ) from swh.web.common.exc import BadInputExc from swh.web.common.typing import QueryParameters from swh.web.common.utils import reverse def get_swh_persistent_id(object_type: str, object_id: str, scheme_version: int = 1) -> str: """ Returns the persistent identifier for a swh object based on: * the object type * the object id * the swh identifiers scheme version Args: object_type: the swh object type (content/directory/release/revision/snapshot) object_id: the swh object id (hexadecimal representation of its hash value) scheme_version: the scheme version of the swh persistent identifiers Returns: the swh object persistent identifier Raises: BadInputExc: if the provided parameters do not enable to generate a valid identifier """ try: swh_id = persistent_identifier(object_type, object_id, scheme_version) except ValidationError as e: raise BadInputExc('Invalid object (%s) for swh persistent id. %s' % (object_id, e)) else: return swh_id ResolvedPersistentId = TypedDict('ResolvedPersistentId', { 'swh_id_parsed': PersistentId, 'browse_url': Optional[str] }) def resolve_swh_persistent_id(swh_id: str, query_params: Optional[QueryParameters] = None ) -> ResolvedPersistentId: """ Try to resolve a Software Heritage persistent id into an url for browsing the targeted object. Args: swh_id: a Software Heritage persistent identifier query_params: optional dict filled with query parameters to append to the browse url Returns: a dict with the following keys: * **swh_id_parsed**: the parsed identifier * **browse_url**: the url for browsing the targeted object """ swh_id_parsed = get_persistent_identifier(swh_id) object_type = swh_id_parsed.object_type object_id = swh_id_parsed.object_id browse_url = None query_dict = QueryDict('', mutable=True) if query_params and len(query_params) > 0: for k in sorted(query_params.keys()): query_dict[k] = query_params[k] if 'origin' in swh_id_parsed.metadata: query_dict['origin'] = swh_id_parsed.metadata['origin'] if object_type == CONTENT: query_string = 'sha1_git:' + object_id fragment = '' if 'lines' in swh_id_parsed.metadata: lines = swh_id_parsed.metadata['lines'].split('-') fragment += '#L' + lines[0] if len(lines) > 1: fragment += '-L' + lines[1] browse_url = reverse('browse-content', url_args={'query_string': query_string}, query_params=query_dict) + fragment elif object_type == DIRECTORY: browse_url = reverse('browse-directory', url_args={'sha1_git': object_id}, query_params=query_dict) elif object_type == RELEASE: browse_url = reverse('browse-release', url_args={'sha1_git': object_id}, query_params=query_dict) elif object_type == REVISION: browse_url = reverse('browse-revision', url_args={'sha1_git': object_id}, query_params=query_dict) elif object_type == SNAPSHOT: browse_url = reverse('browse-snapshot', url_args={'snapshot_id': object_id}, query_params=query_dict) elif object_type == ORIGIN: raise BadInputExc(('Origin PIDs (Persistent Identifiers) are not ' 'publicly resolvable because they are for ' 'internal usage only')) return { 'swh_id_parsed': swh_id_parsed, 'browse_url': browse_url } def get_persistent_identifier(persistent_id: str) -> PersistentId: """Check if a persistent identifier is valid. Args: persistent_id: A string representing a Software Heritage persistent identifier. Raises: BadInputExc: if the provided persistent identifier can not be parsed. Return: A persistent identifier object. """ try: pid_object = parse_persistent_identifier(persistent_id) except ValidationError as ve: raise BadInputExc('Error when parsing identifier: %s' % ' '.join(ve.messages)) else: return pid_object def group_swh_persistent_identifiers(persistent_ids: Iterable[PersistentId] ) -> Dict[str, List[bytes]]: """ Groups many Software Heritage persistent identifiers into a dictionary depending on their type. Args: persistent_ids: an iterable of Software Heritage persistent identifier objects Returns: A dictionary with: keys: persistent identifier types values: persistent identifiers id - - Raises: - BadInputExc: if one of the provided persistent identifier can - not be parsed. """ pids_by_type: Dict[str, List[bytes]] = { CONTENT: [], DIRECTORY: [], REVISION: [], RELEASE: [], SNAPSHOT: [] } for pid in persistent_ids: obj_id = pid.object_id obj_type = pid.object_type pids_by_type[obj_type].append(hash_to_bytes(obj_id)) return pids_by_type diff --git a/swh/web/tests/common/test_identifiers.py b/swh/web/tests/common/test_identifiers.py index 9fb21ce9..1a90157a 100644 --- a/swh/web/tests/common/test_identifiers.py +++ b/swh/web/tests/common/test_identifiers.py @@ -1,27 +1,114 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information +from hypothesis import given + import pytest +from swh.model.hashutil import hash_to_bytes +from swh.model.identifiers import ( + CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT, + PersistentId +) + from swh.web.common.exc import BadInputExc -from swh.web.common.identifiers import get_swh_persistent_id +from swh.web.common.identifiers import ( + get_swh_persistent_id, resolve_swh_persistent_id, + get_persistent_identifier, group_swh_persistent_identifiers +) +from swh.web.common.utils import reverse +from swh.web.tests.data import random_sha1 +from swh.web.tests.strategies import ( + content, directory, release, revision, snapshot +) -def test_get_swh_persistent_id(): - swh_object_type = 'content' - sha1_git = 'aafb16d69fd30ff58afdd69036a26047f3aebdc6' +@given(content()) +def test_get_swh_persistent_id(content): + swh_object_type = CONTENT + sha1_git = content['sha1_git'] expected_swh_id = 'swh:1:cnt:' + sha1_git assert get_swh_persistent_id( swh_object_type, sha1_git) == expected_swh_id with pytest.raises(BadInputExc) as e: get_swh_persistent_id('foo', sha1_git) assert e.match('Invalid object') with pytest.raises(BadInputExc) as e: get_swh_persistent_id(swh_object_type, 'not a valid id') assert e.match('Invalid object') + + +@given(content(), directory(), release(), revision(), snapshot()) +def test_resolve_swh_persistent_id(content, directory, release, revision, + snapshot): + for obj_type, obj_id in ((CONTENT, content['sha1_git']), + (DIRECTORY, directory), + (RELEASE, release), + (REVISION, revision), + (SNAPSHOT, snapshot)): + + swh_pid = get_swh_persistent_id(obj_type, obj_id) + + url_args = {} + if obj_type == CONTENT: + url_args['query_string'] = f'sha1_git:{obj_id}' + elif obj_type == SNAPSHOT: + url_args['snapshot_id'] = obj_id + else: + url_args['sha1_git'] = obj_id + query_params = {'origin': 'some-origin'} + browse_url = reverse(f'browse-{obj_type}', url_args=url_args, + query_params=query_params) + + resolved_pid = resolve_swh_persistent_id(swh_pid, query_params) + + assert isinstance(resolved_pid['swh_id_parsed'], PersistentId) + assert str(resolved_pid['swh_id_parsed']) == swh_pid + assert resolved_pid['browse_url'] == browse_url + + with pytest.raises(BadInputExc, match='Origin PIDs'): + resolve_swh_persistent_id(f'swh:1:ori:{random_sha1()}') + + +@given(content(), directory(), release(), revision(), snapshot()) +def test_get_persistent_identifier(content, directory, release, revision, + snapshot): + for obj_type, obj_id in ((CONTENT, content['sha1_git']), + (DIRECTORY, directory), + (RELEASE, release), + (REVISION, revision), + (SNAPSHOT, snapshot)): + swh_pid = get_swh_persistent_id(obj_type, obj_id) + swh_parsed_pid = get_persistent_identifier(swh_pid) + + assert isinstance(swh_parsed_pid, PersistentId) + assert str(swh_parsed_pid) == swh_pid + + with pytest.raises(BadInputExc, match='Error when parsing identifier'): + get_persistent_identifier('foo') + + +@given(content(), directory(), release(), revision(), snapshot()) +def test_group_persistent_identifiers(content, directory, release, revision, + snapshot): + swh_pids = [] + expected = {} + for obj_type, obj_id in ((CONTENT, content['sha1_git']), + (DIRECTORY, directory), + (RELEASE, release), + (REVISION, revision), + (SNAPSHOT, snapshot)): + swh_pid = get_swh_persistent_id(obj_type, obj_id) + swh_pid = get_persistent_identifier(swh_pid) + swh_pids.append(swh_pid) + expected[obj_type] = [hash_to_bytes(obj_id)] + + pid_groups = group_swh_persistent_identifiers(swh_pids) + + assert pid_groups == expected