Changeset View
Changeset View
Standalone View
Standalone View
swh/web/common/service.py
# Copyright (C) 2015-2019 The Software Heritage developers | # Copyright (C) 2015-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU Affero General Public License version 3, or any later version | # License: GNU Affero General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import os | import os | ||||
from collections import defaultdict | from collections import defaultdict | ||||
from typing import Any, Dict | |||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.storage.algos import diff, revisions_walker | from swh.storage.algos import diff, revisions_walker | ||||
from swh.model.identifiers import ( | |||||
CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT | |||||
) | |||||
from swh.web.common import converters | from swh.web.common import converters | ||||
from swh.web.common import query | from swh.web.common import query | ||||
from swh.web.common.exc import NotFoundExc | from swh.web.common.exc import BadInputExc, NotFoundExc | ||||
from swh.web.common.origin_visits import get_origin_visit | from swh.web.common.origin_visits import get_origin_visit | ||||
from swh.web import config | from swh.web import config | ||||
storage = config.storage() | storage = config.storage() | ||||
vault = config.vault() | vault = config.vault() | ||||
idx_storage = config.indexer_storage() | idx_storage = config.indexer_storage() | ||||
▲ Show 20 Lines • Show All 1,073 Lines • ▼ Show 20 Lines | Args: | ||||
constructor | constructor | ||||
kwargs (dict): keyword arguments to pass to the revisions walker | kwargs (dict): keyword arguments to pass to the revisions walker | ||||
constructor | constructor | ||||
""" | """ | ||||
# first check if the provided revision is valid | # first check if the provided revision is valid | ||||
lookup_revision(rev_start) | lookup_revision(rev_start) | ||||
return _RevisionsWalkerProxy(rev_walker_type, rev_start, *args, **kwargs) | return _RevisionsWalkerProxy(rev_walker_type, rev_start, *args, **kwargs) | ||||
def lookup_object(object_type: str, object_id: str) -> Dict[str, Any]: | |||||
""" | |||||
Utility function for looking up an object in the archive by its type | |||||
and id. | |||||
Args: | |||||
object_type (str): the type of object to lookup, either *content*, | |||||
*directory*, *release*, *revision* or *snapshot* | |||||
object_id (str): the *sha1_git* checksum identifier in hexadecimal | |||||
form of the object to lookup | |||||
Returns: | |||||
Dict[str, Any]: A dictionary describing the object or a list of | |||||
dictionary for the directory object type. | |||||
Raises: | |||||
NotFoundExc: if the object could not be found in the archive | |||||
BadInputExc: if the object identifier is invalid | |||||
""" | |||||
if object_type == CONTENT: | |||||
return lookup_content(f'sha1_git:{object_id}') | |||||
elif object_type == DIRECTORY: | |||||
return { | |||||
'id': object_id, | |||||
'content': list(lookup_directory(object_id)) | |||||
} | |||||
elif object_type == RELEASE: | |||||
return lookup_release(object_id) | |||||
elif object_type == REVISION: | |||||
return lookup_revision(object_id) | |||||
elif object_type == SNAPSHOT: | |||||
return lookup_snapshot(object_id) | |||||
raise BadInputExc(('Invalid swh object type! Valid types are ' | |||||
f'{CONTENT}, {DIRECTORY}, {RELEASE} ' | |||||
f'{REVISION} or {SNAPSHOT}.')) |