Changeset View
Changeset View
Standalone View
Standalone View
swh/web/api/v2/views/content.py
- This file was added.
# Copyright (C) 2020 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU Affero General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
from typing import List | |||||
from openapi_core.validation.request.datatypes import RequestParameters | |||||
from rest_framework.request import Request | |||||
from swh.model.identifiers import parse_swhid | |||||
from swh.web.api.v2.typing import Content, ContentKnown | |||||
from swh.web.common import archive | |||||
from swh.web.common.identifiers import get_swhid, group_swhids | |||||
def _fetch_content_data(swhid_content: str) -> Content: | |||||
parsed_swhid = parse_swhid(swhid_content) | |||||
content_data = archive.lookup_content(f"sha1_git:{parsed_swhid.object_id}") | |||||
return Content( | |||||
sha1=content_data["checksums"]["sha1"], | |||||
sha1_git=content_data["checksums"]["sha1_git"], | |||||
sha256=content_data["checksums"]["sha256"], | |||||
length=content_data["length"], | |||||
) | |||||
def contents(request: Request, parameters: RequestParameters) -> List[Content]: | |||||
swhids_content = parameters.query["swhids_content"] | |||||
return [_fetch_content_data(swhid_content) for swhid_content in swhids_content] | |||||
def content(request: Request, parameters: RequestParameters) -> Content: | |||||
return _fetch_content_data(parameters.path["swhid_content"]) | |||||
def content_raw(request: Request, parameters: RequestParameters) -> bytes: | |||||
parsed_swhid = parse_swhid(parameters.path["swhid_content"]) | |||||
content_raw = archive.lookup_content_raw(f"sha1_git:{parsed_swhid.object_id}") | |||||
return content_raw["data"] | |||||
def content_known( | |||||
request: Request, parameters: RequestParameters | |||||
) -> List[ContentKnown]: | |||||
swhids = [get_swhid(swhid) for swhid in request.data] | |||||
swhids_by_type = group_swhids(swhids) | |||||
missing_hashes = archive.lookup_missing_hashes(swhids_by_type) | |||||
contents_known = [] | |||||
for swhid in swhids: | |||||
contents_known.append( | |||||
ContentKnown(swhid=str(swhid), known=swhid.object_id not in missing_hashes) | |||||
) | |||||
return contents_known |