Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import itertools | |||||
import json | import json | ||||
import random | import random | ||||
import re | import re | ||||
from typing import Any, Dict, List, Iterable, Optional, Union | from typing import Any, Dict, List, Iterable, Optional, Union | ||||
import uuid | |||||
import attr | import attr | ||||
import dateutil | import dateutil | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Revision, | Revision, | ||||
Release, | Release, | ||||
Directory, | Directory, | ||||
▲ Show 20 Lines • Show All 1,009 Lines • ▼ Show 20 Lines | def origin_visit_get_random(self, type: str) -> Optional[Dict[str, Any]]: | ||||
if ( | if ( | ||||
visit_status["date"] > back_in_the_day | visit_status["date"] > back_in_the_day | ||||
and visit_status["status"] == "full" | and visit_status["status"] == "full" | ||||
): | ): | ||||
return visit_status | return visit_status | ||||
else: | else: | ||||
return None | return None | ||||
def tool_add(self, tools):
    """Insert the given tools, generating a uuid for each unknown one.

    Args:
        tools: iterable of dicts with keys ``name``, ``version`` and
            ``configuration``.

    Returns:
        list of the same tool dicts, each augmented with its ``id``.
    """
    results = []
    for original in tools:
        record = original.copy()
        row = original.copy()
        # Normalize the configuration to canonical JSON bytes so equal
        # configurations always map to the same stored value.
        row["configuration"] = json.dumps(
            original["configuration"], sort_keys=True
        ).encode()
        tool_id = self._cql_runner.tool_get_one_uuid(**row)
        if not tool_id:
            # Unknown tool: mint a fresh uuid and insert it in both tables.
            tool_id = uuid.uuid1()
            row["id"] = tool_id
            self._cql_runner.tool_by_uuid_add_one(row)
            self._cql_runner.tool_add_one(row)
        record["id"] = tool_id
        results.append(record)
    return results
def tool_get(self, tool):
    """Look a tool up by (name, version, configuration).

    Returns:
        a copy of the tool dict augmented with its ``id``, or None if
        the tool is unknown.
    """
    # Configuration is compared through its canonical JSON encoding.
    config_bytes = json.dumps(tool["configuration"], sort_keys=True).encode()
    tool_id = self._cql_runner.tool_get_one_uuid(
        tool["name"], tool["version"], config_bytes
    )
    if not tool_id:
        return None
    found = dict(tool)
    found["id"] = tool_id
    return found
def stat_counters(self):
    """Return per-object-type counts, with known types defaulting to 0.

    Types reported by the backend but absent from the default set are
    included as well.
    """
    counters = {
        "content": 0,
        "directory": 0,
        "origin": 0,
        "origin_visit": 0,
        "release": 0,
        "revision": 0,
        "skipped_content": 0,
        "snapshot": 0,
    }
    for row in self._cql_runner.stat_counters():
        counters[row.object_type] = row.count
    return counters
def refresh_stat_counters(self):
    """No-op for this backend; nothing needs recomputing here."""
    # Intentionally empty: stat_counters() reads directly from the backend.
    pass
def origin_metadata_add(
    self,
    origin_url: str,
    discovery_date: datetime.datetime,
    authority: Dict[str, Any],
    fetcher: Dict[str, Any],
    format: str,
    metadata: bytes,
) -> None:
    """Add a metadata record about an origin.

    Args:
        origin_url: origin the metadata is about
        discovery_date: when the metadata was discovered
        authority: dict with keys ``type`` and ``url`` of a registered
            metadata authority
        fetcher: dict with keys ``name`` and ``version`` of a registered
            metadata fetcher
        format: identifier of the metadata's serialization format
        metadata: the raw metadata blob

    Raises:
        StorageArgumentException: if ``origin_url`` is not a string, or
            if the given authority or fetcher is not registered.
    """
    if not isinstance(origin_url, str):
        # Fix: the message used to say "origin_id", which is not the name
        # of the argument being checked.
        raise StorageArgumentException(
            "origin_url must be str, not %r" % (origin_url,)
        )
    # Refuse to reference authorities/fetchers that were never registered.
    if not self._cql_runner.metadata_authority_get(**authority):
        raise StorageArgumentException(f"Unknown authority {authority}")
    if not self._cql_runner.metadata_fetcher_get(**fetcher):
        raise StorageArgumentException(f"Unknown fetcher {fetcher}")

    self._cql_runner.origin_metadata_add(
        origin_url,
        authority["type"],
        authority["url"],
        discovery_date,
        fetcher["name"],
        fetcher["version"],
        format,
        metadata,
    )
def origin_metadata_get(
    self,
    origin_url: str,
    authority: Dict[str, str],
    after: Optional[datetime.datetime] = None,
    limit: Optional[int] = None,
) -> List[Dict[str, Any]]:
    """Fetch metadata records about an origin from a given authority.

    Args:
        origin_url: origin the metadata is about
        authority: dict with keys ``type`` and ``url``
        after: if set, only return entries discovered strictly after it
        limit: if set, return at most that many entries

    Returns:
        list of dicts describing each metadata entry.
    """
    if not isinstance(origin_url, str):
        raise TypeError("origin_url must be str, not %r" % (origin_url,))

    if after is None:
        rows = self._cql_runner.origin_metadata_get(
            origin_url, authority["type"], authority["url"]
        )
    else:
        rows = self._cql_runner.origin_metadata_get_after(
            origin_url, authority["type"], authority["url"], after
        )

    if limit:
        rows = itertools.islice(rows, 0, limit)

    return [
        {
            "origin_url": row.origin,
            "authority": {
                "type": row.authority_type,
                "url": row.authority_url,
            },
            "fetcher": {
                "name": row.fetcher_name,
                "version": row.fetcher_version,
            },
            # Stored timestamps come back naive; they are interpreted as UTC.
            "discovery_date": row.discovery_date.replace(
                tzinfo=datetime.timezone.utc
            ),
            "format": row.format,
            # The metadata itself is an opaque blob, returned as-is
            # (unlike fetcher/authority metadata, which is JSON we wrote).
            "metadata": row.metadata,
        }
        for row in rows
    ]
def metadata_fetcher_add(
    self, name: str, version: str, metadata: Dict[str, Any]
) -> None:
    """Register a metadata fetcher; its metadata is stored as JSON text."""
    serialized = json.dumps(metadata)
    self._cql_runner.metadata_fetcher_add(name, version, serialized)
def metadata_fetcher_get(self, name: str, version: str) -> Optional[Dict[str, Any]]:
    """Return a registered fetcher as a dict, or None if unknown.

    The stored metadata is JSON text and is decoded before returning.
    """
    row = self._cql_runner.metadata_fetcher_get(name, version)
    if not row:
        return None
    return {
        "name": row.name,
        "version": row.version,
        "metadata": json.loads(row.metadata),
    }
def metadata_authority_add(
    self, type: str, url: str, metadata: Dict[str, Any]
) -> None:
    """Register a metadata authority; its metadata is stored as JSON text.

    NOTE(review): the underlying cql call takes (url, type) — reversed
    from this method's (type, url) parameter order; preserved as-is.
    """
    self._cql_runner.metadata_authority_add(url, type, json.dumps(metadata))
def metadata_authority_get(self, type: str, url: str) -> Optional[Dict[str, Any]]:
    """Return a registered authority as a dict, or None if unknown.

    The stored metadata is JSON text and is decoded before returning.
    """
    row = self._cql_runner.metadata_authority_get(type, url)
    if not row:
        return None
    return {
        "type": row.type,
        "url": row.url,
        "metadata": json.loads(row.metadata),
    }
def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
    """Do nothing: this backend keeps no write buffers to clear."""
    return None
def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict:
    """Nothing is buffered, so flushing reports an empty summary."""
    return {}
don't we have some conversion to do on that field?
(in metadata_fetcher_get below, you do a json.loads so ¯\_(ツ)_/¯)