Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import json | import json | ||||
import random | import random | ||||
import re | import re | ||||
from typing import Any, Dict, List, Iterable, Optional, Union | from typing import Any, Dict, List, Iterable, Optional, Union | ||||
import uuid | import uuid | ||||
import attr | import attr | ||||
import dateutil | import dateutil | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Revision, Release, Directory, DirectoryEntry, Content, SkippedContent, | Revision, Release, Directory, DirectoryEntry, Content, | ||||
OriginVisit, Snapshot, Origin | SkippedContent, OriginVisit, Snapshot, Origin | ||||
) | ) | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS | from swh.model.hashutil import DEFAULT_ALGORITHMS | ||||
from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
from swh.storage.writer import JournalWriter | from swh.storage.writer import JournalWriter | ||||
from .. import HashCollision | from .. import HashCollision | ||||
from ..exc import StorageArgumentException | from ..exc import StorageArgumentException | ||||
from .common import TOKEN_BEGIN, TOKEN_END | from .common import TOKEN_BEGIN, TOKEN_END | ||||
▲ Show 20 Lines • Show All 636 Lines • ▼ Show 20 Lines | def origin_get(self, origins): | ||||
results = [self.origin_get_one(origin) for origin in origins] | results = [self.origin_get_one(origin) for origin in origins] | ||||
if return_single: | if return_single: | ||||
assert len(results) == 1 | assert len(results) == 1 | ||||
return results[0] | return results[0] | ||||
else: | else: | ||||
return results | return results | ||||
def origin_get_one(self, origin): | def origin_get_one(self, origin: Dict[str, Any]) -> Optional[ | ||||
Dict[str, Any]]: | |||||
if 'id' in origin: | if 'id' in origin: | ||||
raise StorageArgumentException('Origin ids are not supported.') | raise StorageArgumentException('Origin ids are not supported.') | ||||
if 'url' not in origin: | if 'url' not in origin: | ||||
raise StorageArgumentException('Missing origin url') | raise StorageArgumentException('Missing origin url') | ||||
rows = self._cql_runner.origin_get_by_url(origin['url']) | rows = self._cql_runner.origin_get_by_url(origin['url']) | ||||
rows = list(rows) | rows = list(rows) | ||||
if rows: | if rows: | ||||
▲ Show 20 Lines • Show All 72 Lines • ▼ Show 20 Lines | def origin_add_one(self, origin: Origin) -> str: | ||||
else: | else: | ||||
self.journal_writer.origin_add_one(origin) | self.journal_writer.origin_add_one(origin) | ||||
self._cql_runner.origin_add_one(origin) | self._cql_runner.origin_add_one(origin) | ||||
origin_url = origin.url | origin_url = origin.url | ||||
return origin_url | return origin_url | ||||
def origin_visit_add( | def origin_visit_add(self, origin_url: str, | ||||
self, origin, date, type) -> Optional[Dict[str, Union[str, int]]]: | date: Union[str, datetime.datetime], | ||||
origin_url = origin # TODO: rename the argument | type: str) -> OriginVisit: | ||||
if isinstance(date, str): | if isinstance(date, str): | ||||
# FIXME: Converge on iso8601 at some point | |||||
date = dateutil.parser.parse(date) | date = dateutil.parser.parse(date) | ||||
elif not isinstance(date, datetime.datetime): | |||||
raise StorageArgumentException( | |||||
'Date must be a datetime or a string') | |||||
origin = self.origin_get_one({'url': origin_url}) | if not self.origin_get_one({'url': origin_url}): | ||||
raise StorageArgumentException( | |||||
if not origin: | 'Unknown origin %s', origin_url) | ||||
return None | |||||
visit_id = self._cql_runner.origin_generate_unique_visit_id(origin_url) | visit_id = self._cql_runner.origin_generate_unique_visit_id(origin_url) | ||||
try: | try: | ||||
visit = OriginVisit.from_dict({ | visit = OriginVisit.from_dict({ | ||||
'origin': origin_url, | 'origin': origin_url, | ||||
'date': date, | 'date': date, | ||||
'type': type, | 'type': type, | ||||
'status': 'ongoing', | 'status': 'ongoing', | ||||
'snapshot': None, | 'snapshot': None, | ||||
'metadata': None, | 'metadata': None, | ||||
'visit': visit_id | 'visit': visit_id | ||||
}) | }) | ||||
except (KeyError, TypeError, ValueError) as e: | except (KeyError, TypeError, ValueError) as e: | ||||
raise StorageArgumentException(*e.args) | raise StorageArgumentException(*e.args) | ||||
self.journal_writer.origin_visit_add(visit) | self.journal_writer.origin_visit_add(visit) | ||||
self._cql_runner.origin_visit_add_one(visit) | self._cql_runner.origin_visit_add_one(visit) | ||||
return visit | |||||
return { | |||||
'origin': origin_url, | |||||
'visit': visit_id, | |||||
} | |||||
def origin_visit_update( | def origin_visit_update( | ||||
self, origin: str, visit_id: int, status: Optional[str] = None, | self, origin: str, visit_id: int, status: Optional[str] = None, | ||||
metadata: Optional[Dict] = None, snapshot: Optional[bytes] = None): | metadata: Optional[Dict] = None, snapshot: Optional[bytes] = None): | ||||
origin_url = origin # TODO: rename the argument | origin_url = origin # TODO: rename the argument | ||||
# Get the existing data of the visit | # Get the existing data of the visit | ||||
row = self._cql_runner.origin_visit_get_one(origin_url, visit_id) | row = self._cql_runner.origin_visit_get_one(origin_url, visit_id) | ||||
▲ Show 20 Lines • Show All 148 Lines • Show Last 20 Lines |