Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
Show All 9 Lines | |||||
from typing import Any, Dict, List, Optional | from typing import Any, Dict, List, Optional | ||||
import uuid | import uuid | ||||
import attr | import attr | ||||
import dateutil | import dateutil | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Revision, Release, Directory, DirectoryEntry, Content, SkippedContent, | Revision, Release, Directory, DirectoryEntry, Content, SkippedContent, | ||||
OriginVisit, | OriginVisit, Snapshot | ||||
) | ) | ||||
from swh.objstorage import get_objstorage | from swh.objstorage import get_objstorage | ||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
try: | try: | ||||
from swh.journal.writer import get_journal_writer | from swh.journal.writer import get_journal_writer | ||||
except ImportError: | except ImportError: | ||||
get_journal_writer = None # type: ignore | get_journal_writer = None # type: ignore | ||||
# mypy limitation, see https://github.com/python/mypy/issues/1153 | # mypy limitation, see https://github.com/python/mypy/issues/1153 | ||||
from .. import HashCollision | |||||
from ..exc import StorageArgumentException | |||||
from .common import TOKEN_BEGIN, TOKEN_END | from .common import TOKEN_BEGIN, TOKEN_END | ||||
from .converters import ( | from .converters import ( | ||||
revision_to_db, revision_from_db, release_to_db, release_from_db, | revision_to_db, revision_from_db, release_to_db, release_from_db, | ||||
) | ) | ||||
from .cql import CqlRunner | from .cql import CqlRunner | ||||
from .schema import HASH_ALGORITHMS | from .schema import HASH_ALGORITHMS | ||||
Show All 18 Lines | def __init__(self, hosts, keyspace, objstorage, | ||||
self.journal_writer = None | self.journal_writer = None | ||||
def check_config(self, *, check_write): | def check_config(self, *, check_write): | ||||
self._cql_runner.check_read() | self._cql_runner.check_read() | ||||
return True | return True | ||||
def _content_add(self, contents, with_data): | def _content_add(self, contents, with_data): | ||||
try: | |||||
contents = [Content.from_dict(c) for c in contents] | contents = [Content.from_dict(c) for c in contents] | ||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
# Filter-out content already in the database. | # Filter-out content already in the database. | ||||
contents = [c for c in contents | contents = [c for c in contents | ||||
if not self._cql_runner.content_get_from_pk(c.to_dict())] | if not self._cql_runner.content_get_from_pk(c.to_dict())] | ||||
if self.journal_writer: | if self.journal_writer: | ||||
for content in contents: | for content in contents: | ||||
content = content.to_dict() | content = content.to_dict() | ||||
Show All 35 Lines | def _content_add(self, contents, with_data): | ||||
# The proper way to do it would probably be a BATCH, but this | # The proper way to do it would probably be a BATCH, but this | ||||
# would be inefficient because of the number of partitions we | # would be inefficient because of the number of partitions we | ||||
# need to affect (len(HASH_ALGORITHMS)+1, which is currently 5) | # need to affect (len(HASH_ALGORITHMS)+1, which is currently 5) | ||||
for algo in {'sha1', 'sha1_git'}: | for algo in {'sha1', 'sha1_git'}: | ||||
pks = self._cql_runner.content_get_pks_from_single_hash( | pks = self._cql_runner.content_get_pks_from_single_hash( | ||||
algo, content.get_hash(algo)) | algo, content.get_hash(algo)) | ||||
if len(pks) > 1: | if len(pks) > 1: | ||||
# There are more than the one we just inserted. | # There are more than the one we just inserted. | ||||
from .. import HashCollision | |||||
raise HashCollision(algo, content.get_hash(algo), pks) | raise HashCollision(algo, content.get_hash(algo), pks) | ||||
summary = { | summary = { | ||||
'content:add': count_content_added, | 'content:add': count_content_added, | ||||
} | } | ||||
if with_data: | if with_data: | ||||
summary['content:add:bytes'] = count_content_bytes_added | summary['content:add:bytes'] = count_content_bytes_added | ||||
Show All 10 Lines | def content_update(self, content, keys=[]): | ||||
raise NotImplementedError( | raise NotImplementedError( | ||||
'content_update is not supported by the Cassandra backend') | 'content_update is not supported by the Cassandra backend') | ||||
def content_add_metadata(self, content): | def content_add_metadata(self, content): | ||||
return self._content_add(content, with_data=False) | return self._content_add(content, with_data=False) | ||||
def content_get(self, content): | def content_get(self, content): | ||||
if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: | if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: | ||||
raise ValueError( | raise StorageArgumentException( | ||||
"Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX) | "Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX) | ||||
for obj_id in content: | for obj_id in content: | ||||
try: | try: | ||||
data = self.objstorage.get(obj_id) | data = self.objstorage.get(obj_id) | ||||
except ObjNotFoundError: | except ObjNotFoundError: | ||||
yield None | yield None | ||||
continue | continue | ||||
yield {'sha1': obj_id, 'data': data} | yield {'sha1': obj_id, 'data': data} | ||||
def content_get_partition( | def content_get_partition( | ||||
self, partition_id: int, nb_partitions: int, limit: int = 1000, | self, partition_id: int, nb_partitions: int, limit: int = 1000, | ||||
page_token: str = None): | page_token: str = None): | ||||
if limit is None: | if limit is None: | ||||
raise ValueError('Development error: limit should not be None') | raise StorageArgumentException('limit should not be None') | ||||
# Compute start and end of the range of tokens covered by the | # Compute start and end of the range of tokens covered by the | ||||
# requested partition | # requested partition | ||||
partition_size = (TOKEN_END-TOKEN_BEGIN)//nb_partitions | partition_size = (TOKEN_END-TOKEN_BEGIN)//nb_partitions | ||||
range_start = TOKEN_BEGIN + partition_id*partition_size | range_start = TOKEN_BEGIN + partition_id*partition_size | ||||
range_end = TOKEN_BEGIN + (partition_id+1)*partition_size | range_end = TOKEN_BEGIN + (partition_id+1)*partition_size | ||||
# offset the range start according to the `page_token`. | # offset the range start according to the `page_token`. | ||||
if page_token is not None: | if page_token is not None: | ||||
if not (range_start <= int(page_token) <= range_end): | if not (range_start <= int(page_token) <= range_end): | ||||
raise ValueError('Invalid page_token.') | raise StorageArgumentException('Invalid page_token.') | ||||
range_start = int(page_token) | range_start = int(page_token) | ||||
# Get the first rows of the range | # Get the first rows of the range | ||||
rows = self._cql_runner.content_get_token_range( | rows = self._cql_runner.content_get_token_range( | ||||
range_start, range_end, limit) | range_start, range_end, limit) | ||||
rows = list(rows) | rows = list(rows) | ||||
if len(rows) == limit: | if len(rows) == limit: | ||||
Show All 31 Lines | def content_get_metadata( | ||||
result[content_metadata['sha1']].append(content_metadata) | result[content_metadata['sha1']].append(content_metadata) | ||||
return result | return result | ||||
def content_find(self, content): | def content_find(self, content): | ||||
# Find an algorithm that is common to all the requested contents. | # Find an algorithm that is common to all the requested contents. | ||||
# It will be used to do an initial filtering efficiently. | # It will be used to do an initial filtering efficiently. | ||||
filter_algos = list(set(content).intersection(HASH_ALGORITHMS)) | filter_algos = list(set(content).intersection(HASH_ALGORITHMS)) | ||||
if not filter_algos: | if not filter_algos: | ||||
raise ValueError('content keys must contain at least one of: ' | raise StorageArgumentException( | ||||
'content keys must contain at least one of: ' | |||||
'%s' % ', '.join(sorted(HASH_ALGORITHMS))) | '%s' % ', '.join(sorted(HASH_ALGORITHMS))) | ||||
common_algo = filter_algos[0] | common_algo = filter_algos[0] | ||||
# Find all contents whose common_algo matches at least one | # Find all contents whose common_algo matches at least one | ||||
# of the requests. | # of the requests. | ||||
found_pks = self._cql_runner.content_get_pks_from_single_hash( | found_pks = self._cql_runner.content_get_pks_from_single_hash( | ||||
common_algo, content[common_algo]) | common_algo, content[common_algo]) | ||||
found_pks = [pk._asdict() for pk in found_pks] | found_pks = [pk._asdict() for pk in found_pks] | ||||
Show All 29 Lines | class CassandraStorage: | ||||
def content_missing_per_sha1_git(self, contents): | def content_missing_per_sha1_git(self, contents): | ||||
return self.content_missing([{'sha1_git': c for c in contents}], | return self.content_missing([{'sha1_git': c for c in contents}], | ||||
key_hash='sha1_git') | key_hash='sha1_git') | ||||
def content_get_random(self): | def content_get_random(self): | ||||
return self._cql_runner.content_get_random().sha1_git | return self._cql_runner.content_get_random().sha1_git | ||||
def _skipped_content_add(self, contents): | def _skipped_content_add(self, contents): | ||||
try: | |||||
contents = [SkippedContent.from_dict(c) for c in contents] | contents = [SkippedContent.from_dict(c) for c in contents] | ||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
# Filter-out content already in the database. | # Filter-out content already in the database. | ||||
contents = [c for c in contents | contents = [c for c in contents | ||||
if not self._cql_runner.content_get_from_pk(c.to_dict())] | if not self._cql_runner.content_get_from_pk(c.to_dict())] | ||||
if self.journal_writer: | if self.journal_writer: | ||||
for content in contents: | for content in contents: | ||||
content = content.to_dict() | content = content.to_dict() | ||||
Show All 29 Lines | def directory_add(self, directories): | ||||
# Filter out directories that are already inserted. | # Filter out directories that are already inserted. | ||||
missing = self.directory_missing([dir_['id'] for dir_ in directories]) | missing = self.directory_missing([dir_['id'] for dir_ in directories]) | ||||
directories = [dir_ for dir_ in directories if dir_['id'] in missing] | directories = [dir_ for dir_ in directories if dir_['id'] in missing] | ||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_additions('directory', directories) | self.journal_writer.write_additions('directory', directories) | ||||
for directory in directories: | for directory in directories: | ||||
try: | |||||
directory = Directory.from_dict(directory) | directory = Directory.from_dict(directory) | ||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
# Add directory entries to the 'directory_entry' table | # Add directory entries to the 'directory_entry' table | ||||
for entry in directory.entries: | for entry in directory.entries: | ||||
entry = entry.to_dict() | entry = entry.to_dict() | ||||
entry['directory_id'] = directory.id | entry['directory_id'] = directory.id | ||||
self._cql_runner.directory_entry_add_one(entry) | self._cql_runner.directory_entry_add_one(entry) | ||||
# Add the directory *after* adding all the entries, so someone | # Add the directory *after* adding all the entries, so someone | ||||
▲ Show 20 Lines • Show All 89 Lines • ▼ Show 20 Lines | def revision_add(self, revisions): | ||||
# Filter-out revisions already in the database | # Filter-out revisions already in the database | ||||
missing = self.revision_missing([rev['id'] for rev in revisions]) | missing = self.revision_missing([rev['id'] for rev in revisions]) | ||||
revisions = [rev for rev in revisions if rev['id'] in missing] | revisions = [rev for rev in revisions if rev['id'] in missing] | ||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_additions('revision', revisions) | self.journal_writer.write_additions('revision', revisions) | ||||
for revision in revisions: | for revision in revisions: | ||||
try: | |||||
revision = revision_to_db(revision) | revision = revision_to_db(revision) | ||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
if revision: | if revision: | ||||
# Add parents first | # Add parents first | ||||
for (rank, parent) in enumerate(revision.parents): | for (rank, parent) in enumerate(revision.parents): | ||||
self._cql_runner.revision_parent_add_one( | self._cql_runner.revision_parent_add_one( | ||||
revision.id, rank, parent) | revision.id, rank, parent) | ||||
# Then write the main revision row. | # Then write the main revision row. | ||||
▲ Show 20 Lines • Show All 78 Lines • ▼ Show 20 Lines | def release_add(self, releases): | ||||
releases = list(releases) | releases = list(releases) | ||||
missing = self.release_missing([rel['id'] for rel in releases]) | missing = self.release_missing([rel['id'] for rel in releases]) | ||||
releases = [rel for rel in releases if rel['id'] in missing] | releases = [rel for rel in releases if rel['id'] in missing] | ||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_additions('release', releases) | self.journal_writer.write_additions('release', releases) | ||||
for release in releases: | for release in releases: | ||||
try: | |||||
release = release_to_db(release) | release = release_to_db(release) | ||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
if release: | if release: | ||||
self._cql_runner.release_add_one(release) | self._cql_runner.release_add_one(release) | ||||
return {'release:add': len(missing)} | return {'release:add': len(missing)} | ||||
def release_missing(self, releases): | def release_missing(self, releases): | ||||
return self._cql_runner.release_missing(releases) | return self._cql_runner.release_missing(releases) | ||||
def release_get(self, releases): | def release_get(self, releases): | ||||
rows = self._cql_runner.release_get(releases) | rows = self._cql_runner.release_get(releases) | ||||
rels = {} | rels = {} | ||||
for row in rows: | for row in rows: | ||||
release = Release(**row._asdict()) | release = Release(**row._asdict()) | ||||
release = release_from_db(release) | release = release_from_db(release) | ||||
rels[row.id] = release.to_dict() | rels[row.id] = release.to_dict() | ||||
for rel_id in releases: | for rel_id in releases: | ||||
yield rels.get(rel_id) | yield rels.get(rel_id) | ||||
def release_get_random(self): | def release_get_random(self): | ||||
return self._cql_runner.release_get_random().id | return self._cql_runner.release_get_random().id | ||||
def snapshot_add(self, snapshots): | def snapshot_add(self, snapshots): | ||||
snapshots = list(snapshots) | try: | ||||
snapshots = [Snapshot.from_dict(snap) for snap in snapshots] | |||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
missing = self._cql_runner.snapshot_missing( | missing = self._cql_runner.snapshot_missing( | ||||
[snp['id'] for snp in snapshots]) | [snp.id for snp in snapshots]) | ||||
snapshots = [snp for snp in snapshots if snp['id'] in missing] | snapshots = [snp for snp in snapshots if snp.id in missing] | ||||
for snapshot in snapshots: | for snapshot in snapshots: | ||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_addition('snapshot', snapshot) | self.journal_writer.write_addition('snapshot', snapshot) | ||||
# Add branches | # Add branches | ||||
for (branch_name, branch) in snapshot['branches'].items(): | for (branch_name, branch) in snapshot.branches.items(): | ||||
if branch is None: | if branch is None: | ||||
branch = {'target_type': None, 'target': None} | target_type = None | ||||
self._cql_runner.snapshot_branch_add_one({ | target = None | ||||
'snapshot_id': snapshot['id'], | else: | ||||
target_type = branch.target_type.value | |||||
target = branch.target | |||||
branch = { | |||||
'snapshot_id': snapshot.id, | |||||
'name': branch_name, | 'name': branch_name, | ||||
'target_type': branch['target_type'], | 'target_type': target_type, | ||||
'target': branch['target'], | 'target': target, | ||||
}) | } | ||||
self._cql_runner.snapshot_branch_add_one(branch) | |||||
# Add the snapshot *after* adding all the branches, so someone | # Add the snapshot *after* adding all the branches, so someone | ||||
# calling snapshot_get_branch in the meantime won't end up | # calling snapshot_get_branch in the meantime won't end up | ||||
# with half the branches. | # with half the branches. | ||||
self._cql_runner.snapshot_add_one(snapshot['id']) | self._cql_runner.snapshot_add_one(snapshot.id) | ||||
return {'snapshot:add': len(snapshots)} | return {'snapshot:add': len(snapshots)} | ||||
def snapshot_missing(self, snapshots): | def snapshot_missing(self, snapshots): | ||||
return self._cql_runner.snapshot_missing(snapshots) | return self._cql_runner.snapshot_missing(snapshots) | ||||
def snapshot_get(self, snapshot_id): | def snapshot_get(self, snapshot_id): | ||||
return self.snapshot_get_branches(snapshot_id) | return self.snapshot_get_branches(snapshot_id) | ||||
Show All 10 Lines | def snapshot_get_latest(self, origin, allowed_statuses=None): | ||||
visit = self.origin_visit_get_latest( | visit = self.origin_visit_get_latest( | ||||
origin, | origin, | ||||
allowed_statuses=allowed_statuses, | allowed_statuses=allowed_statuses, | ||||
require_snapshot=True) | require_snapshot=True) | ||||
if visit: | if visit: | ||||
assert visit['snapshot'] | assert visit['snapshot'] | ||||
if self._cql_runner.snapshot_missing([visit['snapshot']]): | if self._cql_runner.snapshot_missing([visit['snapshot']]): | ||||
raise ValueError('Visit references unknown snapshot') | raise StorageArgumentException( | ||||
'Visit references unknown snapshot') | |||||
return self.snapshot_get_branches(visit['snapshot']) | return self.snapshot_get_branches(visit['snapshot']) | ||||
def snapshot_count_branches(self, snapshot_id): | def snapshot_count_branches(self, snapshot_id): | ||||
if self._cql_runner.snapshot_missing([snapshot_id]): | if self._cql_runner.snapshot_missing([snapshot_id]): | ||||
# Makes sure we don't fetch branches for a snapshot that is | # Makes sure we don't fetch branches for a snapshot that is | ||||
# being added. | # being added. | ||||
return None | return None | ||||
rows = list(self._cql_runner.snapshot_count_branches(snapshot_id)) | rows = list(self._cql_runner.snapshot_count_branches(snapshot_id)) | ||||
▲ Show 20 Lines • Show All 89 Lines • ▼ Show 20 Lines | def origin_get(self, origins): | ||||
if isinstance(origins, dict): | if isinstance(origins, dict): | ||||
# Old API | # Old API | ||||
return_single = True | return_single = True | ||||
origins = [origins] | origins = [origins] | ||||
else: | else: | ||||
return_single = False | return_single = False | ||||
if any('id' in origin for origin in origins): | if any('id' in origin for origin in origins): | ||||
raise ValueError('Origin ids are not supported.') | raise StorageArgumentException('Origin ids are not supported.') | ||||
results = [self.origin_get_one(origin) for origin in origins] | results = [self.origin_get_one(origin) for origin in origins] | ||||
if return_single: | if return_single: | ||||
assert len(results) == 1 | assert len(results) == 1 | ||||
return results[0] | return results[0] | ||||
else: | else: | ||||
return results | return results | ||||
def origin_get_one(self, origin): | def origin_get_one(self, origin): | ||||
if 'id' in origin: | if 'id' in origin: | ||||
raise ValueError('Origin ids are not supported.') | raise StorageArgumentException('Origin ids are not supported.') | ||||
if 'url' not in origin: | |||||
raise StorageArgumentException('Missing origin url') | |||||
rows = self._cql_runner.origin_get_by_url(origin['url']) | rows = self._cql_runner.origin_get_by_url(origin['url']) | ||||
rows = list(rows) | rows = list(rows) | ||||
if rows: | if rows: | ||||
assert len(rows) == 1 | assert len(rows) == 1 | ||||
result = rows[0]._asdict() | result = rows[0]._asdict() | ||||
return { | return { | ||||
'url': result['url'], | 'url': result['url'], | ||||
Show All 13 Lines | class CassandraStorage: | ||||
def origin_list(self, page_token: Optional[str] = None, limit: int = 100 | def origin_list(self, page_token: Optional[str] = None, limit: int = 100 | ||||
) -> dict: | ) -> dict: | ||||
# Compute what token to begin the listing from | # Compute what token to begin the listing from | ||||
start_token = TOKEN_BEGIN | start_token = TOKEN_BEGIN | ||||
if page_token: | if page_token: | ||||
start_token = int(page_token) | start_token = int(page_token) | ||||
if not (TOKEN_BEGIN <= start_token <= TOKEN_END): | if not (TOKEN_BEGIN <= start_token <= TOKEN_END): | ||||
raise ValueError('Invalid page_token.') | raise StorageArgumentException('Invalid page_token.') | ||||
rows = self._cql_runner.origin_list(start_token, limit) | rows = self._cql_runner.origin_list(start_token, limit) | ||||
rows = list(rows) | rows = list(rows) | ||||
if len(rows) == limit: | if len(rows) == limit: | ||||
next_page_token: Optional[str] = str(rows[-1].tok+1) | next_page_token: Optional[str] = str(rows[-1].tok+1) | ||||
else: | else: | ||||
next_page_token = None | next_page_token = None | ||||
Show All 21 Lines | def origin_search(self, url_pattern, offset=0, limit=50, | ||||
{ | { | ||||
'url': orig.url, | 'url': orig.url, | ||||
} | } | ||||
for orig in origins[offset:offset+limit]] | for orig in origins[offset:offset+limit]] | ||||
def origin_add(self, origins): | def origin_add(self, origins): | ||||
origins = list(origins) | origins = list(origins) | ||||
if any('id' in origin for origin in origins): | if any('id' in origin for origin in origins): | ||||
raise ValueError('Origins must not already have an id.') | raise StorageArgumentException( | ||||
'Origins must not already have an id.') | |||||
results = [] | results = [] | ||||
for origin in origins: | for origin in origins: | ||||
self.origin_add_one(origin) | self.origin_add_one(origin) | ||||
results.append(origin) | results.append(origin) | ||||
return results | return results | ||||
def origin_add_one(self, origin): | def origin_add_one(self, origin): | ||||
known_origin = self.origin_get_one(origin) | known_origin = self.origin_get_one(origin) | ||||
Show All 30 Lines | def origin_visit_add(self, origin, date, type): | ||||
'snapshot': None, | 'snapshot': None, | ||||
'metadata': None, | 'metadata': None, | ||||
'visit': visit_id | 'visit': visit_id | ||||
} | } | ||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_addition('origin_visit', visit) | self.journal_writer.write_addition('origin_visit', visit) | ||||
try: | |||||
visit = OriginVisit.from_dict(visit) | |||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
self._cql_runner.origin_visit_add_one(visit) | self._cql_runner.origin_visit_add_one(visit) | ||||
return { | return { | ||||
'origin': origin_url, | 'origin': origin_url, | ||||
'visit': visit_id, | 'visit': visit_id, | ||||
} | } | ||||
def origin_visit_update(self, origin, visit_id, status=None, | def origin_visit_update(self, origin, visit_id, status=None, | ||||
metadata=None, snapshot=None): | metadata=None, snapshot=None): | ||||
origin_url = origin # TODO: rename the argument | origin_url = origin # TODO: rename the argument | ||||
# Get the existing data of the visit | # Get the existing data of the visit | ||||
row = self._cql_runner.origin_visit_get_one(origin_url, visit_id) | row = self._cql_runner.origin_visit_get_one(origin_url, visit_id) | ||||
if not row: | if not row: | ||||
raise ValueError('This origin visit does not exist.') | raise StorageArgumentException('This origin visit does not exist.') | ||||
try: | |||||
visit = OriginVisit.from_dict(self._format_origin_visit_row(row)) | visit = OriginVisit.from_dict(self._format_origin_visit_row(row)) | ||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
updates = {} | updates = {} | ||||
if status: | if status: | ||||
updates['status'] = status | updates['status'] = status | ||||
if metadata: | if metadata: | ||||
updates['metadata'] = metadata | updates['metadata'] = metadata | ||||
if snapshot: | if snapshot: | ||||
updates['snapshot'] = snapshot | updates['snapshot'] = snapshot | ||||
try: | |||||
visit = attr.evolve(visit, **updates) | visit = attr.evolve(visit, **updates) | ||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_update('origin_visit', visit) | self.journal_writer.write_update('origin_visit', visit) | ||||
self._cql_runner.origin_visit_update(origin_url, visit_id, updates) | self._cql_runner.origin_visit_update(origin_url, visit_id, updates) | ||||
def origin_visit_upsert(self, visits): | def origin_visit_upsert(self, visits): | ||||
visits = [visit.copy() for visit in visits] | visits = [visit.copy() for visit in visits] | ||||
▲ Show 20 Lines • Show All 133 Lines • Show Last 20 Lines |