diff --git a/swh/web/ui/converters.py b/swh/web/ui/converters.py index 82e5d3d7..cd463fc8 100644 --- a/swh/web/ui/converters.py +++ b/swh/web/ui/converters.py @@ -1,262 +1,267 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from swh.core import hashutil from swh.core.utils import decode_with_escape from swh.web.ui import utils def from_swh(dict_swh, hashess={}, bytess={}, dates={}, blacklist={}, - convert={}, convert_fn=lambda x: x): + removables_if_empty={}, convert={}, convert_fn=lambda x: x): """Convert from an swh dictionary to something reasonably json serializable. Args: - dict_swh: the origin dictionary needed to be transformed - hashess: list/set of keys representing hashes values (sha1, sha256, sha1_git, etc...) as bytes. Those need to be transformed in hexadecimal string - bytess: list/set of keys representing bytes values which needs to be decoded - blacklist: set of keys to filter out from the conversion - convert: set of keys whose associated values need to be converted using convert_fn - convert_fn: the conversion function to apply on the value of key in 'convert' The remaining keys are copied as is in the output. Returns: dictionary equivalent as dict_swh only with its keys `converted`. """ def convert_hashes_bytes(v): """v is supposedly a hash as bytes, returns it converted in hex. """ if v and isinstance(v, bytes): return hashutil.hash_to_hex(v) return v def convert_bytes(v): """v is supposedly a bytes string, decode as utf-8. FIXME: Improve decoding policy. If not utf-8, break! """ if v and isinstance(v, bytes): return v.decode('utf-8') return v def convert_date(v): """v is a dict with three keys: timestamp offset negative_utc We convert it to a human-readable string """ tz = datetime.timezone(datetime.timedelta(minutes=v['offset'])) date = datetime.datetime.fromtimestamp(v['timestamp'], tz=tz) datestr = date.isoformat() if v['offset'] == 0 and v['negative_utc']: # remove the rightmost + and replace it with a - return '-'.join(datestr.rsplit('+', 1)) return datestr if not dict_swh: return dict_swh new_dict = {} for key, value in dict_swh.items(): if key in blacklist: continue elif key in dates: new_dict[key] = convert_date(value) elif isinstance(value, dict): - new_dict[key] = from_swh(value, hashess, bytess, dates, blacklist, - convert, convert_fn) + new_dict[key] = from_swh(value, + hashess=hashess, bytess=bytess, + dates=dates, blacklist=blacklist, + removables_if_empty=removables_if_empty, + convert=convert, + convert_fn=convert_fn) elif key in hashess: new_dict[key] = utils.fmap(convert_hashes_bytes, value) elif key in bytess: try: new_dict[key] = utils.fmap(convert_bytes, value) except UnicodeDecodeError: if 'decoding_failures' not in new_dict: new_dict['decoding_failures'] = [key] else: new_dict['decoding_failures'].append(key) new_dict[key] = utils.fmap(decode_with_escape, value) elif key in convert: new_dict[key] = convert_fn(value) + elif key in removables_if_empty and not value: + continue else: new_dict[key] = value return new_dict def from_provenance(provenance): """Convert from a provenance information to a provenance dictionary. Args: provenance: Dictionary with the following keys: content (sha1_git) : the content's identifier revision (sha1_git) : the revision the content was seen origin (int) : the origin the content was seen visit (int) : the visit it occurred path (bytes) : the path the content was seen at """ return from_swh(provenance, hashess={'content', 'revision'}, bytess={'path'}) def from_origin(origin): """Convert from an SWH origin to an origin dictionary. """ return from_swh(origin, - hashess={'revision'}, - bytess={'path'}) + removables_if_empty={'lister', 'project'}) def from_release(release): """Convert from an SWH release to a json serializable release dictionary. Args: release: Dict with the following keys - id: identifier of the revision (sha1 in bytes) - revision: identifier of the revision the release points to (sha1 in bytes) - comment: release's comment message (bytes) - name: release's name (string) - author: release's author identifier (swh's id) - synthetic: the synthetic property (boolean) Returns: Release dictionary with the following keys: - id: hexadecimal sha1 (string) - revision: hexadecimal sha1 (string) - comment: release's comment message (string) - name: release's name (string) - author: release's author identifier (swh's id) - synthetic: the synthetic property (boolean) """ return from_swh( release, hashess={'id', 'target'}, bytess={'message', 'name', 'fullname', 'email'}, dates={'date'}, ) def from_revision(revision): """Convert from an SWH revision to a json serializable revision dictionary. Args: revision: Dict with the following keys - id: identifier of the revision (sha1 in bytes) - directory: identifier of the directory the revision points to (sha1 in bytes) - author_name, author_email: author's revision name and email - committer_name, committer_email: committer's revision name and email - message: revision's message - date, date_offset: revision's author date - committer_date, committer_date_offset: revision's commit date - parents: list of parents for such revision - synthetic: revision's property nature - type: revision's type (git, tar or dsc at the moment) - metadata: if the revision is synthetic, this can reference dynamic properties. Returns: Revision dictionary with the same keys as inputs, only: - sha1s are in hexadecimal strings (id, directory) - bytes are decoded in string (author_name, committer_name, author_email, committer_email) - remaining keys are left as is """ revision = from_swh(revision, hashess={'id', 'directory', 'parents', 'children'}, bytess={'name', 'fullname', 'email'}, dates={'date', 'committer_date'}) if revision: if 'parents' in revision: revision['merge'] = len(revision['parents']) > 1 if 'message' in revision: try: revision['message'] = revision['message'].decode('utf-8') except UnicodeDecodeError: revision['message_decoding_failed'] = True revision['message'] = None return revision def from_content(content): """Convert swh content to serializable content dictionary. """ return from_swh(content, hashess={'sha1', 'sha1_git', 'sha256'}, blacklist={'ctime'}, convert={'status'}, convert_fn=lambda v: 'absent' if v == 'hidden' else v) def from_person(person): """Convert swh person to serializable person dictionary. """ return from_swh(person, bytess={'name', 'fullname', 'email'}) def from_origin_visit(visit): """Convert swh origin_visit to serializable origin_visit dictionary. """ ov = from_swh(visit, hashess={'target'}, bytess={'branch'}, convert={'date'}, convert_fn=lambda d: d.timestamp()) if ov and 'occurrences' in ov: ov['occurrences'] = { decode_with_escape(k): v for k, v in ov['occurrences'].items() } return ov def from_directory_entry(dir_entry): """Convert swh person to serializable person dictionary. """ return from_swh(dir_entry, hashess={'dir_id', 'sha1_git', 'sha1', 'sha256', 'target'}, bytess={'name'}, convert={'status'}, convert_fn=lambda v: 'absent' if v == 'hidden' else v) def from_filetype(content_entry): """Convert swh person to serializable person dictionary. """ return from_swh(content_entry, hashess={'id'}, bytess={'mimetype', 'encoding'}) diff --git a/swh/web/ui/service.py b/swh/web/ui/service.py index 5049073b..4e91ba2a 100644 --- a/swh/web/ui/service.py +++ b/swh/web/ui/service.py @@ -1,764 +1,764 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from swh.core import hashutil from swh.web.ui import converters, query, backend from swh.web.ui.exc import NotFoundExc def lookup_multiple_hashes(hashes): """Lookup the passed hashes in a single DB connection, using batch processing. Args: An array of {filename: X, sha1: Y}, string X, hex sha1 string Y. Returns: The same array with elements updated with elem['found'] = true if the hash is present in storage, elem['found'] = false if not. """ hashlist = [hashutil.hex_to_hash(elem['sha1']) for elem in hashes] content_missing = backend.content_missing_per_sha1(hashlist) missing = [hashutil.hash_to_hex(x) for x in content_missing] for x in hashes: x.update({'found': True}) for h in hashes: if h['sha1'] in missing: h['found'] = False return hashes def lookup_expression(expression, last_sha1, per_page): """Lookup expression in raw content. Args: expression (str): An expression to lookup through raw indexed content last_sha1 (str): Last sha1 seen per_page (int): Number of results per page Returns: List of ctags whose content match the expression """ for ctag in backend.content_ctags_search(expression, last_sha1, per_page): ctag = converters.from_swh(ctag, hashess={'id'}) ctag['sha1'] = ctag['id'] ctag.pop('id') yield ctag def lookup_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found containing the hash info if the hash is present, None if not. """ algo, hash = query.parse_hash(q) found = backend.content_find(algo, hash) return {'found': found, 'algo': algo} def search_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found to True or False, according to whether the checksum is present or not """ algo, hash = query.parse_hash(q) found = backend.content_find(algo, hash) return {'found': found is not None} def lookup_content_provenance(q): """Return provenance information from a specified content. Args: q: query string of the form Yields: provenance information (dict) list if the content is found. """ algo, hash = query.parse_hash(q) provenances = backend.content_find_provenance(algo, hash) if not provenances: return None return (converters.from_provenance(p) for p in provenances) def _lookup_content_sha1(q): """Given a possible input, query for the content's sha1. Args: q: query string of the form Returns: binary sha1 if found or None """ algo, hash = query.parse_hash(q) if algo != 'sha1': hashes = backend.content_find(algo, hash) if not hashes: return None return hashes['sha1'] return hash def lookup_content_ctags(q): """Return ctags information from a specified content. Args: q: query string of the form Yields: ctags information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None ctags = backend.content_ctags_get(sha1) if not ctags: return None for ctag in ctags: yield converters.from_swh(ctag, hashess={'id'}) def lookup_content_filetype(q): """Return filetype information from a specified content. Args: q: query string of the form Yields: filetype information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None filetype = backend.content_filetype_get(sha1) if not filetype: return None return converters.from_filetype(filetype) def lookup_content_language(q): """Return language information from a specified content. Args: q: query string of the form Yields: language information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None lang = backend.content_language_get(sha1) if not lang: return None return converters.from_swh(lang, hashess={'id'}) def lookup_content_license(q): """Return license information from a specified content. Args: q: query string of the form Yields: license information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None lang = backend.content_license_get(sha1) if not lang: return None return converters.from_swh(lang, hashess={'id'}) def lookup_origin(origin): """Return information about the origin matching dict origin. Args: origin: origin's dict with keys either 'id' or ('type' AND 'url') Returns: origin information as dict. """ - return backend.origin_get(origin) + return converters.from_origin(backend.origin_get(origin)) def lookup_person(person_id): """Return information about the person with id person_id. Args: person_id as string Returns: person information as dict. """ person = backend.person_get(person_id) return converters.from_person(person) def lookup_directory(sha1_git): """Return information about the directory with id sha1_git. Args: sha1_git as string Returns: directory information as dict. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_git, ['sha1'], # HACK: sha1_git really 'Only sha1_git is supported.') dir = backend.directory_get(sha1_git_bin) if not dir: return None directory_entries = backend.directory_ls(sha1_git_bin) return map(converters.from_directory_entry, directory_entries) def lookup_directory_with_path(directory_sha1_git, path_string): """Return directory information for entry with path path_string w.r.t. root directory pointed by directory_sha1_git Args: - directory_sha1_git: sha1_git corresponding to the directory to which we append paths to (hopefully) find the entry - the relative path to the entry starting from the directory pointed by directory_sha1_git Raises: NotFoundExc if the directory entry is not found """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( directory_sha1_git, ['sha1'], 'Only sha1_git is supported.') queried_dir = backend.directory_entry_get_by_path( sha1_git_bin, path_string) if not queried_dir: raise NotFoundExc(('Directory entry with path %s from %s not found') % (path_string, directory_sha1_git)) return converters.from_directory_entry(queried_dir) def lookup_release(release_sha1_git): """Return information about the release with sha1 release_sha1_git. Args: release_sha1_git: The release's sha1 as hexadecimal Returns: Release information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( release_sha1_git, ['sha1'], 'Only sha1_git is supported.') res = backend.release_get(sha1_git_bin) return converters.from_release(res) def lookup_revision(rev_sha1_git): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( rev_sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = backend.revision_get(sha1_git_bin) return converters.from_revision(revision) def lookup_revision_multiple(sha1_git_list): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ def to_sha1_bin(sha1_hex): _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_hex, ['sha1'], 'Only sha1_git is supported.') return sha1_git_bin sha1_bin_list = (to_sha1_bin(x) for x in sha1_git_list) revisions = backend.revision_get_multiple(sha1_bin_list) return (converters.from_revision(x) for x in revisions) def lookup_revision_message(rev_sha1_git): """Return the raw message of the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Decoded revision message as dict {'message': } Raises: ValueError if the identifier provided is not of sha1 nature. NotFoundExc if the revision is not found, or if it has no message """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( rev_sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = backend.revision_get(sha1_git_bin) if not revision: raise NotFoundExc('Revision with sha1_git %s not found.' % rev_sha1_git) if 'message' not in revision: raise NotFoundExc('No message for revision with sha1_git %s.' % rev_sha1_git) res = {'message': revision['message']} return res def lookup_revision_by(origin_id, branch_name="refs/heads/master", timestamp=None): """Lookup revisions by origin_id, branch_name and timestamp. If: - branch_name is not provided, lookup using 'refs/heads/master' as default. - ts is not provided, use the most recent Args: - origin_id: origin of the revision. - branch_name: revision's branch. - timestamp: revision's time frame. Yields: The revisions matching the criterions. """ res = backend.revision_get_by(origin_id, branch_name, timestamp) return converters.from_revision(res) def lookup_revision_log(rev_sha1_git, limit): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal limit: the maximum number of revisions returned Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( rev_sha1_git, ['sha1'], 'Only sha1_git is supported.') revision_entries = backend.revision_log(sha1_git_bin, limit) return map(converters.from_revision, revision_entries) def lookup_revision_log_by(origin_id, branch_name, timestamp, limit): """Return information about the revision with sha1 revision_sha1_git. Args: origin_id: origin of the revision branch_name: revision's branch timestamp: revision's time frame limit: the maximum number of revisions returned Returns: Revision information as dict. Raises: NotFoundExc if no revision corresponds to the criterion NotFoundExc if the corresponding revision has no log """ revision_entries = backend.revision_log_by(origin_id, branch_name, timestamp, limit) if not revision_entries: return None return map(converters.from_revision, revision_entries) def lookup_revision_with_context_by(origin_id, branch_name, ts, sha1_git, limit=100): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. sha1_git_root being resolved through the lookup of a revision by origin_id, branch_name and ts. In other words, sha1_git is an ancestor of sha1_git_root. Args: - origin_id: origin of the revision. - branch_name: revision's branch. - timestamp: revision's time frame. - sha1_git: one of sha1_git_root's ancestors. - limit: limit the lookup to 100 revisions back. Returns: Pair of (root_revision, revision). Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: - BadInputExc in case of unknown algo_hash or bad hash. - NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root. """ rev_root = backend.revision_get_by(origin_id, branch_name, ts) if not rev_root: raise NotFoundExc('Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' % (origin_id, branch_name, ts)) return (converters.from_revision(rev_root), lookup_revision_with_context(rev_root, sha1_git, limit)) def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. In other words, sha1_git is an ancestor of sha1_git_root. Args: sha1_git_root: latest revision. The type is either a sha1 (as an hex string) or a non converted dict. sha1_git: one of sha1_git_root's ancestors limit: limit the lookup to 100 revisions back Returns: Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = backend.revision_get(sha1_git_bin) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) if isinstance(sha1_git_root, str): _, sha1_git_root_bin = query.parse_hash_with_algorithms_or_throws( sha1_git_root, ['sha1'], 'Only sha1_git is supported.') revision_root = backend.revision_get(sha1_git_root_bin) if not revision_root: raise NotFoundExc('Revision root %s not found' % sha1_git_root) else: sha1_git_root_bin = sha1_git_root['id'] revision_log = backend.revision_log(sha1_git_root_bin, limit) parents = {} children = defaultdict(list) for rev in revision_log: rev_id = rev['id'] parents[rev_id] = [] for parent_id in rev['parents']: parents[rev_id].append(parent_id) children[parent_id].append(rev_id) if revision['id'] not in parents: raise NotFoundExc('Revision %s is not an ancestor of %s' % (sha1_git, sha1_git_root)) revision['children'] = children[revision['id']] return converters.from_revision(revision) def lookup_directory_with_revision(sha1_git, dir_path=None, with_data=False): """Return information on directory pointed by revision with sha1_git. If dir_path is not provided, display top level directory. Otherwise, display the directory pointed by dir_path (if it exists). Args: sha1_git: revision's hash. dir_path: optional directory pointed to by that revision. with_data: boolean that indicates to retrieve the raw data if the path resolves to a content. Default to False (for the api) Returns: Information on the directory pointed to by that revision. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc either if the revision is not found or the path referenced does not exist. NotImplementedError in case of dir_path exists but do not reference a type 'dir' or 'file'. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = backend.revision_get(sha1_git_bin) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) dir_sha1_git_bin = revision['directory'] if dir_path: entity = backend.directory_entry_get_by_path(dir_sha1_git_bin, dir_path) if not entity: raise NotFoundExc( "Directory or File '%s' pointed to by revision %s not found" % (dir_path, sha1_git)) else: entity = {'type': 'dir', 'target': dir_sha1_git_bin} if entity['type'] == 'dir': directory_entries = backend.directory_ls(entity['target']) return {'type': 'dir', 'path': '.' if not dir_path else dir_path, 'revision': sha1_git, 'content': map(converters.from_directory_entry, directory_entries)} elif entity['type'] == 'file': # content content = backend.content_find('sha1_git', entity['target']) if with_data: content['data'] = backend.content_get(content['sha1'])['data'] return {'type': 'file', 'path': '.' if not dir_path else dir_path, 'revision': sha1_git, 'content': converters.from_content(content)} else: raise NotImplementedError('Entity of type %s not implemented.' % entity['type']) def lookup_content(q): """Lookup the content designed by q. Args: q: The release's sha1 as hexadecimal """ algo, hash = query.parse_hash(q) c = backend.content_find(algo, hash) return converters.from_content(c) def lookup_content_raw(q): """Lookup the content defined by q. Args: q: query string of the form Returns: dict with 'sha1' and 'data' keys. data representing its raw data decoded. """ algo, hash = query.parse_hash(q) c = backend.content_find(algo, hash) if not c: return None content = backend.content_get(c['sha1']) return converters.from_content(content) def stat_counters(): """Return the stat counters for Software Heritage Returns: A dict mapping textual labels to integer values. """ return backend.stat_counters() def lookup_origin_visits(origin_id): """Yields the origin origin_ids' visits. Args: origin_id: origin to list visits for Yields: Dictionaries of origin_visit for that origin """ visits = backend.lookup_origin_visits(origin_id) for visit in visits: yield converters.from_origin_visit(visit) def lookup_origin_visit(origin_id, visit_id): """Return information about visit visit_id with origin origin_id. Args: origin_id: origin concerned by the visit visit_id: the visit identifier to lookup Yields: The dict origin_visit concerned """ visit = backend.lookup_origin_visit(origin_id, visit_id) return converters.from_origin_visit(visit) def lookup_entity_by_uuid(uuid): """Return the entity's hierarchy from its uuid. Args: uuid: entity's identifier. Returns: List of hierarchy entities from the entity with uuid. """ uuid = query.parse_uuid4(uuid) return backend.entity_get(uuid) def lookup_revision_through(revision, limit=100): """Retrieve a revision from the criterion stored in revision dictionary. Args: revision: Dictionary of criterion to lookup the revision with. Here are the supported combination of possible values: - origin_id, branch_name, ts, sha1_git - origin_id, branch_name, ts - sha1_git_root, sha1_git - sha1_git Returns: None if the revision is not found or the actual revision. """ if 'origin_id' in revision and \ 'branch_name' in revision and \ 'ts' in revision and \ 'sha1_git' in revision: return lookup_revision_with_context_by(revision['origin_id'], revision['branch_name'], revision['ts'], revision['sha1_git'], limit) if 'origin_id' in revision and \ 'branch_name' in revision and \ 'ts' in revision: return lookup_revision_by(revision['origin_id'], revision['branch_name'], revision['ts']) if 'sha1_git_root' in revision and \ 'sha1_git' in revision: return lookup_revision_with_context(revision['sha1_git_root'], revision['sha1_git'], limit) if 'sha1_git' in revision: return lookup_revision(revision['sha1_git']) # this should not happen raise NotImplementedError('Should not happen!') def lookup_directory_through_revision(revision, path=None, limit=100, with_data=False): """Retrieve the directory information from the revision. Args: revision: dictionary of criterion representing a revision to lookup path: directory's path to lookup. limit: optional query parameter to limit the revisions log. (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of. with_data: indicate to retrieve the content's raw data if path resolves to a content. Returns: The directory pointing to by the revision criterions at path. """ rev = lookup_revision_through(revision, limit) if not rev: raise NotFoundExc('Revision with criterion %s not found!' % revision) return (rev['id'], lookup_directory_with_revision(rev['id'], path, with_data)) diff --git a/swh/web/ui/tests/test_converters.py b/swh/web/ui/tests/test_converters.py index 193f2267..d5e9b97b 100644 --- a/swh/web/ui/tests/test_converters.py +++ b/swh/web/ui/tests/test_converters.py @@ -1,715 +1,715 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import unittest from nose.tools import istest from swh.core import hashutil from swh.web.ui import converters class ConvertersTestCase(unittest.TestCase): @istest def from_swh(self): some_input = { 'a': 'something', 'b': 'someone', 'c': b'sharp-0.3.4.tgz', 'd': hashutil.hex_to_hash( 'b04caf10e9535160d90e874b45aa426de762f19f'), 'e': b'sharp.html/doc_002dS_005fISREG.html', 'g': [b'utf-8-to-decode', b'another-one'], 'h': 'something filtered', 'i': {'e': b'something'}, 'j': { 'k': { 'l': [b'bytes thing', b'another thingy'], 'n': 'dont care either' }, 'm': 'dont care' }, 'o': 'something', 'p': 'bar', 'q': 'intact', 'r': {'p': 'also intact', 'q': 'bar'}, 's': { 'timestamp': 42, 'offset': -420, 'negative_utc': None, - } + }, + 't': None, } expected_output = { 'a': 'something', 'b': 'someone', 'c': 'sharp-0.3.4.tgz', 'd': 'b04caf10e9535160d90e874b45aa426de762f19f', 'e': 'sharp.html/doc_002dS_005fISREG.html', 'g': ['utf-8-to-decode', 'another-one'], 'i': {'e': 'something'}, 'j': { 'k': { 'l': ['bytes thing', 'another thingy'] } }, 'p': 'foo', 'q': 'intact', 'r': {'p': 'also intact', 'q': 'foo'}, 's': '1969-12-31T17:00:42-07:00', } def test_convert_fn(v): return 'foo' if v == 'bar' else v actual_output = converters.from_swh(some_input, hashess={'d', 'o'}, bytess={'c', 'e', 'g', 'l'}, dates={'s'}, blacklist={'h', 'm', 'n', 'o'}, + removables_if_empty={'t'}, convert={'p', 'q'}, convert_fn=test_convert_fn) self.assertEquals(expected_output, actual_output) @istest def from_swh_edge_cases_do_no_conversion_if_none_or_not_bytes(self): some_input = { 'a': 'something', 'b': None, 'c': 'someone', 'd': None, } expected_output = { 'a': 'something', 'b': None, 'c': 'someone', 'd': None, } actual_output = converters.from_swh(some_input, hashess={'a', 'b'}, bytess={'c', 'd'}) self.assertEquals(expected_output, actual_output) @istest def from_swh_edge_cases_convert_invalid_utf8_bytes(self): some_input = { 'a': 'something', 'b': 'someone', 'c': b'a name \xff', 'd': b'an email \xff', } expected_output = { 'a': 'something', 'b': 'someone', 'c': 'a name \\xff', 'd': 'an email \\xff', 'decoding_failures': ['c', 'd'] } actual_output = converters.from_swh(some_input, hashess={'a', 'b'}, bytess={'c', 'd'}) for v in ['a', 'b', 'c', 'd']: self.assertEqual(expected_output[v], actual_output[v]) self.assertEqual(len(expected_output['decoding_failures']), len(actual_output['decoding_failures'])) for v in expected_output['decoding_failures']: self.assertTrue(v in actual_output['decoding_failures']) @istest def from_swh_empty(self): # when self.assertEquals({}, converters.from_swh({})) @istest def from_swh_none(self): # when self.assertIsNone(converters.from_swh(None)) @istest def from_provenance(self): # given input_provenance = { 'origin': 10, 'visit': 1, 'content': hashutil.hex_to_hash( '321caf10e9535160d90e874b45aa426de762f19f'), 'revision': hashutil.hex_to_hash( '123caf10e9535160d90e874b45aa426de762f19f'), 'path': b'octave-3.4.0/doc/interpreter/octave/doc_002dS_005fISREG' } expected_provenance = { 'origin': 10, 'visit': 1, 'content': '321caf10e9535160d90e874b45aa426de762f19f', 'revision': '123caf10e9535160d90e874b45aa426de762f19f', 'path': 'octave-3.4.0/doc/interpreter/octave/doc_002dS_005fISREG' } # when actual_provenance = converters.from_provenance(input_provenance) # then self.assertEqual(actual_provenance, expected_provenance) @istest def from_origin(self): # given origin_input = { - 'origin_type': 'ftp', - 'origin_url': 'rsync://ftp.gnu.org/gnu/octave', - 'branch': 'octave-3.4.0.tar.gz', - 'revision': b'\xb0L\xaf\x10\xe9SQ`\xd9\x0e\x87KE\xaaBm\xe7b\xf1\x9f', # noqa - 'path': b'octave-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa + 'id': 9, + 'type': 'ftp', + 'url': 'rsync://ftp.gnu.org/gnu/octave', + 'project': None, + 'lister': None, } expected_origin = { - 'origin_type': 'ftp', - 'origin_url': 'rsync://ftp.gnu.org/gnu/octave', - 'branch': 'octave-3.4.0.tar.gz', - 'revision': 'b04caf10e9535160d90e874b45aa426de762f19f', - 'path': 'octave-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa + 'id': 9, + 'type': 'ftp', + 'url': 'rsync://ftp.gnu.org/gnu/octave', } # when actual_origin = converters.from_origin(origin_input) # then self.assertEqual(actual_origin, expected_origin) @istest def from_release(self): release_input = { 'id': hashutil.hex_to_hash( 'aad23fa492a0c5fed0708a6703be875448c86884'), 'target': hashutil.hex_to_hash( '5e46d564378afc44b31bb89f99d5675195fbdf67'), 'target_type': 'revision', 'date': { 'timestamp': datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'author': { 'name': b'author name', 'fullname': b'Author Name author@email', 'email': b'author@email', }, 'name': b'v0.0.1', 'message': b'some comment on release', 'synthetic': True, } expected_release = { 'id': 'aad23fa492a0c5fed0708a6703be875448c86884', 'target': '5e46d564378afc44b31bb89f99d5675195fbdf67', 'target_type': 'revision', 'date': '2015-01-01T22:00:00+00:00', 'author': { 'name': 'author name', 'fullname': 'Author Name author@email', 'email': 'author@email', }, 'name': 'v0.0.1', 'message': 'some comment on release', 'target_type': 'revision', 'synthetic': True, } # when actual_release = converters.from_release(release_input) # then self.assertEqual(actual_release, expected_release) @istest def from_release_no_revision(self): release_input = { 'id': hashutil.hex_to_hash( 'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e'), 'target': None, 'date': { 'timestamp': datetime.datetime( 2016, 3, 2, 10, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': True, }, 'name': b'v0.1.1', 'message': b'comment on release', 'synthetic': False, 'author': { 'name': b'bob', 'fullname': b'Bob bob@alice.net', 'email': b'bob@alice.net', }, } expected_release = { 'id': 'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e', 'target': None, 'date': '2016-03-02T10:00:00-00:00', 'name': 'v0.1.1', 'message': 'comment on release', 'synthetic': False, 'author': { 'name': 'bob', 'fullname': 'Bob bob@alice.net', 'email': 'bob@alice.net', }, } # when actual_release = converters.from_release(release_input) # then self.assertEqual(actual_release, expected_release) @istest def from_revision(self): revision_input = { 'id': hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'committer': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'message': b'synthetic revision message', 'date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'committer_date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'synthetic': True, 'type': 'tar', 'parents': [ hashutil.hex_to_hash( '29d8be353ed3480476f032475e7c244eff7371d5'), hashutil.hex_to_hash( '30d8be353ed3480476f032475e7c244eff7371d5') ], 'children': [ hashutil.hex_to_hash( '123546353ed3480476f032475e7c244eff7371d5'), ], 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912', }] }, } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'committer': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'message': 'synthetic revision message', 'date': "2000-01-17T11:23:54+00:00", 'committer_date': "2000-01-17T11:23:54+00:00", 'children': [ '123546353ed3480476f032475e7c244eff7371d5' ], 'parents': [ '29d8be353ed3480476f032475e7c244eff7371d5', '30d8be353ed3480476f032475e7c244eff7371d5' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, 'merge': True } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_revision_nomerge(self): revision_input = { 'id': hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5'), 'parents': [ hashutil.hex_to_hash( '29d8be353ed3480476f032475e7c244eff7371d5') ] } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'parents': [ '29d8be353ed3480476f032475e7c244eff7371d5' ], 'merge': False } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_revision_noparents(self): revision_input = { 'id': hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'committer': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'message': b'synthetic revision message', 'date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'committer_date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'synthetic': True, 'type': 'tar', 'children': [ hashutil.hex_to_hash( '123546353ed3480476f032475e7c244eff7371d5'), ], 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912', }] }, } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'committer': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'message': 'synthetic revision message', 'date': "2000-01-17T11:23:54+00:00", 'committer_date': "2000-01-17T11:23:54+00:00", 'children': [ '123546353ed3480476f032475e7c244eff7371d5' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] } } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_revision_invalid(self): revision_input = { 'id': hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'committer': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'message': b'invalid message \xff', 'date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'committer_date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'synthetic': True, 'type': 'tar', 'parents': [ hashutil.hex_to_hash( '29d8be353ed3480476f032475e7c244eff7371d5'), hashutil.hex_to_hash( '30d8be353ed3480476f032475e7c244eff7371d5') ], 'children': [ hashutil.hex_to_hash( '123546353ed3480476f032475e7c244eff7371d5'), ], 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912', }] }, } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'committer': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'message': None, 'message_decoding_failed': True, 'date': "2000-01-17T11:23:54+00:00", 'committer_date': "2000-01-17T11:23:54+00:00", 'children': [ '123546353ed3480476f032475e7c244eff7371d5' ], 'parents': [ '29d8be353ed3480476f032475e7c244eff7371d5', '30d8be353ed3480476f032475e7c244eff7371d5' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, 'merge': True } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_content_None(self): self.assertIsNone(converters.from_content(None)) @istest def from_content(self): content_input = { 'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0' '2ebda5'), 'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'ctime': 'something-which-is-filtered-out', 'data': b'data in bytes', 'length': 10, 'status': 'hidden', } # 'status' is filtered expected_content = { 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274' '7d3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'data': b'data in bytes', 'length': 10, 'status': 'absent', } # when actual_content = converters.from_content(content_input) # then self.assertEqual(actual_content, expected_content) @istest def from_person(self): person_input = { 'id': 10, 'anything': 'else', 'name': b'bob', 'fullname': b'bob bob@alice.net', 'email': b'bob@foo.alice', } expected_person = { 'id': 10, 'anything': 'else', 'name': 'bob', 'fullname': 'bob bob@alice.net', 'email': 'bob@foo.alice', } # when actual_person = converters.from_person(person_input) # then self.assertEqual(actual_person, expected_person) @istest def from_directory_entries(self): dir_entries_input = { 'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0' '2ebda5'), 'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'target': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'dir_id': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'name': b'bob', 'type': 10, 'status': 'hidden', } expected_dir_entries = { 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2747' 'd3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'bob', 'type': 10, 'status': 'absent', } # when actual_dir_entries = converters.from_directory_entry(dir_entries_input) # then self.assertEqual(actual_dir_entries, expected_dir_entries) @istest def from_filetype(self): content_filetype = { 'id': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebd' 'a5'), 'encoding': b'utf-8', 'mimetype': b'text/plain', } expected_content_filetype = { 'id': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'encoding': 'utf-8', 'mimetype': 'text/plain', } # when actual_content_filetype = converters.from_filetype(content_filetype) # then self.assertEqual(actual_content_filetype, expected_content_filetype)