diff --git a/swh/indexer/codemeta.py b/swh/indexer/codemeta.py
index 7cc316f..d02cdf2 100644
--- a/swh/indexer/codemeta.py
+++ b/swh/indexer/codemeta.py
@@ -1,129 +1,147 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import collections
import csv
+import itertools
import json
import os.path
import re
import swh.indexer
from pyld import jsonld
_DATA_DIR = os.path.join(os.path.dirname(swh.indexer.__file__), 'data')
CROSSWALK_TABLE_PATH = os.path.join(_DATA_DIR, 'codemeta', 'crosswalk.csv')
CODEMETA_CONTEXT_PATH = os.path.join(_DATA_DIR, 'codemeta', 'codemeta.jsonld')
with open(CODEMETA_CONTEXT_PATH) as fd:
CODEMETA_CONTEXT = json.load(fd)
CODEMETA_CONTEXT_URL = 'https://doi.org/10.5063/schema/codemeta-2.0'
CODEMETA_ALTERNATE_CONTEXT_URLS = {
('https://raw.githubusercontent.com/codemeta/codemeta/'
'master/codemeta.jsonld')
}
CODEMETA_URI = 'https://codemeta.github.io/terms/'
SCHEMA_URI = 'http://schema.org/'
PROPERTY_BLACKLIST = {
# CodeMeta properties that we cannot properly represent.
SCHEMA_URI + 'softwareRequirements',
CODEMETA_URI + 'softwareSuggestions',
# Duplicate of 'author'
SCHEMA_URI + 'creator',
}
_codemeta_field_separator = re.compile(r'\s*[,/]\s*')
def make_absolute_uri(local_name):
definition = CODEMETA_CONTEXT['@context'][local_name]
if isinstance(definition, str):
return definition
elif isinstance(definition, dict):
prefixed_name = definition['@id']
(prefix, local_name) = prefixed_name.split(':')
if prefix == 'schema':
canonical_name = SCHEMA_URI + local_name
elif prefix == 'codemeta':
canonical_name = CODEMETA_URI + local_name
else:
assert False, prefix
return canonical_name
else:
assert False, definition
def _read_crosstable(fd):
reader = csv.reader(fd)
try:
header = next(reader)
except StopIteration:
raise ValueError('empty file')
data_sources = set(header) - {'Parent Type', 'Property',
'Type', 'Description'}
assert 'codemeta-V1' in data_sources
codemeta_translation = {data_source: {} for data_source in data_sources}
terms = set()
for line in reader: # For each canonical name
local_name = dict(zip(header, line))['Property']
if not local_name:
continue
canonical_name = make_absolute_uri(local_name)
if canonical_name in PROPERTY_BLACKLIST:
continue
terms.add(canonical_name)
for (col, value) in zip(header, line): # For each cell in the row
if col in data_sources:
# If that's not the parentType/property/type/description
for local_name in _codemeta_field_separator.split(value):
# For each of the data source's properties that maps
# to this canonical name
if local_name.strip():
codemeta_translation[col][local_name.strip()] = \
canonical_name
return (terms, codemeta_translation)
with open(CROSSWALK_TABLE_PATH) as fd:
(CODEMETA_TERMS, CROSSWALK_TABLE) = _read_crosstable(fd)
def _document_loader(url):
"""Document loader for pyld.
Reads the local codemeta.jsonld file instead of fetching it
from the Internet every single time."""
if url == CODEMETA_CONTEXT_URL or url in CODEMETA_ALTERNATE_CONTEXT_URLS:
return {
'contextUrl': None,
'documentUrl': url,
'document': CODEMETA_CONTEXT,
}
elif url == CODEMETA_URI:
raise Exception('{} is CodeMeta\'s URI, use {} as context url'.format(
CODEMETA_URI, CODEMETA_CONTEXT_URL))
else:
raise Exception(url)
def compact(doc):
"""Same as `pyld.jsonld.compact`, but in the context of CodeMeta."""
return jsonld.compact(doc, CODEMETA_CONTEXT_URL,
options={'documentLoader': _document_loader})
def expand(doc):
"""Same as `pyld.jsonld.expand`, but in the context of CodeMeta."""
return jsonld.expand(doc,
options={'documentLoader': _document_loader})
+
+
+def merge_documents(documents):
+ """Takes a list of metadata dicts, each generated from a different
+ metadata file, and merges them.
+
+ Removes duplicates, if any."""
+ documents = list(itertools.chain.from_iterable(map(expand, documents)))
+ merged_document = collections.defaultdict(list)
+ for document in documents:
+ for (key, values) in document.items():
+ for value in values:
+ if value not in merged_document[key]:
+ merged_document[key].append(value)
+
+ return compact(merged_document)
diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py
index 129a447..ecb097a 100644
--- a/swh/indexer/metadata.py
+++ b/swh/indexer/metadata.py
@@ -1,357 +1,356 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from copy import deepcopy
from swh.core.utils import grouper
+from swh.indexer.codemeta import merge_documents
from swh.indexer.indexer import ContentIndexer, RevisionIndexer, OriginIndexer
from swh.indexer.origin_head import OriginHeadIndexer
from swh.indexer.metadata_dictionary import MAPPINGS
from swh.indexer.metadata_detector import detect_metadata
-from swh.indexer.metadata_detector import extract_minimal_metadata_dict
from swh.indexer.storage import INDEXER_CFG_KEY
from swh.model import hashutil
REVISION_GET_BATCH_SIZE = 10
ORIGIN_GET_BATCH_SIZE = 10
def call_with_batches(f, args, batch_size):
"""Calls a function with batches of args, and concatenates the results.
"""
groups = grouper(args, batch_size)
for group in groups:
yield from f(list(group))
class ContentMetadataIndexer(ContentIndexer):
"""Content-level indexer
This indexer is in charge of:
- filtering out content already indexed in content_metadata
- reading content from objstorage with the content's id sha1
- computing metadata by given context
- using the metadata_dictionary as the 'swh-metadata-translator' tool
- store result in content_metadata table
"""
def filter(self, ids):
"""Filter out known sha1s and return only missing ones.
"""
yield from self.idx_storage.content_metadata_missing((
{
'id': sha1,
'indexer_configuration_id': self.tool['id'],
} for sha1 in ids
))
def index(self, id, data, log_suffix='unknown revision'):
"""Index sha1s' content and store result.
Args:
id (bytes): content's identifier
data (bytes): raw content in bytes
Returns:
dict: dictionary representing a content_metadata. If the
translation wasn't successful the metadata keys will
be returned as None
"""
result = {
'id': id,
'indexer_configuration_id': self.tool['id'],
'metadata': None
}
try:
mapping_name = self.tool['tool_configuration']['context']
log_suffix += ', content_id=%s' % hashutil.hash_to_hex(id)
result['metadata'] = \
MAPPINGS[mapping_name](log_suffix).translate(data)
except Exception:
self.log.exception(
"Problem during metadata translation "
"for content %s" % hashutil.hash_to_hex(id))
if result['metadata'] is None:
return None
return result
def persist_index_computations(self, results, policy_update):
"""Persist the results in storage.
Args:
results ([dict]): list of content_metadata, dict with the
following keys:
- id (bytes): content's identifier (sha1)
- metadata (jsonb): detected metadata
policy_update ([str]): either 'update-dups' or 'ignore-dups' to
respectively update duplicates or ignore them
"""
self.idx_storage.content_metadata_add(
results, conflict_update=(policy_update == 'update-dups'))
class RevisionMetadataIndexer(RevisionIndexer):
"""Revision-level indexer
This indexer is in charge of:
- filtering revisions already indexed in revision_intrinsic_metadata table
with defined computation tool
- retrieve all entry_files in root directory
- use metadata_detector for file_names containing metadata
- compute metadata translation if necessary and possible (depends on tool)
- send sha1s to content indexing if possible
- store the results for revision
"""
ADDITIONAL_CONFIG = {
'tools': ('dict', {
'name': 'swh-metadata-detector',
'version': '0.0.2',
'configuration': {
},
}),
}
def filter(self, sha1_gits):
"""Filter out known sha1s and return only missing ones.
"""
yield from self.idx_storage.revision_intrinsic_metadata_missing((
{
'id': sha1_git,
'indexer_configuration_id': self.tool['id'],
} for sha1_git in sha1_gits
))
def index(self, rev):
"""Index rev by processing it and organizing result.
use metadata_detector to iterate on filenames
- if one filename detected -> sends file to content indexer
- if multiple file detected -> translation needed at revision level
Args:
rev (dict): revision artifact from storage
Returns:
dict: dictionary representing a revision_intrinsic_metadata, with
keys:
- id (str): rev's identifier (sha1_git)
- indexer_configuration_id (bytes): tool used
- metadata: dict of retrieved metadata
"""
result = {
'id': rev['id'],
'indexer_configuration_id': self.tool['id'],
'mappings': None,
'metadata': None
}
try:
root_dir = rev['directory']
dir_ls = list(self.storage.directory_ls(root_dir, recursive=False))
if [entry['type'] for entry in dir_ls] == ['dir']:
# If the root is just a single directory, recurse into it
# eg. PyPI packages, GNU tarballs
subdir = dir_ls[0]['target']
dir_ls = self.storage.directory_ls(subdir, recursive=False)
files = [entry for entry in dir_ls if entry['type'] == 'file']
detected_files = detect_metadata(files)
(mappings, metadata) = self.translate_revision_intrinsic_metadata(
detected_files,
log_suffix='revision=%s' % hashutil.hash_to_hex(rev['id']))
result['mappings'] = mappings
result['metadata'] = metadata
except Exception as e:
self.log.exception(
'Problem when indexing rev: %r', e)
return result
def persist_index_computations(self, results, policy_update):
"""Persist the results in storage.
Args:
results ([dict]): list of content_mimetype, dict with the
following keys:
- id (bytes): content's identifier (sha1)
- mimetype (bytes): mimetype in bytes
- encoding (bytes): encoding in bytes
policy_update ([str]): either 'update-dups' or 'ignore-dups' to
respectively update duplicates or ignore them
"""
# TODO: add functions in storage to keep data in
# revision_intrinsic_metadata
self.idx_storage.revision_intrinsic_metadata_add(
results, conflict_update=(policy_update == 'update-dups'))
def translate_revision_intrinsic_metadata(
self, detected_files, log_suffix):
"""
Determine plan of action to translate metadata when containing
one or multiple detected files:
Args:
detected_files (dict): dictionary mapping context names (e.g.,
"npm", "authors") to list of sha1
Returns:
(List[str], dict): list of mappings used and dict with
translated metadata according to the CodeMeta vocabulary
"""
used_mappings = [MAPPINGS[context].name for context in detected_files]
metadata = []
tool = {
'name': 'swh-metadata-translator',
'version': '0.0.2',
'configuration': {
},
}
# TODO: iterate on each context, on each file
# -> get raw_contents
# -> translate each content
config = {
k: self.config[k]
for k in [INDEXER_CFG_KEY, 'objstorage', 'storage']
}
config['tools'] = [tool]
for context in detected_files.keys():
cfg = deepcopy(config)
cfg['tools'][0]['configuration']['context'] = context
c_metadata_indexer = ContentMetadataIndexer(config=cfg)
# sha1s that are in content_metadata table
sha1s_in_storage = []
metadata_generator = self.idx_storage.content_metadata_get(
detected_files[context])
for c in metadata_generator:
# extracting metadata
sha1 = c['id']
sha1s_in_storage.append(sha1)
local_metadata = c['metadata']
# local metadata is aggregated
if local_metadata:
metadata.append(local_metadata)
sha1s_filtered = [item for item in detected_files[context]
if item not in sha1s_in_storage]
if sha1s_filtered:
# content indexing
try:
c_metadata_indexer.run(sha1s_filtered,
policy_update='ignore-dups',
log_suffix=log_suffix)
# on the fly possibility:
for result in c_metadata_indexer.results:
local_metadata = result['metadata']
metadata.append(local_metadata)
except Exception:
self.log.exception(
"Exception while indexing metadata on contents")
- # transform metadata into min set with swh-metadata-detector
- min_metadata = extract_minimal_metadata_dict(metadata)
- return (used_mappings, min_metadata)
+ metadata = merge_documents(metadata)
+ return (used_mappings, metadata)
class OriginMetadataIndexer(OriginIndexer):
ADDITIONAL_CONFIG = RevisionMetadataIndexer.ADDITIONAL_CONFIG
USE_TOOLS = False
def __init__(self, config=None, **kwargs):
super().__init__(config=config, **kwargs)
self.origin_head_indexer = OriginHeadIndexer(config=config)
self.revision_metadata_indexer = RevisionMetadataIndexer(config=config)
def index_list(self, origin_urls):
head_rev_ids = []
origins_with_head = []
origins = list(call_with_batches(
self.storage.origin_get,
[{'url': url} for url in origin_urls], ORIGIN_GET_BATCH_SIZE))
for origin in origins:
head_result = self.origin_head_indexer.index(origin['url'])
if head_result:
head_result['origin_id'] = origin['id']
origins_with_head.append(origin)
head_rev_ids.append(head_result['revision_id'])
head_revs = list(call_with_batches(
self.storage.revision_get,
head_rev_ids, REVISION_GET_BATCH_SIZE))
assert len(head_revs) == len(head_rev_ids)
results = []
for (origin, rev) in zip(origins_with_head, head_revs):
if not rev:
self.log.warning('Missing head revision of origin %r',
origin['url'])
continue
rev_metadata = self.revision_metadata_indexer.index(rev)
orig_metadata = {
'from_revision': rev_metadata['id'],
'id': origin['id'],
'origin_url': origin['url'],
'metadata': rev_metadata['metadata'],
'mappings': rev_metadata['mappings'],
'indexer_configuration_id':
rev_metadata['indexer_configuration_id'],
}
results.append((orig_metadata, rev_metadata))
return results
def persist_index_computations(self, results, policy_update):
conflict_update = (policy_update == 'update-dups')
# Deduplicate revisions
rev_metadata = []
orig_metadata = []
revs_to_delete = []
origs_to_delete = []
for (orig_item, rev_item) in results:
assert rev_item['metadata'] == orig_item['metadata']
if not rev_item['metadata'] or \
rev_item['metadata'].keys() <= {'@context'}:
# If we didn't find any metadata, don't store a DB record
# (and delete existing ones, if any)
if rev_item not in revs_to_delete:
revs_to_delete.append(rev_item)
if orig_item not in origs_to_delete:
origs_to_delete.append(orig_item)
else:
if rev_item not in rev_metadata:
rev_metadata.append(rev_item)
if orig_item not in orig_metadata:
orig_metadata.append(orig_item)
if rev_metadata:
self.idx_storage.revision_intrinsic_metadata_add(
rev_metadata, conflict_update=conflict_update)
if orig_metadata:
self.idx_storage.origin_intrinsic_metadata_add(
orig_metadata, conflict_update=conflict_update)
# revs_to_delete should always be empty unless we changed a mapping
# to detect less files or less content.
# However, origs_to_delete may be empty whenever an upstream deletes
# a metadata file.
if origs_to_delete:
self.idx_storage.origin_intrinsic_metadata_delete(origs_to_delete)
if revs_to_delete:
self.idx_storage.revision_intrinsic_metadata_delete(revs_to_delete)
diff --git a/swh/indexer/metadata_detector.py b/swh/indexer/metadata_detector.py
index fb7fc3f..b8e99b5 100644
--- a/swh/indexer/metadata_detector.py
+++ b/swh/indexer/metadata_detector.py
@@ -1,62 +1,24 @@
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from swh.indexer.codemeta import compact, expand
-from swh.indexer.codemeta import make_absolute_uri
from swh.indexer.metadata_dictionary import MAPPINGS
def detect_metadata(files):
"""
Detects files potentially containing metadata
Args:
file_entries (list): list of files
Returns:
dict: {mapping_filenames[name]:f['sha1']} (may be empty)
"""
results = {}
for (mapping_name, mapping) in MAPPINGS.items():
matches = mapping.detect_metadata_files(files)
if matches:
results[mapping_name] = matches
return results
-
-
-_MINIMAL_PROPERTY_SET = {
- "developmentStatus", "version", "operatingSystem", "description",
- "keywords", "issueTracker", "name", "author", "relatedLink",
- "url", "license", "maintainer", "email", "identifier",
- "codeRepository"}
-
-MINIMAL_METADATA_SET = {make_absolute_uri(prop)
- for prop in _MINIMAL_PROPERTY_SET}
-
-
-def extract_minimal_metadata_dict(metadata_list):
- """
- Every item in the metadata_list is a dict of translated_metadata in the
- CodeMeta vocabulary.
-
- We wish to extract a minimal set of terms and keep all values corresponding
- to this term without duplication.
-
- Args:
- metadata_list (list): list of dicts of translated_metadata
-
- Returns:
- dict: minimal_dict; dict with selected values of metadata
- """
- minimal_dict = {}
- for document in metadata_list:
- for metadata_item in expand(document):
- for (term, value) in metadata_item.items():
- if term in MINIMAL_METADATA_SET:
- if term not in minimal_dict:
- minimal_dict[term] = [value]
- elif value not in minimal_dict[term]:
- minimal_dict[term].append(value)
- return compact(minimal_dict)
diff --git a/swh/indexer/tests/test_metadata.py b/swh/indexer/tests/test_metadata.py
index 1f50746..8427314 100644
--- a/swh/indexer/tests/test_metadata.py
+++ b/swh/indexer/tests/test_metadata.py
@@ -1,1210 +1,1211 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import unittest
from hypothesis import given, strategies, settings, HealthCheck
from swh.model.hashutil import hash_to_bytes
from swh.indexer.codemeta import CODEMETA_TERMS, CROSSWALK_TABLE
+from swh.indexer.codemeta import merge_documents
from swh.indexer.metadata_dictionary import MAPPINGS
from swh.indexer.metadata_dictionary.base import merge_values
from swh.indexer.metadata_detector import (
- detect_metadata, extract_minimal_metadata_dict
+ detect_metadata
)
from swh.indexer.metadata import (
ContentMetadataIndexer, RevisionMetadataIndexer
)
from .utils import (
BASE_TEST_CONFIG, fill_obj_storage, fill_storage,
YARN_PARSER_METADATA, json_document_strategy,
xml_document_strategy,
)
TRANSLATOR_TOOL = {
'name': 'swh-metadata-translator',
'version': '0.0.2',
'configuration': {
'type': 'local',
'context': 'NpmMapping'
}
}
class ContentMetadataTestIndexer(ContentMetadataIndexer):
"""Specific Metadata whose configuration is enough to satisfy the
indexing tests.
"""
def parse_config_file(self, *args, **kwargs):
assert False, 'should not be called; the rev indexer configures it.'
REVISION_METADATA_CONFIG = {
**BASE_TEST_CONFIG,
'tools': TRANSLATOR_TOOL,
}
class Metadata(unittest.TestCase):
"""
Tests metadata_mock_tool tool for Metadata detection
"""
def setUp(self):
"""
shows the entire diff in the results
"""
self.maxDiff = None
self.npm_mapping = MAPPINGS['NpmMapping']()
self.codemeta_mapping = MAPPINGS['CodemetaMapping']()
self.maven_mapping = MAPPINGS['MavenMapping']()
self.pkginfo_mapping = MAPPINGS['PythonPkginfoMapping']()
self.gemspec_mapping = MAPPINGS['GemspecMapping']()
def test_crosstable(self):
self.assertEqual(CROSSWALK_TABLE['NodeJS'], {
'repository': 'http://schema.org/codeRepository',
'os': 'http://schema.org/operatingSystem',
'cpu': 'http://schema.org/processorRequirements',
'engines':
'http://schema.org/processorRequirements',
'author': 'http://schema.org/author',
'author.email': 'http://schema.org/email',
'author.name': 'http://schema.org/name',
'contributor': 'http://schema.org/contributor',
'keywords': 'http://schema.org/keywords',
'license': 'http://schema.org/license',
'version': 'http://schema.org/version',
'description': 'http://schema.org/description',
'name': 'http://schema.org/name',
'bugs': 'https://codemeta.github.io/terms/issueTracker',
'homepage': 'http://schema.org/url'
})
def test_merge_values(self):
self.assertEqual(
merge_values('a', 'b'),
['a', 'b'])
self.assertEqual(
merge_values(['a', 'b'], 'c'),
['a', 'b', 'c'])
self.assertEqual(
merge_values('a', ['b', 'c']),
['a', 'b', 'c'])
self.assertEqual(
merge_values({'@list': ['a']}, {'@list': ['b']}),
{'@list': ['a', 'b']})
self.assertEqual(
merge_values({'@list': ['a', 'b']}, {'@list': ['c']}),
{'@list': ['a', 'b', 'c']})
with self.assertRaises(ValueError):
merge_values({'@list': ['a']}, 'b')
with self.assertRaises(ValueError):
merge_values('a', {'@list': ['b']})
with self.assertRaises(ValueError):
merge_values({'@list': ['a']}, ['b'])
with self.assertRaises(ValueError):
merge_values(['a'], {'@list': ['b']})
self.assertEqual(
merge_values('a', None),
'a')
self.assertEqual(
merge_values(['a', 'b'], None),
['a', 'b'])
self.assertEqual(
merge_values(None, ['b', 'c']),
['b', 'c'])
self.assertEqual(
merge_values({'@list': ['a']}, None),
{'@list': ['a']})
self.assertEqual(
merge_values(None, {'@list': ['a']}),
{'@list': ['a']})
def test_compute_metadata_none(self):
"""
testing content empty content is empty
should return None
"""
# given
content = b""
# None if no metadata was found or an error occurred
declared_metadata = None
# when
result = self.npm_mapping.translate(content)
# then
self.assertEqual(declared_metadata, result)
def test_compute_metadata_npm(self):
"""
testing only computation of metadata with hard_mapping_npm
"""
# given
content = b"""
{
"name": "test_metadata",
"version": "0.0.2",
"description": "Simple package.json test for indexer",
"repository": {
"type": "git",
"url": "https://github.com/moranegg/metadata_test"
},
"author": {
"email": "moranegg@example.com",
"name": "Morane G"
}
}
"""
declared_metadata = {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'test_metadata',
'version': '0.0.2',
'description': 'Simple package.json test for indexer',
'codeRepository':
'git+https://github.com/moranegg/metadata_test',
'author': [{
'type': 'Person',
'name': 'Morane G',
'email': 'moranegg@example.com',
}],
}
# when
result = self.npm_mapping.translate(content)
# then
self.assertEqual(declared_metadata, result)
- def test_extract_minimal_metadata_dict(self):
+ def test_merge_documents(self):
"""
Test the creation of a coherent minimal metadata set
"""
# given
metadata_list = [{
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'name': 'test_1',
'version': '0.0.2',
'description': 'Simple package.json test for indexer',
'codeRepository':
'git+https://github.com/moranegg/metadata_test',
}, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'name': 'test_0_1',
'version': '0.0.2',
'description': 'Simple package.json test for indexer',
'codeRepository':
'git+https://github.com/moranegg/metadata_test'
}, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'name': 'test_metadata',
'version': '0.0.2',
'author': 'moranegg',
}]
# when
- results = extract_minimal_metadata_dict(metadata_list)
+ results = merge_documents(metadata_list)
# then
expected_results = {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
"version": '0.0.2',
"description": 'Simple package.json test for indexer',
"name": ['test_1', 'test_0_1', 'test_metadata'],
"author": ['moranegg'],
"codeRepository":
'git+https://github.com/moranegg/metadata_test',
}
self.assertEqual(expected_results, results)
def test_index_content_metadata_npm(self):
"""
testing NPM with package.json
- one sha1 uses a file that can't be translated to metadata and
should return None in the translated metadata
"""
# given
sha1s = [
hash_to_bytes('26a9f72a7c87cc9205725cfd879f514ff4f3d8d5'),
hash_to_bytes('d4c647f0fc257591cc9ba1722484229780d1c607'),
hash_to_bytes('02fb2c89e14f7fab46701478c83779c7beb7b069'),
]
# this metadata indexer computes only metadata for package.json
# in npm context with a hard mapping
config = BASE_TEST_CONFIG.copy()
config['tools'] = [TRANSLATOR_TOOL]
metadata_indexer = ContentMetadataTestIndexer(config=config)
fill_obj_storage(metadata_indexer.objstorage)
fill_storage(metadata_indexer.storage)
# when
metadata_indexer.run(sha1s, policy_update='ignore-dups')
results = list(metadata_indexer.idx_storage.content_metadata_get(
sha1s))
expected_results = [{
'metadata': {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'codeRepository':
'git+https://github.com/moranegg/metadata_test',
'description': 'Simple package.json test for indexer',
'name': 'test_metadata',
'version': '0.0.1'
},
'id': hash_to_bytes('26a9f72a7c87cc9205725cfd879f514ff4f3d8d5'),
}, {
'metadata': {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'issueTracker':
'https://github.com/npm/npm/issues',
'author': [{
'type': 'Person',
'name': 'Isaac Z. Schlueter',
'email': 'i@izs.me',
'url': 'http://blog.izs.me',
}],
'codeRepository':
'git+https://github.com/npm/npm',
'description': 'a package manager for JavaScript',
'license': 'https://spdx.org/licenses/Artistic-2.0',
'version': '5.0.3',
'name': 'npm',
'keywords': [
'install',
'modules',
'package manager',
'package.json'
],
'url': 'https://docs.npmjs.com/'
},
'id': hash_to_bytes('d4c647f0fc257591cc9ba1722484229780d1c607')
}]
for result in results:
del result['tool']
# The assertion below returns False sometimes because of nested lists
self.assertEqual(expected_results, results)
def test_npm_bugs_normalization(self):
# valid dictionary
package_json = b"""{
"name": "foo",
"bugs": {
"url": "https://github.com/owner/project/issues",
"email": "foo@example.com"
}
}"""
result = self.npm_mapping.translate(package_json)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'name': 'foo',
'issueTracker': 'https://github.com/owner/project/issues',
'type': 'SoftwareSourceCode',
})
# "invalid" dictionary
package_json = b"""{
"name": "foo",
"bugs": {
"email": "foo@example.com"
}
}"""
result = self.npm_mapping.translate(package_json)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'name': 'foo',
'type': 'SoftwareSourceCode',
})
# string
package_json = b"""{
"name": "foo",
"bugs": "https://github.com/owner/project/issues"
}"""
result = self.npm_mapping.translate(package_json)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'name': 'foo',
'issueTracker': 'https://github.com/owner/project/issues',
'type': 'SoftwareSourceCode',
})
def test_npm_repository_normalization(self):
# normal
package_json = b"""{
"name": "foo",
"repository": {
"type" : "git",
"url" : "https://github.com/npm/cli.git"
}
}"""
result = self.npm_mapping.translate(package_json)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'name': 'foo',
'codeRepository': 'git+https://github.com/npm/cli.git',
'type': 'SoftwareSourceCode',
})
# missing url
package_json = b"""{
"name": "foo",
"repository": {
"type" : "git"
}
}"""
result = self.npm_mapping.translate(package_json)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'name': 'foo',
'type': 'SoftwareSourceCode',
})
# github shortcut
package_json = b"""{
"name": "foo",
"repository": "github:npm/cli"
}"""
result = self.npm_mapping.translate(package_json)
expected_result = {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'name': 'foo',
'codeRepository': 'git+https://github.com/npm/cli.git',
'type': 'SoftwareSourceCode',
}
self.assertEqual(result, expected_result)
# github shortshortcut
package_json = b"""{
"name": "foo",
"repository": "npm/cli"
}"""
result = self.npm_mapping.translate(package_json)
self.assertEqual(result, expected_result)
# gitlab shortcut
package_json = b"""{
"name": "foo",
"repository": "gitlab:user/repo"
}"""
result = self.npm_mapping.translate(package_json)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'name': 'foo',
'codeRepository': 'git+https://gitlab.com/user/repo.git',
'type': 'SoftwareSourceCode',
})
def test_detect_metadata_package_json(self):
# given
df = [{
'sha1_git': b'abc',
'name': b'index.js',
'target': b'abc',
'length': 897,
'status': 'visible',
'type': 'file',
'perms': 33188,
'dir_id': b'dir_a',
'sha1': b'bcd'
},
{
'sha1_git': b'aab',
'name': b'package.json',
'target': b'aab',
'length': 712,
'status': 'visible',
'type': 'file',
'perms': 33188,
'dir_id': b'dir_a',
'sha1': b'cde'
}]
# when
results = detect_metadata(df)
expected_results = {
'NpmMapping': [
b'cde'
]
}
# then
self.assertEqual(expected_results, results)
def test_compute_metadata_valid_codemeta(self):
raw_content = (
b"""{
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"@type": "SoftwareSourceCode",
"identifier": "CodeMeta",
"description": "CodeMeta is a concept vocabulary that can be used to standardize the exchange of software metadata across repositories and organizations.",
"name": "CodeMeta: Minimal metadata schemas for science software and code, in JSON-LD",
"codeRepository": "https://github.com/codemeta/codemeta",
"issueTracker": "https://github.com/codemeta/codemeta/issues",
"license": "https://spdx.org/licenses/Apache-2.0",
"version": "2.0",
"author": [
{
"@type": "Person",
"givenName": "Carl",
"familyName": "Boettiger",
"email": "cboettig@gmail.com",
"@id": "http://orcid.org/0000-0002-1642-628X"
},
{
"@type": "Person",
"givenName": "Matthew B.",
"familyName": "Jones",
"email": "jones@nceas.ucsb.edu",
"@id": "http://orcid.org/0000-0003-0077-4738"
}
],
"maintainer": {
"@type": "Person",
"givenName": "Carl",
"familyName": "Boettiger",
"email": "cboettig@gmail.com",
"@id": "http://orcid.org/0000-0002-1642-628X"
},
"contIntegration": "https://travis-ci.org/codemeta/codemeta",
"developmentStatus": "active",
"downloadUrl": "https://github.com/codemeta/codemeta/archive/2.0.zip",
"funder": {
"@id": "https://doi.org/10.13039/100000001",
"@type": "Organization",
"name": "National Science Foundation"
},
"funding":"1549758; Codemeta: A Rosetta Stone for Metadata in Scientific Software",
"keywords": [
"metadata",
"software"
],
"version":"2.0",
"dateCreated":"2017-06-05",
"datePublished":"2017-06-05",
"programmingLanguage": "JSON-LD"
}""") # noqa
expected_result = {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"type": "SoftwareSourceCode",
"identifier": "CodeMeta",
"description":
"CodeMeta is a concept vocabulary that can "
"be used to standardize the exchange of software metadata "
"across repositories and organizations.",
"name":
"CodeMeta: Minimal metadata schemas for science "
"software and code, in JSON-LD",
"codeRepository": "https://github.com/codemeta/codemeta",
"issueTracker": "https://github.com/codemeta/codemeta/issues",
"license": "https://spdx.org/licenses/Apache-2.0",
"version": "2.0",
"author": [
{
"type": "Person",
"givenName": "Carl",
"familyName": "Boettiger",
"email": "cboettig@gmail.com",
"id": "http://orcid.org/0000-0002-1642-628X"
},
{
"type": "Person",
"givenName": "Matthew B.",
"familyName": "Jones",
"email": "jones@nceas.ucsb.edu",
"id": "http://orcid.org/0000-0003-0077-4738"
}
],
"maintainer": {
"type": "Person",
"givenName": "Carl",
"familyName": "Boettiger",
"email": "cboettig@gmail.com",
"id": "http://orcid.org/0000-0002-1642-628X"
},
"contIntegration": "https://travis-ci.org/codemeta/codemeta",
"developmentStatus": "active",
"downloadUrl":
"https://github.com/codemeta/codemeta/archive/2.0.zip",
"funder": {
"id": "https://doi.org/10.13039/100000001",
"type": "Organization",
"name": "National Science Foundation"
},
"funding": "1549758; Codemeta: A Rosetta Stone for Metadata "
"in Scientific Software",
"keywords": [
"metadata",
"software"
],
"version": "2.0",
"dateCreated": "2017-06-05",
"datePublished": "2017-06-05",
"programmingLanguage": "JSON-LD"
}
result = self.codemeta_mapping.translate(raw_content)
self.assertEqual(result, expected_result)
def test_compute_metadata_codemeta_alternate_context(self):
raw_content = (
b"""{
"@context": "https://raw.githubusercontent.com/codemeta/codemeta/master/codemeta.jsonld",
"@type": "SoftwareSourceCode",
"identifier": "CodeMeta"
}""") # noqa
expected_result = {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"type": "SoftwareSourceCode",
"identifier": "CodeMeta",
}
result = self.codemeta_mapping.translate(raw_content)
self.assertEqual(result, expected_result)
def test_compute_metadata_maven(self):
raw_content = b"""
Maven Default Project
4.0.0
com.mycompany.app
my-app
1.2.3
central
Maven Repository Switchboard
default
http://repo1.maven.org/maven2
false
Apache License, Version 2.0
https://www.apache.org/licenses/LICENSE-2.0.txt
repo
A business-friendly OSS license
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'Maven Default Project',
'identifier': 'com.mycompany.app',
'version': '1.2.3',
'license': 'https://www.apache.org/licenses/LICENSE-2.0.txt',
'codeRepository':
'http://repo1.maven.org/maven2/com/mycompany/app/my-app',
})
def test_compute_metadata_maven_empty(self):
raw_content = b"""
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
})
def test_compute_metadata_maven_almost_empty(self):
raw_content = b"""
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
})
def test_compute_metadata_maven_invalid_xml(self):
expected_warning = (
'WARNING:swh.indexer.metadata_dictionary.maven.MavenMapping:'
'Error parsing XML from foo')
raw_content = b"""
"""
with self.assertLogs('swh.indexer.metadata_dictionary',
level='WARNING') as cm:
result = MAPPINGS["MavenMapping"]('foo').translate(raw_content)
self.assertEqual(cm.output, [expected_warning])
self.assertEqual(result, None)
raw_content = b"""
"""
with self.assertLogs('swh.indexer.metadata_dictionary',
level='WARNING') as cm:
result = MAPPINGS["MavenMapping"]('foo').translate(raw_content)
self.assertEqual(cm.output, [expected_warning])
self.assertEqual(result, None)
def test_compute_metadata_maven_unknown_encoding(self):
expected_warning = (
'WARNING:swh.indexer.metadata_dictionary.maven.MavenMapping:'
'Error detecting XML encoding from foo')
raw_content = b"""
"""
with self.assertLogs('swh.indexer.metadata_dictionary',
level='WARNING') as cm:
result = MAPPINGS["MavenMapping"]('foo').translate(raw_content)
self.assertEqual(cm.output, [expected_warning])
self.assertEqual(result, None)
raw_content = b"""
"""
with self.assertLogs('swh.indexer.metadata_dictionary',
level='WARNING') as cm:
result = MAPPINGS["MavenMapping"]('foo').translate(raw_content)
self.assertEqual(cm.output, [expected_warning])
self.assertEqual(result, None)
def test_compute_metadata_maven_invalid_encoding(self):
expected_warning = (
'WARNING:swh.indexer.metadata_dictionary.maven.MavenMapping:'
'Error unidecoding XML from foo')
raw_content = b"""
"""
with self.assertLogs('swh.indexer.metadata_dictionary',
level='WARNING') as cm:
result = MAPPINGS["MavenMapping"]('foo').translate(raw_content)
self.assertEqual(cm.output, [expected_warning])
self.assertEqual(result, None)
def test_compute_metadata_maven_minimal(self):
raw_content = b"""
Maven Default Project
4.0.0
com.mycompany.app
my-app
1.2.3
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'Maven Default Project',
'identifier': 'com.mycompany.app',
'version': '1.2.3',
'codeRepository':
'https://repo.maven.apache.org/maven2/com/mycompany/app/my-app',
})
def test_compute_metadata_maven_empty_nodes(self):
raw_content = b"""
Maven Default Project
4.0.0
com.mycompany.app
my-app
1.2.3
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'Maven Default Project',
'identifier': 'com.mycompany.app',
'version': '1.2.3',
'codeRepository':
'https://repo.maven.apache.org/maven2/com/mycompany/app/my-app',
})
raw_content = b"""
Maven Default Project
4.0.0
com.mycompany.app
my-app
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'Maven Default Project',
'identifier': 'com.mycompany.app',
'codeRepository':
'https://repo.maven.apache.org/maven2/com/mycompany/app/my-app',
})
raw_content = b"""
4.0.0
com.mycompany.app
my-app
1.2.3
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'identifier': 'com.mycompany.app',
'version': '1.2.3',
'codeRepository':
'https://repo.maven.apache.org/maven2/com/mycompany/app/my-app',
})
raw_content = b"""
Maven Default Project
4.0.0
com.mycompany.app
my-app
1.2.3
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'Maven Default Project',
'identifier': 'com.mycompany.app',
'version': '1.2.3',
'codeRepository':
'https://repo.maven.apache.org/maven2/com/mycompany/app/my-app',
})
raw_content = b"""
1.2.3
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'version': '1.2.3',
})
def test_compute_metadata_maven_invalid_licenses(self):
raw_content = b"""
Maven Default Project
4.0.0
com.mycompany.app
my-app
1.2.3
foo
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'Maven Default Project',
'identifier': 'com.mycompany.app',
'version': '1.2.3',
'codeRepository':
'https://repo.maven.apache.org/maven2/com/mycompany/app/my-app',
})
def test_compute_metadata_maven_multiple(self):
'''Tests when there are multiple code repos and licenses.'''
raw_content = b"""
Maven Default Project
4.0.0
com.mycompany.app
my-app
1.2.3
central
Maven Repository Switchboard
default
http://repo1.maven.org/maven2
false
example
Example Maven Repo
default
http://example.org/maven2
Apache License, Version 2.0
https://www.apache.org/licenses/LICENSE-2.0.txt
repo
A business-friendly OSS license
MIT license
https://opensource.org/licenses/MIT
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'Maven Default Project',
'identifier': 'com.mycompany.app',
'version': '1.2.3',
'license': [
'https://www.apache.org/licenses/LICENSE-2.0.txt',
'https://opensource.org/licenses/MIT',
],
'codeRepository': [
'http://repo1.maven.org/maven2/com/mycompany/app/my-app',
'http://example.org/maven2/com/mycompany/app/my-app',
]
})
def test_compute_metadata_pkginfo(self):
raw_content = (b"""\
Metadata-Version: 2.1
Name: swh.core
Version: 0.0.49
Summary: Software Heritage core utilities
Home-page: https://forge.softwareheritage.org/diffusion/DCORE/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-core
Description: swh-core
========
\x20
core library for swh's modules:
- config parser
- hash computations
- serialization
- logging mechanism
\x20
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Description-Content-Type: text/markdown
Provides-Extra: testing
""") # noqa
result = self.pkginfo_mapping.translate(raw_content)
self.assertCountEqual(result['description'], [
'Software Heritage core utilities', # note the comma here
'swh-core\n'
'========\n'
'\n'
"core library for swh's modules:\n"
'- config parser\n'
'- hash computations\n'
'- serialization\n'
'- logging mechanism\n'
''],
result)
del result['description']
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'url': 'https://forge.softwareheritage.org/diffusion/DCORE/',
'name': 'swh.core',
'author': [{
'type': 'Person',
'name': 'Software Heritage developers',
'email': 'swh-devel@inria.fr',
}],
'version': '0.0.49',
})
def test_compute_metadata_pkginfo_utf8(self):
raw_content = (b'''\
Metadata-Version: 1.1
Name: snowpyt
Description-Content-Type: UNKNOWN
Description: foo
Hydrology N\xc2\xb083
''') # noqa
result = self.pkginfo_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'snowpyt',
'description': 'foo\nHydrology N°83',
})
def test_compute_metadata_pkginfo_keywords(self):
raw_content = (b"""\
Metadata-Version: 2.1
Name: foo
Keywords: foo bar baz
""") # noqa
result = self.pkginfo_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'foo',
'keywords': ['foo', 'bar', 'baz'],
})
def test_compute_metadata_pkginfo_license(self):
raw_content = (b"""\
Metadata-Version: 2.1
Name: foo
License: MIT
""") # noqa
result = self.pkginfo_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'foo',
'license': 'MIT',
})
def test_gemspec_base(self):
raw_content = b"""
Gem::Specification.new do |s|
s.name = 'example'
s.version = '0.1.0'
s.licenses = ['MIT']
s.summary = "This is an example!"
s.description = "Much longer explanation of the example!"
s.authors = ["Ruby Coder"]
s.email = 'rubycoder@example.com'
s.files = ["lib/example.rb"]
s.homepage = 'https://rubygems.org/gems/example'
s.metadata = { "source_code_uri" => "https://github.com/example/example" }
end"""
result = self.gemspec_mapping.translate(raw_content)
self.assertCountEqual(result.pop('description'), [
"This is an example!",
"Much longer explanation of the example!"
])
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'author': ['Ruby Coder'],
'name': 'example',
'license': 'https://spdx.org/licenses/MIT',
'codeRepository': 'https://rubygems.org/gems/example',
'email': 'rubycoder@example.com',
'version': '0.1.0',
})
def test_gemspec_two_author_fields(self):
raw_content = b"""
Gem::Specification.new do |s|
s.authors = ["Ruby Coder1"]
s.author = "Ruby Coder2"
end"""
result = self.gemspec_mapping.translate(raw_content)
self.assertCountEqual(result.pop('author'), [
'Ruby Coder1', 'Ruby Coder2'])
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
})
def test_gemspec_invalid_author(self):
raw_content = b"""
Gem::Specification.new do |s|
s.author = ["Ruby Coder"]
end"""
result = self.gemspec_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
})
raw_content = b"""
Gem::Specification.new do |s|
s.author = "Ruby Coder1",
end"""
result = self.gemspec_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
})
raw_content = b"""
Gem::Specification.new do |s|
s.authors = ["Ruby Coder1", ["Ruby Coder2"]]
end"""
result = self.gemspec_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'author': ['Ruby Coder1'],
})
def test_gemspec_alternative_header(self):
raw_content = b"""
require './lib/version'
Gem::Specification.new { |s|
s.name = 'rb-system-with-aliases'
s.summary = 'execute system commands with aliases'
}
"""
result = self.gemspec_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'rb-system-with-aliases',
'description': 'execute system commands with aliases',
})
@settings(suppress_health_check=[HealthCheck.too_slow])
@given(json_document_strategy(
keys=list(MAPPINGS['NpmMapping'].mapping)))
def test_npm_adversarial(self, doc):
raw = json.dumps(doc).encode()
self.npm_mapping.translate(raw)
@settings(suppress_health_check=[HealthCheck.too_slow])
@given(json_document_strategy(keys=CODEMETA_TERMS))
def test_codemeta_adversarial(self, doc):
raw = json.dumps(doc).encode()
self.codemeta_mapping.translate(raw)
@settings(suppress_health_check=[HealthCheck.too_slow])
@given(xml_document_strategy(
keys=list(MAPPINGS['MavenMapping'].mapping),
root='project',
xmlns='http://maven.apache.org/POM/4.0.0'))
def test_maven_adversarial(self, doc):
self.maven_mapping.translate(doc)
@settings(suppress_health_check=[HealthCheck.too_slow])
@given(strategies.dictionaries(
# keys
strategies.one_of(
strategies.text(),
*map(strategies.just, MAPPINGS['GemspecMapping'].mapping)
),
# values
strategies.recursive(
strategies.characters(),
lambda children: strategies.lists(children, 1)
)
))
def test_gemspec_adversarial(self, doc):
parts = [b'Gem::Specification.new do |s|\n']
for (k, v) in doc.items():
parts.append(' s.{} = {}\n'.format(k, repr(v)).encode())
parts.append(b'end\n')
self.gemspec_mapping.translate(b''.join(parts))
def test_revision_metadata_indexer(self):
metadata_indexer = RevisionMetadataIndexer(
config=REVISION_METADATA_CONFIG)
fill_obj_storage(metadata_indexer.objstorage)
fill_storage(metadata_indexer.storage)
tool = metadata_indexer.idx_storage.indexer_configuration_get(
{'tool_'+k: v for (k, v) in TRANSLATOR_TOOL.items()})
assert tool is not None
metadata_indexer.idx_storage.content_metadata_add([{
'indexer_configuration_id': tool['id'],
'id': b'cde',
'metadata': YARN_PARSER_METADATA,
}])
sha1_gits = [
hash_to_bytes('8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'),
]
metadata_indexer.run(sha1_gits, 'update-dups')
results = list(
metadata_indexer.idx_storage.
revision_intrinsic_metadata_get(sha1_gits))
expected_results = [{
'id': hash_to_bytes('8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'),
'tool': TRANSLATOR_TOOL,
'metadata': YARN_PARSER_METADATA,
'mappings': ['npm'],
}]
for result in results:
del result['tool']['id']
# then
self.assertEqual(expected_results, results)
def test_revision_metadata_indexer_single_root_dir(self):
metadata_indexer = RevisionMetadataIndexer(
config=REVISION_METADATA_CONFIG)
fill_obj_storage(metadata_indexer.objstorage)
fill_storage(metadata_indexer.storage)
# Add a parent directory, that is the only directory at the root
# of the revision
rev_id = hash_to_bytes('8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f')
subdir_id = metadata_indexer.storage._revisions[rev_id]['directory']
metadata_indexer.storage._revisions[rev_id]['directory'] = b'123456'
metadata_indexer.storage.directory_add([{
'id': b'123456',
'entries': [{
'target': subdir_id,
'type': 'dir',
'length': None,
'name': b'foobar-1.0.0',
'sha1': None,
'perms': 16384,
'sha1_git': None,
'status': None,
'sha256': None
}],
}])
tool = metadata_indexer.idx_storage.indexer_configuration_get(
{'tool_'+k: v for (k, v) in TRANSLATOR_TOOL.items()})
assert tool is not None
metadata_indexer.idx_storage.content_metadata_add([{
'indexer_configuration_id': tool['id'],
'id': b'cde',
'metadata': YARN_PARSER_METADATA,
}])
sha1_gits = [
hash_to_bytes('8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'),
]
metadata_indexer.run(sha1_gits, 'update-dups')
results = list(
metadata_indexer.idx_storage.
revision_intrinsic_metadata_get(sha1_gits))
expected_results = [{
'id': hash_to_bytes('8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'),
'tool': TRANSLATOR_TOOL,
'metadata': YARN_PARSER_METADATA,
'mappings': ['npm'],
}]
for result in results:
del result['tool']['id']
# then
self.assertEqual(expected_results, results)
diff --git a/swh/indexer/tests/utils.py b/swh/indexer/tests/utils.py
index a22de1b..f908b51 100644
--- a/swh/indexer/tests/utils.py
+++ b/swh/indexer/tests/utils.py
@@ -1,761 +1,762 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import datetime
import functools
import random
import unittest
from hypothesis import strategies
from swh.model import hashutil
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.indexer.storage import INDEXER_CFG_KEY
BASE_TEST_CONFIG = {
'storage': {
'cls': 'memory',
'args': {
},
},
'objstorage': {
'cls': 'memory',
'args': {
},
},
INDEXER_CFG_KEY: {
'cls': 'memory',
'args': {
},
},
}
ORIGINS = [
{
'lister': None,
'project': None,
'type': 'git',
'url': 'https://github.com/SoftwareHeritage/swh-storage'},
{
'lister': None,
'project': None,
'type': 'ftp',
'url': 'rsync://ftp.gnu.org/gnu/3dldf'},
{
'lister': None,
'project': None,
'type': 'deposit',
'url': 'https://forge.softwareheritage.org/source/jesuisgpl/'},
{
'lister': None,
'project': None,
'type': 'pypi',
'url': 'https://pypi.org/project/limnoria/'},
{
'lister': None,
'project': None,
'type': 'svn',
'url': 'http://0-512-md.googlecode.com/svn/'},
{
'lister': None,
'project': None,
'type': 'git',
'url': 'https://github.com/librariesio/yarn-parser'},
{
'lister': None,
'project': None,
'type': 'git',
'url': 'https://github.com/librariesio/yarn-parser.git'},
]
SNAPSHOTS = [
{
'origin': 'https://github.com/SoftwareHeritage/swh-storage',
'branches': {
b'refs/heads/add-revision-origin-cache': {
'target': b'L[\xce\x1c\x88\x8eF\t\xf1"\x19\x1e\xfb\xc0'
b's\xe7/\xe9l\x1e',
'target_type': 'revision'},
b'HEAD': {
'target': b'8K\x12\x00d\x03\xcc\xe4]bS\xe3\x8f{\xd7}'
b'\xac\xefrm',
'target_type': 'revision'},
b'refs/tags/v0.0.103': {
'target': b'\xb6"Im{\xfdLb\xb0\x94N\xea\x96m\x13x\x88+'
b'\x0f\xdd',
'target_type': 'release'},
}},
{
'origin': 'rsync://ftp.gnu.org/gnu/3dldf',
'branches': {
b'3DLDF-1.1.4.tar.gz': {
'target': b'dJ\xfb\x1c\x91\xf4\x82B%]6\xa2\x90|\xd3\xfc'
b'"G\x99\x11',
'target_type': 'revision'},
b'3DLDF-2.0.2.tar.gz': {
'target': b'\xb6\x0e\xe7\x9e9\xac\xaa\x19\x9e='
b'\xd1\xc5\x00\\\xc6\xfc\xe0\xa6\xb4V',
'target_type': 'revision'},
b'3DLDF-2.0.3-examples.tar.gz': {
'target': b'!H\x19\xc0\xee\x82-\x12F1\xbd\x97'
b'\xfe\xadZ\x80\x80\xc1\x83\xff',
'target_type': 'revision'},
b'3DLDF-2.0.3.tar.gz': {
'target': b'\x8e\xa9\x8e/\xea}\x9feF\xf4\x9f\xfd\xee'
b'\xcc\x1a\xb4`\x8c\x8by',
'target_type': 'revision'},
b'3DLDF-2.0.tar.gz': {
'target': b'F6*\xff(?\x19a\xef\xb6\xc2\x1fv$S\xe3G'
b'\xd3\xd1m',
b'target_type': 'revision'}
}},
{
'origin': 'https://forge.softwareheritage.org/source/jesuisgpl/',
'branches': {
b'master': {
'target': b'\xe7n\xa4\x9c\x9f\xfb\xb7\xf76\x11\x08{'
b'\xa6\xe9\x99\xb1\x9e]q\xeb',
'target_type': 'revision'}
},
'id': b"h\xc0\xd2a\x04\xd4~'\x8d\xd6\xbe\x07\xeda\xfa\xfbV"
b"\x1d\r "},
{
'origin': 'https://pypi.org/project/limnoria/',
'branches': {
b'HEAD': {
'target': b'releases/2018.09.09',
'target_type': 'alias'},
b'releases/2018.09.01': {
'target': b'<\xee1(\xe8\x8d_\xc1\xc9\xa6rT\xf1\x1d'
b'\xbb\xdfF\xfdw\xcf',
'target_type': 'revision'},
b'releases/2018.09.09': {
'target': b'\x83\xb9\xb6\xc7\x05\xb1%\xd0\xfem\xd8k'
b'A\x10\x9d\xc5\xfa2\xf8t',
'target_type': 'revision'}},
'id': b'{\xda\x8e\x84\x7fX\xff\x92\x80^\x93V\x18\xa3\xfay'
b'\x12\x9e\xd6\xb3'},
{
'origin': 'http://0-512-md.googlecode.com/svn/',
'branches': {
b'master': {
'target': b'\xe4?r\xe1,\x88\xab\xec\xe7\x9a\x87\xb8'
b'\xc9\xad#.\x1bw=\x18',
'target_type': 'revision'}},
'id': b'\xa1\xa2\x8c\n\xb3\x87\xa8\xf9\xe0a\x8c\xb7'
b'\x05\xea\xb8\x1f\xc4H\xf4s'},
{
'origin': 'https://github.com/librariesio/yarn-parser',
'branches': {
b'HEAD': {
'target': hash_to_bytes(
'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'),
'target_type': 'revision'}}},
{
'origin': 'https://github.com/librariesio/yarn-parser.git',
'branches': {
b'HEAD': {
'target': hash_to_bytes(
'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'),
'target_type': 'revision'}}},
]
REVISIONS = [{
'id': hash_to_bytes('8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'),
'author': {
'id': 26,
'name': b'Andrew Nesbitt',
'fullname': b'Andrew Nesbitt ',
'email': b'andrewnez@gmail.com'
},
'committer': {
'id': 26,
'name': b'Andrew Nesbitt',
'fullname': b'Andrew Nesbitt ',
'email': b'andrewnez@gmail.com'
},
'synthetic': False,
'date': {
'negative_utc': False,
'timestamp': {
'seconds': 1487596456,
'microseconds': 0
},
'offset': 0
},
'directory': b'10'
}]
DIRECTORY_ID = b'10'
DIRECTORY = [{
'sha1_git': b'abc',
'name': b'index.js',
'target': b'abc',
'length': 897,
'status': 'visible',
'type': 'file',
'perms': 33188,
'sha1': b'bcd'
},
{
'sha1_git': b'aab',
'name': b'package.json',
'target': b'aab',
'length': 712,
'status': 'visible',
'type': 'file',
'perms': 33188,
'sha1': b'cde'
},
{
'target': b'11',
'type': 'dir',
'length': None,
'name': b'.github',
'sha1': None,
'perms': 16384,
'sha1_git': None,
'status': None,
'sha256': None
}
]
SHA1_TO_LICENSES = {
'01c9379dfc33803963d07c1ccc748d3fe4c96bb5': ['GPL'],
'02fb2c89e14f7fab46701478c83779c7beb7b069': ['Apache2.0'],
'103bc087db1d26afc3a0283f38663d081e9b01e6': ['MIT'],
'688a5ef812c53907562fe379d4b3851e69c7cb15': ['AGPL'],
'da39a3ee5e6b4b0d3255bfef95601890afd80709': [],
}
SHA1_TO_CTAGS = {
'01c9379dfc33803963d07c1ccc748d3fe4c96bb5': [{
'name': 'foo',
'kind': 'str',
'line': 10,
'lang': 'bar',
}],
'd4c647f0fc257591cc9ba1722484229780d1c607': [{
'name': 'let',
'kind': 'int',
'line': 100,
'lang': 'haskell',
}],
'688a5ef812c53907562fe379d4b3851e69c7cb15': [{
'name': 'symbol',
'kind': 'float',
'line': 99,
'lang': 'python',
}],
}
OBJ_STORAGE_DATA = {
'01c9379dfc33803963d07c1ccc748d3fe4c96bb5': b'this is some text',
'688a5ef812c53907562fe379d4b3851e69c7cb15': b'another text',
'8986af901dd2043044ce8f0d8fc039153641cf17': b'yet another text',
'02fb2c89e14f7fab46701478c83779c7beb7b069': b"""
import unittest
import logging
from swh.indexer.mimetype import MimetypeIndexer
from swh.indexer.tests.test_utils import MockObjStorage
class MockStorage():
def content_mimetype_add(self, mimetypes):
self.state = mimetypes
self.conflict_update = conflict_update
def indexer_configuration_add(self, tools):
return [{
'id': 10,
}]
""",
'103bc087db1d26afc3a0283f38663d081e9b01e6': b"""
#ifndef __AVL__
#define __AVL__
typedef struct _avl_tree avl_tree;
typedef struct _data_t {
int content;
} data_t;
""",
'93666f74f1cf635c8c8ac118879da6ec5623c410': b"""
(should 'pygments (recognize 'lisp 'easily))
""",
'26a9f72a7c87cc9205725cfd879f514ff4f3d8d5': b"""
{
"name": "test_metadata",
"version": "0.0.1",
"description": "Simple package.json test for indexer",
"repository": {
"type": "git",
"url": "https://github.com/moranegg/metadata_test"
}
}
""",
'd4c647f0fc257591cc9ba1722484229780d1c607': b"""
{
"version": "5.0.3",
"name": "npm",
"description": "a package manager for JavaScript",
"keywords": [
"install",
"modules",
"package manager",
"package.json"
],
"preferGlobal": true,
"config": {
"publishtest": false
},
"homepage": "https://docs.npmjs.com/",
"author": "Isaac Z. Schlueter (http://blog.izs.me)",
"repository": {
"type": "git",
"url": "https://github.com/npm/npm"
},
"bugs": {
"url": "https://github.com/npm/npm/issues"
},
"dependencies": {
"JSONStream": "~1.3.1",
"abbrev": "~1.1.0",
"ansi-regex": "~2.1.1",
"ansicolors": "~0.3.2",
"ansistyles": "~0.1.3"
},
"devDependencies": {
"tacks": "~1.2.6",
"tap": "~10.3.2"
},
"license": "Artistic-2.0"
}
""",
'a7ab314d8a11d2c93e3dcf528ca294e7b431c449': b"""
""",
'da39a3ee5e6b4b0d3255bfef95601890afd80709': b'',
'636465': b"""
{
"name": "yarn-parser",
"version": "1.0.0",
"description": "Tiny web service for parsing yarn.lock files",
"main": "index.js",
"scripts": {
"start": "node index.js",
"test": "mocha"
},
"engines": {
"node": "9.8.0"
},
"repository": {
"type": "git",
"url": "git+https://github.com/librariesio/yarn-parser.git"
},
"keywords": [
"yarn",
"parse",
"lock",
"dependencies"
],
"author": "Andrew Nesbitt",
"license": "AGPL-3.0",
"bugs": {
"url": "https://github.com/librariesio/yarn-parser/issues"
},
"homepage": "https://github.com/librariesio/yarn-parser#readme",
"dependencies": {
"@yarnpkg/lockfile": "^1.0.0",
"body-parser": "^1.15.2",
"express": "^4.14.0"
},
"devDependencies": {
"chai": "^4.1.2",
"mocha": "^5.2.0",
"request": "^2.87.0",
"test": "^0.6.0"
}
}
"""
}
YARN_PARSER_METADATA = {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'url':
'https://github.com/librariesio/yarn-parser#readme',
'codeRepository':
'git+git+https://github.com/librariesio/yarn-parser.git',
'author': [{
'type': 'Person',
'name': 'Andrew Nesbitt'
}],
'license': 'https://spdx.org/licenses/AGPL-3.0',
'version': '1.0.0',
'description':
'Tiny web service for parsing yarn.lock files',
'issueTracker':
'https://github.com/librariesio/yarn-parser/issues',
'name': 'yarn-parser',
'keywords': ['yarn', 'parse', 'lock', 'dependencies'],
+ 'type': 'SoftwareSourceCode',
}
json_dict_keys = strategies.one_of(
strategies.characters(),
*map(strategies.just, ['type', 'url', 'name', 'email', '@id',
'@context', 'repository', 'license',
'repositories', 'licenses'
]),
)
"""Hypothesis strategy that generates strings, with an emphasis on those
that are often used as dictionary keys in metadata files."""
generic_json_document = strategies.recursive(
strategies.none() | strategies.booleans() | strategies.floats() |
strategies.characters(),
lambda children: (
strategies.lists(children, 1) |
strategies.dictionaries(json_dict_keys, children, min_size=1)
)
)
"""Hypothesis strategy that generates possible values for values of JSON
metadata files."""
def json_document_strategy(keys=None):
"""Generates an hypothesis strategy that generates metadata files
for a JSON-based format that uses the given keys."""
if keys is None:
keys = strategies.characters()
else:
keys = strategies.one_of(map(strategies.just, keys))
return strategies.dictionaries(keys, generic_json_document, min_size=1)
def _tree_to_xml(root, xmlns, data):
def encode(s):
"Skips unpaired surrogates generated by json_document_strategy"
return s.encode('utf8', 'replace')
def to_xml(data, indent=b' '):
if data is None:
return b''
elif isinstance(data, (bool, str, int, float)):
return indent + encode(str(data))
elif isinstance(data, list):
return b'\n'.join(to_xml(v, indent=indent) for v in data)
elif isinstance(data, dict):
lines = []
for (key, value) in data.items():
lines.append(indent + encode('<{}>'.format(key)))
lines.append(to_xml(value, indent=indent+b' '))
lines.append(indent + encode('{}>'.format(key)))
return b'\n'.join(lines)
else:
raise TypeError(data)
return b'\n'.join([
'<{} xmlns="{}">'.format(root, xmlns).encode(),
to_xml(data),
'{}>'.format(root).encode(),
])
class TreeToXmlTest(unittest.TestCase):
def test_leaves(self):
self.assertEqual(
_tree_to_xml('root', 'http://example.com', None),
b'\n\n'
)
self.assertEqual(
_tree_to_xml('root', 'http://example.com', True),
b'\n True\n'
)
self.assertEqual(
_tree_to_xml('root', 'http://example.com', 'abc'),
b'\n abc\n'
)
self.assertEqual(
_tree_to_xml('root', 'http://example.com', 42),
b'\n 42\n'
)
self.assertEqual(
_tree_to_xml('root', 'http://example.com', 3.14),
b'\n 3.14\n'
)
def test_dict(self):
self.assertIn(
_tree_to_xml('root', 'http://example.com', {
'foo': 'bar',
'baz': 'qux'
}),
[
b'\n'
b' \n bar\n \n'
b' \n qux\n \n'
b'',
b'\n'
b' \n qux\n \n'
b' \n bar\n \n'
b''
]
)
def test_list(self):
self.assertEqual(
_tree_to_xml('root', 'http://example.com', [
{'foo': 'bar'},
{'foo': 'baz'},
]),
b'\n'
b' \n bar\n \n'
b' \n baz\n \n'
b''
)
def xml_document_strategy(keys, root, xmlns):
"""Generates an hypothesis strategy that generates metadata files
for an XML format that uses the given keys."""
return strategies.builds(
functools.partial(_tree_to_xml, root, xmlns),
json_document_strategy(keys))
def filter_dict(d, keys):
'return a copy of the dict with keys deleted'
if not isinstance(keys, (list, tuple)):
keys = (keys, )
return dict((k, v) for (k, v) in d.items() if k not in keys)
def fill_obj_storage(obj_storage):
"""Add some content in an object storage."""
for (obj_id, content) in OBJ_STORAGE_DATA.items():
obj_storage.add(content, obj_id=hash_to_bytes(obj_id))
def fill_storage(storage):
for origin in ORIGINS:
storage.origin_add_one(origin)
for snap in SNAPSHOTS:
origin_url = snap['origin']
visit = storage.origin_visit_add(origin_url, datetime.datetime.now())
snap_id = snap.get('id') or \
bytes([random.randint(0, 255) for _ in range(32)])
storage.snapshot_add([{
'id': snap_id,
'branches': snap['branches']
}])
storage.origin_visit_update(
origin_url, visit['visit'], status='full', snapshot=snap_id)
storage.revision_add(REVISIONS)
storage.directory_add([{
'id': DIRECTORY_ID,
'entries': DIRECTORY,
}])
for (obj_id, content) in OBJ_STORAGE_DATA.items():
content_hashes = hashutil.MultiHash.from_data(content).digest()
storage.content_add([{
'data': content,
'length': len(content),
'status': 'visible',
'sha1': hash_to_bytes(obj_id),
'sha1_git': hash_to_bytes(obj_id),
'sha256': content_hashes['sha256'],
'blake2s256': content_hashes['blake2s256']
}])
class CommonContentIndexerTest(metaclass=abc.ABCMeta):
legacy_get_format = False
"""True if and only if the tested indexer uses the legacy format.
see: https://forge.softwareheritage.org/T1433
"""
def get_indexer_results(self, ids):
"""Override this for indexers that don't have a mock storage."""
return self.indexer.idx_storage.state
def assert_legacy_results_ok(self, sha1s, expected_results=None):
# XXX old format, remove this when all endpoints are
# updated to the new one
# see: https://forge.softwareheritage.org/T1433
sha1s = [sha1 if isinstance(sha1, bytes) else hash_to_bytes(sha1)
for sha1 in sha1s]
actual_results = list(self.get_indexer_results(sha1s))
if expected_results is None:
expected_results = self.expected_results
self.assertEqual(len(expected_results), len(actual_results),
(expected_results, actual_results))
for indexed_data in actual_results:
_id = indexed_data['id']
expected_data = expected_results[hashutil.hash_to_hex(_id)].copy()
expected_data['id'] = _id
self.assertEqual(indexed_data, expected_data)
def assert_results_ok(self, sha1s, expected_results=None):
if self.legacy_get_format:
self.assert_legacy_results_ok(sha1s, expected_results)
return
sha1s = [sha1 if isinstance(sha1, bytes) else hash_to_bytes(sha1)
for sha1 in sha1s]
actual_results = list(self.get_indexer_results(sha1s))
if expected_results is None:
expected_results = self.expected_results
self.assertEqual(len(expected_results), len(actual_results),
(expected_results, actual_results))
for indexed_data in actual_results:
(_id, indexed_data) = list(indexed_data.items())[0]
expected_data = expected_results[hashutil.hash_to_hex(_id)].copy()
expected_data = [expected_data]
self.assertEqual(indexed_data, expected_data)
def test_index(self):
"""Known sha1 have their data indexed
"""
sha1s = [self.id0, self.id1, self.id2]
# when
self.indexer.run(sha1s, policy_update='update-dups')
self.assert_results_ok(sha1s)
# 2nd pass
self.indexer.run(sha1s, policy_update='ignore-dups')
self.assert_results_ok(sha1s)
def test_index_one_unknown_sha1(self):
"""Unknown sha1 are not indexed"""
sha1s = [self.id1,
'799a5ef812c53907562fe379d4b3851e69c7cb15', # unknown
'800a5ef812c53907562fe379d4b3851e69c7cb15'] # unknown
# when
self.indexer.run(sha1s, policy_update='update-dups')
# then
expected_results = {
k: v for k, v in self.expected_results.items() if k in sha1s
}
self.assert_results_ok(sha1s, expected_results)
class CommonContentIndexerRangeTest:
"""Allows to factorize tests on range indexer.
"""
def setUp(self):
self.contents = sorted(OBJ_STORAGE_DATA)
def assert_results_ok(self, start, end, actual_results,
expected_results=None):
if expected_results is None:
expected_results = self.expected_results
actual_results = list(actual_results)
for indexed_data in actual_results:
_id = indexed_data['id']
assert isinstance(_id, bytes)
indexed_data = indexed_data.copy()
indexed_data['id'] = hash_to_hex(indexed_data['id'])
self.assertEqual(indexed_data, expected_results[hash_to_hex(_id)])
self.assertTrue(start <= _id <= end)
_tool_id = indexed_data['indexer_configuration_id']
self.assertEqual(_tool_id, self.indexer.tool['id'])
def test__index_contents(self):
"""Indexing contents without existing data results in indexed data
"""
_start, _end = [self.contents[0], self.contents[2]] # output hex ids
start, end = map(hashutil.hash_to_bytes, (_start, _end))
# given
actual_results = list(self.indexer._index_contents(
start, end, indexed={}))
self.assert_results_ok(start, end, actual_results)
def test__index_contents_with_indexed_data(self):
"""Indexing contents with existing data results in less indexed data
"""
_start, _end = [self.contents[0], self.contents[2]] # output hex ids
start, end = map(hashutil.hash_to_bytes, (_start, _end))
data_indexed = [self.id0, self.id2]
# given
actual_results = self.indexer._index_contents(
start, end, indexed=set(map(hash_to_bytes, data_indexed)))
# craft the expected results
expected_results = self.expected_results.copy()
for already_indexed_key in data_indexed:
expected_results.pop(already_indexed_key)
self.assert_results_ok(
start, end, actual_results, expected_results)
def test_generate_content_get(self):
"""Optimal indexing should result in indexed data
"""
_start, _end = [self.contents[0], self.contents[2]] # output hex ids
start, end = map(hashutil.hash_to_bytes, (_start, _end))
# given
actual_results = self.indexer.run(start, end)
# then
self.assertTrue(actual_results)
def test_generate_content_get_input_as_bytes(self):
"""Optimal indexing should result in indexed data
Input are in bytes here.
"""
_start, _end = [self.contents[0], self.contents[2]] # output hex ids
start, end = map(hashutil.hash_to_bytes, (_start, _end))
# given
actual_results = self.indexer.run( # checks the bytes input this time
start, end, skip_existing=False)
# no already indexed data so same result as prior test
# then
self.assertTrue(actual_results)
def test_generate_content_get_no_result(self):
"""No result indexed returns False"""
_start, _end = ['0000000000000000000000000000000000000000',
'0000000000000000000000000000000000000001']
start, end = map(hashutil.hash_to_bytes, (_start, _end))
# given
actual_results = self.indexer.run(
start, end, incremental=False)
# then
self.assertFalse(actual_results)