Changeset View
Changeset View
Standalone View
Standalone View
swh/fuse/fs/artifact.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import asyncio | import asyncio | ||||
from dataclasses import dataclass, field | from dataclasses import dataclass, field | ||||
from pathlib import Path | from pathlib import Path | ||||
from typing import Any, AsyncIterator, Dict, List | from typing import Any, AsyncIterator, List | ||||
import urllib.parse | import urllib.parse | ||||
from swh.fuse.fs.entry import ( | from swh.fuse.fs.entry import ( | ||||
EntryMode, | EntryMode, | ||||
FuseDirEntry, | FuseDirEntry, | ||||
FuseEntry, | FuseEntry, | ||||
FuseFileEntry, | FuseFileEntry, | ||||
FuseSymlinkEntry, | FuseSymlinkEntry, | ||||
▲ Show 20 Lines • Show All 184 Lines • ▼ Show 20 Lines | |||||
@dataclass | @dataclass | ||||
class RevisionHistory(FuseDirEntry): | class RevisionHistory(FuseDirEntry): | ||||
""" Revision virtual `history/` directory """ | """ Revision virtual `history/` directory """ | ||||
swhid: SWHID | swhid: SWHID | ||||
async def compute_entries(self) -> AsyncIterator[FuseEntry]: | async def prefill_caches(self) -> None: | ||||
history = await self.fuse.get_history(self.swhid) | history = await self.fuse.get_history(self.swhid) | ||||
for swhid in history: | |||||
await self.fuse.get_metadata(swhid) | |||||
async def compute_entries(self) -> AsyncIterator[FuseEntry]: | |||||
# Run it concurrently because of the many API calls necessary | |||||
asyncio.create_task(self.prefill_caches()) | |||||
by_date = self.create_child( | yield self.create_child( | ||||
RevisionHistoryShardByDate, | RevisionHistoryShardByDate, | ||||
name="by-date", | name="by-date", | ||||
mode=int(EntryMode.RDONLY_DIR), | mode=int(EntryMode.RDONLY_DIR), | ||||
history_swhid=self.swhid, | history_swhid=self.swhid, | ||||
) | ) | ||||
# Populate the by-date/ directory in parallel because it needs to pull | |||||
# from the Web API all history SWHIDs date metadata | |||||
asyncio.create_task(by_date.fill_metadata_cache(history)) | |||||
yield by_date | |||||
by_hash = self.create_child( | yield self.create_child( | ||||
RevisionHistoryShardByHash, | RevisionHistoryShardByHash, | ||||
name="by-hash", | name="by-hash", | ||||
mode=int(EntryMode.RDONLY_DIR), | mode=int(EntryMode.RDONLY_DIR), | ||||
history_swhid=self.swhid, | history_swhid=self.swhid, | ||||
) | ) | ||||
by_hash.fill_direntry_cache(history) | |||||
yield by_hash | |||||
by_page = self.create_child( | yield self.create_child( | ||||
RevisionHistoryShardByPage, | RevisionHistoryShardByPage, | ||||
name="by-page", | name="by-page", | ||||
mode=int(EntryMode.RDONLY_DIR), | mode=int(EntryMode.RDONLY_DIR), | ||||
history_swhid=self.swhid, | history_swhid=self.swhid, | ||||
) | ) | ||||
by_page.fill_direntry_cache(history) | |||||
yield by_page | |||||
@dataclass | @dataclass | ||||
class RevisionHistoryShardByDate(FuseDirEntry): | class RevisionHistoryShardByDate(FuseDirEntry): | ||||
""" Revision virtual `history/by-date` sharded directory """ | """ Revision virtual `history/by-date` sharded directory """ | ||||
history_swhid: SWHID | history_swhid: SWHID | ||||
prefix: str = field(default="") | prefix: str = field(default="") | ||||
is_status_done: bool = field(default=False) | is_status_done: bool = field(default=False) | ||||
DATE_FMT = "{year:04d}/{month:02d}/{day:02d}/" | |||||
@dataclass | @dataclass | ||||
class StatusFile(FuseFileEntry): | class StatusFile(FuseFileEntry): | ||||
""" Temporary file used to indicate loading progress in by-date/ """ | """ Temporary file used to indicate loading progress in by-date/ """ | ||||
name: str = field(init=False, default=".status") | name: str = field(init=False, default=".status") | ||||
mode: int = field(init=False, default=int(EntryMode.RDONLY_FILE)) | mode: int = field(init=False, default=int(EntryMode.RDONLY_FILE)) | ||||
done: int | done: int | ||||
todo: int | todo: int | ||||
async def get_content(self) -> bytes: | async def get_content(self) -> bytes: | ||||
fmt = f"Done: {self.done}/{self.todo}\n" | fmt = f"Done: {self.done}/{self.todo}\n" | ||||
return fmt.encode() | return fmt.encode() | ||||
async def size(self) -> int: | async def size(self) -> int: | ||||
return len(await self.get_content()) | return len(await self.get_content()) | ||||
async def fill_metadata_cache(self, swhids: List[SWHID]) -> None: | |||||
for swhid in swhids: | |||||
await self.fuse.get_metadata(swhid) | |||||
def get_full_sharded_name(self, meta: Dict[str, Any]) -> str: | |||||
date = meta["date"] | |||||
return f"{date.year:04d}/{date.month:02d}/{date.day:02d}" | |||||
async def compute_entries(self) -> AsyncIterator[FuseEntry]: | async def compute_entries(self) -> AsyncIterator[FuseEntry]: | ||||
history = await self.fuse.get_history(self.history_swhid) | history = await self.fuse.get_history(self.history_swhid) | ||||
self.is_status_done = True | # Only check for cached revisions since fetching all of them with the | ||||
# Web API would take too long | |||||
swhids = await self.fuse.cache.metadata.get_cached_subset(history) | |||||
depth = self.prefix.count("/") | depth = self.prefix.count("/") | ||||
subdirs = set() | root_path = self.get_relative_root_path() | ||||
nb_entries = 0 | sharded_dirs = set() | ||||
for swhid in history: | |||||
# Only check for cached revisions since fetching all of them with | |||||
# the Web API would take too long | |||||
meta = await self.fuse.cache.metadata.get(swhid) | |||||
if not meta: | |||||
self.is_status_done = False | |||||
continue | |||||
nb_entries += 1 | for swhid in swhids: | ||||
name = self.get_full_sharded_name(meta) | meta = await self.fuse.cache.metadata.get(swhid) | ||||
if not name.startswith(self.prefix): | date = meta["date"] | ||||
sharded_name = self.DATE_FMT.format( | |||||
year=date.year, month=date.month, day=date.day | |||||
) | |||||
if not sharded_name.startswith(self.prefix): | |||||
continue | continue | ||||
next_prefix = name.split("/")[depth] | if depth == 3: | ||||
if next_prefix not in subdirs: | yield self.create_child( | ||||
subdirs.add(next_prefix) | FuseSymlinkEntry, | ||||
name=str(swhid), | |||||
target=Path(root_path, f"archive/{swhid}"), | |||||
) | |||||
# Create sharded directories | |||||
else: | |||||
next_prefix = sharded_name.split("/")[depth] | |||||
if next_prefix not in sharded_dirs: | |||||
sharded_dirs.add(next_prefix) | |||||
yield self.create_child( | yield self.create_child( | ||||
RevisionHistoryShardByDate, | RevisionHistoryShardByDate, | ||||
name=next_prefix, | name=next_prefix, | ||||
mode=int(EntryMode.RDONLY_DIR), | mode=int(EntryMode.RDONLY_DIR), | ||||
prefix=f"{self.prefix}{next_prefix}/", | prefix=f"{self.prefix}{next_prefix}/", | ||||
history_swhid=self.history_swhid, | history_swhid=self.history_swhid, | ||||
) | ) | ||||
if not self.is_status_done: | self.is_status_done = len(swhids) == len(history) | ||||
if not self.is_status_done and depth == 0: | |||||
seirl: Add parentheses to make this easier to understand | |||||
Done · Inline Actions · haltode: black prevents parentheses here. | |||||
yield self.create_child( | yield self.create_child( | ||||
RevisionHistoryShardByDate.StatusFile, | RevisionHistoryShardByDate.StatusFile, | ||||
done=nb_entries, | done=len(swhids), | ||||
todo=len(history), | todo=len(history), | ||||
) | ) | ||||
@dataclass | @dataclass | ||||
class RevisionHistoryShardByHash(FuseDirEntry): | class RevisionHistoryShardByHash(FuseDirEntry): | ||||
""" Revision virtual `history/by-hash` sharded directory """ | """ Revision virtual `history/by-hash` sharded directory """ | ||||
history_swhid: SWHID | history_swhid: SWHID | ||||
prefix: str = field(default="") | prefix: str = field(default="") | ||||
def get_full_sharded_name(self, swhid: SWHID) -> str: | SHARDING_LENGTH = 2 | ||||
sharding_depth = self.fuse.conf["sharding"]["depth"] | |||||
sharding_length = self.fuse.conf["sharding"]["length"] | async def compute_entries(self) -> AsyncIterator[FuseEntry]: | ||||
if sharding_depth <= 0: | history = await self.fuse.get_history(self.history_swhid) | ||||
return str(swhid) | |||||
else: | if self.prefix: | ||||
basename = swhid.object_id | |||||
parts = [ | |||||
basename[i * sharding_length : (i + 1) * sharding_length] | |||||
for i in range(sharding_depth) | |||||
] | |||||
# Always keep the full SWHID as the path basename (otherwise we | |||||
# lose the SWHID object type information) | |||||
parts.append(str(swhid)) | |||||
path = Path(*parts) | |||||
return str(path) | |||||
def fill_direntry_cache(self, swhids: List[SWHID]): | |||||
sharding_depth = self.fuse.conf["sharding"]["depth"] | |||||
sharding_length = self.fuse.conf["sharding"]["length"] | |||||
depth = self.prefix.count("/") | |||||
children = [] | |||||
if depth == sharding_depth: | |||||
root_path = self.get_relative_root_path() | root_path = self.get_relative_root_path() | ||||
for swhid in swhids: | for swhid in history: | ||||
children.append( | if swhid.object_id.startswith(self.prefix): | ||||
self.create_child( | yield self.create_child( | ||||
FuseSymlinkEntry, | FuseSymlinkEntry, | ||||
name=str(swhid), | name=str(swhid), | ||||
target=Path(root_path, f"archive/{swhid}"), | target=Path(root_path, f"archive/{swhid}"), | ||||
) | ) | ||||
) | # Create sharded directories | ||||
else: | else: | ||||
subdirs = {} | sharded_dirs = set() | ||||
prefix_len = len(self.prefix) | for swhid in history: | ||||
for swhid in swhids: | next_prefix = swhid.object_id[: self.SHARDING_LENGTH] | ||||
name = self.get_full_sharded_name(swhid) | if next_prefix not in sharded_dirs: | ||||
next_prefix = name[prefix_len : prefix_len + sharding_length] | sharded_dirs.add(next_prefix) | ||||
subdirs.setdefault(next_prefix, []).append(swhid) | yield self.create_child( | ||||
# Recursive intermediate sharded directories | |||||
for subdir, subentries in subdirs.items(): | |||||
child_prefix = f"{self.prefix}{subdir}/" | |||||
child = self.create_child( | |||||
RevisionHistoryShardByHash, | RevisionHistoryShardByHash, | ||||
name=subdir, | name=next_prefix, | ||||
mode=int(EntryMode.RDONLY_DIR), | mode=int(EntryMode.RDONLY_DIR), | ||||
prefix=child_prefix, | prefix=next_prefix, | ||||
history_swhid=self.history_swhid, | history_swhid=self.history_swhid, | ||||
) | ) | ||||
children.append(child) | |||||
child.fill_direntry_cache(subentries) | |||||
self.fuse.cache.direntry.set(self, children) | |||||
return children | |||||
async def compute_entries(self) -> AsyncIterator[FuseEntry]: | |||||
history = await self.fuse.get_history(self.history_swhid) | |||||
hash_prefix = self.prefix.replace("/", "") | |||||
swhids = [s for s in history if s.object_id.startswith(hash_prefix)] | |||||
for entry in self.fill_direntry_cache(swhids): | |||||
yield entry | |||||
@dataclass | @dataclass | ||||
class RevisionHistoryShardByPage(FuseDirEntry): | class RevisionHistoryShardByPage(FuseDirEntry): | ||||
""" Revision virtual `history/by-page` sharded directory """ | """ Revision virtual `history/by-page` sharded directory """ | ||||
history_swhid: SWHID | history_swhid: SWHID | ||||
Done · Inline Actions · seirl: Could you use `None` here? | |||||
prefix: int = field(default=None) | |||||
PAGE_SIZE = 10_000 | PAGE_SIZE = 10_000 | ||||
PAGE_FMT = "{page_number:03d}" | PAGE_FMT = "{page_number:03d}" | ||||
def fill_direntry_cache(self, swhids: List[SWHID]): | async def compute_entries(self) -> AsyncIterator[FuseEntry]: | ||||
page_number = -1 | history = await self.fuse.get_history(self.history_swhid) | ||||
page = None | |||||
page_root_path = None | |||||
page_children = [] | |||||
pages = [] | |||||
for idx, swhid in enumerate(swhids): | |||||
if idx % self.PAGE_SIZE == 0: | |||||
if page: | |||||
self.fuse.cache.direntry.set(page, page_children) | |||||
pages.append(page) | |||||
page_number += 1 | if self.prefix is not None: | ||||
page = self.create_child( | current_page = self.prefix | ||||
root_path = self.get_relative_root_path() | |||||
max_idx = min(len(history), (current_page + 1) * self.PAGE_SIZE) | |||||
for i in range(current_page * self.PAGE_SIZE, max_idx): | |||||
swhid = history[i] | |||||
yield self.create_child( | |||||
FuseSymlinkEntry, | |||||
name=str(swhid), | |||||
target=Path(root_path, f"archive/{swhid}"), | |||||
) | |||||
# Create sharded directories | |||||
else: | |||||
for i in range(0, len(history), self.PAGE_SIZE): | |||||
page_number = i // self.PAGE_SIZE | |||||
yield self.create_child( | |||||
RevisionHistoryShardByPage, | RevisionHistoryShardByPage, | ||||
name=self.PAGE_FMT.format(page_number=page_number), | name=self.PAGE_FMT.format(page_number=page_number), | ||||
mode=int(EntryMode.RDONLY_DIR), | mode=int(EntryMode.RDONLY_DIR), | ||||
history_swhid=self.history_swhid, | history_swhid=self.history_swhid, | ||||
prefix=page_number, | |||||
) | ) | ||||
page_root_path = page.get_relative_root_path() | |||||
page_children = [] | |||||
page_children.append( | |||||
page.create_child( | |||||
FuseSymlinkEntry, | |||||
name=str(swhid), | |||||
target=Path(page_root_path, f"archive/{swhid}"), | |||||
) | |||||
) | |||||
if page: | |||||
self.fuse.cache.direntry.set(page, page_children) | |||||
pages.append(page) | |||||
self.fuse.cache.direntry.set(self, pages) | |||||
return pages | |||||
async def compute_entries(self) -> AsyncIterator[FuseEntry]: | |||||
history = await self.fuse.get_history(self.history_swhid) | |||||
for entry in self.fill_direntry_cache(history): | |||||
yield entry | |||||
@dataclass | @dataclass | ||||
class Release(FuseDirEntry): | class Release(FuseDirEntry): | ||||
""" Software Heritage release artifact. | """ Software Heritage release artifact. | ||||
Attributes: | Attributes: | ||||
swhid: Software Heritage persistent identifier | swhid: Software Heritage persistent identifier | ||||
▲ Show 20 Lines • Show All 104 Lines • Show Last 20 Lines |
Add parentheses to make this easier to understand