diff --git a/docs/configuration.rst b/docs/configuration.rst
index 184aa46..9c60a14 100644
--- a/docs/configuration.rst
+++ b/docs/configuration.rst
@@ -1,71 +1,71 @@
.. _swh-fuse-config:
Configuration
=============
The configuration for the Software Heritage Filesystem resides in the
``swh > fuse`` section of the shared `YAML <https://yaml.org>`_ configuration
file used by all Software Heritage tools, located by default at
``~/.config/swh/global.yml``.
The configuration file location follows the `XDG Base Directory
<https://specifications.freedesktop.org/basedir-spec/basedir-spec-latest.html>`_
specification and can be explicitly overridden on the command line via the
``-C/--config-file`` flag.
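For example, an alternative configuration file can be selected when mounting
(the path below is only illustrative):

.. code:: bash

   $ swh fs --config-file ~/swh-fuse.yml mount swhfs/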
The following sub-sections and fields can be used within the ``swh > fuse``
stanza:
- ``cache``:
- ``metadata``: where to store the metadata cache; must have either an
``in-memory`` boolean entry or a ``path`` string entry (with the
corresponding on-disk path)
- ``blob``: where to store the blob cache, same entries as the ``metadata``
cache
- ``web-api``:
- ``url``: archive API URL
- ``auth-token``: authentication token used with the API URL
If no configuration is given, default values are:
- ``cache``: all cache files are stored in ``$XDG_CACHE_HOME/swh/fuse/`` (or
``~/.cache/swh/fuse`` if ``XDG_CACHE_HOME`` is not set)
- ``web-api``: default URL is https://archive.softwareheritage.org/api/1/,
with no authentication token
Example
-------
Here is a full ``~/.config/swh/global.yml`` example, showcasing different cache
storage strategies (in-memory for metadata and on-disk for blob), using the
default Web API service:
.. code:: yaml
swh:
fuse:
cache:
metadata:
in-memory: true
blob:
path: "/path/to/cache/blob.sqlite"
web-api:
url: "https://archive.softwareheritage.org/api/1/"
auth-token: null
Logging
-------
The default logging level is set to ``INFO`` and can be configured through the
shared command line interface via the ``-l/--log-level``
flag.
.. code:: bash
- $ swh --log-level DEBUG fs mount swhfs/
+ $ swh --log-level swh.fuse:DEBUG fs mount swhfs/
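When debugging, it can also help to keep the process attached to the terminal;
here is a sketch combining the per-module log level with the ``-f/--foreground``
option defined in ``swh/fuse/cli.py``:

.. code:: bash

   $ swh --log-level swh.fuse:DEBUG fs mount --foreground swhfs/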
diff --git a/requirements-swh.txt b/requirements-swh.txt
index c9cabb0..62940b5 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,4 +1,4 @@
# Add here internal Software Heritage dependencies, one per line.
-swh.core
+swh.core>=0.10.0
swh.model>=0.7.0
swh.web.client>=0.2.3
diff --git a/swh/fuse/__init__.py b/swh/fuse/__init__.py
index e69de29..6cd93a9 100644
--- a/swh/fuse/__init__.py
+++ b/swh/fuse/__init__.py
@@ -0,0 +1 @@
+LOGGER_NAME = __name__
diff --git a/swh/fuse/cli.py b/swh/fuse/cli.py
index 0435c0d..ec18dc4 100644
--- a/swh/fuse/cli.py
+++ b/swh/fuse/cli.py
@@ -1,203 +1,206 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import os
from pathlib import Path
from typing import Any, Dict
import click
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
from swh.model.cli import SWHIDParamType
# All generic config code should reside in swh.core.config
DEFAULT_CONFIG_PATH = os.environ.get(
"SWH_CONFIG_FILE", os.path.join(click.get_app_dir("swh"), "global.yml")
)
CACHE_HOME_DIR: Path = (
Path(os.environ["XDG_CACHE_HOME"])
if "XDG_CACHE_HOME" in os.environ
else Path.home() / ".cache"
)
DEFAULT_CONFIG: Dict[str, Any] = {
"cache": {
"metadata": {"path": str(CACHE_HOME_DIR / "swh/fuse/metadata.sqlite")},
"blob": {"path": str(CACHE_HOME_DIR / "swh/fuse/blob.sqlite")},
"direntry": {"maxram": "10%"},
},
"web-api": {
"url": "https://archive.softwareheritage.org/api/1",
"auth-token": None,
},
"json-indent": 2,
}
@swh_cli_group.group(name="fs", context_settings=CONTEXT_SETTINGS)
@click.option(
"-C",
"--config-file",
default=None,
type=click.Path(exists=True, dir_okay=False, path_type=str),
help=f"Configuration file (default: {DEFAULT_CONFIG_PATH})",
)
@click.pass_context
def fuse(ctx, config_file):
"""Software Heritage virtual file system"""
import logging
import yaml
from swh.core import config
if not config_file:
config_file = DEFAULT_CONFIG_PATH
try:
conf = config.read_raw_config(config.config_basepath(config_file))
if not conf:
raise ValueError(f"Cannot parse configuration file: {config_file}")
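        # The shared global.yml nests per-tool settings under swh > fuse, so
        # extract that stanza when reading the default configuration file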
if config_file == DEFAULT_CONFIG_PATH:
try:
conf = conf["swh"]["fuse"]
except KeyError:
pass
# recursive merge not done by config.read
conf = config.merge_configs(DEFAULT_CONFIG, conf)
except Exception:
logging.warning(
"Using default configuration (cannot load custom one)", exc_info=True
)
conf = DEFAULT_CONFIG
logging.debug("Read configuration: \n%s", yaml.dump(conf))
ctx.ensure_object(dict)
ctx.obj["config"] = conf
@fuse.command(name="mount")
@click.argument(
"path",
required=True,
metavar="PATH",
type=click.Path(exists=True, dir_okay=True, file_okay=False),
)
@click.argument("swhids", nargs=-1, metavar="[SWHID]...", type=SWHIDParamType())
@click.option(
"-f/-d",
"--foreground/--daemon",
default=False,
help="whether to run FUSE attached to the console (foreground) "
"or daemonized in the background (default: daemon)",
)
@click.pass_context
def mount(ctx, swhids, path, foreground):
"""Mount the Software Heritage virtual file system at PATH.
If specified, objects referenced by the given SWHIDs will be prefetched and used to
populate the virtual file system (VFS). Otherwise the VFS will be populated
on-demand, when accessing its content.
\b
Example:
\b
$ mkdir swhfs
$ swh fs mount swhfs/
$ grep printf swhfs/archive/swh:1:cnt:c839dea9e8e6f0528b468214348fee8669b305b2
printf("Hello, World!");
$
"""
import asyncio
from contextlib import ExitStack
    import logging
    import logging.config  # dictConfig() lives in the logging.config submodule
from daemon import DaemonContext
- from swh.fuse import fuse
+ from swh.fuse import LOGGER_NAME, fuse
# TODO: set default logging settings when --log-config is not passed
# DEFAULT_LOG_PATH = Path(".local/swh/fuse/mount.log")
with ExitStack() as stack:
if not foreground:
# TODO: temporary fix until swh.core has the proper logging utilities
# Disable logging config before daemonizing, and reset it once
# daemonized to be sure to not close file handlers
+ log_level = logging.getLogger(LOGGER_NAME).getEffectiveLevel()
logging.shutdown()
# Stay in the current working directory when spawning daemon
cwd = os.getcwd()
stack.enter_context(DaemonContext(working_directory=cwd))
logging.config.dictConfig(
{
"version": 1,
"handlers": {
"syslog": {
"class": "logging.handlers.SysLogHandler",
"address": "/dev/log",
},
},
- "root": {"level": ctx.obj["log_level"], "handlers": ["syslog"],},
+ "loggers": {
+ LOGGER_NAME: {"level": log_level, "handlers": ["syslog"],},
+ },
}
)
conf = ctx.obj["config"]
asyncio.run(fuse.main(swhids, path, conf))
@fuse.command()
@click.argument(
"path",
required=True,
metavar="PATH",
type=click.Path(exists=True, dir_okay=True, file_okay=False),
)
@click.pass_context
def umount(ctx, path):
"""Unmount a mounted virtual file system.
Note: this is equivalent to ``fusermount -u PATH``, which can be used to unmount any
FUSE-based virtual file system. See ``man fusermount3``.
"""
import logging
import subprocess
try:
subprocess.run(["fusermount", "-u", path], check=True)
except subprocess.CalledProcessError as err:
logging.error(
"cannot unmount virtual file system: '%s' returned exit status %d",
" ".join(err.cmd),
err.returncode,
)
ctx.exit(1)
@fuse.command()
@click.pass_context
def clean(ctx):
"""Clean on-disk cache(s).
"""
def rm_cache(conf, cache_name):
try:
Path(conf["cache"][cache_name]["path"]).unlink()
except (FileNotFoundError, KeyError):
pass
conf = ctx.obj["config"]
for cache_name in ["blob", "metadata"]:
rm_cache(conf, cache_name)
diff --git a/swh/fuse/fuse.py b/swh/fuse/fuse.py
index d9046ff..6edd2c5 100644
--- a/swh/fuse/fuse.py
+++ b/swh/fuse/fuse.py
@@ -1,298 +1,331 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
import errno
import functools
import logging
import os
from pathlib import Path
import time
from typing import Any, Dict, List
import urllib.parse
import pyfuse3
import pyfuse3_asyncio
import requests
+from swh.fuse import LOGGER_NAME
from swh.fuse.cache import FuseCache
from swh.fuse.fs.entry import FuseDirEntry, FuseEntry, FuseFileEntry, FuseSymlinkEntry
from swh.fuse.fs.mountpoint import Root
from swh.model.identifiers import CONTENT, REVISION, SWHID
from swh.web.client.client import WebAPIClient
class Fuse(pyfuse3.Operations):
""" Software Heritage Filesystem in Userspace (FUSE). Locally mount parts of
the archive and navigate it as a virtual file system. """
def __init__(
self, root_path: Path, cache: FuseCache, conf: Dict[str, Any],
):
super(Fuse, self).__init__()
self._next_inode: int = pyfuse3.ROOT_INODE
self._inode2entry: Dict[int, FuseEntry] = {}
self.root = Root(fuse=self)
self.conf = conf
+ self.logger = logging.getLogger(LOGGER_NAME)
self.time_ns: int = time.time_ns() # start time, used as timestamp
self.gid = os.getgid()
self.uid = os.getuid()
self.web_api = WebAPIClient(
conf["web-api"]["url"], conf["web-api"]["auth-token"]
)
self.cache = cache
def shutdown(self) -> None:
pass
def _alloc_inode(self, entry: FuseEntry) -> int:
""" Return a unique inode integer for a given entry """
inode = self._next_inode
self._next_inode += 1
self._inode2entry[inode] = entry
# TODO add inode recycling with invocation to invalidate_inode when
# the dicts get too big
return inode
def inode2entry(self, inode: int) -> FuseEntry:
""" Return the entry matching a given inode """
try:
return self._inode2entry[inode]
except KeyError:
raise pyfuse3.FUSEError(errno.ENOENT)
async def get_metadata(self, swhid: SWHID) -> Any:
""" Retrieve metadata for a given SWHID using Software Heritage API """
cache = await self.cache.metadata.get(swhid)
if cache:
return cache
try:
typify = False # Get the raw JSON from the API
# TODO: async web API
loop = asyncio.get_event_loop()
metadata = await loop.run_in_executor(None, self.web_api.get, swhid, typify)
await self.cache.metadata.set(swhid, metadata)
# Retrieve it from cache so it is correctly typed
return await self.cache.metadata.get(swhid)
except requests.HTTPError as err:
- logging.error("Cannot fetch metadata for object %s: %s", swhid, err)
+ self.logger.error("Cannot fetch metadata for object %s: %s", swhid, err)
raise
async def get_blob(self, swhid: SWHID) -> bytes:
""" Retrieve the blob bytes for a given content SWHID using Software
Heritage API """
if swhid.object_type != CONTENT:
raise pyfuse3.FUSEError(errno.EINVAL)
# Make sure the metadata cache is also populated with the given SWHID
await self.get_metadata(swhid)
cache = await self.cache.blob.get(swhid)
if cache:
+ self.logger.debug("Found blob %s in cache", swhid)
return cache
try:
+ self.logger.debug("Retrieving blob %s via web API...", swhid)
loop = asyncio.get_event_loop()
resp = await loop.run_in_executor(None, self.web_api.content_raw, swhid)
blob = b"".join(list(resp))
await self.cache.blob.set(swhid, blob)
return blob
except requests.HTTPError as err:
- logging.error("Cannot fetch blob for object %s: %s", swhid, err)
+ self.logger.error("Cannot fetch blob for object %s: %s", swhid, err)
raise
async def get_history(self, swhid: SWHID) -> List[SWHID]:
""" Retrieve a revision's history using Software Heritage Graph API """
if swhid.object_type != REVISION:
raise pyfuse3.FUSEError(errno.EINVAL)
cache = await self.cache.history.get(swhid)
if cache:
+ self.logger.debug(
+ "Found history of %s in cache (%d ancestors)", swhid, len(cache)
+ )
return cache
try:
# Use the swh-graph API to retrieve the full history very fast
+ self.logger.debug("Retrieving history of %s via graph API...", swhid)
call = f"graph/visit/edges/{swhid}?edges=rev:rev"
loop = asyncio.get_event_loop()
history = await loop.run_in_executor(None, self.web_api._call, call)
await self.cache.history.set(history.text)
# Retrieve it from cache so it is correctly typed
- return await self.cache.history.get(swhid)
+ res = await self.cache.history.get(swhid)
+ return res
except requests.HTTPError as err:
- logging.error("Cannot fetch history for object %s: %s", swhid, err)
+ self.logger.error("Cannot fetch history for object %s: %s", swhid, err)
# Ignore exception since swh-graph does not necessarily contain the
# most recent artifacts from the archive. Computing the full history
# from the Web API is too computationally intensive so simply return
# an empty list.
return []
async def get_visits(self, url_encoded: str) -> List[Dict[str, Any]]:
""" Retrieve origin visits given an encoded-URL using Software Heritage API """
cache = await self.cache.metadata.get_visits(url_encoded)
if cache:
+ self.logger.debug(
+ "Found %d visits for origin '%s' in cache", len(cache), url_encoded,
+ )
return cache
try:
+ self.logger.debug(
+ "Retrieving visits for origin '%s' via web API...", url_encoded
+ )
typify = False # Get the raw JSON from the API
loop = asyncio.get_event_loop()
# Web API only takes non-encoded URL
url = urllib.parse.unquote_plus(url_encoded)
origin_exists = await loop.run_in_executor(
None, self.web_api.origin_exists, url
)
if not origin_exists:
raise ValueError("origin does not exist")
visits_it = await loop.run_in_executor(
None, functools.partial(self.web_api.visits, url, typify=typify)
)
visits = list(visits_it)
await self.cache.metadata.set_visits(url_encoded, visits)
# Retrieve it from cache so it is correctly typed
- return await self.cache.metadata.get_visits(url_encoded)
+ res = await self.cache.metadata.get_visits(url_encoded)
+ return res
except (ValueError, requests.HTTPError) as err:
- logging.error("Cannot fetch visits for origin '%s': %s", url_encoded, err)
+ self.logger.error(
+ "Cannot fetch visits for origin '%s': %s", url_encoded, err
+ )
raise
async def get_attrs(self, entry: FuseEntry) -> pyfuse3.EntryAttributes:
""" Return entry attributes """
attrs = pyfuse3.EntryAttributes()
attrs.st_size = 0
attrs.st_atime_ns = self.time_ns
attrs.st_ctime_ns = self.time_ns
attrs.st_mtime_ns = self.time_ns
attrs.st_gid = self.gid
attrs.st_uid = self.uid
attrs.st_ino = entry.inode
attrs.st_mode = entry.mode
attrs.st_size = await entry.size()
return attrs
async def getattr(
self, inode: int, _ctx: pyfuse3.RequestContext
) -> pyfuse3.EntryAttributes:
""" Get attributes for a given inode """
entry = self.inode2entry(inode)
return await self.get_attrs(entry)
async def opendir(self, inode: int, _ctx: pyfuse3.RequestContext) -> int:
""" Open a directory referred by a given inode """
# Re-use inode as directory handle
+ self.logger.debug("opendir(inode=%d)", inode)
return inode
async def readdir(self, fh: int, offset: int, token: pyfuse3.ReaddirToken) -> None:
""" Read entries in an open directory """
# opendir() uses inode as directory handle
inode = fh
direntry = self.inode2entry(inode)
+ self.logger.debug(
+ "readdir(dirname=%s, fh=%d, offset=%d)", direntry.name, fh, offset
+ )
assert isinstance(direntry, FuseDirEntry)
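        # next_id is returned to the kernel via readdir_reply() and comes back
        # as `offset` on the next readdir() call, letting iteration resume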
next_id = offset + 1
try:
async for entry in direntry.get_entries(offset):
name = os.fsencode(entry.name)
attrs = await self.get_attrs(entry)
if not pyfuse3.readdir_reply(token, name, attrs, next_id):
break
next_id += 1
self._inode2entry[attrs.st_ino] = entry
except Exception as err:
- logging.exception("Cannot readdir: %s", err)
+ self.logger.exception("Cannot readdir: %s", err)
raise pyfuse3.FUSEError(errno.ENOENT)
async def open(
self, inode: int, _flags: int, _ctx: pyfuse3.RequestContext
) -> pyfuse3.FileInfo:
""" Open an inode and return a unique file handle """
# Re-use inode as file handle
+ self.logger.debug("open(inode=%d)", inode)
return pyfuse3.FileInfo(fh=inode, keep_cache=True)
async def read(self, fh: int, offset: int, length: int) -> bytes:
""" Read `length` bytes from file handle `fh` at position `offset` """
# open() uses inode as file handle
inode = fh
entry = self.inode2entry(inode)
+ self.logger.debug(
+ "read(name=%s, fh=%d, offset=%d, length=%d)", entry.name, fh, offset, length
+ )
assert isinstance(entry, FuseFileEntry)
+
try:
data = await entry.get_content()
return data[offset : offset + length]
except Exception as err:
- logging.exception("Cannot read: %s", err)
+ self.logger.exception("Cannot read: %s", err)
raise pyfuse3.FUSEError(errno.ENOENT)
async def lookup(
self, parent_inode: int, name: str, _ctx: pyfuse3.RequestContext
) -> pyfuse3.EntryAttributes:
""" Look up a directory entry by name and get its attributes """
name = os.fsdecode(name)
parent_entry = self.inode2entry(parent_inode)
+ self.logger.debug(
+ "lookup(parent_name=%s, parent_inode=%d, name=%s)",
+ parent_entry.name,
+ parent_inode,
+ name,
+ )
assert isinstance(parent_entry, FuseDirEntry)
+
try:
lookup_entry = await parent_entry.lookup(name)
if lookup_entry:
return await self.get_attrs(lookup_entry)
except Exception as err:
- logging.exception("Cannot lookup: %s", err)
+ self.logger.exception("Cannot lookup: %s", err)
raise pyfuse3.FUSEError(errno.ENOENT)
async def readlink(self, inode: int, _ctx: pyfuse3.RequestContext) -> bytes:
entry = self.inode2entry(inode)
+ self.logger.debug("readlink(name=%s, inode=%d)", entry.name, inode)
assert isinstance(entry, FuseSymlinkEntry)
return os.fsencode(entry.get_target())
async def main(swhids: List[SWHID], root_path: Path, conf: Dict[str, Any]) -> None:
""" swh-fuse CLI entry-point """
# Use pyfuse3 asyncio layer to match the rest of Software Heritage codebase
pyfuse3_asyncio.enable()
async with FuseCache(conf["cache"]) as cache:
fs = Fuse(root_path, cache, conf)
# Initially populate the cache
for swhid in swhids:
try:
await fs.get_metadata(swhid)
except Exception as err:
- logging.exception("Cannot prefetch object %s: %s", swhid, err)
+ fs.logger.exception("Cannot prefetch object %s: %s", swhid, err)
fuse_options = set(pyfuse3.default_options)
fuse_options.add("fsname=swhfs")
- if logging.root.level <= logging.DEBUG:
- fuse_options.add("debug")
try:
pyfuse3.init(fs, root_path, fuse_options)
await pyfuse3.main()
except Exception as err:
- logging.error("Error running FUSE: %s", err)
+ fs.logger.error("Error running FUSE: %s", err)
finally:
fs.shutdown()
pyfuse3.close(unmount=True)