swh/fuse/fs/artifact.py
… 14 unchanged lines …
 from swh.fuse.fs.entry import (
     EntryMode,
     FuseDirEntry,
     FuseEntry,
     FuseFileEntry,
     FuseSymlinkEntry,
 )
 from swh.model.from_disk import DentryPerms
-from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT, SWHID
+from swh.model.identifiers import (
+    CONTENT,
+    DIRECTORY,
+    RELEASE,
+    REVISION,
+    SNAPSHOT,
+    SWHID,
+    parse_swhid,
+)

 SWHID_REGEXP = r"swh:1:(cnt|dir|rel|rev|snp):[0-9a-f]{40}"


 @dataclass
 class Content(FuseFileEntry):
     """ Software Heritage content artifact.
… 183 unchanged lines …
 @dataclass
 class RevisionHistory(FuseDirEntry):
     """ Revision virtual `history/` directory """

     swhid: SWHID

-    async def prefill_by_date_cache(self, by_date_dir: FuseDirEntry) -> None:
-        history = await self.fuse.get_history(self.swhid)
+    async def prefill_metadata_cache(self, by_date_dir: FuseDirEntry, swhids) -> None:
         nb_api_calls = 0
-        for swhid in history:
+        for swhid in swhids:
             cache = await self.fuse.cache.metadata.get(swhid)
             if cache:
                 continue
             await self.fuse.get_metadata(swhid)
             # The by-date/ directory is cached temporarily in direntry, and
             # invalidated + updated every 100 API calls
             nb_api_calls += 1
             if nb_api_calls % 100 == 0:
                 self.fuse.cache.direntry.invalidate(by_date_dir)
         # Make sure to have the latest entries once the prefilling is done
         self.fuse.cache.direntry.invalidate(by_date_dir)

+    async def prefill_caches(self, by_date_dir: FuseDirEntry) -> None:
+        import functools
+        import requests
+
+        try:
+            # Use the swh-graph API to retrieve the full history very fast
+            self.fuse.logger.debug(
+                "Retrieving history of %s via graph API...", self.swhid
+            )
+            call = f"graph/visit/edges/{self.swhid}?edges=rev:rev"
+            loop = asyncio.get_event_loop()
+            resp = await loop.run_in_executor(
+                None, functools.partial(self.fuse.web_api._call, call, stream=True)
+            )
+            # TODO: ensure chunk_size
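+            # A SWHID is 50 characters, so one edge line is 50 * 2 + 2 bytes
+            # (two SWHIDs, a separator space and a newline): each chunk below
+            # holds roughly 500 edges. iter_content() gives no guarantee that
+            # chunks end on line boundaries, hence the TODO above.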
+            for history in resp.iter_content(chunk_size=(50 * 2 + 2) * 500):
+                history = history.decode().strip()
+                edges = [edge.split(" ") for edge in history.split("\n")]
+                await self.fuse.cache.history.set(edges)
+                swhids = set()
+                for edge in edges:
+                    if not edge:
+                        continue
+                    for swhid in edge:
+                        swhids.add(parse_swhid(swhid))
+                self.fuse.logger.debug("Prefilling %d revisions", len(swhids))
+                asyncio.create_task(
+                    self.prefill_metadata_cache(by_date_dir, swhids)
+                )
+        except requests.HTTPError as err:
+            self.fuse.logger.error(
+                "Cannot fetch history for object %s: %s", self.swhid, err
+            )
+            # Ignore the exception since swh-graph does not necessarily contain
+            # the most recent artifacts from the archive. Computing the full
+            # history from the Web API is too computationally intensive so
+            # simply return an empty list.
+        except Exception as err:
+            # This runs in a background task, where an unhandled exception
+            # would otherwise pass unnoticed
+            self.fuse.logger.error("Cannot prefill caches: %s", err)

     async def compute_entries(self) -> AsyncIterator[FuseEntry]:
         by_date_dir = self.create_child(
             RevisionHistoryShardByDate,
             name="by-date",
             mode=int(EntryMode.RDONLY_DIR),
             history_swhid=self.swhid,
         )
         # Run it concurrently because of the many API calls necessary
-        asyncio.create_task(self.prefill_by_date_cache(by_date_dir))
+        asyncio.create_task(self.prefill_caches(by_date_dir))
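+        # asyncio only keeps a weak reference to scheduled tasks, so retaining
+        # the returned Task (e.g. on self) would guarantee the prefill cannot
+        # be garbage-collected mid-run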
         yield by_date_dir
         yield self.create_child(
             RevisionHistoryShardByHash,
             name="by-hash",
             mode=int(EntryMode.RDONLY_DIR),
             history_swhid=self.swhid,
… 28 unchanged lines …

     class StatusFile(FuseFileEntry):
         def __post_init__(self):
             super().__post_init__()
             # This is the only case where we do not want the kernel to cache the file
             self.file_info_attrs["keep_cache"] = False
             self.file_info_attrs["direct_io"] = True
         async def get_content(self) -> bytes:
-            history_full = await self.fuse.get_history(self.history_swhid)
+            history_full = await self.fuse.cache.history.get(self.history_swhid)
             history_cached = await self.fuse.cache.history.get_with_date_prefix(
                 self.history_swhid, date_prefix=""
             )
             fmt = f"Done: {len(history_cached)}/{len(history_full)}\n"
             return fmt.encode()
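Reading the status file thus reports prefetch progress as a single line such
as "Done: 1234/5678" (illustrative counts): the number of revisions already in
the local cache versus the length of the full history.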
     def __post_init__(self):
         super().__post_init__()
         # Create the status file only once so we can easily remove it when the
         # entire history is fetched
         self.status_file = self.create_child(
             RevisionHistoryShardByDate.StatusFile, history_swhid=self.history_swhid
         )
     async def compute_entries(self) -> AsyncIterator[FuseEntry]:
-        history_full = await self.fuse.get_history(self.history_swhid)
+        history_full = await self.fuse.cache.history.get(self.history_swhid)
         # Only check for cached revisions with the appropriate prefix, since
         # fetching all of them with the Web API would take too long
         history_cached = await self.fuse.cache.history.get_with_date_prefix(
             self.history_swhid, date_prefix=self.prefix
         )
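+        # self.prefix is the date path walked so far under by-date/, e.g.
+        # "2021/01/" at depth 2 (illustrative value, assuming a YYYY/MM/DD
+        # sharding layout)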
         depth = self.prefix.count("/")
         root_path = self.get_relative_root_path()
… 35 unchanged lines …

 class RevisionHistoryShardByHash(FuseDirEntry):
     history_swhid: SWHID
     prefix: str = field(default="")

     SHARDING_LENGTH = 2
     ENTRIES_REGEXP = re.compile(r"^([a-f0-9]+)|(" + SWHID_REGEXP + ")$")
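+    # With SHARDING_LENGTH = 2, a revision swh:1:rev:ab12... is listed under
+    # the ab/ shard, as a symlink named after its full SWHID (illustrative
+    # hash)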
     async def compute_entries(self) -> AsyncIterator[FuseEntry]:
-        history = await self.fuse.get_history(self.history_swhid)
+        history = await self.fuse.cache.history.get(self.history_swhid)
         if self.prefix:
             root_path = self.get_relative_root_path()
             for swhid in history:
                 if swhid.object_id.startswith(self.prefix):
                     yield self.create_child(
                         FuseSymlinkEntry,
                         name=str(swhid),
… 22 unchanged lines …

 class RevisionHistoryShardByPage(FuseDirEntry):
     history_swhid: SWHID
     prefix: int = field(default=None)

     PAGE_SIZE = 10_000
     PAGE_FMT = "{page_number:03d}"
     ENTRIES_REGEXP = re.compile(r"^([0-9]+)|(" + SWHID_REGEXP + ")$")
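+    # Pages are named with PAGE_FMT: revisions 0-9,999 fall under "000",
+    # 10,000-19,999 under "001", and so on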
     async def compute_entries(self) -> AsyncIterator[FuseEntry]:
-        history = await self.fuse.get_history(self.history_swhid)
+        history = await self.fuse.cache.history.get(self.history_swhid)
         if self.prefix is not None:
             current_page = self.prefix
             root_path = self.get_relative_root_path()
             max_idx = min(len(history), (current_page + 1) * self.PAGE_SIZE)
             for i in range(current_page * self.PAGE_SIZE, max_idx):
                 swhid = history[i]
                 yield self.create_child(
… 223 unchanged lines …