Changeset View
Changeset View
Standalone View
Standalone View
swh/fuse/fs/artifact.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import asyncio | |||||
from dataclasses import dataclass, field | from dataclasses import dataclass, field | ||||
from pathlib import Path | from pathlib import Path | ||||
from typing import Any, AsyncIterator, List | from typing import Any, AsyncIterator, List | ||||
import urllib.parse | import urllib.parse | ||||
from swh.fuse.fs.entry import ( | from swh.fuse.fs.entry import ( | ||||
EntryMode, | EntryMode, | ||||
FuseDirEntry, | FuseDirEntry, | ||||
▲ Show 20 Lines • Show All 190 Lines • ▼ Show 20 Lines | |||||
class RevisionHistory(FuseDirEntry):
    """ Revision virtual `history/` directory.

    Exposes the history of a revision under three sharded views:
    `by-hash/`, `by-page/` (filled synchronously) and `by-date/`
    (filled by a background task, since it must fetch per-revision
    date metadata from the Web API).
    """

    swhid: SWHID

    async def compute_entries(self) -> AsyncIterator[FuseEntry]:
        history = await self.fuse.get_history(self.swhid)

        by_hash = self.create_child(
            RevisionHistoryShardByHash,
            name="by-hash",
            mode=int(EntryMode.RDONLY_DIR),
            history_swhid=self.swhid,
        )
        by_hash.fill_direntry_cache(history)
        yield by_hash

        by_page = self.create_child(
            RevisionHistoryShardByPage,
            name="by-page",
            mode=int(EntryMode.RDONLY_DIR),
            history_swhid=self.swhid,
        )
        by_page.fill_direntry_cache(history)
        yield by_page

        by_date = self.create_child(
            RevisionHistoryShardByDate,
            name="by-date",
            mode=int(EntryMode.RDONLY_DIR),
            history_swhid=self.swhid,
        )
        # Populate the by-date/ directory in another thread because it needs
        # to pull all history SWHIDs date metadata from the Web API. Schedule
        # it only *after* the synchronous by-hash/ and by-page/ fills above,
        # so the background task cannot access the direntry cache concurrently
        # with them.
        loop = asyncio.get_event_loop()
        asyncio.run_coroutine_threadsafe(by_date.fill_direntry_cache(history), loop)
        yield by_date
@dataclass
class RevisionHistoryShardByDate(FuseDirEntry):
    """ Revision virtual `history/by-date` sharded directory.

    Entries are sharded by commit date into year/month/day
    sub-directories (e.g. `by-date/2020/11/30/<swhid>`), with actual
    revision entries appearing as symlinks at depth 3.  The directory
    tree is filled asynchronously via `fill_direntry_cache` and served
    from the direntry cache by `compute_entries`.
    """

    # SWHID of the revision whose history is being sharded.
    history_swhid: SWHID
    # Date prefix accumulated so far, e.g. "" (root), "2020/", "2020/11/".
    # Its "/" count encodes the current sharding depth (0..3).
    prefix: str = field(default="")

    @dataclass
    class StatusFile(FuseFileEntry):
        """ Temporary file used to indicate loading progress in by-date/ """

        name: str = field(init=False, default=".status")
        mode: int = field(init=False, default=int(EntryMode.RDONLY_FILE))
        # Number of history SWHIDs whose metadata has been fetched so far.
        done: int = field(init=False, default=0)
        # Total number of history SWHIDs to process.
        todo: int = field(default=0)

        async def get_content(self) -> bytes:
            # NOTE(review): despite the label "Remaining", this reports
            # done/todo (progress so far), not what is left — confirm
            # whether the wording or the counter is intended.
            fmt = f"Remaining: {self.done}/{self.todo}\n"
            return fmt.encode()

        async def size(self) -> int:
            return len(await self.get_content())

    async def fill_direntry_cache(self, swhids: List[SWHID]) -> None:
        """ Recursively populate the direntry cache with year/month/day
        shards for the given history SWHIDs, updating it incrementally
        so partial results are visible while metadata is still being
        fetched. """
        depth = self.prefix.count("/")
        children = []
        # Only the top-level by-date/ directory (empty prefix) carries the
        # temporary .status progress file.
        has_status_file = not self.prefix and depth == 0
        if depth == 3:
            # Leaf level: one symlink per revision, pointing into archive/.
            root_path = self.get_relative_root_path()
            for swhid in swhids:
                children.append(
                    self.create_child(
                        FuseSymlinkEntry,
                        name=str(swhid),
                        target=Path(root_path, f"archive/{swhid}"),
                    )
                )
        else:
            if has_status_file:
                # Status file is kept at index 0 so it can be updated and
                # later removed by position.
                children.append(
                    self.create_child(
                        RevisionHistoryShardByDate.StatusFile, todo=len(swhids),
                    )
                )
            # Maps a date component (e.g. "2020") to the SWHIDs that fall
            # under it, and to the corresponding child directory entry.
            subdirs = {}
            prefix2child = {}
            for swhid in swhids:
                meta = await self.fuse.get_metadata(swhid)
                # Pick the next date component based on the current depth.
                if depth == 0:
                    next_prefix = f"{meta['date'].year:04d}"
                elif depth == 1:
                    next_prefix = f"{meta['date'].month:02d}"
                elif depth == 2:
                    next_prefix = f"{meta['date'].day:02d}"
                if next_prefix not in subdirs:
                    child_prefix = f"{self.prefix}{next_prefix}/"
                    child = self.create_child(
                        RevisionHistoryShardByDate,
                        name=next_prefix,
                        mode=int(EntryMode.RDONLY_DIR),
                        prefix=child_prefix,
                        history_swhid=self.history_swhid,
                    )
                    children.append(child)
                    prefix2child[next_prefix] = child
                    # Update cache at every new entry to populate it as soon as possible
                    self.fuse.cache.direntry.set(self, children)
                subdirs.setdefault(next_prefix, []).append(swhid)
                if has_status_file:
                    # Status file is always children[0] (appended first).
                    children[0].done += 1
            # Recurse into each date shard with only its own SWHIDs.
            for subdir, subentries in subdirs.items():
                child = prefix2child[subdir]
                await child.fill_direntry_cache(subentries)
        # Remove temporary status file when finished
        if has_status_file:
            children.pop(0)
            self.fuse.cache.direntry.set(self, children)

    async def compute_entries(self) -> AsyncIterator[FuseEntry]:
        """ Serve entries from the direntry cache populated by
        `fill_direntry_cache` (empty if nothing is cached yet). """
        # TODO: restart the thread when not in cache?
        for entry in self.fuse.cache.direntry.get(self) or []:
            yield entry
@dataclass | |||||
class RevisionHistoryShardByHash(FuseDirEntry): | class RevisionHistoryShardByHash(FuseDirEntry): | ||||
""" Revision virtual `history/by-hash` sharded directory """ | """ Revision virtual `history/by-hash` sharded directory """ | ||||
history_swhid: SWHID | history_swhid: SWHID | ||||
prefix: str = field(default="") | prefix: str = field(default="") | ||||
def get_full_sharded_name(self, swhid: SWHID) -> str: | def get_full_sharded_name(self, swhid: SWHID) -> str: | ||||
sharding_depth = self.fuse.conf["sharding"]["depth"] | sharding_depth = self.fuse.conf["sharding"]["depth"] | ||||
▲ Show 20 Lines • Show All 223 Lines • Show Last 20 Lines |
Hm, I realize now that starting the new thread here might mean multiple concurrent accesses to the direntry cache, since we also modify it a few lines further down. So should we move this after by_hash and by_page? Also, should all of the direntry cache filling be moved into a separate cache?