Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7147908
D1959.id6592.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
8 KB
Subscribers
None
D1959.id6592.diff
View Options
diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py
--- a/swh/indexer/storage/__init__.py
+++ b/swh/indexer/storage/__init__.py
@@ -12,6 +12,7 @@
from swh.core.api import remote_api_endpoint
from swh.storage.common import db_transaction_generator, db_transaction
from swh.storage.exc import StorageDBError
+from swh.storage.journal_writer import get_journal_writer
from .db import Db
from . import converters
@@ -81,7 +82,8 @@
"""SWH Indexer Storage
"""
- def __init__(self, db, min_pool_conns=1, max_pool_conns=10):
+ def __init__(self, db, min_pool_conns=1, max_pool_conns=10,
+ journal_writer=None):
"""
Args:
db_conn: either a libpq connection string, or a psycopg2 connection
@@ -99,6 +101,11 @@
except psycopg2.OperationalError as e:
raise StorageDBError(e)
+ if journal_writer:
+ self.journal_writer = get_journal_writer(**journal_writer)
+ else:
+ self.journal_writer = None
+
def get_db(self):
if self._db:
return self._db
@@ -730,6 +737,24 @@
"""
_check_id_duplicates(metadata)
+
+ if self.journal_writer:
+ journal_metadata = []
+ for item in metadata:
+ item = item.copy()
+ tool = db.indexer_configuration_get_from_id(
+ item.pop('indexer_configuration_id'))
+ tool = dict(zip(db.indexer_configuration_cols, tool))
+ item['tool'] = {
+ 'id': tool['id'],
+ 'version': tool['tool_version'],
+ 'name': tool['tool_name'],
+ 'configuration': tool['tool_configuration'],
+ }
+ journal_metadata.append(item)
+ self.journal_writer.write_additions(
+ 'origin_intrinsic_metadata', journal_metadata)
+
metadata.sort(key=lambda m: m['id'])
db.mktemp_origin_intrinsic_metadata(cur)
diff --git a/swh/indexer/storage/db.py b/swh/indexer/storage/db.py
--- a/swh/indexer/storage/db.py
+++ b/swh/indexer/storage/db.py
@@ -457,3 +457,13 @@
(tool_name, tool_version, tool_configuration))
return cur.fetchone()
+
+ def indexer_configuration_get_from_id(self, id_, cur=None):
+ cur = self._cursor(cur)
+ cur.execute('''select %s
+ from indexer_configuration
+ where id=%%s''' % (
+ ','.join(self.indexer_configuration_cols)),
+ (id_,))
+
+ return cur.fetchone()
diff --git a/swh/indexer/storage/in_memory.py b/swh/indexer/storage/in_memory.py
--- a/swh/indexer/storage/in_memory.py
+++ b/swh/indexer/storage/in_memory.py
@@ -11,6 +11,8 @@
import math
import re
+from swh.storage.journal_writer import get_journal_writer
+
from . import MAPPING_NAMES
SHA1_DIGEST_SIZE = 160
@@ -189,7 +191,7 @@
class IndexerStorage:
"""In-memory SWH indexer storage."""
- def __init__(self):
+ def __init__(self, journal_writer=None):
self._tools = {}
self._mimetypes = SubStorage(self._tools)
self._languages = SubStorage(self._tools)
@@ -199,6 +201,11 @@
self._revision_intrinsic_metadata = SubStorage(self._tools)
self._origin_intrinsic_metadata = SubStorage(self._tools)
+ if journal_writer:
+ self.journal_writer = get_journal_writer(**journal_writer)
+ else:
+ self.journal_writer = None
+
def content_mimetype_missing(self, mimetypes):
"""Generate mimetypes missing from storage.
@@ -643,6 +650,15 @@
or skip duplicates (false, the default)
"""
+ if self.journal_writer:
+ journal_metadata = []
+ for item in metadata:
+ item = item.copy()
+ item['tool'] = _transform_tool(
+ self._tools[item.pop('indexer_configuration_id')])
+ journal_metadata.append(item)
+ self.journal_writer.write_additions(
+ 'origin_intrinsic_metadata', journal_metadata)
self._origin_intrinsic_metadata.add(metadata, conflict_update)
def origin_intrinsic_metadata_delete(self, entries):
diff --git a/swh/indexer/tests/storage/test_api_client.py b/swh/indexer/tests/storage/test_api_client.py
--- a/swh/indexer/tests/storage/test_api_client.py
+++ b/swh/indexer/tests/storage/test_api_client.py
@@ -6,9 +6,12 @@
import unittest
from swh.core.api.tests.server_testing import ServerTestFixture
+import swh.indexer.storage as storage
from swh.indexer.storage import INDEXER_CFG_KEY
from swh.indexer.storage.api.client import RemoteStorage
from swh.indexer.storage.api.server import app
+from swh.storage.journal_writer import \
+ get_journal_writer, InMemoryJournalWriter
from .test_storage import CommonTestStorage, BasePgTestStorage
@@ -25,14 +28,28 @@
"""
def setUp(self):
+ def mock_get_journal_writer(cls, args=None):
+ assert cls == 'inmemory'
+ return journal_writer
+ storage.get_journal_writer = mock_get_journal_writer
+ journal_writer = InMemoryJournalWriter()
+ self.journal_writer = journal_writer
+
self.config = {
INDEXER_CFG_KEY: {
'cls': 'local',
'args': {
'db': 'dbname=%s' % self.TEST_DB_NAME,
+ 'journal_writer': {
+ 'cls': 'inmemory',
+ }
}
}
}
self.app = app
super().setUp()
self.storage = RemoteStorage(self.url())
+
+ def tearDown(self):
+ super().tearDown()
+ storage.get_journal_writer = get_journal_writer
diff --git a/swh/indexer/tests/storage/test_in_memory.py b/swh/indexer/tests/storage/test_in_memory.py
--- a/swh/indexer/tests/storage/test_in_memory.py
+++ b/swh/indexer/tests/storage/test_in_memory.py
@@ -8,9 +8,13 @@
self.storage_config = {
'cls': 'memory',
'args': {
+ 'journal_writer': {
+ 'cls': 'inmemory',
+ },
},
}
super().setUp()
+ self.journal_writer = self.storage.journal_writer
def reset_storage_tables(self):
self.storage = self.storage.__class__()
diff --git a/swh/indexer/tests/storage/test_storage.py b/swh/indexer/tests/storage/test_storage.py
--- a/swh/indexer/tests/storage/test_storage.py
+++ b/swh/indexer/tests/storage/test_storage.py
@@ -90,6 +90,9 @@
'cls': 'local',
'args': {
'db': 'dbname=%s' % self.TEST_DB_NAME,
+ 'journal_writer': {
+ 'cls': 'inmemory',
+ },
},
}
@@ -987,7 +990,7 @@
},
)
- def test_origin_intrinsic_metadata_get(self):
+ def test_origin_intrinsic_metadata_add(self):
# given
tool_id = self.tools['swh-metadata-detector']['id']
@@ -1018,16 +1021,19 @@
actual_metadata = list(self.storage.origin_intrinsic_metadata_get(
[self.origin_id_1, 42]))
- expected_metadata = [{
+ expected_metadata = {
'id': self.origin_id_1,
'origin_url': 'file:///dev/zero',
'metadata': metadata,
'tool': self.tools['swh-metadata-detector'],
'from_revision': self.revision_id_2,
'mappings': ['mapping1'],
- }]
+ }
- self.assertEqual(actual_metadata, expected_metadata)
+ self.maxDiff = None
+ self.assertEqual(actual_metadata, [expected_metadata])
+ self.assertEqual(list(self.journal_writer.objects),
+ [('origin_intrinsic_metadata', expected_metadata)])
def test_origin_intrinsic_metadata_delete(self):
# given
@@ -1987,7 +1993,9 @@
class.
"""
- pass
+ def setUp(self):
+ super().setUp()
+ self.journal_writer = self.storage.journal_writer
def test_mapping_names():
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jan 23, 2:53 AM (20 h, 25 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3222546
Attached To
D1959: Publish origin_intrinsic_metadata to Kafka.
Event Timeline
Log In to Comment