diff --git a/swh/fuse/cache.py b/swh/fuse/cache.py
--- a/swh/fuse/cache.py
+++ b/swh/fuse/cache.py
@@ -21,7 +21,7 @@
 from swh.fuse.fs.mountpoint import ArchiveDir, MetaDir
 from swh.model.exceptions import ValidationError
 from swh.model.identifiers import SWHID, parse_swhid
-from swh.web.client.client import typify_json
+from swh.web.client.client import ORIGIN_VISIT, typify_json
 
 
 class FuseCache:
@@ -71,6 +71,14 @@
         for raw_swhid in swhids:
             yield parse_swhid(raw_swhid[0])
 
+    async def get_cached_visits(self) -> AsyncGenerator[str, None]:
+        """ Return all previously cached visit URLs """
+
+        cursor = await self.metadata.conn.execute("select url from visits_cache")
+        urls = await cursor.fetchall()
+        for raw_url in urls:
+            yield raw_url[0]
+
 
 class AbstractCache(ABC):
     """ Abstract cache implementation to share common behavior between cache
@@ -104,6 +112,9 @@
         await self.conn.execute(
             "create table if not exists metadata_cache (swhid, metadata)"
         )
+        await self.conn.execute(
+            "create table if not exists visits_cache (url, metadata)"
+        )
         await self.conn.commit()
         return self
 
@@ -118,6 +129,21 @@
         else:
             return None
 
+    async def get_visits(
+        self, url_encoded: str, typify: bool = True
+    ) -> Optional[List[Dict[str, Any]]]:
+        cursor = await self.conn.execute(
+            "select metadata from visits_cache where url=?", (url_encoded,)
+        )
+        cache = await cursor.fetchone()
+        if cache:
+            visits = json.loads(cache[0])
+            if typify:
+                visits = [typify_json(v, ORIGIN_VISIT) for v in visits]
+            return visits
+        else:
+            return None
+
     async def set(self, swhid: SWHID, metadata: Any) -> None:
         await self.conn.execute(
             "insert into metadata_cache values (?, ?)",
@@ -125,6 +151,12 @@
         )
         await self.conn.commit()
 
+    async def set_visits(self, url_encoded: str, visits: List[Dict[str, Any]]) -> None:
+        await self.conn.execute(
+            "insert into visits_cache values (?, ?)", (url_encoded, json.dumps(visits)),
+        )
+        await self.conn.commit()
+
     async def get_cached_subset(self, swhids: List[SWHID]) -> List[SWHID]:
         swhids_str = ",".join(f'"{x}"' for x in swhids)
         cursor = await self.conn.execute(
diff --git a/swh/fuse/fs/artifact.py b/swh/fuse/fs/artifact.py
--- a/swh/fuse/fs/artifact.py
+++ b/swh/fuse/fs/artifact.py
@@ -5,8 +5,10 @@
 
 import asyncio
 from dataclasses import dataclass, field
+import json
+import logging
 from pathlib import Path
-from typing import Any, AsyncIterator, List
+from typing import Any, AsyncIterator, Dict, List
 import urllib.parse
 
 from swh.fuse.fs.entry import (
@@ -49,7 +51,7 @@
         if self.prefetch:
             return self.prefetch["length"]
         else:
-            return len(await self.get_content())
+            return await super().size()
 
 
 @dataclass
@@ -261,9 +263,6 @@
         fmt = f"Done: {self.done}/{self.todo}\n"
         return fmt.encode()
 
-    async def size(self) -> int:
-        return len(await self.get_content())
-
     async def compute_entries(self) -> AsyncIterator[FuseEntry]:
         history = await self.fuse.get_history(self.history_swhid)
         # Only check for cached revisions since fetching all of them with the
@@ -456,9 +455,6 @@
     async def get_content(self) -> bytes:
         return str.encode(self.target_type + "\n")
 
-    async def size(self) -> int:
-        return len(await self.get_content())
-
 
 @dataclass
 class Snapshot(FuseDirEntry):
@@ -488,6 +484,65 @@
             )
 
+
+@dataclass
+class Origin(FuseDirEntry):
+    """ The origin artifact directory, listing visits by date """
+
+    DATE_FMT = "{year:04d}-{month:02d}-{day:02d}"
+
+    async def compute_entries(self) -> AsyncIterator[FuseEntry]:
+        # The origin's name is always its URL (encoded to create a valid UNIX filename)
+        visits = await self.fuse.get_visits(self.name)
+
+        seen_date = set()
+        for visit in visits:
+            date = visit["date"]
+            name = self.DATE_FMT.format(year=date.year, month=date.month, day=date.day)
+
+            if name in seen_date:
+                logging.debug(
+                    "Conflict date on origin: %s, %s", visit["origin"], str(name)
+                )
+            else:
+                seen_date.add(name)
+                yield self.create_child(
+                    OriginVisit, name=name, mode=int(EntryMode.RDONLY_DIR), meta=visit,
+                )
+
+
+@dataclass
+class OriginVisit(FuseDirEntry):
+    """ The origin visit artifact directory """
+
+    meta: Dict[str, Any]
+
+    @dataclass
+    class MetaFile(FuseFileEntry):
+        """ Origin visit metadata file (in JSON format) """
+
+        content: str
+
+        async def get_content(self) -> bytes:
+            return str.encode(self.content + "\n")
+
+    async def compute_entries(self) -> AsyncIterator[FuseEntry]:
+        snapshot_swhid = self.meta["snapshot"]
+        if snapshot_swhid:
+            root_path = self.get_relative_root_path()
+            yield self.create_child(
+                FuseSymlinkEntry,
+                name="snapshot",
+                target=Path(root_path, f"archive/{snapshot_swhid}"),
+            )
+
+        yield self.create_child(
+            OriginVisit.MetaFile,
+            name="meta.json",
+            mode=int(EntryMode.RDONLY_FILE),
+            content=json.dumps(self.meta, default=lambda x: str(x)),
+        )
+
 
 OBJTYPE_GETTERS = {
     CONTENT: Content,
     DIRECTORY: Directory,
diff --git a/swh/fuse/fs/entry.py b/swh/fuse/fs/entry.py
--- a/swh/fuse/fs/entry.py
+++ b/swh/fuse/fs/entry.py
@@ -68,6 +68,9 @@
         raise NotImplementedError
 
+    async def size(self) -> int:
+        return len(await self.get_content())
+
 
 class FuseDirEntry(FuseEntry):
     """ FUSE virtual directory entry """
 
diff --git a/swh/fuse/fs/mountpoint.py b/swh/fuse/fs/mountpoint.py
--- a/swh/fuse/fs/mountpoint.py
+++ b/swh/fuse/fs/mountpoint.py
@@ -7,7 +7,7 @@
 import json
 from typing import AsyncIterator
 
-from swh.fuse.fs.artifact import OBJTYPE_GETTERS
+from swh.fuse.fs.artifact import OBJTYPE_GETTERS, Origin
 from swh.fuse.fs.entry import EntryMode, FuseDirEntry, FuseEntry, FuseFileEntry
 from swh.model.exceptions import ValidationError
 from swh.model.identifiers import CONTENT, SWHID, parse_swhid
@@ -24,6 +24,7 @@
     async def compute_entries(self) -> AsyncIterator[FuseEntry]:
         yield self.create_child(ArchiveDir)
         yield self.create_child(MetaDir)
+        yield self.create_child(OriginDir)
 
 
 @dataclass
@@ -101,3 +102,33 @@
 
     async def size(self) -> int:
         return len(await self.get_content())
+
+
+@dataclass
+class OriginDir(FuseDirEntry):
+    """ The origin/ virtual directory, listing origins by URL-encoded name """
+
+    name: str = field(init=False, default="origin")
+    mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR))
+
+    def create_child(self, url_encoded: str) -> FuseEntry:
+        return super().create_child(
+            Origin, name=url_encoded, mode=int(EntryMode.RDONLY_DIR),
+        )
+
+    async def compute_entries(self) -> AsyncIterator[FuseEntry]:
+        async for url in self.fuse.cache.get_cached_visits():
+            yield self.create_child(url)
+
+    async def lookup(self, name: str) -> FuseEntry:
+        entry = await super().lookup(name)
+        if entry:
+            return entry
+
+        # On-the-fly mounting of a new origin URL
+        try:
+            url_encoded = name
+            await self.fuse.get_visits(url_encoded)
+            return self.create_child(url_encoded)
+        except ValidationError:
+            return None
diff --git a/swh/fuse/fuse.py b/swh/fuse/fuse.py
--- a/swh/fuse/fuse.py
+++ b/swh/fuse/fuse.py
@@ -5,11 +5,13 @@
 
 import asyncio
 import errno
+import functools
 import logging
 import os
 from pathlib import Path
 import time
 from typing import Any, Dict, List
+import urllib.parse
 
 import pyfuse3
 import pyfuse3_asyncio
@@ -136,6 +138,29 @@
         # an empty list.
         return []
 
+    async def get_visits(self, url_encoded: str) -> List[Dict[str, Any]]:
+        """ Retrieve a list of visits for a given origin URL """
+
+        cache = await self.cache.metadata.get_visits(url_encoded)
+        if cache:
+            return cache
+
+        try:
+            typify = False  # Get the raw JSON from the API
+            loop = asyncio.get_event_loop()
+            # The Web API only takes non-encoded URLs
+            url = urllib.parse.unquote_plus(url_encoded)
+            visits_it = await loop.run_in_executor(
+                None, functools.partial(self.web_api.visits, url, typify=typify)
+            )
+            visits = list(visits_it)
+            await self.cache.metadata.set_visits(url_encoded, visits)
+            # Retrieve it from the cache so it is correctly typed
+            return await self.cache.metadata.get_visits(url_encoded)
+        except requests.HTTPError as err:
+            logging.error("Cannot fetch visits for object %s: %s", url_encoded, err)
+            raise
+
     async def get_attrs(self, entry: FuseEntry) -> pyfuse3.EntryAttributes:
         """ Return entry attributes """
 
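
Not part of the patch: a minimal usage sketch of the new origin/ directory once the filesystem is mounted. The mount point /tmp/swhfs and the origin URL below are hypothetical examples; the sketch relies only on behavior introduced above, namely that origin entries are named after the URL percent-encoded with urllib.parse.quote_plus, that each visit appears as a YYYY-MM-DD directory, and that each visit exposes a meta.json file plus an optional snapshot symlink.

import json
import os
import urllib.parse

mountpoint = "/tmp/swhfs"  # hypothetical mount point
origin_url = "https://github.com/python/cpython"  # hypothetical origin URL

# Origin entries are named after the URL, encoded to form a valid UNIX filename;
# OriginDir.lookup() mounts unknown origins on the fly via get_visits().
origin_dir = os.path.join(mountpoint, "origin", urllib.parse.quote_plus(origin_url))

# Each child of the origin directory is a visit, named YYYY-MM-DD.
for visit_date in sorted(os.listdir(origin_dir)):
    meta_path = os.path.join(origin_dir, visit_date, "meta.json")
    with open(meta_path) as f:
        meta = json.load(f)
    # The "snapshot" key mirrors the optional snapshot symlink created by OriginVisit.
    print(visit_date, meta.get("snapshot"))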