diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py --- a/swh/indexer/storage/__init__.py +++ b/swh/indexer/storage/__init__.py @@ -12,6 +12,7 @@ from swh.core.api import remote_api_endpoint from swh.storage.common import db_transaction_generator, db_transaction from swh.storage.exc import StorageDBError +from swh.storage.journal_writer import get_journal_writer from .db import Db from . import converters @@ -81,7 +82,8 @@ """SWH Indexer Storage """ - def __init__(self, db, min_pool_conns=1, max_pool_conns=10): + def __init__(self, db, min_pool_conns=1, max_pool_conns=10, + journal_writer=None): """ Args: db_conn: either a libpq connection string, or a psycopg2 connection @@ -99,6 +101,11 @@ except psycopg2.OperationalError as e: raise StorageDBError(e) + if journal_writer: + self.journal_writer = get_journal_writer(**journal_writer) + else: + self.journal_writer = None + def get_db(self): if self._db: return self._db @@ -730,6 +737,24 @@ """ _check_id_duplicates(metadata) + + if self.journal_writer: + journal_metadata = [] + for item in metadata: + item = item.copy() + tool = db.indexer_configuration_get_from_id( + item.pop('indexer_configuration_id')) + tool = dict(zip(db.indexer_configuration_cols, tool)) + item['tool'] = { + 'id': tool['id'], + 'version': tool['tool_version'], + 'name': tool['tool_name'], + 'configuration': tool['tool_configuration'], + } + journal_metadata.append(item) + self.journal_writer.write_additions( + 'origin_intrinsic_metadata', journal_metadata) + metadata.sort(key=lambda m: m['id']) db.mktemp_origin_intrinsic_metadata(cur) diff --git a/swh/indexer/storage/db.py b/swh/indexer/storage/db.py --- a/swh/indexer/storage/db.py +++ b/swh/indexer/storage/db.py @@ -457,3 +457,13 @@ (tool_name, tool_version, tool_configuration)) return cur.fetchone() + + def indexer_configuration_get_from_id(self, id_, cur=None): + cur = self._cursor(cur) + cur.execute('''select %s + from indexer_configuration + where id=%%s''' % ( + ','.join(self.indexer_configuration_cols)), + (id_,)) + + return cur.fetchone() diff --git a/swh/indexer/storage/in_memory.py b/swh/indexer/storage/in_memory.py --- a/swh/indexer/storage/in_memory.py +++ b/swh/indexer/storage/in_memory.py @@ -11,6 +11,8 @@ import math import re +from swh.storage.journal_writer import get_journal_writer + from . import MAPPING_NAMES SHA1_DIGEST_SIZE = 160 @@ -189,7 +191,7 @@ class IndexerStorage: """In-memory SWH indexer storage.""" - def __init__(self): + def __init__(self, journal_writer=None): self._tools = {} self._mimetypes = SubStorage(self._tools) self._languages = SubStorage(self._tools) @@ -199,6 +201,11 @@ self._revision_intrinsic_metadata = SubStorage(self._tools) self._origin_intrinsic_metadata = SubStorage(self._tools) + if journal_writer: + self.journal_writer = get_journal_writer(**journal_writer) + else: + self.journal_writer = None + def content_mimetype_missing(self, mimetypes): """Generate mimetypes missing from storage. @@ -643,6 +650,15 @@ or skip duplicates (false, the default) """ + if self.journal_writer: + journal_metadata = [] + for item in metadata: + item = item.copy() + item['tool'] = _transform_tool( + self._tools[item.pop('indexer_configuration_id')]) + journal_metadata.append(item) + self.journal_writer.write_additions( + 'origin_intrinsic_metadata', journal_metadata) self._origin_intrinsic_metadata.add(metadata, conflict_update) def origin_intrinsic_metadata_delete(self, entries): diff --git a/swh/indexer/tests/storage/test_api_client.py b/swh/indexer/tests/storage/test_api_client.py --- a/swh/indexer/tests/storage/test_api_client.py +++ b/swh/indexer/tests/storage/test_api_client.py @@ -6,9 +6,12 @@ import unittest from swh.core.api.tests.server_testing import ServerTestFixture +import swh.indexer.storage as storage from swh.indexer.storage import INDEXER_CFG_KEY from swh.indexer.storage.api.client import RemoteStorage from swh.indexer.storage.api.server import app +from swh.storage.journal_writer import \ + get_journal_writer, InMemoryJournalWriter from .test_storage import CommonTestStorage, BasePgTestStorage @@ -25,14 +28,28 @@ """ def setUp(self): + def mock_get_journal_writer(cls, args=None): + assert cls == 'inmemory' + return journal_writer + storage.get_journal_writer = mock_get_journal_writer + journal_writer = InMemoryJournalWriter() + self.journal_writer = journal_writer + self.config = { INDEXER_CFG_KEY: { 'cls': 'local', 'args': { 'db': 'dbname=%s' % self.TEST_DB_NAME, + 'journal_writer': { + 'cls': 'inmemory', + } } } } self.app = app super().setUp() self.storage = RemoteStorage(self.url()) + + def tearDown(self): + super().tearDown() + storage.get_journal_writer = get_journal_writer diff --git a/swh/indexer/tests/storage/test_in_memory.py b/swh/indexer/tests/storage/test_in_memory.py --- a/swh/indexer/tests/storage/test_in_memory.py +++ b/swh/indexer/tests/storage/test_in_memory.py @@ -8,9 +8,13 @@ self.storage_config = { 'cls': 'memory', 'args': { + 'journal_writer': { + 'cls': 'inmemory', + }, }, } super().setUp() + self.journal_writer = self.storage.journal_writer def reset_storage_tables(self): self.storage = self.storage.__class__() diff --git a/swh/indexer/tests/storage/test_storage.py b/swh/indexer/tests/storage/test_storage.py --- a/swh/indexer/tests/storage/test_storage.py +++ b/swh/indexer/tests/storage/test_storage.py @@ -90,6 +90,9 @@ 'cls': 'local', 'args': { 'db': 'dbname=%s' % self.TEST_DB_NAME, + 'journal_writer': { + 'cls': 'inmemory', + }, }, } @@ -987,7 +990,7 @@ }, ) - def test_origin_intrinsic_metadata_get(self): + def test_origin_intrinsic_metadata_add(self): # given tool_id = self.tools['swh-metadata-detector']['id'] @@ -1018,16 +1021,19 @@ actual_metadata = list(self.storage.origin_intrinsic_metadata_get( [self.origin_id_1, 42])) - expected_metadata = [{ + expected_metadata = { 'id': self.origin_id_1, 'origin_url': 'file:///dev/zero', 'metadata': metadata, 'tool': self.tools['swh-metadata-detector'], 'from_revision': self.revision_id_2, 'mappings': ['mapping1'], - }] + } - self.assertEqual(actual_metadata, expected_metadata) + self.maxDiff = None + self.assertEqual(actual_metadata, [expected_metadata]) + self.assertEqual(list(self.journal_writer.objects), + [('origin_intrinsic_metadata', expected_metadata)]) def test_origin_intrinsic_metadata_delete(self): # given @@ -1987,7 +1993,9 @@ class. """ - pass + def setUp(self): + super().setUp() + self.journal_writer = self.storage.journal_writer def test_mapping_names():