Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/backends/seaweed.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import io | import io | ||||
from itertools import islice | |||||
import logging | import logging | ||||
import os | |||||
from typing import Iterator, Optional | |||||
from urllib.parse import urljoin, urlparse | from urllib.parse import urljoin, urlparse | ||||
import requests | import requests | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.objstorage.exc import Error, ObjNotFoundError | from swh.objstorage.exc import Error, ObjNotFoundError | ||||
from swh.objstorage.objstorage import ( | from swh.objstorage.objstorage import ( | ||||
DEFAULT_LIMIT, | DEFAULT_LIMIT, | ||||
ObjStorage, | ObjStorage, | ||||
compressors, | compressors, | ||||
compute_hash, | compute_hash, | ||||
decompressors, | decompressors, | ||||
) | ) | ||||
# Module-level logger, restricted to errors so that the per-request debug
# messages emitted below stay silent unless explicitly re-enabled.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.ERROR)
class WeedFiler(object):
    """Thin wrapper around the HTTP API of a seaweedfs "filer" service.

    All requests go through a single ``requests.Session`` that always asks
    for JSON responses.  Objects are expected to live under the directory
    denoted by ``url``.

    TODO: handle errors
    """

    def __init__(self, url):
        # The filer URL denotes a directory: normalize it with a trailing
        # slash so urljoin/urlparse computations below behave.
        if not url.endswith("/"):
            url = url + "/"
        self.url = url
        # Server root (scheme://host:port/) and the storage directory path
        # on that server, kept separately for build_url().
        self.baseurl = urljoin(url, "/")
        self.basepath = urlparse(url).path
        self.session = requests.Session()
        self.session.headers["Accept"] = "application/json"
        # page size used when paginating directory listings
        self.batchsize = DEFAULT_LIMIT

    def build_url(self, path):
        """Return the absolute URL for ``path``, which must be the filer's
        base directory or live under it."""
        assert path == self.basepath or path.startswith(self.basepath)
        return urljoin(self.baseurl, path)

    def get(self, remote_path):
        """Fetch and return the raw content stored at ``remote_path``.

        Raises a ``requests`` HTTP error on any non-2xx response.
        """
        url = self.build_url(remote_path)
        LOGGER.debug("Get file %s", url)
        resp = self.session.get(url)
        resp.raise_for_status()
        return resp.content

    def exists(self, remote_path):
        """Return True iff a HEAD on ``remote_path`` answers HTTP 200."""
        url = self.build_url(remote_path)
        LOGGER.debug("Check file %s", url)
        return self.session.head(url).status_code == 200

    def put(self, fp, remote_path):
        """Upload the content of the file-like object ``fp`` at
        ``remote_path``; return the HTTP response."""
        url = self.build_url(remote_path)
        LOGGER.debug("Put file %s", url)
        return self.session.post(url, files={"file": fp})

    def delete(self, remote_path):
        """Delete ``remote_path`` on the filer; return the HTTP response."""
        url = self.build_url(remote_path)
        LOGGER.debug("Delete file %s", url)
        return self.session.delete(url)

    def iterfiles(self, last_file_name: Optional[str] = None) -> Iterator[str]:
        """yield absolute file names

        Args:
            last_file_name (str): if given, starts from the file just
                after; must be basename.

        Yields:
            absolute file names

        """
        for entry in self._iter_dir(last_file_name):
            fullpath = entry["FullPath"]
            if entry["Mode"] & 1 << 31:  # directory bit set: recurse
                # NOTE(review): this feeds the directory's absolute
                # FullPath back in as last_file_name (documented above as
                # "must be basename"), and _iter_dir() always lists
                # self.url -- confirm against an actual seaweed filer
                # that recursing into sub-directories behaves as intended.
                yield from self.iterfiles(fullpath)
            else:
                yield fullpath

    def _iter_dir(self, last_file_name=None):
        """Yield the raw JSON entries of the filer's base directory,
        transparently following pagination (``batchsize`` per request).

        Stops (after logging an error) on the first failing HTTP response.
        """
        params = {"limit": self.batchsize}
        if last_file_name:
            params["lastFileName"] = last_file_name

        LOGGER.debug("List directory %s", self.url)
        while True:
            rsp = self.session.get(self.url, params=params)
            if not rsp.ok:
                LOGGER.error(
                    'Error listing "%s". [HTTP %d]' % (self.url, rsp.status_code)
                )
                break
            dircontent = rsp.json()
            if dircontent["Entries"]:
                yield from dircontent["Entries"]
            if not dircontent["ShouldDisplayLoadMore"]:
                break
            params["lastFileName"] = dircontent["LastFileName"]
vlorentz: would this work, as a minor simplification? | |||||
Done Inline Actions — douardda: need to check against actual seaweed filer
class WeedObjStorage(ObjStorage): | class WeedObjStorage(ObjStorage): | ||||
"""ObjStorage with seaweedfs abilities, using the Filer API. | """ObjStorage with seaweedfs abilities, using the Filer API. | ||||
https://github.com/chrislusf/seaweedfs/wiki/Filer-Server-API | https://github.com/chrislusf/seaweedfs/wiki/Filer-Server-API | ||||
""" | """ | ||||
def __init__(self, url="http://127.0.0.1:8888/swh", compression=None, **kwargs): | def __init__(self, url="http://127.0.0.1:8888/swh", compression=None, **kwargs): | ||||
super().__init__(**kwargs) | super().__init__(**kwargs) | ||||
self.wf = WeedFiler(url) | self.wf = WeedFiler(url) | ||||
self.root_path = urlparse(url).path | self.root_path = urlparse(url).path | ||||
if not self.root_path.endswith("/"): | |||||
self.root_path += "/" | |||||
self.compression = compression | self.compression = compression | ||||
def check_config(self, *, check_write): | def check_config(self, *, check_write): | ||||
"""Check the configuration for this object storage""" | """Check the configuration for this object storage""" | ||||
# FIXME: hopefully this blew up during instantiation | # FIXME: hopefully this blew up during instantiation | ||||
return True | return True | ||||
def __contains__(self, obj_id): | def __contains__(self, obj_id): | ||||
▲ Show 20 Lines • Show All 77 Lines • ▼ Show 20 Lines | def delete(self, obj_id): | ||||
super().delete(obj_id) # Check delete permission | super().delete(obj_id) # Check delete permission | ||||
if obj_id not in self: | if obj_id not in self: | ||||
raise ObjNotFoundError(obj_id) | raise ObjNotFoundError(obj_id) | ||||
self.wf.delete(self._path(obj_id)) | self.wf.delete(self._path(obj_id)) | ||||
return True | return True | ||||
def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): | def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): | ||||
if last_obj_id: | if last_obj_id: | ||||
last_obj_id = hashutil.hash_to_hex(last_obj_id) | objid = hashutil.hash_to_hex(last_obj_id) | ||||
resp = self.wf.list(self.root_path, last_obj_id, limit) | lastfilename = objid | ||||
if resp is not None: | else: | ||||
entries = resp["Entries"] | lastfilename = None | ||||
if entries: | # startdir = self.wf.build_url(startdir) | ||||
for obj in entries: | |||||
if obj is not None: | for fname in islice(self.wf.iterfiles(last_file_name=lastfilename), limit): | ||||
bytehex = obj["FullPath"].rsplit("/", 1)[-1] | bytehex = fname.rsplit("/", 1)[-1] | ||||
yield hashutil.bytehex_to_hash(bytehex.encode()) | yield hashutil.bytehex_to_hash(bytehex.encode()) | ||||
# internal methods | # internal methods | ||||
def _put_object(self, content, obj_id): | def _put_object(self, content, obj_id): | ||||
"""Create an object in the cloud storage. | """Create an object in the cloud storage. | ||||
Created object will contain the content and be referenced by | Created object will contain the content and be referenced by | ||||
the given id. | the given id. | ||||
""" | """ | ||||
def compressor(data): | def compressor(data): | ||||
comp = compressors[self.compression]() | comp = compressors[self.compression]() | ||||
for chunk in data: | for chunk in data: | ||||
yield comp.compress(chunk) | yield comp.compress(chunk) | ||||
yield comp.flush() | yield comp.flush() | ||||
if isinstance(content, bytes): | if isinstance(content, bytes): | ||||
content = [content] | content = [content] | ||||
self.wf.put(io.BytesIO(b"".join(compressor(content))), self._path(obj_id)) | self.wf.put(io.BytesIO(b"".join(compressor(content))), self._path(obj_id)) | ||||
def _path(self, obj_id): | def _path(self, obj_id): | ||||
return hashutil.hash_to_hex(obj_id) | return os.path.join(self.wf.basepath, hashutil.hash_to_hex(obj_id)) |
would this work, as a minor simplification?