diff --git a/swh/fuse/fs/artifact.py b/swh/fuse/fs/artifact.py --- a/swh/fuse/fs/artifact.py +++ b/swh/fuse/fs/artifact.py @@ -21,6 +21,8 @@ from swh.model.from_disk import DentryPerms from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT, SWHID +SWHID_REGEXP = r"swh:1:(cnt|dir|rel|rev|snp):[0-9a-f]{40}" + @dataclass class Content(FuseFileEntry): @@ -257,6 +259,7 @@ is_status_done: bool = field(default=False) DATE_FMT = "{year:04d}/{month:02d}/{day:02d}/" + ENTRIES_REGEXP = r"^(([0-9]{2,4})" + f"|({SWHID_REGEXP}))$" @dataclass class StatusFile(FuseFileEntry): @@ -324,6 +327,7 @@ prefix: str = field(default="") SHARDING_LENGTH = 2 + ENTRIES_REGEXP = r"^([a-f0-9]+)" + f"|({SWHID_REGEXP})$" async def compute_entries(self) -> AsyncIterator[FuseEntry]: history = await self.fuse.get_history(self.history_swhid) @@ -362,6 +366,7 @@ PAGE_SIZE = 10_000 PAGE_FMT = "{page_number:03d}" + ENTRIES_REGEXP = r"^([0-9]+)" + f"|({SWHID_REGEXP})$" async def compute_entries(self) -> AsyncIterator[FuseEntry]: history = await self.fuse.get_history(self.history_swhid) @@ -532,6 +537,7 @@ node. """ DATE_FMT = "{year:04d}-{month:02d}-{day:02d}" + ENTRIES_REGEXP = r"^[0-9]{4}-[0-9]{2}-[0-9]{2}$" async def compute_entries(self) -> AsyncIterator[FuseEntry]: # The origin's name is always its URL (encoded to create a valid UNIX filename) diff --git a/swh/fuse/fs/entry.py b/swh/fuse/fs/entry.py --- a/swh/fuse/fs/entry.py +++ b/swh/fuse/fs/entry.py @@ -8,6 +8,7 @@ from dataclasses import dataclass, field from enum import IntEnum from pathlib import Path +import re from stat import S_IFDIR, S_IFLNK, S_IFREG from typing import Any, AsyncIterator, Sequence, Union @@ -60,6 +61,7 @@ return constructor(depth=self.depth + 1, fuse=self.fuse, **kwargs) +@dataclass class FuseFileEntry(FuseEntry): """ FUSE virtual file entry """ @@ -72,12 +74,21 @@ return len(await self.get_content()) +@dataclass class FuseDirEntry(FuseEntry): """ FUSE virtual directory entry """ + ENTRIES_REGEXP: str = field(init=False, default="") + async def size(self) -> int: return 0 + def validate_entry(self, name: str) -> bool: + """ Return true if the name matches the directory entries regular + expression, and false otherwise """ + + return re.match(self.ENTRIES_REGEXP, name) + async def compute_entries(self) -> Sequence[FuseEntry]: """ Return the child entries of a directory entry """ @@ -101,6 +112,9 @@ async def lookup(self, name: str) -> FuseEntry: """ Look up a FUSE entry by name """ + if not self.validate_entry(name): + return None + async for entry in self.get_entries(): if entry.name == name: return entry diff --git a/swh/fuse/fs/mountpoint.py b/swh/fuse/fs/mountpoint.py --- a/swh/fuse/fs/mountpoint.py +++ b/swh/fuse/fs/mountpoint.py @@ -7,7 +7,7 @@ import json from typing import AsyncIterator -from swh.fuse.fs.artifact import OBJTYPE_GETTERS, Origin +from swh.fuse.fs.artifact import OBJTYPE_GETTERS, SWHID_REGEXP, Origin from swh.fuse.fs.entry import EntryMode, FuseDirEntry, FuseEntry, FuseFileEntry from swh.model.exceptions import ValidationError from swh.model.identifiers import CONTENT, SWHID, parse_swhid @@ -38,6 +38,7 @@ name: str = field(init=False, default="archive") mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR)) + ENTRIES_REGEXP = f"^({SWHID_REGEXP})(.json)?$" JSON_SUFFIX = ".json" async def compute_entries(self) -> AsyncIterator[FuseEntry]: @@ -45,6 +46,9 @@ yield async def lookup(self, name: str) -> FuseEntry: + if not self.validate_entry(name): + return None + # On the fly mounting of a new artifact try: if name.endswith(self.JSON_SUFFIX): @@ -100,6 +104,8 @@ name: str = field(init=False, default="origin") mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR)) + ENTRIES_REGEXP = r"^.*%3A.*$" # %3A is the encoded version of ':' + def create_child(self, url_encoded: str) -> FuseEntry: return super().create_child( Origin, name=url_encoded, mode=int(EntryMode.RDONLY_DIR), @@ -110,6 +116,9 @@ yield self.create_child(url) async def lookup(self, name: str) -> FuseEntry: + if not self.validate_entry(name): + return None + entry = await super().lookup(name) if entry: return entry