Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
Show All 10 Lines | |||||
import copy | import copy | ||||
import datetime | import datetime | ||||
import itertools | import itertools | ||||
import random | import random | ||||
import warnings | import warnings | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS | from swh.model.hashutil import DEFAULT_ALGORITHMS | ||||
from swh.model.identifiers import normalize_timestamp | from swh.model.identifiers import normalize_timestamp | ||||
from swh.objstorage import get_objstorage | |||||
from swh.objstorage.exc import ObjNotFoundError | |||||
# Max block size of contents to return | # Max block size of contents to return | ||||
BULK_BLOCK_CONTENT_LEN_MAX = 10000 | BULK_BLOCK_CONTENT_LEN_MAX = 10000 | ||||
def now():
    """Return the current time as a timezone-aware datetime in UTC."""
    utc = datetime.timezone.utc
    return datetime.datetime.now(tz=utc)
class Storage: | class Storage: | ||||
def __init__(self):
    """Initialize the in-memory containers backing each object type."""
    # Content metadata, keyed by the tuple of all supported hashes.
    self._contents = {}
    # One index per hash algorithm: hash value -> set of content keys.
    self._content_indexes = defaultdict(lambda: defaultdict(set))
    # Plain id -> object mappings for the other Software Heritage
    # object types.
    self._directories = {}
    self._revisions = {}
    self._releases = {}
    self._snapshots = {}
    self._origins = []
    self._origin_visits = []
    self._origin_metadata = defaultdict(list)
    self._tools = {}
    self._metadata_providers = {}
    # sha1_git -> list of (object type, object id) pairs.
    self._objects = defaultdict(list)
    # Kept sorted to support range queries; ideally we would want a
    # skip list for both fast inserts and searches.
    self._sorted_sha1s = []
    # Content *data* is delegated to an in-memory objstorage instead of
    # being kept alongside the metadata above.
    # NOTE(review): using a config dict here, as the pg storage does,
    # may make more sense — see inline review comment.
    self.objstorage = get_objstorage('memory', {})
def check_config(self, *, check_write):
    """Check that the storage is configured and ready to go.

    The in-memory backend needs no external configuration, so this
    unconditionally reports success whatever *check_write* is.
    """
    return True
def content_add(self, contents): | def content_add(self, contents): | ||||
"""Add content blobs to the storage | """Add content blobs to the storage | ||||
Args: | Args: | ||||
Show All 23 Lines | def content_add(self, contents): | ||||
for algorithm in DEFAULT_ALGORITHMS: | for algorithm in DEFAULT_ALGORITHMS: | ||||
self._content_indexes[algorithm][content[algorithm]].add(key) | self._content_indexes[algorithm][content[algorithm]].add(key) | ||||
self._objects[content['sha1_git']].append( | self._objects[content['sha1_git']].append( | ||||
('content', content['sha1'])) | ('content', content['sha1'])) | ||||
self._contents[key] = copy.deepcopy(content) | self._contents[key] = copy.deepcopy(content) | ||||
self._contents[key]['ctime'] = now() | self._contents[key]['ctime'] = now() | ||||
bisect.insort(self._sorted_sha1s, content['sha1']) | bisect.insort(self._sorted_sha1s, content['sha1']) | ||||
if self._contents[key]['status'] == 'visible': | if self._contents[key]['status'] == 'visible': | ||||
self._contents_data[key] = self._contents[key].pop('data') | content_data = self._contents[key].pop('data') | ||||
self.objstorage.add(content_data, content['sha1']) | |||||
def content_get(self, ids): | def content_get(self, ids): | ||||
"""Retrieve in bulk contents and their data. | """Retrieve in bulk contents and their data. | ||||
This function may yield more blobs than provided sha1 identifiers, | This function may yield more blobs than provided sha1 identifiers, | ||||
in case they collide. | in case they collide. | ||||
Args: | Args: | ||||
Show All 10 Lines | def content_get(self, ids): | ||||
ValueError in case of too much contents are required. | ValueError in case of too much contents are required. | ||||
cf. BULK_BLOCK_CONTENT_LEN_MAX | cf. BULK_BLOCK_CONTENT_LEN_MAX | ||||
""" | """ | ||||
# FIXME: Make this method support slicing the `data`. | # FIXME: Make this method support slicing the `data`. | ||||
if len(ids) > BULK_BLOCK_CONTENT_LEN_MAX: | if len(ids) > BULK_BLOCK_CONTENT_LEN_MAX: | ||||
raise ValueError( | raise ValueError( | ||||
"Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX) | "Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX) | ||||
for id_ in ids: | for obj_id in ids: | ||||
for key in self._content_indexes['sha1'][id_]: | try: | ||||
yield { | data = self.objstorage.get(obj_id) | ||||
'sha1': id_, | except ObjNotFoundError: | ||||
'data': self._contents_data[key], | yield None | ||||
} | continue | ||||
Not Done Inline ActionsInstead of continue, put the yield in an else clause (try blocks accept an else clause that runs if no exception was raised). vlorentz: Instead of `continue`, put the `yield` in an `else` clause (`try` blocks accept an `else` clause… | |||||
Not Done Inline Actionsah yes, that'd be more readable. ardumont: ah yes, that'd be more readable. | |||||
yield {'sha1': obj_id, 'data': data} | |||||
def content_get_range(self, start, end, limit=1000, db=None, cur=None): | def content_get_range(self, start, end, limit=1000, db=None, cur=None): | ||||
"""Retrieve contents within range [start, end] bound by limit. | """Retrieve contents within range [start, end] bound by limit. | ||||
Note that this function may return more than one blob per hash. The | Note that this function may return more than one blob per hash. The | ||||
limit is enforced with multiplicity (ie. two blobs with the same hash | limit is enforced with multiplicity (ie. two blobs with the same hash | ||||
will count twice toward the limit). | will count twice toward the limit). | ||||
Show All 14 Lines | def content_get_range(self, start, end, limit=1000, db=None, cur=None): | ||||
if limit is None: | if limit is None: | ||||
raise ValueError('Development error: limit should not be None') | raise ValueError('Development error: limit should not be None') | ||||
from_index = bisect.bisect_left(self._sorted_sha1s, start) | from_index = bisect.bisect_left(self._sorted_sha1s, start) | ||||
sha1s = itertools.islice(self._sorted_sha1s, from_index, None) | sha1s = itertools.islice(self._sorted_sha1s, from_index, None) | ||||
sha1s = ((sha1, content_key) | sha1s = ((sha1, content_key) | ||||
for sha1 in sha1s | for sha1 in sha1s | ||||
for content_key in self._content_indexes['sha1'][sha1]) | for content_key in self._content_indexes['sha1'][sha1]) | ||||
matched = [] | matched = [] | ||||
next_content = None | |||||
for sha1, key in sha1s: | for sha1, key in sha1s: | ||||
if sha1 > end: | if sha1 > end: | ||||
break | break | ||||
if len(matched) >= limit: | if len(matched) >= limit: | ||||
return { | next_content = sha1 | ||||
'contents': matched, | break | ||||
'next': sha1, | |||||
} | |||||
matched.append({ | matched.append({ | ||||
'data': self._contents_data[key], | |||||
**self._contents[key], | **self._contents[key], | ||||
}) | }) | ||||
return { | return { | ||||
'contents': matched, | 'contents': matched, | ||||
'next': None, | 'next': next_content, | ||||
} | } | ||||
def content_get_metadata(self, sha1s): | def content_get_metadata(self, sha1s): | ||||
"""Retrieve content metadata in bulk | """Retrieve content metadata in bulk | ||||
Args: | Args: | ||||
content: iterable of content identifiers (sha1) | content: iterable of content identifiers (sha1) | ||||
Returns: | Returns: | ||||
▲ Show 20 Lines • Show All 1,044 Lines • Show Last 20 Lines |
It would make more sense to use a config dict here, like the pg storage.