Changeset View
Standalone View
swh/objstorage/backends/seaweedfs/objstorage.py
- This file was moved from swh/objstorage/backends/seaweed.py.
# Copyright (C) 2019  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import io | import io | ||||
from itertools import islice | from itertools import islice | ||||
import logging | import logging | ||||
import os | import os | ||||
from typing import Iterator | |||||
from urllib.parse import urljoin, urlparse | |||||
import requests | |||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.objstorage.exc import Error, ObjNotFoundError | from swh.objstorage.exc import Error, ObjNotFoundError | ||||
from swh.objstorage.objstorage import ( | from swh.objstorage.objstorage import ( | ||||
DEFAULT_LIMIT, | DEFAULT_LIMIT, | ||||
ObjStorage, | ObjStorage, | ||||
compressors, | compressors, | ||||
compute_hash, | compute_hash, | ||||
decompressors, | decompressors, | ||||
) | ) | ||||
# Module-level logger. Level/handler configuration is deliberately left to
# the application: per the Python logging HOWTO, a library must not call
# setLevel() on its own loggers -- the previous setLevel(logging.ERROR)
# silenced this module's LOGGER.debug() calls regardless of app config.
LOGGER = logging.getLogger(__name__)
class WeedFiler(object):
    """Simple class that encapsulates access to a seaweedfs filer service.

    Objects are expected to be in a single directory.

    TODO: handle errors for put/delete (the raw response is currently
    returned and HTTP failures are not raised).
    """

    def __init__(self, url):
        """Create a filer client.

        Args:
            url: base URL of the filer directory holding the objects; a
                trailing slash is appended if missing.
        """
        if not url.endswith("/"):
            url = url + "/"
        self.url = url
        # scheme://host/ root, used with urljoin to rebuild absolute URLs
        self.baseurl = urljoin(url, "/")
        # path component (e.g. "/swh/"); all object paths must live under it
        self.basepath = urlparse(url).path
        self.session = requests.Session()
        # ask the filer for JSON directory listings
        self.session.headers["Accept"] = "application/json"
        self.batchsize = DEFAULT_LIMIT

    def build_url(self, path):
        """Return the absolute URL for ``path``.

        ``path`` must be the base path or live under it.
        """
        assert path == self.basepath or path.startswith(self.basepath)
        return urljoin(self.baseurl, path)

    def get(self, remote_path):
        """Return the raw content of the file at ``remote_path``.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        url = self.build_url(remote_path)
        LOGGER.debug("Get file %s", url)
        resp = self.session.get(url)
        resp.raise_for_status()
        return resp.content

    def exists(self, remote_path):
        """Return True if a file exists at ``remote_path`` (HEAD -> 200)."""
        url = self.build_url(remote_path)
        LOGGER.debug("Check file %s", url)
        return self.session.head(url).status_code == 200

    def put(self, fp, remote_path):
        """Upload the file-like object ``fp`` at ``remote_path``.

        Returns the raw ``requests`` response; errors are not raised here.
        """
        url = self.build_url(remote_path)
        LOGGER.debug("Put file %s", url)
        return self.session.post(url, files={"file": fp})

    def delete(self, remote_path):
        """Delete the file at ``remote_path``.

        Returns the raw ``requests`` response; errors are not raised here.
        """
        url = self.build_url(remote_path)
        LOGGER.debug("Delete file %s", url)
        return self.session.delete(url)

    def iterfiles(self, last_file_name: str = "") -> Iterator[str]:
        """yield absolute file names

        Args:
            last_file_name: if given, starts from the file just after; must
                be basename.

        Yields:
            absolute file names
        """
        yield from self._iterfiles(self.url, last_file_name)

    def _iterfiles(self, dir_url, last_file_name: str = "") -> Iterator[str]:
        """Recursively yield full paths of the files under ``dir_url``."""
        for entry in self._iter_dir(last_file_name, dir_url):
            fullpath = entry["FullPath"]
            if entry["Mode"] & 1 << 31:  # it's a directory, recurse
                # see https://pkg.go.dev/io/fs#FileMode
                # BUGFIX: list the subdirectory itself. Previously the
                # directory's full path was passed as `last_file_name` of the
                # *top-level* listing (and _iter_dir always queried self.url),
                # so subdirectory contents were skipped.
                yield from self._iterfiles(self.build_url(fullpath) + "/")
            else:
                yield fullpath

    def _iter_dir(self, last_file_name: str = "", dir_url=None):
        """Yield the raw filer JSON entries of a directory listing.

        Paginates with the filer's ``lastFileName``/``limit`` parameters.

        Args:
            last_file_name: resume listing just after this basename.
            dir_url: directory URL to list; defaults to ``self.url`` so the
                historical ``_iter_dir(last_file_name)`` call still works.
        """
        if dir_url is None:
            dir_url = self.url
        params = {"limit": self.batchsize}
        if last_file_name:
            params["lastFileName"] = last_file_name
        LOGGER.debug("List directory %s", dir_url)
        while True:
            rsp = self.session.get(dir_url, params=params)
            if not rsp.ok:
                # lazy %-args instead of eager interpolation
                LOGGER.error('Error listing "%s". [HTTP %d]', dir_url, rsp.status_code)
                break
            dircontent = rsp.json()
            if dircontent["Entries"]:
                yield from dircontent["Entries"]
            if not dircontent["ShouldDisplayLoadMore"]:
                break
            params["lastFileName"] = dircontent["LastFileName"]
class WeedObjStorage(ObjStorage): | class SeaweedFilerObjStorage(ObjStorage): | ||||
"""ObjStorage with seaweedfs abilities, using the Filer API. | """ObjStorage with seaweedfs abilities, using the Filer API. | ||||
https://github.com/chrislusf/seaweedfs/wiki/Filer-Server-API | https://github.com/chrislusf/seaweedfs/wiki/Filer-Server-API | ||||
""" | """ | ||||
def __init__(self, url="http://127.0.0.1:8888/swh", compression=None, **kwargs): | def __init__(self, url, compression=None, **kwargs): | ||||
super().__init__(**kwargs) | super().__init__(**kwargs) | ||||
self.wf = WeedFiler(url) | self.wf = HttpFiler(url) | ||||
self.root_path = urlparse(url).path | |||||
if not self.root_path.endswith("/"): | |||||
self.root_path += "/" | |||||
self.compression = compression | self.compression = compression | ||||
def check_config(self, *, check_write): | def check_config(self, *, check_write): | ||||
"""Check the configuration for this object storage""" | """Check the configuration for this object storage""" | ||||
# FIXME: hopefully this blew up during instantiation | # FIXME: hopefully this blew up during instantiation | ||||
return True | return True | ||||
def __contains__(self, obj_id): | def __contains__(self, obj_id): | ||||
▲ Show 20 Lines • Show All 114 Lines • Show Last 20 Lines |