
diff --git a/swh/search/cli.py b/swh/search/cli.py
index 79f303b..79bd031 100644
--- a/swh/search/cli.py
+++ b/swh/search/cli.py
@@ -1,86 +1,86 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import functools
import click
from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS
from swh.journal.cli import get_journal_client
from . import get_search
from .journal_client import process_journal_objects
from .api.server import load_and_check_config, app
@click.group(name='search', context_settings=CONTEXT_SETTINGS)
@click.option('--config-file', '-C', default=None,
type=click.Path(exists=True, dir_okay=False,),
help="Configuration file.")
@click.pass_context
def cli(ctx, config_file):
'''Software Heritage Search tools.'''
ctx.ensure_object(dict)
conf = config.read(config_file)
ctx.obj['config'] = conf
@cli.command('initialize')
@click.pass_context
def initialize(ctx):
"""Creates Elasticsearch indices."""
search = get_search(**ctx.obj['config']['search'])
search.initialize()
print('Done.')
@cli.group('journal-client')
@click.pass_context
def journal_client(ctx):
""""""
pass
@journal_client.command('objects')
@click.option('--max-messages', '-m', default=None, type=int,
help='Maximum number of objects to replay. Default is to '
'run forever.')
@click.pass_context
def journal_client_objects(ctx, max_messages):
"""Listens for new objects from the SWH Journal, and schedules tasks
to run relevant indexers (currently, only origin)
on these new objects."""
client = get_journal_client(ctx, object_types=['origin'])
search = get_search(**ctx.obj['config']['search'])
worker_fn = functools.partial(
process_journal_objects,
search=search,
)
nb_messages = 0
try:
while not max_messages or nb_messages < max_messages:
nb_messages += client.process(worker_fn)
print('Processed %d messages.' % nb_messages)
except KeyboardInterrupt:
ctx.exit(0)
else:
print('Done.')
@cli.command('rpc-serve')
-@click.argument('config-path', required=1)
+@click.argument('config-path', required=True)
@click.option('--host', default='0.0.0.0', help="Host to run the server")
@click.option('--port', default=5010, type=click.INT,
help="Binding port of the server")
@click.option('--debug/--nodebug', default=True,
help="Indicates if the server should run in debug mode")
def rpc_server(config_path, host, port, debug):
"""Starts a Software Heritage Indexer RPC HTTP server."""
api_cfg = load_and_check_config(config_path, type='any')
app.config.update(api_cfg)
app.run(host, port=int(port), debug=bool(debug))
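
For context, a minimal sketch (not part of this diff) of exercising the new CLI group from a test, using click's CliRunner. The config file name and the layout of its 'search' section are hypothetical; the exact layout depends on what get_search() expects.

from click.testing import CliRunner

from swh.search.cli import cli

runner = CliRunner()
with runner.isolated_filesystem():
    with open('search.yml', 'w') as f:
        # Hypothetical config pointing at a local Elasticsearch node.
        f.write(
            'search:\n'
            '  cls: elasticsearch\n'
            '  args:\n'
            '    hosts:\n'
            '      - localhost:9200\n')
    result = runner.invoke(cli, ['-C', 'search.yml', 'initialize'])
    print(result.exit_code, result.output)  # expects 'Done.' on success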
diff --git a/swh/search/elasticsearch.py b/swh/search/elasticsearch.py
index 328c200..bde055e 100644
--- a/swh/search/elasticsearch.py
+++ b/swh/search/elasticsearch.py
@@ -1,220 +1,222 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import base64
-from typing import Iterable, Dict, List, Iterator
+from typing import Any, Iterable, Dict, List, Iterator, Optional
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, scan
import msgpack
from swh.core.api import remote_api_endpoint
from swh.model import model
from swh.model.identifiers import origin_identifier
def _sanitize_origin(origin):
origin = origin.copy()
res = {
'url': origin.pop('url')
}
for field_name in ('intrinsic_metadata', 'has_visits'):
if field_name in origin:
res[field_name] = origin.pop(field_name)
return res
class ElasticSearch:
def __init__(self, hosts: List[str]):
self._backend = Elasticsearch(hosts=hosts)
@remote_api_endpoint('check')
def check(self):
return self._backend.ping()
def deinitialize(self) -> None:
"""Removes all indices from the Elasticsearch backend"""
self._backend.indices.delete(index='*')
def initialize(self) -> None:
"""Declare Elasticsearch indices and mappings"""
self._backend.indices.create(
index='origin',
body={
'mappings': {
'properties': {
'url': {
'type': 'text',
# TODO: consider removing fielddata when
# swh-storage allows querying by hash, so the
# full URL does not have to be stored in ES'
# memory. See:
# https://www.elastic.co/guide/en/elasticsearch/reference/current/fielddata.html#before-enabling-fielddata
'fielddata': True,
# To split URLs into tokens on any character
# that is not alphanumeric
'analyzer': 'simple',
'fields': {
'as_you_type': {
'type': 'search_as_you_type',
'analyzer': 'simple',
}
}
},
'has_visits': {
'type': 'boolean',
},
'intrinsic_metadata': {
'type': 'nested',
'properties': {
'@context': {
# don't bother indexing tokens
'type': 'keyword',
}
},
},
}
}
}
)
@remote_api_endpoint('origin/update')
def origin_update(self, documents: Iterable[dict]) -> None:
documents = map(_sanitize_origin, documents)
actions = [
{
'_op_type': 'update',
'_id': origin_identifier(document),
'_index': 'origin',
'doc': document,
'doc_as_upsert': True,
}
for document in documents
]
# TODO: make refresh='wait_for' configurable (we don't need it
# in production, it will probably be a performance issue)
bulk(self._backend, actions, index='origin', refresh='wait_for')
def origin_dump(self) -> Iterator[model.Origin]:
"""Returns all content in Elasticsearch's index. Not exposed
publicly, but useful for tests."""
results = scan(self._backend, index='*')
for hit in results:
yield self._backend.termvectors(
index='origin', id=hit['_id'],
fields=['*'])
@remote_api_endpoint('origin/search')
def origin_search(
self, *,
url_pattern: str = None, metadata_pattern: str = None,
with_visit: bool = False,
scroll_token: str = None, count: int = 50
) -> Dict[str, object]:
"""Searches for origins matching the `url_pattern`.
Args:
url_pattern (str): Part of the URL to search for
with_visit (bool): Whether origins with no visit are to be
filtered out
scroll_token (str): Opaque value used for pagination.
count (int): number of results to return.
Returns:
a dictionary with keys:
* `scroll_token`:
opaque value used for fetching more results. `None` if there
are no more results.
* `results`:
list of dictionaries with key:
* `url`: URL of a matching origin
"""
- query_clauses = []
+ query_clauses = [] # type: List[Dict[str, Any]]
if url_pattern:
query_clauses.append({
'multi_match': {
'query': url_pattern,
'type': 'bool_prefix',
'fields': [
'url.as_you_type',
'url.as_you_type._2gram',
'url.as_you_type._3gram',
]
}
})
if metadata_pattern:
query_clauses.append({
'nested': {
'path': 'intrinsic_metadata',
'query': {
'multi_match': {
'query': metadata_pattern,
'fields': ['intrinsic_metadata.*']
}
},
}
})
if not query_clauses:
raise ValueError(
'At least one of url_pattern and metadata_pattern '
'must be provided.')
if with_visit:
query_clauses.append({
'term': {
'has_visits': True,
}
})
body = {
'query': {
'bool': {
'must': query_clauses,
}
},
'size': count,
'sort': [
{'_score': 'desc'},
{'_id': 'asc'},
]
}
if scroll_token:
# TODO: use ElasticSearch's scroll API?
- scroll_token = msgpack.loads(base64.b64decode(scroll_token))
+ scroll_token_content = msgpack.loads(
+ base64.b64decode(scroll_token))
body['search_after'] = \
- [scroll_token[b'score'], scroll_token[b'id'].decode('ascii')]
+ [scroll_token_content[b'score'],
+ scroll_token_content[b'id'].decode('ascii')]
res = self._backend.search(
index='origin',
body=body,
size=count,
)
hits = res['hits']['hits']
if len(hits) == count:
last_hit = hits[-1]
- next_scroll_token = {
+ next_scroll_token_content = {
b'score': last_hit['_score'],
b'id': last_hit['_id'],
}
next_scroll_token = base64.b64encode(msgpack.dumps(
- next_scroll_token))
+ next_scroll_token_content)) # type: Optional[bytes]
else:
next_scroll_token = None
return {
'scroll_token': next_scroll_token,
'results': [
{
# TODO: also add 'id'?
'url': hit['_source']['url'],
}
for hit in hits
]
}
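
The scroll_token returned by origin_search() is opaque to callers: a base64-encoded msgpack map holding the last hit's score and id, which the next call turns into Elasticsearch's search_after clause. A rough sketch (not part of this diff) of how a caller could page through results until the token comes back as None; 'search' is assumed to be an ElasticSearch instance (or an RPC client exposing the same endpoint) and the URL pattern is arbitrary:

def iter_origin_urls(search, url_pattern, count=50):
    # Walk all pages of origin_search() results, yielding origin URLs.
    scroll_token = None
    while True:
        page = search.origin_search(
            url_pattern=url_pattern,
            count=count,
            scroll_token=scroll_token)
        for result in page['results']:
            yield result['url']
        scroll_token = page['scroll_token']
        if scroll_token is None:
            # Fewer than `count` hits came back: no more results.
            break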
diff --git a/swh/search/in_memory.py b/swh/search/in_memory.py
index 0b3418d..421729c 100644
--- a/swh/search/in_memory.py
+++ b/swh/search/in_memory.py
@@ -1,122 +1,123 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import base64
from collections import defaultdict
import itertools
import re
-from typing import Iterable, Dict
+from typing import Any, Dict, Iterable, Iterator, List, Optional
import msgpack
from swh.core.api import remote_api_endpoint
from swh.model.identifiers import origin_identifier
def _sanitize_origin(origin):
origin = origin.copy()
res = {
'url': origin.pop('url')
}
for field_name in ('type', 'intrinsic_metadata'):
if field_name in origin:
res[field_name] = origin.pop(field_name)
return res
class InMemorySearch:
def __init__(self):
pass
@remote_api_endpoint('check')
def check(self):
return True
def deinitialize(self) -> None:
if hasattr(self, '_origins'):
del self._origins
del self._origin_ids
def initialize(self) -> None:
- self._origins = defaultdict(dict)
- self._origin_ids = []
+ self._origins = defaultdict(dict) # type: Dict[str, Dict[str, Any]]
+ self._origin_ids = [] # type: List[str]
_url_splitter = re.compile(r'\W')
@remote_api_endpoint('origin/update')
def origin_update(self, documents: Iterable[dict]) -> None:
for document in documents:
document = document.copy()
id_ = origin_identifier(document)
if 'url' in document:
document['_url_tokens'] = \
set(self._url_splitter.split(document['url']))
self._origins[id_].update(document)
if id_ not in self._origin_ids:
self._origin_ids.append(id_)
@remote_api_endpoint('origin/search')
def origin_search(
self, *,
url_pattern: str = None, metadata_pattern: str = None,
with_visit: bool = False,
scroll_token: str = None, count: int = 50
) -> Dict[str, object]:
- matches = (self._origins[id_] for id_ in self._origin_ids)
+ matches = (self._origins[id_] for id_ in self._origin_ids) # type: Iterator[Dict[str, Any]]
if url_pattern:
tokens = set(self._url_splitter.split(url_pattern))
def predicate(match):
missing_tokens = tokens - match['_url_tokens']
if len(missing_tokens) == 0:
return True
elif len(missing_tokens) > 1:
return False
else:
# There is one missing token, look up by prefix.
(missing_token,) = missing_tokens
return any(token.startswith(missing_token)
for token in match['_url_tokens'])
matches = filter(predicate, matches)
if metadata_pattern:
raise NotImplementedError(
'Metadata search is not implemented in the in-memory backend.')
if not url_pattern and not metadata_pattern:
raise ValueError(
'At least one of url_pattern and metadata_pattern '
'must be provided.')
if with_visit:
matches = filter(lambda o: o.get('has_visits'), matches)
if scroll_token:
- scroll_token = msgpack.loads(base64.b64decode(scroll_token))
- start_at_index = scroll_token[b'start_at_index']
+ scroll_token_content = msgpack.loads(
+ base64.b64decode(scroll_token))
+ start_at_index = scroll_token_content[b'start_at_index']
else:
start_at_index = 0
hits = list(itertools.islice(
matches, start_at_index, start_at_index+count))
if len(hits) == count:
- next_scroll_token = {
+ next_scroll_token_content = {
b'start_at_index': start_at_index+count,
}
next_scroll_token = base64.b64encode(msgpack.dumps(
- next_scroll_token))
+ next_scroll_token_content)) # type: Optional[bytes]
else:
next_scroll_token = None
return {
'scroll_token': next_scroll_token,
'results': [
{'url': hit['url']}
for hit in hits
]
}
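
A quick usage sketch (not part of this diff) of the in-memory backend, which mirrors the Elasticsearch API and is handy in tests; the origin URLs below are made up:

from swh.search.in_memory import InMemorySearch

search = InMemorySearch()
search.initialize()
search.origin_update([
    {'url': 'https://github.com/example/repo', 'has_visits': True},
    {'url': 'https://gitlab.com/example/other'},
])
page = search.origin_search(url_pattern='github', with_visit=True)
print([r['url'] for r in page['results']])
# -> ['https://github.com/example/repo']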
