Changeset View
Standalone View
swh/storage/cassandra/storage.py
# Copyright (C) 2019-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime | import datetime | ||||
import json | import json | ||||
import random | import random | ||||
import re | import re | ||||
from typing import Any, Dict, List, Optional | from typing import Any, Dict, List, Optional | ||||
import uuid | import uuid | ||||
import attr | import attr | ||||
import dateutil | import dateutil | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Revision, Release, Directory, DirectoryEntry, Content, OriginVisit, | Revision, Release, Directory, DirectoryEntry, Content, SkippedContent, | ||||
OriginVisit, | |||||
) | ) | ||||
from swh.objstorage import get_objstorage | from swh.objstorage import get_objstorage | ||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
try: | try: | ||||
from swh.journal.writer import get_journal_writer | from swh.journal.writer import get_journal_writer | ||||
except ImportError: | except ImportError: | ||||
get_journal_writer = None # type: ignore | get_journal_writer = None # type: ignore | ||||
# mypy limitation, see https://github.com/python/mypy/issues/1153 | # mypy limitation, see https://github.com/python/mypy/issues/1153 | ||||
▲ Show 20 Lines • Show All 85 Lines • ▼ Show 20 Lines | def _content_add(self, contents, with_data): | ||||
algo, content.get_hash(algo)) | algo, content.get_hash(algo)) | ||||
if len(pks) > 1: | if len(pks) > 1: | ||||
# There are more than the one we just inserted. | # There are more than the one we just inserted. | ||||
from .. import HashCollision | from .. import HashCollision | ||||
raise HashCollision(algo, content.get_hash(algo), pks) | raise HashCollision(algo, content.get_hash(algo), pks) | ||||
summary = { | summary = { | ||||
'content:add': count_content_added, | 'content:add': count_content_added, | ||||
'skipped_content:add': count_contents - count_content_added, | |||||
} | } | ||||
if with_data: | if with_data: | ||||
summary['content:add:bytes'] = count_content_bytes_added | summary['content:add:bytes'] = count_content_bytes_added | ||||
return summary | return summary | ||||
def content_add(self, content): | def content_add(self, content): | ||||
▲ Show 20 Lines • Show All 75 Lines • ▼ Show 20 Lines | def content_get_metadata( | ||||
# Rows in 'content' are inserted after corresponding | # Rows in 'content' are inserted after corresponding | ||||
# rows in 'content_by_*', so we might be missing it | # rows in 'content_by_*', so we might be missing it | ||||
if res: | if res: | ||||
content_metadata = res._asdict() | content_metadata = res._asdict() | ||||
content_metadata.pop('ctime') | content_metadata.pop('ctime') | ||||
result[content_metadata['sha1']].append(content_metadata) | result[content_metadata['sha1']].append(content_metadata) | ||||
return result | return result | ||||
def skipped_content_missing(self, contents):
    """List skipped contents missing from this storage.

    Not implemented for the Cassandra backend yet.

    Raises:
        NotImplementedError: always.
    """
    # TODO: implement once the skipped_content tables exist.
    raise NotImplementedError('not yet supported for Cassandra')
def content_find(self, content): | def content_find(self, content): | ||||
# Find an algorithm that is common to all the requested contents. | # Find an algorithm that is common to all the requested contents. | ||||
# It will be used to do an initial filtering efficiently. | # It will be used to do an initial filtering efficiently. | ||||
filter_algos = list(set(content).intersection(HASH_ALGORITHMS)) | filter_algos = list(set(content).intersection(HASH_ALGORITHMS)) | ||||
if not filter_algos: | if not filter_algos: | ||||
raise ValueError('content keys must contain at least one of: ' | raise ValueError('content keys must contain at least one of: ' | ||||
'%s' % ', '.join(sorted(HASH_ALGORITHMS))) | '%s' % ', '.join(sorted(HASH_ALGORITHMS))) | ||||
common_algo = filter_algos[0] | common_algo = filter_algos[0] | ||||
Show All 35 Lines | class CassandraStorage: | ||||
def content_missing_per_sha1_git(self, contents):
    """Return the sha1_git hashes, among the given ones, that are missing
    from storage.

    Args:
        contents: iterable of sha1_git hashes (bytes).

    Returns:
        iterable of the sha1_git values not present in storage.
    """
    # Bug fix: the original code used a dict comprehension
    # `[{'sha1_git': c for c in contents}]`, which builds a SINGLE
    # one-entry dict keyed 'sha1_git' holding only the LAST hash.
    # We need one {'sha1_git': c} dict per requested hash.
    return self.content_missing([{'sha1_git': c} for c in contents],
                                key_hash='sha1_git')
def content_get_random(self):
    """Return the sha1_git of a random content row from the backend."""
    row = self._cql_runner.content_get_random()
    return row.sha1_git
def _skipped_content_add(self, contents):
    """Insert skipped contents and their index-table entries.

    Args:
        contents: iterable of dicts convertible to
            :class:`SkippedContent`.

    Returns:
        dict with a single ``'skipped_content:add'`` key counting the
        rows actually inserted (already-present rows are filtered out).
    """
    contents = [SkippedContent.from_dict(d) for d in contents]

    # Keep only the contents whose primary key is not already stored.
    new_contents = []
    for content in contents:
        if not self._cql_runner.skipped_content_get_from_pk(
                content.to_dict()):
            new_contents.append(content)

    if self.journal_writer:
        for content in new_contents:
            payload = content.to_dict()
            # Raw data must never reach the journal.
            payload.pop('data', None)
            self.journal_writer.write_addition('content', payload)

    for content in new_contents:
        # Write the index tables first, then the main table.
        for algo in HASH_ALGORITHMS:
            if content.get_hash(algo) is not None:
                self._cql_runner.skipped_content_index_add_one(
                    algo, content)
        self._cql_runner.skipped_content_add_one(content)

    return {'skipped_content:add': len(new_contents)}
def skipped_content_add(self, content):
    """Add skipped contents, stamping each one with the current time.

    Args:
        content: iterable of skipped-content dicts.

    Returns:
        the summary dict produced by :meth:`_skipped_content_add`.
    """
    # Semi-shallow copies: the ctime injection below must not mutate
    # the dicts owned by the caller.
    stamped = []
    for item in content:
        item = item.copy()
        item['ctime'] = now()
        stamped.append(item)
    return self._skipped_content_add(stamped)
def skipped_content_missing(self, contents):
    """Yield each given content dict whose primary key is not already
    present in the skipped_content table.

    Args:
        contents: iterable of skipped-content primary-key dicts.

    Yields:
        the dicts for which no row exists yet.
    """
    for candidate in contents:
        already_there = self._cql_runner.skipped_content_get_from_pk(
            candidate)
        if not already_there:
            yield candidate
def directory_add(self, directories): | def directory_add(self, directories): | ||||
directories = list(directories) | directories = list(directories) | ||||
# Filter out directories that are already inserted. | # Filter out directories that are already inserted. | ||||
missing = self.directory_missing([dir_['id'] for dir_ in directories]) | missing = self.directory_missing([dir_['id'] for dir_ in directories]) | ||||
directories = [dir_ for dir_ in directories if dir_['id'] in missing] | directories = [dir_ for dir_ in directories if dir_['id'] in missing] | ||||
if self.journal_writer: | if self.journal_writer: | ||||
▲ Show 20 Lines • Show All 679 Lines • Show Last 20 Lines |