Changeset View
Changeset View
Standalone View
Standalone View
swh/fuse/fuse.py
Show First 20 Lines • Show All 113 Lines • ▼ Show 20 Lines | async def get_attrs(self, entry: FuseEntry) -> pyfuse3.EntryAttributes: | ||||
attrs.st_size = 0 | attrs.st_size = 0 | ||||
attrs.st_atime_ns = self.time_ns | attrs.st_atime_ns = self.time_ns | ||||
attrs.st_ctime_ns = self.time_ns | attrs.st_ctime_ns = self.time_ns | ||||
attrs.st_mtime_ns = self.time_ns | attrs.st_mtime_ns = self.time_ns | ||||
attrs.st_gid = self.gid | attrs.st_gid = self.gid | ||||
attrs.st_uid = self.uid | attrs.st_uid = self.uid | ||||
attrs.st_ino = entry.inode | attrs.st_ino = entry.inode | ||||
attrs.st_mode = entry.mode | attrs.st_mode = entry.mode | ||||
attrs.st_size = await entry.length() | attrs.st_size = await entry.size() | ||||
return attrs | return attrs | ||||
async def getattr( | async def getattr( | ||||
self, inode: int, _ctx: pyfuse3.RequestContext | self, inode: int, _ctx: pyfuse3.RequestContext | ||||
) -> pyfuse3.EntryAttributes: | ) -> pyfuse3.EntryAttributes: | ||||
""" Get attributes for a given inode """ | """ Get attributes for a given inode """ | ||||
entry = self.inode2entry(inode) | entry = self.inode2entry(inode) | ||||
Show All 38 Lines | class Fuse(pyfuse3.Operations): | ||||
async def read(self, fh: int, offset: int, length: int) -> bytes: | async def read(self, fh: int, offset: int, length: int) -> bytes: | ||||
""" Read `length` bytes from file handle `fh` at position `offset` """ | """ Read `length` bytes from file handle `fh` at position `offset` """ | ||||
# open() uses inode as file handle | # open() uses inode as file handle | ||||
inode = fh | inode = fh | ||||
entry = self.inode2entry(inode) | entry = self.inode2entry(inode) | ||||
data = await entry.content() | data = await entry.get_content() | ||||
return data[offset : offset + length] | return data[offset : offset + length] | ||||
async def lookup( | async def lookup( | ||||
self, parent_inode: int, name: str, _ctx: pyfuse3.RequestContext | self, parent_inode: int, name: str, _ctx: pyfuse3.RequestContext | ||||
) -> pyfuse3.EntryAttributes: | ) -> pyfuse3.EntryAttributes: | ||||
""" Look up a directory entry by name and get its attributes """ | """ Look up a directory entry by name and get its attributes """ | ||||
name = os.fsdecode(name) | name = os.fsdecode(name) | ||||
parent_entry = self.inode2entry(parent_inode) | parent_entry = self.inode2entry(parent_inode) | ||||
async for entry in parent_entry: | async for entry in parent_entry: | ||||
if name == entry.name: | if name == entry.name: | ||||
attr = await self.get_attrs(entry) | attr = await self.get_attrs(entry) | ||||
return attr | return attr | ||||
logging.error(f"Unknown name during lookup: '{name}'") | logging.error(f"Unknown name during lookup: '{name}'") | ||||
raise pyfuse3.FUSEError(errno.ENOENT) | raise pyfuse3.FUSEError(errno.ENOENT) | ||||
async def readlink(self, inode: int, _ctx: pyfuse3.RequestContext) -> bytes: | |||||
entry = self.inode2entry(inode) | |||||
haltode: Not sure if we want to use directly the `target` field here or some getter/setter, because… | |||||
return os.fsencode(entry.get_target()) | |||||
async def main(swhids: List[SWHID], root_path: Path, conf: Dict[str, Any]) -> None: | async def main(swhids: List[SWHID], root_path: Path, conf: Dict[str, Any]) -> None: | ||||
""" swh-fuse CLI entry-point """ | """ swh-fuse CLI entry-point """ | ||||
# Use pyfuse3 asyncio layer to match the rest of Software Heritage codebase | # Use pyfuse3 asyncio layer to match the rest of Software Heritage codebase | ||||
pyfuse3_asyncio.enable() | pyfuse3_asyncio.enable() | ||||
async with FuseCache(conf["cache"]) as cache: | async with FuseCache(conf["cache"]) as cache: | ||||
Show All 16 Lines |
Not sure if we want to use directly the target field here or some getter/setter, because FuseEntry does not have such field (so mypy complains).