diff --git a/swh/fuse/fs/artifact.py b/swh/fuse/fs/artifact.py
index f9b4e97..5f24b54 100644
--- a/swh/fuse/fs/artifact.py
+++ b/swh/fuse/fs/artifact.py
@@ -1,597 +1,604 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
from dataclasses import dataclass, field
import json
import logging
import os
from pathlib import Path
+import re
from typing import Any, AsyncIterator, Dict, List, Optional
from swh.fuse.fs.entry import (
EntryMode,
FuseDirEntry,
FuseEntry,
FuseFileEntry,
FuseSymlinkEntry,
)
from swh.model.from_disk import DentryPerms
from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT, SWHID
+SWHID_REGEXP = r"swh:1:(cnt|dir|rel|rev|snp):[0-9a-f]{40}"
+
@dataclass
class Content(FuseFileEntry):
""" Software Heritage content artifact.
Attributes:
swhid: Software Heritage persistent identifier
prefetch: optional prefetched metadata used to set entry attributes
Content leaves (AKA blobs) are represented on disk as regular files,
containing the corresponding bytes, as archived.
Note that permissions are associated with blobs only in the context of
directories. Hence, when accessing blobs from the top-level `archive/`
directory, the permissions of the `archive/SWHID` file will be arbitrary and
not meaningful (e.g., `0o644`). """
swhid: SWHID
prefetch: Any = None
async def get_content(self) -> bytes:
data = await self.fuse.get_blob(self.swhid)
if not self.prefetch:
self.prefetch = {"length": len(data)}
return data
async def size(self) -> int:
if self.prefetch:
return self.prefetch["length"]
else:
return await super().size()
@dataclass
class Directory(FuseDirEntry):
""" Software Heritage directory artifact.
Attributes:
swhid: Software Heritage persistent identifier
Directory nodes are represented as directories on the file-system,
containing one entry for each entry of the archived directory. Entry names
and other metadata, including permissions, will correspond to the archived
entry metadata.
Note that the FUSE mount is read-only, no matter what the permissions say.
So it is possible that, in the context of a directory, a file is presented
as writable, whereas actually writing to it will fail with `EPERM`. """
swhid: SWHID
async def compute_entries(self) -> AsyncIterator[FuseEntry]:
metadata = await self.fuse.get_metadata(self.swhid)
for entry in metadata:
name = entry["name"]
swhid = entry["target"]
mode = (
# Archived permissions for directories are always set to
# 0o040000 so use a read-only permission instead
int(EntryMode.RDONLY_DIR)
if swhid.object_type == DIRECTORY
else entry["perms"]
)
# 1. Symlink (check symlink first because condition is less restrictive)
if mode == DentryPerms.symlink:
target = ""
try:
# Symlink target is stored in the blob content
target = await self.fuse.get_blob(swhid)
except Exception:
pass # Ignore error and create a (broken) symlink anyway
yield self.create_child(
FuseSymlinkEntry, name=name, target=target,
)
# 2. Regular file
elif swhid.object_type == CONTENT:
yield self.create_child(
Content,
name=name,
mode=mode,
swhid=swhid,
# The directory API has extra info we can use to set
# attributes without an additional Software Heritage API call
prefetch=entry,
)
# 3. Regular directory
elif swhid.object_type == DIRECTORY:
yield self.create_child(
Directory, name=name, mode=mode, swhid=swhid,
)
# 4. Submodule
elif swhid.object_type == REVISION:
try:
# Make sure the revision metadata is fetched and create a
# symlink to distinguish it from regular directories
await self.fuse.get_metadata(swhid)
except Exception:
pass # Ignore error and create a (broken) symlink anyway
yield self.create_child(
FuseSymlinkEntry,
name=name,
target=Path(self.get_relative_root_path(), f"archive/{swhid}"),
)
else:
raise ValueError("Unknown directory entry type: {swhid.object_type}")
@dataclass
class Revision(FuseDirEntry):
""" Software Heritage revision artifact.
Attributes:
swhid: Software Heritage persistent identifier
Revision (AKA commit) nodes are represented on the file-system as
directories with the following entries:
- `root`: source tree at the time of the commit, as a symlink pointing into
`archive/`, to a SWHID of type `dir`
- `parents/` (note the plural): a virtual directory containing entries named
`1`, `2`, `3`, etc., one for each parent commit. Each of these entries is a
symlink pointing into `archive/`, to the SWHID file for the given parent
commit
- `parent` (note the singular): present if and only if the current commit
has at least one parent commit (which is the most common case). When
present it is a symlink pointing into `parents/1/`
- `history`: a virtual directory listing all its revision ancestors, sorted
in reverse topological order. The history can be listed through
`by-date/`, `by-hash/` or `by-page/`, each with its own sharding policy.
- `meta.json`: metadata for the current node, as a symlink pointing to the
relevant `archive/<SWHID>.json` file """
swhid: SWHID
async def compute_entries(self) -> AsyncIterator[FuseEntry]:
metadata = await self.fuse.get_metadata(self.swhid)
directory = metadata["directory"]
parents = metadata["parents"]
root_path = self.get_relative_root_path()
yield self.create_child(
FuseSymlinkEntry,
name="root",
target=Path(root_path, f"archive/{directory}"),
)
yield self.create_child(
FuseSymlinkEntry,
name="meta.json",
target=Path(root_path, f"archive/{self.swhid}.json"),
)
yield self.create_child(
RevisionParents,
name="parents",
mode=int(EntryMode.RDONLY_DIR),
parents=[x["id"] for x in parents],
)
if len(parents) >= 1:
yield self.create_child(
FuseSymlinkEntry, name="parent", target="parents/1/",
)
yield self.create_child(
RevisionHistory,
name="history",
mode=int(EntryMode.RDONLY_DIR),
swhid=self.swhid,
)
@dataclass
class RevisionParents(FuseDirEntry):
""" Revision virtual `parents/` directory """
parents: List[SWHID]
async def compute_entries(self) -> AsyncIterator[FuseEntry]:
root_path = self.get_relative_root_path()
for i, parent in enumerate(self.parents):
yield self.create_child(
FuseSymlinkEntry,
name=str(i + 1),
target=Path(root_path, f"archive/{parent}"),
)
@dataclass
class RevisionHistory(FuseDirEntry):
""" Revision virtual `history/` directory """
swhid: SWHID
async def prefill_caches(self) -> None:
history = await self.fuse.get_history(self.swhid)
for swhid in history:
await self.fuse.get_metadata(swhid)
async def compute_entries(self) -> AsyncIterator[FuseEntry]:
# Run it concurrently because of the many API calls necessary
asyncio.create_task(self.prefill_caches())
yield self.create_child(
RevisionHistoryShardByDate,
name="by-date",
mode=int(EntryMode.RDONLY_DIR),
history_swhid=self.swhid,
)
yield self.create_child(
RevisionHistoryShardByHash,
name="by-hash",
mode=int(EntryMode.RDONLY_DIR),
history_swhid=self.swhid,
)
yield self.create_child(
RevisionHistoryShardByPage,
name="by-page",
mode=int(EntryMode.RDONLY_DIR),
history_swhid=self.swhid,
)
@dataclass
class RevisionHistoryShardByDate(FuseDirEntry):
""" Revision virtual `history/by-date` sharded directory """
history_swhid: SWHID
prefix: str = field(default="")
is_status_done: bool = field(default=False)
DATE_FMT = "{year:04d}/{month:02d}/{day:02d}/"
+ ENTRIES_REGEXP = re.compile(r"^([0-9]{2,4})|(" + SWHID_REGEXP + ")$")
@dataclass
class StatusFile(FuseFileEntry):
""" Temporary file used to indicate loading progress in by-date/ """
name: str = field(init=False, default=".status")
mode: int = field(init=False, default=int(EntryMode.RDONLY_FILE))
done: int
todo: int
async def get_content(self) -> bytes:
fmt = f"Done: {self.done}/{self.todo}\n"
return fmt.encode()
async def compute_entries(self) -> AsyncIterator[FuseEntry]:
history = await self.fuse.get_history(self.history_swhid)
# Only check for cached revisions with the appropriate prefix, since
# fetching all of them with the Web API would take too long
swhids = await self.fuse.cache.history.get_with_date_prefix(
self.history_swhid, date_prefix=self.prefix
)
depth = self.prefix.count("/")
root_path = self.get_relative_root_path()
sharded_dirs = set()
for (swhid, sharded_name) in swhids:
if not sharded_name.startswith(self.prefix):
continue
if depth == 3:
yield self.create_child(
FuseSymlinkEntry,
name=str(swhid),
target=Path(root_path, f"archive/{swhid}"),
)
# Create sharded directories
else:
next_prefix = sharded_name.split("/")[depth]
if next_prefix not in sharded_dirs:
sharded_dirs.add(next_prefix)
yield self.create_child(
RevisionHistoryShardByDate,
name=next_prefix,
mode=int(EntryMode.RDONLY_DIR),
prefix=f"{self.prefix}{next_prefix}/",
history_swhid=self.history_swhid,
)
# TODO: store len(history) somewhere to avoid recompute?
self.is_status_done = len(swhids) == len(history)
if not self.is_status_done and depth == 0:
yield self.create_child(
RevisionHistoryShardByDate.StatusFile,
done=len(swhids),
todo=len(history),
)
@dataclass
class RevisionHistoryShardByHash(FuseDirEntry):
""" Revision virtual `history/by-hash` sharded directory """
history_swhid: SWHID
prefix: str = field(default="")
SHARDING_LENGTH = 2
+ ENTRIES_REGEXP = re.compile(r"^([a-f0-9]+)|(" + SWHID_REGEXP + ")$")
async def compute_entries(self) -> AsyncIterator[FuseEntry]:
history = await self.fuse.get_history(self.history_swhid)
if self.prefix:
root_path = self.get_relative_root_path()
for swhid in history:
if swhid.object_id.startswith(self.prefix):
yield self.create_child(
FuseSymlinkEntry,
name=str(swhid),
target=Path(root_path, f"archive/{swhid}"),
)
# Create sharded directories
else:
sharded_dirs = set()
for swhid in history:
next_prefix = swhid.object_id[: self.SHARDING_LENGTH]
if next_prefix not in sharded_dirs:
sharded_dirs.add(next_prefix)
yield self.create_child(
RevisionHistoryShardByHash,
name=next_prefix,
mode=int(EntryMode.RDONLY_DIR),
prefix=next_prefix,
history_swhid=self.history_swhid,
)
@dataclass
class RevisionHistoryShardByPage(FuseDirEntry):
""" Revision virtual `history/by-page` sharded directory """
history_swhid: SWHID
prefix: Optional[int] = field(default=None)
PAGE_SIZE = 10_000
PAGE_FMT = "{page_number:03d}"
+ ENTRIES_REGEXP = re.compile(r"^([0-9]+)|(" + SWHID_REGEXP + ")$")
async def compute_entries(self) -> AsyncIterator[FuseEntry]:
history = await self.fuse.get_history(self.history_swhid)
if self.prefix is not None:
current_page = self.prefix
root_path = self.get_relative_root_path()
max_idx = min(len(history), (current_page + 1) * self.PAGE_SIZE)
for i in range(current_page * self.PAGE_SIZE, max_idx):
swhid = history[i]
yield self.create_child(
FuseSymlinkEntry,
name=str(swhid),
target=Path(root_path, f"archive/{swhid}"),
)
# Create sharded directories
else:
for i in range(0, len(history), self.PAGE_SIZE):
page_number = i // self.PAGE_SIZE
yield self.create_child(
RevisionHistoryShardByPage,
name=self.PAGE_FMT.format(page_number=page_number),
mode=int(EntryMode.RDONLY_DIR),
history_swhid=self.history_swhid,
prefix=page_number,
)
@dataclass
class Release(FuseDirEntry):
""" Software Heritage release artifact.
Attributes:
swhid: Software Heritage persistent identifier
Release nodes are represented on the file-system as directories with the
following entries:
- `target`: target node, as a symlink to `archive/<SWHID>`
- `target_type`: regular file containing the type of the target SWHID
- `root`: present if and only if the release points to something that
(transitively) resolves to a directory. When present it is a symlink
pointing into `archive/` to the SWHID of the given directory
- `meta.json`: metadata for the current node, as a symlink pointing to the
relevant `archive/<SWHID>.json` file """
swhid: SWHID
async def find_root_directory(self, swhid: SWHID) -> Optional[SWHID]:
if swhid.object_type == RELEASE:
metadata = await self.fuse.get_metadata(swhid)
return await self.find_root_directory(metadata["target"])
elif swhid.object_type == REVISION:
metadata = await self.fuse.get_metadata(swhid)
return metadata["directory"]
elif swhid.object_type == DIRECTORY:
return swhid
else:
return None
async def compute_entries(self) -> AsyncIterator[FuseEntry]:
metadata = await self.fuse.get_metadata(self.swhid)
root_path = self.get_relative_root_path()
yield self.create_child(
FuseSymlinkEntry,
name="meta.json",
target=Path(root_path, f"archive/{self.swhid}.json"),
)
target = metadata["target"]
yield self.create_child(
FuseSymlinkEntry, name="target", target=Path(root_path, f"archive/{target}")
)
yield self.create_child(
ReleaseType,
name="target_type",
mode=int(EntryMode.RDONLY_FILE),
target_type=target.object_type,
)
target_dir = await self.find_root_directory(target)
if target_dir is not None:
yield self.create_child(
FuseSymlinkEntry,
name="root",
target=Path(root_path, f"archive/{target_dir}"),
)
@dataclass
class ReleaseType(FuseFileEntry):
""" Release type virtual file """
target_type: str
async def get_content(self) -> bytes:
return str.encode(self.target_type + "\n")
@dataclass
class Snapshot(FuseDirEntry):
""" Software Heritage snapshot artifact.
Attributes:
swhid: Software Heritage persistent identifier
Snapshot nodes are represented on the file-system as recursive directories
following the branch names structure. For example, a branch named
``refs/tags/v1.0`` will be represented as a ``refs`` directory containing a
``tags`` directory containing a ``v1.0`` symlink pointing to the branch
target SWHID. """
swhid: SWHID
prefix: str = field(default="")
async def compute_entries(self) -> AsyncIterator[FuseEntry]:
metadata = await self.fuse.get_metadata(self.swhid)
root_path = self.get_relative_root_path()
subdirs = set()
for branch_name, branch_meta in metadata.items():
if not branch_name.startswith(self.prefix):
continue
next_subdirs = branch_name[len(self.prefix) :].split("/")
next_prefix = next_subdirs[0]
if len(next_subdirs) == 1:
# Non-alias targets are symlinks to their corresponding archived
# artifact, whereas alias targets are relative symlinks to the
# corresponding snapshot directory entry.
target_type = branch_meta["target_type"]
target_raw = branch_meta["target"]
if target_type == "alias":
prefix = Path(branch_name).parent
target = os.path.relpath(target_raw, prefix)
else:
target = f"{root_path}/archive/{target_raw}"
yield self.create_child(
FuseSymlinkEntry, name=next_prefix, target=Path(target),
)
else:
subdirs.add(next_prefix)
for subdir in subdirs:
yield self.create_child(
Snapshot,
name=subdir,
mode=int(EntryMode.RDONLY_DIR),
swhid=self.swhid,
prefix=f"{self.prefix}{subdir}/",
)
@dataclass
class Origin(FuseDirEntry):
""" Software Heritage origin artifact.
Origin nodes are represented on the file-system as directories with one
entry for each origin visit.
The visit directories are named after the visit date (`YYYY-MM-DD`; if
multiple visits occur on the same day, only the first one is kept). Each visit
directory contains a `meta.json` with associated metadata for the origin
node, and potentially a `snapshot` symlink pointing to the visit's snapshot
node. """
DATE_FMT = "{year:04d}-{month:02d}-{day:02d}"
+ ENTRIES_REGEXP = re.compile(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}$")
async def compute_entries(self) -> AsyncIterator[FuseEntry]:
# The origin's name is always its URL (encoded to create a valid UNIX filename)
visits = await self.fuse.get_visits(self.name)
seen_date = set()
for visit in visits:
date = visit["date"]
name = self.DATE_FMT.format(year=date.year, month=date.month, day=date.day)
if name in seen_date:
logging.debug(
"Conflict date on origin: %s, %s", visit["origin"], str(name)
)
else:
seen_date.add(name)
yield self.create_child(
OriginVisit, name=name, mode=int(EntryMode.RDONLY_DIR), meta=visit,
)
@dataclass
class OriginVisit(FuseDirEntry):
""" Origin visit virtual directory """
meta: Dict[str, Any]
@dataclass
class MetaFile(FuseFileEntry):
content: str
async def get_content(self) -> bytes:
return str.encode(self.content + "\n")
async def compute_entries(self) -> AsyncIterator[FuseEntry]:
snapshot_swhid = self.meta["snapshot"]
if snapshot_swhid:
root_path = self.get_relative_root_path()
yield self.create_child(
FuseSymlinkEntry,
name="snapshot",
target=Path(root_path, f"archive/{snapshot_swhid}"),
)
yield self.create_child(
OriginVisit.MetaFile,
name="meta.json",
mode=int(EntryMode.RDONLY_FILE),
content=json.dumps(
self.meta,
indent=self.fuse.conf["json-indent"],
default=lambda x: str(x),
),
)
OBJTYPE_GETTERS = {
CONTENT: Content,
DIRECTORY: Directory,
REVISION: Revision,
RELEASE: Release,
SNAPSHOT: Snapshot,
}
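
For reference, the sharding policy implemented by RevisionHistoryShardByHash above can be sketched outside the FUSE machinery: revisions are grouped under a directory named after the first SHARDING_LENGTH hex characters of their object id. The shard_by_hash helper and the plain-string object ids below are illustrative assumptions, not part of this diff.

# Minimal sketch of the by-hash sharding policy, assuming plain hex
# strings instead of SWHID objects.
from collections import defaultdict
from typing import Dict, List

SHARDING_LENGTH = 2

def shard_by_hash(object_ids: List[str]) -> Dict[str, List[str]]:
    """Group hex object ids by their first SHARDING_LENGTH characters."""
    shards: Dict[str, List[str]] = defaultdict(list)
    for object_id in object_ids:
        shards[object_id[:SHARDING_LENGTH]].append(object_id)
    return dict(shards)

# Two ids sharing the "aa" prefix land in the same shard directory:
print(shard_by_hash(["aa11" + "0" * 36, "aa22" + "0" * 36, "bb33" + "0" * 36]))
# {'aa': [...two entries...], 'bb': [...one entry...]}
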
diff --git a/swh/fuse/fs/entry.py b/swh/fuse/fs/entry.py
index e295e3d..cada9bb 100644
--- a/swh/fuse/fs/entry.py
+++ b/swh/fuse/fs/entry.py
@@ -1,127 +1,141 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
from dataclasses import dataclass, field
from enum import IntEnum
from pathlib import Path
+import re
from stat import S_IFDIR, S_IFLNK, S_IFREG
-from typing import Any, AsyncIterator, Sequence, Union
+from typing import Any, AsyncIterator, Optional, Pattern, Sequence, Union
# Avoid circular import
Fuse = "Fuse"
class EntryMode(IntEnum):
""" Default entry mode and permissions for the FUSE.
The FUSE mount is always read-only, even if permissions contradict this
statement (in the context of a directory, entries are listed with permissions
taken from the archive).
"""
RDONLY_FILE = S_IFREG | 0o444
RDONLY_DIR = S_IFDIR | 0o555
SYMLINK = S_IFLNK | 0o444
@dataclass
class FuseEntry:
""" Main wrapper class to manipulate virtual FUSE entries
Attributes:
name: entry filename
mode: entry permission mode
fuse: internal reference to the main FUSE class
inode: unique integer identifying the entry
"""
name: str
mode: int
depth: int
fuse: Fuse
inode: int = field(init=False)
def __post_init__(self):
self.inode = self.fuse._alloc_inode(self)
async def size(self) -> int:
""" Return the size (in bytes) of an entry """
raise NotImplementedError
def get_relative_root_path(self) -> str:
return "../" * (self.depth - 1)
def create_child(self, constructor: Any, **kwargs) -> FuseEntry:
return constructor(depth=self.depth + 1, fuse=self.fuse, **kwargs)
+@dataclass
class FuseFileEntry(FuseEntry):
""" FUSE virtual file entry """
async def get_content(self) -> bytes:
""" Return the content of a file entry """
raise NotImplementedError
async def size(self) -> int:
return len(await self.get_content())
+@dataclass
class FuseDirEntry(FuseEntry):
""" FUSE virtual directory entry """
+ ENTRIES_REGEXP: Optional[Pattern] = field(init=False, default=None)
+
async def size(self) -> int:
return 0
+ def validate_entry(self, name: str) -> bool:
+ """ Return true if the name matches the directory entries regular
+ expression, and false otherwise """
+
+ if self.ENTRIES_REGEXP:
+ return re.match(self.ENTRIES_REGEXP, name)
+ else:
+ return True
+
async def compute_entries(self) -> Sequence[FuseEntry]:
""" Return the child entries of a directory entry """
raise NotImplementedError
async def get_entries(self, offset: int = 0) -> AsyncIterator[FuseEntry]:
""" Return the child entries of a directory entry using direntry cache """
cache = self.fuse.cache.direntry.get(self)
if cache:
entries = cache
else:
entries = [x async for x in self.compute_entries()]
self.fuse.cache.direntry.set(self, entries)
# Avoid copy by manual iteration (instead of slicing) and use of a
# generator (instead of returning the full list every time)
for i in range(offset, len(entries)):
yield entries[i]
async def lookup(self, name: str) -> Optional[FuseEntry]:
""" Look up a FUSE entry by name """
async for entry in self.get_entries():
if entry.name == name:
return entry
return None
@dataclass
class FuseSymlinkEntry(FuseEntry):
""" FUSE virtual symlink entry
Attributes:
target: path to symlink target
"""
mode: int = field(init=False, default=int(EntryMode.SYMLINK))
target: Union[str, bytes, Path]
async def size(self) -> int:
return len(str(self.target))
def get_target(self) -> Union[str, bytes, Path]:
""" Return the path target of a symlink entry """
return self.target
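
The validate_entry() gate added above lets a directory reject malformed names before lookup() triggers any Web API traffic. A standalone sketch under stated assumptions: a plain class stands in for the FuseDirEntry dataclass, and the pattern reuses the SWHID_REGEXP from artifact.py (mirroring ArchiveDir.ENTRIES_REGEXP in mountpoint.py below).

import re
from typing import Optional, Pattern

SWHID_REGEXP = r"swh:1:(cnt|dir|rel|rev|snp):[0-9a-f]{40}"

class ArchiveLikeDir:
    # Same shape as ArchiveDir.ENTRIES_REGEXP in mountpoint.py
    ENTRIES_REGEXP: Optional[Pattern] = re.compile(
        r"^(" + SWHID_REGEXP + r")(\.json)?$"
    )

    def validate_entry(self, name: str) -> bool:
        if self.ENTRIES_REGEXP:
            return bool(re.match(self.ENTRIES_REGEXP, name))
        return True

d = ArchiveLikeDir()
assert d.validate_entry("swh:1:dir:" + "0" * 40)            # well-formed SWHID
assert d.validate_entry("swh:1:rev:" + "a" * 40 + ".json")  # metadata file
assert not d.validate_entry(".Trash-1000")                  # desktop noise, rejected early
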
diff --git a/swh/fuse/fs/mountpoint.py b/swh/fuse/fs/mountpoint.py
index 7f930c3..97a4d07 100644
--- a/swh/fuse/fs/mountpoint.py
+++ b/swh/fuse/fs/mountpoint.py
@@ -1,123 +1,127 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from dataclasses import dataclass, field
import json
+import re
from typing import AsyncIterator, Optional
-from swh.fuse.fs.artifact import OBJTYPE_GETTERS, Origin
+from swh.fuse.fs.artifact import OBJTYPE_GETTERS, SWHID_REGEXP, Origin
from swh.fuse.fs.entry import EntryMode, FuseDirEntry, FuseEntry, FuseFileEntry
from swh.model.exceptions import ValidationError
from swh.model.identifiers import CONTENT, SWHID, parse_swhid
@dataclass
class Root(FuseDirEntry):
""" The FUSE mountpoint, consisting of the archive/ and origin/ directories """
name: Optional[str] = field(init=False, default=None)
mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR))
depth: int = field(init=False, default=1)
async def compute_entries(self) -> AsyncIterator[FuseEntry]:
yield self.create_child(ArchiveDir)
yield self.create_child(OriginDir)
@dataclass
class ArchiveDir(FuseDirEntry):
""" The `archive/` virtual directory allows to mount any artifact on the fly
using its SWHID as name. The associated metadata of the artifact from the
Software Heritage Web API can also be accessed through the `SWHID.json` file
(in case of pagination, the JSON file will contain a complete version with
all pages merged together). Note: the archive directory cannot be listed
with ls, but entries in it can be accessed (e.g., using cat or cd). """
name: str = field(init=False, default="archive")
mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR))
+ ENTRIES_REGEXP = re.compile(r"^(" + SWHID_REGEXP + ")(.json)?$")
JSON_SUFFIX = ".json"
async def compute_entries(self) -> AsyncIterator[FuseEntry]:
return
yield
async def lookup(self, name: str) -> Optional[FuseEntry]:
# On the fly mounting of a new artifact
try:
if name.endswith(self.JSON_SUFFIX):
swhid = parse_swhid(name[: -len(self.JSON_SUFFIX)])
return self.create_child(
MetaEntry,
name=f"{swhid}{self.JSON_SUFFIX}",
mode=int(EntryMode.RDONLY_FILE),
swhid=swhid,
)
else:
swhid = parse_swhid(name)
await self.fuse.get_metadata(swhid)
return self.create_child(
OBJTYPE_GETTERS[swhid.object_type],
name=str(swhid),
mode=int(
EntryMode.RDONLY_FILE
if swhid.object_type == CONTENT
else EntryMode.RDONLY_DIR
),
swhid=swhid,
)
except ValidationError:
return None
@dataclass
class MetaEntry(FuseFileEntry):
""" An entry for a `archive/<SWHID>.json` file, containing all the SWHID's
metadata from the Software Heritage archive. """
swhid: SWHID
async def get_content(self) -> bytes:
# Make sure the metadata is in cache
await self.fuse.get_metadata(self.swhid)
# Retrieve raw JSON metadata from cache (un-typified)
metadata = await self.fuse.cache.metadata.get(self.swhid, typify=False)
json_str = json.dumps(metadata, indent=self.fuse.conf["json-indent"])
return (json_str + "\n").encode()
async def size(self) -> int:
return len(await self.get_content())
@dataclass
class OriginDir(FuseDirEntry):
""" The origin/ directory is lazily populated with one entry per accessed
origin URL (mangled to create a valid UNIX filename). The URL encoding is
done using the percent-encoding mechanism described in RFC 3986. """
name: str = field(init=False, default="origin")
mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR))
+ ENTRIES_REGEXP = re.compile(r"^.*%3A.*$") # %3A is the encoded version of ':'
+
def create_child(self, url_encoded: str) -> FuseEntry:
return super().create_child(
Origin, name=url_encoded, mode=int(EntryMode.RDONLY_DIR),
)
async def compute_entries(self) -> AsyncIterator[FuseEntry]:
async for url in self.fuse.cache.get_cached_visits():
yield self.create_child(url)
async def lookup(self, name: str) -> Optional[FuseEntry]:
entry = await super().lookup(name)
if entry:
return entry
# On the fly mounting of a new origin URL
try:
url_encoded = name
await self.fuse.get_visits(url_encoded)
return self.create_child(url_encoded)
except ValueError:
return None
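
The origin-name mangling referenced by OriginDir can be illustrated with the standard library alone; quote_plus() is an assumption about how a client would produce such a name (the diff itself only decodes, via unquote_plus()).

import urllib.parse

url = "https://github.com/python/cpython"
name = urllib.parse.quote_plus(url, safe="")
print(name)  # https%3A%2F%2Fgithub.com%2Fpython%2Fcpython

# The "%3A" produced by encoding ':' is what OriginDir.ENTRIES_REGEXP keys
# on, and unquote_plus() recovers the URL before it is sent to the Web API.
assert "%3A" in name
assert urllib.parse.unquote_plus(name) == url
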
diff --git a/swh/fuse/fuse.py b/swh/fuse/fuse.py
index 6edd2c5..00d1a5b 100644
--- a/swh/fuse/fuse.py
+++ b/swh/fuse/fuse.py
@@ -1,331 +1,332 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
import errno
import functools
import logging
import os
from pathlib import Path
import time
from typing import Any, Dict, List
import urllib.parse
import pyfuse3
import pyfuse3_asyncio
import requests
from swh.fuse import LOGGER_NAME
from swh.fuse.cache import FuseCache
from swh.fuse.fs.entry import FuseDirEntry, FuseEntry, FuseFileEntry, FuseSymlinkEntry
from swh.fuse.fs.mountpoint import Root
from swh.model.identifiers import CONTENT, REVISION, SWHID
from swh.web.client.client import WebAPIClient
class Fuse(pyfuse3.Operations):
""" Software Heritage Filesystem in Userspace (FUSE). Locally mount parts of
the archive and navigate it as a virtual file system. """
def __init__(
self, root_path: Path, cache: FuseCache, conf: Dict[str, Any],
):
super(Fuse, self).__init__()
self._next_inode: int = pyfuse3.ROOT_INODE
self._inode2entry: Dict[int, FuseEntry] = {}
self.root = Root(fuse=self)
self.conf = conf
self.logger = logging.getLogger(LOGGER_NAME)
self.time_ns: int = time.time_ns() # start time, used as timestamp
self.gid = os.getgid()
self.uid = os.getuid()
self.web_api = WebAPIClient(
conf["web-api"]["url"], conf["web-api"]["auth-token"]
)
self.cache = cache
def shutdown(self) -> None:
pass
def _alloc_inode(self, entry: FuseEntry) -> int:
""" Return a unique inode integer for a given entry """
inode = self._next_inode
self._next_inode += 1
self._inode2entry[inode] = entry
# TODO add inode recycling with invocation to invalidate_inode when
# the dicts get too big
return inode
def inode2entry(self, inode: int) -> FuseEntry:
""" Return the entry matching a given inode """
try:
return self._inode2entry[inode]
except KeyError:
raise pyfuse3.FUSEError(errno.ENOENT)
async def get_metadata(self, swhid: SWHID) -> Any:
""" Retrieve metadata for a given SWHID using Software Heritage API """
cache = await self.cache.metadata.get(swhid)
if cache:
return cache
try:
typify = False # Get the raw JSON from the API
# TODO: async web API
loop = asyncio.get_event_loop()
metadata = await loop.run_in_executor(None, self.web_api.get, swhid, typify)
await self.cache.metadata.set(swhid, metadata)
# Retrieve it from cache so it is correctly typed
return await self.cache.metadata.get(swhid)
except requests.HTTPError as err:
self.logger.error("Cannot fetch metadata for object %s: %s", swhid, err)
raise
async def get_blob(self, swhid: SWHID) -> bytes:
""" Retrieve the blob bytes for a given content SWHID using Software
Heritage API """
if swhid.object_type != CONTENT:
raise pyfuse3.FUSEError(errno.EINVAL)
# Make sure the metadata cache is also populated with the given SWHID
await self.get_metadata(swhid)
cache = await self.cache.blob.get(swhid)
if cache:
self.logger.debug("Found blob %s in cache", swhid)
return cache
try:
self.logger.debug("Retrieving blob %s via web API...", swhid)
loop = asyncio.get_event_loop()
resp = await loop.run_in_executor(None, self.web_api.content_raw, swhid)
blob = b"".join(list(resp))
await self.cache.blob.set(swhid, blob)
return blob
except requests.HTTPError as err:
self.logger.error("Cannot fetch blob for object %s: %s", swhid, err)
raise
async def get_history(self, swhid: SWHID) -> List[SWHID]:
""" Retrieve a revision's history using Software Heritage Graph API """
if swhid.object_type != REVISION:
raise pyfuse3.FUSEError(errno.EINVAL)
cache = await self.cache.history.get(swhid)
if cache:
self.logger.debug(
"Found history of %s in cache (%d ancestors)", swhid, len(cache)
)
return cache
try:
# Use the swh-graph API to retrieve the full history very quickly
self.logger.debug("Retrieving history of %s via graph API...", swhid)
call = f"graph/visit/edges/{swhid}?edges=rev:rev"
loop = asyncio.get_event_loop()
history = await loop.run_in_executor(None, self.web_api._call, call)
await self.cache.history.set(history.text)
# Retrieve it from cache so it is correctly typed
res = await self.cache.history.get(swhid)
return res
except requests.HTTPError as err:
self.logger.error("Cannot fetch history for object %s: %s", swhid, err)
# Ignore exception since swh-graph does not necessarily contain the
# most recent artifacts from the archive. Computing the full history
# from the Web API is too computationally intensive so simply return
# an empty list.
return []
async def get_visits(self, url_encoded: str) -> List[Dict[str, Any]]:
""" Retrieve origin visits given an encoded-URL using Software Heritage API """
cache = await self.cache.metadata.get_visits(url_encoded)
if cache:
self.logger.debug(
"Found %d visits for origin '%s' in cache", len(cache), url_encoded,
)
return cache
try:
self.logger.debug(
"Retrieving visits for origin '%s' via web API...", url_encoded
)
typify = False # Get the raw JSON from the API
loop = asyncio.get_event_loop()
# Web API only takes non-encoded URL
url = urllib.parse.unquote_plus(url_encoded)
origin_exists = await loop.run_in_executor(
None, self.web_api.origin_exists, url
)
if not origin_exists:
raise ValueError("origin does not exist")
visits_it = await loop.run_in_executor(
None, functools.partial(self.web_api.visits, url, typify=typify)
)
visits = list(visits_it)
await self.cache.metadata.set_visits(url_encoded, visits)
# Retrieve it from cache so it is correctly typed
res = await self.cache.metadata.get_visits(url_encoded)
return res
except (ValueError, requests.HTTPError) as err:
self.logger.error(
"Cannot fetch visits for origin '%s': %s", url_encoded, err
)
raise
async def get_attrs(self, entry: FuseEntry) -> pyfuse3.EntryAttributes:
""" Return entry attributes """
attrs = pyfuse3.EntryAttributes()
attrs.st_size = 0
attrs.st_atime_ns = self.time_ns
attrs.st_ctime_ns = self.time_ns
attrs.st_mtime_ns = self.time_ns
attrs.st_gid = self.gid
attrs.st_uid = self.uid
attrs.st_ino = entry.inode
attrs.st_mode = entry.mode
attrs.st_size = await entry.size()
return attrs
async def getattr(
self, inode: int, _ctx: pyfuse3.RequestContext
) -> pyfuse3.EntryAttributes:
""" Get attributes for a given inode """
entry = self.inode2entry(inode)
return await self.get_attrs(entry)
async def opendir(self, inode: int, _ctx: pyfuse3.RequestContext) -> int:
""" Open a directory referred by a given inode """
# Re-use inode as directory handle
self.logger.debug("opendir(inode=%d)", inode)
return inode
async def readdir(self, fh: int, offset: int, token: pyfuse3.ReaddirToken) -> None:
""" Read entries in an open directory """
# opendir() uses inode as directory handle
inode = fh
direntry = self.inode2entry(inode)
self.logger.debug(
"readdir(dirname=%s, fh=%d, offset=%d)", direntry.name, fh, offset
)
assert isinstance(direntry, FuseDirEntry)
next_id = offset + 1
try:
async for entry in direntry.get_entries(offset):
name = os.fsencode(entry.name)
attrs = await self.get_attrs(entry)
if not pyfuse3.readdir_reply(token, name, attrs, next_id):
break
next_id += 1
self._inode2entry[attrs.st_ino] = entry
except Exception as err:
self.logger.exception("Cannot readdir: %s", err)
raise pyfuse3.FUSEError(errno.ENOENT)
async def open(
self, inode: int, _flags: int, _ctx: pyfuse3.RequestContext
) -> pyfuse3.FileInfo:
""" Open an inode and return a unique file handle """
# Re-use inode as file handle
self.logger.debug("open(inode=%d)", inode)
return pyfuse3.FileInfo(fh=inode, keep_cache=True)
async def read(self, fh: int, offset: int, length: int) -> bytes:
""" Read `length` bytes from file handle `fh` at position `offset` """
# open() uses inode as file handle
inode = fh
entry = self.inode2entry(inode)
self.logger.debug(
"read(name=%s, fh=%d, offset=%d, length=%d)", entry.name, fh, offset, length
)
assert isinstance(entry, FuseFileEntry)
try:
data = await entry.get_content()
return data[offset : offset + length]
except Exception as err:
self.logger.exception("Cannot read: %s", err)
raise pyfuse3.FUSEError(errno.ENOENT)
async def lookup(
self, parent_inode: int, name: str, _ctx: pyfuse3.RequestContext
) -> pyfuse3.EntryAttributes:
""" Look up a directory entry by name and get its attributes """
name = os.fsdecode(name)
parent_entry = self.inode2entry(parent_inode)
self.logger.debug(
"lookup(parent_name=%s, parent_inode=%d, name=%s)",
parent_entry.name,
parent_inode,
name,
)
assert isinstance(parent_entry, FuseDirEntry)
try:
- lookup_entry = await parent_entry.lookup(name)
- if lookup_entry:
- return await self.get_attrs(lookup_entry)
+ if parent_entry.validate_entry(name):
+ lookup_entry = await parent_entry.lookup(name)
+ if lookup_entry:
+ return await self.get_attrs(lookup_entry)
except Exception as err:
self.logger.exception("Cannot lookup: %s", err)
raise pyfuse3.FUSEError(errno.ENOENT)
async def readlink(self, inode: int, _ctx: pyfuse3.RequestContext) -> bytes:
entry = self.inode2entry(inode)
self.logger.debug("readlink(name=%s, inode=%d)", entry.name, inode)
assert isinstance(entry, FuseSymlinkEntry)
return os.fsencode(entry.get_target())
async def main(swhids: List[SWHID], root_path: Path, conf: Dict[str, Any]) -> None:
""" swh-fuse CLI entry-point """
# Use pyfuse3 asyncio layer to match the rest of Software Heritage codebase
pyfuse3_asyncio.enable()
async with FuseCache(conf["cache"]) as cache:
fs = Fuse(root_path, cache, conf)
# Initially populate the cache
for swhid in swhids:
try:
await fs.get_metadata(swhid)
except Exception as err:
fs.logger.exception("Cannot prefetch object %s: %s", swhid, err)
fuse_options = set(pyfuse3.default_options)
fuse_options.add("fsname=swhfs")
try:
pyfuse3.init(fs, root_path, fuse_options)
await pyfuse3.main()
except Exception as err:
fs.logger.error("Error running FUSE: %s", err)
finally:
fs.shutdown()
pyfuse3.close(unmount=True)
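
Throughout Fuse, the synchronous WebAPIClient is kept off the pyfuse3 event loop by pushing each call onto the default thread-pool executor. A minimal sketch of that pattern, with fetch_blocking() standing in for calls such as web_api.get or web_api.visits (the stand-in and its arguments are illustrative assumptions):

import asyncio
import functools
import time

def fetch_blocking(swhid: str, typify: bool = False) -> str:
    time.sleep(0.1)  # simulate a blocking HTTP round-trip
    return f"metadata for {swhid} (typify={typify})"

async def get_metadata(swhid: str) -> str:
    loop = asyncio.get_event_loop()
    # functools.partial carries keyword arguments, as in get_visits() above
    return await loop.run_in_executor(
        None, functools.partial(fetch_blocking, swhid, typify=False)
    )

print(asyncio.run(get_metadata("swh:1:rev:" + "0" * 40)))
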
