swh/storage/cassandra/storage.py
(first 11 lines of the file unchanged and not shown)
 import attr
 import dateutil

 from swh.model.model import (
     Revision, Release, Directory, DirectoryEntry, Content, SkippedContent,
     OriginVisit, Snapshot, Origin
 )
-from swh.objstorage import get_objstorage
-from swh.objstorage.exc import ObjNotFoundError

 try:
     from swh.journal.writer import get_journal_writer
 except ImportError:
     get_journal_writer = None  # type: ignore
     # mypy limitation, see https://github.com/python/mypy/issues/1153

+from swh.storage.objstorage import ObjStorage
+
 from .. import HashCollision
 from ..exc import StorageArgumentException
 from .common import TOKEN_BEGIN, TOKEN_END
 from .converters import (
     revision_to_db, revision_from_db, release_to_db, release_from_db,
 )
 from .cql import CqlRunner
 from .schema import HASH_ALGORITHMS


 # Max block size of contents to return
 BULK_BLOCK_CONTENT_LEN_MAX = 10000
 def now():
     return datetime.datetime.now(tz=datetime.timezone.utc)


 class CassandraStorage:
     def __init__(self, hosts, keyspace, objstorage,
                  port=9042, journal_writer=None):
         self._cql_runner = CqlRunner(hosts, keyspace, port)
-        self.objstorage = get_objstorage(**objstorage)
         if journal_writer:
             self.journal_writer = get_journal_writer(**journal_writer)
         else:
             self.journal_writer = None
+        self.objstorage = ObjStorage(objstorage)

     def check_config(self, *, check_write):
         self._cql_runner.check_read()
         return True
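
Note: the constructor now wraps the objstorage configuration in the new swh.storage.objstorage.ObjStorage facade instead of calling swh.objstorage.get_objstorage directly. A minimal instantiation sketch; the host, keyspace, and the in-memory objstorage configuration are illustrative assumptions, not values from this changeset:

    # Hypothetical setup; 'memory' is assumed to be an available
    # objstorage backend and the host/keyspace names are made up.
    storage = CassandraStorage(
        hosts=['cassandra1.example.org'],
        keyspace='swh_test',
        objstorage={'cls': 'memory', 'args': {}},
        journal_writer=None,  # skip journal writing in this sketch
    )
    assert storage.check_config(check_write=False)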
     def _content_add(self, contents: List[Content], with_data: bool) -> Dict:
         # Filter out content already in the database.
         contents = [c for c in contents
                     if not self._cql_runner.content_get_from_pk(c.to_dict())]

         if self.journal_writer:
             for content in contents:
                 cont = content.to_dict()
                 if 'data' in cont:
                     del cont['data']
                 self.journal_writer.write_addition('content', cont)

-        count_contents = 0
-        count_content_added = 0
-        count_content_bytes_added = 0
-        for content in contents:
+        if with_data:
             # First insert to the objstorage, if the endpoint is
             # `content_add` (as opposed to `content_add_metadata`).
             # TODO: this should probably be done concurrently with inserting
             # in index tables (but still before the main table, so an entry
             # is only added to the main table after everything else was
             # successfully inserted).
-            count_contents += 1
-            if content.status != 'absent':
-                count_content_added += 1
-                if with_data:
-                    content_data = content.data
-                    if content_data is None:
-                        raise StorageArgumentException('Missing data')
-                    count_content_bytes_added += len(content_data)
-                    self.objstorage.add(content_data, content.sha1)
+            summary = self.objstorage.content_add(
+                c for c in contents if c.status != 'absent')
+            content_add_bytes = summary['content:add:bytes']
+
+        content_add = 0
+        for content in contents:
+            content_add += 1

             # Then add to index tables
             for algo in HASH_ALGORITHMS:
                 self._cql_runner.content_index_add_one(algo, content)

             # Then to the main table
             self._cql_runner.content_add_one(content)

             # Note that we check for collisions *after* inserting. This
             # differs significantly from the pgsql storage, but checking
             # before insertion does not provide any guarantee in case
             # another thread inserts the colliding hash at the same time.
             #
             # The proper way to do it would probably be a BATCH, but this
             # would be inefficient because of the number of partitions we
             # need to affect (len(HASH_ALGORITHMS)+1, which is currently 5).
             for algo in {'sha1', 'sha1_git'}:
                 pks = self._cql_runner.content_get_pks_from_single_hash(
                     algo, content.get_hash(algo))
                 if len(pks) > 1:
                     # There are more than the one we just inserted.
                     raise HashCollision(algo, content.get_hash(algo), pks)

         summary = {
-            'content:add': count_content_added,
+            'content:add': content_add,
         }
         if with_data:
-            summary['content:add:bytes'] = count_content_bytes_added
+            summary['content:add:bytes'] = content_add_bytes

         return summary
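
Note: after this refactor the byte count comes from the objstorage facade's summary ('content:add:bytes'), while the row count is still tallied locally. A minimal stand-in sketch of the contract this code assumes from ObjStorage.content_add; it is an assumption for illustration, not the facade's real implementation:

    class FakeObjStorage:
        """Stand-in honoring the assumed content_add contract."""
        def __init__(self):
            self.store = {}

        def content_add(self, contents):
            # Store each content's bytes and report the total size,
            # mirroring what the old inline loop counted by hand.
            total = 0
            for content in contents:
                if content.data is None:
                    # the removed inline code raised
                    # StorageArgumentException('Missing data') here
                    raise ValueError('Missing data')
                self.store[content.sha1] = content.data
                total += len(content.data)
            return {'content:add:bytes': total}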
     def content_add(self, content: Iterable[Content]) -> Dict:
         return self._content_add(list(content), with_data=True)

     def content_update(self, content, keys=[]):
         raise NotImplementedError(
             'content_update is not supported by the Cassandra backend')

     def content_add_metadata(self, content: Iterable[Content]) -> Dict:
         return self._content_add(list(content), with_data=False)
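
Note: content_add and content_add_metadata differ only in the with_data flag passed to _content_add. A hedged usage sketch, reusing the storage instance from the earlier sketch; the hashes follow the Software Heritage definitions (sha1_git hashes a git blob header plus the data), but the example values are made up:

    import hashlib
    from swh.model.model import Content

    data = b'hello'
    content = Content(
        sha1=hashlib.sha1(data).digest(),
        sha1_git=hashlib.sha1(b'blob %d\x00' % len(data) + data).digest(),
        sha256=hashlib.sha256(data).digest(),
        blake2s256=hashlib.blake2s(data).digest(),  # 32-byte digest
        length=len(data),
        status='visible',
        data=data,
    )
    summary = storage.content_add([content])           # bytes + rows
    # expected shape: {'content:add': 1, 'content:add:bytes': 5}
    summary = storage.content_add_metadata([content])  # rows only
    # expected shape: {'content:add': 1}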
     def content_get(self, content):
         if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
             raise StorageArgumentException(
                 "Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX)
-        for obj_id in content:
-            try:
-                data = self.objstorage.get(obj_id)
-            except ObjNotFoundError:
-                yield None
-                continue
-            yield {'sha1': obj_id, 'data': data}
+        yield from self.objstorage.content_get(content)
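
Note: content_get now delegates to the objstorage facade. Assuming the facade preserves the per-item contract of the removed inline loop (yield a {'sha1': ..., 'data': ...} dict, or None when the object is missing), a usage sketch with made-up hashes:

    wanted = [
        bytes.fromhex('34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
        bytes.fromhex('0' * 40),  # presumably absent
    ]
    for obj in storage.content_get(wanted):
        if obj is None:
            print('missing')
        else:
            print(obj['sha1'].hex(), len(obj['data']), 'bytes')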
     def content_get_partition(
             self, partition_id: int, nb_partitions: int, limit: int = 1000,
             page_token: str = None):
         if limit is None:
             raise StorageArgumentException('limit should not be None')
         # Compute start and end of the range of tokens covered by the
(remaining 835 lines of the file not shown)
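
Note: the truncated comment above introduces the token-range arithmetic behind content_get_partition, built on the TOKEN_BEGIN/TOKEN_END bounds imported from .common. A sketch of how such a range is typically derived; the helper name, the bound values (Murmur3's -2**63 and 2**63 - 1), and the exact formula are assumptions, not code from this changeset:

    TOKEN_BEGIN = -2**63      # assumed Murmur3 minimum token
    TOKEN_END = 2**63 - 1     # assumed Murmur3 maximum token

    def partition_token_range(partition_id: int, nb_partitions: int):
        # Hypothetical helper: split the token space into nb_partitions
        # contiguous ranges and return the bounds of the requested one.
        if not 0 <= partition_id < nb_partitions:
            raise ValueError('partition_id must be in [0, nb_partitions)')
        partition_size = (TOKEN_END - TOKEN_BEGIN) // nb_partitions
        range_start = TOKEN_BEGIN + partition_id * partition_size
        range_end = TOKEN_BEGIN + (partition_id + 1) * partition_size
        return range_start, range_end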