Changeset View
Changeset View
Standalone View
Standalone View
swh/graphql/backends/archive.py
# Copyright (C) 2022 The Software Heritage developers | # Copyright (C) 2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import os | import os | ||||
from typing import Any, Dict, Optional | from typing import Any, Dict, Iterable, List, Optional | ||||
from swh.graphql import server | from swh.graphql import server | ||||
from swh.model.model import Sha1Git | from swh.model.model import ( | ||||
Content, | |||||
DirectoryEntry, | |||||
Origin, | |||||
OriginVisit, | |||||
OriginVisitStatus, | |||||
Release, | |||||
Revision, | |||||
Sha1, | |||||
Sha1Git, | |||||
) | |||||
from swh.model.swhids import ObjectType | from swh.model.swhids import ObjectType | ||||
from swh.storage.interface import PagedResult, PartialBranches, StorageInterface | |||||
class Archive: | class Archive: | ||||
def __init__(self): | def __init__(self) -> None: | ||||
self.storage = server.get_storage() | self.storage: StorageInterface = server.get_storage() | ||||
def get_origin(self, url): | def get_origin(self, url: str) -> Optional[Origin]: | ||||
return self.storage.origin_get([url])[0] | return list(self.storage.origin_get(origins=[url]))[0] | ||||
def get_origins(self, after=None, first=50): | def get_origins( | ||||
self, after: Optional[str] = None, first: int = 50 | |||||
) -> PagedResult[Origin]: | |||||
return self.storage.origin_list(page_token=after, limit=first) | return self.storage.origin_list(page_token=after, limit=first) | ||||
def get_origin_visits(self, origin_url, after=None, first=50): | def get_origin_visits( | ||||
return self.storage.origin_visit_get(origin_url, page_token=after, limit=first) | self, origin_url: str, after: Optional[str] = None, first: int = 50 | ||||
) -> PagedResult[OriginVisit]: | |||||
return self.storage.origin_visit_get( | |||||
origin=origin_url, page_token=after, limit=first | |||||
) | |||||
def get_origin_visit(self, origin_url, visit_id): | def get_origin_visit(self, origin_url: str, visit_id: int) -> Optional[OriginVisit]: | ||||
return self.storage.origin_visit_get_by(origin_url, visit_id) | return self.storage.origin_visit_get_by(origin=origin_url, visit=visit_id) | ||||
def get_origin_latest_visit(self, origin_url): | def get_origin_latest_visit(self, origin_url: str) -> Optional[OriginVisit]: | ||||
return self.storage.origin_visit_get_latest(origin_url) | return self.storage.origin_visit_get_latest(origin=origin_url) | ||||
def get_visit_status(self, origin_url, visit_id, after=None, first=50): | def get_visit_status( | ||||
self, | |||||
origin_url: str, | |||||
visit_id: int, | |||||
after: Optional[str] = None, | |||||
first: int = 50, | |||||
) -> PagedResult[OriginVisitStatus]: | |||||
return self.storage.origin_visit_status_get( | return self.storage.origin_visit_status_get( | ||||
origin_url, visit_id, page_token=after, limit=first | origin=origin_url, visit=visit_id, page_token=after, limit=first | ||||
) | ) | ||||
def get_latest_visit_status(self, origin_url, visit_id): | def get_latest_visit_status( | ||||
return self.storage.origin_visit_status_get_latest(origin_url, visit_id) | self, origin_url: str, visit_id: int | ||||
) -> Optional[OriginVisitStatus]: | |||||
return self.storage.origin_visit_status_get_latest( | |||||
origin_url=origin_url, visit=visit_id | |||||
) | |||||
def get_origin_snapshots(self, origin_url): | def get_origin_snapshots(self, origin_url: str) -> List[Sha1Git]: | ||||
return self.storage.origin_snapshot_get_all(origin_url) | return self.storage.origin_snapshot_get_all(origin_url=origin_url) | ||||
def get_snapshot_branches( | def get_snapshot_branches( | ||||
self, snapshot, after=b"", first=50, target_types=None, name_include=None | self, | ||||
): | snapshot: Sha1Git, | ||||
after: bytes = b"", | |||||
first: int = 50, | |||||
target_types: Optional[List[str]] = None, | |||||
name_include: Optional[bytes] = None, | |||||
) -> Optional[PartialBranches]: | |||||
return self.storage.snapshot_get_branches( | return self.storage.snapshot_get_branches( | ||||
snapshot, | snapshot_id=snapshot, | ||||
branches_from=after, | branches_from=after, | ||||
branches_count=first, | branches_count=first, | ||||
target_types=target_types, | target_types=target_types, | ||||
branch_name_include_substring=name_include, | branch_name_include_substring=name_include, | ||||
) | ) | ||||
def get_revisions(self, revision_ids): | def get_revisions(self, revision_ids: List[Sha1Git]) -> List[Optional[Revision]]: | ||||
return self.storage.revision_get(revision_ids=revision_ids) | return self.storage.revision_get(revision_ids=revision_ids) | ||||
def get_revision_log(self, revision_ids, after=None, first=50): | def get_revision_log( | ||||
self, revision_ids: List[Sha1Git], first: int = 50 | |||||
) -> Iterable[Optional[Dict[str, Any]]]: | |||||
return self.storage.revision_log(revisions=revision_ids, limit=first) | return self.storage.revision_log(revisions=revision_ids, limit=first) | ||||
def get_releases(self, release_ids): | def get_releases(self, release_ids: List[Sha1Git]) -> List[Optional[Release]]: | ||||
return self.storage.release_get(releases=release_ids) | return self.storage.release_get(releases=release_ids) | ||||
def get_directory_entry_by_path( | def get_directory_entry_by_path( | ||||
self, directory_id: Sha1Git, path: str | self, directory_id: Sha1Git, path: str | ||||
) -> Optional[Dict[str, Any]]: | ) -> Optional[Dict[str, Any]]: | ||||
paths = [x.encode() for x in path.strip(os.path.sep).split(os.path.sep)] | paths = [x.encode() for x in path.strip(os.path.sep).split(os.path.sep)] | ||||
return self.storage.directory_entry_get_by_path( | return self.storage.directory_entry_get_by_path( | ||||
directory=directory_id, paths=paths | directory=directory_id, paths=paths | ||||
) | ) | ||||
def get_directory_entries(self, directory_id, after=None, first=50): | def get_directory_entries( | ||||
self, directory_id: Sha1Git, after: Optional[bytes] = None, first: int = 50 | |||||
) -> Optional[PagedResult[DirectoryEntry]]: | |||||
return self.storage.directory_get_entries( | return self.storage.directory_get_entries( | ||||
directory_id, limit=first, page_token=after | directory_id=directory_id, limit=first, page_token=after | ||||
) | ) | ||||
def is_object_available(self, object_id: str, object_type: ObjectType) -> bool: | def is_object_available(self, object_id: bytes, object_type: ObjectType) -> bool: | ||||
mapping = { | mapping = { | ||||
ObjectType.CONTENT: self.storage.content_missing_per_sha1_git, | ObjectType.CONTENT: self.storage.content_missing_per_sha1_git, | ||||
ObjectType.DIRECTORY: self.storage.directory_missing, | ObjectType.DIRECTORY: self.storage.directory_missing, | ||||
ObjectType.RELEASE: self.storage.release_missing, | ObjectType.RELEASE: self.storage.release_missing, | ||||
ObjectType.REVISION: self.storage.revision_missing, | ObjectType.REVISION: self.storage.revision_missing, | ||||
ObjectType.SNAPSHOT: self.storage.snapshot_missing, | ObjectType.SNAPSHOT: self.storage.snapshot_missing, | ||||
} | } | ||||
return not list(mapping[object_type]([object_id])) | return not list(mapping[object_type]([object_id])) | ||||
def get_contents(self, checksums: dict): | def get_contents(self, checksums: Dict[str, Any]) -> List[Content]: | ||||
return self.storage.content_find(checksums) | return self.storage.content_find(content=checksums) | ||||
def get_content_data(self, content_sha1): | def get_content_data(self, content_sha1: Sha1) -> Optional[bytes]: | ||||
return self.storage.content_get_data(content_sha1) | return self.storage.content_get_data(content=content_sha1) |