Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
Show All 11 Lines | |||||
import attr | import attr | ||||
import dateutil | import dateutil | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Revision, Release, Directory, DirectoryEntry, Content, SkippedContent, | Revision, Release, Directory, DirectoryEntry, Content, SkippedContent, | ||||
OriginVisit, Snapshot, Origin | OriginVisit, Snapshot, Origin | ||||
) | ) | ||||
try: | |||||
from swh.journal.writer import get_journal_writer | |||||
except ImportError: | |||||
get_journal_writer = None # type: ignore | |||||
# mypy limitation, see https://github.com/python/mypy/issues/1153 | |||||
from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
from swh.storage.writer import JournalWriter | |||||
from .. import HashCollision | from .. import HashCollision | ||||
from ..exc import StorageArgumentException | from ..exc import StorageArgumentException | ||||
from .common import TOKEN_BEGIN, TOKEN_END | from .common import TOKEN_BEGIN, TOKEN_END | ||||
from .converters import ( | from .converters import ( | ||||
revision_to_db, revision_from_db, release_to_db, release_from_db, | revision_to_db, revision_from_db, release_to_db, release_from_db, | ||||
) | ) | ||||
from .cql import CqlRunner | from .cql import CqlRunner | ||||
from .schema import HASH_ALGORITHMS | from .schema import HASH_ALGORITHMS | ||||
# Max block size of contents to return | # Max block size of contents to return | ||||
BULK_BLOCK_CONTENT_LEN_MAX = 10000 | BULK_BLOCK_CONTENT_LEN_MAX = 10000 | ||||
def now():
    """Return the current moment as a timezone-aware datetime in UTC."""
    utc = datetime.timezone.utc
    return datetime.datetime.now(tz=utc)
class CassandraStorage: | class CassandraStorage: | ||||
    def __init__(self, hosts, keyspace, objstorage,
                 port=9042, journal_writer=None):
        """Set up the Cassandra-backed storage.

        Args:
            hosts: Cassandra server hosts, passed through to CqlRunner.
            keyspace: name of the Cassandra keyspace to use.
            objstorage: configuration for the object storage backend,
                wrapped in an ObjStorage instance.
            port: Cassandra server port (default 9042).
            journal_writer: optional journal-writer configuration; it is
                always wrapped in a JournalWriter — presumably a no-op
                wrapper when this is None, TODO confirm against
                swh.storage.writer.JournalWriter.
        """
        self._cql_runner = CqlRunner(hosts, keyspace, port)
        self.journal_writer = JournalWriter(journal_writer)
        self.objstorage = ObjStorage(objstorage)
    def check_config(self, *, check_write):
        """Check that the storage is properly configured and reachable.

        Only a read check against the CQL backend is performed; the
        keyword-only ``check_write`` argument is accepted for interface
        compatibility but is not acted upon here.
        NOTE(review): confirm whether a write-capability check is
        intentionally omitted for this backend.
        """
        self._cql_runner.check_read()
        return True
def _content_add(self, contents: List[Content], with_data: bool) -> Dict: | def _content_add(self, contents: List[Content], with_data: bool) -> Dict: | ||||
# Filter-out content already in the database. | # Filter-out content already in the database. | ||||
contents = [c for c in contents | contents = [c for c in contents | ||||
if not self._cql_runner.content_get_from_pk(c.to_dict())] | if not self._cql_runner.content_get_from_pk(c.to_dict())] | ||||
if self.journal_writer: | self.journal_writer.content_add(contents) | ||||
for content in contents: | |||||
cont = content.to_dict() | |||||
if 'data' in cont: | |||||
del cont['data'] | |||||
self.journal_writer.write_addition('content', cont) | |||||
if with_data: | if with_data: | ||||
# First insert to the objstorage, if the endpoint is | # First insert to the objstorage, if the endpoint is | ||||
# `content_add` (as opposed to `content_add_metadata`). | # `content_add` (as opposed to `content_add_metadata`). | ||||
# TODO: this should probably be done in concurrently to inserting | # TODO: this should probably be done in concurrently to inserting | ||||
# in index tables (but still before the main table; so an entry is | # in index tables (but still before the main table; so an entry is | ||||
# only added to the main table after everything else was | # only added to the main table after everything else was | ||||
# successfully inserted. | # successfully inserted. | ||||
▲ Show 20 Lines • Show All 163 Lines • ▼ Show 20 Lines | def content_get_random(self): | ||||
return self._cql_runner.content_get_random().sha1_git | return self._cql_runner.content_get_random().sha1_git | ||||
def _skipped_content_add(self, contents: Iterable[SkippedContent]) -> Dict: | def _skipped_content_add(self, contents: Iterable[SkippedContent]) -> Dict: | ||||
# Filter-out content already in the database. | # Filter-out content already in the database. | ||||
contents = [ | contents = [ | ||||
c for c in contents | c for c in contents | ||||
if not self._cql_runner.skipped_content_get_from_pk(c.to_dict())] | if not self._cql_runner.skipped_content_get_from_pk(c.to_dict())] | ||||
if self.journal_writer: | self.journal_writer.skipped_content_add(contents) | ||||
for content in contents: | |||||
cont = content.to_dict() | |||||
if 'data' in cont: | |||||
del cont['data'] | |||||
self.journal_writer.write_addition('content', cont) | |||||
for content in contents: | for content in contents: | ||||
# Add to index tables | # Add to index tables | ||||
for algo in HASH_ALGORITHMS: | for algo in HASH_ALGORITHMS: | ||||
if content.get_hash(algo) is not None: | if content.get_hash(algo) is not None: | ||||
self._cql_runner.skipped_content_index_add_one( | self._cql_runner.skipped_content_index_add_one( | ||||
algo, content) | algo, content) | ||||
Show All 14 Lines | class CassandraStorage: | ||||
def directory_add(self, directories: Iterable[Directory]) -> Dict: | def directory_add(self, directories: Iterable[Directory]) -> Dict: | ||||
directories = list(directories) | directories = list(directories) | ||||
# Filter out directories that are already inserted. | # Filter out directories that are already inserted. | ||||
missing = self.directory_missing([dir_.id for dir_ in directories]) | missing = self.directory_missing([dir_.id for dir_ in directories]) | ||||
directories = [dir_ for dir_ in directories if dir_.id in missing] | directories = [dir_ for dir_ in directories if dir_.id in missing] | ||||
if self.journal_writer: | self.journal_writer.directory_add(directories) | ||||
self.journal_writer.write_additions('directory', directories) | |||||
for directory in directories: | for directory in directories: | ||||
# Add directory entries to the 'directory_entry' table | # Add directory entries to the 'directory_entry' table | ||||
for entry in directory.entries: | for entry in directory.entries: | ||||
self._cql_runner.directory_entry_add_one({ | self._cql_runner.directory_entry_add_one({ | ||||
**entry.to_dict(), | **entry.to_dict(), | ||||
'directory_id': directory.id | 'directory_id': directory.id | ||||
}) | }) | ||||
▲ Show 20 Lines • Show All 87 Lines • ▼ Show 20 Lines | class CassandraStorage: | ||||
def revision_add(self, revisions: Iterable[Revision]) -> Dict: | def revision_add(self, revisions: Iterable[Revision]) -> Dict: | ||||
revisions = list(revisions) | revisions = list(revisions) | ||||
# Filter-out revisions already in the database | # Filter-out revisions already in the database | ||||
missing = self.revision_missing([rev.id for rev in revisions]) | missing = self.revision_missing([rev.id for rev in revisions]) | ||||
revisions = [rev for rev in revisions if rev.id in missing] | revisions = [rev for rev in revisions if rev.id in missing] | ||||
if self.journal_writer: | self.journal_writer.revision_add(revisions) | ||||
self.journal_writer.write_additions('revision', revisions) | |||||
for revision in revisions: | for revision in revisions: | ||||
revision = revision_to_db(revision) | revision = revision_to_db(revision) | ||||
if revision: | if revision: | ||||
# Add parents first | # Add parents first | ||||
for (rank, parent) in enumerate(revision.parents): | for (rank, parent) in enumerate(revision.parents): | ||||
self._cql_runner.revision_parent_add_one( | self._cql_runner.revision_parent_add_one( | ||||
revision.id, rank, parent) | revision.id, rank, parent) | ||||
▲ Show 20 Lines • Show All 75 Lines • ▼ Show 20 Lines | class CassandraStorage: | ||||
def revision_get_random(self): | def revision_get_random(self): | ||||
return self._cql_runner.revision_get_random().id | return self._cql_runner.revision_get_random().id | ||||
def release_add(self, releases: Iterable[Release]) -> Dict: | def release_add(self, releases: Iterable[Release]) -> Dict: | ||||
missing = self.release_missing([rel.id for rel in releases]) | missing = self.release_missing([rel.id for rel in releases]) | ||||
releases = [rel for rel in releases if rel.id in missing] | releases = [rel for rel in releases if rel.id in missing] | ||||
if self.journal_writer: | self.journal_writer.release_add(releases) | ||||
self.journal_writer.write_additions('release', releases) | |||||
for release in releases: | for release in releases: | ||||
if release: | if release: | ||||
release = release_to_db(release) | release = release_to_db(release) | ||||
self._cql_runner.release_add_one(release) | self._cql_runner.release_add_one(release) | ||||
return {'release:add': len(missing)} | return {'release:add': len(missing)} | ||||
Show All 15 Lines | def release_get_random(self): | ||||
return self._cql_runner.release_get_random().id | return self._cql_runner.release_get_random().id | ||||
def snapshot_add(self, snapshots: Iterable[Snapshot]) -> Dict: | def snapshot_add(self, snapshots: Iterable[Snapshot]) -> Dict: | ||||
missing = self._cql_runner.snapshot_missing( | missing = self._cql_runner.snapshot_missing( | ||||
[snp.id for snp in snapshots]) | [snp.id for snp in snapshots]) | ||||
snapshots = [snp for snp in snapshots if snp.id in missing] | snapshots = [snp for snp in snapshots if snp.id in missing] | ||||
for snapshot in snapshots: | for snapshot in snapshots: | ||||
if self.journal_writer: | self.journal_writer.snapshot_add(snapshot) | ||||
self.journal_writer.write_addition('snapshot', snapshot) | |||||
# Add branches | # Add branches | ||||
for (branch_name, branch) in snapshot.branches.items(): | for (branch_name, branch) in snapshot.branches.items(): | ||||
if branch is None: | if branch is None: | ||||
target_type = None | target_type = None | ||||
target = None | target = None | ||||
else: | else: | ||||
target_type = branch.target_type.value | target_type = branch.target_type.value | ||||
▲ Show 20 Lines • Show All 229 Lines • ▼ Show 20 Lines | def origin_add(self, origins: Iterable[Origin]) -> List[Dict]: | ||||
return results | return results | ||||
def origin_add_one(self, origin: Origin) -> str: | def origin_add_one(self, origin: Origin) -> str: | ||||
known_origin = self.origin_get_one(origin.to_dict()) | known_origin = self.origin_get_one(origin.to_dict()) | ||||
if known_origin: | if known_origin: | ||||
origin_url = known_origin['url'] | origin_url = known_origin['url'] | ||||
else: | else: | ||||
if self.journal_writer: | self.journal_writer.origin_add_one(origin) | ||||
self.journal_writer.write_addition('origin', origin) | |||||
self._cql_runner.origin_add_one(origin) | self._cql_runner.origin_add_one(origin) | ||||
origin_url = origin.url | origin_url = origin.url | ||||
return origin_url | return origin_url | ||||
def origin_visit_add( | def origin_visit_add( | ||||
self, origin, date, type) -> Optional[Dict[str, Union[str, int]]]: | self, origin, date, type) -> Optional[Dict[str, Union[str, int]]]: | ||||
Show All 16 Lines | def origin_visit_add( | ||||
'type': type, | 'type': type, | ||||
'status': 'ongoing', | 'status': 'ongoing', | ||||
'snapshot': None, | 'snapshot': None, | ||||
'metadata': None, | 'metadata': None, | ||||
'visit': visit_id | 'visit': visit_id | ||||
}) | }) | ||||
except (KeyError, TypeError, ValueError) as e: | except (KeyError, TypeError, ValueError) as e: | ||||
raise StorageArgumentException(*e.args) | raise StorageArgumentException(*e.args) | ||||
self.journal_writer.origin_visit_add(visit) | |||||
if self.journal_writer: | |||||
self.journal_writer.write_addition('origin_visit', visit) | |||||
self._cql_runner.origin_visit_add_one(visit) | self._cql_runner.origin_visit_add_one(visit) | ||||
return { | return { | ||||
'origin': origin_url, | 'origin': origin_url, | ||||
'visit': visit_id, | 'visit': visit_id, | ||||
} | } | ||||
Show All 19 Lines | def origin_visit_update( | ||||
if snapshot: | if snapshot: | ||||
updates['snapshot'] = snapshot | updates['snapshot'] = snapshot | ||||
try: | try: | ||||
visit = attr.evolve(visit, **updates) | visit = attr.evolve(visit, **updates) | ||||
except (KeyError, TypeError, ValueError) as e: | except (KeyError, TypeError, ValueError) as e: | ||||
raise StorageArgumentException(*e.args) | raise StorageArgumentException(*e.args) | ||||
if self.journal_writer: | self.journal_writer.origin_visit_update(visit) | ||||
self.journal_writer.write_update('origin_visit', visit) | |||||
self._cql_runner.origin_visit_update(origin_url, visit_id, updates) | self._cql_runner.origin_visit_update(origin_url, visit_id, updates) | ||||
def origin_visit_upsert(self, visits): | def origin_visit_upsert(self, visits): | ||||
visits = [visit.copy() for visit in visits] | visits = [visit.copy() for visit in visits] | ||||
for visit in visits: | for visit in visits: | ||||
if isinstance(visit['date'], str): | if isinstance(visit['date'], str): | ||||
visit['date'] = dateutil.parser.parse(visit['date']) | visit['date'] = dateutil.parser.parse(visit['date']) | ||||
if self.journal_writer: | self.journal_writer.origin_visit_upsert(visits) | ||||
for visit in visits: | |||||
self.journal_writer.write_addition('origin_visit', visit) | |||||
for visit in visits: | for visit in visits: | ||||
visit = visit.copy() | visit = visit.copy() | ||||
if visit.get('metadata'): | if visit.get('metadata'): | ||||
visit['metadata'] = json.dumps(visit['metadata']) | visit['metadata'] = json.dumps(visit['metadata']) | ||||
self._cql_runner.origin_visit_upsert(visit) | self._cql_runner.origin_visit_upsert(visit) | ||||
@staticmethod | @staticmethod | ||||
▲ Show 20 Lines • Show All 118 Lines • Show Last 20 Lines |