diff --git a/swh/fuse/fs/artifact.py b/swh/fuse/fs/artifact.py index ab051fa..85651c1 100644 --- a/swh/fuse/fs/artifact.py +++ b/swh/fuse/fs/artifact.py @@ -1,632 +1,635 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio from dataclasses import dataclass, field import json import logging import os from pathlib import Path import re -from typing import Any, AsyncIterator, Dict, List +from typing import Any, AsyncIterator, Dict, List, Optional, cast from swh.fuse.fs.entry import ( EntryMode, FuseDirEntry, FuseEntry, FuseFileEntry, FuseSymlinkEntry, ) from swh.model.from_disk import DentryPerms from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT, SWHID SWHID_REGEXP = r"swh:1:(cnt|dir|rel|rev|snp):[0-9a-f]{40}" @dataclass class Content(FuseFileEntry): """ Software Heritage content artifact. Attributes: swhid: Software Heritage persistent identifier prefetch: optional prefetched metadata used to set entry attributes Content leaves (AKA blobs) are represented on disk as regular files, containing the corresponding bytes, as archived. Note that permissions are associated with blobs only in the context of directories. Hence, when accessing blobs from the top-level `archive/` directory, the permissions of the `archive/SWHID` file will be arbitrary and not meaningful (e.g., `0o644`). """ swhid: SWHID prefetch: Any = None async def get_content(self) -> bytes: data = await self.fuse.get_blob(self.swhid) if not self.prefetch: self.prefetch = {"length": len(data)} return data async def size(self) -> int: if self.prefetch: return self.prefetch["length"] else: return await super().size() @dataclass class Directory(FuseDirEntry): """ Software Heritage directory artifact. Attributes: swhid: Software Heritage persistent identifier Directory nodes are represented as directories on the file-system, containing one entry for each entry of the archived directory. Entry names and other metadata, including permissions, will correspond to the archived entry metadata. Note that the FUSE mount is read-only, no matter what the permissions say. So it is possible that, in the context of a directory, a file is presented as writable, whereas actually writing to it will fail with `EPERM`. """ swhid: SWHID async def compute_entries(self) -> AsyncIterator[FuseEntry]: metadata = await self.fuse.get_metadata(self.swhid) for entry in metadata: name = entry["name"] swhid = entry["target"] mode = ( # Archived permissions for directories are always set to # 0o040000 so use a read-only permission instead int(EntryMode.RDONLY_DIR) if swhid.object_type == DIRECTORY else entry["perms"] ) # 1. Symlink (check symlink first because condition is less restrictive) if mode == DentryPerms.symlink: - target = "" + target = b"" try: # Symlink target is stored in the blob content target = await self.fuse.get_blob(swhid) except Exception: pass # Ignore error and create a (broken) symlink anyway yield self.create_child( FuseSymlinkEntry, name=name, target=target, ) # 2. Regular file elif swhid.object_type == CONTENT: yield self.create_child( Content, name=name, mode=mode, swhid=swhid, # The directory API has extra info we can use to set # attributes without an additional Software Heritage API call prefetch=entry, ) # 3.
Regular directory elif swhid.object_type == DIRECTORY: yield self.create_child( Directory, name=name, mode=mode, swhid=swhid, ) # 4. Submodule elif swhid.object_type == REVISION: try: # Make sure the revision metadata is fetched and create a # symlink to distinguish it from regular directories await self.fuse.get_metadata(swhid) except Exception: pass # Ignore error and create a (broken) symlink anyway yield self.create_child( FuseSymlinkEntry, name=name, target=Path(self.get_relative_root_path(), f"archive/{swhid}"), ) else: raise ValueError(f"Unknown directory entry type: {swhid.object_type}") @dataclass class Revision(FuseDirEntry): """ Software Heritage revision artifact. Attributes: swhid: Software Heritage persistent identifier Revision (AKA commit) nodes are represented on the file-system as directories with the following entries: - `root`: source tree at the time of the commit, as a symlink pointing into `archive/`, to a SWHID of type `dir` - `parents/` (note the plural): a virtual directory containing entries named `1`, `2`, `3`, etc., one for each parent commit. Each of these entries is a symlink pointing into `archive/`, to the SWHID file for the given parent commit - `parent` (note the singular): present if and only if the current commit has at least one parent commit (which is the most common case). When present it is a symlink pointing into `parents/1/` - `history`: a virtual directory listing all its revision ancestors, sorted in reverse topological order. The history can be listed through `by-date/`, `by-hash/` or `by-page/`, each with its own sharding policy. - `meta.json`: metadata for the current node, as a symlink pointing to the relevant `archive/<SWHID>.json` file """ swhid: SWHID async def compute_entries(self) -> AsyncIterator[FuseEntry]: metadata = await self.fuse.get_metadata(self.swhid) directory = metadata["directory"] parents = metadata["parents"] root_path = self.get_relative_root_path() yield self.create_child( FuseSymlinkEntry, name="root", target=Path(root_path, f"archive/{directory}"), ) yield self.create_child( FuseSymlinkEntry, name="meta.json", target=Path(root_path, f"archive/{self.swhid}.json"), ) yield self.create_child( RevisionParents, name="parents", mode=int(EntryMode.RDONLY_DIR), parents=[x["id"] for x in parents], ) if len(parents) >= 1: yield self.create_child( FuseSymlinkEntry, name="parent", target="parents/1/", ) yield self.create_child( RevisionHistory, name="history", mode=int(EntryMode.RDONLY_DIR), swhid=self.swhid, ) @dataclass class RevisionParents(FuseDirEntry): """ Revision virtual `parents/` directory """ parents: List[SWHID] async def compute_entries(self) -> AsyncIterator[FuseEntry]: root_path = self.get_relative_root_path() for i, parent in enumerate(self.parents): yield self.create_child( FuseSymlinkEntry, name=str(i + 1), target=Path(root_path, f"archive/{parent}"), ) @dataclass class RevisionHistory(FuseDirEntry): """ Revision virtual `history/` directory """ swhid: SWHID async def prefill_by_date_cache(self, by_date_dir: FuseDirEntry) -> None: history = await self.fuse.get_history(self.swhid) nb_api_calls = 0 for swhid in history: cache = await self.fuse.cache.metadata.get(swhid) if cache: continue await self.fuse.get_metadata(swhid) # The by-date/ directory is cached temporarily in direntry, and # invalidated + updated every 100 API calls nb_api_calls += 1 if nb_api_calls % 100 == 0: self.fuse.cache.direntry.invalidate(by_date_dir) # Make sure to have the latest entries once the prefilling is done
self.fuse.cache.direntry.invalidate(by_date_dir) async def compute_entries(self) -> AsyncIterator[FuseEntry]: - by_date_dir = self.create_child( + by_date_dir = cast( RevisionHistoryShardByDate, - name="by-date", - mode=int(EntryMode.RDONLY_DIR), - history_swhid=self.swhid, + self.create_child( + RevisionHistoryShardByDate, + name="by-date", + mode=int(EntryMode.RDONLY_DIR), + history_swhid=self.swhid, + ), ) # Run it concurrently because of the many API calls necessary asyncio.create_task(self.prefill_by_date_cache(by_date_dir)) yield by_date_dir yield self.create_child( RevisionHistoryShardByHash, name="by-hash", mode=int(EntryMode.RDONLY_DIR), history_swhid=self.swhid, ) yield self.create_child( RevisionHistoryShardByPage, name="by-page", mode=int(EntryMode.RDONLY_DIR), history_swhid=self.swhid, ) @dataclass class RevisionHistoryShardByDate(FuseDirEntry): """ Revision virtual `history/by-date` sharded directory """ history_swhid: SWHID prefix: str = field(default="") is_status_done: bool = field(default=False) DATE_FMT = "{year:04d}/{month:02d}/{day:02d}/" ENTRIES_REGEXP = re.compile(r"^([0-9]{2,4})|(" + SWHID_REGEXP + ")$") @dataclass class StatusFile(FuseFileEntry): """ Temporary file used to indicate loading progress in by-date/ """ name: str = field(init=False, default=".status") mode: int = field(init=False, default=int(EntryMode.RDONLY_FILE)) history_swhid: SWHID def __post_init__(self): super().__post_init__() # This is the only case where we do not want the kernel to cache the file self.file_info_attrs["keep_cache"] = False self.file_info_attrs["direct_io"] = True async def get_content(self) -> bytes: history_full = await self.fuse.get_history(self.history_swhid) history_cached = await self.fuse.cache.history.get_with_date_prefix( self.history_swhid, date_prefix="" ) fmt = f"Done: {len(history_cached)}/{len(history_full)}\n" return fmt.encode() def __post_init__(self): super().__post_init__() # Create the status file only once so we can easily remove it when the # entire history is fetched self.status_file = self.create_child( RevisionHistoryShardByDate.StatusFile, history_swhid=self.history_swhid ) async def compute_entries(self) -> AsyncIterator[FuseEntry]: history_full = await self.fuse.get_history(self.history_swhid) # Only check for cached revisions with the appropriate prefix, since # fetching all of them with the Web API would take too long history_cached = await self.fuse.cache.history.get_with_date_prefix( self.history_swhid, date_prefix=self.prefix ) depth = self.prefix.count("/") root_path = self.get_relative_root_path() sharded_dirs = set() for (swhid, sharded_name) in history_cached: if not sharded_name.startswith(self.prefix): continue if depth == 3: yield self.create_child( FuseSymlinkEntry, name=str(swhid), target=Path(root_path, f"archive/{swhid}"), ) # Create sharded directories else: next_prefix = sharded_name.split("/")[depth] if next_prefix not in sharded_dirs: sharded_dirs.add(next_prefix) yield self.create_child( RevisionHistoryShardByDate, name=next_prefix, mode=int(EntryMode.RDONLY_DIR), prefix=f"{self.prefix}{next_prefix}/", history_swhid=self.history_swhid, ) self.is_status_done = len(history_cached) == len(history_full) if self.is_status_done: self.fuse._remove_inode(self.status_file.inode) elif not self.is_status_done and depth == 0: yield self.status_file @dataclass class RevisionHistoryShardByHash(FuseDirEntry): """ Revision virtual `history/by-hash` sharded directory """ history_swhid: SWHID prefix: str = field(default="") SHARDING_LENGTH = 2 
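# Sharding sketch (illustrative object id, not from the patch): an ancestor
# revision swh:1:rev:b9f1c2... is exposed as by-hash/b9/swh:1:rev:b9f1c2...,
# a symlink to archive/swh:1:rev:b9f1c2..., i.e. grouped by the first
# SHARDING_LENGTH hex digits of its object id; by-hash/ thus contains at
# most 16**SHARDING_LENGTH = 256 shard directories.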
ENTRIES_REGEXP = re.compile(r"^([a-f0-9]+)|(" + SWHID_REGEXP + ")$") async def compute_entries(self) -> AsyncIterator[FuseEntry]: history = await self.fuse.get_history(self.history_swhid) if self.prefix: root_path = self.get_relative_root_path() for swhid in history: if swhid.object_id.startswith(self.prefix): yield self.create_child( FuseSymlinkEntry, name=str(swhid), target=Path(root_path, f"archive/{swhid}"), ) # Create sharded directories else: sharded_dirs = set() for swhid in history: next_prefix = swhid.object_id[: self.SHARDING_LENGTH] if next_prefix not in sharded_dirs: sharded_dirs.add(next_prefix) yield self.create_child( RevisionHistoryShardByHash, name=next_prefix, mode=int(EntryMode.RDONLY_DIR), prefix=next_prefix, history_swhid=self.history_swhid, ) @dataclass class RevisionHistoryShardByPage(FuseDirEntry): """ Revision virtual `history/by-page` sharded directory """ history_swhid: SWHID - prefix: int = field(default=None) + prefix: Optional[int] = field(default=None) PAGE_SIZE = 10_000 PAGE_FMT = "{page_number:03d}" ENTRIES_REGEXP = re.compile(r"^([0-9]+)|(" + SWHID_REGEXP + ")$") async def compute_entries(self) -> AsyncIterator[FuseEntry]: history = await self.fuse.get_history(self.history_swhid) if self.prefix is not None: current_page = self.prefix root_path = self.get_relative_root_path() max_idx = min(len(history), (current_page + 1) * self.PAGE_SIZE) for i in range(current_page * self.PAGE_SIZE, max_idx): swhid = history[i] yield self.create_child( FuseSymlinkEntry, name=str(swhid), target=Path(root_path, f"archive/{swhid}"), ) # Create sharded directories else: for i in range(0, len(history), self.PAGE_SIZE): page_number = i // self.PAGE_SIZE yield self.create_child( RevisionHistoryShardByPage, name=self.PAGE_FMT.format(page_number=page_number), mode=int(EntryMode.RDONLY_DIR), history_swhid=self.history_swhid, prefix=page_number, ) @dataclass class Release(FuseDirEntry): """ Software Heritage release artifact. Attributes: swhid: Software Heritage persistent identifier Release nodes are represented on the file-system as directories with the following entries: - `target`: target node, as a symlink to `archive/` - `target_type`: regular file containing the type of the target SWHID - `root`: present if and only if the release points to something that (transitively) resolves to a directory. 
When present it is a symlink pointing into `archive/` to the SWHID of the given directory - `meta.json`: metadata for the current node, as a symlink pointing to the relevant `archive/<SWHID>.json` file """ swhid: SWHID - async def find_root_directory(self, swhid: SWHID) -> SWHID: + async def find_root_directory(self, swhid: SWHID) -> Optional[SWHID]: if swhid.object_type == RELEASE: metadata = await self.fuse.get_metadata(swhid) return await self.find_root_directory(metadata["target"]) elif swhid.object_type == REVISION: metadata = await self.fuse.get_metadata(swhid) return metadata["directory"] elif swhid.object_type == DIRECTORY: return swhid else: return None async def compute_entries(self) -> AsyncIterator[FuseEntry]: metadata = await self.fuse.get_metadata(self.swhid) root_path = self.get_relative_root_path() yield self.create_child( FuseSymlinkEntry, name="meta.json", target=Path(root_path, f"archive/{self.swhid}.json"), ) target = metadata["target"] yield self.create_child( FuseSymlinkEntry, name="target", target=Path(root_path, f"archive/{target}") ) yield self.create_child( ReleaseType, name="target_type", mode=int(EntryMode.RDONLY_FILE), target_type=target.object_type, ) target_dir = await self.find_root_directory(target) if target_dir is not None: yield self.create_child( FuseSymlinkEntry, name="root", target=Path(root_path, f"archive/{target_dir}"), ) @dataclass class ReleaseType(FuseFileEntry): """ Release type virtual file """ target_type: str async def get_content(self) -> bytes: return str.encode(self.target_type + "\n") @dataclass class Snapshot(FuseDirEntry): """ Software Heritage snapshot artifact. Attributes: swhid: Software Heritage persistent identifier Snapshot nodes are represented on the file-system as recursive directories following the branch names structure. For example, a branch named ``refs/tags/v1.0`` will be represented as a ``refs`` directory containing a ``tags`` directory containing a ``v1.0`` symlink pointing to the branch target SWHID. """ swhid: SWHID prefix: str = field(default="") async def compute_entries(self) -> AsyncIterator[FuseEntry]: metadata = await self.fuse.get_metadata(self.swhid) root_path = self.get_relative_root_path() subdirs = set() for branch_name, branch_meta in metadata.items(): if not branch_name.startswith(self.prefix): continue next_subdirs = branch_name[len(self.prefix) :].split("/") next_prefix = next_subdirs[0] if len(next_subdirs) == 1: # Non-alias targets are symlinks to their corresponding archived # artifact, whereas alias targets are relative symlinks to the # corresponding snapshot directory entry. target_type = branch_meta["target_type"] target_raw = branch_meta["target"] if target_type == "alias": prefix = Path(branch_name).parent target = os.path.relpath(target_raw, prefix) else: target = f"{root_path}/archive/{target_raw}" yield self.create_child( FuseSymlinkEntry, name=next_prefix, target=Path(target), ) else: subdirs.add(next_prefix) for subdir in subdirs: yield self.create_child( Snapshot, name=subdir, mode=int(EntryMode.RDONLY_DIR), swhid=self.swhid, prefix=f"{self.prefix}{subdir}/", ) @dataclass class Origin(FuseDirEntry): """ Software Heritage origin artifact. Origin nodes are represented on the file-system as directories with one entry for each origin visit. The visit directories are named after the visit date (`YYYY-MM-DD`; if multiple visits occur on the same day, only the first one is kept).
Each visit directory contains a `meta.json` with associated metadata for the origin node, and potentially a `snapshot` symlink pointing to the visit's snapshot node. """ DATE_FMT = "{year:04d}-{month:02d}-{day:02d}" ENTRIES_REGEXP = re.compile(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}$") async def compute_entries(self) -> AsyncIterator[FuseEntry]: # The origin's name is always its URL (encoded to create a valid UNIX filename) visits = await self.fuse.get_visits(self.name) seen_date = set() for visit in visits: date = visit["date"] name = self.DATE_FMT.format(year=date.year, month=date.month, day=date.day) if name in seen_date: logging.debug( "Conflict date on origin: %s, %s", visit["origin"], str(name) ) else: seen_date.add(name) yield self.create_child( OriginVisit, name=name, mode=int(EntryMode.RDONLY_DIR), meta=visit, ) @dataclass class OriginVisit(FuseDirEntry): """ Origin visit virtual directory """ meta: Dict[str, Any] @dataclass class MetaFile(FuseFileEntry): content: str async def get_content(self) -> bytes: return str.encode(self.content + "\n") async def compute_entries(self) -> AsyncIterator[FuseEntry]: snapshot_swhid = self.meta["snapshot"] if snapshot_swhid: root_path = self.get_relative_root_path() yield self.create_child( FuseSymlinkEntry, name="snapshot", target=Path(root_path, f"archive/{snapshot_swhid}"), ) yield self.create_child( OriginVisit.MetaFile, name="meta.json", mode=int(EntryMode.RDONLY_FILE), content=json.dumps( self.meta, indent=self.fuse.conf["json-indent"], default=lambda x: str(x), ), ) OBJTYPE_GETTERS = { CONTENT: Content, DIRECTORY: Directory, REVISION: Revision, RELEASE: Release, SNAPSHOT: Snapshot, } diff --git a/swh/fuse/fs/entry.py b/swh/fuse/fs/entry.py index 3a4f5d8..dbe0a87 100644 --- a/swh/fuse/fs/entry.py +++ b/swh/fuse/fs/entry.py @@ -1,151 +1,151 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from dataclasses import dataclass, field from enum import IntEnum from pathlib import Path import re from stat import S_IFDIR, S_IFLNK, S_IFREG -from typing import Any, AsyncIterator, Dict, Optional, Pattern, Sequence, Union +from typing import TYPE_CHECKING, Any, AsyncIterator, Dict, Optional, Pattern, Union -# Avoid cycling import -Fuse = "Fuse" +if TYPE_CHECKING: # avoid cyclic import + from swh.fuse.fuse import Fuse class EntryMode(IntEnum): """ Default entry mode and permissions for the FUSE. The FUSE mount is always read-only, even if permissions contradict this statement (in a context of a directory, entries are listed with permissions taken from the archive). 
""" RDONLY_FILE = S_IFREG | 0o444 RDONLY_DIR = S_IFDIR | 0o555 RDONLY_LNK = S_IFLNK | 0o444 # `cache/` sub-directories need the write permission in order to invalidate # cached artifacts using `rm {SWHID}` RDWR_DIR = S_IFDIR | 0o755 @dataclass class FuseEntry: """ Main wrapper class to manipulate virtual FUSE entries Attributes: name: entry filename mode: entry permission mode fuse: internal reference to the main FUSE class inode: unique integer identifying the entry """ name: str mode: int depth: int fuse: Fuse inode: int = field(init=False) file_info_attrs: Dict[str, Any] = field(init=False, default_factory=dict) def __post_init__(self): self.inode = self.fuse._alloc_inode(self) # By default, let the kernel cache previously accessed data self.file_info_attrs["keep_cache"] = True async def size(self) -> int: """ Return the size (in bytes) of an entry """ raise NotImplementedError async def unlink(self, name: str) -> None: raise NotImplementedError def get_relative_root_path(self) -> str: return "../" * (self.depth - 1) def create_child(self, constructor: Any, **kwargs) -> FuseEntry: return constructor(depth=self.depth + 1, fuse=self.fuse, **kwargs) @dataclass class FuseFileEntry(FuseEntry): """ FUSE virtual file entry """ async def get_content(self) -> bytes: """ Return the content of a file entry """ raise NotImplementedError async def size(self) -> int: return len(await self.get_content()) @dataclass class FuseDirEntry(FuseEntry): """ FUSE virtual directory entry """ ENTRIES_REGEXP: Optional[Pattern] = field(init=False, default=None) async def size(self) -> int: return 0 def validate_entry(self, name: str) -> bool: """ Return true if the name matches the directory entries regular expression, and false otherwise """ if self.ENTRIES_REGEXP: - return re.match(self.ENTRIES_REGEXP, name) + return bool(re.match(self.ENTRIES_REGEXP, name)) else: return True - async def compute_entries(self) -> Sequence[FuseEntry]: + async def compute_entries(self): """ Return the child entries of a directory entry """ raise NotImplementedError async def get_entries(self, offset: int = 0) -> AsyncIterator[FuseEntry]: """ Return the child entries of a directory entry using direntry cache """ cache = self.fuse.cache.direntry.get(self) if cache: entries = cache else: entries = [x async for x in self.compute_entries()] self.fuse.cache.direntry.set(self, entries) # Avoid copy by manual iteration (instead of slicing) and use of a # generator (instead of returning the full list every time) for i in range(offset, len(entries)): yield entries[i] - async def lookup(self, name: str) -> FuseEntry: + async def lookup(self, name: str) -> Optional[FuseEntry]: """ Look up a FUSE entry by name """ async for entry in self.get_entries(): if entry.name == name: return entry return None @dataclass class FuseSymlinkEntry(FuseEntry): """ FUSE virtual symlink entry Attributes: target: path to symlink target """ mode: int = field(init=False, default=int(EntryMode.RDONLY_LNK)) target: Union[str, bytes, Path] async def size(self) -> int: return len(str(self.target)) def get_target(self) -> Union[str, bytes, Path]: """ Return the path target of a symlink entry """ return self.target diff --git a/swh/fuse/fs/mountpoint.py b/swh/fuse/fs/mountpoint.py index ca47768..9c302d8 100644 --- a/swh/fuse/fs/mountpoint.py +++ b/swh/fuse/fs/mountpoint.py @@ -1,241 +1,241 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this 
distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from dataclasses import dataclass, field import json from pathlib import Path import re -from typing import AsyncIterator +from typing import AsyncIterator, Optional from swh.fuse.fs.artifact import OBJTYPE_GETTERS, SWHID_REGEXP, Origin from swh.fuse.fs.entry import ( EntryMode, FuseDirEntry, FuseEntry, FuseFileEntry, FuseSymlinkEntry, ) from swh.model.exceptions import ValidationError from swh.model.identifiers import CONTENT, SWHID, parse_swhid JSON_SUFFIX = ".json" @dataclass class Root(FuseDirEntry): """ The FUSE mountpoint, consisting of the archive/ and origin/ directories """ - name: str = field(init=False, default=None) + name: str = field(init=False, default="") mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR)) depth: int = field(init=False, default=1) async def compute_entries(self) -> AsyncIterator[FuseEntry]: yield self.create_child(ArchiveDir) yield self.create_child(OriginDir) yield self.create_child(CacheDir) yield self.create_child(Readme) @dataclass class ArchiveDir(FuseDirEntry): """ The `archive/` virtual directory allows mounting any artifact on the fly using its SWHID as name. The associated metadata of the artifact from the Software Heritage Web API can also be accessed through the `SWHID.json` file (in case of pagination, the JSON file will contain a complete version with all pages merged together). Note: the archive directory cannot be listed with ls, but entries in it can be accessed (e.g., using cat or cd). """ name: str = field(init=False, default="archive") mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR)) ENTRIES_REGEXP = re.compile(r"^(" + SWHID_REGEXP + ")(.json)?$") async def compute_entries(self) -> AsyncIterator[FuseEntry]: return yield - async def lookup(self, name: str) -> FuseEntry: + async def lookup(self, name: str) -> Optional[FuseEntry]: # On the fly mounting of a new artifact try: if name.endswith(JSON_SUFFIX): swhid = parse_swhid(name[: -len(JSON_SUFFIX)]) return self.create_child( MetaEntry, name=f"{swhid}{JSON_SUFFIX}", mode=int(EntryMode.RDONLY_FILE), swhid=swhid, ) else: swhid = parse_swhid(name) await self.fuse.get_metadata(swhid) return self.create_child( OBJTYPE_GETTERS[swhid.object_type], name=str(swhid), mode=int( EntryMode.RDONLY_FILE if swhid.object_type == CONTENT else EntryMode.RDONLY_DIR ), swhid=swhid, ) except ValidationError: return None @dataclass class MetaEntry(FuseFileEntry): """ An entry for an `archive/<SWHID>.json` file, containing all the SWHID's metadata from the Software Heritage archive. """ swhid: SWHID async def get_content(self) -> bytes: # Make sure the metadata is in cache await self.fuse.get_metadata(self.swhid) # Retrieve raw JSON metadata from cache (un-typified) metadata = await self.fuse.cache.metadata.get(self.swhid, typify=False) json_str = json.dumps(metadata, indent=self.fuse.conf["json-indent"]) return (json_str + "\n").encode() async def size(self) -> int: return len(await self.get_content()) @dataclass class OriginDir(FuseDirEntry): """ The origin/ directory is lazily populated with one entry per accessed origin URL (mangled to create a valid UNIX filename). The URL encoding is done using the percent-encoding mechanism described in RFC 3986.
""" name: str = field(init=False, default="origin") mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR)) ENTRIES_REGEXP = re.compile(r"^.*%3A.*$") # %3A is the encoded version of ':' - def create_child(self, url_encoded: str) -> FuseEntry: + def create_origin_child(self, url_encoded: str) -> FuseEntry: return super().create_child( Origin, name=url_encoded, mode=int(EntryMode.RDONLY_DIR), ) async def compute_entries(self) -> AsyncIterator[FuseEntry]: async for url in self.fuse.cache.get_cached_visits(): - yield self.create_child(url) + yield self.create_origin_child(url) - async def lookup(self, name: str) -> FuseEntry: + async def lookup(self, name: str) -> Optional[FuseEntry]: entry = await super().lookup(name) if entry: return entry # On the fly mounting of new origin url try: url_encoded = name await self.fuse.get_visits(url_encoded) - return self.create_child(url_encoded) + return self.create_origin_child(url_encoded) except ValueError: return None @dataclass class CacheDir(FuseDirEntry): """ The cache/ directory is an on-disk representation of locally cached objects and metadata. Via this directory you can browse cached data and selectively remove them from the cache, freeing disk space. (See `swh fs clean` in the {ref}`CLI ` to completely empty the cache). The directory is populated with symlinks to: all artifacts, identified by their SWHIDs and sharded by the first two character of their object id, the metadata identified by a `SWHID.json` entry, and the `origin/` directory. """ name: str = field(init=False, default="cache") mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR)) ENTRIES_REGEXP = re.compile(r"^([a-f0-9]{2})|(" + OriginDir.name + ")$") @dataclass class ArtifactShardBySwhid(FuseDirEntry): ENTRIES_REGEXP = re.compile(r"^(" + SWHID_REGEXP + ")$") prefix: str = field(default="") async def compute_entries(self) -> AsyncIterator[FuseEntry]: root_path = self.get_relative_root_path() async for swhid in self.fuse.cache.get_cached_swhids(): if not swhid.object_id.startswith(self.prefix): continue yield self.create_child( FuseSymlinkEntry, name=str(swhid), target=Path(root_path, f"archive/{swhid}"), ) yield self.create_child( FuseSymlinkEntry, name=f"{swhid}{JSON_SUFFIX}", target=Path(root_path, f"archive/{swhid}{JSON_SUFFIX}"), ) async def unlink(self, name: str) -> None: try: if name.endswith(JSON_SUFFIX): name = name[: -len(JSON_SUFFIX)] swhid = parse_swhid(name) await self.fuse.cache.metadata.remove(swhid) await self.fuse.cache.blob.remove(swhid) except ValidationError: raise async def compute_entries(self) -> AsyncIterator[FuseEntry]: prefixes = set() async for swhid in self.fuse.cache.get_cached_swhids(): prefixes.add(swhid.object_id[:2]) for prefix in prefixes: yield self.create_child( CacheDir.ArtifactShardBySwhid, name=prefix, mode=int(EntryMode.RDWR_DIR), prefix=prefix, ) yield self.create_child( FuseSymlinkEntry, name=OriginDir.name, target=Path(self.get_relative_root_path(), OriginDir.name), ) @dataclass class Readme(FuseFileEntry): """ Top-level README to explain briefly what is SwhFS. """ name: str = field(init=False, default="README") mode: int = field(init=False, default=int(EntryMode.RDONLY_FILE)) CONTENT = """Welcome to the Software Heritage Filesystem (SwhFS)! This is a user-space POSIX filesystem to browse the Software Heritage archive, as if it were locally available. 
The SwhFS mount point contains 3 directories, all initially empty and lazily populated: - "archive": virtual directory to mount any Software Heritage artifact on the fly using its SWHID as name. Note: this directory cannot be listed with ls, but entries in it can be accessed (e.g., using cat or cd). - "origin": virtual directory to mount any origin using its encoded URL as name. - "cache": on-disk representation of locally cached objects and metadata. Try it yourself: $ cat archive/swh:1:cnt:c839dea9e8e6f0528b468214348fee8669b305b2 #include <stdio.h> int main(void) { printf("Hello, World!\\n"); } You can find more details and examples in the SwhFS online documentation: https://docs.softwareheritage.org/devel/swh-fuse/ """ async def get_content(self) -> bytes: return self.CONTENT.encode() diff --git a/swh/fuse/fuse.py b/swh/fuse/fuse.py index 49c4cf8..d18fa3b 100644 --- a/swh/fuse/fuse.py +++ b/swh/fuse/fuse.py @@ -1,361 +1,361 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import errno import functools import logging import os from pathlib import Path import time from typing import Any, Dict, List import urllib.parse import pyfuse3 import pyfuse3_asyncio import requests from swh.fuse import LOGGER_NAME from swh.fuse.cache import FuseCache from swh.fuse.fs.entry import FuseDirEntry, FuseEntry, FuseFileEntry, FuseSymlinkEntry from swh.fuse.fs.mountpoint import Root from swh.model.identifiers import CONTENT, REVISION, SWHID from swh.web.client.client import WebAPIClient class Fuse(pyfuse3.Operations): """ Software Heritage Filesystem in Userspace (FUSE). Locally mount parts of the archive and navigate it as a virtual file system. """ - def __init__( - self, root_path: Path, cache: FuseCache, conf: Dict[str, Any], - ): + def __init__(self, root_path: Path, cache: FuseCache, conf: Dict[str, Any]): super(Fuse, self).__init__() self._next_inode: int = pyfuse3.ROOT_INODE self._inode2entry: Dict[int, FuseEntry] = {} - self.root = Root(fuse=self) + # The fuse constructor keyword is propagated up to the FuseEntry dataclass, but mypy + # 0.812 considers it an unexpected kwarg. Skip typing it.
+ self.root = Root(fuse=self) # type: ignore self.conf = conf self.logger = logging.getLogger(LOGGER_NAME) self.time_ns: int = time.time_ns() # start time, used as timestamp self.gid = os.getgid() self.uid = os.getuid() self.web_api = WebAPIClient( conf["web-api"]["url"], conf["web-api"]["auth-token"] ) self.cache = cache def shutdown(self) -> None: pass def _alloc_inode(self, entry: FuseEntry) -> int: """ Return a unique inode integer for a given entry """ inode = self._next_inode self._next_inode += 1 self._inode2entry[inode] = entry return inode def _remove_inode(self, inode: int) -> None: try: del self._inode2entry[inode] except KeyError: pass try: pyfuse3.invalidate_inode(inode) except FileNotFoundError: pass def inode2entry(self, inode: int) -> FuseEntry: """ Return the entry matching a given inode """ try: return self._inode2entry[inode] except KeyError: raise pyfuse3.FUSEError(errno.ENOENT) async def get_metadata(self, swhid: SWHID) -> Any: """ Retrieve metadata for a given SWHID using Software Heritage API """ cache = await self.cache.metadata.get(swhid) if cache: return cache try: typify = False # Get the raw JSON from the API # TODO: async web API loop = asyncio.get_event_loop() metadata = await loop.run_in_executor(None, self.web_api.get, swhid, typify) await self.cache.metadata.set(swhid, metadata) # Retrieve it from cache so it is correctly typed return await self.cache.metadata.get(swhid) except requests.HTTPError as err: self.logger.error("Cannot fetch metadata for object %s: %s", swhid, err) raise async def get_blob(self, swhid: SWHID) -> bytes: """ Retrieve the blob bytes for a given content SWHID using Software Heritage API """ if swhid.object_type != CONTENT: raise pyfuse3.FUSEError(errno.EINVAL) # Make sure the metadata cache is also populated with the given SWHID await self.get_metadata(swhid) cache = await self.cache.blob.get(swhid) if cache: self.logger.debug("Found blob %s in cache", swhid) return cache try: self.logger.debug("Retrieving blob %s via web API...", swhid) loop = asyncio.get_event_loop() resp = await loop.run_in_executor(None, self.web_api.content_raw, swhid) blob = b"".join(list(resp)) await self.cache.blob.set(swhid, blob) return blob except requests.HTTPError as err: self.logger.error("Cannot fetch blob for object %s: %s", swhid, err) raise async def get_history(self, swhid: SWHID) -> List[SWHID]: """ Retrieve a revision's history using Software Heritage Graph API """ if swhid.object_type != REVISION: raise pyfuse3.FUSEError(errno.EINVAL) cache = await self.cache.history.get(swhid) if cache: self.logger.debug( "Found history of %s in cache (%d ancestors)", swhid, len(cache) ) return cache try: # Use the swh-graph API to retrieve the full history very fast self.logger.debug("Retrieving history of %s via graph API...", swhid) call = f"graph/visit/edges/{swhid}?edges=rev:rev" loop = asyncio.get_event_loop() history = await loop.run_in_executor(None, self.web_api._call, call) await self.cache.history.set(history.text) # Retrieve it from cache so it is correctly typed res = await self.cache.history.get(swhid) return res except requests.HTTPError as err: self.logger.error("Cannot fetch history for object %s: %s", swhid, err) # Ignore exception since swh-graph does not necessarily contain the # most recent artifacts from the archive. Computing the full history # from the Web API is too computationally intensive so simply return # an empty list. 
return [] async def get_visits(self, url_encoded: str) -> List[Dict[str, Any]]: """ Retrieve origin visits given an encoded-URL using Software Heritage API """ cache = await self.cache.metadata.get_visits(url_encoded) if cache: self.logger.debug( "Found %d visits for origin '%s' in cache", len(cache), url_encoded, ) return cache try: self.logger.debug( "Retrieving visits for origin '%s' via web API...", url_encoded ) typify = False # Get the raw JSON from the API loop = asyncio.get_event_loop() # Web API only takes non-encoded URL url = urllib.parse.unquote_plus(url_encoded) origin_exists = await loop.run_in_executor( None, self.web_api.origin_exists, url ) if not origin_exists: raise ValueError("origin does not exist") visits_it = await loop.run_in_executor( None, functools.partial(self.web_api.visits, url, typify=typify) ) visits = list(visits_it) await self.cache.metadata.set_visits(url_encoded, visits) # Retrieve it from cache so it is correctly typed res = await self.cache.metadata.get_visits(url_encoded) return res except (ValueError, requests.HTTPError) as err: self.logger.error( "Cannot fetch visits for origin '%s': %s", url_encoded, err ) raise async def get_attrs(self, entry: FuseEntry) -> pyfuse3.EntryAttributes: """ Return entry attributes """ attrs = pyfuse3.EntryAttributes() attrs.st_size = 0 attrs.st_atime_ns = self.time_ns attrs.st_ctime_ns = self.time_ns attrs.st_mtime_ns = self.time_ns attrs.st_gid = self.gid attrs.st_uid = self.uid attrs.st_ino = entry.inode attrs.st_mode = entry.mode attrs.st_size = await entry.size() return attrs async def getattr( self, inode: int, _ctx: pyfuse3.RequestContext ) -> pyfuse3.EntryAttributes: """ Get attributes for a given inode """ entry = self.inode2entry(inode) return await self.get_attrs(entry) async def opendir(self, inode: int, _ctx: pyfuse3.RequestContext) -> int: """ Open a directory referred by a given inode """ # Re-use inode as directory handle self.logger.debug("opendir(inode=%d)", inode) return inode async def readdir(self, fh: int, offset: int, token: pyfuse3.ReaddirToken) -> None: """ Read entries in an open directory """ # opendir() uses inode as directory handle inode = fh direntry = self.inode2entry(inode) self.logger.debug( "readdir(dirname=%s, fh=%d, offset=%d)", direntry.name, fh, offset ) assert isinstance(direntry, FuseDirEntry) next_id = offset + 1 try: async for entry in direntry.get_entries(offset): name = os.fsencode(entry.name) attrs = await self.get_attrs(entry) if not pyfuse3.readdir_reply(token, name, attrs, next_id): break next_id += 1 self._inode2entry[attrs.st_ino] = entry except Exception as err: self.logger.exception("Cannot readdir: %s", err) raise pyfuse3.FUSEError(errno.ENOENT) async def open( self, inode: int, _flags: int, _ctx: pyfuse3.RequestContext ) -> pyfuse3.FileInfo: """ Open an inode and return a unique file handle """ # Re-use inode as file handle self.logger.debug("open(inode=%d)", inode) entry = self.inode2entry(inode) return pyfuse3.FileInfo(fh=inode, **entry.file_info_attrs) async def read(self, fh: int, offset: int, length: int) -> bytes: """ Read `length` bytes from file handle `fh` at position `offset` """ # open() uses inode as file handle inode = fh entry = self.inode2entry(inode) self.logger.debug( "read(name=%s, fh=%d, offset=%d, length=%d)", entry.name, fh, offset, length ) assert isinstance(entry, FuseFileEntry) try: data = await entry.get_content() return data[offset : offset + length] except Exception as err: self.logger.exception("Cannot read: %s", err) raise 
pyfuse3.FUSEError(errno.ENOENT) async def lookup( self, parent_inode: int, name: str, _ctx: pyfuse3.RequestContext ) -> pyfuse3.EntryAttributes: """ Look up a directory entry by name and get its attributes """ name = os.fsdecode(name) parent_entry = self.inode2entry(parent_inode) self.logger.debug( "lookup(parent_name=%s, parent_inode=%d, name=%s)", parent_entry.name, parent_inode, name, ) assert isinstance(parent_entry, FuseDirEntry) try: if parent_entry.validate_entry(name): lookup_entry = await parent_entry.lookup(name) if lookup_entry: return await self.get_attrs(lookup_entry) except Exception as err: self.logger.exception("Cannot lookup: %s", err) raise pyfuse3.FUSEError(errno.ENOENT) async def readlink(self, inode: int, _ctx: pyfuse3.RequestContext) -> bytes: entry = self.inode2entry(inode) self.logger.debug("readlink(name=%s, inode=%d)", entry.name, inode) assert isinstance(entry, FuseSymlinkEntry) return os.fsencode(entry.get_target()) async def unlink( self, parent_inode: int, name: str, _ctx: pyfuse3.RequestContext ) -> None: """ Remove a file """ name = os.fsdecode(name) parent_entry = self.inode2entry(parent_inode) self.logger.debug( "unlink(parent_name=%s, parent_inode=%d, name=%s)", parent_entry.name, parent_inode, name, ) try: await parent_entry.unlink(name) except Exception as err: self.logger.exception("Cannot unlink: %s", err) raise pyfuse3.FUSEError(errno.ENOENT) async def main(swhids: List[SWHID], root_path: Path, conf: Dict[str, Any]) -> None: """ swh-fuse CLI entry-point """ # Use pyfuse3 asyncio layer to match the rest of Software Heritage codebase pyfuse3_asyncio.enable() async with FuseCache(conf["cache"]) as cache: fs = Fuse(root_path, cache, conf) # Initially populate the cache for swhid in swhids: try: await fs.get_metadata(swhid) except Exception as err: fs.logger.exception("Cannot prefetch object %s: %s", swhid, err) fuse_options = set(pyfuse3.default_options) fuse_options.add("fsname=swhfs") try: pyfuse3.init(fs, root_path, fuse_options) await pyfuse3.main() except Exception as err: fs.logger.error("Error running FUSE: %s", err) finally: fs.shutdown() pyfuse3.close(unmount=True)
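
A note on the `by-page/` and `by-date/` naming used in the artifact.py hunks above: pages hold PAGE_SIZE ancestors each and are named with three zero-padded digits, while dates shard to a fixed depth of 3 (`YYYY/MM/DD/`). Below is a minimal standalone sketch of that arithmetic; the constants mirror the patch, the sample values are made up:

    PAGE_SIZE = 10_000
    PAGE_FMT = "{page_number:03d}"
    DATE_FMT = "{year:04d}/{month:02d}/{day:02d}/"

    def by_page_dirs(history_length: int) -> list:
        # One page directory per PAGE_SIZE ancestors: 000, 001, 002, ...
        return [
            PAGE_FMT.format(page_number=i // PAGE_SIZE)
            for i in range(0, history_length, PAGE_SIZE)
        ]

    def by_date_prefix(year: int, month: int, day: int) -> str:
        # Sharded prefix of depth 3, matching the `depth == 3` check above
        return DATE_FMT.format(year=year, month=month, day=day)

    assert by_page_dirs(25_000) == ["000", "001", "002"]
    assert by_date_prefix(2021, 1, 25) == "2021/01/25/"
    assert by_date_prefix(2021, 1, 25).count("/") == 3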
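
The readdir path in fuse.py relies on FuseDirEntry.get_entries() computing children once, caching the list in the direntry cache, and then serving later readdir() calls from that cached list starting at the kernel-supplied offset. A self-contained sketch of that contract, with a toy dict standing in for fuse.cache.direntry (illustrative names, not the real cache API):

    import asyncio
    from typing import AsyncIterator, Dict, List

    direntry_cache: Dict[str, List[str]] = {}  # toy stand-in for the direntry cache

    async def compute_entries() -> AsyncIterator[str]:
        # Stand-in for e.g. Revision.compute_entries()
        for name in ("root", "meta.json", "parents", "history"):
            yield name

    async def get_entries(key: str, offset: int = 0) -> AsyncIterator[str]:
        entries = direntry_cache.get(key)
        if entries is None:
            entries = [e async for e in compute_entries()]
            direntry_cache[key] = entries
        # Manual indexing instead of slicing avoids copying the cached list
        for i in range(offset, len(entries)):
            yield entries[i]

    async def demo() -> None:
        print([e async for e in get_entries("rev")])     # full listing
        print([e async for e in get_entries("rev", 2)])  # resumes at offset 2

    asyncio.run(demo())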