Changeset View
Changeset View
Standalone View
Standalone View
swh/web/common/utils.py
Show All 15 Lines | |||||
from django.urls import reverse as django_reverse | from django.urls import reverse as django_reverse | ||||
from django.http import QueryDict, HttpRequest | from django.http import QueryDict, HttpRequest | ||||
from prometheus_client.registry import CollectorRegistry | from prometheus_client.registry import CollectorRegistry | ||||
from rest_framework.authentication import SessionAuthentication | from rest_framework.authentication import SessionAuthentication | ||||
from swh.model.exceptions import ValidationError | from swh.model.exceptions import ValidationError | ||||
from swh.model.hashutil import hash_to_bytes | |||||
from swh.model.identifiers import ( | from swh.model.identifiers import ( | ||||
persistent_identifier, parse_persistent_identifier, | persistent_identifier, parse_persistent_identifier, | ||||
CONTENT, DIRECTORY, ORIGIN, RELEASE, REVISION, SNAPSHOT | CONTENT, DIRECTORY, ORIGIN, RELEASE, REVISION, SNAPSHOT | ||||
) | ) | ||||
from swh.web.common.exc import BadInputExc | from swh.web.common.exc import BadInputExc | ||||
from swh.web.config import get_config | from swh.web.config import get_config | ||||
▲ Show 20 Lines • Show All 207 Lines • ▼ Show 20 Lines | Args: | ||||
query parameters to append to the browse url | query parameters to append to the browse url | ||||
Returns: | Returns: | ||||
dict: a dict with the following keys: | dict: a dict with the following keys: | ||||
* **swh_id_parsed (swh.model.identifiers.PersistentId)**: | * **swh_id_parsed (swh.model.identifiers.PersistentId)**: | ||||
the parsed identifier | the parsed identifier | ||||
* **browse_url (str)**: the url for browsing the pointed object | * **browse_url (str)**: the url for browsing the pointed object | ||||
Raises: | |||||
BadInputExc: if the provided identifier can not be parsed | |||||
""" | """ | ||||
try: | swh_id_parsed = get_persistent_identifier(swh_id) | ||||
swh_id_parsed = parse_persistent_identifier(swh_id) | |||||
object_type = swh_id_parsed.object_type | object_type = swh_id_parsed.object_type | ||||
object_id = swh_id_parsed.object_id | object_id = swh_id_parsed.object_id | ||||
browse_url = None | browse_url = None | ||||
query_dict = QueryDict('', mutable=True) | query_dict = QueryDict('', mutable=True) | ||||
if query_params and len(query_params) > 0: | if query_params and len(query_params) > 0: | ||||
for k in sorted(query_params.keys()): | for k in sorted(query_params.keys()): | ||||
query_dict[k] = query_params[k] | query_dict[k] = query_params[k] | ||||
if 'origin' in swh_id_parsed.metadata: | if 'origin' in swh_id_parsed.metadata: | ||||
query_dict['origin'] = swh_id_parsed.metadata['origin'] | query_dict['origin'] = swh_id_parsed.metadata['origin'] | ||||
if object_type == CONTENT: | if object_type == CONTENT: | ||||
query_string = 'sha1_git:' + object_id | query_string = 'sha1_git:' + object_id | ||||
fragment = '' | fragment = '' | ||||
if 'lines' in swh_id_parsed.metadata: | if 'lines' in swh_id_parsed.metadata: | ||||
lines = swh_id_parsed.metadata['lines'].split('-') | lines = swh_id_parsed.metadata['lines'].split('-') | ||||
fragment += '#L' + lines[0] | fragment += '#L' + lines[0] | ||||
if len(lines) > 1: | if len(lines) > 1: | ||||
fragment += '-L' + lines[1] | fragment += '-L' + lines[1] | ||||
browse_url = reverse('browse-content', | browse_url = reverse('browse-content', | ||||
url_args={'query_string': query_string}, | url_args={'query_string': query_string}, | ||||
query_params=query_dict) + fragment | query_params=query_dict) + fragment | ||||
elif object_type == DIRECTORY: | elif object_type == DIRECTORY: | ||||
browse_url = reverse('browse-directory', | browse_url = reverse('browse-directory', | ||||
url_args={'sha1_git': object_id}, | url_args={'sha1_git': object_id}, | ||||
query_params=query_dict) | query_params=query_dict) | ||||
elif object_type == RELEASE: | elif object_type == RELEASE: | ||||
browse_url = reverse('browse-release', | browse_url = reverse('browse-release', | ||||
url_args={'sha1_git': object_id}, | url_args={'sha1_git': object_id}, | ||||
query_params=query_dict) | query_params=query_dict) | ||||
elif object_type == REVISION: | elif object_type == REVISION: | ||||
browse_url = reverse('browse-revision', | browse_url = reverse('browse-revision', | ||||
url_args={'sha1_git': object_id}, | url_args={'sha1_git': object_id}, | ||||
query_params=query_dict) | query_params=query_dict) | ||||
elif object_type == SNAPSHOT: | elif object_type == SNAPSHOT: | ||||
browse_url = reverse('browse-snapshot', | browse_url = reverse('browse-snapshot', | ||||
url_args={'snapshot_id': object_id}, | url_args={'snapshot_id': object_id}, | ||||
query_params=query_dict) | query_params=query_dict) | ||||
elif object_type == ORIGIN: | elif object_type == ORIGIN: | ||||
raise BadInputExc(('Origin PIDs (Persistent Identifiers) are not ' | raise BadInputExc(('Origin PIDs (Persistent Identifiers) are not ' | ||||
'publicly resolvable because they are for ' | 'publicly resolvable because they are for ' | ||||
'internal usage only')) | 'internal usage only')) | ||||
except ValidationError as ve: | |||||
raise BadInputExc('Error when parsing identifier. %s' % | |||||
' '.join(ve.messages)) | |||||
else: | |||||
return {'swh_id_parsed': swh_id_parsed, | return {'swh_id_parsed': swh_id_parsed, | ||||
'browse_url': browse_url} | 'browse_url': browse_url} | ||||
def parse_rst(text, report_level=2): | def parse_rst(text, report_level=2): | ||||
""" | """ | ||||
Parse a reStructuredText string with docutils. | Parse a reStructuredText string with docutils. | ||||
Args: | Args: | ||||
text (str): string with reStructuredText markups in it | text (str): string with reStructuredText markups in it | ||||
▲ Show 20 Lines • Show All 74 Lines • ▼ Show 20 Lines | while branch and branch['target_type'] == 'alias': | ||||
snp = service.lookup_snapshot( | snp = service.lookup_snapshot( | ||||
snapshot['id'], branches_from=branch['target'], | snapshot['id'], branches_from=branch['target'], | ||||
branches_count=1) | branches_count=1) | ||||
if snp and branch['target'] in snp['branches']: | if snp and branch['target'] in snp['branches']: | ||||
branch = snp['branches'][branch['target']] | branch = snp['branches'][branch['target']] | ||||
else: | else: | ||||
branch = None | branch = None | ||||
return branch | return branch | ||||
def get_persistent_identifier(persistent_id):
    """Parse and validate a Software Heritage persistent identifier.

    Thin wrapper around
    :func:`swh.model.identifiers.parse_persistent_identifier` that
    converts low-level validation errors into the web application's
    standard input error type.

    Args:
        persistent_id (str): a string representing a Software Heritage
            persistent identifier.

    Returns:
        swh.model.identifiers.PersistentId: the parsed persistent
        identifier object.

    Raises:
        BadInputExc: if the provided persistent identifier can
            not be parsed.
    """
    try:
        # keep the try body minimal: only the parsing call can raise here
        return parse_persistent_identifier(persistent_id)
    except ValidationError as ve:
        raise BadInputExc('Error when parsing identifier: %s' %
                          ' '.join(ve.messages))
def group_swh_persistent_identifiers(persistent_ids):
    """
    Groups many Software Heritage persistent identifiers into a
    dictionary depending on their type.

    Args:
        persistent_ids (list): a list of parsed Software Heritage
            persistent identifier objects (as returned by
            ``parse_persistent_identifier``)

    Returns:
        dict: a dictionary mapping each supported object type
        (``CONTENT``, ``DIRECTORY``, ``REVISION``, ``RELEASE``,
        ``SNAPSHOT``) to the list of object ids (``bytes``) of
        that type; types with no matching identifier map to an
        empty list.

    Raises:
        BadInputExc: if one of the provided persistent identifiers
            has an unsupported object type (e.g. origin PIDs, which
            are for internal usage only).
    """
    pids_by_type = {
        CONTENT: [],
        DIRECTORY: [],
        REVISION: [],
        RELEASE: [],
        SNAPSHOT: []
    }
    for pid in persistent_ids:
        obj_type = pid.object_type
        if obj_type not in pids_by_type:
            # previously an unsupported type surfaced as a bare KeyError;
            # raise the documented input error instead, consistently with
            # the rest of this module's error handling
            raise BadInputExc('Unsupported object type: %s' % obj_type)
        pids_by_type[obj_type].append(hash_to_bytes(pid.object_id))
    return pids_by_type
the indentation of the dictionary keys does not look right here