Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/backends/seaweed.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | |||||||||||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | |||||||||||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | |||||||||||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | |||||||||||||
import io | import io | |||||||||||||
from itertools import islice | from itertools import islice | |||||||||||||
import logging | import logging | |||||||||||||
import os | import os | |||||||||||||
from typing import Iterator | from typing import Iterator | |||||||||||||
from urllib.parse import urljoin, urlparse | from urllib.parse import urljoin, urlparse | |||||||||||||
import requests | import requests | |||||||||||||
from swh.model import hashutil | from swh.model import hashutil | |||||||||||||
from swh.objstorage.backends.pathslicing import PathSlicer | ||||||||||||||
from swh.objstorage.exc import Error, ObjNotFoundError | from swh.objstorage.exc import Error, ObjNotFoundError | |||||||||||||
from swh.objstorage.objstorage import ( | from swh.objstorage.objstorage import ( | |||||||||||||
DEFAULT_LIMIT, | DEFAULT_LIMIT, | |||||||||||||
ObjStorage, | ObjStorage, | |||||||||||||
compressors, | compressors, | |||||||||||||
compute_hash, | compute_hash, | |||||||||||||
decompressors, | decompressors, | |||||||||||||
) | ) | |||||||||||||
LOGGER = logging.getLogger(__name__)
# NOTE(review): do not hard-code a level here (e.g. setLevel(logging.ERROR));
# it would make the LOGGER.debug() calls below impossible to enable from the
# application's logging configuration.
ardumont: unrelated to this diff but still, how one can actually use debug messages with this hard-coded… | ||||||||||||||
Done Inline Actionsgood catch, will explore this later on douardda: good catch, will explore this later on | ||||||||||||||
class WeedFiler(object):
    """Simple class that encapsulates access to a seaweedfs filer service.

    TODO: handle errors
    """

    def __init__(self, url):
        """Initialize the filer client.

        Args:
            url: base url of the seaweedfs filer service; a trailing "/"
                is appended if missing so url/path computations below are
                consistent.
        """
        if not url.endswith("/"):
            url = url + "/"
        self.url = url
        # Root of the filer service (scheme://host:port/), used to build
        # absolute urls from filer paths.
        self.baseurl = urljoin(url, "/")
        # Path component of the filer url; top directory for stored objects.
        self.basepath = urlparse(url).path
        self.session = requests.Session()
        self.session.headers["Accept"] = "application/json"
Show All 19 Lines | def put(self, fp, remote_path): | |||||||||||||
url = self.build_url(remote_path) | url = self.build_url(remote_path) | |||||||||||||
LOGGER.debug("Put file %s", url) | LOGGER.debug("Put file %s", url) | |||||||||||||
return self.session.post(url, files={"file": fp}) | return self.session.post(url, files={"file": fp}) | |||||||||||||
def delete(self, remote_path): | def delete(self, remote_path): | |||||||||||||
url = self.build_url(remote_path) | url = self.build_url(remote_path) | |||||||||||||
LOGGER.debug("Delete file %s", url) | LOGGER.debug("Delete file %s", url) | |||||||||||||
return self.session.delete(url) | return self.session.delete(url) | |||||||||||||
def iterfiles(self, last_file_name: str = "") -> Iterator[str]: | def iterfiles( | |||||||||||||
"""yield absolute file names | self, dir: str, last_file_name: str = "" | |||||||||||||
) -> Iterator[str]: | ||||||||||||||
Not Done Inline Actions
you already typed it (and it was a bit out of sync with the type for the 2nd param) ardumont: you already typed it (and it was a bit out of sync with the type for the 2nd param) | ||||||||||||||
Done Inline Actionsyep, let me fix this douardda: yep, let me fix this | ||||||||||||||
"""Recursively yield absolute file names | ||||||||||||||
Args: | Args: | |||||||||||||
dir: retrieve file names starting from this directory; must | ||||||||||||||
be an absolute path. | ||||||||||||||
last_file_name: if given, starts from the file just after; must | last_file_name: if given, starts from the file just after; must | |||||||||||||
be basename. | be basename. | |||||||||||||
Yields: | Yields: | |||||||||||||
absolute file names | absolute file names | |||||||||||||
""" | """ | |||||||||||||
for entry in self._iter_dir(last_file_name): | if not dir.endswith("/"): | |||||||||||||
dir = dir + "/" | ||||||||||||||
# first, generates files going "down" | ||||||||||||||
yield from self._iter_files(dir, last_file_name) | ||||||||||||||
# then, continue iterate going up the tree | ||||||||||||||
while dir != self.basepath: | ||||||||||||||
dir, last = dir[:-1].rsplit("/", 1) | ||||||||||||||
dir += "/" | ||||||||||||||
yield from self._iter_files(dir, last_file_name=last) | ||||||||||||||
def _iter_files( | ||||||||||||||
self, dir: str, last_file_name: str = "" | ||||||||||||||
) -> Iterator[str]: | ||||||||||||||
for entry in self._iter_one_dir(dir, last_file_name): | ||||||||||||||
fullpath = entry["FullPath"] | fullpath = entry["FullPath"] | |||||||||||||
if entry["Mode"] & 1 << 31: # it's a directory, recurse | if entry["Mode"] & 1 << 31: # it's a directory, recurse | |||||||||||||
# see https://pkg.go.dev/io/fs#FileMode | # see https://pkg.go.dev/io/fs#FileMode | |||||||||||||
Not Done Inline Actionsi'll believe you on that one... ardumont: i'll believe you on that one... | ||||||||||||||
Done Inline Actionsit's not very well documented in seaweedfs Filer API doc, but it actually comes from go FileMode stuff: https://pkg.go.dev/io/fs#FileMode douardda: it's not very well documented in seaweedfs Filer API doc, but it actually comes from go… | ||||||||||||||
yield from self.iterfiles(fullpath) | yield from self._iter_files(fullpath) | |||||||||||||
else: | else: | |||||||||||||
yield fullpath | yield fullpath | |||||||||||||
def _iter_dir(self, last_file_name: str = ""): | def _iter_one_dir(self, remote_path: str, last_file_name: str = ""): | |||||||||||||
url = self.build_url(remote_path) | ||||||||||||||
params = {"limit": self.batchsize} | params = {"limit": self.batchsize} | |||||||||||||
if last_file_name: | if last_file_name: | |||||||||||||
params["lastFileName"] = last_file_name | params["lastFileName"] = last_file_name | |||||||||||||
LOGGER.debug("List directory %s", self.url) | LOGGER.debug("List directory %s", url) | |||||||||||||
while True: | while True: | |||||||||||||
rsp = self.session.get(self.url, params=params) | rsp = self.session.get(url, params=params) | |||||||||||||
if rsp.ok: | if rsp.ok: | |||||||||||||
dircontent = rsp.json() | dircontent = rsp.json() | |||||||||||||
if dircontent["Entries"]: | if dircontent["Entries"]: | |||||||||||||
yield from dircontent["Entries"] | yield from dircontent["Entries"] | |||||||||||||
if not dircontent["ShouldDisplayLoadMore"]: | if not dircontent["ShouldDisplayLoadMore"]: | |||||||||||||
break | break | |||||||||||||
params["lastFileName"] = dircontent["LastFileName"] | params["lastFileName"] = dircontent["LastFileName"] | |||||||||||||
else: | else: | |||||||||||||
LOGGER.error( | LOGGER.error('Error listing "%s". [HTTP %d]' % (url, rsp.status_code)) | |||||||||||||
'Error listing "%s". [HTTP %d]' % (self.url, rsp.status_code) | ||||||||||||||
) | ||||||||||||||
break | break | |||||||||||||
class WeedObjStorage(ObjStorage):
    """ObjStorage with seaweedfs abilities, using the Filer API.

    https://github.com/chrislusf/seaweedfs/wiki/Filer-Server-API
    """

    def __init__(
        self, url="http://127.0.0.1:8888/swh", compression=None, slicing="", **kwargs
    ):
        """Initialize the seaweedfs object storage.

        Args:
            url: url of the seaweedfs filer service, including the root
                directory objects are stored under.
            compression: name of the compression algorithm (key of the
                ``compressors`` registry), or None.
            slicing: PathSlicer slicing specification used to shard object
                paths below the root directory; "" disables sharding.
        """
        super().__init__(**kwargs)
        self.wf = WeedFiler(url)
        self.root_path = urlparse(url).path
        if not self.root_path.endswith("/"):
            self.root_path += "/"
        # maps hex object ids to sharded paths under root_path
        self.slicer = PathSlicer(self.root_path, slicing)
        self.compression = compression
def check_config(self, *, check_write): | def check_config(self, *, check_write): | |||||||||||||
"""Check the configuration for this object storage""" | """Check the configuration for this object storage""" | |||||||||||||
# FIXME: hopefully this blew up during instantiation | # FIXME: hopefully this blew up during instantiation | |||||||||||||
return True | return True | |||||||||||||
def __contains__(self, obj_id): | def __contains__(self, obj_id): | |||||||||||||
▲ Show 20 Lines • Show All 78 Lines • ▼ Show 20 Lines | def delete(self, obj_id): | |||||||||||||
if obj_id not in self: | if obj_id not in self: | |||||||||||||
raise ObjNotFoundError(obj_id) | raise ObjNotFoundError(obj_id) | |||||||||||||
self.wf.delete(self._path(obj_id)) | self.wf.delete(self._path(obj_id)) | |||||||||||||
return True | return True | |||||||||||||
def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): | def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): | |||||||||||||
if last_obj_id: | if last_obj_id: | |||||||||||||
objid = hashutil.hash_to_hex(last_obj_id) | objid = hashutil.hash_to_hex(last_obj_id) | |||||||||||||
lastfilename = objid | objpath = self._path(objid) | |||||||||||||
startdir, lastfilename = objpath.rsplit("/", 1) | ||||||||||||||
else: | else: | |||||||||||||
startdir = self.root_path | ||||||||||||||
lastfilename = None | lastfilename = None | |||||||||||||
for fname in islice(self.wf.iterfiles(last_file_name=lastfilename), limit): | for fname in islice( | |||||||||||||
self.wf.iterfiles(startdir, last_file_name=lastfilename), limit | ||||||||||||||
): | ||||||||||||||
bytehex = fname.rsplit("/", 1)[-1] | bytehex = fname.rsplit("/", 1)[-1] | |||||||||||||
yield hashutil.bytehex_to_hash(bytehex.encode()) | yield hashutil.bytehex_to_hash(bytehex.encode()) | |||||||||||||
# internal methods | # internal methods | |||||||||||||
def _put_object(self, content, obj_id): | def _put_object(self, content, obj_id): | |||||||||||||
"""Create an object in the cloud storage. | """Create an object in the cloud storage. | |||||||||||||
Created object will contain the content and be referenced by | Created object will contain the content and be referenced by | |||||||||||||
the given id. | the given id. | |||||||||||||
""" | """ | |||||||||||||
def compressor(data): | def compressor(data): | |||||||||||||
comp = compressors[self.compression]() | comp = compressors[self.compression]() | |||||||||||||
for chunk in data: | for chunk in data: | |||||||||||||
yield comp.compress(chunk) | yield comp.compress(chunk) | |||||||||||||
yield comp.flush() | yield comp.flush() | |||||||||||||
if isinstance(content, bytes): | if isinstance(content, bytes): | |||||||||||||
content = [content] | content = [content] | |||||||||||||
self.wf.put(io.BytesIO(b"".join(compressor(content))), self._path(obj_id)) | self.wf.put(io.BytesIO(b"".join(compressor(content))), self._path(obj_id)) | |||||||||||||
def _path(self, obj_id): | def _path(self, obj_id): | |||||||||||||
return os.path.join(self.wf.basepath, hashutil.hash_to_hex(obj_id)) | return self.slicer.get_path(hashutil.hash_to_hex(obj_id)) |
unrelated to this diff but still, how one can actually use debug messages with this hard-coded in?