diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py index cce9eef..7d47c23 100644 --- a/swh/indexer/mimetype.py +++ b/swh/indexer/mimetype.py @@ -1,154 +1,157 @@ # Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Optional, Dict, Any, List import magic from .indexer import ContentIndexer, ContentRangeIndexer if not hasattr(magic.Magic, 'from_buffer'): raise ImportError( 'Expected "import magic" to import python-magic, but file_magic ' 'was imported instead.') def compute_mimetype_encoding(raw_content: bytes) -> Dict[str, bytes]: """Determine mimetype and encoding from the raw content. Args: raw_content: content's raw data Returns: dict: mimetype and encoding key and corresponding values. """ m = magic.Magic(mime=True, mime_encoding=True) res = m.from_buffer(raw_content) - (mimetype, encoding) = res.split('; charset=') + try: + mimetype, encoding = res.split('; charset=') + except ValueError: + mimetype, encoding = res, '' return { 'mimetype': mimetype, 'encoding': encoding, } class MixinMimetypeIndexer: """Mixin mimetype indexer. See :class:`MimetypeIndexer` and :class:`MimetypeRangeIndexer` """ tool: Dict[str, Any] idx_storage: Any ADDITIONAL_CONFIG = { 'tools': ('dict', { 'name': 'file', 'version': '1:5.30-1+deb9u1', 'configuration': { "type": "library", "debian-package": "python3-magic" }, }), 'write_batch_size': ('int', 1000), } CONFIG_BASE_FILENAME = 'indexer/mimetype' # type: Optional[str] def index(self, id: bytes, data: Optional[bytes] = None, **kwargs) -> Dict[str, Any]: """Index sha1s' content and store result. Args: id: content's identifier data: raw content in bytes Returns: dict: content's mimetype; dict keys being - id: content's identifier (sha1) - mimetype: mimetype in bytes - encoding: encoding in bytes """ assert data is not None properties = compute_mimetype_encoding(data) properties.update({ 'id': id, 'indexer_configuration_id': self.tool['id'], }) return properties def persist_index_computations( self, results: List[Dict], policy_update: str ) -> Dict[str, int]: """Persist the results in storage. Args: results: list of content's mimetype dicts (see :meth:`.index`) policy_update: either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ return self.idx_storage.content_mimetype_add( results, conflict_update=(policy_update == 'update-dups')) class MimetypeIndexer(MixinMimetypeIndexer, ContentIndexer): """Mimetype Indexer working on list of content identifiers. It: - (optionally) filters out content already indexed (cf. :meth:`.filter`) - reads content from objstorage per the content's id (sha1) - computes {mimetype, encoding} from that content - stores result in storage """ def filter(self, ids): """Filter out known sha1s and return only missing ones. """ yield from self.idx_storage.content_mimetype_missing(( { 'id': sha1, 'indexer_configuration_id': self.tool['id'], } for sha1 in ids )) class MimetypeRangeIndexer(MixinMimetypeIndexer, ContentRangeIndexer): """Mimetype Range Indexer working on range of content identifiers. It: - (optionally) filters out content already indexed (cf :meth:`.indexed_contents_in_range`) - reads content from objstorage per the content's id (sha1) - computes {mimetype, encoding} from that content - stores result in storage """ def indexed_contents_in_range( self, start: bytes, end: bytes ) -> Dict[str, Optional[bytes]]: """Retrieve indexed content id within range [start, end]. Args: start: Starting bound from range identifier end: End range identifier Returns: dict: a dict with keys: - ids: iterable of content ids within the range. - next: The next range of sha1 starts at this sha1 if any """ return self.idx_storage.content_mimetype_get_range( start, end, self.tool['id']) diff --git a/swh/indexer/tests/test_mimetype.py b/swh/indexer/tests/test_mimetype.py index cf39bc3..91895c9 100644 --- a/swh/indexer/tests/test_mimetype.py +++ b/swh/indexer/tests/test_mimetype.py @@ -1,147 +1,148 @@ -# Copyright (C) 2017-2018 The Software Heritage developers +# Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest import unittest from typing import Any, Dict from swh.indexer.mimetype import ( MimetypeIndexer, MimetypeRangeIndexer, compute_mimetype_encoding ) from swh.indexer.tests.utils import ( CommonContentIndexerTest, CommonContentIndexerRangeTest, BASE_TEST_CONFIG, fill_storage, fill_obj_storage, filter_dict, ) -class BasicTest(unittest.TestCase): - def test_compute_mimetype_encoding(self): - """Compute mimetype encoding should return results""" - for _input, _mimetype, _encoding in [ - ('du français'.encode(), 'text/plain', 'utf-8'), - (b'def __init__(self):', 'text/x-python', 'us-ascii')]: - - actual_result = compute_mimetype_encoding(_input) - self.assertEqual(actual_result, { - 'mimetype': _mimetype, - 'encoding': _encoding - }) +def test_compute_mimetype_encoding(): + """Compute mimetype encoding should return results""" + for _input, _mimetype, _encoding in [ + ('du français'.encode(), 'text/plain', 'utf-8'), + (b'def __init__(self):', 'text/x-python', 'us-ascii'), + (b'\xff\xfe\x00\x00\x00\x00\xff\xfe\xff\xff', + 'application/octet-stream', '') + ]: + actual_result = compute_mimetype_encoding(_input) + assert actual_result == { + 'mimetype': _mimetype, + 'encoding': _encoding + } CONFIG = { **BASE_TEST_CONFIG, 'tools': { 'name': 'file', 'version': '1:5.30-1+deb9u1', 'configuration': { "type": "library", "debian-package": "python3-magic" }, }, } # type: Dict[str, Any] class TestMimetypeIndexer(CommonContentIndexerTest, unittest.TestCase): """Mimetype indexer test scenarios: - Known sha1s in the input list have their data indexed - Unknown sha1 in the input list are not indexed """ legacy_get_format = True def get_indexer_results(self, ids): yield from self.idx_storage.content_mimetype_get(ids) def setUp(self): self.indexer = MimetypeIndexer(config=CONFIG) self.indexer.catch_exceptions = False self.idx_storage = self.indexer.idx_storage fill_storage(self.indexer.storage) fill_obj_storage(self.indexer.objstorage) self.id0 = '01c9379dfc33803963d07c1ccc748d3fe4c96bb5' self.id1 = '688a5ef812c53907562fe379d4b3851e69c7cb15' self.id2 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709' tool = {k.replace('tool_', ''): v for (k, v) in self.indexer.tool.items()} self.expected_results = { self.id0: { 'id': self.id0, 'tool': tool, 'mimetype': 'text/plain', 'encoding': 'us-ascii', }, self.id1: { 'id': self.id1, 'tool': tool, 'mimetype': 'text/plain', 'encoding': 'us-ascii', }, self.id2: { 'id': self.id2, 'tool': tool, 'mimetype': 'application/x-empty', 'encoding': 'binary', } } RANGE_CONFIG = dict(list(CONFIG.items()) + [('write_batch_size', 100)]) class TestMimetypeRangeIndexer( CommonContentIndexerRangeTest, unittest.TestCase): """Range Mimetype Indexer tests. - new data within range are indexed - no data outside a range are indexed - with filtering existing indexed data prior to compute new index - without filtering existing indexed data prior to compute new index """ def setUp(self): super().setUp() self.indexer = MimetypeRangeIndexer(config=RANGE_CONFIG) self.indexer.catch_exceptions = False fill_storage(self.indexer.storage) fill_obj_storage(self.indexer.objstorage) self.id0 = '01c9379dfc33803963d07c1ccc748d3fe4c96bb5' self.id1 = '02fb2c89e14f7fab46701478c83779c7beb7b069' self.id2 = '103bc087db1d26afc3a0283f38663d081e9b01e6' tool_id = self.indexer.tool['id'] self.expected_results = { self.id0: { 'encoding': 'us-ascii', 'id': self.id0, 'indexer_configuration_id': tool_id, 'mimetype': 'text/plain'}, self.id1: { 'encoding': 'us-ascii', 'id': self.id1, 'indexer_configuration_id': tool_id, 'mimetype': 'text/x-python'}, self.id2: { 'encoding': 'us-ascii', 'id': self.id2, 'indexer_configuration_id': tool_id, 'mimetype': 'text/plain'} } def test_mimetype_w_no_tool(): with pytest.raises(ValueError): MimetypeIndexer(config=filter_dict(CONFIG, 'tools')) def test_mimetype_range_w_no_tool(): with pytest.raises(ValueError): MimetypeRangeIndexer(config=filter_dict(CONFIG, 'tools'))