Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import json | import json | ||||
import random | import random | ||||
import re | import re | ||||
from typing import Any, Dict, List, Optional | from typing import Any, Dict, List, Iterable, Optional, Union | ||||
import uuid | import uuid | ||||
import attr | import attr | ||||
import dateutil | import dateutil | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Revision, Release, Directory, DirectoryEntry, Content, SkippedContent, | Revision, Release, Directory, DirectoryEntry, Content, SkippedContent, | ||||
OriginVisit, Snapshot | OriginVisit, Snapshot, Origin | ||||
) | ) | ||||
from swh.objstorage import get_objstorage | from swh.objstorage import get_objstorage | ||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
try: | try: | ||||
from swh.journal.writer import get_journal_writer | from swh.journal.writer import get_journal_writer | ||||
except ImportError: | except ImportError: | ||||
get_journal_writer = None # type: ignore | get_journal_writer = None # type: ignore | ||||
# mypy limitation, see https://github.com/python/mypy/issues/1153 | # mypy limitation, see https://github.com/python/mypy/issues/1153 | ||||
Show All 29 Lines | def __init__(self, hosts, keyspace, objstorage, | ||||
else: | else: | ||||
self.journal_writer = None | self.journal_writer = None | ||||
def check_config(self, *, check_write): | def check_config(self, *, check_write): | ||||
self._cql_runner.check_read() | self._cql_runner.check_read() | ||||
return True | return True | ||||
def _content_add(self, contents, with_data): | def _content_add(self, contents: List[Content], with_data: bool) -> Dict: | ||||
ardumont: -> Dict | |||||
try: | |||||
contents = [Content.from_dict(c) for c in contents] | |||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
# Filter-out content already in the database. | # Filter-out content already in the database. | ||||
contents = [c for c in contents | contents = [c for c in contents | ||||
if not self._cql_runner.content_get_from_pk(c.to_dict())] | if not self._cql_runner.content_get_from_pk(c.to_dict())] | ||||
if self.journal_writer: | if self.journal_writer: | ||||
for content in contents: | for content in contents: | ||||
content = content.to_dict() | cont = content.to_dict() | ||||
if 'data' in content: | if 'data' in cont: | ||||
del content['data'] | del cont['data'] | ||||
self.journal_writer.write_addition('content', content) | self.journal_writer.write_addition('content', cont) | ||||
count_contents = 0 | count_contents = 0 | ||||
count_content_added = 0 | count_content_added = 0 | ||||
count_content_bytes_added = 0 | count_content_bytes_added = 0 | ||||
for content in contents: | for content in contents: | ||||
# First insert to the objstorage, if the endpoint is | # First insert to the objstorage, if the endpoint is | ||||
# `content_add` (as opposed to `content_add_metadata`). | # `content_add` (as opposed to `content_add_metadata`). | ||||
# TODO: this should probably be done in concurrently to inserting | # TODO: this should probably be done in concurrently to inserting | ||||
# in index tables (but still before the main table; so an entry is | # in index tables (but still before the main table; so an entry is | ||||
# only added to the main table after everything else was | # only added to the main table after everything else was | ||||
# successfully inserted. | # successfully inserted. | ||||
count_contents += 1 | count_contents += 1 | ||||
if content.status != 'absent': | if content.status != 'absent': | ||||
count_content_added += 1 | count_content_added += 1 | ||||
if with_data: | if with_data: | ||||
content_data = content.data | content_data = content.data | ||||
if content_data is None: | |||||
raise StorageArgumentException('Missing data') | |||||
count_content_bytes_added += len(content_data) | count_content_bytes_added += len(content_data) | ||||
self.objstorage.add(content_data, content.sha1) | self.objstorage.add(content_data, content.sha1) | ||||
# Then add to index tables | # Then add to index tables | ||||
for algo in HASH_ALGORITHMS: | for algo in HASH_ALGORITHMS: | ||||
self._cql_runner.content_index_add_one(algo, content) | self._cql_runner.content_index_add_one(algo, content) | ||||
# Then to the main table | # Then to the main table | ||||
Show All 18 Lines | def _content_add(self, contents: List[Content], with_data: bool) -> Dict: | ||||
'content:add': count_content_added, | 'content:add': count_content_added, | ||||
} | } | ||||
if with_data: | if with_data: | ||||
summary['content:add:bytes'] = count_content_bytes_added | summary['content:add:bytes'] = count_content_bytes_added | ||||
return summary | return summary | ||||
def content_add(self, content): | def content_add(self, content: Iterable[Content]) -> Dict: | ||||
content = [c.copy() for c in content] # semi-shallow copy | return self._content_add(list(content), with_data=True) | ||||
for item in content: | |||||
item['ctime'] = now() | |||||
return self._content_add(content, with_data=True) | |||||
def content_update(self, content, keys=[]): | def content_update(self, content, keys=[]): | ||||
raise NotImplementedError( | raise NotImplementedError( | ||||
'content_update is not supported by the Cassandra backend') | 'content_update is not supported by the Cassandra backend') | ||||
def content_add_metadata(self, content): | def content_add_metadata(self, content: Iterable[Content]) -> Dict: | ||||
return self._content_add(content, with_data=False) | return self._content_add(list(content), with_data=False) | ||||
def content_get(self, content): | def content_get(self, content): | ||||
if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: | if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
"Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX) | "Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX) | ||||
for obj_id in content: | for obj_id in content: | ||||
try: | try: | ||||
data = self.objstorage.get(obj_id) | data = self.objstorage.get(obj_id) | ||||
▲ Show 20 Lines • Show All 108 Lines • ▼ Show 20 Lines | class CassandraStorage: | ||||
def content_missing_per_sha1_git(self, contents): | def content_missing_per_sha1_git(self, contents): | ||||
return self.content_missing([{'sha1_git': c for c in contents}], | return self.content_missing([{'sha1_git': c for c in contents}], | ||||
key_hash='sha1_git') | key_hash='sha1_git') | ||||
def content_get_random(self): | def content_get_random(self): | ||||
return self._cql_runner.content_get_random().sha1_git | return self._cql_runner.content_get_random().sha1_git | ||||
def _skipped_content_add(self, contents): | def _skipped_content_add(self, contents: Iterable[SkippedContent]) -> Dict: | ||||
try: | |||||
contents = [SkippedContent.from_dict(c) for c in contents] | |||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
# Filter-out content already in the database. | # Filter-out content already in the database. | ||||
contents = [ | contents = [ | ||||
c for c in contents | c for c in contents | ||||
if not self._cql_runner.skipped_content_get_from_pk(c.to_dict())] | if not self._cql_runner.skipped_content_get_from_pk(c.to_dict())] | ||||
if self.journal_writer: | if self.journal_writer: | ||||
for content in contents: | for content in contents: | ||||
content = content.to_dict() | cont = content.to_dict() | ||||
if 'data' in content: | if 'data' in cont: | ||||
del content['data'] | del cont['data'] | ||||
self.journal_writer.write_addition('content', content) | self.journal_writer.write_addition('content', cont) | ||||
for content in contents: | for content in contents: | ||||
# Add to index tables | # Add to index tables | ||||
for algo in HASH_ALGORITHMS: | for algo in HASH_ALGORITHMS: | ||||
if content.get_hash(algo) is not None: | if content.get_hash(algo) is not None: | ||||
self._cql_runner.skipped_content_index_add_one( | self._cql_runner.skipped_content_index_add_one( | ||||
algo, content) | algo, content) | ||||
# Then to the main table | # Then to the main table | ||||
self._cql_runner.skipped_content_add_one(content) | self._cql_runner.skipped_content_add_one(content) | ||||
return { | return { | ||||
'skipped_content:add': len(contents) | 'skipped_content:add': len(contents) | ||||
} | } | ||||
def skipped_content_add(self, content): | def skipped_content_add(self, content: Iterable[SkippedContent]) -> Dict: | ||||
content = [c.copy() for c in content] # semi-shallow copy | |||||
for item in content: | |||||
item['ctime'] = now() | |||||
return self._skipped_content_add(content) | return self._skipped_content_add(content) | ||||
def skipped_content_missing(self, contents): | def skipped_content_missing(self, contents): | ||||
for content in contents: | for content in contents: | ||||
if not self._cql_runner.skipped_content_get_from_pk(content): | if not self._cql_runner.skipped_content_get_from_pk(content): | ||||
yield content | yield content | ||||
def directory_add(self, directories): | def directory_add(self, directories: Iterable[Directory]) -> Dict: | ||||
directories = list(directories) | directories = list(directories) | ||||
# Filter out directories that are already inserted. | # Filter out directories that are already inserted. | ||||
missing = self.directory_missing([dir_['id'] for dir_ in directories]) | missing = self.directory_missing([dir_.id for dir_ in directories]) | ||||
directories = [dir_ for dir_ in directories if dir_['id'] in missing] | directories = [dir_ for dir_ in directories if dir_.id in missing] | ||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_additions('directory', directories) | self.journal_writer.write_additions('directory', directories) | ||||
for directory in directories: | for directory in directories: | ||||
try: | |||||
directory = Directory.from_dict(directory) | |||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
# Add directory entries to the 'directory_entry' table | # Add directory entries to the 'directory_entry' table | ||||
for entry in directory.entries: | for entry in directory.entries: | ||||
entry = entry.to_dict() | self._cql_runner.directory_entry_add_one({ | ||||
entry['directory_id'] = directory.id | **entry.to_dict(), | ||||
self._cql_runner.directory_entry_add_one(entry) | 'directory_id': directory.id | ||||
}) | |||||
# Add the directory *after* adding all the entries, so someone | # Add the directory *after* adding all the entries, so someone | ||||
# calling snapshot_get_branch in the meantime won't end up | # calling snapshot_get_branch in the meantime won't end up | ||||
# with half the entries. | # with half the entries. | ||||
self._cql_runner.directory_add_one(directory.id) | self._cql_runner.directory_add_one(directory.id) | ||||
return {'directory:add': len(missing)} | return {'directory:add': len(missing)} | ||||
▲ Show 20 Lines • Show All 72 Lines • ▼ Show 20 Lines | def _directory_entry_get_by_path(self, directory, paths, prefix): | ||||
first_item['target'], paths[1:], prefix + paths[0] + b'/') | first_item['target'], paths[1:], prefix + paths[0] + b'/') | ||||
def directory_ls(self, directory, recursive=False): | def directory_ls(self, directory, recursive=False): | ||||
yield from self._directory_ls(directory, recursive) | yield from self._directory_ls(directory, recursive) | ||||
def directory_get_random(self): | def directory_get_random(self): | ||||
return self._cql_runner.directory_get_random().id | return self._cql_runner.directory_get_random().id | ||||
def revision_add(self, revisions): | def revision_add(self, revisions: Iterable[Revision]) -> Dict: | ||||
revisions = list(revisions) | revisions = list(revisions) | ||||
# Filter-out revisions already in the database | # Filter-out revisions already in the database | ||||
missing = self.revision_missing([rev['id'] for rev in revisions]) | missing = self.revision_missing([rev.id for rev in revisions]) | ||||
revisions = [rev for rev in revisions if rev['id'] in missing] | revisions = [rev for rev in revisions if rev.id in missing] | ||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_additions('revision', revisions) | self.journal_writer.write_additions('revision', revisions) | ||||
for revision in revisions: | for revision in revisions: | ||||
try: | |||||
revision = revision_to_db(revision) | revision = revision_to_db(revision) | ||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
if revision: | if revision: | ||||
# Add parents first | # Add parents first | ||||
for (rank, parent) in enumerate(revision.parents): | for (rank, parent) in enumerate(revision.parents): | ||||
self._cql_runner.revision_parent_add_one( | self._cql_runner.revision_parent_add_one( | ||||
revision.id, rank, parent) | revision.id, rank, parent) | ||||
# Then write the main revision row. | # Then write the main revision row. | ||||
# Writing this after all parents were written ensures that | # Writing this after all parents were written ensures that | ||||
▲ Show 20 Lines • Show All 68 Lines • ▼ Show 20 Lines | class CassandraStorage: | ||||
def revision_shortlog(self, revisions, limit=None): | def revision_shortlog(self, revisions, limit=None): | ||||
seen = set() | seen = set() | ||||
yield from self._get_parent_revs(revisions, seen, limit, True) | yield from self._get_parent_revs(revisions, seen, limit, True) | ||||
def revision_get_random(self): | def revision_get_random(self): | ||||
return self._cql_runner.revision_get_random().id | return self._cql_runner.revision_get_random().id | ||||
def release_add(self, releases): | def release_add(self, releases: Iterable[Release]) -> Dict: | ||||
releases = list(releases) | missing = self.release_missing([rel.id for rel in releases]) | ||||
missing = self.release_missing([rel['id'] for rel in releases]) | releases = [rel for rel in releases if rel.id in missing] | ||||
releases = [rel for rel in releases if rel['id'] in missing] | |||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_additions('release', releases) | self.journal_writer.write_additions('release', releases) | ||||
for release in releases: | for release in releases: | ||||
try: | |||||
release = release_to_db(release) | |||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
if release: | if release: | ||||
release = release_to_db(release) | |||||
self._cql_runner.release_add_one(release) | self._cql_runner.release_add_one(release) | ||||
return {'release:add': len(missing)} | return {'release:add': len(missing)} | ||||
def release_missing(self, releases): | def release_missing(self, releases): | ||||
return self._cql_runner.release_missing(releases) | return self._cql_runner.release_missing(releases) | ||||
def release_get(self, releases): | def release_get(self, releases): | ||||
rows = self._cql_runner.release_get(releases) | rows = self._cql_runner.release_get(releases) | ||||
rels = {} | rels = {} | ||||
for row in rows: | for row in rows: | ||||
release = Release(**row._asdict()) | release = Release(**row._asdict()) | ||||
release = release_from_db(release) | release = release_from_db(release) | ||||
rels[row.id] = release.to_dict() | rels[row.id] = release.to_dict() | ||||
for rel_id in releases: | for rel_id in releases: | ||||
yield rels.get(rel_id) | yield rels.get(rel_id) | ||||
def release_get_random(self): | def release_get_random(self): | ||||
return self._cql_runner.release_get_random().id | return self._cql_runner.release_get_random().id | ||||
def snapshot_add(self, snapshots): | def snapshot_add(self, snapshots: Iterable[Snapshot]) -> Dict: | ||||
try: | |||||
snapshots = [Snapshot.from_dict(snap) for snap in snapshots] | |||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
missing = self._cql_runner.snapshot_missing( | missing = self._cql_runner.snapshot_missing( | ||||
[snp.id for snp in snapshots]) | [snp.id for snp in snapshots]) | ||||
snapshots = [snp for snp in snapshots if snp.id in missing] | snapshots = [snp for snp in snapshots if snp.id in missing] | ||||
for snapshot in snapshots: | for snapshot in snapshots: | ||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_addition('snapshot', snapshot) | self.journal_writer.write_addition('snapshot', snapshot) | ||||
# Add branches | # Add branches | ||||
for (branch_name, branch) in snapshot.branches.items(): | for (branch_name, branch) in snapshot.branches.items(): | ||||
if branch is None: | if branch is None: | ||||
target_type = None | target_type = None | ||||
target = None | target = None | ||||
else: | else: | ||||
target_type = branch.target_type.value | target_type = branch.target_type.value | ||||
target = branch.target | target = branch.target | ||||
branch = { | self._cql_runner.snapshot_branch_add_one({ | ||||
'snapshot_id': snapshot.id, | 'snapshot_id': snapshot.id, | ||||
'name': branch_name, | 'name': branch_name, | ||||
'target_type': target_type, | 'target_type': target_type, | ||||
'target': target, | 'target': target, | ||||
} | }) | ||||
self._cql_runner.snapshot_branch_add_one(branch) | |||||
# Add the snapshot *after* adding all the branches, so someone | # Add the snapshot *after* adding all the branches, so someone | ||||
# calling snapshot_get_branch in the meantime won't end up | # calling snapshot_get_branch in the meantime won't end up | ||||
# with half the branches. | # with half the branches. | ||||
self._cql_runner.snapshot_add_one(snapshot.id) | self._cql_runner.snapshot_add_one(snapshot.id) | ||||
return {'snapshot:add': len(snapshots)} | return {'snapshot:add': len(snapshots)} | ||||
▲ Show 20 Lines • Show All 201 Lines • ▼ Show 20 Lines | def origin_search(self, url_pattern, offset=0, limit=50, | ||||
if orig.next_visit_id > 1] | if orig.next_visit_id > 1] | ||||
return [ | return [ | ||||
{ | { | ||||
'url': orig.url, | 'url': orig.url, | ||||
} | } | ||||
for orig in origins[offset:offset+limit]] | for orig in origins[offset:offset+limit]] | ||||
def origin_add(self, origins): | def origin_add(self, origins: Iterable[Origin]) -> List[Dict]: | ||||
origins = list(origins) | |||||
if any('id' in origin for origin in origins): | |||||
raise StorageArgumentException( | |||||
'Origins must not already have an id.') | |||||
results = [] | results = [] | ||||
for origin in origins: | for origin in origins: | ||||
self.origin_add_one(origin) | self.origin_add_one(origin) | ||||
results.append(origin) | results.append(origin.to_dict()) | ||||
return results | return results | ||||
def origin_add_one(self, origin): | def origin_add_one(self, origin: Origin) -> str: | ||||
known_origin = self.origin_get_one(origin) | known_origin = self.origin_get_one(origin.to_dict()) | ||||
if known_origin: | if known_origin: | ||||
origin_url = known_origin['url'] | origin_url = known_origin['url'] | ||||
else: | else: | ||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_addition('origin', origin) | self.journal_writer.write_addition('origin', origin) | ||||
self._cql_runner.origin_add_one(origin) | self._cql_runner.origin_add_one(origin) | ||||
origin_url = origin['url'] | origin_url = origin.url | ||||
return origin_url | return origin_url | ||||
def origin_visit_add(self, origin, date, type): | def origin_visit_add( | ||||
self, origin, date, type) -> Optional[Dict[str, Union[str, int]]]: | |||||
origin_url = origin # TODO: rename the argument | origin_url = origin # TODO: rename the argument | ||||
if isinstance(date, str): | if isinstance(date, str): | ||||
date = dateutil.parser.parse(date) | date = dateutil.parser.parse(date) | ||||
origin = self.origin_get_one({'url': origin_url}) | origin = self.origin_get_one({'url': origin_url}) | ||||
if not origin: | if not origin: | ||||
return None | return None | ||||
visit_id = self._cql_runner.origin_generate_unique_visit_id(origin_url) | visit_id = self._cql_runner.origin_generate_unique_visit_id(origin_url) | ||||
visit = { | try: | ||||
visit = OriginVisit.from_dict({ | |||||
'origin': origin_url, | 'origin': origin_url, | ||||
'date': date, | 'date': date, | ||||
'type': type, | 'type': type, | ||||
'status': 'ongoing', | 'status': 'ongoing', | ||||
'snapshot': None, | 'snapshot': None, | ||||
'metadata': None, | 'metadata': None, | ||||
'visit': visit_id | 'visit': visit_id | ||||
} | }) | ||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_addition('origin_visit', visit) | self.journal_writer.write_addition('origin_visit', visit) | ||||
try: | |||||
visit = OriginVisit.from_dict(visit) | |||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
self._cql_runner.origin_visit_add_one(visit) | self._cql_runner.origin_visit_add_one(visit) | ||||
return { | return { | ||||
'origin': origin_url, | 'origin': origin_url, | ||||
'visit': visit_id, | 'visit': visit_id, | ||||
} | } | ||||
def origin_visit_update(self, origin, visit_id, status=None, | def origin_visit_update( | ||||
metadata=None, snapshot=None): | self, origin: str, visit_id: int, status: Optional[str] = None, | ||||
metadata: Optional[Dict] = None, snapshot: Optional[bytes] = None): | |||||
origin_url = origin # TODO: rename the argument | origin_url = origin # TODO: rename the argument | ||||
# Get the existing data of the visit | # Get the existing data of the visit | ||||
row = self._cql_runner.origin_visit_get_one(origin_url, visit_id) | row = self._cql_runner.origin_visit_get_one(origin_url, visit_id) | ||||
if not row: | if not row: | ||||
raise StorageArgumentException('This origin visit does not exist.') | raise StorageArgumentException('This origin visit does not exist.') | ||||
try: | try: | ||||
visit = OriginVisit.from_dict(self._format_origin_visit_row(row)) | visit = OriginVisit.from_dict(self._format_origin_visit_row(row)) | ||||
except (KeyError, TypeError, ValueError) as e: | except (KeyError, TypeError, ValueError) as e: | ||||
raise StorageArgumentException(*e.args) | raise StorageArgumentException(*e.args) | ||||
updates = {} | updates: Dict[str, Any] = {} | ||||
if status: | if status: | ||||
updates['status'] = status | updates['status'] = status | ||||
if metadata: | if metadata: | ||||
updates['metadata'] = metadata | updates['metadata'] = metadata | ||||
if snapshot: | if snapshot: | ||||
updates['snapshot'] = snapshot | updates['snapshot'] = snapshot | ||||
try: | try: | ||||
▲ Show 20 Lines • Show All 144 Lines • Show Last 20 Lines |
-> Dict