swh/indexer/storage/__init__.py
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import json

import psycopg2
import psycopg2.pool

from collections import defaultdict, Counter

from swh.storage.common import db_transaction_generator, db_transaction
from swh.storage.exc import StorageDBError

from . import converters
from .db import Db
from .exc import IndexerStorageArgumentException, DuplicateId


INDEXER_CFG_KEY = 'indexer_storage'


MAPPING_NAMES = ['codemeta', 'gemspec', 'maven', 'npm', 'pkg-info']
def get_indexer_storage(cls, args):
    # [... 20 lines elided ...]
    elif cls == 'memory':
        from .in_memory import IndexerStorage
    else:
        raise ValueError('Unknown indexer storage class `%s`' % cls)
    return IndexerStorage(**args)
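
# Usage sketch: `cls` selects the backend and `args` is forwarded as
# constructor keyword arguments. The in-memory backend is assumed to need
# no arguments; other backends are handled in the elided branch above.
#
#     storage = get_indexer_storage('memory', {})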
def check_id_duplicates(data):
    """
    If any two dictionaries in `data` have the same id, raises
    a `DuplicateId` exception.

    Values associated with the `id` key must be hashable.

    Args:
        data (List[dict]): List of dictionaries to be inserted

    >>> check_id_duplicates([
    ...     {'id': 'foo', 'data': 'spam'},
    ...     {'id': 'bar', 'data': 'egg'},
    ... ])
    >>> check_id_duplicates([
    ...     {'id': 'foo', 'data': 'spam'},
    ...     {'id': 'foo', 'data': 'egg'},
    ... ])
    Traceback (most recent call last):
      ...
    swh.indexer.storage.exc.DuplicateId: ['foo']
    """
    counter = Counter(item['id'] for item in data)
    duplicates = [id_ for (id_, count) in counter.items() if count >= 2]
    if duplicates:
        raise DuplicateId(duplicates)
class IndexerStorage:
    """SWH Indexer Storage

    """
    def __init__(self, db, min_pool_conns=1, max_pool_conns=10):
        """
        # [... 41 lines elided ...]

    def content_mimetype_missing(self, mimetypes, db=None, cur=None):
        for obj in db.content_mimetype_missing_from_list(mimetypes, cur):
            yield obj[0]
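
    # Usage sketch (assumed input shape, mirroring the add rows below):
    # each dict names a content id and an indexer tool; ids with no stored
    # mimetype row for that tool are yielded back.
    #
    #     missing = list(storage.content_mimetype_missing([
    #         {'id': content_sha1, 'indexer_configuration_id': tool_id},
    #     ]))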
    def _content_get_range(self, content_type, start, end,
                           indexer_configuration_id, limit=1000,
                           with_textual_data=False,
                           db=None, cur=None):
        if limit is None:
            raise IndexerStorageArgumentException('limit should not be None')
        if content_type not in db.content_indexer_names:
            err = 'Wrong type. Should be one of [%s]' % (
                ','.join(db.content_indexer_names))
            raise IndexerStorageArgumentException(err)

        ids = []
        next_id = None
        for counter, obj in enumerate(db.content_get_range(
                content_type, start, end, indexer_configuration_id,
                limit=limit+1, with_textual_data=with_textual_data, cur=cur)):
            _id = obj[0]
            if counter >= limit:
                # [... 12 lines elided ...]

    def content_mimetype_get_range(self, start, end, indexer_configuration_id,
                                   limit=1000, db=None, cur=None):
        return self._content_get_range('mimetype', start, end,
                                       indexer_configuration_id, limit=limit,
                                       db=db, cur=cur)
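
    # Paging sketch: _content_get_range asks the db for limit+1 rows so it
    # can tell whether a next page exists. The result is assumed to be a
    # dict with 'ids' (at most `limit` ids) and 'next' (None on the last
    # page), so callers can iterate:
    #
    #     start = b'\x00' * 20
    #     while start is not None:
    #         page = storage.content_mimetype_get_range(
    #             start, b'\xff' * 20, indexer_configuration_id=tool_id)
    #         handle(page['ids'])          # handle() is a caller-side stub
    #         start = page['next']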
    @db_transaction()
    def content_mimetype_add(self, mimetypes, conflict_update=False, db=None,
                             cur=None):
        check_id_duplicates(mimetypes)
        mimetypes.sort(key=lambda m: m['id'])
        db.mktemp_content_mimetype(cur)
        db.copy_to(mimetypes, 'tmp_content_mimetype',
                   ['id', 'mimetype', 'encoding', 'indexer_configuration_id'],
                   cur)
        db.content_mimetype_add_from_temp(conflict_update, cur)
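
    # Row sketch, derived from the copy_to column list above (values
    # illustrative):
    #
    #     storage.content_mimetype_add([{
    #         'id': content_sha1,
    #         'mimetype': 'text/plain',
    #         'encoding': 'us-ascii',
    #         'indexer_configuration_id': tool_id,
    #     }])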
    @db_transaction_generator()
    # [... 11 lines elided ...]

    def content_language_get(self, ids, db=None, cur=None):
        for c in db.content_language_get_from_list(ids, cur):
            yield converters.db_to_language(
                dict(zip(db.content_language_cols, c)))
    @db_transaction()
    def content_language_add(self, languages, conflict_update=False, db=None,
                             cur=None):
        check_id_duplicates(languages)
        languages.sort(key=lambda m: m['id'])
        db.mktemp_content_language(cur)
        # empty language is mapped to 'unknown'
        db.copy_to(
            ({
                'id': l['id'],
                'lang': 'unknown' if not l['lang'] else l['lang'],
                'indexer_configuration_id': l['indexer_configuration_id'],
            # [... 11 lines elided ...]
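
    # Row sketch for content_language_add above; a falsy 'lang' value is
    # normalized to 'unknown' before the copy:
    #
    #     storage.content_language_add([{
    #         'id': content_sha1,
    #         'lang': 'python',
    #         'indexer_configuration_id': tool_id,
    #     }])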
    @db_transaction_generator()
    def content_ctags_get(self, ids, db=None, cur=None):
        for c in db.content_ctags_get_from_list(ids, cur):
            yield converters.db_to_ctags(dict(zip(db.content_ctags_cols, c)))

    @db_transaction()
    def content_ctags_add(self, ctags, conflict_update=False, db=None,
                          cur=None):
        check_id_duplicates(ctags)
        ctags.sort(key=lambda m: m['id'])

        def _convert_ctags(__ctags):
            """Convert ctags dict to list of ctags.

            """
            for ctags in __ctags:
                yield from converters.ctags_to_db(ctags)
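
        # Conversion sketch (assumed shapes): each input dict is expected
        # to carry a 'ctags' list that converters.ctags_to_db flattens into
        # one row per symbol, e.g.
        #
        #     {'id': sha1, 'indexer_configuration_id': tool_id,
        #      'ctags': [{'name': 'main', 'kind': 'function',
        #                 'line': 10, 'lang': 'C'}]}
        #
        # yields one flat row per entry of the 'ctags' list.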
        # [... 24 lines elided ...]

    def content_fossology_license_get(self, ids, db=None, cur=None):
        # [... start of method elided ...]
            d[id_].append(converters.db_to_fossology_license(license))
        for id_, facts in d.items():
            yield {id_: facts}
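
    # Result sketch: each yielded item maps one content id to the list of
    # converted license facts accumulated for it:
    #
    #     {content_sha1: [fact, ...]}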
    @db_transaction()
    def content_fossology_license_add(self, licenses, conflict_update=False,
                                      db=None, cur=None):
        check_id_duplicates(licenses)
        licenses.sort(key=lambda m: m['id'])
        db.mktemp_content_fossology_license(cur)
        db.copy_to(
            ({
                'id': sha1['id'],
                'indexer_configuration_id': sha1['indexer_configuration_id'],
                'license': license,
            } for sha1 in licenses
            # [... 20 lines elided ...]
    def content_metadata_get(self, ids, db=None, cur=None):
        for c in db.content_metadata_get_from_list(ids, cur):
            yield converters.db_to_metadata(
                dict(zip(db.content_metadata_cols, c)))

    @db_transaction()
    def content_metadata_add(self, metadata, conflict_update=False, db=None,
                             cur=None):
        check_id_duplicates(metadata)
        metadata.sort(key=lambda m: m['id'])
        db.mktemp_content_metadata(cur)
        db.copy_to(metadata, 'tmp_content_metadata',
                   ['id', 'metadata', 'indexer_configuration_id'],
                   cur)
        db.content_metadata_add_from_temp(conflict_update, cur)
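
    # Row sketch, derived from the copy_to column list above; 'metadata'
    # is assumed to be a JSON-serializable dict:
    #
    #     storage.content_metadata_add([{
    #         'id': content_sha1,
    #         'metadata': {'description': 'an illustrative value'},
    #         'indexer_configuration_id': tool_id,
    #     }])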
    @db_transaction_generator()
    def revision_intrinsic_metadata_missing(self, metadata, db=None, cur=None):
        for obj in db.revision_intrinsic_metadata_missing_from_list(
                metadata, cur):
            yield obj[0]

    @db_transaction_generator()
    def revision_intrinsic_metadata_get(self, ids, db=None, cur=None):
        for c in db.revision_intrinsic_metadata_get_from_list(ids, cur):
            yield converters.db_to_metadata(
                dict(zip(db.revision_intrinsic_metadata_cols, c)))

    @db_transaction()
    def revision_intrinsic_metadata_add(self, metadata, conflict_update=False,
                                        db=None, cur=None):
        check_id_duplicates(metadata)
        metadata.sort(key=lambda m: m['id'])
        db.mktemp_revision_intrinsic_metadata(cur)
        db.copy_to(metadata, 'tmp_revision_intrinsic_metadata',
                   ['id', 'metadata', 'mappings',
                    'indexer_configuration_id'],
                   cur)
        db.revision_intrinsic_metadata_add_from_temp(conflict_update, cur)
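
    # Row sketch: 'mappings' names the metadata mappings that produced the
    # result, presumably drawn from MAPPING_NAMES at the top of the module:
    #
    #     storage.revision_intrinsic_metadata_add([{
    #         'id': revision_id,
    #         'metadata': {'name': 'an illustrative value'},
    #         'mappings': ['npm'],
    #         'indexer_configuration_id': tool_id,
    #     }])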
    @db_transaction()
    def revision_intrinsic_metadata_delete(self, entries, db=None, cur=None):
        db.revision_intrinsic_metadata_delete(entries, cur)
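
    # Deletion sketch (assumed entry shape, mirroring the rows added
    # above):
    #
    #     storage.revision_intrinsic_metadata_delete([
    #         {'id': revision_id, 'indexer_configuration_id': tool_id},
    #     ])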
    @db_transaction_generator()
    def origin_intrinsic_metadata_get(self, ids, db=None, cur=None):
        for c in db.origin_intrinsic_metadata_get_from_list(ids, cur):
            yield converters.db_to_metadata(
                dict(zip(db.origin_intrinsic_metadata_cols, c)))

    @db_transaction()
    def origin_intrinsic_metadata_add(self, metadata,
                                      conflict_update=False, db=None,
                                      cur=None):
        check_id_duplicates(metadata)
        metadata.sort(key=lambda m: m['id'])
        db.mktemp_origin_intrinsic_metadata(cur)
        db.copy_to(metadata, 'tmp_origin_intrinsic_metadata',
                   ['id', 'metadata',
                    'indexer_configuration_id',
                    'from_revision', 'mappings'],
        # [... 92 lines elided through end of file ...]
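
    # Row sketch for origin_intrinsic_metadata_add above, derived from its
    # copy_to column list; 'from_revision' is assumed to reference the
    # revision the metadata was extracted from:
    #
    #     storage.origin_intrinsic_metadata_add([{
    #         'id': origin_id,
    #         'metadata': {'name': 'an illustrative value'},
    #         'indexer_configuration_id': tool_id,
    #         'from_revision': revision_id,
    #         'mappings': ['npm'],
    #     }])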