Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/backends/seaweed.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | |||||||||||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | |||||||||||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | |||||||||||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | |||||||||||||
import io | import io | |||||||||||||
from itertools import islice | ||||||||||||||
import logging | import logging | |||||||||||||
from typing import Iterator, Optional | ||||||||||||||
from urllib.parse import urljoin, urlparse | from urllib.parse import urljoin, urlparse | |||||||||||||
import requests | import requests | |||||||||||||
from swh.model import hashutil | from swh.model import hashutil | |||||||||||||
from swh.objstorage.backends.pathslicing import PathSlicer | ||||||||||||||
from swh.objstorage.exc import Error, ObjNotFoundError | from swh.objstorage.exc import Error, ObjNotFoundError | |||||||||||||
from swh.objstorage.objstorage import ( | from swh.objstorage.objstorage import ( | |||||||||||||
DEFAULT_LIMIT, | DEFAULT_LIMIT, | |||||||||||||
ObjStorage, | ObjStorage, | |||||||||||||
compressors, | compressors, | |||||||||||||
compute_hash, | compute_hash, | |||||||||||||
decompressors, | decompressors, | |||||||||||||
) | ) | |||||||||||||
# Module-level logger; the level is deliberately left to the application's
# logging configuration. (The previous hard-coded
# ``LOGGER.setLevel(logging.ERROR)`` made it impossible to ever enable the
# debug messages emitted below.)
LOGGER = logging.getLogger(__name__)
ardumont: unrelated to this diff but still, how one can actually use debug messages with this hard-coded… | ||||||||||||||
Done Inline Actionsgood catch, will explore this later on douardda: good catch, will explore this later on | ||||||||||||||
class WeedFiler(object):
    """Thin client for a seaweedfs filer HTTP service.

    TODO: handle errors
    """

    def __init__(self, url):
        # Normalize the configured URL: drop a single trailing "/" so that
        # ``url`` names the root entry itself.
        if url.endswith("/"):
            url = url[:-1]
        self.url = url
        self.baseurl = urljoin(url, "/")
        self.basepath = urlparse(url).path
        self.session = requests.Session()
        self.session.headers["Accept"] = "application/json"
        self.batchsize = DEFAULT_LIMIT

    def build_url(self, path):
        """Build the full filer URL for ``path``.

        ``path`` must be the filer root path or live under it.
        """
        assert path == self.basepath or path.startswith(self.basepath)
        return urljoin(self.baseurl, path)

    def get(self, remote_path):
        """Return the raw content of the file at ``remote_path``.

        Raises ``requests.HTTPError`` on a non-2xx response.
        """
        target = self.build_url(remote_path)
        LOGGER.debug("Get file %s", target)
        response = self.session.get(target)
        response.raise_for_status()
        return response.content

    def exists(self, remote_path):
        """Return True if a HEAD on ``remote_path`` answers HTTP 200."""
        target = self.build_url(remote_path)
        LOGGER.debug("Check file %s", target)
        return self.session.head(target).status_code == 200

    def put(self, fp, remote_path):
        """Upload the file object ``fp`` to ``remote_path``; return the response."""
        target = self.build_url(remote_path)
        LOGGER.debug("Put file %s", target)
        return self.session.post(target, files={"file": fp})

    def delete(self, remote_path):
        """Delete the file at ``remote_path``; return the response."""
        target = self.build_url(remote_path)
        LOGGER.debug("Delete file %s", target)
        return self.session.delete(target)

    def iterfiles(
        self, dir: str, last_file_name: Optional[str] = None
    ) -> Iterator[str]:
        """Recursively yield absolute file names

        Args:
            dir: retrieve file names starting from this directory; must
                be an absolute path.
            last_file_name: if given, starts from the file just after; must
                be a basename.

        Yields:
            absolute file names
        """
        current = dir[:-1] if dir.endswith("/") else dir

        # First, walk down from the starting directory...
        yield from self._iter_files(current, last_file_name)

        # ...then climb back up the tree, resuming each parent directory
        # just after the child we have already exhausted, until we step
        # outside the filer root.
        while True:
            current, sibling = current.rsplit("/", 1)
            if not (current + "/").startswith(self.basepath):
                # we are done
                break
            yield from self._iter_files(current, last_file_name=sibling)

    def _iter_files(
        self, dir: str, last_file_name: Optional[str] = None
    ) -> Iterator[str]:
        # Depth-first traversal: recurse into sub-directories, yield files.
        for entry in self._iter_one_dir(dir, last_file_name):
            path = entry["FullPath"]
            # Bit 31 of "Mode" is go's fs.ModeDir flag: the entry is a
            # directory (see https://pkg.go.dev/io/fs#FileMode).
            if entry["Mode"] & (1 << 31):
                yield from self._iter_files(path)
            else:
                yield path

    def _iter_one_dir(self, remote_path, last_file_name=None):
        # Yield the raw JSON entries of one directory, transparently
        # following the filer's pagination protocol.
        target = self.build_url(remote_path)
        params = {"limit": self.batchsize}
        if last_file_name:
            params["lastFileName"] = last_file_name
        LOGGER.debug("List directory %s", target)
        while True:
            response = self.session.get(target, params=params)
            if not response.ok:
                LOGGER.error('Error listing "%s". [HTTP %d]' % (target, response.status_code))
                break
            payload = response.json()
            if payload["Entries"]:
                yield from payload["Entries"]
            if not payload["ShouldDisplayLoadMore"]:
                break
            params["lastFileName"] = payload["LastFileName"]
class WeedObjStorage(ObjStorage):
    """ObjStorage with seaweedfs abilities, using the Filer API.

    https://github.com/chrislusf/seaweedfs/wiki/Filer-Server-API
    """
def __init__(self, url="http://127.0.0.1:8888/swh", compression=None, **kwargs): | def __init__( | |||||||||||||
self, url="http://127.0.0.1:8888/swh", compression=None, slicing="", **kwargs | ||||||||||||||
): | ||||||||||||||
super().__init__(**kwargs) | super().__init__(**kwargs) | |||||||||||||
self.wf = WeedFiler(url) | self.wf = WeedFiler(url) | |||||||||||||
self.root_path = urlparse(url).path | self.root_path = urlparse(url).path | |||||||||||||
if not self.root_path.endswith("/"): | ||||||||||||||
self.root_path += "/" | ||||||||||||||
self.slicer = PathSlicer(self.root_path, slicing) | ||||||||||||||
self.compression = compression | self.compression = compression | |||||||||||||
def check_config(self, *, check_write): | def check_config(self, *, check_write): | |||||||||||||
"""Check the configuration for this object storage""" | """Check the configuration for this object storage""" | |||||||||||||
# FIXME: hopefully this blew up during instantiation | # FIXME: hopefully this blew up during instantiation | |||||||||||||
return True | return True | |||||||||||||
def __contains__(self, obj_id): | def __contains__(self, obj_id): | |||||||||||||
▲ Show 20 Lines • Show All 77 Lines • ▼ Show 20 Lines | def delete(self, obj_id): | |||||||||||||
super().delete(obj_id) # Check delete permission | super().delete(obj_id) # Check delete permission | |||||||||||||
if obj_id not in self: | if obj_id not in self: | |||||||||||||
raise ObjNotFoundError(obj_id) | raise ObjNotFoundError(obj_id) | |||||||||||||
self.wf.delete(self._path(obj_id)) | self.wf.delete(self._path(obj_id)) | |||||||||||||
return True | return True | |||||||||||||
def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): | def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): | |||||||||||||
if last_obj_id: | if last_obj_id: | |||||||||||||
last_obj_id = hashutil.hash_to_hex(last_obj_id) | objid = hashutil.hash_to_hex(last_obj_id) | |||||||||||||
resp = self.wf.list(self.root_path, last_obj_id, limit) | objpath = self._path(objid) | |||||||||||||
if resp is not None: | startdir, lastfilename = objpath.rsplit("/", 1) | |||||||||||||
entries = resp["Entries"] | else: | |||||||||||||
if entries: | startdir = self.root_path | |||||||||||||
for obj in entries: | lastfilename = None | |||||||||||||
if obj is not None: | # startdir = self.wf.build_url(startdir) | |||||||||||||
bytehex = obj["FullPath"].rsplit("/", 1)[-1] | ||||||||||||||
for fname in islice( | ||||||||||||||
self.wf.iterfiles(startdir, last_file_name=lastfilename), limit | ||||||||||||||
): | ||||||||||||||
bytehex = fname.rsplit("/", 1)[-1] | ||||||||||||||
yield hashutil.bytehex_to_hash(bytehex.encode()) | yield hashutil.bytehex_to_hash(bytehex.encode()) | |||||||||||||
# internal methods | # internal methods | |||||||||||||
def _put_object(self, content, obj_id): | def _put_object(self, content, obj_id): | |||||||||||||
"""Create an object in the cloud storage. | """Create an object in the cloud storage. | |||||||||||||
Created object will contain the content and be referenced by | Created object will contain the content and be referenced by | |||||||||||||
the given id. | the given id. | |||||||||||||
""" | """ | |||||||||||||
def compressor(data): | def compressor(data): | |||||||||||||
comp = compressors[self.compression]() | comp = compressors[self.compression]() | |||||||||||||
for chunk in data: | for chunk in data: | |||||||||||||
yield comp.compress(chunk) | yield comp.compress(chunk) | |||||||||||||
yield comp.flush() | yield comp.flush() | |||||||||||||
if isinstance(content, bytes): | if isinstance(content, bytes): | |||||||||||||
content = [content] | content = [content] | |||||||||||||
self.wf.put(io.BytesIO(b"".join(compressor(content))), self._path(obj_id)) | self.wf.put(io.BytesIO(b"".join(compressor(content))), self._path(obj_id)) | |||||||||||||
def _path(self, obj_id): | def _path(self, obj_id): | |||||||||||||
return hashutil.hash_to_hex(obj_id) | return self.slicer.get_path(hashutil.hash_to_hex(obj_id)) |
Unrelated to this diff, but still: how can one actually use debug messages with this hard-coded in?