Changeset View
Changeset View
Standalone View
Standalone View
swh/fuse/fs/artifact.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from dataclasses import dataclass | from dataclasses import dataclass | ||||
from pathlib import Path | from pathlib import Path | ||||
from typing import Any, AsyncIterator, List | from typing import Any, AsyncIterator, List | ||||
import urllib.parse | |||||
from swh.fuse.fs.entry import ( | from swh.fuse.fs.entry import ( | ||||
EntryMode, | EntryMode, | ||||
FuseDirEntry, | FuseDirEntry, | ||||
FuseEntry, | FuseEntry, | ||||
FuseFileEntry, | FuseFileEntry, | ||||
FuseSymlinkEntry, | FuseSymlinkEntry, | ||||
) | ) | ||||
from swh.model.from_disk import DentryPerms | from swh.model.from_disk import DentryPerms | ||||
from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SWHID | from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT, SWHID | ||||
@dataclass | @dataclass | ||||
class Content(FuseFileEntry): | class Content(FuseFileEntry): | ||||
""" Software Heritage content artifact. | """ Software Heritage content artifact. | ||||
Attributes: | Attributes: | ||||
swhid: Software Heritage persistent identifier | swhid: Software Heritage persistent identifier | ||||
▲ Show 20 Lines • Show All 232 Lines • ▼ Show 20 Lines | class ReleaseType(FuseFileEntry): | ||||
async def get_content(self) -> bytes: | async def get_content(self) -> bytes: | ||||
return str.encode(self.target_type + "\n") | return str.encode(self.target_type + "\n") | ||||
async def size(self) -> int: | async def size(self) -> int: | ||||
return len(await self.get_content()) | return len(await self.get_content()) | ||||
@dataclass | |||||
class Snapshot(FuseDirEntry): | |||||
""" Software Heritage snapshot artifact. | |||||
Attributes: | |||||
swhid: Software Heritage persistent identifier | |||||
Snapshot nodes are represented on the file-system as directories with one | |||||
entry for each branch in the snapshot. Each entry is a symlink pointing into | |||||
`archive/` to the branch target SWHID. Branch names are URL encoded (hence | |||||
'/' are replaced with '%2F'). """ | |||||
swhid: SWHID | |||||
async def __aiter__(self) -> AsyncIterator[FuseEntry]: | |||||
metadata = await self.fuse.get_metadata(self.swhid) | |||||
root_path = self.get_relative_root_path() | |||||
for branch_name, branch_meta in metadata.items(): | |||||
# Mangle branch name to create a valid UNIX filename | |||||
name = urllib.parse.quote_plus(branch_name) | |||||
yield self.create_child( | |||||
FuseSymlinkEntry, | |||||
name=name, | |||||
target=Path(root_path, f"archive/{branch_meta['target']}"), | |||||
) | |||||
OBJTYPE_GETTERS = { | OBJTYPE_GETTERS = { | ||||
CONTENT: Content, | CONTENT: Content, | ||||
DIRECTORY: Directory, | DIRECTORY: Directory, | ||||
REVISION: Revision, | REVISION: Revision, | ||||
RELEASE: Release, | RELEASE: Release, | ||||
SNAPSHOT: Snapshot, | |||||
} | } |