Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/storage/__init__.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import Counter | from collections import Counter | ||||
from importlib import import_module | from importlib import import_module | ||||
import json | import json | ||||
from typing import Dict, Iterable, List, Optional, Tuple, Union | from typing import Dict, Iterable, List, Optional, Tuple, Union | ||||
Show All 15 Lines | |||||
from .interface import PagedResult, Sha1 | from .interface import PagedResult, Sha1 | ||||
from .metrics import process_metrics, send_metric, timed | from .metrics import process_metrics, send_metric, timed | ||||
from .model import ( | from .model import ( | ||||
ContentCtagsRow, | ContentCtagsRow, | ||||
ContentLanguageRow, | ContentLanguageRow, | ||||
ContentLicenseRow, | ContentLicenseRow, | ||||
ContentMetadataRow, | ContentMetadataRow, | ||||
ContentMimetypeRow, | ContentMimetypeRow, | ||||
DirectoryIntrinsicMetadataRow, | |||||
OriginIntrinsicMetadataRow, | OriginIntrinsicMetadataRow, | ||||
RevisionIntrinsicMetadataRow, | |||||
) | ) | ||||
from .writer import JournalWriter | from .writer import JournalWriter | ||||
INDEXER_CFG_KEY = "indexer_storage" | INDEXER_CFG_KEY = "indexer_storage" | ||||
MAPPING_NAMES = ["cff", "codemeta", "gemspec", "maven", "npm", "pkg-info"] | MAPPING_NAMES = ["cff", "codemeta", "gemspec", "maven", "npm", "pkg-info"] | ||||
▲ Show 20 Lines • Show All 72 Lines • ▼ Show 20 Lines | def check_id_duplicates(data): | ||||
""" # noqa | """ # noqa | ||||
counter = Counter(tuple(sorted(item.unique_key().items())) for item in data) | counter = Counter(tuple(sorted(item.unique_key().items())) for item in data) | ||||
duplicates = [id_ for (id_, count) in counter.items() if count >= 2] | duplicates = [id_ for (id_, count) in counter.items() if count >= 2] | ||||
if duplicates: | if duplicates: | ||||
raise DuplicateId(list(map(dict, duplicates))) | raise DuplicateId(list(map(dict, duplicates))) | ||||
class IndexerStorage: | class IndexerStorage: | ||||
"""SWH Indexer Storage""" | """SWH Indexer Storage Datastore""" | ||||
current_version = 134 | |||||
def __init__(self, db, min_pool_conns=1, max_pool_conns=10, journal_writer=None): | def __init__(self, db, min_pool_conns=1, max_pool_conns=10, journal_writer=None): | ||||
""" | """ | ||||
Args: | Args: | ||||
db: either a libpq connection string, or a psycopg2 connection | db: either a libpq connection string, or a psycopg2 connection | ||||
journal_writer: configuration passed to | journal_writer: configuration passed to | ||||
`swh.journal.writer.get_journal_writer` | `swh.journal.writer.get_journal_writer` | ||||
Show All 15 Lines | def get_db(self): | ||||
if self._db: | if self._db: | ||||
return self._db | return self._db | ||||
return Db.from_pool(self._pool) | return Db.from_pool(self._pool) | ||||
def put_db(self, db): | def put_db(self, db): | ||||
if db is not self._db: | if db is not self._db: | ||||
db.put_conn() | db.put_conn() | ||||
@db_transaction() | |||||
def get_current_version(self, *, db=None, cur=None): | |||||
return db.current_version | |||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def check_config(self, *, check_write, db=None, cur=None): | def check_config(self, *, check_write, db=None, cur=None): | ||||
# Check permissions on one of the tables | # Check permissions on one of the tables | ||||
if check_write: | if check_write: | ||||
check = "INSERT" | check = "INSERT" | ||||
else: | else: | ||||
check = "SELECT" | check = "SELECT" | ||||
▲ Show 20 Lines • Show All 350 Lines • ▼ Show 20 Lines | ) -> Dict[str, int]: | ||||
) | ) | ||||
count = db.content_metadata_add_from_temp(cur) | count = db.content_metadata_add_from_temp(cur) | ||||
return { | return { | ||||
"content_metadata:add": count, | "content_metadata:add": count, | ||||
} | } | ||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def revision_intrinsic_metadata_missing( | def directory_intrinsic_metadata_missing( | ||||
self, metadata: Iterable[Dict], db=None, cur=None | self, metadata: Iterable[Dict], db=None, cur=None | ||||
) -> List[Tuple[Sha1, int]]: | ) -> List[Tuple[Sha1, int]]: | ||||
return [ | return [ | ||||
obj[0] | obj[0] | ||||
for obj in db.revision_intrinsic_metadata_missing_from_list(metadata, cur) | for obj in db.directory_intrinsic_metadata_missing_from_list(metadata, cur) | ||||
] | ] | ||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def revision_intrinsic_metadata_get( | def directory_intrinsic_metadata_get( | ||||
self, ids: Iterable[Sha1], db=None, cur=None | self, ids: Iterable[Sha1], db=None, cur=None | ||||
) -> List[RevisionIntrinsicMetadataRow]: | ) -> List[DirectoryIntrinsicMetadataRow]: | ||||
return [ | return [ | ||||
RevisionIntrinsicMetadataRow.from_dict( | DirectoryIntrinsicMetadataRow.from_dict( | ||||
converters.db_to_metadata( | converters.db_to_metadata( | ||||
dict(zip(db.revision_intrinsic_metadata_cols, c)) | dict(zip(db.directory_intrinsic_metadata_cols, c)) | ||||
) | ) | ||||
) | ) | ||||
for c in db.revision_intrinsic_metadata_get_from_list(ids, cur) | for c in db.directory_intrinsic_metadata_get_from_list(ids, cur) | ||||
] | ] | ||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
@db_transaction() | @db_transaction() | ||||
def revision_intrinsic_metadata_add( | def directory_intrinsic_metadata_add( | ||||
self, | self, | ||||
metadata: List[RevisionIntrinsicMetadataRow], | metadata: List[DirectoryIntrinsicMetadataRow], | ||||
db=None, | db=None, | ||||
cur=None, | cur=None, | ||||
) -> Dict[str, int]: | ) -> Dict[str, int]: | ||||
check_id_duplicates(metadata) | check_id_duplicates(metadata) | ||||
metadata.sort(key=lambda m: m.id) | metadata.sort(key=lambda m: m.id) | ||||
self.journal_writer.write_additions("revision_intrinsic_metadata", metadata) | self.journal_writer.write_additions("directory_intrinsic_metadata", metadata) | ||||
db.mktemp_revision_intrinsic_metadata(cur) | db.mktemp_directory_intrinsic_metadata(cur) | ||||
db.copy_to( | db.copy_to( | ||||
[m.to_dict() for m in metadata], | [m.to_dict() for m in metadata], | ||||
"tmp_revision_intrinsic_metadata", | "tmp_directory_intrinsic_metadata", | ||||
["id", "metadata", "mappings", "indexer_configuration_id"], | ["id", "metadata", "mappings", "indexer_configuration_id"], | ||||
cur, | cur, | ||||
) | ) | ||||
count = db.revision_intrinsic_metadata_add_from_temp(cur) | count = db.directory_intrinsic_metadata_add_from_temp(cur) | ||||
return { | return { | ||||
"revision_intrinsic_metadata:add": count, | "directory_intrinsic_metadata:add": count, | ||||
} | } | ||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def origin_intrinsic_metadata_get( | def origin_intrinsic_metadata_get( | ||||
self, urls: Iterable[str], db=None, cur=None | self, urls: Iterable[str], db=None, cur=None | ||||
) -> List[OriginIntrinsicMetadataRow]: | ) -> List[OriginIntrinsicMetadataRow]: | ||||
return [ | return [ | ||||
Show All 18 Lines | ) -> Dict[str, int]: | ||||
metadata.sort(key=lambda m: m.id) | metadata.sort(key=lambda m: m.id) | ||||
self.journal_writer.write_additions("origin_intrinsic_metadata", metadata) | self.journal_writer.write_additions("origin_intrinsic_metadata", metadata) | ||||
db.mktemp_origin_intrinsic_metadata(cur) | db.mktemp_origin_intrinsic_metadata(cur) | ||||
db.copy_to( | db.copy_to( | ||||
[m.to_dict() for m in metadata], | [m.to_dict() for m in metadata], | ||||
"tmp_origin_intrinsic_metadata", | "tmp_origin_intrinsic_metadata", | ||||
["id", "metadata", "indexer_configuration_id", "from_revision", "mappings"], | [ | ||||
"id", | |||||
"metadata", | |||||
"indexer_configuration_id", | |||||
"from_directory", | |||||
"mappings", | |||||
], | |||||
cur, | cur, | ||||
) | ) | ||||
count = db.origin_intrinsic_metadata_add_from_temp(cur) | count = db.origin_intrinsic_metadata_add_from_temp(cur) | ||||
return { | return { | ||||
"origin_intrinsic_metadata:add": count, | "origin_intrinsic_metadata:add": count, | ||||
} | } | ||||
@timed | @timed | ||||
▲ Show 20 Lines • Show All 138 Lines • Show Last 20 Lines |