swh/indexer/storage/__init__.py
 # Copyright (C) 2015-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import Counter
 import itertools
 import json
 from typing import Dict, Iterable, List, Optional, Tuple

 import psycopg2
 import psycopg2.pool

-from swh.core.db.common import db_transaction, db_transaction_generator
+from swh.core.db.common import db_transaction
 from swh.model.hashutil import hash_to_bytes, hash_to_hex
 from swh.model.model import SHA1_SIZE
 from swh.storage.exc import StorageDBError
 from swh.storage.utils import get_partition_bounds_bytes

 from . import converters
 from .db import Db
 from .exc import DuplicateId, IndexerStorageArgumentException
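The import change above sets the theme of the whole diff: every method decorated with `db_transaction_generator()` below is converted to a plain `db_transaction()` method that returns a fully built list, so the generator variant of the decorator is no longer needed. A minimal sketch of the difference between the two decorators, with simplified stand-in implementations (the real ones live in `swh.core.db.common`; `self.get_db()` and the transaction handling here are assumptions for illustration only):

    import functools

    def db_transaction():
        """Sketch: run the method inside one transaction and return its
        result. The result is fully materialized before the transaction
        ends."""
        def decorator(meth):
            @functools.wraps(meth)
            def wrapper(self, *args, **kwargs):
                db = self.get_db()  # hypothetical connection accessor
                with db.transaction() as cur:
                    return meth(self, *args, db=db, cur=cur, **kwargs)
            return wrapper
        return decorator

    def db_transaction_generator():
        """Sketch: keep the transaction open while the caller consumes
        the generator; only needed when the method yields rows lazily."""
        def decorator(meth):
            @functools.wraps(meth)
            def wrapper(self, *args, **kwargs):
                db = self.get_db()
                with db.transaction() as cur:
                    yield from meth(self, *args, db=db, cur=cur, **kwargs)
            return wrapper
        return decorator

Once a method returns a list, all rows are fetched while the cursor is still valid, which is what lets every method below swap decorators safely.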
[… 114 unchanged lines elided; context: def check_config(self, *, check_write, db=None, cur=None):]
         cur.execute(
             "select has_table_privilege(current_user, 'content_mimetype', %s)",  # noqa
             (check,),
         )
         return cur.fetchone()[0]

     @timed
-    @db_transaction_generator()
+    @db_transaction()
     def content_mimetype_missing(
         self, mimetypes: Iterable[Dict], db=None, cur=None
-    ) -> Iterable[Tuple[Sha1, int]]:
-        for obj in db.content_mimetype_missing_from_list(mimetypes, cur):
-            yield obj[0]
+    ) -> List[Tuple[Sha1, int]]:
+        return [obj[0] for obj in db.content_mimetype_missing_from_list(mimetypes, cur)]
     @timed
     @db_transaction()
     def get_partition(
         self,
         indexer_type: str,
         indexer_configuration_id: int,
         partition_id: int,
[… 101 unchanged lines elided; context: ) -> Dict[str, int]:]
             "tmp_content_mimetype",
             ["id", "mimetype", "encoding", "indexer_configuration_id"],
             cur,
         )
         count = db.content_mimetype_add_from_temp(conflict_update, cur)
         return {"content_mimetype:add": count}
     @timed
-    @db_transaction_generator()
+    @db_transaction()
     def content_mimetype_get(
         self, ids: Iterable[Sha1], db=None, cur=None
-    ) -> Iterable[ContentMimetypeRow]:
-        for c in db.content_mimetype_get_from_list(ids, cur):
-            yield ContentMimetypeRow.from_dict(
-                converters.db_to_mimetype(dict(zip(db.content_mimetype_cols, c)))
-            )
+    ) -> List[ContentMimetypeRow]:
+        return [
+            ContentMimetypeRow.from_dict(
+                converters.db_to_mimetype(dict(zip(db.content_mimetype_cols, c)))
+            )
+            for c in db.content_mimetype_get_from_list(ids, cur)
+        ]
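Every getter in this diff follows the same mechanical rewrite: the `for … yield` body becomes a single list comprehension over the same database call, and the return annotation tightens from `Iterable[…]` to `List[…]`. A standalone sketch of the pattern (the helper names here are illustrative stand-ins, not from the codebase):

    from typing import Any, Dict, Iterable, List

    # Hypothetical stand-ins for db.*_get_from_list and converters.db_to_*:
    def fetch_rows(ids: Iterable[bytes]) -> Iterable[tuple]:
        return [(i, b"text/plain") for i in ids]

    def row_to_dict(cols: List[str], row: tuple) -> Dict[str, Any]:
        return dict(zip(cols, row))

    COLS = ["id", "mimetype"]

    # Before: generator, consumed lazily after the wrapped call returns
    def get_gen(ids: Iterable[bytes]) -> Iterable[Dict[str, Any]]:
        for row in fetch_rows(ids):
            yield row_to_dict(COLS, row)

    # After: list comprehension, fully evaluated before returning
    def get_list(ids: Iterable[bytes]) -> List[Dict[str, Any]]:
        return [row_to_dict(COLS, row) for row in fetch_rows(ids)]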
     @timed
-    @db_transaction_generator()
+    @db_transaction()
     def content_language_missing(self, languages, db=None, cur=None):
-        for obj in db.content_language_missing_from_list(languages, cur):
-            yield obj[0]
+        return [obj[0] for obj in db.content_language_missing_from_list(languages, cur)]

     @timed
-    @db_transaction_generator()
+    @db_transaction()
     def content_language_get(self, ids, db=None, cur=None):
-        for c in db.content_language_get_from_list(ids, cur):
-            yield converters.db_to_language(dict(zip(db.content_language_cols, c)))
+        return [
+            converters.db_to_language(dict(zip(db.content_language_cols, c)))
+            for c in db.content_language_get_from_list(ids, cur)
+        ]
     @timed
     @process_metrics
     @db_transaction()
     def content_language_add(
         self, languages: List[Dict], conflict_update: bool = False, db=None, cur=None
     ) -> Dict[str, int]:
         check_id_duplicates(map(ContentLanguageRow.from_dict, languages))
[… 13 unchanged lines elided; context: ) -> Dict[str, int]:]
             ["id", "lang", "indexer_configuration_id"],
             cur,
         )
         count = db.content_language_add_from_temp(conflict_update, cur)
         return {"content_language:add": count}
     @timed
-    @db_transaction_generator()
+    @db_transaction()
     def content_ctags_missing(self, ctags, db=None, cur=None):
-        for obj in db.content_ctags_missing_from_list(ctags, cur):
-            yield obj[0]
+        return [obj[0] for obj in db.content_ctags_missing_from_list(ctags, cur)]

     @timed
-    @db_transaction_generator()
+    @db_transaction()
     def content_ctags_get(self, ids, db=None, cur=None):
-        for c in db.content_ctags_get_from_list(ids, cur):
-            yield converters.db_to_ctags(dict(zip(db.content_ctags_cols, c)))
+        return [
+            converters.db_to_ctags(dict(zip(db.content_ctags_cols, c)))
+            for c in db.content_ctags_get_from_list(ids, cur)
+        ]
     @timed
     @process_metrics
     @db_transaction()
     def content_ctags_add(
         self, ctags: List[Dict], conflict_update: bool = False, db=None, cur=None
     ) -> Dict[str, int]:
         rows = list(itertools.chain.from_iterable(map(converters.ctags_to_db, ctags)))
         check_id_duplicates(map(ContentCtagsRow.from_dict, rows))
         ctags.sort(key=lambda m: m["id"])
         db.mktemp_content_ctags(cur)
         db.copy_to(
             rows,
             tblname="tmp_content_ctags",
             columns=["id", "name", "kind", "line", "lang", "indexer_configuration_id"],
             cur=cur,
         )
         count = db.content_ctags_add_from_temp(conflict_update, cur)
         return {"content_ctags:add": count}
     @timed
-    @db_transaction_generator()
+    @db_transaction()
     def content_ctags_search(
         self, expression, limit=10, last_sha1=None, db=None, cur=None
     ):
-        for obj in db.content_ctags_search(expression, last_sha1, limit, cur=cur):
-            yield converters.db_to_ctags(dict(zip(db.content_ctags_cols, obj)))
+        return [
+            converters.db_to_ctags(dict(zip(db.content_ctags_cols, obj)))
+            for obj in db.content_ctags_search(expression, last_sha1, limit, cur=cur)
+        ]
     @timed
-    @db_transaction_generator()
+    @db_transaction()
     def content_fossology_license_get(
         self, ids: Iterable[Sha1], db=None, cur=None
-    ) -> Iterable[ContentLicenseRow]:
-        for c in db.content_fossology_license_get_from_list(ids, cur):
-            yield ContentLicenseRow.from_dict(
-                converters.db_to_fossology_license(
-                    dict(zip(db.content_fossology_license_cols, c))
-                )
-            )
+    ) -> List[ContentLicenseRow]:
+        return [
+            ContentLicenseRow.from_dict(
+                converters.db_to_fossology_license(
+                    dict(zip(db.content_fossology_license_cols, c))
+                )
+            )
+            for c in db.content_fossology_license_get_from_list(ids, cur)
+        ]
     @timed
     @process_metrics
     @db_transaction()
     def content_fossology_license_add(
         self,
         licenses: List[ContentLicenseRow],
         conflict_update: bool = False,
[… 32 unchanged lines elided; context: ) -> PagedResult[Sha1]:]
             page_token=page_token,
             limit=limit,
             with_textual_data=True,
             db=db,
             cur=cur,
         )
     @timed
-    @db_transaction_generator()
+    @db_transaction()
     def content_metadata_missing(self, metadata, db=None, cur=None):
-        for obj in db.content_metadata_missing_from_list(metadata, cur):
-            yield obj[0]
+        return [obj[0] for obj in db.content_metadata_missing_from_list(metadata, cur)]

     @timed
-    @db_transaction_generator()
+    @db_transaction()
     def content_metadata_get(self, ids, db=None, cur=None):
-        for c in db.content_metadata_get_from_list(ids, cur):
-            yield converters.db_to_metadata(dict(zip(db.content_metadata_cols, c)))
+        return [
+            converters.db_to_metadata(dict(zip(db.content_metadata_cols, c)))
+            for c in db.content_metadata_get_from_list(ids, cur)
+        ]
     @timed
     @process_metrics
     @db_transaction()
     def content_metadata_add(
         self, metadata: List[Dict], conflict_update: bool = False, db=None, cur=None
     ) -> Dict[str, int]:
         check_id_duplicates(map(ContentMetadataRow.from_dict, metadata))
         metadata.sort(key=lambda m: m["id"])
         db.mktemp_content_metadata(cur)
         db.copy_to(
             metadata,
             "tmp_content_metadata",
             ["id", "metadata", "indexer_configuration_id"],
             cur,
         )
         count = db.content_metadata_add_from_temp(conflict_update, cur)
         return {
             "content_metadata:add": count,
         }
     @timed
-    @db_transaction_generator()
+    @db_transaction()
     def revision_intrinsic_metadata_missing(self, metadata, db=None, cur=None):
-        for obj in db.revision_intrinsic_metadata_missing_from_list(metadata, cur):
-            yield obj[0]
+        return [
+            obj[0]
+            for obj in db.revision_intrinsic_metadata_missing_from_list(metadata, cur)
+        ]

     @timed
-    @db_transaction_generator()
+    @db_transaction()
     def revision_intrinsic_metadata_get(self, ids, db=None, cur=None):
-        for c in db.revision_intrinsic_metadata_get_from_list(ids, cur):
-            yield converters.db_to_metadata(
-                dict(zip(db.revision_intrinsic_metadata_cols, c))
-            )
+        return [
+            converters.db_to_metadata(dict(zip(db.revision_intrinsic_metadata_cols, c)))
+            for c in db.revision_intrinsic_metadata_get_from_list(ids, cur)
+        ]
     @timed
     @process_metrics
     @db_transaction()
     def revision_intrinsic_metadata_add(
         self, metadata: List[Dict], conflict_update: bool = False, db=None, cur=None
     ) -> Dict[str, int]:
         check_id_duplicates(map(RevisionIntrinsicMetadataRow.from_dict, metadata))
[… 17 unchanged lines elided; context: class IndexerStorage:]
     @db_transaction()
     def revision_intrinsic_metadata_delete(
         self, entries: List[Dict], db=None, cur=None
     ) -> Dict:
         count = db.revision_intrinsic_metadata_delete(entries, cur)
         return {"revision_intrinsic_metadata:del": count}

     @timed
-    @db_transaction_generator()
+    @db_transaction()
     def origin_intrinsic_metadata_get(self, ids, db=None, cur=None):
-        for c in db.origin_intrinsic_metadata_get_from_list(ids, cur):
-            yield converters.db_to_metadata(
-                dict(zip(db.origin_intrinsic_metadata_cols, c))
-            )
+        return [
+            converters.db_to_metadata(dict(zip(db.origin_intrinsic_metadata_cols, c)))
+            for c in db.origin_intrinsic_metadata_get_from_list(ids, cur)
+        ]
     @timed
     @process_metrics
     @db_transaction()
     def origin_intrinsic_metadata_add(
         self, metadata: List[Dict], conflict_update: bool = False, db=None, cur=None
     ) -> Dict[str, int]:
         check_id_duplicates(map(OriginIntrinsicMetadataRow.from_dict, metadata))
[… 19 unchanged lines elided; context: def origin_intrinsic_metadata_delete(]
         self, entries: List[Dict], db=None, cur=None
     ) -> Dict:
         count = db.origin_intrinsic_metadata_delete(entries, cur)
         return {
             "origin_intrinsic_metadata:del": count,
         }

     @timed
-    @db_transaction_generator()
+    @db_transaction()
     def origin_intrinsic_metadata_search_fulltext(
         self, conjunction, limit=100, db=None, cur=None
     ):
-        for c in db.origin_intrinsic_metadata_search_fulltext(
-            conjunction, limit=limit, cur=cur
-        ):
-            yield converters.db_to_metadata(
-                dict(zip(db.origin_intrinsic_metadata_cols, c))
-            )
+        return [
+            converters.db_to_metadata(dict(zip(db.origin_intrinsic_metadata_cols, c)))
+            for c in db.origin_intrinsic_metadata_search_fulltext(
+                conjunction, limit=limit, cur=cur
+            )
+        ]
     @timed
     @db_transaction()
     def origin_intrinsic_metadata_search_by_producer(
         self,
         page_token="",
         limit=100,
         ids_only=False,
[… 56 unchanged lines elided; context: def origin_intrinsic_metadata_stats(self, db=None, cur=None):]
         results = dict(zip(mapping_names + ["total", "non_empty"], cur.fetchone()))
         return {
             "total": results.pop("total"),
             "non_empty": results.pop("non_empty"),
             "per_mapping": results,
         }
     @timed
-    @db_transaction_generator()
+    @db_transaction()
     def indexer_configuration_add(self, tools, db=None, cur=None):
         db.mktemp_indexer_configuration(cur)
         db.copy_to(
             tools,
             "tmp_indexer_configuration",
             ["tool_name", "tool_version", "tool_configuration"],
             cur,
         )
         tools = db.indexer_configuration_add_from_temp(cur)
-        count = 0
-        for line in tools:
-            yield dict(zip(db.indexer_configuration_cols, line))
-            count += 1
+        results = [dict(zip(db.indexer_configuration_cols, line)) for line in tools]
         send_metric(
-            "indexer_configuration:add", count, method_name="indexer_configuration_add"
+            "indexer_configuration:add",
+            len(results),
+            method_name="indexer_configuration_add",
         )
+        return results
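One behavioral note on this hunk: the metric is now computed with `len(results)` after the list is built, rather than incremented while the generator was consumed, so `indexer_configuration:add` is reported even if the caller never iterates the result. A hypothetical caller-side illustration (the configured `storage` instance and the exact keys of the returned dicts are assumptions for the example):

    # Hypothetical usage: `storage` is an IndexerStorage instance
    # configured elsewhere.
    tools = storage.indexer_configuration_add(
        [{"tool_name": "nomos", "tool_version": "3.1.0", "tool_configuration": {}}]
    )
    assert isinstance(tools, list)  # was a generator before this change
    print(len(tools))               # len() now works without draining an iterator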
     @timed
     @db_transaction()
     def indexer_configuration_get(self, tool, db=None, cur=None):
         tool_conf = tool["tool_configuration"]
         if isinstance(tool_conf, dict):
             tool_conf = json.dumps(tool_conf)
         idx = db.indexer_configuration_get(
             tool["tool_name"], tool["tool_version"], tool_conf
         )
         if not idx:
             return None
         return dict(zip(db.indexer_configuration_cols, idx))