Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
Show All 16 Lines | |||||
from typing import Any, Dict, List, Optional | from typing import Any, Dict, List, Optional | ||||
import attr | import attr | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseContent, Content, SkippedContent, Directory, Revision, Release, | BaseContent, Content, SkippedContent, Directory, Revision, Release, | ||||
Snapshot, OriginVisit, Origin, SHA1_SIZE) | Snapshot, OriginVisit, Origin, SHA1_SIZE) | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | ||||
from swh.objstorage import get_objstorage | from swh.storage.objstorage import ObjStorage | ||||
from swh.objstorage.exc import ObjNotFoundError | |||||
from . import HashCollision | from . import HashCollision | ||||
from .exc import StorageArgumentException | from .exc import StorageArgumentException | ||||
from .storage import get_journal_writer | from .storage import get_journal_writer | ||||
from .converters import origin_url_to_sha1 | from .converters import origin_url_to_sha1 | ||||
from .utils import get_partition_bounds_bytes | from .utils import get_partition_bounds_bytes | ||||
# Max block size of contents to return | # Max block size of contents to return | ||||
Show All 31 Lines | def reset(self): | ||||
self._origin_metadata = defaultdict(list) | self._origin_metadata = defaultdict(list) | ||||
self._tools = {} | self._tools = {} | ||||
self._metadata_providers = {} | self._metadata_providers = {} | ||||
self._objects = defaultdict(list) | self._objects = defaultdict(list) | ||||
# ideally we would want a skip list for both fast inserts and searches | # ideally we would want a skip list for both fast inserts and searches | ||||
self._sorted_sha1s = [] | self._sorted_sha1s = [] | ||||
self.objstorage = get_objstorage('memory', {}) | self.objstorage = ObjStorage({'cls': 'memory', 'args': {}}) | ||||
def check_config(self, *, check_write):
    """Report whether this storage is usable.

    Args:
        check_write: keyword-only flag; accepted but not consulted by this
            implementation.

    Returns:
        Always ``True``.
    """
    return True
def _content_add(self, contents, with_data):
    """Index ``contents`` in memory and report what was added.

    Args:
        contents: content model objects. The collection is iterated
            several times below, so it must be re-iterable (e.g. a list),
            not a one-shot iterator.
        with_data: when true, the contents are also passed to
            ``self.objstorage.content_add`` and the byte count it reports
            is included in the returned summary.

    Returns:
        dict with ``'content:add'`` (number of contents newly indexed by
        this call) and, whenever ``with_data`` is true,
        ``'content:add:bytes'`` (even when that count is 0).

    Raises:
        StorageArgumentException: a content has ``status == 'absent'`` or
            ``length is None``, or ``attr.evolve`` fails on it.
        HashCollision: a new content shares a hash with an already indexed
            content for any algorithm other than blake2s256 / sha256.
    """
    # Validate the whole batch before touching any state.
    for content in contents:
        if content.status is None:
            content.status = 'visible'
        if content.status == 'absent':
            raise StorageArgumentException('content with status=absent')
        if content.length is None:
            raise StorageArgumentException('content with length=None')

    # The journal receives the contents with their data stripped.
    if self.journal_writer:
        for content in contents:
            try:
                content = attr.evolve(content, data=None)
            except (KeyError, TypeError, ValueError) as e:
                raise StorageArgumentException(*e.args)
            self.journal_writer.write_addition('content', content)

    content_add = 0
    content_add_bytes = 0
    if with_data:
        objstorage_summary = self.objstorage.content_add(
            c.to_dict() for c in contents
            if c.status != 'absent')
        content_add_bytes = objstorage_summary['content:add:bytes']

    for content in contents:
        key = self._content_key(content)
        if key in self._contents:
            # Already indexed: not counted as an addition.
            continue

        # Check every algorithm first so a collision leaves the indexes
        # untouched for this content.
        for algorithm in DEFAULT_ALGORITHMS:
            hash_ = content.get_hash(algorithm)
            if hash_ in self._content_indexes[algorithm]\
               and (algorithm not in {'blake2s256', 'sha256'}):
                raise HashCollision(algorithm, hash_, key)

        for algorithm in DEFAULT_ALGORITHMS:
            hash_ = content.get_hash(algorithm)
            self._content_indexes[algorithm][hash_].add(key)

        self._objects[content.sha1_git].append(
            ('content', content.sha1))
        self._contents[key] = content
        bisect.insort(self._sorted_sha1s, content.sha1)

        # Only metadata is kept in the index; drop the data from the
        # stored copy.
        try:
            self._contents[key] = attr.evolve(
                self._contents[key],
                data=None)
        except (KeyError, TypeError, ValueError) as e:
            raise StorageArgumentException(*e.args)
        content_add += 1

    summary = {
        'content:add': content_add,
    }
    # Gate on with_data rather than on the byte count: a batch that adds
    # zero bytes must still report 'content:add:bytes': 0.
    if with_data:
        summary['content:add:bytes'] = content_add_bytes

    return summary
def content_add(self, content): | def content_add(self, content): | ||||
now = datetime.datetime.now(tz=datetime.timezone.utc) | now = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
try: | try: | ||||
content = [attr.evolve(Content.from_dict(c), ctime=now) | content = [attr.evolve(Content.from_dict(c), ctime=now) | ||||
for c in content] | for c in content] | ||||
Show All 32 Lines | def content_add_metadata(self, content): | ||||
content = [Content.from_dict(c) for c in content] | content = [Content.from_dict(c) for c in content] | ||||
return self._content_add(content, with_data=False) | return self._content_add(content, with_data=False) | ||||
def content_get(self, content):
    """Yield the stored data for each requested content identifier.

    Retrieval is delegated to ``self.objstorage.content_get``; whatever it
    yields is passed through unchanged.

    Args:
        content: sized collection of content identifiers; at most
            ``BULK_BLOCK_CONTENT_LEN_MAX`` entries are accepted.

    Raises:
        StorageArgumentException: more than ``BULK_BLOCK_CONTENT_LEN_MAX``
            identifiers were requested. Because this is a generator, the
            check only runs when iteration starts, not at call time.
    """
    # FIXME: Make this method support slicing the `data`.
    if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
        raise StorageArgumentException(
            "Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX)
    yield from self.objstorage.content_get(content)
def content_get_range(self, start, end, limit=1000): | def content_get_range(self, start, end, limit=1000): | ||||
if limit is None: | if limit is None: | ||||
raise StorageArgumentException('limit should not be None') | raise StorageArgumentException('limit should not be None') | ||||
from_index = bisect.bisect_left(self._sorted_sha1s, start) | from_index = bisect.bisect_left(self._sorted_sha1s, start) | ||||
sha1s = itertools.islice(self._sorted_sha1s, from_index, None) | sha1s = itertools.islice(self._sorted_sha1s, from_index, None) | ||||
sha1s = ((sha1, content_key) | sha1s = ((sha1, content_key) | ||||
for sha1 in sha1s | for sha1 in sha1s | ||||
▲ Show 20 Lines • Show All 902 Lines • Show Last 20 Lines |