diff --git a/swh/indexer/codemeta.py b/swh/indexer/codemeta.py
index cda2058..b670b87 100644
--- a/swh/indexer/codemeta.py
+++ b/swh/indexer/codemeta.py
@@ -1,127 +1,127 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import csv
import json
import os.path
import re
import swh.indexer
from pyld import jsonld
_DATA_DIR = os.path.join(os.path.dirname(swh.indexer.__file__), 'data')
CROSSWALK_TABLE_PATH = os.path.join(_DATA_DIR, 'codemeta', 'crosswalk.csv')
CODEMETA_CONTEXT_PATH = os.path.join(_DATA_DIR, 'codemeta', 'codemeta.jsonld')
with open(CODEMETA_CONTEXT_PATH) as fd:
CODEMETA_CONTEXT = json.load(fd)
CODEMETA_CONTEXT_URL = 'https://doi.org/10.5063/schema/codemeta-2.0'
CODEMETA_ALTERNATE_CONTEXT_URLS = {
('https://raw.githubusercontent.com/codemeta/codemeta/'
'master/codemeta.jsonld')
}
CODEMETA_URI = 'https://codemeta.github.io/terms/'
SCHEMA_URI = 'http://schema.org/'
PROPERTY_BLACKLIST = {
# CodeMeta properties that we cannot properly represent.
SCHEMA_URI + 'softwareRequirements',
CODEMETA_URI + 'softwareSuggestions',
# Duplicate of 'author'
SCHEMA_URI + 'creator',
}
_codemeta_field_separator = re.compile(r'\s*[,/]\s*')
def make_absolute_uri(local_name):
definition = CODEMETA_CONTEXT['@context'][local_name]
if isinstance(definition, str):
return definition
elif isinstance(definition, dict):
prefixed_name = definition['@id']
(prefix, local_name) = prefixed_name.split(':')
if prefix == 'schema':
canonical_name = SCHEMA_URI + local_name
elif prefix == 'codemeta':
canonical_name = CODEMETA_URI + local_name
else:
assert False, prefix
return canonical_name
else:
assert False, definition
def _read_crosstable(fd):
reader = csv.reader(fd)
try:
header = next(reader)
except StopIteration:
raise ValueError('empty file')
data_sources = set(header) - {'Parent Type', 'Property',
'Type', 'Description'}
assert 'codemeta-V1' in data_sources
codemeta_translation = {data_source: {} for data_source in data_sources}
for line in reader: # For each canonical name
local_name = dict(zip(header, line))['Property']
if not local_name:
continue
canonical_name = make_absolute_uri(local_name)
if canonical_name in PROPERTY_BLACKLIST:
continue
for (col, value) in zip(header, line): # For each cell in the row
if col in data_sources:
# If that's not the parentType/property/type/description
for local_name in _codemeta_field_separator.split(value):
# For each of the data source's properties that maps
# to this canonical name
if local_name.strip():
codemeta_translation[col][local_name.strip()] = \
canonical_name
- return codemeta_translation
+ return (header, codemeta_translation)
with open(CROSSWALK_TABLE_PATH) as fd:
- CROSSWALK_TABLE = _read_crosstable(fd)
+ (CODEMETA_KEYS, CROSSWALK_TABLE) = _read_crosstable(fd)
def _document_loader(url):
"""Document loader for pyld.
Reads the local codemeta.jsonld file instead of fetching it
from the Internet every single time."""
if url == CODEMETA_CONTEXT_URL or url in CODEMETA_ALTERNATE_CONTEXT_URLS:
return {
'contextUrl': None,
'documentUrl': url,
'document': CODEMETA_CONTEXT,
}
elif url == CODEMETA_URI:
raise Exception('{} is CodeMeta\'s URI, use {} as context url'.format(
CODEMETA_URI, CODEMETA_CONTEXT_URL))
else:
raise Exception(url)
def compact(doc):
"""Same as `pyld.jsonld.compact`, but in the context of CodeMeta."""
return jsonld.compact(doc, CODEMETA_CONTEXT_URL,
options={'documentLoader': _document_loader})
def expand(doc):
"""Same as `pyld.jsonld.expand`, but in the context of CodeMeta."""
return jsonld.expand(doc,
options={'documentLoader': _document_loader})
diff --git a/swh/indexer/tests/test_metadata.py b/swh/indexer/tests/test_metadata.py
index c67312b..0b93a64 100644
--- a/swh/indexer/tests/test_metadata.py
+++ b/swh/indexer/tests/test_metadata.py
@@ -1,1091 +1,1132 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import json
import unittest
+from hypothesis import given, strategies
+import xmltodict
+
from swh.model.hashutil import hash_to_bytes
+from swh.indexer.codemeta import CODEMETA_KEYS
from swh.indexer.metadata_dictionary import (
CROSSWALK_TABLE, MAPPINGS, merge_values)
from swh.indexer.metadata_detector import (
detect_metadata, extract_minimal_metadata_dict
)
from swh.indexer.metadata import (
ContentMetadataIndexer, RevisionMetadataIndexer
)
from .utils import (
BASE_TEST_CONFIG, fill_obj_storage, fill_storage,
- YARN_PARSER_METADATA
+ YARN_PARSER_METADATA, json_document_strategy
)
TRANSLATOR_TOOL = {
'name': 'swh-metadata-translator',
'version': '0.0.2',
'configuration': {
'type': 'local',
'context': 'NpmMapping'
}
}
class ContentMetadataTestIndexer(ContentMetadataIndexer):
"""Specific Metadata whose configuration is enough to satisfy the
indexing tests.
"""
def parse_config_file(self, *args, **kwargs):
assert False, 'should not be called; the rev indexer configures it.'
REVISION_METADATA_CONFIG = {
**BASE_TEST_CONFIG,
'tools': TRANSLATOR_TOOL,
}
class Metadata(unittest.TestCase):
"""
Tests metadata_mock_tool tool for Metadata detection
"""
def setUp(self):
"""
shows the entire diff in the results
"""
self.maxDiff = None
self.npm_mapping = MAPPINGS['NpmMapping']()
self.codemeta_mapping = MAPPINGS['CodemetaMapping']()
self.maven_mapping = MAPPINGS['MavenMapping']()
self.pkginfo_mapping = MAPPINGS['PythonPkginfoMapping']()
self.gemspec_mapping = MAPPINGS['GemspecMapping']()
def test_crosstable(self):
self.assertEqual(CROSSWALK_TABLE['NodeJS'], {
'repository': 'http://schema.org/codeRepository',
'os': 'http://schema.org/operatingSystem',
'cpu': 'http://schema.org/processorRequirements',
'engines':
'http://schema.org/processorRequirements',
'author': 'http://schema.org/author',
'author.email': 'http://schema.org/email',
'author.name': 'http://schema.org/name',
'contributor': 'http://schema.org/contributor',
'keywords': 'http://schema.org/keywords',
'license': 'http://schema.org/license',
'version': 'http://schema.org/version',
'description': 'http://schema.org/description',
'name': 'http://schema.org/name',
'bugs': 'https://codemeta.github.io/terms/issueTracker',
'homepage': 'http://schema.org/url'
})
def test_merge_values(self):
self.assertEqual(
merge_values('a', 'b'),
['a', 'b'])
self.assertEqual(
merge_values(['a', 'b'], 'c'),
['a', 'b', 'c'])
self.assertEqual(
merge_values('a', ['b', 'c']),
['a', 'b', 'c'])
self.assertEqual(
merge_values({'@list': ['a']}, {'@list': ['b']}),
{'@list': ['a', 'b']})
self.assertEqual(
merge_values({'@list': ['a', 'b']}, {'@list': ['c']}),
{'@list': ['a', 'b', 'c']})
with self.assertRaises(ValueError):
merge_values({'@list': ['a']}, 'b')
with self.assertRaises(ValueError):
merge_values('a', {'@list': ['b']})
with self.assertRaises(ValueError):
merge_values({'@list': ['a']}, ['b'])
with self.assertRaises(ValueError):
merge_values(['a'], {'@list': ['b']})
self.assertEqual(
merge_values('a', None),
'a')
self.assertEqual(
merge_values(['a', 'b'], None),
['a', 'b'])
self.assertEqual(
merge_values(None, ['b', 'c']),
['b', 'c'])
self.assertEqual(
merge_values({'@list': ['a']}, None),
{'@list': ['a']})
self.assertEqual(
merge_values(None, {'@list': ['a']}),
{'@list': ['a']})
def test_compute_metadata_none(self):
"""
testing content empty content is empty
should return None
"""
# given
content = b""
# None if no metadata was found or an error occurred
declared_metadata = None
# when
result = self.npm_mapping.translate(content)
# then
self.assertEqual(declared_metadata, result)
def test_compute_metadata_npm(self):
"""
testing only computation of metadata with hard_mapping_npm
"""
# given
content = b"""
{
"name": "test_metadata",
"version": "0.0.2",
"description": "Simple package.json test for indexer",
"repository": {
"type": "git",
"url": "https://github.com/moranegg/metadata_test"
},
"author": {
"email": "moranegg@example.com",
"name": "Morane G"
}
}
"""
declared_metadata = {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'test_metadata',
'version': '0.0.2',
'description': 'Simple package.json test for indexer',
'codeRepository':
'git+https://github.com/moranegg/metadata_test',
'author': [{
'type': 'Person',
'name': 'Morane G',
'email': 'moranegg@example.com',
}],
}
# when
result = self.npm_mapping.translate(content)
# then
self.assertEqual(declared_metadata, result)
def test_extract_minimal_metadata_dict(self):
"""
Test the creation of a coherent minimal metadata set
"""
# given
metadata_list = [{
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'name': 'test_1',
'version': '0.0.2',
'description': 'Simple package.json test for indexer',
'codeRepository':
'git+https://github.com/moranegg/metadata_test',
}, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'name': 'test_0_1',
'version': '0.0.2',
'description': 'Simple package.json test for indexer',
'codeRepository':
'git+https://github.com/moranegg/metadata_test'
}, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'name': 'test_metadata',
'version': '0.0.2',
'author': 'moranegg',
}]
# when
results = extract_minimal_metadata_dict(metadata_list)
# then
expected_results = {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
"version": '0.0.2',
"description": 'Simple package.json test for indexer',
"name": ['test_1', 'test_0_1', 'test_metadata'],
"author": ['moranegg'],
"codeRepository":
'git+https://github.com/moranegg/metadata_test',
}
self.assertEqual(expected_results, results)
def test_index_content_metadata_npm(self):
"""
testing NPM with package.json
- one sha1 uses a file that can't be translated to metadata and
should return None in the translated metadata
"""
# given
sha1s = [
hash_to_bytes('26a9f72a7c87cc9205725cfd879f514ff4f3d8d5'),
hash_to_bytes('d4c647f0fc257591cc9ba1722484229780d1c607'),
hash_to_bytes('02fb2c89e14f7fab46701478c83779c7beb7b069'),
]
# this metadata indexer computes only metadata for package.json
# in npm context with a hard mapping
config = BASE_TEST_CONFIG.copy()
config['tools'] = [TRANSLATOR_TOOL]
metadata_indexer = ContentMetadataTestIndexer(config=config)
fill_obj_storage(metadata_indexer.objstorage)
fill_storage(metadata_indexer.storage)
# when
metadata_indexer.run(sha1s, policy_update='ignore-dups')
results = list(metadata_indexer.idx_storage.content_metadata_get(
sha1s))
expected_results = [{
'translated_metadata': {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'codeRepository':
'git+https://github.com/moranegg/metadata_test',
'description': 'Simple package.json test for indexer',
'name': 'test_metadata',
'version': '0.0.1'
},
'id': hash_to_bytes('26a9f72a7c87cc9205725cfd879f514ff4f3d8d5'),
}, {
'translated_metadata': {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'issueTracker':
'https://github.com/npm/npm/issues',
'author': [{
'type': 'Person',
'name': 'Isaac Z. Schlueter',
'email': 'i@izs.me',
'url': 'http://blog.izs.me',
}],
'codeRepository':
'git+https://github.com/npm/npm',
'description': 'a package manager for JavaScript',
'license': 'https://spdx.org/licenses/Artistic-2.0',
'version': '5.0.3',
'name': 'npm',
'keywords': [
'install',
'modules',
'package manager',
'package.json'
],
'url': 'https://docs.npmjs.com/'
},
'id': hash_to_bytes('d4c647f0fc257591cc9ba1722484229780d1c607')
}]
for result in results:
del result['tool']
# The assertion below returns False sometimes because of nested lists
self.assertEqual(expected_results, results)
def test_npm_bugs_normalization(self):
# valid dictionary
package_json = b"""{
"name": "foo",
"bugs": {
"url": "https://github.com/owner/project/issues",
"email": "foo@example.com"
}
}"""
result = self.npm_mapping.translate(package_json)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'name': 'foo',
'issueTracker': 'https://github.com/owner/project/issues',
'type': 'SoftwareSourceCode',
})
# "invalid" dictionary
package_json = b"""{
"name": "foo",
"bugs": {
"email": "foo@example.com"
}
}"""
result = self.npm_mapping.translate(package_json)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'name': 'foo',
'type': 'SoftwareSourceCode',
})
# string
package_json = b"""{
"name": "foo",
"bugs": "https://github.com/owner/project/issues"
}"""
result = self.npm_mapping.translate(package_json)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'name': 'foo',
'issueTracker': 'https://github.com/owner/project/issues',
'type': 'SoftwareSourceCode',
})
def test_npm_repository_normalization(self):
# normal
package_json = b"""{
"name": "foo",
"repository": {
"type" : "git",
"url" : "https://github.com/npm/cli.git"
}
}"""
result = self.npm_mapping.translate(package_json)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'name': 'foo',
'codeRepository': 'git+https://github.com/npm/cli.git',
'type': 'SoftwareSourceCode',
})
# missing url
package_json = b"""{
"name": "foo",
"repository": {
"type" : "git"
}
}"""
result = self.npm_mapping.translate(package_json)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'name': 'foo',
'type': 'SoftwareSourceCode',
})
# github shortcut
package_json = b"""{
"name": "foo",
"repository": "github:npm/cli"
}"""
result = self.npm_mapping.translate(package_json)
expected_result = {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'name': 'foo',
'codeRepository': 'git+https://github.com/npm/cli.git',
'type': 'SoftwareSourceCode',
}
self.assertEqual(result, expected_result)
# github shortshortcut
package_json = b"""{
"name": "foo",
"repository": "npm/cli"
}"""
result = self.npm_mapping.translate(package_json)
self.assertEqual(result, expected_result)
# gitlab shortcut
package_json = b"""{
"name": "foo",
"repository": "gitlab:user/repo"
}"""
result = self.npm_mapping.translate(package_json)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'name': 'foo',
'codeRepository': 'git+https://gitlab.com/user/repo.git',
'type': 'SoftwareSourceCode',
})
def test_detect_metadata_package_json(self):
# given
df = [{
'sha1_git': b'abc',
'name': b'index.js',
'target': b'abc',
'length': 897,
'status': 'visible',
'type': 'file',
'perms': 33188,
'dir_id': b'dir_a',
'sha1': b'bcd'
},
{
'sha1_git': b'aab',
'name': b'package.json',
'target': b'aab',
'length': 712,
'status': 'visible',
'type': 'file',
'perms': 33188,
'dir_id': b'dir_a',
'sha1': b'cde'
}]
# when
results = detect_metadata(df)
expected_results = {
'NpmMapping': [
b'cde'
]
}
# then
self.assertEqual(expected_results, results)
def test_compute_metadata_valid_codemeta(self):
raw_content = (
b"""{
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"@type": "SoftwareSourceCode",
"identifier": "CodeMeta",
"description": "CodeMeta is a concept vocabulary that can be used to standardize the exchange of software metadata across repositories and organizations.",
"name": "CodeMeta: Minimal metadata schemas for science software and code, in JSON-LD",
"codeRepository": "https://github.com/codemeta/codemeta",
"issueTracker": "https://github.com/codemeta/codemeta/issues",
"license": "https://spdx.org/licenses/Apache-2.0",
"version": "2.0",
"author": [
{
"@type": "Person",
"givenName": "Carl",
"familyName": "Boettiger",
"email": "cboettig@gmail.com",
"@id": "http://orcid.org/0000-0002-1642-628X"
},
{
"@type": "Person",
"givenName": "Matthew B.",
"familyName": "Jones",
"email": "jones@nceas.ucsb.edu",
"@id": "http://orcid.org/0000-0003-0077-4738"
}
],
"maintainer": {
"@type": "Person",
"givenName": "Carl",
"familyName": "Boettiger",
"email": "cboettig@gmail.com",
"@id": "http://orcid.org/0000-0002-1642-628X"
},
"contIntegration": "https://travis-ci.org/codemeta/codemeta",
"developmentStatus": "active",
"downloadUrl": "https://github.com/codemeta/codemeta/archive/2.0.zip",
"funder": {
"@id": "https://doi.org/10.13039/100000001",
"@type": "Organization",
"name": "National Science Foundation"
},
"funding":"1549758; Codemeta: A Rosetta Stone for Metadata in Scientific Software",
"keywords": [
"metadata",
"software"
],
"version":"2.0",
"dateCreated":"2017-06-05",
"datePublished":"2017-06-05",
"programmingLanguage": "JSON-LD"
}""") # noqa
expected_result = {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"type": "SoftwareSourceCode",
"identifier": "CodeMeta",
"description":
"CodeMeta is a concept vocabulary that can "
"be used to standardize the exchange of software metadata "
"across repositories and organizations.",
"name":
"CodeMeta: Minimal metadata schemas for science "
"software and code, in JSON-LD",
"codeRepository": "https://github.com/codemeta/codemeta",
"issueTracker": "https://github.com/codemeta/codemeta/issues",
"license": "https://spdx.org/licenses/Apache-2.0",
"version": "2.0",
"author": [
{
"type": "Person",
"givenName": "Carl",
"familyName": "Boettiger",
"email": "cboettig@gmail.com",
"id": "http://orcid.org/0000-0002-1642-628X"
},
{
"type": "Person",
"givenName": "Matthew B.",
"familyName": "Jones",
"email": "jones@nceas.ucsb.edu",
"id": "http://orcid.org/0000-0003-0077-4738"
}
],
"maintainer": {
"type": "Person",
"givenName": "Carl",
"familyName": "Boettiger",
"email": "cboettig@gmail.com",
"id": "http://orcid.org/0000-0002-1642-628X"
},
"contIntegration": "https://travis-ci.org/codemeta/codemeta",
"developmentStatus": "active",
"downloadUrl":
"https://github.com/codemeta/codemeta/archive/2.0.zip",
"funder": {
"id": "https://doi.org/10.13039/100000001",
"type": "Organization",
"name": "National Science Foundation"
},
"funding": "1549758; Codemeta: A Rosetta Stone for Metadata "
"in Scientific Software",
"keywords": [
"metadata",
"software"
],
"version": "2.0",
"dateCreated": "2017-06-05",
"datePublished": "2017-06-05",
"programmingLanguage": "JSON-LD"
}
result = self.codemeta_mapping.translate(raw_content)
self.assertEqual(result, expected_result)
def test_compute_metadata_codemeta_alternate_context(self):
raw_content = (
b"""{
"@context": "https://raw.githubusercontent.com/codemeta/codemeta/master/codemeta.jsonld",
"@type": "SoftwareSourceCode",
"identifier": "CodeMeta"
}""") # noqa
expected_result = {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"type": "SoftwareSourceCode",
"identifier": "CodeMeta",
}
result = self.codemeta_mapping.translate(raw_content)
self.assertEqual(result, expected_result)
def test_compute_metadata_maven(self):
raw_content = b"""
Maven Default Project
4.0.0
com.mycompany.app
my-app
1.2.3
central
Maven Repository Switchboard
default
http://repo1.maven.org/maven2
false
Apache License, Version 2.0
https://www.apache.org/licenses/LICENSE-2.0.txt
repo
A business-friendly OSS license
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'Maven Default Project',
'identifier': 'com.mycompany.app',
'version': '1.2.3',
'license': 'https://www.apache.org/licenses/LICENSE-2.0.txt',
'codeRepository':
'http://repo1.maven.org/maven2/com/mycompany/app/my-app',
})
def test_compute_metadata_maven_empty(self):
raw_content = b"""
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
})
def test_compute_metadata_maven_almost_empty(self):
raw_content = b"""
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
})
def test_compute_metadata_maven_invalid_xml(self):
expected_warning = (
'WARNING:swh.indexer.metadata_dictionary.MavenMapping:'
'Error parsing XML from foo')
raw_content = b"""
"""
with self.assertLogs('swh.indexer.metadata_dictionary',
level='WARNING') as cm:
result = MAPPINGS["MavenMapping"]('foo').translate(raw_content)
self.assertEqual(cm.output, [expected_warning])
self.assertEqual(result, None)
raw_content = b"""
"""
with self.assertLogs('swh.indexer.metadata_dictionary',
level='WARNING') as cm:
result = MAPPINGS["MavenMapping"]('foo').translate(raw_content)
self.assertEqual(cm.output, [expected_warning])
self.assertEqual(result, None)
def test_compute_metadata_maven_unknown_encoding(self):
expected_warning = (
'WARNING:swh.indexer.metadata_dictionary.MavenMapping:'
'Error detecting XML encoding from foo')
raw_content = b"""
"""
with self.assertLogs('swh.indexer.metadata_dictionary',
level='WARNING') as cm:
result = MAPPINGS["MavenMapping"]('foo').translate(raw_content)
self.assertEqual(cm.output, [expected_warning])
self.assertEqual(result, None)
raw_content = b"""
"""
with self.assertLogs('swh.indexer.metadata_dictionary',
level='WARNING') as cm:
result = MAPPINGS["MavenMapping"]('foo').translate(raw_content)
self.assertEqual(cm.output, [expected_warning])
self.assertEqual(result, None)
def test_compute_metadata_maven_invalid_encoding(self):
expected_warning = (
'WARNING:swh.indexer.metadata_dictionary.MavenMapping:'
'Error unidecoding XML from foo')
raw_content = b"""
"""
with self.assertLogs('swh.indexer.metadata_dictionary',
level='WARNING') as cm:
result = MAPPINGS["MavenMapping"]('foo').translate(raw_content)
self.assertEqual(cm.output, [expected_warning])
self.assertEqual(result, None)
def test_compute_metadata_maven_minimal(self):
raw_content = b"""
Maven Default Project
4.0.0
com.mycompany.app
my-app
1.2.3
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'Maven Default Project',
'identifier': 'com.mycompany.app',
'version': '1.2.3',
'codeRepository':
'https://repo.maven.apache.org/maven2/com/mycompany/app/my-app',
})
def test_compute_metadata_maven_empty_nodes(self):
raw_content = b"""
Maven Default Project
4.0.0
com.mycompany.app
my-app
1.2.3
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'Maven Default Project',
'identifier': 'com.mycompany.app',
'version': '1.2.3',
'codeRepository':
'https://repo.maven.apache.org/maven2/com/mycompany/app/my-app',
})
raw_content = b"""
Maven Default Project
4.0.0
com.mycompany.app
my-app
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'Maven Default Project',
'identifier': 'com.mycompany.app',
'codeRepository':
'https://repo.maven.apache.org/maven2/com/mycompany/app/my-app',
})
raw_content = b"""
4.0.0
com.mycompany.app
my-app
1.2.3
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'identifier': 'com.mycompany.app',
'version': '1.2.3',
'codeRepository':
'https://repo.maven.apache.org/maven2/com/mycompany/app/my-app',
})
raw_content = b"""
Maven Default Project
4.0.0
com.mycompany.app
my-app
1.2.3
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'Maven Default Project',
'identifier': 'com.mycompany.app',
'version': '1.2.3',
'codeRepository':
'https://repo.maven.apache.org/maven2/com/mycompany/app/my-app',
})
raw_content = b"""
1.2.3
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'version': '1.2.3',
})
def test_compute_metadata_maven_invalid_licenses(self):
raw_content = b"""
Maven Default Project
4.0.0
com.mycompany.app
my-app
1.2.3
foo
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'Maven Default Project',
'identifier': 'com.mycompany.app',
'version': '1.2.3',
'codeRepository':
'https://repo.maven.apache.org/maven2/com/mycompany/app/my-app',
})
def test_compute_metadata_maven_multiple(self):
'''Tests when there are multiple code repos and licenses.'''
raw_content = b"""
Maven Default Project
4.0.0
com.mycompany.app
my-app
1.2.3
central
Maven Repository Switchboard
default
http://repo1.maven.org/maven2
false
example
Example Maven Repo
default
http://example.org/maven2
Apache License, Version 2.0
https://www.apache.org/licenses/LICENSE-2.0.txt
repo
A business-friendly OSS license
MIT license
https://opensource.org/licenses/MIT
"""
result = self.maven_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'Maven Default Project',
'identifier': 'com.mycompany.app',
'version': '1.2.3',
'license': [
'https://www.apache.org/licenses/LICENSE-2.0.txt',
'https://opensource.org/licenses/MIT',
],
'codeRepository': [
'http://repo1.maven.org/maven2/com/mycompany/app/my-app',
'http://example.org/maven2/com/mycompany/app/my-app',
]
})
def test_compute_metadata_pkginfo(self):
raw_content = (b"""\
Metadata-Version: 2.1
Name: swh.core
Version: 0.0.49
Summary: Software Heritage core utilities
Home-page: https://forge.softwareheritage.org/diffusion/DCORE/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-core
Description: swh-core
========
\x20
core library for swh's modules:
- config parser
- hash computations
- serialization
- logging mechanism
\x20
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Description-Content-Type: text/markdown
Provides-Extra: testing
""") # noqa
result = self.pkginfo_mapping.translate(raw_content)
self.assertCountEqual(result['description'], [
'Software Heritage core utilities', # note the comma here
'swh-core\n'
'========\n'
'\n'
"core library for swh's modules:\n"
'- config parser\n'
'- hash computations\n'
'- serialization\n'
'- logging mechanism\n'
''],
result)
del result['description']
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'url': 'https://forge.softwareheritage.org/diffusion/DCORE/',
'name': 'swh.core',
'author': [{
'type': 'Person',
'name': 'Software Heritage developers',
'email': 'swh-devel@inria.fr',
}],
'version': '0.0.49',
})
def test_compute_metadata_pkginfo_utf8(self):
raw_content = (b'''\
Metadata-Version: 1.1
Name: snowpyt
Description-Content-Type: UNKNOWN
Description: foo
Hydrology N\xc2\xb083
''') # noqa
result = self.pkginfo_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'snowpyt',
'description': 'foo\nHydrology N°83',
})
def test_compute_metadata_pkginfo_license(self):
raw_content = (b"""\
Metadata-Version: 2.1
Name: foo
License: MIT
""") # noqa
result = self.pkginfo_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'foo',
'license': 'MIT',
})
def test_gemspec_base(self):
raw_content = b"""
Gem::Specification.new do |s|
s.name = 'example'
s.version = '0.1.0'
s.licenses = ['MIT']
s.summary = "This is an example!"
s.description = "Much longer explanation of the example!"
s.authors = ["Ruby Coder"]
s.email = 'rubycoder@example.com'
s.files = ["lib/example.rb"]
s.homepage = 'https://rubygems.org/gems/example'
s.metadata = { "source_code_uri" => "https://github.com/example/example" }
end"""
result = self.gemspec_mapping.translate(raw_content)
self.assertCountEqual(result.pop('description'), [
"This is an example!",
"Much longer explanation of the example!"
])
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'author': ['Ruby Coder'],
'name': 'example',
'license': 'https://spdx.org/licenses/MIT',
'codeRepository': 'https://rubygems.org/gems/example',
'email': 'rubycoder@example.com',
'version': '0.1.0',
})
def test_gemspec_two_author_fields(self):
raw_content = b"""
Gem::Specification.new do |s|
s.authors = ["Ruby Coder1"]
s.author = "Ruby Coder2"
end"""
result = self.gemspec_mapping.translate(raw_content)
self.assertCountEqual(result.pop('author'), [
'Ruby Coder1', 'Ruby Coder2'])
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
})
def test_gemspec_invalid_author(self):
raw_content = b"""
Gem::Specification.new do |s|
s.author = ["Ruby Coder"]
end"""
result = self.gemspec_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
})
raw_content = b"""
Gem::Specification.new do |s|
s.author = "Ruby Coder1",
end"""
result = self.gemspec_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
})
raw_content = b"""
Gem::Specification.new do |s|
s.authors = ["Ruby Coder1", ["Ruby Coder2"]]
end"""
result = self.gemspec_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'author': ['Ruby Coder1'],
})
def test_gemspec_alternative_header(self):
raw_content = b"""
require './lib/version'
Gem::Specification.new { |s|
s.name = 'rb-system-with-aliases'
s.summary = 'execute system commands with aliases'
}
"""
result = self.gemspec_mapping.translate(raw_content)
self.assertEqual(result, {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'type': 'SoftwareSourceCode',
'name': 'rb-system-with-aliases',
'description': 'execute system commands with aliases',
})
+ @given(json_document_strategy(
+ keys=list(MAPPINGS['NpmMapping'].mapping)))
+ def test_npm_adversarial(self, doc):
+ raw = json.dumps(doc).encode()
+ self.npm_mapping.translate(raw)
+
+ @given(json_document_strategy(keys=CODEMETA_KEYS))
+ def test_codemeta_adversarial(self, doc):
+ raw = json.dumps(doc).encode()
+ self.codemeta_mapping.translate(raw)
+
+ @given(json_document_strategy(
+ keys=list(MAPPINGS['MavenMapping'].mapping)))
+ def test_maven_adversarial(self, doc):
+ raw = xmltodict.unparse({'project': doc}, pretty=True)
+ self.maven_mapping.translate(raw)
+
+ @given(strategies.dictionaries(
+ # keys
+ strategies.one_of(
+ strategies.characters(),
+ *map(strategies.just, MAPPINGS['GemspecMapping'].mapping)
+ ),
+ # values
+ strategies.recursive(
+ strategies.characters(),
+ lambda children: strategies.lists(children, 1)
+ )
+ ))
+ def test_gemspec_adversarial(self, doc):
+ parts = ['Gem::Specification.new do |s|\n']
+ for (k, v) in doc.items():
+ parts.append(' s.{} = {}\n'.format(k, repr(v)))
+ parts.append('end\n')
+ self.maven_mapping.translate(''.join(parts))
+
def test_revision_metadata_indexer(self):
metadata_indexer = RevisionMetadataIndexer(
config=REVISION_METADATA_CONFIG)
fill_obj_storage(metadata_indexer.objstorage)
fill_storage(metadata_indexer.storage)
tool = metadata_indexer.idx_storage.indexer_configuration_get(
{'tool_'+k: v for (k, v) in TRANSLATOR_TOOL.items()})
assert tool is not None
metadata_indexer.idx_storage.content_metadata_add([{
'indexer_configuration_id': tool['id'],
'id': b'cde',
'translated_metadata': YARN_PARSER_METADATA,
}])
sha1_gits = [
hash_to_bytes('8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'),
]
metadata_indexer.run(sha1_gits, 'update-dups')
results = list(metadata_indexer.idx_storage.revision_metadata_get(
sha1_gits))
expected_results = [{
'id': hash_to_bytes('8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'),
'tool': TRANSLATOR_TOOL,
'translated_metadata': YARN_PARSER_METADATA,
'mappings': ['npm'],
}]
for result in results:
del result['tool']['id']
# then
self.assertEqual(expected_results, results)
diff --git a/swh/indexer/tests/utils.py b/swh/indexer/tests/utils.py
index e31e3f9..c19bb7e 100644
--- a/swh/indexer/tests/utils.py
+++ b/swh/indexer/tests/utils.py
@@ -1,631 +1,666 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import datetime
import hashlib
import random
+from hypothesis import strategies
+
from swh.model import hashutil
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.indexer.storage import INDEXER_CFG_KEY
BASE_TEST_CONFIG = {
'storage': {
'cls': 'memory',
'args': {
},
},
'objstorage': {
'cls': 'memory',
'args': {
},
},
INDEXER_CFG_KEY: {
'cls': 'memory',
'args': {
},
},
}
ORIGINS = [
{
'id': 52189575,
'lister': None,
'project': None,
'type': 'git',
'url': 'https://github.com/SoftwareHeritage/swh-storage'},
{
'id': 4423668,
'lister': None,
'project': None,
'type': 'ftp',
'url': 'rsync://ftp.gnu.org/gnu/3dldf'},
{
'id': 77775770,
'lister': None,
'project': None,
'type': 'deposit',
'url': 'https://forge.softwareheritage.org/source/jesuisgpl/'},
{
'id': 85072327,
'lister': None,
'project': None,
'type': 'pypi',
'url': 'https://pypi.org/project/limnoria/'},
{
'id': 49908349,
'lister': None,
'project': None,
'type': 'svn',
'url': 'http://0-512-md.googlecode.com/svn/'},
{
'id': 54974445,
'lister': None,
'project': None,
'type': 'git',
'url': 'https://github.com/librariesio/yarn-parser'},
]
SNAPSHOTS = {
52189575: {
'branches': {
b'refs/heads/add-revision-origin-cache': {
'target': b'L[\xce\x1c\x88\x8eF\t\xf1"\x19\x1e\xfb\xc0'
b's\xe7/\xe9l\x1e',
'target_type': 'revision'},
b'HEAD': {
'target': b'8K\x12\x00d\x03\xcc\xe4]bS\xe3\x8f{\xd7}'
b'\xac\xefrm',
'target_type': 'revision'},
b'refs/tags/v0.0.103': {
'target': b'\xb6"Im{\xfdLb\xb0\x94N\xea\x96m\x13x\x88+'
b'\x0f\xdd',
'target_type': 'release'},
}},
4423668: {
'branches': {
b'3DLDF-1.1.4.tar.gz': {
'target': b'dJ\xfb\x1c\x91\xf4\x82B%]6\xa2\x90|\xd3\xfc'
b'"G\x99\x11',
'target_type': 'revision'},
b'3DLDF-2.0.2.tar.gz': {
'target': b'\xb6\x0e\xe7\x9e9\xac\xaa\x19\x9e='
b'\xd1\xc5\x00\\\xc6\xfc\xe0\xa6\xb4V',
'target_type': 'revision'},
b'3DLDF-2.0.3-examples.tar.gz': {
'target': b'!H\x19\xc0\xee\x82-\x12F1\xbd\x97'
b'\xfe\xadZ\x80\x80\xc1\x83\xff',
'target_type': 'revision'},
b'3DLDF-2.0.3.tar.gz': {
'target': b'\x8e\xa9\x8e/\xea}\x9feF\xf4\x9f\xfd\xee'
b'\xcc\x1a\xb4`\x8c\x8by',
'target_type': 'revision'},
b'3DLDF-2.0.tar.gz': {
'target': b'F6*\xff(?\x19a\xef\xb6\xc2\x1fv$S\xe3G'
b'\xd3\xd1m',
b'target_type': 'revision'}
}},
77775770: {
'branches': {
b'master': {
'target': b'\xe7n\xa4\x9c\x9f\xfb\xb7\xf76\x11\x08{'
b'\xa6\xe9\x99\xb1\x9e]q\xeb',
'target_type': 'revision'}
},
'id': b"h\xc0\xd2a\x04\xd4~'\x8d\xd6\xbe\x07\xeda\xfa\xfbV"
b"\x1d\r "},
85072327: {
'branches': {
b'HEAD': {
'target': b'releases/2018.09.09',
'target_type': 'alias'},
b'releases/2018.09.01': {
'target': b'<\xee1(\xe8\x8d_\xc1\xc9\xa6rT\xf1\x1d'
b'\xbb\xdfF\xfdw\xcf',
'target_type': 'revision'},
b'releases/2018.09.09': {
'target': b'\x83\xb9\xb6\xc7\x05\xb1%\xd0\xfem\xd8k'
b'A\x10\x9d\xc5\xfa2\xf8t',
'target_type': 'revision'}},
'id': b'{\xda\x8e\x84\x7fX\xff\x92\x80^\x93V\x18\xa3\xfay'
b'\x12\x9e\xd6\xb3'},
49908349: {
'branches': {
b'master': {
'target': b'\xe4?r\xe1,\x88\xab\xec\xe7\x9a\x87\xb8'
b'\xc9\xad#.\x1bw=\x18',
'target_type': 'revision'}},
'id': b'\xa1\xa2\x8c\n\xb3\x87\xa8\xf9\xe0a\x8c\xb7'
b'\x05\xea\xb8\x1f\xc4H\xf4s'},
54974445: {
'branches': {
b'HEAD': {
'target': hash_to_bytes(
'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'),
'target_type': 'revision'}}}
}
REVISIONS = [{
'id': hash_to_bytes('8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'),
'author': {
'id': 26,
'name': b'Andrew Nesbitt',
'fullname': b'Andrew Nesbitt ',
'email': b'andrewnez@gmail.com'
},
'committer': {
'id': 26,
'name': b'Andrew Nesbitt',
'fullname': b'Andrew Nesbitt ',
'email': b'andrewnez@gmail.com'
},
'synthetic': False,
'date': {
'negative_utc': False,
'timestamp': {
'seconds': 1487596456,
'microseconds': 0
},
'offset': 0
},
'directory': b'10'
}]
DIRECTORY_ID = b'10'
DIRECTORY = [{
'sha1_git': b'abc',
'name': b'index.js',
'target': b'abc',
'length': 897,
'status': 'visible',
'type': 'file',
'perms': 33188,
'sha1': b'bcd'
},
{
'sha1_git': b'aab',
'name': b'package.json',
'target': b'aab',
'length': 712,
'status': 'visible',
'type': 'file',
'perms': 33188,
'sha1': b'cde'
},
{
'target': b'11',
'type': 'dir',
'length': None,
'name': b'.github',
'sha1': None,
'perms': 16384,
'sha1_git': None,
'status': None,
'sha256': None
}
]
SHA1_TO_LICENSES = {
'01c9379dfc33803963d07c1ccc748d3fe4c96bb5': ['GPL'],
'02fb2c89e14f7fab46701478c83779c7beb7b069': ['Apache2.0'],
'103bc087db1d26afc3a0283f38663d081e9b01e6': ['MIT'],
'688a5ef812c53907562fe379d4b3851e69c7cb15': ['AGPL'],
'da39a3ee5e6b4b0d3255bfef95601890afd80709': [],
}
SHA1_TO_CTAGS = {
'01c9379dfc33803963d07c1ccc748d3fe4c96bb5': [{
'name': 'foo',
'kind': 'str',
'line': 10,
'lang': 'bar',
}],
'd4c647f0fc257591cc9ba1722484229780d1c607': [{
'name': 'let',
'kind': 'int',
'line': 100,
'lang': 'haskell',
}],
'688a5ef812c53907562fe379d4b3851e69c7cb15': [{
'name': 'symbol',
'kind': 'float',
'line': 99,
'lang': 'python',
}],
}
OBJ_STORAGE_DATA = {
'01c9379dfc33803963d07c1ccc748d3fe4c96bb5': b'this is some text',
'688a5ef812c53907562fe379d4b3851e69c7cb15': b'another text',
'8986af901dd2043044ce8f0d8fc039153641cf17': b'yet another text',
'02fb2c89e14f7fab46701478c83779c7beb7b069': b"""
import unittest
import logging
from swh.indexer.mimetype import MimetypeIndexer
from swh.indexer.tests.test_utils import MockObjStorage
class MockStorage():
def content_mimetype_add(self, mimetypes):
self.state = mimetypes
self.conflict_update = conflict_update
def indexer_configuration_add(self, tools):
return [{
'id': 10,
}]
""",
'103bc087db1d26afc3a0283f38663d081e9b01e6': b"""
#ifndef __AVL__
#define __AVL__
typedef struct _avl_tree avl_tree;
typedef struct _data_t {
int content;
} data_t;
""",
'93666f74f1cf635c8c8ac118879da6ec5623c410': b"""
(should 'pygments (recognize 'lisp 'easily))
""",
'26a9f72a7c87cc9205725cfd879f514ff4f3d8d5': b"""
{
"name": "test_metadata",
"version": "0.0.1",
"description": "Simple package.json test for indexer",
"repository": {
"type": "git",
"url": "https://github.com/moranegg/metadata_test"
}
}
""",
'd4c647f0fc257591cc9ba1722484229780d1c607': b"""
{
"version": "5.0.3",
"name": "npm",
"description": "a package manager for JavaScript",
"keywords": [
"install",
"modules",
"package manager",
"package.json"
],
"preferGlobal": true,
"config": {
"publishtest": false
},
"homepage": "https://docs.npmjs.com/",
"author": "Isaac Z. Schlueter (http://blog.izs.me)",
"repository": {
"type": "git",
"url": "https://github.com/npm/npm"
},
"bugs": {
"url": "https://github.com/npm/npm/issues"
},
"dependencies": {
"JSONStream": "~1.3.1",
"abbrev": "~1.1.0",
"ansi-regex": "~2.1.1",
"ansicolors": "~0.3.2",
"ansistyles": "~0.1.3"
},
"devDependencies": {
"tacks": "~1.2.6",
"tap": "~10.3.2"
},
"license": "Artistic-2.0"
}
""",
'a7ab314d8a11d2c93e3dcf528ca294e7b431c449': b"""
""",
'da39a3ee5e6b4b0d3255bfef95601890afd80709': b'',
'636465': b"""
{
"name": "yarn-parser",
"version": "1.0.0",
"description": "Tiny web service for parsing yarn.lock files",
"main": "index.js",
"scripts": {
"start": "node index.js",
"test": "mocha"
},
"engines": {
"node": "9.8.0"
},
"repository": {
"type": "git",
"url": "git+https://github.com/librariesio/yarn-parser.git"
},
"keywords": [
"yarn",
"parse",
"lock",
"dependencies"
],
"author": "Andrew Nesbitt",
"license": "AGPL-3.0",
"bugs": {
"url": "https://github.com/librariesio/yarn-parser/issues"
},
"homepage": "https://github.com/librariesio/yarn-parser#readme",
"dependencies": {
"@yarnpkg/lockfile": "^1.0.0",
"body-parser": "^1.15.2",
"express": "^4.14.0"
},
"devDependencies": {
"chai": "^4.1.2",
"mocha": "^5.2.0",
"request": "^2.87.0",
"test": "^0.6.0"
}
}
"""
}
YARN_PARSER_METADATA = {
'@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
'url':
'https://github.com/librariesio/yarn-parser#readme',
'codeRepository':
'git+git+https://github.com/librariesio/yarn-parser.git',
'author': [{
'type': 'Person',
'name': 'Andrew Nesbitt'
}],
'license': 'https://spdx.org/licenses/AGPL-3.0',
'version': '1.0.0',
'description':
'Tiny web service for parsing yarn.lock files',
'issueTracker':
'https://github.com/librariesio/yarn-parser/issues',
'name': 'yarn-parser',
'keywords': ['yarn', 'parse', 'lock', 'dependencies'],
}
+json_dict_keys = strategies.one_of(
+ strategies.characters(),
+ *map(strategies.just, ['type', 'url', 'name', 'email', '@id',
+ '@context', 'repository', 'license',
+ ]),
+)
+"""Hypothesis strategy that generates strings, with an emphasis on those
+that are often used as dictionary keys in metadata files."""
+
+
+generic_json_document = strategies.recursive(
+ strategies.none() | strategies.booleans() | strategies.floats() |
+ strategies.characters(),
+ lambda children: (
+ strategies.lists(children, 1) |
+ strategies.dictionaries(json_dict_keys, children, min_size=1)
+ )
+)
+"""Hypothesis strategy that generates possible values for values of JSON
+metadata files."""
+
+
+def json_document_strategy(keys=None):
+ """Generates an hypothesis strategy that generates metadata files
+ for a format that uses the given keys."""
+ if keys is None:
+ keys = strategies.characters()
+ else:
+ keys = strategies.one_of(map(strategies.just, keys))
+
+ return strategies.dictionaries(keys, generic_json_document, min_size=1)
+
+
def filter_dict(d, keys):
'return a copy of the dict with keys deleted'
if not isinstance(keys, (list, tuple)):
keys = (keys, )
return dict((k, v) for (k, v) in d.items() if k not in keys)
def fill_obj_storage(obj_storage):
"""Add some content in an object storage."""
for (obj_id, content) in OBJ_STORAGE_DATA.items():
obj_storage.add(content, obj_id=hash_to_bytes(obj_id))
def fill_storage(storage):
for origin in ORIGINS:
origin = origin.copy()
del origin['id']
storage.origin_add_one(origin)
for (orig_pseudo_id, snap) in SNAPSHOTS.items():
for orig in ORIGINS:
if orig_pseudo_id == orig['id']:
origin_id = storage.origin_get(
{'type': orig['type'], 'url': orig['url']})['id']
break
else:
assert False
visit = storage.origin_visit_add(origin_id, datetime.datetime.now())
snap_id = snap.get('id') or \
bytes([random.randint(0, 255) for _ in range(32)])
storage.snapshot_add(origin_id, visit['visit'], {
'id': snap_id,
'branches': snap['branches']
})
storage.revision_add(REVISIONS)
storage.directory_add([{
'id': DIRECTORY_ID,
'entries': DIRECTORY,
}])
for (obj_id, content) in OBJ_STORAGE_DATA.items():
# TODO: use MultiHash
if hasattr(hashlib, 'blake2s'):
blake2s256 = hashlib.blake2s(content, digest_size=32).digest()
else:
# fallback for Python <3.6
blake2s256 = bytes([random.randint(0, 255) for _ in range(32)])
storage.content_add([{
'data': content,
'length': len(content),
'status': 'visible',
'sha1': hash_to_bytes(obj_id),
'sha1_git': hash_to_bytes(obj_id),
'sha256': hashlib.sha256(content).digest(),
'blake2s256': blake2s256
}])
class CommonContentIndexerTest(metaclass=abc.ABCMeta):
legacy_get_format = False
"""True if and only if the tested indexer uses the legacy format.
see: https://forge.softwareheritage.org/T1433
"""
def get_indexer_results(self, ids):
"""Override this for indexers that don't have a mock storage."""
return self.indexer.idx_storage.state
def assert_legacy_results_ok(self, sha1s, expected_results=None):
# XXX old format, remove this when all endpoints are
# updated to the new one
# see: https://forge.softwareheritage.org/T1433
sha1s = [sha1 if isinstance(sha1, bytes) else hash_to_bytes(sha1)
for sha1 in sha1s]
actual_results = list(self.get_indexer_results(sha1s))
if expected_results is None:
expected_results = self.expected_results
self.assertEqual(len(expected_results), len(actual_results),
(expected_results, actual_results))
for indexed_data in actual_results:
_id = indexed_data['id']
expected_data = expected_results[hashutil.hash_to_hex(_id)].copy()
expected_data['id'] = _id
self.assertEqual(indexed_data, expected_data)
def assert_results_ok(self, sha1s, expected_results=None):
if self.legacy_get_format:
self.assert_legacy_results_ok(sha1s, expected_results)
return
sha1s = [sha1 if isinstance(sha1, bytes) else hash_to_bytes(sha1)
for sha1 in sha1s]
actual_results = list(self.get_indexer_results(sha1s))
if expected_results is None:
expected_results = self.expected_results
self.assertEqual(len(expected_results), len(actual_results),
(expected_results, actual_results))
for indexed_data in actual_results:
(_id, indexed_data) = list(indexed_data.items())[0]
expected_data = expected_results[hashutil.hash_to_hex(_id)].copy()
expected_data = [expected_data]
self.assertEqual(indexed_data, expected_data)
def test_index(self):
"""Known sha1 have their data indexed
"""
sha1s = [self.id0, self.id1, self.id2]
# when
self.indexer.run(sha1s, policy_update='update-dups')
self.assert_results_ok(sha1s)
# 2nd pass
self.indexer.run(sha1s, policy_update='ignore-dups')
self.assert_results_ok(sha1s)
def test_index_one_unknown_sha1(self):
"""Unknown sha1 are not indexed"""
sha1s = [self.id1,
'799a5ef812c53907562fe379d4b3851e69c7cb15', # unknown
'800a5ef812c53907562fe379d4b3851e69c7cb15'] # unknown
# when
self.indexer.run(sha1s, policy_update='update-dups')
# then
expected_results = {
k: v for k, v in self.expected_results.items() if k in sha1s
}
self.assert_results_ok(sha1s, expected_results)
class CommonContentIndexerRangeTest:
"""Allows to factorize tests on range indexer.
"""
def setUp(self):
self.contents = sorted(OBJ_STORAGE_DATA)
def assert_results_ok(self, start, end, actual_results,
expected_results=None):
if expected_results is None:
expected_results = self.expected_results
actual_results = list(actual_results)
for indexed_data in actual_results:
_id = indexed_data['id']
assert isinstance(_id, bytes)
indexed_data = indexed_data.copy()
indexed_data['id'] = hash_to_hex(indexed_data['id'])
self.assertEqual(indexed_data, expected_results[hash_to_hex(_id)])
self.assertTrue(start <= _id <= end)
_tool_id = indexed_data['indexer_configuration_id']
self.assertEqual(_tool_id, self.indexer.tool['id'])
def test__index_contents(self):
"""Indexing contents without existing data results in indexed data
"""
_start, _end = [self.contents[0], self.contents[2]] # output hex ids
start, end = map(hashutil.hash_to_bytes, (_start, _end))
# given
actual_results = list(self.indexer._index_contents(
start, end, indexed={}))
self.assert_results_ok(start, end, actual_results)
def test__index_contents_with_indexed_data(self):
"""Indexing contents with existing data results in less indexed data
"""
_start, _end = [self.contents[0], self.contents[2]] # output hex ids
start, end = map(hashutil.hash_to_bytes, (_start, _end))
data_indexed = [self.id0, self.id2]
# given
actual_results = self.indexer._index_contents(
start, end, indexed=set(map(hash_to_bytes, data_indexed)))
# craft the expected results
expected_results = self.expected_results.copy()
for already_indexed_key in data_indexed:
expected_results.pop(already_indexed_key)
self.assert_results_ok(
start, end, actual_results, expected_results)
def test_generate_content_get(self):
"""Optimal indexing should result in indexed data
"""
_start, _end = [self.contents[0], self.contents[2]] # output hex ids
start, end = map(hashutil.hash_to_bytes, (_start, _end))
# given
actual_results = self.indexer.run(start, end)
# then
self.assertTrue(actual_results)
def test_generate_content_get_input_as_bytes(self):
"""Optimal indexing should result in indexed data
Input are in bytes here.
"""
_start, _end = [self.contents[0], self.contents[2]] # output hex ids
start, end = map(hashutil.hash_to_bytes, (_start, _end))
# given
actual_results = self.indexer.run( # checks the bytes input this time
start, end, skip_existing=False)
# no already indexed data so same result as prior test
# then
self.assertTrue(actual_results)
def test_generate_content_get_no_result(self):
"""No result indexed returns False"""
_start, _end = ['0000000000000000000000000000000000000000',
'0000000000000000000000000000000000000001']
start, end = map(hashutil.hash_to_bytes, (_start, _end))
# given
actual_results = self.indexer.run(
start, end, incremental=False)
# then
self.assertFalse(actual_results)