diff --git a/swh/web/api/utils.py b/swh/web/api/utils.py index 33a234503..09f3cbdfe 100644 --- a/swh/web/api/utils.py +++ b/swh/web/api/utils.py @@ -1,353 +1,306 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -import re -from swh.web.common.utils import reverse, fmap +from swh.web.common.utils import reverse from swh.web.common.query import parse_hash -def filter_endpoints(url_map, prefix_url_rule, blacklist=[]): - """Filter endpoints by prefix url rule. - - Args: - - url_map: Url Werkzeug.Map of rules - - prefix_url_rule: prefix url string - - blacklist: blacklist of some url - - Returns: - Dictionary of url_rule with values methods and endpoint. - - The key is the url, the associated value is a dictionary of - 'methods' (possible http methods) and 'endpoint' (python function) - - """ - out = {} - for r in url_map: - rule = r['rule'] - if rule == prefix_url_rule or rule in blacklist: - continue - - if rule.startswith(prefix_url_rule): - out[rule] = {'methods': sorted(map(str, r['methods'])), - 'endpoint': r['endpoint']} - return out - - -def prepare_data_for_view(data, encoding='utf-8'): - def prepare_data(s): - # Note: can only be 'data' key with bytes of raw content - if isinstance(s, bytes): - try: - return s.decode(encoding) - except Exception: - return "Cannot decode the data bytes, try and set another " \ - "encoding in the url (e.g. ?encoding=utf8) or " \ - "download directly the " \ - "content's raw data." - if isinstance(s, str): - return re.sub(r'/api/1/', r'/browse/', s) - - return s - - return fmap(prepare_data, data) - - def filter_field_keys(data, field_keys): """Given an object instance (directory or list), and a csv field keys to filter on. Return the object instance with filtered keys. Note: Returns obj as is if it's an instance of types not in (dictionary, list) Args: - data: one object (dictionary, list...) to filter. - field_keys: csv or set of keys to filter the object on Returns: obj filtered on field_keys """ if isinstance(data, map): return map(lambda x: filter_field_keys(x, field_keys), data) if isinstance(data, list): return [filter_field_keys(x, field_keys) for x in data] if isinstance(data, dict): return {k: v for (k, v) in data.items() if k in field_keys} return data def person_to_string(person): """Map a person (person, committer, tagger, etc...) to a string. """ return ''.join([person['name'], ' <', person['email'], '>']) def enrich_object(object): """Enrich an object (revision, release) with link to the 'target' of type 'target_type'. Args: object: An object with target and target_type keys (e.g. release, revision) Returns: Object enriched with target_url pointing to the right swh.web.ui.api urls for the pointing object (revision, release, content, directory) """ obj = object.copy() if 'target' in obj and 'target_type' in obj: if obj['target_type'] == 'revision': obj['target_url'] = reverse('revision', kwargs={'sha1_git': obj['target']}) elif obj['target_type'] == 'release': obj['target_url'] = reverse('release', kwargs={'sha1_git': obj['target']}) elif obj['target_type'] == 'content': obj['target_url'] = \ reverse('content', kwargs={'q': 'sha1_git:' + obj['target']}) elif obj['target_type'] == 'directory': obj['target_url'] = reverse('directory', kwargs={'sha1_git': obj['target']}) if 'author' in obj: author = obj['author'] obj['author_url'] = reverse('person', kwargs={'person_id': author['id']}) return obj enrich_release = enrich_object def enrich_directory(directory, context_url=None): """Enrich directory with url to content or directory. """ if 'type' in directory: target_type = directory['type'] target = directory['target'] if target_type == 'file': directory['target_url'] = \ reverse('content', kwargs={'q': 'sha1_git:%s' % target}) if context_url: directory['file_url'] = context_url + directory['name'] + '/' elif target_type == 'dir': directory['target_url'] = reverse('directory', kwargs={'sha1_git': target}) if context_url: directory['dir_url'] = context_url + directory['name'] + '/' else: directory['target_url'] = reverse('revision', kwargs={'sha1_git': target}) if context_url: directory['rev_url'] = context_url + directory['name'] + '/' return directory def enrich_metadata_endpoint(content): """Enrich metadata endpoint with link to the upper metadata endpoint. """ c = content.copy() c['content_url'] = reverse('content', args=['sha1:%s' % c['id']]) return c def enrich_content(content, top_url=False, query_string=None): """Enrich content with links to: - data_url: its raw data - filetype_url: its filetype information - language_url: its programming language information - license_url: its licensing information Args: content: dict of data associated to a swh content object top_url: whether or not to include the content url in the enriched data query_string: optional query string of type ':' used when requesting the content, it acts as a hint for picking the same hash method when computing the url listed above Returns: An enriched content dict filled with additional urls """ checksums = content if 'checksums' in content: checksums = content['checksums'] hash_algo = 'sha1' if query_string: hash_algo = parse_hash(query_string)[0] if hash_algo in checksums: q = '%s:%s' % (hash_algo, checksums[hash_algo]) if top_url: content['content_url'] = reverse('content', kwargs={'q': q}) content['data_url'] = reverse('content-raw', kwargs={'q': q}) content['filetype_url'] = reverse('content-filetype', kwargs={'q': q}) content['language_url'] = reverse('content-language', kwargs={'q': q}) content['license_url'] = reverse('content-license', kwargs={'q': q}) return content def enrich_entity(entity): """Enrich entity with """ if 'uuid' in entity: entity['uuid_url'] = reverse('entity', kwargs={'uuid': entity['uuid']}) if 'parent' in entity and entity['parent']: entity['parent_url'] = reverse('entity', kwargs={'uuid': entity['parent']}) return entity def _get_path_list(path_string): """Helper for enrich_revision: get a list of the sha1 id of the navigation breadcrumbs, ordered from the oldest to the most recent. Args: path_string: the path as a '/'-separated string Returns: The navigation context as a list of sha1 revision ids """ return path_string.split('/') def _get_revision_contexts(rev_id, context): """Helper for enrich_revision: retrieve for the revision id and potentially the navigation breadcrumbs the context to pass to parents and children of of the revision. Args: rev_id: the revision's sha1 id context: the current navigation context Returns: The context for parents, children and the url of the direct child as a tuple in that order. """ context_for_parents = None context_for_children = None url_direct_child = None if not context: return (rev_id, None, None) path_list = _get_path_list(context) context_for_parents = '%s/%s' % (context, rev_id) prev_for_children = path_list[:-1] if len(prev_for_children) > 0: context_for_children = '/'.join(prev_for_children) child_id = path_list[-1] # This commit is not the first commit in the path if context_for_children: url_direct_child = reverse('revision-context', kwargs={'sha1_git': child_id, 'context': context_for_children}) # This commit is the first commit in the path else: url_direct_child = reverse('revision', kwargs={'sha1_git': child_id}) return (context_for_parents, context_for_children, url_direct_child) def _make_child_url(rev_children, context): """Helper for enrich_revision: retrieve the list of urls corresponding to the children of the current revision according to the navigation breadcrumbs. Args: rev_children: a list of revision id context: the '/'-separated navigation breadcrumbs Returns: the list of the children urls according to the context """ children = [] for child in rev_children: if context and child != _get_path_list(context)[-1]: children.append(reverse('revision', kwargs={'sha1_git': child})) elif not context: children.append(reverse('revision', kwargs={'sha1_git': child})) return children def enrich_revision(revision, context=None): """Enrich revision with links where it makes sense (directory, parents). Keep track of the navigation breadcrumbs if they are specified. Args: revision: the revision as a dict context: the navigation breadcrumbs as a /-separated string of revision sha1_git """ ctx_parents, ctx_children, url_direct_child = _get_revision_contexts( revision['id'], context) revision['url'] = reverse('revision', kwargs={'sha1_git': revision['id']}) revision['history_url'] = reverse('revision-log', kwargs={'sha1_git': revision['id']}) if context: revision['history_context_url'] = reverse( 'revision-log', kwargs={'sha1_git': revision['id'], 'prev_sha1s': context}) if 'author' in revision: author = revision['author'] revision['author_url'] = reverse('person', kwargs={'person_id': author['id']}) if 'committer' in revision: committer = revision['committer'] revision['committer_url'] = \ reverse('person', kwargs={'person_id': committer['id']}) if 'directory' in revision: revision['directory_url'] = \ reverse('directory', kwargs={'sha1_git': revision['directory']}) if 'parents' in revision: parents = [] for parent in revision['parents']: parents.append({ 'id': parent, 'url': reverse('revision', kwargs={'sha1_git': parent}) }) revision['parents'] = parents if 'children' in revision: children = _make_child_url(revision['children'], context) if url_direct_child: children.append(url_direct_child) revision['children_urls'] = children else: if url_direct_child: revision['children_urls'] = [url_direct_child] if 'message_decoding_failed' in revision: revision['message_url'] = reverse('revision-raw-message', kwargs={'sha1_git': revision['id']}) return revision diff --git a/swh/web/common/converters.py b/swh/web/common/converters.py index a151fd00c..986193a9f 100644 --- a/swh/web/common/converters.py +++ b/swh/web/common/converters.py @@ -1,362 +1,390 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import json from swh.model import hashutil from swh.core.utils import decode_with_escape -from swh.web.common import utils def _group_checksums(data): """Groups checksums values computed from hash functions used in swh and stored in data dict under a single entry 'checksums' """ if data: checksums = {} for hash in hashutil.ALGORITHMS: if hash in data and data[hash]: checksums[hash] = data[hash] del data[hash] if len(checksums) > 0: data['checksums'] = checksums +def fmap(f, data): + """Map f to data at each level. + + This must keep the origin data structure type: + - map -> map + - dict -> dict + - list -> list + - None -> None + + Args: + f: function that expects one argument. + data: data to traverse to apply the f function. + list, map, dict or bare value. + + Returns: + The same data-structure with modified values by the f function. + + """ + if data is None: + return data + if isinstance(data, map): + return map(lambda y: fmap(f, y), (x for x in data)) + if isinstance(data, list): + return [fmap(f, x) for x in data] + if isinstance(data, dict): + return {k: fmap(f, v) for (k, v) in data.items()} + return f(data) + + def from_swh(dict_swh, hashess={}, bytess={}, dates={}, blacklist={}, removables_if_empty={}, empty_dict={}, empty_list={}, convert={}, convert_fn=lambda x: x): """Convert from an swh dictionary to something reasonably json serializable. Args: dict_swh: the origin dictionary needed to be transformed hashess: list/set of keys representing hashes values (sha1, sha256, sha1_git, etc...) as bytes. Those need to be transformed in hexadecimal string bytess: list/set of keys representing bytes values which needs to be decoded blacklist: set of keys to filter out from the conversion convert: set of keys whose associated values need to be converted using convert_fn convert_fn: the conversion function to apply on the value of key in 'convert' The remaining keys are copied as is in the output. Returns: dictionary equivalent as dict_swh only with its keys converted. """ def convert_hashes_bytes(v): """v is supposedly a hash as bytes, returns it converted in hex. """ if isinstance(v, bytes): return hashutil.hash_to_hex(v) return v def convert_bytes(v): """v is supposedly a bytes string, decode as utf-8. FIXME: Improve decoding policy. If not utf-8, break! """ if isinstance(v, bytes): return v.decode('utf-8') return v def convert_date(v): """ Args: v (dict or datatime): either: - a dict with three keys: - timestamp (dict or integer timestamp) - offset - negative_utc - or, a datetime We convert it to a human-readable string """ if not v: return v if isinstance(v, datetime.datetime): return v.isoformat() tz = datetime.timezone(datetime.timedelta(minutes=v['offset'])) swh_timestamp = v['timestamp'] if isinstance(swh_timestamp, dict): date = datetime.datetime.fromtimestamp( swh_timestamp['seconds'], tz=tz) else: date = datetime.datetime.fromtimestamp( swh_timestamp, tz=tz) datestr = date.isoformat() if v['offset'] == 0 and v['negative_utc']: # remove the rightmost + and replace it with a - return '-'.join(datestr.rsplit('+', 1)) return datestr if not dict_swh: return dict_swh new_dict = {} for key, value in dict_swh.items(): if key in blacklist or (key in removables_if_empty and not value): continue if key in dates: new_dict[key] = convert_date(value) elif key in convert: new_dict[key] = convert_fn(value) elif isinstance(value, dict): new_dict[key] = from_swh(value, hashess=hashess, bytess=bytess, dates=dates, blacklist=blacklist, removables_if_empty=removables_if_empty, empty_dict=empty_dict, empty_list=empty_list, convert=convert, convert_fn=convert_fn) elif key in hashess: - new_dict[key] = utils.fmap(convert_hashes_bytes, value) + new_dict[key] = fmap(convert_hashes_bytes, value) elif key in bytess: try: - new_dict[key] = utils.fmap(convert_bytes, value) + new_dict[key] = fmap(convert_bytes, value) except UnicodeDecodeError: if 'decoding_failures' not in new_dict: new_dict['decoding_failures'] = [key] else: new_dict['decoding_failures'].append(key) - new_dict[key] = utils.fmap(decode_with_escape, value) + new_dict[key] = fmap(decode_with_escape, value) elif key in empty_dict and not value: new_dict[key] = {} elif key in empty_list and not value: new_dict[key] = [] else: new_dict[key] = value _group_checksums(new_dict) return new_dict def from_provenance(provenance): """Convert from a provenance information to a provenance dictionary. Args: provenance (dict): Dictionary with the following keys: - content (sha1_git): the content's identifier - revision (sha1_git): the revision the content was seen - origin (int): the origin the content was seen - visit (int): the visit it occurred - path (bytes): the path the content was seen at """ return from_swh(provenance, hashess={'content', 'revision'}, bytess={'path'}) def from_origin(origin): """Convert from an SWH origin to an origin dictionary. """ return from_swh(origin, removables_if_empty={'lister', 'project'}) def from_release(release): """Convert from an SWH release to a json serializable release dictionary. Args: release (dict): dictionary with keys: - id: identifier of the revision (sha1 in bytes) - revision: identifier of the revision the release points to (sha1 in bytes) comment: release's comment message (bytes) name: release's name (string) author: release's author identifier (swh's id) synthetic: the synthetic property (boolean) Returns: dict: Release dictionary with the following keys: - id: hexadecimal sha1 (string) - revision: hexadecimal sha1 (string) - comment: release's comment message (string) - name: release's name (string) - author: release's author identifier (swh's id) - synthetic: the synthetic property (boolean) """ return from_swh( release, hashess={'id', 'target'}, bytess={'message', 'name', 'fullname', 'email'}, dates={'date'}, ) class SWHMetadataEncoder(json.JSONEncoder): """Special json encoder for metadata field which can contain bytes encoded value. """ def default(self, obj): if isinstance(obj, bytes): return obj.decode('utf-8') # Let the base class default method raise the TypeError return json.JSONEncoder.default(self, obj) def convert_revision_metadata(metadata): """Convert json specific dict to a json serializable one. """ if not metadata: return {} return json.loads(json.dumps(metadata, cls=SWHMetadataEncoder)) def from_revision(revision): """Convert from an SWH revision to a json serializable revision dictionary. Args: revision (dict): dict with keys: - id: identifier of the revision (sha1 in bytes) - directory: identifier of the directory the revision points to (sha1 in bytes) - author_name, author_email: author's revision name and email - committer_name, committer_email: committer's revision name and email - message: revision's message - date, date_offset: revision's author date - committer_date, committer_date_offset: revision's commit date - parents: list of parents for such revision - synthetic: revision's property nature - type: revision's type (git, tar or dsc at the moment) - metadata: if the revision is synthetic, this can reference dynamic properties. Returns: dict: Revision dictionary with the same keys as inputs, except: - sha1s are in hexadecimal strings (id, directory) - bytes are decoded in string (author_name, committer_name, author_email, committer_email) Remaining keys are left as is """ revision = from_swh(revision, hashess={'id', 'directory', 'parents', 'children'}, bytess={'name', 'fullname', 'email'}, convert={'metadata'}, convert_fn=convert_revision_metadata, dates={'date', 'committer_date'}) if revision: if 'parents' in revision: revision['merge'] = len(revision['parents']) > 1 if 'message' in revision: try: revision['message'] = revision['message'].decode('utf-8') except UnicodeDecodeError: revision['message_decoding_failed'] = True revision['message'] = None return revision def from_content(content): """Convert swh content to serializable content dictionary. """ return from_swh(content, hashess={'sha1', 'sha1_git', 'sha256', 'blake2s256'}, blacklist={'ctime'}, convert={'status'}, convert_fn=lambda v: 'absent' if v == 'hidden' else v) def from_person(person): """Convert swh person to serializable person dictionary. """ return from_swh(person, bytess={'name', 'fullname', 'email'}) def from_origin_visit(visit): """Convert swh origin_visit to serializable origin_visit dictionary. """ ov = from_swh(visit, hashess={'target', 'snapshot'}, bytess={'branch'}, dates={'date'}, empty_dict={'metadata'}) # TODO: remove that piece of code once snapshot migration # is totally effective in storage (no more occurrences) if ov and 'occurrences' in ov: ov['occurrences'] = { decode_with_escape(k): v for k, v in ov['occurrences'].items() } return ov def from_snapshot(snapshot): """Convert swh snapshot to serializable snapshot dictionary. """ sv = from_swh(snapshot, hashess={'id', 'target'}) if sv and 'branches' in sv: sv['branches'] = { decode_with_escape(k): v for k, v in sv['branches'].items() } return sv def from_directory_entry(dir_entry): """Convert swh person to serializable person dictionary. """ return from_swh(dir_entry, hashess={'dir_id', 'sha1_git', 'sha1', 'sha256', 'blake2s256', 'target'}, bytess={'name'}, removables_if_empty={ 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'status'}, convert={'status'}, convert_fn=lambda v: 'absent' if v == 'hidden' else v) def from_filetype(content_entry): """Convert swh person to serializable person dictionary. """ return from_swh(content_entry, hashess={'id'}, bytess={'mimetype', 'encoding'}) diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py index c49770551..6dfca3251 100644 --- a/swh/web/common/utils.py +++ b/swh/web/common/utils.py @@ -1,376 +1,347 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import docutils.parsers.rst import docutils.utils import re from datetime import datetime, timezone from dateutil import parser as date_parser from dateutil import tz from django.core.cache import cache from django.core import urlresolvers from django.http import QueryDict from swh.model.exceptions import ValidationError from swh.model.identifiers import ( persistent_identifier, parse_persistent_identifier, CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT ) from swh.web.common import service from swh.web.common.exc import BadInputExc def reverse(viewname, args=None, kwargs=None, query_params=None, current_app=None, urlconf=None): """An override of django reverse function supporting query parameters. Args: viewname: the name of the django view from which to compute a url args: list of url arguments ordered according to their position it kwargs: dictionary of url arguments indexed by their names query_params: dictionary of query parameters to append to the reversed url current_app: the name of the django app tighted to the view urlconf: url configuration module Returns: The url of the requested view with processed arguments and query parameters """ if kwargs: kwargs = {k: v for k, v in kwargs.items() if v is not None} url = urlresolvers.reverse( viewname, urlconf=urlconf, args=args, kwargs=kwargs, current_app=current_app) if query_params: query_params = {k: v for k, v in query_params.items() if v is not None} if query_params and len(query_params) > 0: query_dict = QueryDict('', mutable=True) for k in sorted(query_params.keys()): query_dict[k] = query_params[k] url += ('?' + query_dict.urlencode(safe='/;:')) return url -def fmap(f, data): - """Map f to data at each level. - - This must keep the origin data structure type: - - map -> map - - dict -> dict - - list -> list - - None -> None - - Args: - f: function that expects one argument. - data: data to traverse to apply the f function. - list, map, dict or bare value. - - Returns: - The same data-structure with modified values by the f function. - - """ - if data is None: - return data - if isinstance(data, map): - return map(lambda y: fmap(f, y), (x for x in data)) - if isinstance(data, list): - return [fmap(f, x) for x in data] - if isinstance(data, dict): - return {k: fmap(f, v) for (k, v) in data.items()} - return f(data) - - def datetime_to_utc(date): """Returns datetime in UTC without timezone info Args: date (datetime.datetime): input datetime with timezone info Returns: datetime.datime: datetime in UTC without timezone info """ if date.tzinfo: return date.astimezone(tz.gettz('UTC')).replace(tzinfo=timezone.utc) else: return date def parse_timestamp(timestamp): """Given a time or timestamp (as string), parse the result as UTC datetime. Returns: a timezone-aware datetime representing the parsed value. None if the parsing fails. Samples: - 2016-01-12 - 2016-01-12T09:19:12+0100 - Today is January 1, 2047 at 8:21:00AM - 1452591542 """ if not timestamp: return None try: date = date_parser.parse(timestamp, ignoretz=False, fuzzy=True) return datetime_to_utc(date) except Exception: try: return datetime.utcfromtimestamp(float(timestamp)).replace( tzinfo=timezone.utc) except (ValueError, OverflowError) as e: raise BadInputExc(e) def shorten_path(path): """Shorten the given path: for each hash present, only return the first 8 characters followed by an ellipsis""" sha256_re = r'([0-9a-f]{8})[0-9a-z]{56}' sha1_re = r'([0-9a-f]{8})[0-9a-f]{32}' ret = re.sub(sha256_re, r'\1...', path) return re.sub(sha1_re, r'\1...', ret) def format_utc_iso_date(iso_date, fmt='%d %B %Y, %H:%M UTC'): """Turns a string reprensation of an ISO 8601 date string to UTC and format it into a more human readable one. For instance, from the following input string: '2017-05-04T13:27:13+02:00' the following one is returned: '04 May 2017, 11:27 UTC'. Custom format string may also be provided as parameter Args: iso_date (str): a string representation of an ISO 8601 date fmt (str): optional date formatting string Returns: A formatted string representation of the input iso date """ if not iso_date: return iso_date date = parse_timestamp(iso_date) return date.strftime(fmt) def gen_path_info(path): """Function to generate path data navigation for use with a breadcrumb in the swh web ui. For instance, from a path /folder1/folder2/folder3, it returns the following list:: [{'name': 'folder1', 'path': 'folder1'}, {'name': 'folder2', 'path': 'folder1/folder2'}, {'name': 'folder3', 'path': 'folder1/folder2/folder3'}] Args: path: a filesystem path Returns: A list of path data for navigation as illustrated above. """ path_info = [] if path: sub_paths = path.strip('/').split('/') path_from_root = '' for p in sub_paths: path_from_root += '/' + p path_info.append({'name': p, 'path': path_from_root.strip('/')}) return path_info def get_origin_visits(origin_info): """Function that returns the list of visits for a swh origin. That list is put in cache in order to speedup the navigation in the swh web browse ui. Args: origin_id (int): the id of the swh origin to fetch visits from Returns: A list of dict describing the origin visits:: [{'date': , 'origin': , 'status': <'full' | 'partial'>, 'visit': }, ... ] Raises: NotFoundExc if the origin is not found """ cache_entry_id = 'origin_%s_visits' % origin_info['id'] cache_entry = cache.get(cache_entry_id) if cache_entry: return cache_entry origin_visits = [] per_page = service.MAX_LIMIT last_visit = None while 1: visits = list(service.lookup_origin_visits(origin_info['id'], last_visit=last_visit, per_page=per_page)) origin_visits += visits if len(visits) < per_page: break else: if not last_visit: last_visit = per_page else: last_visit += per_page def _visit_sort_key(visit): ts = parse_timestamp(visit['date']).timestamp() return ts + (float(visit['visit']) / 10e3) for v in origin_visits: if 'metadata' in v: del v['metadata'] origin_visits = [dict(t) for t in set([tuple(d.items()) for d in origin_visits])] origin_visits = sorted(origin_visits, key=lambda v: _visit_sort_key(v)) cache.set(cache_entry_id, origin_visits) return origin_visits def get_swh_persistent_id(object_type, object_id, scheme_version=1): """ Returns the persistent identifier for a swh object based on: * the object type * the object id * the swh identifiers scheme version Args: object_type (str): the swh object type (content/directory/release/revision/snapshot) object_id (str): the swh object id (hexadecimal representation of its hash value) scheme_version (int): the scheme version of the swh persistent identifiers Returns: str: the swh object persistent identifier Raises: BadInputExc if the provided parameters do not enable to generate a valid identifier """ try: swh_id = persistent_identifier(object_type, object_id, scheme_version) except ValidationError as e: raise BadInputExc('Invalid object (%s) for swh persistent id. %s' % (object_id, e)) else: return swh_id def resolve_swh_persistent_id(swh_id, query_params=None): """ Try to resolve a SWH persistent id into an url for browsing the pointed object. Args: swh_id (str): a SWH persistent identifier query_params (django.http.QueryDict): optional dict filled with query parameters to append to the browse url Returns: dict: a dict with the following keys: * **swh_id_parsed (swh.model.identifiers.PersistentId)**: the parsed identifier * **browse_url (str)**: the url for browsing the pointed object Raises: BadInputExc: if the provided identifier can not be parsed """ # noqa try: swh_id_parsed = parse_persistent_identifier(swh_id) object_type = swh_id_parsed.object_type object_id = swh_id_parsed.object_id browse_url = None query_dict = QueryDict('', mutable=True) if query_params and len(query_params) > 0: for k in sorted(query_params.keys()): query_dict[k] = query_params[k] if 'origin' in swh_id_parsed.metadata: query_dict['origin'] = swh_id_parsed.metadata['origin'] if object_type == CONTENT: query_string = 'sha1_git:' + object_id fragment = '' if 'lines' in swh_id_parsed.metadata: lines = swh_id_parsed.metadata['lines'].split('-') fragment += '#L' + lines[0] if len(lines) > 1: fragment += '-L' + lines[1] browse_url = reverse('browse-content', kwargs={'query_string': query_string}, query_params=query_dict) + fragment elif object_type == DIRECTORY: browse_url = reverse('browse-directory', kwargs={'sha1_git': object_id}, query_params=query_dict) elif object_type == RELEASE: browse_url = reverse('browse-release', kwargs={'sha1_git': object_id}, query_params=query_dict) elif object_type == REVISION: browse_url = reverse('browse-revision', kwargs={'sha1_git': object_id}, query_params=query_dict) elif object_type == SNAPSHOT: browse_url = reverse('browse-snapshot', kwargs={'snapshot_id': object_id}, query_params=query_dict) except ValidationError as ve: raise BadInputExc('Error when parsing identifier. %s' % ' '.join(ve.messages)) else: return {'swh_id_parsed': swh_id_parsed, 'browse_url': browse_url} def parse_rst(text, report_level=2): """ Parse a reStructuredText string with docutils. Args: text (str): string with reStructuredText markups in it report_level (int): level of docutils report messages to print (1 info 2 warning 3 error 4 severe 5 none) Returns: docutils.nodes.document: a parsed docutils document """ parser = docutils.parsers.rst.Parser() components = (docutils.parsers.rst.Parser,) settings = docutils.frontend.OptionParser( components=components).get_default_values() settings.report_level = report_level document = docutils.utils.new_document('rst-doc', settings=settings) parser.parse(text, document) return document diff --git a/swh/web/tests/api/test_utils.py b/swh/web/tests/api/test_utils.py index 5f96b83db..ce816a8a4 100644 --- a/swh/web/tests/api/test_utils.py +++ b/swh/web/tests/api/test_utils.py @@ -1,880 +1,743 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from unittest.mock import patch, call from nose.tools import istest, nottest from swh.web.api import utils class UtilsTestCase(unittest.TestCase): def setUp(self): self.maxDiff = None self.url_map = [dict(rule='/other/', methods=set(['GET', 'POST', 'HEAD']), endpoint='foo'), dict(rule='/some/old/url/', methods=set(['GET', 'POST']), endpoint='blablafn'), dict(rule='/other/old/url/', methods=set(['GET', 'HEAD']), endpoint='bar'), dict(rule='/other', methods=set([]), endpoint=None), dict(rule='/other2', methods=set([]), endpoint=None)] self.sample_content_hashes = { 'blake2s256': ('791e07fcea240ade6dccd0a9309141673' 'c31242cae9c237cf3855e151abc78e9'), 'sha1': 'dc2830a9e72f23c1dfebef4413003221baa5fb62', 'sha1_git': 'fe95a46679d128ff167b7c55df5d02356c5a1ae1', 'sha256': ('b5c7fe0536f44ef60c8780b6065d30bca74a5cd06' 'd78a4a71ba1ad064770f0c9') } - @istest - def filter_endpoints_1(self): - # when - actual_data = utils.filter_endpoints(self.url_map, '/some') - - # then - self.assertEquals(actual_data, { - '/some/old/url/': { - 'methods': ['GET', 'POST'], - 'endpoint': 'blablafn' - } - }) - - @istest - def filter_endpoints_2(self): - # when - actual_data = utils.filter_endpoints(self.url_map, '/other', - blacklist=['/other2']) - - # then - # rules /other is skipped because its' exactly the prefix url - # rules /other2 is skipped because it's blacklisted - self.assertEquals(actual_data, { - '/other/': { - 'methods': ['GET', 'HEAD', 'POST'], - 'endpoint': 'foo' - }, - '/other/old/url/': { - 'methods': ['GET', 'HEAD'], - 'endpoint': 'bar' - } - }) - - @istest - def prepare_data_for_view_default_encoding(self): - self.maxDiff = None - # given - inputs = [ - { - 'data': b'some blah data' - }, - { - 'data': 1, - 'data_url': '/api/1/some/api/call', - }, - { - 'blah': 'foobar', - 'blah_url': '/some/non/changed/api/call' - }] - - # when - actual_result = utils.prepare_data_for_view(inputs) - - # then - self.assertEquals(actual_result, [ - { - 'data': 'some blah data', - }, - { - 'data': 1, - 'data_url': '/browse/some/api/call', - }, - { - 'blah': 'foobar', - 'blah_url': '/some/non/changed/api/call' - } - ]) - - @istest - def prepare_data_for_view(self): - self.maxDiff = None - # given - inputs = [ - { - 'data': b'some blah data' - }, - { - 'data': 1, - 'data_url': '/api/1/some/api/call', - }, - { - 'blah': 'foobar', - 'blah_url': '/some/non/changed/api/call' - }] - - # when - actual_result = utils.prepare_data_for_view(inputs, encoding='ascii') - - # then - self.assertEquals(actual_result, [ - { - 'data': 'some blah data', - }, - { - 'data': 1, - 'data_url': '/browse/some/api/call', - }, - { - 'blah': 'foobar', - 'blah_url': '/some/non/changed/api/call' - } - ]) - - @istest - def prepare_data_for_view_ko_cannot_decode(self): - self.maxDiff = None - # given - inputs = { - 'data': 'hé dude!'.encode('utf8'), - } - - actual_result = utils.prepare_data_for_view(inputs, encoding='ascii') - - # then - self.assertEquals(actual_result, { - 'data': "Cannot decode the data bytes, try and set another " - "encoding in the url (e.g. ?encoding=utf8) or " - "download directly the " - "content's raw data.", - }) - @istest def filter_field_keys_dict_unknown_keys(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory1', 'file2'}) # then self.assertEqual(actual_res, {}) @istest def filter_field_keys_dict(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory', 'link'}) # then self.assertEqual(actual_res, {'directory': 1, 'link': 3}) @istest def filter_field_keys_list_unknown_keys(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'1': 1, '2': 2, 'link': 3}], {'d'}) # then self.assertEqual(actual_res, [{}, {}]) @istest def filter_field_keys_map(self): # when actual_res = utils.filter_field_keys( map(lambda x: {'i': x['i']+1, 'j': x['j']}, [{'i': 1, 'j': None}, {'i': 2, 'j': None}, {'i': 3, 'j': None}]), {'i'}) # then self.assertEqual(list(actual_res), [{'i': 2}, {'i': 3}, {'i': 4}]) @istest def filter_field_keys_list(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'dir': 1, 'fil': 2, 'lin': 3}], {'directory', 'dir'}) # then self.assertEqual(actual_res, [{'directory': 1}, {'dir': 1}]) @istest def filter_field_keys_other(self): # given input_set = {1, 2} # when actual_res = utils.filter_field_keys(input_set, {'a', '1'}) # then self.assertEqual(actual_res, input_set) - @istest - def fmap(self): - self.assertEquals([2, 3, None, 4], - utils.fmap(lambda x: x+1, [1, 2, None, 3])) - self.assertEquals([11, 12, 13], - list(utils.fmap(lambda x: x+10, - map(lambda x: x, [1, 2, 3])))) - self.assertEquals({'a': 2, 'b': 4}, - utils.fmap(lambda x: x*2, {'a': 1, 'b': 2})) - self.assertEquals(100, - utils.fmap(lambda x: x*10, 10)) - self.assertEquals({'a': [2, 6], 'b': 4}, - utils.fmap(lambda x: x*2, {'a': [1, 3], 'b': 2})) - - self.assertIsNone(utils.fmap(lambda x: x, None)) - @istest def person_to_string(self): self.assertEqual(utils.person_to_string(dict(name='raboof', email='foo@bar')), 'raboof ') @istest def enrich_release_0(self): # when actual_release = utils.enrich_release({}) # then self.assertEqual(actual_release, {}) @patch('swh.web.api.utils.reverse') @istest def enrich_release_1(self, mock_django_reverse): # given def reverse_test_context(view_name, kwargs): if view_name == 'content': id = kwargs['q'] return '/api/1/content/%s/' % id elif view_name == 'person': id = kwargs['person_id'] return '/api/1/person/%s/' % id else: raise ValueError( 'This should not happened so fail if it does.') mock_django_reverse.side_effect = reverse_test_context # when actual_release = utils.enrich_release({ 'target': '123', 'target_type': 'content', 'author': { 'id': 100, 'name': 'author release name', 'email': 'author@email', }, }) # then self.assertEqual(actual_release, { 'target': '123', 'target_type': 'content', 'target_url': '/api/1/content/sha1_git:123/', 'author_url': '/api/1/person/100/', 'author': { 'id': 100, 'name': 'author release name', 'email': 'author@email', }, }) mock_django_reverse.assert_has_calls([ call('content', kwargs={'q': 'sha1_git:123'}), call('person', kwargs={'person_id': 100}) ]) @patch('swh.web.api.utils.reverse') @istest def enrich_release_2(self, mock_django_reverse): # given mock_django_reverse.return_value = '/api/1/dir/23/' # when actual_release = utils.enrich_release({'target': '23', 'target_type': 'directory'}) # then self.assertEqual(actual_release, { 'target': '23', 'target_type': 'directory', 'target_url': '/api/1/dir/23/' }) mock_django_reverse.assert_called_once_with('directory', kwargs={'sha1_git': '23'}) @patch('swh.web.api.utils.reverse') @istest def enrich_release_3(self, mock_django_reverse): # given mock_django_reverse.return_value = '/api/1/rev/3/' # when actual_release = utils.enrich_release({'target': '3', 'target_type': 'revision'}) # then self.assertEqual(actual_release, { 'target': '3', 'target_type': 'revision', 'target_url': '/api/1/rev/3/' }) mock_django_reverse.assert_called_once_with('revision', kwargs={'sha1_git': '3'}) @patch('swh.web.api.utils.reverse') @istest def enrich_release_4(self, mock_django_reverse): # given mock_django_reverse.return_value = '/api/1/rev/4/' # when actual_release = utils.enrich_release({'target': '4', 'target_type': 'release'}) # then self.assertEqual(actual_release, { 'target': '4', 'target_type': 'release', 'target_url': '/api/1/rev/4/' }) mock_django_reverse.assert_called_once_with('release', kwargs={'sha1_git': '4'}) @patch('swh.web.api.utils.reverse') @istest def enrich_directory_no_type(self, mock_django_reverse): # when/then self.assertEqual(utils.enrich_directory({'id': 'dir-id'}), {'id': 'dir-id'}) # given mock_django_reverse.return_value = '/api/content/sha1_git:123/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'file', 'target': '123', }) # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'file', 'target': '123', 'target_url': '/api/content/sha1_git:123/', }) mock_django_reverse.assert_called_once_with( 'content', kwargs={'q': 'sha1_git:123'}) @patch('swh.web.api.utils.reverse') @istest def enrich_directory_with_context_and_type_file(self, mock_django_reverse): # given mock_django_reverse.return_value = '/api/content/sha1_git:123/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'file', 'name': 'hy', 'target': '789', }, context_url='/api/revision/revsha1/directory/prefix/path/') # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'file', 'name': 'hy', 'target': '789', 'target_url': '/api/content/sha1_git:123/', 'file_url': '/api/revision/revsha1/directory' '/prefix/path/hy/' }) mock_django_reverse.assert_called_once_with( 'content', kwargs={'q': 'sha1_git:789'}) @patch('swh.web.api.utils.reverse') @istest def enrich_directory_with_context_and_type_dir(self, mock_django_reverse): # given mock_django_reverse.return_value = '/api/directory/456/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'dir', 'name': 'emacs-42', 'target_type': 'file', 'target': '456', }, context_url='/api/revision/origin/2/directory/some/prefix/path/') # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'dir', 'target_type': 'file', 'name': 'emacs-42', 'target': '456', 'target_url': '/api/directory/456/', 'dir_url': '/api/revision/origin/2/directory' '/some/prefix/path/emacs-42/' }) mock_django_reverse.assert_called_once_with('directory', kwargs={'sha1_git': '456'}) @istest def enrich_content_without_hashes(self): # when/then self.assertEqual(utils.enrich_content({'id': '123'}), {'id': '123'}) @patch('swh.web.api.utils.reverse') @istest def enrich_content_with_hashes(self, mock_django_reverse): for algo, hash in self.sample_content_hashes.items(): query_string = '%s:%s' % (algo, hash) # given mock_django_reverse.side_effect = [ '/api/content/%s/raw/' % query_string, '/api/filetype/%s/' % query_string, '/api/language/%s/' % query_string, '/api/license/%s/' % query_string ] # when enriched_content = utils.enrich_content( { algo: hash, }, query_string=query_string ) # then self.assertEqual( enriched_content, { algo: hash, 'data_url': '/api/content/%s/raw/' % query_string, 'filetype_url': '/api/filetype/%s/' % query_string, 'language_url': '/api/language/%s/' % query_string, 'license_url': '/api/license/%s/' % query_string, } ) mock_django_reverse.assert_has_calls([ call('content-raw', kwargs={'q': query_string}), call('content-filetype', kwargs={'q': query_string}), call('content-language', kwargs={'q': query_string}), call('content-license', kwargs={'q': query_string}), ]) mock_django_reverse.reset() @patch('swh.web.api.utils.reverse') @istest def enrich_content_with_hashes_and_top_level_url(self, mock_django_reverse): for algo, hash in self.sample_content_hashes.items(): query_string = '%s:%s' % (algo, hash) # given mock_django_reverse.side_effect = [ '/api/content/%s/' % query_string, '/api/content/%s/raw/' % query_string, '/api/filetype/%s/' % query_string, '/api/language/%s/' % query_string, '/api/license/%s/' % query_string, ] # when enriched_content = utils.enrich_content( { algo: hash }, top_url=True, query_string=query_string ) # then self.assertEqual( enriched_content, { algo: hash, 'content_url': '/api/content/%s/' % query_string, 'data_url': '/api/content/%s/raw/' % query_string, 'filetype_url': '/api/filetype/%s/' % query_string, 'language_url': '/api/language/%s/' % query_string, 'license_url': '/api/license/%s/' % query_string, } ) mock_django_reverse.assert_has_calls([ call('content', kwargs={'q': query_string}), call('content-raw', kwargs={'q': query_string}), call('content-filetype', kwargs={'q': query_string}), call('content-language', kwargs={'q': query_string}), call('content-license', kwargs={'q': query_string}), ]) mock_django_reverse.reset() @istest def enrich_entity_identity(self): # when/then self.assertEqual(utils.enrich_content({'id': '123'}), {'id': '123'}) @patch('swh.web.api.utils.reverse') @istest def enrich_entity_with_sha1(self, mock_django_reverse): # given def reverse_test(view_name, kwargs): return '/api/entity/' + kwargs['uuid'] + '/' mock_django_reverse.side_effect = reverse_test # when actual_entity = utils.enrich_entity({ 'uuid': 'uuid-1', 'parent': 'uuid-parent', 'name': 'something' }) # then self.assertEqual(actual_entity, { 'uuid': 'uuid-1', 'uuid_url': '/api/entity/uuid-1/', 'parent': 'uuid-parent', 'parent_url': '/api/entity/uuid-parent/', 'name': 'something', }) mock_django_reverse.assert_has_calls( [call('entity', kwargs={'uuid': 'uuid-1'}), call('entity', kwargs={'uuid': 'uuid-parent'})]) @nottest def _reverse_context_test(self, view_name, kwargs): if view_name == 'revision': return '/api/revision/%s/' % kwargs['sha1_git'] elif view_name == 'revision-context': return '/api/revision/%s/prev/%s/' % (kwargs['sha1_git'], kwargs['context']) # noqa elif view_name == 'revision-log': if 'prev_sha1s' in kwargs: return '/api/revision/%s/prev/%s/log/' % (kwargs['sha1_git'], kwargs['prev_sha1s']) # noqa else: return '/api/revision/%s/log/' % kwargs['sha1_git'] @patch('swh.web.api.utils.reverse') @istest def enrich_revision_without_children_or_parent(self, mock_django_reverse): # given def reverse_test(view_name, kwargs): if view_name == 'revision': return '/api/revision/' + kwargs['sha1_git'] + '/' elif view_name == 'revision-log': return '/api/revision/' + kwargs['sha1_git'] + '/log/' elif view_name == 'directory': return '/api/directory/' + kwargs['sha1_git'] + '/' elif view_name == 'person': return '/api/person/' + kwargs['person_id'] + '/' mock_django_reverse.side_effect = reverse_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'directory': '123', 'author': {'id': '1'}, 'committer': {'id': '2'}, }) expected_revision = { 'id': 'rev-id', 'directory': '123', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'directory_url': '/api/directory/123/', 'author': {'id': '1'}, 'author_url': '/api/person/1/', 'committer': {'id': '2'}, 'committer_url': '/api/person/2/' } # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('person', kwargs={'person_id': '1'}), call('person', kwargs={'person_id': '2'}), call('directory', kwargs={'sha1_git': '123'})]) @patch('swh.web.api.utils.reverse') @istest def enrich_revision_with_children_and_parent_no_dir(self, mock_django_reverse): # given mock_django_reverse.side_effect = self._reverse_context_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456'], }, context='prev-rev') expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': '/api/revision/rev-id/prev/prev-rev/log/', 'parents': [{'id': '123', 'url': '/api/revision/123/'}], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision', kwargs={'sha1_git': 'prev-rev'}), call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id', 'prev_sha1s': 'prev-rev'}), call('revision', kwargs={'sha1_git': '123'}), call('revision', kwargs={'sha1_git': '456'})]) @patch('swh.web.api.utils.reverse') @istest def enrich_revision_no_context(self, mock_django_reverse): # given mock_django_reverse.side_effect = self._reverse_context_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456'], }) expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'parents': [{'id': '123', 'url': '/api/revision/123/'}], 'children': ['456'], 'children_urls': ['/api/revision/456/'] } # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('revision', kwargs={'sha1_git': '123'}), call('revision', kwargs={'sha1_git': '456'})]) @patch('swh.web.api.utils.reverse') @istest def enrich_revision_context_empty_prev_list(self, mock_django_reverse): # given mock_django_reverse.side_effect = self._reverse_context_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'parents': [{'id': '123', 'url': '/api/revision/123/'}], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'parents': ['123'], 'children': ['456']}, context='prev-rev') # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision', kwargs={'sha1_git': 'prev-rev'}), call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id', 'prev_sha1s': 'prev-rev'}), call('revision', kwargs={'sha1_git': '123'}), call('revision', kwargs={'sha1_git': '456'})]) @patch('swh.web.api.utils.reverse') @istest def enrich_revision_context_some_prev_list(self, mock_django_reverse): # given mock_django_reverse.side_effect = self._reverse_context_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev1-rev/prev0-rev/log/'), 'parents': [{'id': '123', 'url': '/api/revision/123/'}], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev0-rev/prev/prev1-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456']}, context='prev1-rev/prev0-rev') # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision-context', kwargs={'context': 'prev1-rev', 'sha1_git': 'prev0-rev'}), call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'prev_sha1s': 'prev1-rev/prev0-rev', 'sha1_git': 'rev-id'}), call('revision', kwargs={'sha1_git': '123'}), call('revision', kwargs={'sha1_git': '456'})]) @nottest def _reverse_rev_message_test(self, view_name, kwargs): if view_name == 'revision': return '/api/revision/%s/' % kwargs['sha1_git'] elif view_name == 'revision-log': if 'prev_sha1s' in kwargs and kwargs['prev_sha1s'] is not None: return '/api/revision/%s/prev/%s/log/' % (kwargs['sha1_git'], kwargs['prev_sha1s']) # noqa else: return '/api/revision/%s/log/' % kwargs['sha1_git'] elif view_name == 'revision-raw-message': return '/api/revision/' + kwargs['sha1_git'] + '/raw/' else: return '/api/revision/%s/prev/%s/' % (kwargs['sha1_git'], kwargs['context']) # noqa @patch('swh.web.api.utils.reverse') @istest def enrich_revision_with_no_message(self, mock_django_reverse): # given mock_django_reverse.side_effect = self._reverse_rev_message_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'message': None, 'parents': [{'id': '123', 'url': '/api/revision/123/'}], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'message': None, 'parents': ['123'], 'children': ['456'], }, context='prev-rev') # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision', kwargs={'sha1_git': 'prev-rev'}), call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id', 'prev_sha1s': 'prev-rev'}), call('revision', kwargs={'sha1_git': '123'}), call('revision', kwargs={'sha1_git': '456'})] ) @patch('swh.web.api.utils.reverse') @istest def enrich_revision_with_invalid_message(self, mock_django_reverse): # given mock_django_reverse.side_effect = self._reverse_rev_message_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'message': None, 'message_decoding_failed': True, 'parents': ['123'], 'children': ['456'], }, context='prev-rev') expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'message': None, 'message_decoding_failed': True, 'message_url': '/api/revision/rev-id/raw/', 'parents': [{'id': '123', 'url': '/api/revision/123/'}], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision', kwargs={'sha1_git': 'prev-rev'}), call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id', 'prev_sha1s': 'prev-rev'}), call('revision', kwargs={'sha1_git': '123'}), call('revision', kwargs={'sha1_git': '456'}), call('revision-raw-message', kwargs={'sha1_git': 'rev-id'})]) diff --git a/swh/web/tests/common/test_converters.py b/swh/web/tests/common/test_converters.py index f33e51027..cc4afdd33 100644 --- a/swh/web/tests/common/test_converters.py +++ b/swh/web/tests/common/test_converters.py @@ -1,791 +1,808 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import unittest from nose.tools import istest from swh.model import hashutil from swh.web.common import converters class ConvertersTestCase(unittest.TestCase): + + @istest + def fmap(self): + self.assertEquals([2, 3, None, 4], + converters.fmap(lambda x: x+1, [1, 2, None, 3])) + self.assertEquals([11, 12, 13], + list(converters.fmap(lambda x: x+10, + map(lambda x: x, [1, 2, 3])))) + self.assertEquals({'a': 2, 'b': 4}, + converters.fmap(lambda x: x*2, {'a': 1, 'b': 2})) + self.assertEquals(100, + converters.fmap(lambda x: x*10, 10)) + self.assertEquals({'a': [2, 6], 'b': 4}, + converters.fmap(lambda x: x*2, {'a': [1, 3], 'b': 2})) # noqa + + self.assertIsNone(converters.fmap(lambda x: x, None)) + @istest def from_swh(self): some_input = { 'a': 'something', 'b': 'someone', 'c': b'sharp-0.3.4.tgz', 'd': hashutil.hash_to_bytes( 'b04caf10e9535160d90e874b45aa426de762f19f'), 'e': b'sharp.html/doc_002dS_005fISREG.html', 'g': [b'utf-8-to-decode', b'another-one'], 'h': 'something filtered', 'i': {'e': b'something'}, 'j': { 'k': { 'l': [b'bytes thing', b'another thingy', b''], 'n': 'dont care either' }, 'm': 'dont care' }, 'o': 'something', 'p': b'foo', 'q': {'extra-headers': [['a', b'intact']]}, 'w': None, 'r': {'p': 'also intact', 'q': 'bar'}, 's': { 'timestamp': 42, 'offset': -420, 'negative_utc': None, }, 's1': { 'timestamp': {'seconds': 42, 'microseconds': 0}, 'offset': -420, 'negative_utc': None, }, 's2': datetime.datetime( 2013, 7, 1, 20, 0, 0, tzinfo=datetime.timezone.utc), 't': None, 'u': None, 'v': None, 'x': None, } expected_output = { 'a': 'something', 'b': 'someone', 'c': 'sharp-0.3.4.tgz', 'd': 'b04caf10e9535160d90e874b45aa426de762f19f', 'e': 'sharp.html/doc_002dS_005fISREG.html', 'g': ['utf-8-to-decode', 'another-one'], 'i': {'e': 'something'}, 'j': { 'k': { 'l': ['bytes thing', 'another thingy', ''] } }, 'p': 'foo', 'q': {'extra-headers': [['a', 'intact']]}, 'w': {}, 'r': {'p': 'also intact', 'q': 'bar'}, 's': '1969-12-31T17:00:42-07:00', 's1': '1969-12-31T17:00:42-07:00', 's2': '2013-07-01T20:00:00+00:00', 'u': {}, 'v': [], 'x': None, } actual_output = converters.from_swh( some_input, hashess={'d', 'o', 'x'}, bytess={'c', 'e', 'g', 'l'}, dates={'s', 's1', 's2'}, blacklist={'h', 'm', 'n', 'o'}, removables_if_empty={'t'}, empty_dict={'u'}, empty_list={'v'}, convert={'p', 'q', 'w'}, convert_fn=converters.convert_revision_metadata) self.assertEquals(expected_output, actual_output) @istest def from_swh_edge_cases_do_no_conversion_if_none_or_not_bytes(self): some_input = { 'a': 'something', 'b': None, 'c': 'someone', 'd': None, 'e': None } expected_output = { 'a': 'something', 'b': None, 'c': 'someone', 'd': None, 'e': None } actual_output = converters.from_swh(some_input, hashess={'a', 'b'}, bytess={'c', 'd'}, dates={'e'}) self.assertEquals(expected_output, actual_output) @istest def from_swh_edge_cases_convert_invalid_utf8_bytes(self): some_input = { 'a': 'something', 'b': 'someone', 'c': b'a name \xff', 'd': b'an email \xff', } expected_output = { 'a': 'something', 'b': 'someone', 'c': 'a name \\xff', 'd': 'an email \\xff', 'decoding_failures': ['c', 'd'] } actual_output = converters.from_swh(some_input, hashess={'a', 'b'}, bytess={'c', 'd'}) for v in ['a', 'b', 'c', 'd']: self.assertEqual(expected_output[v], actual_output[v]) self.assertEqual(len(expected_output['decoding_failures']), len(actual_output['decoding_failures'])) for v in expected_output['decoding_failures']: self.assertTrue(v in actual_output['decoding_failures']) @istest def from_swh_empty(self): # when self.assertEquals({}, converters.from_swh({})) @istest def from_swh_none(self): # when self.assertIsNone(converters.from_swh(None)) @istest def from_provenance(self): # given input_provenance = { 'origin': 10, 'visit': 1, 'content': hashutil.hash_to_bytes( '321caf10e9535160d90e874b45aa426de762f19f'), 'revision': hashutil.hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'path': b'octave-3.4.0/doc/interpreter/octave/doc_002dS_005fISREG' } expected_provenance = { 'origin': 10, 'visit': 1, 'content': '321caf10e9535160d90e874b45aa426de762f19f', 'revision': '123caf10e9535160d90e874b45aa426de762f19f', 'path': 'octave-3.4.0/doc/interpreter/octave/doc_002dS_005fISREG' } # when actual_provenance = converters.from_provenance(input_provenance) # then self.assertEqual(actual_provenance, expected_provenance) @istest def from_origin(self): # given origin_input = { 'id': 9, 'type': 'ftp', 'url': 'rsync://ftp.gnu.org/gnu/octave', 'project': None, 'lister': None, } expected_origin = { 'id': 9, 'type': 'ftp', 'url': 'rsync://ftp.gnu.org/gnu/octave', } # when actual_origin = converters.from_origin(origin_input) # then self.assertEqual(actual_origin, expected_origin) @istest def from_origin_visit(self): snap_hash = 'b5f0b7f716735ebffe38505c60145c4fd9da6ca3' for snap in [snap_hash, None]: # given visit = { 'date': { 'timestamp': datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'origin': 10, 'visit': 100, 'metadata': None, 'status': 'full', 'snapshot': hashutil.hash_to_bytes(snap) if snap else snap, } expected_visit = { 'date': '2015-01-01T22:00:00+00:00', 'origin': 10, 'visit': 100, 'metadata': {}, 'status': 'full', 'snapshot': snap_hash if snap else snap } # when actual_visit = converters.from_origin_visit(visit) # then self.assertEqual(actual_visit, expected_visit) @istest def from_release(self): release_input = { 'id': hashutil.hash_to_bytes( 'aad23fa492a0c5fed0708a6703be875448c86884'), 'target': hashutil.hash_to_bytes( '5e46d564378afc44b31bb89f99d5675195fbdf67'), 'target_type': 'revision', 'date': { 'timestamp': datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'author': { 'name': b'author name', 'fullname': b'Author Name author@email', 'email': b'author@email', }, 'name': b'v0.0.1', 'message': b'some comment on release', 'synthetic': True, } expected_release = { 'id': 'aad23fa492a0c5fed0708a6703be875448c86884', 'target': '5e46d564378afc44b31bb89f99d5675195fbdf67', 'target_type': 'revision', 'date': '2015-01-01T22:00:00+00:00', 'author': { 'name': 'author name', 'fullname': 'Author Name author@email', 'email': 'author@email', }, 'name': 'v0.0.1', 'message': 'some comment on release', 'target_type': 'revision', 'synthetic': True, } # when actual_release = converters.from_release(release_input) # then self.assertEqual(actual_release, expected_release) @istest def from_release_no_revision(self): release_input = { 'id': hashutil.hash_to_bytes( 'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e'), 'target': None, 'date': { 'timestamp': datetime.datetime( 2016, 3, 2, 10, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': True, }, 'name': b'v0.1.1', 'message': b'comment on release', 'synthetic': False, 'author': { 'name': b'bob', 'fullname': b'Bob bob@alice.net', 'email': b'bob@alice.net', }, } expected_release = { 'id': 'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e', 'target': None, 'date': '2016-03-02T10:00:00-00:00', 'name': 'v0.1.1', 'message': 'comment on release', 'synthetic': False, 'author': { 'name': 'bob', 'fullname': 'Bob bob@alice.net', 'email': 'bob@alice.net', }, } # when actual_release = converters.from_release(release_input) # then self.assertEqual(actual_release, expected_release) @istest def from_revision(self): revision_input = { 'id': hashutil.hash_to_bytes( '18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hashutil.hash_to_bytes( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'committer': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'message': b'synthetic revision message', 'date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'committer_date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'synthetic': True, 'type': 'tar', 'parents': [ hashutil.hash_to_bytes( '29d8be353ed3480476f032475e7c244eff7371d5'), hashutil.hash_to_bytes( '30d8be353ed3480476f032475e7c244eff7371d5') ], 'children': [ hashutil.hash_to_bytes( '123546353ed3480476f032475e7c244eff7371d5'), ], 'metadata': { 'extra_headers': [['gpgsig', b'some-signature']], 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912', }] }, } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'committer': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'message': 'synthetic revision message', 'date': "2000-01-17T11:23:54+00:00", 'committer_date': "2000-01-17T11:23:54+00:00", 'children': [ '123546353ed3480476f032475e7c244eff7371d5' ], 'parents': [ '29d8be353ed3480476f032475e7c244eff7371d5', '30d8be353ed3480476f032475e7c244eff7371d5' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'extra_headers': [['gpgsig', 'some-signature']], 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, 'merge': True } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_revision_nomerge(self): revision_input = { 'id': hashutil.hash_to_bytes( '18d8be353ed3480476f032475e7c233eff7371d5'), 'parents': [ hashutil.hash_to_bytes( '29d8be353ed3480476f032475e7c244eff7371d5') ] } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'parents': [ '29d8be353ed3480476f032475e7c244eff7371d5' ], 'merge': False } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_revision_noparents(self): revision_input = { 'id': hashutil.hash_to_bytes( '18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hashutil.hash_to_bytes( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'committer': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'message': b'synthetic revision message', 'date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'committer_date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'synthetic': True, 'type': 'tar', 'children': [ hashutil.hash_to_bytes( '123546353ed3480476f032475e7c244eff7371d5'), ], 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912', }] }, } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'committer': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'message': 'synthetic revision message', 'date': "2000-01-17T11:23:54+00:00", 'committer_date': "2000-01-17T11:23:54+00:00", 'children': [ '123546353ed3480476f032475e7c244eff7371d5' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] } } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_revision_invalid(self): revision_input = { 'id': hashutil.hash_to_bytes( '18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hashutil.hash_to_bytes( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'committer': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'message': b'invalid message \xff', 'date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'committer_date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'synthetic': True, 'type': 'tar', 'parents': [ hashutil.hash_to_bytes( '29d8be353ed3480476f032475e7c244eff7371d5'), hashutil.hash_to_bytes( '30d8be353ed3480476f032475e7c244eff7371d5') ], 'children': [ hashutil.hash_to_bytes( '123546353ed3480476f032475e7c244eff7371d5'), ], 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912', }] }, } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'committer': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'message': None, 'message_decoding_failed': True, 'date': "2000-01-17T11:23:54+00:00", 'committer_date': "2000-01-17T11:23:54+00:00", 'children': [ '123546353ed3480476f032475e7c244eff7371d5' ], 'parents': [ '29d8be353ed3480476f032475e7c244eff7371d5', '30d8be353ed3480476f032475e7c244eff7371d5' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, 'merge': True } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_content_none(self): self.assertIsNone(converters.from_content(None)) @istest def from_content(self): content_input = { 'sha1': hashutil.hash_to_bytes( '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5'), 'sha256': hashutil.hash_to_bytes( '39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'blake2s256': hashutil.hash_to_bytes( '49007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hashutil.hash_to_bytes( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'), 'ctime': 'something-which-is-filtered-out', 'data': b'data in bytes', 'length': 10, 'status': 'hidden', } # 'status' is filtered expected_content = { 'checksums': { 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98' '930e7e0afa4d2747d3bf96c926', 'blake2s256': '49007420ca5de7cb3cfc15196335507ee7' '6c98930e7e0afa4d2747d3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', }, 'data': b'data in bytes', 'length': 10, 'status': 'absent', } # when actual_content = converters.from_content(content_input) # then self.assertEqual(actual_content, expected_content) @istest def from_person(self): person_input = { 'id': 10, 'anything': 'else', 'name': b'bob', 'fullname': b'bob bob@alice.net', 'email': b'bob@foo.alice', } expected_person = { 'id': 10, 'anything': 'else', 'name': 'bob', 'fullname': 'bob bob@alice.net', 'email': 'bob@foo.alice', } # when actual_person = converters.from_person(person_input) # then self.assertEqual(actual_person, expected_person) @istest def from_directory_entries(self): dir_entries_input = { 'sha1': hashutil.hash_to_bytes( '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5'), 'sha256': hashutil.hash_to_bytes( '39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hashutil.hash_to_bytes( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'), 'blake2s256': hashutil.hash_to_bytes( '685395c5dc57cada459364f0946d3dd45bad5fcbab' 'c1048edb44380f1d31d0aa'), 'target': hashutil.hash_to_bytes( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'), 'dir_id': hashutil.hash_to_bytes( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'), 'name': b'bob', 'type': 10, 'status': 'hidden', } expected_dir_entries = { 'checksums': { 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98' '930e7e0afa4d2747d3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'blake2s256': '685395c5dc57cada459364f0946d3dd45bad5f' 'cbabc1048edb44380f1d31d0aa', }, 'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'bob', 'type': 10, 'status': 'absent', } # when actual_dir_entries = converters.from_directory_entry(dir_entries_input) # then self.assertEqual(actual_dir_entries, expected_dir_entries) @istest def from_filetype(self): content_filetype = { 'id': hashutil.hash_to_bytes( '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5'), 'encoding': b'utf-8', 'mimetype': b'text/plain', } expected_content_filetype = { 'id': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'encoding': 'utf-8', 'mimetype': 'text/plain', } # when actual_content_filetype = converters.from_filetype(content_filetype) # then self.assertEqual(actual_content_filetype, expected_content_filetype)