diff --git a/PKG-INFO b/PKG-INFO
index 87edb52..7c3d5af 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,69 +1,69 @@
Metadata-Version: 2.1
Name: swh.indexer
-Version: 0.0.168
+Version: 0.0.169
Summary: Software Heritage Content Indexer
Home-page: https://forge.softwareheritage.org/diffusion/78/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer
Description: swh-indexer
============
Tools to compute multiple indexes on SWH's raw contents:
- content:
  - mimetype
  - ctags
  - language
  - fossology-license
  - metadata
- revision:
  - metadata
An indexer is in charge of:
- looking up objects
- extracting information from those objects
- storing that information in the swh-indexer db
There are multiple indexers working on different object types:
- content indexer: works with content sha1 hashes
- revision indexer: works with revision sha1 hashes
- origin indexer: works with origin identifiers
Indexation procedure:
- receive batch of ids
- retrieve the associated data depending on object type
- compute for that object some index
- store the result to swh's storage
Current content indexers:
- mimetype (queue swh_indexer_content_mimetype): detect the encoding
and mimetype
- language (queue swh_indexer_content_language): detect the
programming language
- ctags (queue swh_indexer_content_ctags): compute tags information
- fossology-license (queue swh_indexer_fossology_license): compute the
license
- metadata: translate a file into a translated_metadata dict
Current revision indexers:
- metadata: detects files containing metadata and retrieves translated_metadata
from the content_metadata table in storage, or runs the content indexer to
translate the files.
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Description-Content-Type: text/markdown
Provides-Extra: testing
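
The indexation procedure listed in the description above maps onto a small fetch/compute/store loop. The sketch below is purely illustrative: run_indexation, objstorage, idx_storage and compute_index are hypothetical stand-ins, not swh-indexer's actual API.

def run_indexation(ids, objstorage, idx_storage, compute_index):
    # receive a batch of ids, retrieve the associated data, compute an
    # index for each object, then store the results in swh's storage
    results = []
    for id_ in ids:
        data = objstorage.get(id_)
        results.append(compute_index(id_, data))
    idx_storage.add(results)
    return len(results)
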
diff --git a/swh.indexer.egg-info/PKG-INFO b/swh.indexer.egg-info/PKG-INFO
index 87edb52..7c3d5af 100644
--- a/swh.indexer.egg-info/PKG-INFO
+++ b/swh.indexer.egg-info/PKG-INFO
@@ -1,69 +1,69 @@
Metadata-Version: 2.1
Name: swh.indexer
-Version: 0.0.168
+Version: 0.0.169
Summary: Software Heritage Content Indexer
Home-page: https://forge.softwareheritage.org/diffusion/78/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer
Description: swh-indexer
============
Tools to compute multiple indexes on SWH's raw contents:
- content:
  - mimetype
  - ctags
  - language
  - fossology-license
  - metadata
- revision:
  - metadata
An indexer is in charge of:
- looking up objects
- extracting information from those objects
- storing that information in the swh-indexer db
There are multiple indexers working on different object types:
- content indexer: works with content sha1 hashes
- revision indexer: works with revision sha1 hashes
- origin indexer: works with origin identifiers
Indexation procedure:
- receive batch of ids
- retrieve the associated data depending on object type
- compute for that object some index
- store the result to swh's storage
Current content indexers:
- mimetype (queue swh_indexer_content_mimetype): detect the encoding
and mimetype
- language (queue swh_indexer_content_language): detect the
programming language
- ctags (queue swh_indexer_content_ctags): compute tags information
- fossology-license (queue swh_indexer_fossology_license): compute the
license
- metadata: translate a file into a translated_metadata dict
Current revision indexers:
- metadata: detects files containing metadata and retrieves translated_metadata
from the content_metadata table in storage, or runs the content indexer to
translate the files.
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Description-Content-Type: text/markdown
Provides-Extra: testing
diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py
index 7d47c23..152e20e 100644
--- a/swh/indexer/mimetype.py
+++ b/swh/indexer/mimetype.py
@@ -1,157 +1,157 @@
# Copyright (C) 2016-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Optional, Dict, Any, List
import magic
from .indexer import ContentIndexer, ContentRangeIndexer
if not hasattr(magic.Magic, 'from_buffer'):
raise ImportError(
'Expected "import magic" to import python-magic, but file_magic '
'was imported instead.')
def compute_mimetype_encoding(raw_content: bytes) -> Dict[str, bytes]:
"""Determine mimetype and encoding from the raw content.
Args:
raw_content: content's raw data
Returns:
dict: mimetype and encoding key and corresponding values.
"""
m = magic.Magic(mime=True, mime_encoding=True)
res = m.from_buffer(raw_content)
try:
mimetype, encoding = res.split('; charset=')
except ValueError:
mimetype, encoding = res, ''
return {
'mimetype': mimetype,
'encoding': encoding,
}
class MixinMimetypeIndexer:
"""Mixin mimetype indexer.
See :class:`MimetypeIndexer` and :class:`MimetypeRangeIndexer`
"""
- tool: Dict[str, Any]
+ tool: Any
idx_storage: Any
ADDITIONAL_CONFIG = {
'tools': ('dict', {
'name': 'file',
'version': '1:5.30-1+deb9u1',
'configuration': {
"type": "library",
"debian-package": "python3-magic"
},
}),
'write_batch_size': ('int', 1000),
}
CONFIG_BASE_FILENAME = 'indexer/mimetype' # type: Optional[str]
def index(self, id: bytes, data: Optional[bytes] = None,
**kwargs) -> Dict[str, Any]:
"""Index sha1s' content and store result.
Args:
id: content's identifier
data: raw content in bytes
Returns:
dict: content's mimetype; dict keys being
- id: content's identifier (sha1)
- mimetype: mimetype in bytes
- encoding: encoding in bytes
"""
assert data is not None
properties = compute_mimetype_encoding(data)
properties.update({
'id': id,
'indexer_configuration_id': self.tool['id'],
})
return properties
def persist_index_computations(
self, results: List[Dict], policy_update: str
) -> Dict[str, int]:
"""Persist the results in storage.
Args:
results: list of content's mimetype dicts
(see :meth:`.index`)
policy_update: either 'update-dups' or 'ignore-dups' to
respectively update duplicates or ignore them
"""
return self.idx_storage.content_mimetype_add(
results, conflict_update=(policy_update == 'update-dups'))
class MimetypeIndexer(MixinMimetypeIndexer, ContentIndexer):
"""Mimetype Indexer working on list of content identifiers.
It:
- (optionally) filters out content already indexed (cf.
:meth:`.filter`)
- reads content from objstorage per the content's id (sha1)
- computes {mimetype, encoding} from that content
- stores result in storage
"""
def filter(self, ids):
"""Filter out known sha1s and return only missing ones.
"""
yield from self.idx_storage.content_mimetype_missing((
{
'id': sha1,
'indexer_configuration_id': self.tool['id'],
} for sha1 in ids
))
class MimetypeRangeIndexer(MixinMimetypeIndexer, ContentRangeIndexer):
"""Mimetype Range Indexer working on range of content identifiers.
It:
- (optionally) filters out content already indexed (cf
:meth:`.indexed_contents_in_range`)
- reads content from objstorage per the content's id (sha1)
- computes {mimetype, encoding} from that content
- stores result in storage
"""
def indexed_contents_in_range(
self, start: bytes, end: bytes
) -> Dict[str, Optional[bytes]]:
"""Retrieve indexed content id within range [start, end].
Args:
start: Starting bound from range identifier
end: End range identifier
Returns:
dict: a dict with keys:
- ids: iterable of content ids within the range.
- next: The next range of sha1 starts at
this sha1 if any
"""
return self.idx_storage.content_mimetype_get_range(
start, end, self.tool['id'])
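
For reference, compute_mimetype_encoding can be exercised directly; this is only an illustrative snippet, and the exact values returned depend on the libmagic database installed locally (python-magic must be installed rather than file_magic, as the import guard above enforces).

from swh.indexer.mimetype import compute_mimetype_encoding

result = compute_mimetype_encoding(b'#!/bin/sh\necho hello\n')
# Typically something like {'mimetype': 'text/x-shellscript', 'encoding': 'us-ascii'},
# but the detected mimetype/encoding may vary with the local libmagic version.
print(result['mimetype'], result['encoding'])
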
diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py
index 183e9c5..7578e52 100644
--- a/swh/indexer/storage/__init__.py
+++ b/swh/indexer/storage/__init__.py
@@ -1,533 +1,544 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import psycopg2
import psycopg2.pool
from collections import defaultdict, Counter
from typing import Dict, List
from swh.storage.common import db_transaction_generator, db_transaction
from swh.storage.exc import StorageDBError
-from swh.storage.metrics import send_metric, timed, process_metrics
from . import converters
from .db import Db
from .exc import IndexerStorageArgumentException, DuplicateId
+from .metrics import process_metrics, send_metric, timed
INDEXER_CFG_KEY = 'indexer_storage'
MAPPING_NAMES = ['codemeta', 'gemspec', 'maven', 'npm', 'pkg-info']
def get_indexer_storage(cls, args):
"""Get an indexer storage object of class `storage_class` with
arguments `storage_args`.
Args:
cls (str): storage's class, either 'local' or 'remote'
args (dict): dictionary of arguments passed to the
storage class constructor
Returns:
an instance of swh.indexer's storage (either local or remote)
Raises:
ValueError if passed an unknown storage class.
"""
if cls == 'remote':
from .api.client import RemoteStorage as IndexerStorage
elif cls == 'local':
from . import IndexerStorage
elif cls == 'memory':
from .in_memory import IndexerStorage
else:
raise ValueError('Unknown indexer storage class `%s`' % cls)
return IndexerStorage(**args)
def check_id_duplicates(data):
"""
If any two dictionaries in `data` have the same id, raises
a `ValueError`.
Values associated with the key must be hashable.
Args:
data (List[dict]): List of dictionaries to be inserted
>>> check_id_duplicates([
... {'id': 'foo', 'data': 'spam'},
... {'id': 'bar', 'data': 'egg'},
... ])
>>> check_id_duplicates([
... {'id': 'foo', 'data': 'spam'},
... {'id': 'foo', 'data': 'egg'},
... ])
Traceback (most recent call last):
...
swh.indexer.storage.exc.DuplicateId: ['foo']
"""
counter = Counter(item['id'] for item in data)
duplicates = [id_ for (id_, count) in counter.items() if count >= 2]
if duplicates:
raise DuplicateId(duplicates)
class IndexerStorage:
"""SWH Indexer Storage
"""
def __init__(self, db, min_pool_conns=1, max_pool_conns=10):
"""
Args:
db_conn: either a libpq connection string, or a psycopg2 connection
"""
try:
if isinstance(db, psycopg2.extensions.connection):
self._pool = None
self._db = Db(db)
else:
self._pool = psycopg2.pool.ThreadedConnectionPool(
min_pool_conns, max_pool_conns, db
)
self._db = None
except psycopg2.OperationalError as e:
raise StorageDBError(e)
def get_db(self):
if self._db:
return self._db
return Db.from_pool(self._pool)
def put_db(self, db):
if db is not self._db:
db.put_conn()
+ @timed
@db_transaction()
def check_config(self, *, check_write, db=None, cur=None):
# Check permissions on one of the tables
if check_write:
check = 'INSERT'
else:
check = 'SELECT'
cur.execute(
"select has_table_privilege(current_user, 'content_mimetype', %s)", # noqa
(check,)
)
return cur.fetchone()[0]
+ @timed
@db_transaction_generator()
def content_mimetype_missing(self, mimetypes, db=None, cur=None):
for obj in db.content_mimetype_missing_from_list(mimetypes, cur):
yield obj[0]
def _content_get_range(self, content_type, start, end,
indexer_configuration_id, limit=1000,
with_textual_data=False,
db=None, cur=None):
if limit is None:
raise IndexerStorageArgumentException('limit should not be None')
if content_type not in db.content_indexer_names:
err = 'Wrong type. Should be one of [%s]' % (
','.join(db.content_indexer_names))
raise IndexerStorageArgumentException(err)
ids = []
next_id = None
for counter, obj in enumerate(db.content_get_range(
content_type, start, end, indexer_configuration_id,
limit=limit+1, with_textual_data=with_textual_data, cur=cur)):
_id = obj[0]
if counter >= limit:
next_id = _id
break
ids.append(_id)
return {
'ids': ids,
'next': next_id
}
+ @timed
@db_transaction()
def content_mimetype_get_range(self, start, end, indexer_configuration_id,
limit=1000, db=None, cur=None):
return self._content_get_range('mimetype', start, end,
indexer_configuration_id, limit=limit,
db=db, cur=cur)
@timed
@process_metrics
@db_transaction()
def content_mimetype_add(
self, mimetypes: List[Dict], conflict_update: bool = False,
db=None, cur=None) -> Dict[str, int]:
"""Add mimetypes to the storage (if conflict_update is True, this will
override existing data if any).
Returns:
A dict with the number of new elements added to the storage.
"""
check_id_duplicates(mimetypes)
mimetypes.sort(key=lambda m: m['id'])
db.mktemp_content_mimetype(cur)
db.copy_to(mimetypes, 'tmp_content_mimetype',
['id', 'mimetype', 'encoding', 'indexer_configuration_id'],
cur)
count = db.content_mimetype_add_from_temp(conflict_update, cur)
- send_metric('content_mimetype:add',
- count=count, method_name='content_mimetype_add')
return {
'content_mimetype:add': count
}
+ @timed
@db_transaction_generator()
def content_mimetype_get(self, ids, db=None, cur=None):
for c in db.content_mimetype_get_from_list(ids, cur):
yield converters.db_to_mimetype(
dict(zip(db.content_mimetype_cols, c)))
+ @timed
@db_transaction_generator()
def content_language_missing(self, languages, db=None, cur=None):
for obj in db.content_language_missing_from_list(languages, cur):
yield obj[0]
+ @timed
@db_transaction_generator()
def content_language_get(self, ids, db=None, cur=None):
for c in db.content_language_get_from_list(ids, cur):
yield converters.db_to_language(
dict(zip(db.content_language_cols, c)))
@timed
@process_metrics
@db_transaction()
def content_language_add(
self, languages: List[Dict], conflict_update: bool = False,
db=None, cur=None) -> Dict[str, int]:
check_id_duplicates(languages)
languages.sort(key=lambda m: m['id'])
db.mktemp_content_language(cur)
# empty language is mapped to 'unknown'
db.copy_to(
({
'id': l['id'],
'lang': 'unknown' if not l['lang'] else l['lang'],
'indexer_configuration_id': l['indexer_configuration_id'],
} for l in languages),
'tmp_content_language',
['id', 'lang', 'indexer_configuration_id'], cur)
count = db.content_language_add_from_temp(conflict_update, cur)
- send_metric('content_language:add',
- count=count, method_name='content_language_add')
return {
'content_language:add': count
}
+ @timed
@db_transaction_generator()
def content_ctags_missing(self, ctags, db=None, cur=None):
for obj in db.content_ctags_missing_from_list(ctags, cur):
yield obj[0]
+ @timed
@db_transaction_generator()
def content_ctags_get(self, ids, db=None, cur=None):
for c in db.content_ctags_get_from_list(ids, cur):
yield converters.db_to_ctags(dict(zip(db.content_ctags_cols, c)))
@timed
@process_metrics
@db_transaction()
def content_ctags_add(
self, ctags: List[Dict], conflict_update: bool = False,
db=None, cur=None) -> Dict[str, int]:
check_id_duplicates(ctags)
ctags.sort(key=lambda m: m['id'])
def _convert_ctags(__ctags):
"""Convert ctags dict to list of ctags.
"""
for ctags in __ctags:
yield from converters.ctags_to_db(ctags)
db.mktemp_content_ctags(cur)
db.copy_to(list(_convert_ctags(ctags)),
tblname='tmp_content_ctags',
columns=['id', 'name', 'kind', 'line',
'lang', 'indexer_configuration_id'],
cur=cur)
count = db.content_ctags_add_from_temp(conflict_update, cur)
- send_metric('content_ctags:add',
- count=count, method_name='content_ctags_add')
return {
'content_ctags:add': count
}
+ @timed
@db_transaction_generator()
def content_ctags_search(self, expression,
limit=10, last_sha1=None, db=None, cur=None):
for obj in db.content_ctags_search(expression, last_sha1, limit,
cur=cur):
yield converters.db_to_ctags(dict(zip(db.content_ctags_cols, obj)))
+ @timed
@db_transaction_generator()
def content_fossology_license_get(self, ids, db=None, cur=None):
d = defaultdict(list)
for c in db.content_fossology_license_get_from_list(ids, cur):
license = dict(zip(db.content_fossology_license_cols, c))
id_ = license['id']
d[id_].append(converters.db_to_fossology_license(license))
for id_, facts in d.items():
yield {id_: facts}
@timed
@process_metrics
@db_transaction()
def content_fossology_license_add(
self, licenses: List[Dict], conflict_update: bool = False,
db=None, cur=None) -> Dict[str, int]:
check_id_duplicates(licenses)
licenses.sort(key=lambda m: m['id'])
db.mktemp_content_fossology_license(cur)
db.copy_to(
({
'id': sha1['id'],
'indexer_configuration_id': sha1['indexer_configuration_id'],
'license': license,
} for sha1 in licenses
for license in sha1['licenses']),
tblname='tmp_content_fossology_license',
columns=['id', 'license', 'indexer_configuration_id'],
cur=cur)
count = db.content_fossology_license_add_from_temp(
conflict_update, cur)
- send_metric('content_fossology_license:add',
- count=count, method_name='content_fossology_license_add')
return {
'content_fossology_license:add': count
}
+ @timed
@db_transaction()
def content_fossology_license_get_range(
self, start, end, indexer_configuration_id,
limit=1000, db=None, cur=None):
return self._content_get_range('fossology_license', start, end,
indexer_configuration_id, limit=limit,
with_textual_data=True, db=db, cur=cur)
+ @timed
@db_transaction_generator()
def content_metadata_missing(self, metadata, db=None, cur=None):
for obj in db.content_metadata_missing_from_list(metadata, cur):
yield obj[0]
+ @timed
@db_transaction_generator()
def content_metadata_get(self, ids, db=None, cur=None):
for c in db.content_metadata_get_from_list(ids, cur):
yield converters.db_to_metadata(
dict(zip(db.content_metadata_cols, c)))
@timed
@process_metrics
@db_transaction()
def content_metadata_add(
self, metadata: List[Dict], conflict_update: bool = False,
db=None, cur=None) -> Dict[str, int]:
check_id_duplicates(metadata)
metadata.sort(key=lambda m: m['id'])
db.mktemp_content_metadata(cur)
db.copy_to(metadata, 'tmp_content_metadata',
['id', 'metadata', 'indexer_configuration_id'],
cur)
count = db.content_metadata_add_from_temp(conflict_update, cur)
- send_metric('content_metadata:add',
- count=count, method_name='content_metadata_add')
return {
'content_metadata:add': count,
}
+ @timed
@db_transaction_generator()
def revision_intrinsic_metadata_missing(self, metadata, db=None, cur=None):
for obj in db.revision_intrinsic_metadata_missing_from_list(
metadata, cur):
yield obj[0]
+ @timed
@db_transaction_generator()
def revision_intrinsic_metadata_get(self, ids, db=None, cur=None):
for c in db.revision_intrinsic_metadata_get_from_list(ids, cur):
yield converters.db_to_metadata(
dict(zip(db.revision_intrinsic_metadata_cols, c)))
@timed
@process_metrics
@db_transaction()
def revision_intrinsic_metadata_add(
self, metadata: List[Dict], conflict_update: bool = False,
db=None, cur=None) -> Dict[str, int]:
check_id_duplicates(metadata)
metadata.sort(key=lambda m: m['id'])
db.mktemp_revision_intrinsic_metadata(cur)
db.copy_to(metadata, 'tmp_revision_intrinsic_metadata',
['id', 'metadata', 'mappings',
'indexer_configuration_id'],
cur)
count = db.revision_intrinsic_metadata_add_from_temp(
conflict_update, cur)
- send_metric('revision_intrinsic_metadata:add',
- count=count, method_name='revision_intrinsic_metadata_add')
return {
'revision_intrinsic_metadata:add': count,
}
@timed
@process_metrics
@db_transaction()
def revision_intrinsic_metadata_delete(
self, entries: List[Dict], db=None, cur=None) -> Dict:
count = db.revision_intrinsic_metadata_delete(entries, cur)
return {
'revision_intrinsic_metadata:del': count
}
+ @timed
@db_transaction_generator()
def origin_intrinsic_metadata_get(self, ids, db=None, cur=None):
for c in db.origin_intrinsic_metadata_get_from_list(ids, cur):
yield converters.db_to_metadata(
dict(zip(db.origin_intrinsic_metadata_cols, c)))
@timed
@process_metrics
@db_transaction()
def origin_intrinsic_metadata_add(
self, metadata: List[Dict], conflict_update: bool = False,
db=None, cur=None) -> Dict[str, int]:
check_id_duplicates(metadata)
metadata.sort(key=lambda m: m['id'])
db.mktemp_origin_intrinsic_metadata(cur)
db.copy_to(metadata, 'tmp_origin_intrinsic_metadata',
['id', 'metadata',
'indexer_configuration_id',
'from_revision', 'mappings'],
cur)
count = db.origin_intrinsic_metadata_add_from_temp(
conflict_update, cur)
- send_metric('content_origin_intrinsic:add',
- count=count, method_name='content_origin_intrinsic_add')
return {
'origin_intrinsic_metadata:add': count,
}
@timed
@process_metrics
@db_transaction()
def origin_intrinsic_metadata_delete(
self, entries: List[Dict], db=None, cur=None) -> Dict:
count = db.origin_intrinsic_metadata_delete(entries, cur)
return {
'origin_intrinsic_metadata:del': count,
}
+ @timed
@db_transaction_generator()
def origin_intrinsic_metadata_search_fulltext(
self, conjunction, limit=100, db=None, cur=None):
for c in db.origin_intrinsic_metadata_search_fulltext(
conjunction, limit=limit, cur=cur):
yield converters.db_to_metadata(
dict(zip(db.origin_intrinsic_metadata_cols, c)))
+ @timed
@db_transaction()
def origin_intrinsic_metadata_search_by_producer(
self, page_token='', limit=100, ids_only=False,
mappings=None, tool_ids=None,
db=None, cur=None):
assert isinstance(page_token, str)
# we go to limit+1 to check whether we should add next_page_token in
# the response
res = db.origin_intrinsic_metadata_search_by_producer(
page_token, limit + 1, ids_only, mappings, tool_ids, cur)
result = {}
if ids_only:
result['origins'] = [origin for (origin,) in res]
if len(result['origins']) > limit:
result['origins'][limit:] = []
result['next_page_token'] = result['origins'][-1]
else:
result['origins'] = [converters.db_to_metadata(
dict(zip(db.origin_intrinsic_metadata_cols, c)))for c in res]
if len(result['origins']) > limit:
result['origins'][limit:] = []
result['next_page_token'] = result['origins'][-1]['id']
return result
+ @timed
@db_transaction()
def origin_intrinsic_metadata_stats(
self, db=None, cur=None):
mapping_names = [m for m in MAPPING_NAMES]
select_parts = []
# Count rows for each mapping
for mapping_name in mapping_names:
select_parts.append((
"sum(case when (mappings @> ARRAY['%s']) "
" then 1 else 0 end)"
) % mapping_name)
# Total
select_parts.append("sum(1)")
# Rows whose metadata has at least one key that is not '@context'
select_parts.append(
"sum(case when ('{}'::jsonb @> (metadata - '@context')) "
" then 0 else 1 end)")
cur.execute('select ' + ', '.join(select_parts)
+ ' from origin_intrinsic_metadata')
results = dict(zip(mapping_names + ['total', 'non_empty'],
cur.fetchone()))
return {
'total': results.pop('total'),
'non_empty': results.pop('non_empty'),
'per_mapping': results,
}
+ @timed
@db_transaction_generator()
def indexer_configuration_add(self, tools, db=None, cur=None):
db.mktemp_indexer_configuration(cur)
db.copy_to(tools, 'tmp_indexer_configuration',
['tool_name', 'tool_version', 'tool_configuration'],
cur)
tools = db.indexer_configuration_add_from_temp(cur)
+ count = 0
for line in tools:
yield dict(zip(db.indexer_configuration_cols, line))
+ count += 1
+ send_metric('indexer_configuration:add', count,
+ method_name='indexer_configuration_add')
+ @timed
@db_transaction()
def indexer_configuration_get(self, tool, db=None, cur=None):
tool_conf = tool['tool_configuration']
if isinstance(tool_conf, dict):
tool_conf = json.dumps(tool_conf)
idx = db.indexer_configuration_get(tool['tool_name'],
tool['tool_version'],
tool_conf)
if not idx:
return None
return dict(zip(db.indexer_configuration_cols, idx))
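
The main change in this file is that the explicit send_metric calls are dropped from the *_add methods, the metrics helpers are now imported from the local .metrics module, and @timed is added to the read paths. The decorator below is only a rough approximation of what such a process_metrics helper can look like; the real swh.indexer.storage.metrics implementation may differ.

import functools

def send_metric(metric, count, method_name):
    # Stand-in for the real metric emission (e.g. a statsd counter).
    print('%s=%s (%s)' % (metric, count, method_name))

def process_metrics(f):
    # Emit one metric per counter returned by an *_add / *_del method,
    # e.g. {'content_mimetype:add': 42}, instead of calling send_metric
    # by hand inside each method body.
    @functools.wraps(f)
    def newf(*args, **kwargs):
        result = f(*args, **kwargs)
        for key, count in result.items():
            send_metric(key, count=count, method_name=f.__name__)
        return result
    return newf
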
diff --git a/swh/indexer/storage/in_memory.py b/swh/indexer/storage/in_memory.py
index 189a117..43db6d3 100644
--- a/swh/indexer/storage/in_memory.py
+++ b/swh/indexer/storage/in_memory.py
@@ -1,456 +1,457 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import bisect
from collections import defaultdict, Counter
import itertools
import json
import operator
import math
import re
from typing import Any, Dict, List
from . import MAPPING_NAMES, check_id_duplicates
from .exc import IndexerStorageArgumentException
SHA1_DIGEST_SIZE = 160
def _transform_tool(tool):
return {
'id': tool['id'],
'name': tool['tool_name'],
'version': tool['tool_version'],
'configuration': tool['tool_configuration'],
}
def check_id_types(data: List[Dict[str, Any]]):
"""Checks all elements of the list have an 'id' whose type is 'bytes'."""
if not all(isinstance(item.get('id'), bytes) for item in data):
raise IndexerStorageArgumentException('identifiers must be bytes.')
class SubStorage:
"""Implements common missing/get/add logic for each indexer type."""
def __init__(self, tools):
self._tools = tools
self._sorted_ids = []
self._data = {} # map (id_, tool_id) -> metadata_dict
self._tools_per_id = defaultdict(set) # map id_ -> Set[tool_id]
def missing(self, ids):
"""List data missing from storage.
Args:
data (iterable): dictionaries with keys:
- **id** (bytes): sha1 identifier
- **indexer_configuration_id** (int): tool used to compute
the results
Yields:
missing sha1s
"""
for id_ in ids:
tool_id = id_['indexer_configuration_id']
id_ = id_['id']
if tool_id not in self._tools_per_id.get(id_, set()):
yield id_
def get(self, ids):
"""Retrieve data per id.
Args:
ids (iterable): sha1 checksums
Yields:
dict: dictionaries with the following keys:
- **id** (bytes)
- **tool** (dict): tool used to compute metadata
- arbitrary data (as provided to `add`)
"""
for id_ in ids:
for tool_id in self._tools_per_id.get(id_, set()):
key = (id_, tool_id)
yield {
'id': id_,
'tool': _transform_tool(self._tools[tool_id]),
**self._data[key],
}
def get_all(self):
yield from self.get(self._sorted_ids)
def get_range(self, start, end, indexer_configuration_id, limit):
"""Retrieve data within range [start, end] bound by limit.
Args:
**start** (bytes): Starting identifier range (expected smaller
than end)
**end** (bytes): Ending identifier range (expected larger
than start)
**indexer_configuration_id** (int): The tool used to index data
**limit** (int): Limit result
Raises:
IndexerStorageArgumentException if limit is None
Returns:
a dict with keys:
- **ids** [bytes]: iterable of content ids within the range.
- **next** (Optional[bytes]): The next range of sha1 starts at
this sha1 if any
"""
if limit is None:
raise IndexerStorageArgumentException('limit should not be None')
from_index = bisect.bisect_left(self._sorted_ids, start)
to_index = bisect.bisect_right(self._sorted_ids, end, lo=from_index)
if to_index - from_index >= limit:
return {
'ids': self._sorted_ids[from_index:from_index+limit],
'next': self._sorted_ids[from_index+limit],
}
else:
return {
'ids': self._sorted_ids[from_index:to_index],
'next': None,
}
def add(self, data: List[Dict], conflict_update: bool) -> int:
"""Add data not present in storage.
Args:
data (iterable): dictionaries with keys:
- **id**: sha1
- **indexer_configuration_id**: tool used to compute the
results
- arbitrary data
conflict_update (bool): Flag to determine if we want to overwrite
(true) or skip duplicates (false)
"""
data = list(data)
check_id_duplicates(data)
count = 0
for item in data:
item = item.copy()
tool_id = item.pop('indexer_configuration_id')
id_ = item.pop('id')
data_item = item
if not conflict_update and \
tool_id in self._tools_per_id.get(id_, set()):
# Duplicate, should not be updated
continue
key = (id_, tool_id)
self._data[key] = data_item
self._tools_per_id[id_].add(tool_id)
count += 1
if id_ not in self._sorted_ids:
bisect.insort(self._sorted_ids, id_)
return count
def add_merge(self, new_data: List[Dict], conflict_update: bool,
merged_key: str) -> int:
added = 0
+ all_subitems: List
for new_item in new_data:
id_ = new_item['id']
tool_id = new_item['indexer_configuration_id']
if conflict_update:
all_subitems = []
else:
existing = list(self.get([id_]))
all_subitems = [
old_subitem
for existing_item in existing
if existing_item['tool']['id'] == tool_id
for old_subitem in existing_item[merged_key]
]
for new_subitem in new_item[merged_key]:
if new_subitem not in all_subitems:
all_subitems.append(new_subitem)
added += self.add([
{
'id': id_,
'indexer_configuration_id': tool_id,
merged_key: all_subitems,
}
], conflict_update=True)
if id_ not in self._sorted_ids:
bisect.insort(self._sorted_ids, id_)
return added
def delete(self, entries: List[Dict]) -> int:
"""Delete entries and return the number of entries deleted.
"""
deleted = 0
for entry in entries:
(id_, tool_id) = (entry['id'], entry['indexer_configuration_id'])
key = (id_, tool_id)
if tool_id in self._tools_per_id[id_]:
self._tools_per_id[id_].remove(tool_id)
if key in self._data:
deleted += 1
del self._data[key]
return deleted
class IndexerStorage:
"""In-memory SWH indexer storage."""
def __init__(self):
self._tools = {}
self._mimetypes = SubStorage(self._tools)
self._languages = SubStorage(self._tools)
self._content_ctags = SubStorage(self._tools)
self._licenses = SubStorage(self._tools)
self._content_metadata = SubStorage(self._tools)
self._revision_intrinsic_metadata = SubStorage(self._tools)
self._origin_intrinsic_metadata = SubStorage(self._tools)
def check_config(self, *, check_write):
return True
def content_mimetype_missing(self, mimetypes):
yield from self._mimetypes.missing(mimetypes)
def content_mimetype_get_range(
self, start, end, indexer_configuration_id, limit=1000):
return self._mimetypes.get_range(
start, end, indexer_configuration_id, limit)
def content_mimetype_add(
self, mimetypes: List[Dict],
conflict_update: bool = False) -> Dict[str, int]:
check_id_types(mimetypes)
added = self._mimetypes.add(mimetypes, conflict_update)
return {'content_mimetype:add': added}
def content_mimetype_get(self, ids):
yield from self._mimetypes.get(ids)
def content_language_missing(self, languages):
yield from self._languages.missing(languages)
def content_language_get(self, ids):
yield from self._languages.get(ids)
def content_language_add(
self, languages: List[Dict],
conflict_update: bool = False) -> Dict[str, int]:
check_id_types(languages)
added = self._languages.add(languages, conflict_update)
return {'content_language:add': added}
def content_ctags_missing(self, ctags):
yield from self._content_ctags.missing(ctags)
def content_ctags_get(self, ids):
for item in self._content_ctags.get(ids):
for item_ctags_item in item['ctags']:
yield {
'id': item['id'],
'tool': item['tool'],
**item_ctags_item
}
def content_ctags_add(
self, ctags: List[Dict],
conflict_update: bool = False) -> Dict[str, int]:
check_id_types(ctags)
added = self._content_ctags.add_merge(ctags, conflict_update, 'ctags')
return {'content_ctags:add': added}
def content_ctags_search(self, expression,
limit=10, last_sha1=None):
nb_matches = 0
for ((id_, tool_id), item) in \
sorted(self._content_ctags._data.items()):
if id_ <= (last_sha1 or bytes(0 for _ in range(SHA1_DIGEST_SIZE))):
continue
for ctags_item in item['ctags']:
if ctags_item['name'] != expression:
continue
nb_matches += 1
yield {
'id': id_,
'tool': _transform_tool(self._tools[tool_id]),
**ctags_item
}
if nb_matches >= limit:
return
def content_fossology_license_get(self, ids):
# Rewrites the output of SubStorage.get from the old format to
# the new one. SubStorage.get should be updated once all other
# *_get methods use the new format.
# See: https://forge.softwareheritage.org/T1433
res = {}
for d in self._licenses.get(ids):
res.setdefault(d.pop('id'), []).append(d)
for (id_, facts) in res.items():
yield {id_: facts}
def content_fossology_license_add(
self, licenses: List[Dict],
conflict_update: bool = False) -> Dict[str, int]:
check_id_types(licenses)
added = self._licenses.add_merge(licenses, conflict_update, 'licenses')
return {'fossology_license_add:add': added}
def content_fossology_license_get_range(
self, start, end, indexer_configuration_id, limit=1000):
return self._licenses.get_range(
start, end, indexer_configuration_id, limit)
def content_metadata_missing(self, metadata):
yield from self._content_metadata.missing(metadata)
def content_metadata_get(self, ids):
yield from self._content_metadata.get(ids)
def content_metadata_add(
self, metadata: List[Dict],
conflict_update: bool = False) -> Dict[str, int]:
check_id_types(metadata)
added = self._content_metadata.add(metadata, conflict_update)
return {'content_metadata:add': added}
def revision_intrinsic_metadata_missing(self, metadata):
yield from self._revision_intrinsic_metadata.missing(metadata)
def revision_intrinsic_metadata_get(self, ids):
yield from self._revision_intrinsic_metadata.get(ids)
def revision_intrinsic_metadata_add(
self, metadata: List[Dict],
conflict_update: bool = False) -> Dict[str, int]:
check_id_types(metadata)
added = self._revision_intrinsic_metadata.add(
metadata, conflict_update)
return {'revision_intrinsic_metadata:add': added}
def revision_intrinsic_metadata_delete(self, entries: List[Dict]) -> Dict:
deleted = self._revision_intrinsic_metadata.delete(entries)
return {'revision_intrinsic_metadata:del': deleted}
def origin_intrinsic_metadata_get(self, ids):
yield from self._origin_intrinsic_metadata.get(ids)
def origin_intrinsic_metadata_add(
self, metadata: List[Dict],
conflict_update: bool = False) -> Dict[str, int]:
added = self._origin_intrinsic_metadata.add(metadata, conflict_update)
return {'origin_intrinsic_metadata:add': added}
def origin_intrinsic_metadata_delete(self, entries: List[Dict]) -> Dict:
deleted = self._origin_intrinsic_metadata.delete(entries)
return {'origin_intrinsic_metadata:del': deleted}
def origin_intrinsic_metadata_search_fulltext(
self, conjunction, limit=100):
# A very crude fulltext search implementation, but that's enough
# to work on English metadata
tokens_re = re.compile('[a-zA-Z0-9]+')
search_tokens = list(itertools.chain(
*map(tokens_re.findall, conjunction)))
def rank(data):
# Tokenize the metadata
text = json.dumps(data['metadata'])
text_tokens = tokens_re.findall(text)
text_token_occurences = Counter(text_tokens)
# Count the number of occurrences of search tokens in the text
score = 0
for search_token in search_tokens:
if text_token_occurences[search_token] == 0:
# Search token is not in the text.
return 0
score += text_token_occurences[search_token]
# Normalize according to the text's length
return score / math.log(len(text_tokens))
results = [(rank(data), data)
for data in self._origin_intrinsic_metadata.get_all()]
results = [(rank_, data) for (rank_, data) in results if rank_ > 0]
results.sort(key=operator.itemgetter(0), # Don't try to order 'data'
reverse=True)
for (rank_, result) in results[:limit]:
yield result
def origin_intrinsic_metadata_search_by_producer(
self, page_token='', limit=100, ids_only=False,
mappings=None, tool_ids=None):
assert isinstance(page_token, str)
nb_results = 0
if mappings is not None:
mappings = frozenset(mappings)
if tool_ids is not None:
tool_ids = frozenset(tool_ids)
origins = []
# we go to limit+1 to check whether we should add next_page_token in
# the response
for entry in self._origin_intrinsic_metadata.get_all():
if entry['id'] <= page_token:
continue
if nb_results >= (limit + 1):
break
if mappings is not None and mappings.isdisjoint(entry['mappings']):
continue
if tool_ids is not None and entry['tool']['id'] not in tool_ids:
continue
origins.append(entry)
nb_results += 1
result = {}
if len(origins) > limit:
origins = origins[:limit]
result['next_page_token'] = origins[-1]['id']
if ids_only:
origins = [origin['id'] for origin in origins]
result['origins'] = origins
return result
def origin_intrinsic_metadata_stats(self):
mapping_count = {m: 0 for m in MAPPING_NAMES}
total = non_empty = 0
for data in self._origin_intrinsic_metadata.get_all():
total += 1
if set(data['metadata']) - {'@context'}:
non_empty += 1
for mapping in data['mappings']:
mapping_count[mapping] += 1
return {
'per_mapping': mapping_count,
'total': total,
'non_empty': non_empty
}
def indexer_configuration_add(self, tools):
inserted = []
for tool in tools:
tool = tool.copy()
id_ = self._tool_key(tool)
tool['id'] = id_
self._tools[id_] = tool
inserted.append(tool)
return inserted
def indexer_configuration_get(self, tool):
return self._tools.get(self._tool_key(tool))
def _tool_key(self, tool):
return hash((tool['tool_name'], tool['tool_version'],
json.dumps(tool['tool_configuration'], sort_keys=True)))
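
The in-memory backend is convenient for tests and quick experiments. Based on the methods shown above, a minimal round-trip might look like this (the sha1 value is an arbitrary 20-byte example, and the tool configuration mirrors the one used by the mimetype indexer):

from swh.indexer.storage.in_memory import IndexerStorage

storage = IndexerStorage()
tool = storage.indexer_configuration_add([{
    'tool_name': 'file',
    'tool_version': '1:5.30-1+deb9u1',
    'tool_configuration': {'type': 'library', 'debian-package': 'python3-magic'},
}])[0]

sha1 = bytes(range(20))  # arbitrary 20-byte content identifier
storage.content_mimetype_add([{
    'id': sha1,
    'mimetype': 'text/plain',
    'encoding': 'us-ascii',
    'indexer_configuration_id': tool['id'],
}])
print(list(storage.content_mimetype_get([sha1])))
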
diff --git a/version.txt b/version.txt
index a14f0a3..abd41b9 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.168-0-g54ac740
\ No newline at end of file
+v0.0.169-0-g895544f
\ No newline at end of file
