diff --git a/swh/search/cli.py b/swh/search/cli.py
index 79f303b..79bd031 100644
--- a/swh/search/cli.py
+++ b/swh/search/cli.py
@@ -1,86 +1,86 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import functools
import click
from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS
from swh.journal.cli import get_journal_client
from . import get_search
from .journal_client import process_journal_objects
from .api.server import load_and_check_config, app
@click.group(name='search', context_settings=CONTEXT_SETTINGS)
@click.option('--config-file', '-C', default=None,
type=click.Path(exists=True, dir_okay=False,),
help="Configuration file.")
@click.pass_context
def cli(ctx, config_file):
'''Software Heritage Search tools.'''
ctx.ensure_object(dict)
conf = config.read(config_file)
ctx.obj['config'] = conf
@cli.command('initialize')
@click.pass_context
def initialize(ctx):
"""Creates Elasticsearch indices."""
search = get_search(**ctx.obj['config']['search'])
search.initialize()
print('Done.')
@cli.group('journal-client')
@click.pass_context
def journal_client(ctx):
""""""
pass
@journal_client.command('objects')
@click.option('--max-messages', '-m', default=None, type=int,
help='Maximum number of objects to replay. Default is to '
'run forever.')
@click.pass_context
def journal_client_objects(ctx, max_messages):
"""Listens for new objects from the SWH Journal, and schedules tasks
to run relevant indexers (currently, only origin)
on these new objects."""
client = get_journal_client(ctx, object_types=['origin'])
search = get_search(**ctx.obj['config']['search'])
worker_fn = functools.partial(
process_journal_objects,
search=search,
)
nb_messages = 0
try:
while not max_messages or nb_messages < max_messages:
nb_messages += client.process(worker_fn)
print('Processed %d messages.' % nb_messages)
except KeyboardInterrupt:
ctx.exit(0)
else:
print('Done.')
@cli.command('rpc-serve')
-@click.argument('config-path', required=1)
+@click.argument('config-path', required=True)
@click.option('--host', default='0.0.0.0', help="Host to run the server")
@click.option('--port', default=5010, type=click.INT,
help="Binding port of the server")
@click.option('--debug/--nodebug', default=True,
help="Indicates if the server should run in debug mode")
def rpc_server(config_path, host, port, debug):
"""Starts a Software Heritage Indexer RPC HTTP server."""
api_cfg = load_and_check_config(config_path, type='any')
app.config.update(api_cfg)
app.run(host, port=int(port), debug=bool(debug))
diff --git a/swh/search/elasticsearch.py b/swh/search/elasticsearch.py
index 328c200..bde055e 100644
--- a/swh/search/elasticsearch.py
+++ b/swh/search/elasticsearch.py
@@ -1,220 +1,222 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import base64
-from typing import Iterable, Dict, List, Iterator
+from typing import Any, Iterable, Dict, List, Iterator, Optional
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, scan
import msgpack
from swh.core.api import remote_api_endpoint
from swh.model import model
from swh.model.identifiers import origin_identifier
def _sanitize_origin(origin):
origin = origin.copy()
res = {
'url': origin.pop('url')
}
for field_name in ('intrinsic_metadata', 'has_visits'):
if field_name in origin:
res[field_name] = origin.pop(field_name)
return res
class ElasticSearch:
def __init__(self, hosts: List[str]):
self._backend = Elasticsearch(hosts=hosts)
@remote_api_endpoint('check')
def check(self):
return self._backend.ping()
def deinitialize(self) -> None:
"""Removes all indices from the Elasticsearch backend"""
self._backend.indices.delete(index='*')
def initialize(self) -> None:
"""Declare Elasticsearch indices and mappings"""
self._backend.indices.create(
index='origin',
body={
'mappings': {
'properties': {
'url': {
'type': 'text',
# TODO: consider removing fielddata when
# swh-storage allows querying by hash, so the
# full URL does not have to be stored in ES'
# memory. See:
# https://www.elastic.co/guide/en/elasticsearch/reference/current/fielddata.html#before-enabling-fielddata
'fielddata': True,
# To split URLs into tokens on any character
# that is not alphanumeric
'analyzer': 'simple',
'fields': {
'as_you_type': {
'type': 'search_as_you_type',
'analyzer': 'simple',
}
}
},
'has_visits': {
'type': 'boolean',
},
'intrinsic_metadata': {
'type': 'nested',
'properties': {
'@context': {
# don't bother indexing tokens
'type': 'keyword',
}
},
},
}
}
}
)
@remote_api_endpoint('origin/update')
def origin_update(self, documents: Iterable[dict]) -> None:
documents = map(_sanitize_origin, documents)
actions = [
{
'_op_type': 'update',
'_id': origin_identifier(document),
'_index': 'origin',
'doc': document,
'doc_as_upsert': True,
}
for document in documents
]
# TODO: make refresh='wait_for' configurable (we don't need it
# in production, it will probably be a performance issue)
bulk(self._backend, actions, index='origin', refresh='wait_for')
def origin_dump(self) -> Iterator[model.Origin]:
"""Returns all content in Elasticsearch's index. Not exposed
publicly, but useful for tests."""
results = scan(self._backend, index='*')
for hit in results:
yield self._backend.termvectors(
index='origin', id=hit['_id'],
fields=['*'])
@remote_api_endpoint('origin/search')
def origin_search(
self, *,
url_pattern: str = None, metadata_pattern: str = None,
with_visit: bool = False,
scroll_token: str = None, count: int = 50
) -> Dict[str, object]:
"""Searches for origins matching the `url_pattern`.
Args:
url_pattern (str): Part of the URL to search for
with_visit (bool): Whether origins with no visit are to be
filtered out
scroll_token (str): Opaque value used for pagination.
count (int): number of results to return.
Returns:
a dictionary with keys:
* `scroll_token`:
opaque value used for fetching more results. `None` if there
are no more results.
* `results`:
list of dictionaries with key:
* `url`: URL of a matching origin
"""
- query_clauses = []
+ query_clauses = [] # type: List[Dict[str, Any]]
if url_pattern:
query_clauses.append({
'multi_match': {
'query': url_pattern,
'type': 'bool_prefix',
'fields': [
'url.as_you_type',
'url.as_you_type._2gram',
'url.as_you_type._3gram',
]
}
})
if metadata_pattern:
query_clauses.append({
'nested': {
'path': 'intrinsic_metadata',
'query': {
'multi_match': {
'query': metadata_pattern,
'fields': ['intrinsic_metadata.*']
}
},
}
})
if not query_clauses:
raise ValueError(
'At least one of url_pattern and metadata_pattern '
'must be provided.')
if with_visit:
query_clauses.append({
'term': {
'has_visits': True,
}
})
body = {
'query': {
'bool': {
'must': query_clauses,
}
},
'size': count,
'sort': [
{'_score': 'desc'},
{'_id': 'asc'},
]
}
if scroll_token:
# TODO: use ElasticSearch's scroll API?
- scroll_token = msgpack.loads(base64.b64decode(scroll_token))
+ scroll_token_content = msgpack.loads(
+ base64.b64decode(scroll_token))
body['search_after'] = \
- [scroll_token[b'score'], scroll_token[b'id'].decode('ascii')]
+ [scroll_token_content[b'score'],
+ scroll_token_content[b'id'].decode('ascii')]
res = self._backend.search(
index='origin',
body=body,
size=count,
)
hits = res['hits']['hits']
if len(hits) == count:
last_hit = hits[-1]
- next_scroll_token = {
+ next_scroll_token_content = {
b'score': last_hit['_score'],
b'id': last_hit['_id'],
}
next_scroll_token = base64.b64encode(msgpack.dumps(
- next_scroll_token))
+ next_scroll_token_content)) # type: Optional[bytes]
else:
next_scroll_token = None
return {
'scroll_token': next_scroll_token,
'results': [
{
# TODO: also add 'id'?
'url': hit['_source']['url'],
}
for hit in hits
]
}
diff --git a/swh/search/in_memory.py b/swh/search/in_memory.py
index 0b3418d..421729c 100644
--- a/swh/search/in_memory.py
+++ b/swh/search/in_memory.py
@@ -1,122 +1,123 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import base64
from collections import defaultdict
import itertools
import re
-from typing import Iterable, Dict
+from typing import Any, Dict, Iterable, Iterator, List, Optional
import msgpack
from swh.core.api import remote_api_endpoint
from swh.model.identifiers import origin_identifier
def _sanitize_origin(origin):
origin = origin.copy()
res = {
'url': origin.pop('url')
}
for field_name in ('type', 'intrinsic_metadata'):
if field_name in origin:
res[field_name] = origin.pop(field_name)
return res
class InMemorySearch:
def __init__(self):
pass
@remote_api_endpoint('check')
def check(self):
return True
def deinitialize(self) -> None:
if hasattr(self, '_origins'):
del self._origins
del self._origin_ids
def initialize(self) -> None:
- self._origins = defaultdict(dict)
- self._origin_ids = []
+ self._origins = defaultdict(dict) # type: Dict[str, Dict[str, Any]]
+ self._origin_ids = [] # type: List[str]
_url_splitter = re.compile(r'\W')
@remote_api_endpoint('origin/update')
def origin_update(self, documents: Iterable[dict]) -> None:
for document in documents:
document = document.copy()
id_ = origin_identifier(document)
if 'url' in document:
document['_url_tokens'] = \
set(self._url_splitter.split(document['url']))
self._origins[id_].update(document)
if id_ not in self._origin_ids:
self._origin_ids.append(id_)
@remote_api_endpoint('origin/search')
def origin_search(
self, *,
url_pattern: str = None, metadata_pattern: str = None,
with_visit: bool = False,
scroll_token: str = None, count: int = 50
) -> Dict[str, object]:
- matches = (self._origins[id_] for id_ in self._origin_ids)
+ matches = (self._origins[id_] for id_ in self._origin_ids) # type: Iterator[Dict[str, Any]]
if url_pattern:
tokens = set(self._url_splitter.split(url_pattern))
def predicate(match):
missing_tokens = tokens - match['_url_tokens']
if len(missing_tokens) == 0:
return True
elif len(missing_tokens) > 1:
return False
else:
# There is one missing token, look up by prefix.
(missing_token,) = missing_tokens
return any(token.startswith(missing_token)
for token in match['_url_tokens'])
matches = filter(predicate, matches)
if metadata_pattern:
raise NotImplementedError(
'Metadata search is not implemented in the in-memory backend.')
if not url_pattern and not metadata_pattern:
raise ValueError(
'At least one of url_pattern and metadata_pattern '
'must be provided.')
if with_visit:
matches = filter(lambda o: o.get('has_visits'), matches)
if scroll_token:
- scroll_token = msgpack.loads(base64.b64decode(scroll_token))
- start_at_index = scroll_token[b'start_at_index']
+ scroll_token_content = msgpack.loads(
+ base64.b64decode(scroll_token))
+ start_at_index = scroll_token_content[b'start_at_index']
else:
start_at_index = 0
hits = list(itertools.islice(
matches, start_at_index, start_at_index+count))
if len(hits) == count:
- next_scroll_token = {
+ next_scroll_token_content = {
b'start_at_index': start_at_index+count,
}
next_scroll_token = base64.b64encode(msgpack.dumps(
- next_scroll_token))
+ next_scroll_token_content)) # type: Optional[bytes]
else:
next_scroll_token = None
return {
'scroll_token': next_scroll_token,
'results': [
{'url': hit['url']}
for hit in hits
]
}
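
Editor's note: the sketch below is illustrative only and is not part of the patch above. It shows how a caller might page through origin_search() results using the opaque scroll_token, assuming the InMemorySearch backend defined in swh/search/in_memory.py; the sample URLs are made up for the example.

# Illustrative sketch (not part of the patch): paging through origin_search()
# results with the opaque scroll_token, using the in-memory backend.
from swh.search.in_memory import InMemorySearch

search = InMemorySearch()
search.initialize()

# Hypothetical test data; any dicts with a 'url' key work.
search.origin_update(
    [{'url': 'https://example.org/repo-%d' % i} for i in range(120)])

scroll_token = None
urls = []
while True:
    page = search.origin_search(
        url_pattern='example', scroll_token=scroll_token, count=50)
    urls.extend(result['url'] for result in page['results'])
    scroll_token = page['scroll_token']
    if scroll_token is None:  # no more results to fetch
        break

print('%d matching origins' % len(urls))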