
diff --git a/swh/web/api/utils.py b/swh/web/api/utils.py
index 33a23450..09f3cbdf 100644
--- a/swh/web/api/utils.py
+++ b/swh/web/api/utils.py
@@ -1,353 +1,306 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import re
-from swh.web.common.utils import reverse, fmap
+from swh.web.common.utils import reverse
from swh.web.common.query import parse_hash
-def filter_endpoints(url_map, prefix_url_rule, blacklist=[]):
- """Filter endpoints by prefix url rule.
-
- Args:
- - url_map: Url Werkzeug.Map of rules
- - prefix_url_rule: prefix url string
- - blacklist: blacklist of some url
-
- Returns:
- Dictionary of url_rule with values methods and endpoint.
-
- The key is the url, the associated value is a dictionary of
- 'methods' (possible http methods) and 'endpoint' (python function)
-
- """
- out = {}
- for r in url_map:
- rule = r['rule']
- if rule == prefix_url_rule or rule in blacklist:
- continue
-
- if rule.startswith(prefix_url_rule):
- out[rule] = {'methods': sorted(map(str, r['methods'])),
- 'endpoint': r['endpoint']}
- return out
-
-
-def prepare_data_for_view(data, encoding='utf-8'):
- def prepare_data(s):
- # Note: can only be 'data' key with bytes of raw content
- if isinstance(s, bytes):
- try:
- return s.decode(encoding)
- except Exception:
- return "Cannot decode the data bytes, try and set another " \
- "encoding in the url (e.g. ?encoding=utf8) or " \
- "download directly the " \
- "content's raw data."
- if isinstance(s, str):
- return re.sub(r'/api/1/', r'/browse/', s)
-
- return s
-
- return fmap(prepare_data, data)
-
-
def filter_field_keys(data, field_keys):
"""Given an object instance (directory or list), and a csv field keys
to filter on.
Return the object instance with filtered keys.
Note: Returns obj as is if it's an instance of types not in (dictionary,
list)
Args:
- data: one object (dictionary, list...) to filter.
- field_keys: csv or set of keys to filter the object on
Returns:
obj filtered on field_keys
"""
if isinstance(data, map):
return map(lambda x: filter_field_keys(x, field_keys), data)
if isinstance(data, list):
return [filter_field_keys(x, field_keys) for x in data]
if isinstance(data, dict):
return {k: v for (k, v) in data.items() if k in field_keys}
return data
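# Usage sketch, mirroring the assertions in swh/web/tests/api/test_utils.py:
#
#   filter_field_keys({'directory': 1, 'file': 2, 'link': 3},
#                     {'directory', 'link'})
#   # -> {'directory': 1, 'link': 3}
#   filter_field_keys([{'directory': 1, 'file': 2}, {'dir': 1, 'fil': 2}],
#                     {'directory', 'dir'})
#   # -> [{'directory': 1}, {'dir': 1}]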
def person_to_string(person):
"""Map a person (person, committer, tagger, etc...) to a string.
"""
return ''.join([person['name'], ' <', person['email'], '>'])
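# For instance (from the test suite):
#
#   person_to_string({'name': 'raboof', 'email': 'foo@bar'})
#   # -> 'raboof <foo@bar>'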
def enrich_object(object):
"""Enrich an object (revision, release) with link to the 'target' of
type 'target_type'.
Args:
object: An object with target and target_type keys
(e.g. release, revision)
Returns:
Object enriched with target_url pointing to the right
swh.web.ui.api urls for the pointing object (revision,
release, content, directory)
"""
obj = object.copy()
if 'target' in obj and 'target_type' in obj:
if obj['target_type'] == 'revision':
obj['target_url'] = reverse('revision',
kwargs={'sha1_git': obj['target']})
elif obj['target_type'] == 'release':
obj['target_url'] = reverse('release',
kwargs={'sha1_git': obj['target']})
elif obj['target_type'] == 'content':
obj['target_url'] = \
reverse('content', kwargs={'q': 'sha1_git:' + obj['target']})
elif obj['target_type'] == 'directory':
obj['target_url'] = reverse('directory',
kwargs={'sha1_git': obj['target']})
if 'author' in obj:
author = obj['author']
obj['author_url'] = reverse('person',
kwargs={'person_id': author['id']})
return obj
enrich_release = enrich_object
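# Usage sketch, mirroring the enrich_release tests below; the target_url
# value comes from the reverse function above:
#
#   enrich_release({'target': '23', 'target_type': 'directory'})
#   # -> {'target': '23', 'target_type': 'directory',
#   #     'target_url': <directory url for sha1_git '23'>}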
def enrich_directory(directory, context_url=None):
"""Enrich directory with url to content or directory.
"""
if 'type' in directory:
target_type = directory['type']
target = directory['target']
if target_type == 'file':
directory['target_url'] = \
reverse('content', kwargs={'q': 'sha1_git:%s' % target})
if context_url:
directory['file_url'] = context_url + directory['name'] + '/'
elif target_type == 'dir':
directory['target_url'] = reverse('directory',
kwargs={'sha1_git': target})
if context_url:
directory['dir_url'] = context_url + directory['name'] + '/'
else:
directory['target_url'] = reverse('revision',
kwargs={'sha1_git': target})
if context_url:
directory['rev_url'] = context_url + directory['name'] + '/'
return directory
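# Usage sketch from the test suite: a 'file' entry gains a target_url and,
# when a context_url is given, a file_url built from the entry name:
#
#   enrich_directory({'id': 'dir-id', 'type': 'file', 'name': 'hy',
#                     'target': '789'},
#                    context_url='/api/revision/revsha1/directory/prefix/path/')
#   # -> adds 'target_url' (content url for 'sha1_git:789') and
#   #    'file_url': '/api/revision/revsha1/directory/prefix/path/hy/'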
def enrich_metadata_endpoint(content):
"""Enrich metadata endpoint with link to the upper metadata endpoint.
"""
c = content.copy()
c['content_url'] = reverse('content', args=['sha1:%s' % c['id']])
return c
def enrich_content(content, top_url=False, query_string=None):
"""Enrich content with links to:
- data_url: its raw data
- filetype_url: its filetype information
- language_url: its programming language information
- license_url: its licensing information
Args:
content: dict of data associated to a swh content object
top_url: whether or not to include the content url in
the enriched data
query_string: optional query string of type '<algo>:<hash>'
used when requesting the content; it acts as a hint
for picking the same hash method when computing
the urls listed above
Returns:
An enriched content dict filled with additional urls
"""
checksums = content
if 'checksums' in content:
checksums = content['checksums']
hash_algo = 'sha1'
if query_string:
hash_algo = parse_hash(query_string)[0]
if hash_algo in checksums:
q = '%s:%s' % (hash_algo, checksums[hash_algo])
if top_url:
content['content_url'] = reverse('content', kwargs={'q': q})
content['data_url'] = reverse('content-raw', kwargs={'q': q})
content['filetype_url'] = reverse('content-filetype',
kwargs={'q': q})
content['language_url'] = reverse('content-language',
kwargs={'q': q})
content['license_url'] = reverse('content-license',
kwargs={'q': q})
return content
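# Usage sketch from the test suite: given a content dict holding a sha1
# and the matching query string, the listed urls are filled in:
#
#   enrich_content({'sha1': 'dc2830a9e72f23c1dfebef4413003221baa5fb62'},
#                  query_string='sha1:dc2830a9e72f23c1dfebef4413003221baa5fb62')
#   # -> adds 'data_url', 'filetype_url', 'language_url' and 'license_url'
#   #    ('content_url' too when top_url=True)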
def enrich_entity(entity):
"""Enrich entity with
"""
if 'uuid' in entity:
entity['uuid_url'] = reverse('entity',
kwargs={'uuid': entity['uuid']})
if 'parent' in entity and entity['parent']:
entity['parent_url'] = reverse('entity',
kwargs={'uuid': entity['parent']})
return entity
def _get_path_list(path_string):
"""Helper for enrich_revision: get a list of the sha1 id of the navigation
breadcrumbs, ordered from the oldest to the most recent.
Args:
path_string: the path as a '/'-separated string
Returns:
The navigation context as a list of sha1 revision ids
"""
return path_string.split('/')
def _get_revision_contexts(rev_id, context):
"""Helper for enrich_revision: retrieve for the revision id and potentially
the navigation breadcrumbs the context to pass to parents and children of
of the revision.
Args:
rev_id: the revision's sha1 id
context: the current navigation context
Returns:
The context for parents, children and the url of the direct child as a
tuple in that order.
"""
context_for_parents = None
context_for_children = None
url_direct_child = None
if not context:
return (rev_id, None, None)
path_list = _get_path_list(context)
context_for_parents = '%s/%s' % (context, rev_id)
prev_for_children = path_list[:-1]
if len(prev_for_children) > 0:
context_for_children = '/'.join(prev_for_children)
child_id = path_list[-1]
# This commit is not the first commit in the path
if context_for_children:
url_direct_child = reverse('revision-context',
kwargs={'sha1_git': child_id,
'context': context_for_children})
# This commit is the first commit in the path
else:
url_direct_child = reverse('revision', kwargs={'sha1_git': child_id})
return (context_for_parents, context_for_children, url_direct_child)
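# Behaviour sketch, grounded in the tests below: without a context the
# revision id itself becomes the parents' context; with breadcrumbs the
# direct child url is derived from the last breadcrumb:
#
#   _get_revision_contexts('rev-id', None)
#   # -> ('rev-id', None, None)
#   _get_revision_contexts('rev-id', 'prev1-rev/prev0-rev')
#   # -> ('prev1-rev/prev0-rev/rev-id', 'prev1-rev',
#   #     reverse('revision-context', kwargs={'sha1_git': 'prev0-rev',
#   #                                         'context': 'prev1-rev'}))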
def _make_child_url(rev_children, context):
"""Helper for enrich_revision: retrieve the list of urls corresponding
to the children of the current revision according to the navigation
breadcrumbs.
Args:
rev_children: a list of revision ids
context: the '/'-separated navigation breadcrumbs
Returns:
the list of the children urls according to the context
"""
children = []
for child in rev_children:
if context and child != _get_path_list(context)[-1]:
children.append(reverse('revision',
kwargs={'sha1_git': child}))
elif not context:
children.append(reverse('revision', kwargs={'sha1_git': child}))
return children
def enrich_revision(revision, context=None):
"""Enrich revision with links where it makes sense (directory, parents).
Keep track of the navigation breadcrumbs if they are specified.
Args:
revision: the revision as a dict
context: the navigation breadcrumbs as a /-separated string of revision
sha1_git
"""
ctx_parents, ctx_children, url_direct_child = _get_revision_contexts(
revision['id'], context)
revision['url'] = reverse('revision', kwargs={'sha1_git': revision['id']})
revision['history_url'] = reverse('revision-log',
kwargs={'sha1_git': revision['id']})
if context:
revision['history_context_url'] = reverse(
'revision-log', kwargs={'sha1_git': revision['id'],
'prev_sha1s': context})
if 'author' in revision:
author = revision['author']
revision['author_url'] = reverse('person',
kwargs={'person_id': author['id']})
if 'committer' in revision:
committer = revision['committer']
revision['committer_url'] = \
reverse('person', kwargs={'person_id': committer['id']})
if 'directory' in revision:
revision['directory_url'] = \
reverse('directory', kwargs={'sha1_git': revision['directory']})
if 'parents' in revision:
parents = []
for parent in revision['parents']:
parents.append({
'id': parent,
'url': reverse('revision', kwargs={'sha1_git': parent})
})
revision['parents'] = parents
if 'children' in revision:
children = _make_child_url(revision['children'], context)
if url_direct_child:
children.append(url_direct_child)
revision['children_urls'] = children
else:
if url_direct_child:
revision['children_urls'] = [url_direct_child]
if 'message_decoding_failed' in revision:
revision['message_url'] = reverse('revision-raw-message',
kwargs={'sha1_git': revision['id']})
return revision
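# Usage sketch, mirroring the enrich_revision_no_context test below;
# urls come from the reverse function above:
#
#   enrich_revision({'id': 'rev-id', 'parents': ['123'], 'children': ['456']})
#   # -> {'id': 'rev-id',
#   #     'url': <revision url for 'rev-id'>,
#   #     'history_url': <revision-log url for 'rev-id'>,
#   #     'parents': [{'id': '123', 'url': <revision url for '123'>}],
#   #     'children': ['456'],
#   #     'children_urls': [<revision url for '456'>]}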
diff --git a/swh/web/common/converters.py b/swh/web/common/converters.py
index a151fd00..986193a9 100644
--- a/swh/web/common/converters.py
+++ b/swh/web/common/converters.py
@@ -1,362 +1,390 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import json
from swh.model import hashutil
from swh.core.utils import decode_with_escape
-from swh.web.common import utils
def _group_checksums(data):
"""Groups checksums values computed from hash functions used in swh
and stored in data dict under a single entry 'checksums'
"""
if data:
checksums = {}
for hash in hashutil.ALGORITHMS:
if hash in data and data[hash]:
checksums[hash] = data[hash]
del data[hash]
if len(checksums) > 0:
data['checksums'] = checksums
+def fmap(f, data):
+ """Map f to data at each level.
+
+ This must keep the original data structure type:
+ - map -> map
+ - dict -> dict
+ - list -> list
+ - None -> None
+
+ Args:
+ f: function that expects one argument.
+ data: data to traverse to apply the f function.
+ list, map, dict or bare value.
+
+ Returns:
+ The same data-structure with modified values by the f function.
+
+ """
+ if data is None:
+ return data
+ if isinstance(data, map):
+ return map(lambda y: fmap(f, y), (x for x in data))
+ if isinstance(data, list):
+ return [fmap(f, x) for x in data]
+ if isinstance(data, dict):
+ return {k: fmap(f, v) for (k, v) in data.items()}
+ return f(data)
+
+
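# A usage sketch of fmap, mirroring the assertions moved to
# swh/web/tests/common/test_converters.py in this diff:
#
#   fmap(lambda x: x + 1, [1, 2, None, 3])        # -> [2, 3, None, 4]
#   fmap(lambda x: x * 2, {'a': [1, 3], 'b': 2})  # -> {'a': [2, 6], 'b': 4}
#   fmap(lambda x: x, None)                       # -> None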
def from_swh(dict_swh, hashess={}, bytess={}, dates={}, blacklist={},
removables_if_empty={}, empty_dict={}, empty_list={},
convert={}, convert_fn=lambda x: x):
"""Convert from an swh dictionary to something reasonably json
serializable.
Args:
dict_swh: the origin dictionary needed to be transformed
hashess: list/set of keys representing hashes values (sha1, sha256,
sha1_git, etc...) as bytes. Those need to be transformed in
hexadecimal string
bytess: list/set of keys representing bytes values which needs to be
decoded
blacklist: set of keys to filter out from the conversion
convert: set of keys whose associated values need to be converted using
convert_fn
convert_fn: the conversion function to apply on the value of key in
'convert'
The remaining keys are copied as is in the output.
Returns:
dictionary equivalent as dict_swh only with its keys converted.
"""
def convert_hashes_bytes(v):
"""v is supposedly a hash as bytes, returns it converted in hex.
"""
if isinstance(v, bytes):
return hashutil.hash_to_hex(v)
return v
def convert_bytes(v):
"""v is supposedly a bytes string, decode as utf-8.
FIXME: Improve decoding policy.
If not utf-8, break!
"""
if isinstance(v, bytes):
return v.decode('utf-8')
return v
def convert_date(v):
"""
Args:
v (dict or datetime): either:
- a dict with three keys:
- timestamp (dict or integer timestamp)
- offset
- negative_utc
- or, a datetime
We convert it to a human-readable string
"""
if not v:
return v
if isinstance(v, datetime.datetime):
return v.isoformat()
tz = datetime.timezone(datetime.timedelta(minutes=v['offset']))
swh_timestamp = v['timestamp']
if isinstance(swh_timestamp, dict):
date = datetime.datetime.fromtimestamp(
swh_timestamp['seconds'], tz=tz)
else:
date = datetime.datetime.fromtimestamp(
swh_timestamp, tz=tz)
datestr = date.isoformat()
if v['offset'] == 0 and v['negative_utc']:
# remove the rightmost + and replace it with a -
return '-'.join(datestr.rsplit('+', 1))
return datestr
if not dict_swh:
return dict_swh
new_dict = {}
for key, value in dict_swh.items():
if key in blacklist or (key in removables_if_empty and not value):
continue
if key in dates:
new_dict[key] = convert_date(value)
elif key in convert:
new_dict[key] = convert_fn(value)
elif isinstance(value, dict):
new_dict[key] = from_swh(value,
hashess=hashess, bytess=bytess,
dates=dates, blacklist=blacklist,
removables_if_empty=removables_if_empty,
empty_dict=empty_dict,
empty_list=empty_list,
convert=convert,
convert_fn=convert_fn)
elif key in hashess:
- new_dict[key] = utils.fmap(convert_hashes_bytes, value)
+ new_dict[key] = fmap(convert_hashes_bytes, value)
elif key in bytess:
try:
- new_dict[key] = utils.fmap(convert_bytes, value)
+ new_dict[key] = fmap(convert_bytes, value)
except UnicodeDecodeError:
if 'decoding_failures' not in new_dict:
new_dict['decoding_failures'] = [key]
else:
new_dict['decoding_failures'].append(key)
- new_dict[key] = utils.fmap(decode_with_escape, value)
+ new_dict[key] = fmap(decode_with_escape, value)
elif key in empty_dict and not value:
new_dict[key] = {}
elif key in empty_list and not value:
new_dict[key] = []
else:
new_dict[key] = value
_group_checksums(new_dict)
return new_dict
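# A minimal sketch of from_swh: keys listed in hashess come out as
# hexadecimal strings and keys listed in bytess are decoded:
#
#   from_swh({'id': hashutil.hash_to_bytes(
#                 'b04caf10e9535160d90e874b45aa426de762f19f'),
#             'name': b'v0.0.1'},
#            hashess={'id'}, bytess={'name'})
#   # -> {'id': 'b04caf10e9535160d90e874b45aa426de762f19f',
#   #     'name': 'v0.0.1'}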
def from_provenance(provenance):
"""Convert from a provenance information to a provenance dictionary.
Args:
provenance (dict): Dictionary with the following keys:
- content (sha1_git): the content's identifier
- revision (sha1_git): the revision the content was seen
- origin (int): the origin the content was seen
- visit (int): the visit it occurred
- path (bytes): the path the content was seen at
"""
return from_swh(provenance,
hashess={'content', 'revision'},
bytess={'path'})
def from_origin(origin):
"""Convert from an SWH origin to an origin dictionary.
"""
return from_swh(origin,
removables_if_empty={'lister', 'project'})
def from_release(release):
"""Convert from an SWH release to a json serializable release dictionary.
Args:
release (dict): dictionary with keys:
- id: identifier of the release (sha1 in bytes)
- revision: identifier of the revision the release points to (sha1
in bytes)
- comment: release's comment message (bytes)
- name: release's name (string)
- author: release's author identifier (swh's id)
- synthetic: the synthetic property (boolean)

Returns:
dict: Release dictionary with the following keys:
- id: hexadecimal sha1 (string)
- revision: hexadecimal sha1 (string)
- comment: release's comment message (string)
- name: release's name (string)
- author: release's author identifier (swh's id)
- synthetic: the synthetic property (boolean)
"""
return from_swh(
release,
hashess={'id', 'target'},
bytess={'message', 'name', 'fullname', 'email'},
dates={'date'},
)
class SWHMetadataEncoder(json.JSONEncoder):
"""Special json encoder for metadata field which can contain bytes
encoded value.
"""
def default(self, obj):
if isinstance(obj, bytes):
return obj.decode('utf-8')
# Let the base class default method raise the TypeError
return json.JSONEncoder.default(self, obj)
def convert_revision_metadata(metadata):
"""Convert json specific dict to a json serializable one.
"""
if not metadata:
return {}
return json.loads(json.dumps(metadata, cls=SWHMetadataEncoder))
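# Sketch: bytes values survive the json round-trip as utf-8 strings:
#
#   convert_revision_metadata({'key': b'value'})  # -> {'key': 'value'}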
def from_revision(revision):
"""Convert from an SWH revision to a json serializable revision dictionary.
Args:
revision (dict): dict with keys:
- id: identifier of the revision (sha1 in bytes)
- directory: identifier of the directory the revision points to
(sha1 in bytes)
- author_name, author_email: the revision author's name and email
- committer_name, committer_email: the revision committer's name
and email
- message: revision's message
- date, date_offset: revision's author date
- committer_date, committer_date_offset: revision's commit date
- parents: list of parents of the revision
- synthetic: whether the revision is synthetic (boolean)
- type: revision's type (git, tar or dsc at the moment)
- metadata: if the revision is synthetic, this can reference
dynamic properties.
Returns:
dict: Revision dictionary with the same keys as inputs, except:
- sha1s are in hexadecimal strings (id, directory)
- bytes are decoded in string (author_name, committer_name,
author_email, committer_email)
Remaining keys are left as is
"""
revision = from_swh(revision,
hashess={'id', 'directory', 'parents', 'children'},
bytess={'name', 'fullname', 'email'},
convert={'metadata'},
convert_fn=convert_revision_metadata,
dates={'date', 'committer_date'})
if revision:
if 'parents' in revision:
revision['merge'] = len(revision['parents']) > 1
if 'message' in revision:
try:
revision['message'] = revision['message'].decode('utf-8')
except UnicodeDecodeError:
revision['message_decoding_failed'] = True
revision['message'] = None
return revision
def from_content(content):
"""Convert swh content to serializable content dictionary.
"""
return from_swh(content,
hashess={'sha1', 'sha1_git', 'sha256', 'blake2s256'},
blacklist={'ctime'},
convert={'status'},
convert_fn=lambda v: 'absent' if v == 'hidden' else v)
def from_person(person):
"""Convert swh person to serializable person dictionary.
"""
return from_swh(person,
bytess={'name', 'fullname', 'email'})
def from_origin_visit(visit):
"""Convert swh origin_visit to serializable origin_visit dictionary.
"""
ov = from_swh(visit,
hashess={'target', 'snapshot'},
bytess={'branch'},
dates={'date'},
empty_dict={'metadata'})
# TODO: remove that piece of code once snapshot migration
# is totally effective in storage (no more occurrences)
if ov and 'occurrences' in ov:
ov['occurrences'] = {
decode_with_escape(k): v
for k, v in ov['occurrences'].items()
}
return ov
def from_snapshot(snapshot):
"""Convert swh snapshot to serializable snapshot dictionary.
"""
sv = from_swh(snapshot,
hashess={'id', 'target'})
if sv and 'branches' in sv:
sv['branches'] = {
decode_with_escape(k): v
for k, v in sv['branches'].items()
}
return sv
def from_directory_entry(dir_entry):
"""Convert swh person to serializable person dictionary.
"""
return from_swh(dir_entry,
hashess={'dir_id', 'sha1_git', 'sha1', 'sha256',
'blake2s256', 'target'},
bytess={'name'},
removables_if_empty={
'sha1', 'sha1_git', 'sha256', 'blake2s256', 'status'},
convert={'status'},
convert_fn=lambda v: 'absent' if v == 'hidden' else v)
def from_filetype(content_entry):
"""Convert swh person to serializable person dictionary.
"""
return from_swh(content_entry,
hashess={'id'},
bytess={'mimetype', 'encoding'})
diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py
index c4977055..6dfca325 100644
--- a/swh/web/common/utils.py
+++ b/swh/web/common/utils.py
@@ -1,376 +1,347 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import docutils.parsers.rst
import docutils.utils
import re
from datetime import datetime, timezone
from dateutil import parser as date_parser
from dateutil import tz
from django.core.cache import cache
from django.core import urlresolvers
from django.http import QueryDict
from swh.model.exceptions import ValidationError
from swh.model.identifiers import (
persistent_identifier, parse_persistent_identifier,
CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
)
from swh.web.common import service
from swh.web.common.exc import BadInputExc
def reverse(viewname, args=None, kwargs=None, query_params=None,
current_app=None, urlconf=None):
"""An override of django reverse function supporting query parameters.
Args:
viewname: the name of the django view from which to compute a url
args: list of url arguments ordered according to their position it
kwargs: dictionary of url arguments indexed by their names
query_params: dictionary of query parameters to append to the
reversed url
current_app: the name of the django app tighted to the view
urlconf: url configuration module
Returns:
The url of the requested view with processed arguments and
query parameters
"""
if kwargs:
kwargs = {k: v for k, v in kwargs.items() if v is not None}
url = urlresolvers.reverse(
viewname, urlconf=urlconf, args=args,
kwargs=kwargs, current_app=current_app)
if query_params:
query_params = {k: v for k, v in query_params.items() if v is not None}
if query_params and len(query_params) > 0:
query_dict = QueryDict('', mutable=True)
for k in sorted(query_params.keys()):
query_dict[k] = query_params[k]
url += ('?' + query_dict.urlencode(safe='/;:'))
return url
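# A hedged usage sketch (the view name and parameters are illustrative);
# query parameters are appended sorted by key:
#
#   reverse('revision', kwargs={'sha1_git': 'abc'},
#           query_params={'per_page': 10, 'fields': 'id'})
#   # -> '<resolved revision url>?fields=id&per_page=10'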
-def fmap(f, data):
- """Map f to data at each level.
-
- This must keep the origin data structure type:
- - map -> map
- - dict -> dict
- - list -> list
- - None -> None
-
- Args:
- f: function that expects one argument.
- data: data to traverse to apply the f function.
- list, map, dict or bare value.
-
- Returns:
- The same data-structure with modified values by the f function.
-
- """
- if data is None:
- return data
- if isinstance(data, map):
- return map(lambda y: fmap(f, y), (x for x in data))
- if isinstance(data, list):
- return [fmap(f, x) for x in data]
- if isinstance(data, dict):
- return {k: fmap(f, v) for (k, v) in data.items()}
- return f(data)
-
-
def datetime_to_utc(date):
"""Returns datetime in UTC without timezone info
Args:
date (datetime.datetime): input datetime with timezone info
Returns:
datetime.datetime: datetime in UTC without timezone info
"""
if date.tzinfo:
return date.astimezone(tz.gettz('UTC')).replace(tzinfo=timezone.utc)
else:
return date
def parse_timestamp(timestamp):
"""Given a time or timestamp (as string), parse the result as UTC datetime.
Returns:
a timezone-aware datetime representing the parsed value.
None if the parsing fails.
Samples:
- 2016-01-12
- 2016-01-12T09:19:12+0100
- Today is January 1, 2047 at 8:21:00AM
- 1452591542
"""
if not timestamp:
return None
try:
date = date_parser.parse(timestamp, ignoretz=False, fuzzy=True)
return datetime_to_utc(date)
except Exception:
try:
return datetime.utcfromtimestamp(float(timestamp)).replace(
tzinfo=timezone.utc)
except (ValueError, OverflowError) as e:
raise BadInputExc(e)
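# Sketch based on the docstring samples:
#
#   parse_timestamp('2016-01-12T09:19:12+0100')
#   # -> datetime(2016, 1, 12, 8, 19, 12, tzinfo=timezone.utc)
#   parse_timestamp('1452591542')
#   # -> the same kind of timezone-aware UTC datetime, via the
#   #    float-timestamp fallback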
def shorten_path(path):
"""Shorten the given path: for each hash present, only return the first
8 characters followed by an ellipsis"""
sha256_re = r'([0-9a-f]{8})[0-9a-z]{56}'
sha1_re = r'([0-9a-f]{8})[0-9a-f]{32}'
ret = re.sub(sha256_re, r'\1...', path)
return re.sub(sha1_re, r'\1...', ret)
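# For instance, a full sha1 keeps only its first 8 characters:
#
#   shorten_path('dc2830a9e72f23c1dfebef4413003221baa5fb62')
#   # -> 'dc2830a9...'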
def format_utc_iso_date(iso_date, fmt='%d %B %Y, %H:%M UTC'):
"""Turns a string reprensation of an ISO 8601 date string
to UTC and format it into a more human readable one.
For instance, from the following input
string: '2017-05-04T13:27:13+02:00' the following one
is returned: '04 May 2017, 11:27 UTC'.
Custom format string may also be provided
as parameter
Args:
iso_date (str): a string representation of an ISO 8601 date
fmt (str): optional date formatting string
Returns:
A formatted string representation of the input iso date
"""
if not iso_date:
return iso_date
date = parse_timestamp(iso_date)
return date.strftime(fmt)
def gen_path_info(path):
"""Function to generate path data navigation for use
with a breadcrumb in the swh web ui.
For instance, from a path /folder1/folder2/folder3,
it returns the following list::
[{'name': 'folder1', 'path': 'folder1'},
{'name': 'folder2', 'path': 'folder1/folder2'},
{'name': 'folder3', 'path': 'folder1/folder2/folder3'}]
Args:
path: a filesystem path
Returns:
A list of path data for navigation as illustrated above.
"""
path_info = []
if path:
sub_paths = path.strip('/').split('/')
path_from_root = ''
for p in sub_paths:
path_from_root += '/' + p
path_info.append({'name': p,
'path': path_from_root.strip('/')})
return path_info
def get_origin_visits(origin_info):
"""Function that returns the list of visits for a swh origin.
That list is put in cache in order to speedup the navigation
in the swh web browse ui.
Args:
origin_id (int): the id of the swh origin to fetch visits from
Returns:
A list of dict describing the origin visits::
[{'date': <UTC visit date in ISO format>,
'origin': <origin id>,
'status': <'full' | 'partial'>,
'visit': <visit id>
},
...
]
Raises:
NotFoundExc if the origin is not found
"""
cache_entry_id = 'origin_%s_visits' % origin_info['id']
cache_entry = cache.get(cache_entry_id)
if cache_entry:
return cache_entry
origin_visits = []
per_page = service.MAX_LIMIT
last_visit = None
while 1:
visits = list(service.lookup_origin_visits(origin_info['id'],
last_visit=last_visit,
per_page=per_page))
origin_visits += visits
if len(visits) < per_page:
break
else:
if not last_visit:
last_visit = per_page
else:
last_visit += per_page
def _visit_sort_key(visit):
ts = parse_timestamp(visit['date']).timestamp()
return ts + (float(visit['visit']) / 10e3)
for v in origin_visits:
if 'metadata' in v:
del v['metadata']
origin_visits = [dict(t) for t in set([tuple(d.items())
for d in origin_visits])]
origin_visits = sorted(origin_visits, key=lambda v: _visit_sort_key(v))
cache.set(cache_entry_id, origin_visits)
return origin_visits
def get_swh_persistent_id(object_type, object_id, scheme_version=1):
"""
Returns the persistent identifier for a swh object based on:
* the object type
* the object id
* the swh identifiers scheme version
Args:
object_type (str): the swh object type
(content/directory/release/revision/snapshot)
object_id (str): the swh object id (hexadecimal representation
of its hash value)
scheme_version (int): the scheme version of the swh
persistent identifiers
Returns:
str: the swh object persistent identifier
Raises:
BadInputExc if the provided parameters do not allow generating
a valid identifier
"""
try:
swh_id = persistent_identifier(object_type, object_id, scheme_version)
except ValidationError as e:
raise BadInputExc('Invalid object (%s) for swh persistent id. %s' %
(object_id, e))
else:
return swh_id
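# Sketch (identifier scheme per swh.model.identifiers):
#
#   get_swh_persistent_id('revision',
#                         '18d8be353ed3480476f032475e7c233eff7371d5')
#   # -> 'swh:1:rev:18d8be353ed3480476f032475e7c233eff7371d5'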
def resolve_swh_persistent_id(swh_id, query_params=None):
"""
Try to resolve a SWH persistent id into a url for
browsing the pointed object.
Args:
swh_id (str): a SWH persistent identifier
query_params (django.http.QueryDict): optional dict filled with
query parameters to append to the browse url
Returns:
dict: a dict with the following keys:
* **swh_id_parsed (swh.model.identifiers.PersistentId)**: the parsed identifier
* **browse_url (str)**: the url for browsing the pointed object
Raises:
BadInputExc: if the provided identifier can not be parsed
""" # noqa
try:
swh_id_parsed = parse_persistent_identifier(swh_id)
object_type = swh_id_parsed.object_type
object_id = swh_id_parsed.object_id
browse_url = None
query_dict = QueryDict('', mutable=True)
if query_params and len(query_params) > 0:
for k in sorted(query_params.keys()):
query_dict[k] = query_params[k]
if 'origin' in swh_id_parsed.metadata:
query_dict['origin'] = swh_id_parsed.metadata['origin']
if object_type == CONTENT:
query_string = 'sha1_git:' + object_id
fragment = ''
if 'lines' in swh_id_parsed.metadata:
lines = swh_id_parsed.metadata['lines'].split('-')
fragment += '#L' + lines[0]
if len(lines) > 1:
fragment += '-L' + lines[1]
browse_url = reverse('browse-content',
kwargs={'query_string': query_string},
query_params=query_dict) + fragment
elif object_type == DIRECTORY:
browse_url = reverse('browse-directory',
kwargs={'sha1_git': object_id},
query_params=query_dict)
elif object_type == RELEASE:
browse_url = reverse('browse-release',
kwargs={'sha1_git': object_id},
query_params=query_dict)
elif object_type == REVISION:
browse_url = reverse('browse-revision',
kwargs={'sha1_git': object_id},
query_params=query_dict)
elif object_type == SNAPSHOT:
browse_url = reverse('browse-snapshot',
kwargs={'snapshot_id': object_id},
query_params=query_dict)
except ValidationError as ve:
raise BadInputExc('Error when parsing identifier. %s' %
' '.join(ve.messages))
else:
return {'swh_id_parsed': swh_id_parsed,
'browse_url': browse_url}
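# A hedged sketch: resolving a revision identifier yields the parsed id
# and the matching browse url (its exact form depends on the configured
# url patterns):
#
#   resolve_swh_persistent_id(
#       'swh:1:rev:18d8be353ed3480476f032475e7c233eff7371d5')
#   # -> {'swh_id_parsed': <PersistentId>,
#   #     'browse_url': <browse-revision url for that sha1_git>}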
def parse_rst(text, report_level=2):
"""
Parse a reStructuredText string with docutils.
Args:
text (str): string with reStructuredText markups in it
report_level (int): level of docutils report messages to print
(1 info 2 warning 3 error 4 severe 5 none)
Returns:
docutils.nodes.document: a parsed docutils document
"""
parser = docutils.parsers.rst.Parser()
components = (docutils.parsers.rst.Parser,)
settings = docutils.frontend.OptionParser(
components=components).get_default_values()
settings.report_level = report_level
document = docutils.utils.new_document('rst-doc', settings=settings)
parser.parse(text, document)
return document
diff --git a/swh/web/tests/api/test_utils.py b/swh/web/tests/api/test_utils.py
index 5f96b83d..ce816a8a 100644
--- a/swh/web/tests/api/test_utils.py
+++ b/swh/web/tests/api/test_utils.py
@@ -1,880 +1,743 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from unittest.mock import patch, call
from nose.tools import istest, nottest
from swh.web.api import utils
class UtilsTestCase(unittest.TestCase):
def setUp(self):
self.maxDiff = None
self.url_map = [dict(rule='/other/<slug>',
methods=set(['GET', 'POST', 'HEAD']),
endpoint='foo'),
dict(rule='/some/old/url/<slug>',
methods=set(['GET', 'POST']),
endpoint='blablafn'),
dict(rule='/other/old/url/<int:id>',
methods=set(['GET', 'HEAD']),
endpoint='bar'),
dict(rule='/other',
methods=set([]),
endpoint=None),
dict(rule='/other2',
methods=set([]),
endpoint=None)]
self.sample_content_hashes = {
'blake2s256': ('791e07fcea240ade6dccd0a9309141673'
'c31242cae9c237cf3855e151abc78e9'),
'sha1': 'dc2830a9e72f23c1dfebef4413003221baa5fb62',
'sha1_git': 'fe95a46679d128ff167b7c55df5d02356c5a1ae1',
'sha256': ('b5c7fe0536f44ef60c8780b6065d30bca74a5cd06'
'd78a4a71ba1ad064770f0c9')
}
- @istest
- def filter_endpoints_1(self):
- # when
- actual_data = utils.filter_endpoints(self.url_map, '/some')
-
- # then
- self.assertEquals(actual_data, {
- '/some/old/url/<slug>': {
- 'methods': ['GET', 'POST'],
- 'endpoint': 'blablafn'
- }
- })
-
- @istest
- def filter_endpoints_2(self):
- # when
- actual_data = utils.filter_endpoints(self.url_map, '/other',
- blacklist=['/other2'])
-
- # then
- # rules /other is skipped because its' exactly the prefix url
- # rules /other2 is skipped because it's blacklisted
- self.assertEquals(actual_data, {
- '/other/<slug>': {
- 'methods': ['GET', 'HEAD', 'POST'],
- 'endpoint': 'foo'
- },
- '/other/old/url/<int:id>': {
- 'methods': ['GET', 'HEAD'],
- 'endpoint': 'bar'
- }
- })
-
- @istest
- def prepare_data_for_view_default_encoding(self):
- self.maxDiff = None
- # given
- inputs = [
- {
- 'data': b'some blah data'
- },
- {
- 'data': 1,
- 'data_url': '/api/1/some/api/call',
- },
- {
- 'blah': 'foobar',
- 'blah_url': '/some/non/changed/api/call'
- }]
-
- # when
- actual_result = utils.prepare_data_for_view(inputs)
-
- # then
- self.assertEquals(actual_result, [
- {
- 'data': 'some blah data',
- },
- {
- 'data': 1,
- 'data_url': '/browse/some/api/call',
- },
- {
- 'blah': 'foobar',
- 'blah_url': '/some/non/changed/api/call'
- }
- ])
-
- @istest
- def prepare_data_for_view(self):
- self.maxDiff = None
- # given
- inputs = [
- {
- 'data': b'some blah data'
- },
- {
- 'data': 1,
- 'data_url': '/api/1/some/api/call',
- },
- {
- 'blah': 'foobar',
- 'blah_url': '/some/non/changed/api/call'
- }]
-
- # when
- actual_result = utils.prepare_data_for_view(inputs, encoding='ascii')
-
- # then
- self.assertEquals(actual_result, [
- {
- 'data': 'some blah data',
- },
- {
- 'data': 1,
- 'data_url': '/browse/some/api/call',
- },
- {
- 'blah': 'foobar',
- 'blah_url': '/some/non/changed/api/call'
- }
- ])
-
- @istest
- def prepare_data_for_view_ko_cannot_decode(self):
- self.maxDiff = None
- # given
- inputs = {
- 'data': 'hé dude!'.encode('utf8'),
- }
-
- actual_result = utils.prepare_data_for_view(inputs, encoding='ascii')
-
- # then
- self.assertEquals(actual_result, {
- 'data': "Cannot decode the data bytes, try and set another "
- "encoding in the url (e.g. ?encoding=utf8) or "
- "download directly the "
- "content's raw data.",
- })
-
@istest
def filter_field_keys_dict_unknown_keys(self):
# when
actual_res = utils.filter_field_keys(
{'directory': 1, 'file': 2, 'link': 3},
{'directory1', 'file2'})
# then
self.assertEqual(actual_res, {})
@istest
def filter_field_keys_dict(self):
# when
actual_res = utils.filter_field_keys(
{'directory': 1, 'file': 2, 'link': 3},
{'directory', 'link'})
# then
self.assertEqual(actual_res, {'directory': 1, 'link': 3})
@istest
def filter_field_keys_list_unknown_keys(self):
# when
actual_res = utils.filter_field_keys(
[{'directory': 1, 'file': 2, 'link': 3},
{'1': 1, '2': 2, 'link': 3}],
{'d'})
# then
self.assertEqual(actual_res, [{}, {}])
@istest
def filter_field_keys_map(self):
# when
actual_res = utils.filter_field_keys(
map(lambda x: {'i': x['i']+1, 'j': x['j']},
[{'i': 1, 'j': None},
{'i': 2, 'j': None},
{'i': 3, 'j': None}]),
{'i'})
# then
self.assertEqual(list(actual_res), [{'i': 2}, {'i': 3}, {'i': 4}])
@istest
def filter_field_keys_list(self):
# when
actual_res = utils.filter_field_keys(
[{'directory': 1, 'file': 2, 'link': 3},
{'dir': 1, 'fil': 2, 'lin': 3}],
{'directory', 'dir'})
# then
self.assertEqual(actual_res, [{'directory': 1}, {'dir': 1}])
@istest
def filter_field_keys_other(self):
# given
input_set = {1, 2}
# when
actual_res = utils.filter_field_keys(input_set, {'a', '1'})
# then
self.assertEqual(actual_res, input_set)
- @istest
- def fmap(self):
- self.assertEquals([2, 3, None, 4],
- utils.fmap(lambda x: x+1, [1, 2, None, 3]))
- self.assertEquals([11, 12, 13],
- list(utils.fmap(lambda x: x+10,
- map(lambda x: x, [1, 2, 3]))))
- self.assertEquals({'a': 2, 'b': 4},
- utils.fmap(lambda x: x*2, {'a': 1, 'b': 2}))
- self.assertEquals(100,
- utils.fmap(lambda x: x*10, 10))
- self.assertEquals({'a': [2, 6], 'b': 4},
- utils.fmap(lambda x: x*2, {'a': [1, 3], 'b': 2}))
-
- self.assertIsNone(utils.fmap(lambda x: x, None))
-
@istest
def person_to_string(self):
self.assertEqual(utils.person_to_string(dict(name='raboof',
email='foo@bar')),
'raboof <foo@bar>')
@istest
def enrich_release_0(self):
# when
actual_release = utils.enrich_release({})
# then
self.assertEqual(actual_release, {})
@patch('swh.web.api.utils.reverse')
@istest
def enrich_release_1(self, mock_django_reverse):
# given
def reverse_test_context(view_name, kwargs):
if view_name == 'content':
id = kwargs['q']
return '/api/1/content/%s/' % id
elif view_name == 'person':
id = kwargs['person_id']
return '/api/1/person/%s/' % id
else:
raise ValueError(
'This should not happen, so fail if it does.')
mock_django_reverse.side_effect = reverse_test_context
# when
actual_release = utils.enrich_release({
'target': '123',
'target_type': 'content',
'author': {
'id': 100,
'name': 'author release name',
'email': 'author@email',
},
})
# then
self.assertEqual(actual_release, {
'target': '123',
'target_type': 'content',
'target_url': '/api/1/content/sha1_git:123/',
'author_url': '/api/1/person/100/',
'author': {
'id': 100,
'name': 'author release name',
'email': 'author@email',
},
})
mock_django_reverse.assert_has_calls([
call('content', kwargs={'q': 'sha1_git:123'}),
call('person', kwargs={'person_id': 100})
])
@patch('swh.web.api.utils.reverse')
@istest
def enrich_release_2(self, mock_django_reverse):
# given
mock_django_reverse.return_value = '/api/1/dir/23/'
# when
actual_release = utils.enrich_release({'target': '23',
'target_type': 'directory'})
# then
self.assertEqual(actual_release, {
'target': '23',
'target_type': 'directory',
'target_url': '/api/1/dir/23/'
})
mock_django_reverse.assert_called_once_with('directory',
kwargs={'sha1_git': '23'})
@patch('swh.web.api.utils.reverse')
@istest
def enrich_release_3(self, mock_django_reverse):
# given
mock_django_reverse.return_value = '/api/1/rev/3/'
# when
actual_release = utils.enrich_release({'target': '3',
'target_type': 'revision'})
# then
self.assertEqual(actual_release, {
'target': '3',
'target_type': 'revision',
'target_url': '/api/1/rev/3/'
})
mock_django_reverse.assert_called_once_with('revision',
kwargs={'sha1_git': '3'})
@patch('swh.web.api.utils.reverse')
@istest
def enrich_release_4(self, mock_django_reverse):
# given
mock_django_reverse.return_value = '/api/1/rev/4/'
# when
actual_release = utils.enrich_release({'target': '4',
'target_type': 'release'})
# then
self.assertEqual(actual_release, {
'target': '4',
'target_type': 'release',
'target_url': '/api/1/rev/4/'
})
mock_django_reverse.assert_called_once_with('release',
kwargs={'sha1_git': '4'})
@patch('swh.web.api.utils.reverse')
@istest
def enrich_directory_no_type(self, mock_django_reverse):
# when/then
self.assertEqual(utils.enrich_directory({'id': 'dir-id'}),
{'id': 'dir-id'})
# given
mock_django_reverse.return_value = '/api/content/sha1_git:123/'
# when
actual_directory = utils.enrich_directory({
'id': 'dir-id',
'type': 'file',
'target': '123',
})
# then
self.assertEqual(actual_directory, {
'id': 'dir-id',
'type': 'file',
'target': '123',
'target_url': '/api/content/sha1_git:123/',
})
mock_django_reverse.assert_called_once_with(
'content', kwargs={'q': 'sha1_git:123'})
@patch('swh.web.api.utils.reverse')
@istest
def enrich_directory_with_context_and_type_file(self, mock_django_reverse):
# given
mock_django_reverse.return_value = '/api/content/sha1_git:123/'
# when
actual_directory = utils.enrich_directory({
'id': 'dir-id',
'type': 'file',
'name': 'hy',
'target': '789',
}, context_url='/api/revision/revsha1/directory/prefix/path/')
# then
self.assertEqual(actual_directory, {
'id': 'dir-id',
'type': 'file',
'name': 'hy',
'target': '789',
'target_url': '/api/content/sha1_git:123/',
'file_url': '/api/revision/revsha1/directory'
'/prefix/path/hy/'
})
mock_django_reverse.assert_called_once_with(
'content', kwargs={'q': 'sha1_git:789'})
@patch('swh.web.api.utils.reverse')
@istest
def enrich_directory_with_context_and_type_dir(self, mock_django_reverse):
# given
mock_django_reverse.return_value = '/api/directory/456/'
# when
actual_directory = utils.enrich_directory({
'id': 'dir-id',
'type': 'dir',
'name': 'emacs-42',
'target_type': 'file',
'target': '456',
}, context_url='/api/revision/origin/2/directory/some/prefix/path/')
# then
self.assertEqual(actual_directory, {
'id': 'dir-id',
'type': 'dir',
'target_type': 'file',
'name': 'emacs-42',
'target': '456',
'target_url': '/api/directory/456/',
'dir_url': '/api/revision/origin/2/directory'
'/some/prefix/path/emacs-42/'
})
mock_django_reverse.assert_called_once_with('directory',
kwargs={'sha1_git': '456'})
@istest
def enrich_content_without_hashes(self):
# when/then
self.assertEqual(utils.enrich_content({'id': '123'}),
{'id': '123'})
@patch('swh.web.api.utils.reverse')
@istest
def enrich_content_with_hashes(self, mock_django_reverse):
for algo, hash in self.sample_content_hashes.items():
query_string = '%s:%s' % (algo, hash)
# given
mock_django_reverse.side_effect = [
'/api/content/%s/raw/' % query_string,
'/api/filetype/%s/' % query_string,
'/api/language/%s/' % query_string,
'/api/license/%s/' % query_string
]
# when
enriched_content = utils.enrich_content(
{
algo: hash,
},
query_string=query_string
)
# then
self.assertEqual(
enriched_content,
{
algo: hash,
'data_url': '/api/content/%s/raw/' % query_string,
'filetype_url': '/api/filetype/%s/' % query_string,
'language_url': '/api/language/%s/' % query_string,
'license_url': '/api/license/%s/' % query_string,
}
)
mock_django_reverse.assert_has_calls([
call('content-raw', kwargs={'q': query_string}),
call('content-filetype', kwargs={'q': query_string}),
call('content-language', kwargs={'q': query_string}),
call('content-license', kwargs={'q': query_string}),
])
mock_django_reverse.reset_mock()
@patch('swh.web.api.utils.reverse')
@istest
def enrich_content_with_hashes_and_top_level_url(self,
mock_django_reverse):
for algo, hash in self.sample_content_hashes.items():
query_string = '%s:%s' % (algo, hash)
# given
mock_django_reverse.side_effect = [
'/api/content/%s/' % query_string,
'/api/content/%s/raw/' % query_string,
'/api/filetype/%s/' % query_string,
'/api/language/%s/' % query_string,
'/api/license/%s/' % query_string,
]
# when
enriched_content = utils.enrich_content(
{
algo: hash
},
top_url=True,
query_string=query_string
)
# then
self.assertEqual(
enriched_content,
{
algo: hash,
'content_url': '/api/content/%s/' % query_string,
'data_url': '/api/content/%s/raw/' % query_string,
'filetype_url': '/api/filetype/%s/' % query_string,
'language_url': '/api/language/%s/' % query_string,
'license_url': '/api/license/%s/' % query_string,
}
)
mock_django_reverse.assert_has_calls([
call('content', kwargs={'q': query_string}),
call('content-raw', kwargs={'q': query_string}),
call('content-filetype', kwargs={'q': query_string}),
call('content-language', kwargs={'q': query_string}),
call('content-license', kwargs={'q': query_string}),
])
mock_django_reverse.reset_mock()
@istest
def enrich_entity_identity(self):
# when/then
self.assertEqual(utils.enrich_content({'id': '123'}),
{'id': '123'})
@patch('swh.web.api.utils.reverse')
@istest
def enrich_entity_with_sha1(self, mock_django_reverse):
# given
def reverse_test(view_name, kwargs):
return '/api/entity/' + kwargs['uuid'] + '/'
mock_django_reverse.side_effect = reverse_test
# when
actual_entity = utils.enrich_entity({
'uuid': 'uuid-1',
'parent': 'uuid-parent',
'name': 'something'
})
# then
self.assertEqual(actual_entity, {
'uuid': 'uuid-1',
'uuid_url': '/api/entity/uuid-1/',
'parent': 'uuid-parent',
'parent_url': '/api/entity/uuid-parent/',
'name': 'something',
})
mock_django_reverse.assert_has_calls(
[call('entity', kwargs={'uuid': 'uuid-1'}),
call('entity', kwargs={'uuid': 'uuid-parent'})])
@nottest
def _reverse_context_test(self, view_name, kwargs):
if view_name == 'revision':
return '/api/revision/%s/' % kwargs['sha1_git']
elif view_name == 'revision-context':
return '/api/revision/%s/prev/%s/' % (kwargs['sha1_git'], kwargs['context']) # noqa
elif view_name == 'revision-log':
if 'prev_sha1s' in kwargs:
return '/api/revision/%s/prev/%s/log/' % (kwargs['sha1_git'], kwargs['prev_sha1s']) # noqa
else:
return '/api/revision/%s/log/' % kwargs['sha1_git']
@patch('swh.web.api.utils.reverse')
@istest
def enrich_revision_without_children_or_parent(self, mock_django_reverse):
# given
def reverse_test(view_name, kwargs):
if view_name == 'revision':
return '/api/revision/' + kwargs['sha1_git'] + '/'
elif view_name == 'revision-log':
return '/api/revision/' + kwargs['sha1_git'] + '/log/'
elif view_name == 'directory':
return '/api/directory/' + kwargs['sha1_git'] + '/'
elif view_name == 'person':
return '/api/person/' + kwargs['person_id'] + '/'
mock_django_reverse.side_effect = reverse_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'directory': '123',
'author': {'id': '1'},
'committer': {'id': '2'},
})
expected_revision = {
'id': 'rev-id',
'directory': '123',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'directory_url': '/api/directory/123/',
'author': {'id': '1'},
'author_url': '/api/person/1/',
'committer': {'id': '2'},
'committer_url': '/api/person/2/'
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_django_reverse.assert_has_calls(
[call('revision', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id'}),
call('person', kwargs={'person_id': '1'}),
call('person', kwargs={'person_id': '2'}),
call('directory', kwargs={'sha1_git': '123'})])
@patch('swh.web.api.utils.reverse')
@istest
def enrich_revision_with_children_and_parent_no_dir(self,
mock_django_reverse):
# given
mock_django_reverse.side_effect = self._reverse_context_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'parents': ['123'],
'children': ['456'],
}, context='prev-rev')
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': '/api/revision/rev-id/prev/prev-rev/log/',
'parents': [{'id': '123', 'url': '/api/revision/123/'}],
'children': ['456'],
'children_urls': ['/api/revision/456/',
'/api/revision/prev-rev/'],
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_django_reverse.assert_has_calls(
[call('revision', kwargs={'sha1_git': 'prev-rev'}),
call('revision', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id',
'prev_sha1s': 'prev-rev'}),
call('revision', kwargs={'sha1_git': '123'}),
call('revision', kwargs={'sha1_git': '456'})])
@patch('swh.web.api.utils.reverse')
@istest
def enrich_revision_no_context(self, mock_django_reverse):
# given
mock_django_reverse.side_effect = self._reverse_context_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'parents': ['123'],
'children': ['456'],
})
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'parents': [{'id': '123', 'url': '/api/revision/123/'}],
'children': ['456'],
'children_urls': ['/api/revision/456/']
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_django_reverse.assert_has_calls(
[call('revision', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id'}),
call('revision', kwargs={'sha1_git': '123'}),
call('revision', kwargs={'sha1_git': '456'})])
@patch('swh.web.api.utils.reverse')
@istest
def enrich_revision_context_empty_prev_list(self, mock_django_reverse):
# given
mock_django_reverse.side_effect = self._reverse_context_test
# when
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev-rev/log/'),
'parents': [{'id': '123', 'url': '/api/revision/123/'}],
'children': ['456'],
'children_urls': ['/api/revision/456/',
'/api/revision/prev-rev/'],
}
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'parents': ['123'],
'children': ['456']}, context='prev-rev')
# then
self.assertEqual(actual_revision, expected_revision)
mock_django_reverse.assert_has_calls(
[call('revision', kwargs={'sha1_git': 'prev-rev'}),
call('revision', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id',
'prev_sha1s': 'prev-rev'}),
call('revision', kwargs={'sha1_git': '123'}),
call('revision', kwargs={'sha1_git': '456'})])
@patch('swh.web.api.utils.reverse')
@istest
def enrich_revision_context_some_prev_list(self, mock_django_reverse):
# given
mock_django_reverse.side_effect = self._reverse_context_test
# when
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev1-rev/prev0-rev/log/'),
'parents': [{'id': '123', 'url': '/api/revision/123/'}],
'children': ['456'],
'children_urls': ['/api/revision/456/',
'/api/revision/prev0-rev/prev/prev1-rev/'],
}
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'parents': ['123'],
'children': ['456']}, context='prev1-rev/prev0-rev')
# then
self.assertEqual(actual_revision, expected_revision)
mock_django_reverse.assert_has_calls(
[call('revision-context', kwargs={'context': 'prev1-rev',
'sha1_git': 'prev0-rev'}),
call('revision', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'prev_sha1s': 'prev1-rev/prev0-rev',
'sha1_git': 'rev-id'}),
call('revision', kwargs={'sha1_git': '123'}),
call('revision', kwargs={'sha1_git': '456'})])
@nottest
def _reverse_rev_message_test(self, view_name, kwargs):
if view_name == 'revision':
return '/api/revision/%s/' % kwargs['sha1_git']
elif view_name == 'revision-log':
if 'prev_sha1s' in kwargs and kwargs['prev_sha1s'] is not None:
return '/api/revision/%s/prev/%s/log/' % (kwargs['sha1_git'], kwargs['prev_sha1s']) # noqa
else:
return '/api/revision/%s/log/' % kwargs['sha1_git']
elif view_name == 'revision-raw-message':
return '/api/revision/' + kwargs['sha1_git'] + '/raw/'
else:
return '/api/revision/%s/prev/%s/' % (kwargs['sha1_git'], kwargs['context']) # noqa
@patch('swh.web.api.utils.reverse')
@istest
def enrich_revision_with_no_message(self, mock_django_reverse):
# given
mock_django_reverse.side_effect = self._reverse_rev_message_test
# when
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev-rev/log/'),
'message': None,
'parents': [{'id': '123', 'url': '/api/revision/123/'}],
'children': ['456'],
'children_urls': ['/api/revision/456/',
'/api/revision/prev-rev/'],
}
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'message': None,
'parents': ['123'],
'children': ['456'],
}, context='prev-rev')
# then
self.assertEqual(actual_revision, expected_revision)
mock_django_reverse.assert_has_calls(
[call('revision', kwargs={'sha1_git': 'prev-rev'}),
call('revision', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id',
'prev_sha1s': 'prev-rev'}),
call('revision', kwargs={'sha1_git': '123'}),
call('revision', kwargs={'sha1_git': '456'})]
)
@patch('swh.web.api.utils.reverse')
@istest
def enrich_revision_with_invalid_message(self, mock_django_reverse):
# given
mock_django_reverse.side_effect = self._reverse_rev_message_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'message': None,
'message_decoding_failed': True,
'parents': ['123'],
'children': ['456'],
}, context='prev-rev')
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev-rev/log/'),
'message': None,
'message_decoding_failed': True,
'message_url': '/api/revision/rev-id/raw/',
'parents': [{'id': '123', 'url': '/api/revision/123/'}],
'children': ['456'],
'children_urls': ['/api/revision/456/',
'/api/revision/prev-rev/'],
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_django_reverse.assert_has_calls(
[call('revision', kwargs={'sha1_git': 'prev-rev'}),
call('revision', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id',
'prev_sha1s': 'prev-rev'}),
call('revision', kwargs={'sha1_git': '123'}),
call('revision', kwargs={'sha1_git': '456'}),
call('revision-raw-message', kwargs={'sha1_git': 'rev-id'})])
diff --git a/swh/web/tests/common/test_converters.py b/swh/web/tests/common/test_converters.py
index f33e5102..cc4afdd3 100644
--- a/swh/web/tests/common/test_converters.py
+++ b/swh/web/tests/common/test_converters.py
@@ -1,791 +1,808 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import unittest
from nose.tools import istest
from swh.model import hashutil
from swh.web.common import converters
class ConvertersTestCase(unittest.TestCase):
+
+ @istest
+ def fmap(self):
+ self.assertEquals([2, 3, None, 4],
+ converters.fmap(lambda x: x+1, [1, 2, None, 3]))
+ self.assertEquals([11, 12, 13],
+ list(converters.fmap(lambda x: x+10,
+ map(lambda x: x, [1, 2, 3]))))
+ self.assertEquals({'a': 2, 'b': 4},
+ converters.fmap(lambda x: x*2, {'a': 1, 'b': 2}))
+ self.assertEquals(100,
+ converters.fmap(lambda x: x*10, 10))
+ self.assertEquals({'a': [2, 6], 'b': 4},
+ converters.fmap(lambda x: x*2, {'a': [1, 3], 'b': 2})) # noqa
+
+ self.assertIsNone(converters.fmap(lambda x: x, None))
+
@istest
def from_swh(self):
some_input = {
'a': 'something',
'b': 'someone',
'c': b'sharp-0.3.4.tgz',
'd': hashutil.hash_to_bytes(
'b04caf10e9535160d90e874b45aa426de762f19f'),
'e': b'sharp.html/doc_002dS_005fISREG.html',
'g': [b'utf-8-to-decode', b'another-one'],
'h': 'something filtered',
'i': {'e': b'something'},
'j': {
'k': {
'l': [b'bytes thing', b'another thingy', b''],
'n': 'dont care either'
},
'm': 'dont care'
},
'o': 'something',
'p': b'foo',
'q': {'extra-headers': [['a', b'intact']]},
'w': None,
'r': {'p': 'also intact',
'q': 'bar'},
's': {
'timestamp': 42,
'offset': -420,
'negative_utc': None,
},
's1': {
'timestamp': {'seconds': 42, 'microseconds': 0},
'offset': -420,
'negative_utc': None,
},
's2': datetime.datetime(
2013, 7, 1, 20, 0, 0,
tzinfo=datetime.timezone.utc),
't': None,
'u': None,
'v': None,
'x': None,
}
expected_output = {
'a': 'something',
'b': 'someone',
'c': 'sharp-0.3.4.tgz',
'd': 'b04caf10e9535160d90e874b45aa426de762f19f',
'e': 'sharp.html/doc_002dS_005fISREG.html',
'g': ['utf-8-to-decode', 'another-one'],
'i': {'e': 'something'},
'j': {
'k': {
'l': ['bytes thing', 'another thingy', '']
}
},
'p': 'foo',
'q': {'extra-headers': [['a', 'intact']]},
'w': {},
'r': {'p': 'also intact',
'q': 'bar'},
's': '1969-12-31T17:00:42-07:00',
's1': '1969-12-31T17:00:42-07:00',
's2': '2013-07-01T20:00:00+00:00',
'u': {},
'v': [],
'x': None,
}
actual_output = converters.from_swh(
some_input,
hashess={'d', 'o', 'x'},
bytess={'c', 'e', 'g', 'l'},
dates={'s', 's1', 's2'},
blacklist={'h', 'm', 'n', 'o'},
removables_if_empty={'t'},
empty_dict={'u'},
empty_list={'v'},
convert={'p', 'q', 'w'},
convert_fn=converters.convert_revision_metadata)
self.assertEquals(expected_output, actual_output)
@istest
def from_swh_edge_cases_do_no_conversion_if_none_or_not_bytes(self):
some_input = {
'a': 'something',
'b': None,
'c': 'someone',
'd': None,
'e': None
}
expected_output = {
'a': 'something',
'b': None,
'c': 'someone',
'd': None,
'e': None
}
actual_output = converters.from_swh(some_input,
hashess={'a', 'b'},
bytess={'c', 'd'},
dates={'e'})
self.assertEquals(expected_output, actual_output)
@istest
def from_swh_edge_cases_convert_invalid_utf8_bytes(self):
some_input = {
'a': 'something',
'b': 'someone',
'c': b'a name \xff',
'd': b'an email \xff',
}
expected_output = {
'a': 'something',
'b': 'someone',
'c': 'a name \\xff',
'd': 'an email \\xff',
'decoding_failures': ['c', 'd']
}
actual_output = converters.from_swh(some_input,
hashess={'a', 'b'},
bytess={'c', 'd'})
for v in ['a', 'b', 'c', 'd']:
self.assertEqual(expected_output[v], actual_output[v])
self.assertEqual(len(expected_output['decoding_failures']),
len(actual_output['decoding_failures']))
for v in expected_output['decoding_failures']:
self.assertTrue(v in actual_output['decoding_failures'])
@istest
def from_swh_empty(self):
# when
self.assertEquals({}, converters.from_swh({}))
@istest
def from_swh_none(self):
# when
self.assertIsNone(converters.from_swh(None))
@istest
def from_provenance(self):
# given
input_provenance = {
'origin': 10,
'visit': 1,
'content': hashutil.hash_to_bytes(
'321caf10e9535160d90e874b45aa426de762f19f'),
'revision': hashutil.hash_to_bytes(
'123caf10e9535160d90e874b45aa426de762f19f'),
'path': b'octave-3.4.0/doc/interpreter/octave/doc_002dS_005fISREG'
}
expected_provenance = {
'origin': 10,
'visit': 1,
'content': '321caf10e9535160d90e874b45aa426de762f19f',
'revision': '123caf10e9535160d90e874b45aa426de762f19f',
'path': 'octave-3.4.0/doc/interpreter/octave/doc_002dS_005fISREG'
}
# when
actual_provenance = converters.from_provenance(input_provenance)
# then
self.assertEqual(actual_provenance, expected_provenance)
@istest
def from_origin(self):
# given
origin_input = {
'id': 9,
'type': 'ftp',
'url': 'rsync://ftp.gnu.org/gnu/octave',
'project': None,
'lister': None,
}
expected_origin = {
'id': 9,
'type': 'ftp',
'url': 'rsync://ftp.gnu.org/gnu/octave',
}
# when
actual_origin = converters.from_origin(origin_input)
# then
self.assertEqual(actual_origin, expected_origin)

@istest
def from_origin_visit(self):
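# run both with and without a snapshot: a snapshot hash gets
# hexlified, a None snapshot passes through, and a None metadata
# field becomes an empty dict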
snap_hash = 'b5f0b7f716735ebffe38505c60145c4fd9da6ca3'
for snap in [snap_hash, None]:
# given
visit = {
'date': {
'timestamp': datetime.datetime(
2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'origin': 10,
'visit': 100,
'metadata': None,
'status': 'full',
'snapshot': hashutil.hash_to_bytes(snap) if snap else snap,
}
expected_visit = {
'date': '2015-01-01T22:00:00+00:00',
'origin': 10,
'visit': 100,
'metadata': {},
'status': 'full',
'snapshot': snap_hash if snap else snap
}
# when
actual_visit = converters.from_origin_visit(visit)
# then
self.assertEqual(actual_visit, expected_visit)

@istest
def from_release(self):
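# hashes are hexlified while the author fields, name and message
# are decoded from bytes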
release_input = {
'id': hashutil.hash_to_bytes(
'aad23fa492a0c5fed0708a6703be875448c86884'),
'target': hashutil.hash_to_bytes(
'5e46d564378afc44b31bb89f99d5675195fbdf67'),
'target_type': 'revision',
'date': {
'timestamp': datetime.datetime(
2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'author': {
'name': b'author name',
'fullname': b'Author Name author@email',
'email': b'author@email',
},
'name': b'v0.0.1',
'message': b'some comment on release',
'synthetic': True,
}
expected_release = {
'id': 'aad23fa492a0c5fed0708a6703be875448c86884',
'target': '5e46d564378afc44b31bb89f99d5675195fbdf67',
'target_type': 'revision',
'date': '2015-01-01T22:00:00+00:00',
'author': {
'name': 'author name',
'fullname': 'Author Name author@email',
'email': 'author@email',
},
'name': 'v0.0.1',
'message': 'some comment on release',
'synthetic': True,
}
# when
actual_release = converters.from_release(release_input)
# then
self.assertEqual(actual_release, expected_release)

@istest
def from_release_no_revision(self):
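# negative_utc renders the zero offset as -00:00 in the ISO date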
release_input = {
'id': hashutil.hash_to_bytes(
'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e'),
'target': None,
'date': {
'timestamp': datetime.datetime(
2016, 3, 2, 10, 0, 0,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': True,
},
'name': b'v0.1.1',
'message': b'comment on release',
'synthetic': False,
'author': {
'name': b'bob',
'fullname': b'Bob bob@alice.net',
'email': b'bob@alice.net',
},
}
expected_release = {
'id': 'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e',
'target': None,
'date': '2016-03-02T10:00:00-00:00',
'name': 'v0.1.1',
'message': 'comment on release',
'synthetic': False,
'author': {
'name': 'bob',
'fullname': 'Bob bob@alice.net',
'email': 'bob@alice.net',
},
}
# when
actual_release = converters.from_release(release_input)
# then
self.assertEqual(actual_release, expected_release)

@istest
def from_revision(self):
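# two parents, so the converted revision should carry 'merge': True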
revision_input = {
'id': hashutil.hash_to_bytes(
'18d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hashutil.hash_to_bytes(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'Software Heritage',
'fullname': b'robot robot@softwareheritage.org',
'email': b'robot@softwareheritage.org',
},
'committer': {
'name': b'Software Heritage',
'fullname': b'robot robot@softwareheritage.org',
'email': b'robot@softwareheritage.org',
},
'message': b'synthetic revision message',
'date': {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'committer_date': {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'synthetic': True,
'type': 'tar',
'parents': [
hashutil.hash_to_bytes(
'29d8be353ed3480476f032475e7c244eff7371d5'),
hashutil.hash_to_bytes(
'30d8be353ed3480476f032475e7c244eff7371d5')
],
'children': [
hashutil.hash_to_bytes(
'123546353ed3480476f032475e7c244eff7371d5'),
],
'metadata': {
'extra_headers': [['gpgsig', b'some-signature']],
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912',
}]
},
}
expected_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'Software Heritage',
'fullname': 'robot robot@softwareheritage.org',
'email': 'robot@softwareheritage.org',
},
'committer': {
'name': 'Software Heritage',
'fullname': 'robot robot@softwareheritage.org',
'email': 'robot@softwareheritage.org',
},
'message': 'synthetic revision message',
'date': "2000-01-17T11:23:54+00:00",
'committer_date': "2000-01-17T11:23:54+00:00",
'children': [
'123546353ed3480476f032475e7c244eff7371d5'
],
'parents': [
'29d8be353ed3480476f032475e7c244eff7371d5',
'30d8be353ed3480476f032475e7c244eff7371d5'
],
'type': 'tar',
'synthetic': True,
'metadata': {
'extra_headers': [['gpgsig', 'some-signature']],
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912'
}]
},
'merge': True
}
# when
actual_revision = converters.from_revision(revision_input)
# then
self.assertEqual(actual_revision, expected_revision)

@istest
def from_revision_nomerge(self):
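# a single parent yields 'merge': False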
revision_input = {
'id': hashutil.hash_to_bytes(
'18d8be353ed3480476f032475e7c233eff7371d5'),
'parents': [
hashutil.hash_to_bytes(
'29d8be353ed3480476f032475e7c244eff7371d5')
]
}
expected_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'parents': [
'29d8be353ed3480476f032475e7c244eff7371d5'
],
'merge': False
}
# when
actual_revision = converters.from_revision(revision_input)
# then
self.assertEqual(actual_revision, expected_revision)

@istest
def from_revision_noparents(self):
revision_input = {
'id': hashutil.hash_to_bytes(
'18d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hashutil.hash_to_bytes(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'Software Heritage',
'fullname': b'robot robot@softwareheritage.org',
'email': b'robot@softwareheritage.org',
},
'committer': {
'name': b'Software Heritage',
'fullname': b'robot robot@softwareheritage.org',
'email': b'robot@softwareheritage.org',
},
'message': b'synthetic revision message',
'date': {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'committer_date': {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'synthetic': True,
'type': 'tar',
'children': [
hashutil.hash_to_bytes(
'123546353ed3480476f032475e7c244eff7371d5'),
],
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912',
}]
},
}
expected_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'Software Heritage',
'fullname': 'robot robot@softwareheritage.org',
'email': 'robot@softwareheritage.org',
},
'committer': {
'name': 'Software Heritage',
'fullname': 'robot robot@softwareheritage.org',
'email': 'robot@softwareheritage.org',
},
'message': 'synthetic revision message',
'date': "2000-01-17T11:23:54+00:00",
'committer_date': "2000-01-17T11:23:54+00:00",
'children': [
'123546353ed3480476f032475e7c244eff7371d5'
],
'type': 'tar',
'synthetic': True,
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912'
}]
}
}
# when
actual_revision = converters.from_revision(revision_input)
# then
self.assertEqual(actual_revision, expected_revision)

@istest
def from_revision_invalid(self):
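# a message that is not valid utf-8 becomes None and
# 'message_decoding_failed' is set to True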
revision_input = {
'id': hashutil.hash_to_bytes(
'18d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hashutil.hash_to_bytes(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'Software Heritage',
'fullname': b'robot robot@softwareheritage.org',
'email': b'robot@softwareheritage.org',
},
'committer': {
'name': b'Software Heritage',
'fullname': b'robot robot@softwareheritage.org',
'email': b'robot@softwareheritage.org',
},
'message': b'invalid message \xff',
'date': {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'committer_date': {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'synthetic': True,
'type': 'tar',
'parents': [
hashutil.hash_to_bytes(
'29d8be353ed3480476f032475e7c244eff7371d5'),
hashutil.hash_to_bytes(
'30d8be353ed3480476f032475e7c244eff7371d5')
],
'children': [
hashutil.hash_to_bytes(
'123546353ed3480476f032475e7c244eff7371d5'),
],
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912',
}]
},
}
expected_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'Software Heritage',
'fullname': 'robot robot@softwareheritage.org',
'email': 'robot@softwareheritage.org',
},
'committer': {
'name': 'Software Heritage',
'fullname': 'robot robot@softwareheritage.org',
'email': 'robot@softwareheritage.org',
},
'message': None,
'message_decoding_failed': True,
'date': "2000-01-17T11:23:54+00:00",
'committer_date': "2000-01-17T11:23:54+00:00",
'children': [
'123546353ed3480476f032475e7c244eff7371d5'
],
'parents': [
'29d8be353ed3480476f032475e7c244eff7371d5',
'30d8be353ed3480476f032475e7c244eff7371d5'
],
'type': 'tar',
'synthetic': True,
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912'
}]
},
'merge': True
}
# when
actual_revision = converters.from_revision(revision_input)
# then
self.assertEqual(actual_revision, expected_revision)

@istest
def from_content_none(self):
self.assertIsNone(converters.from_content(None))

@istest
def from_content(self):
content_input = {
'sha1': hashutil.hash_to_bytes(
'5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5'),
'sha256': hashutil.hash_to_bytes(
'39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'blake2s256': hashutil.hash_to_bytes(
'49007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hashutil.hash_to_bytes(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'),
'ctime': 'something-which-is-filtered-out',
'data': b'data in bytes',
'length': 10,
'status': 'hidden',
}
# 'ctime' is filtered out and the 'hidden' status is exposed as 'absent'
expected_content = {
'checksums': {
'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98'
'930e7e0afa4d2747d3bf96c926',
'blake2s256': '49007420ca5de7cb3cfc15196335507ee7'
'6c98930e7e0afa4d2747d3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
},
'data': b'data in bytes',
'length': 10,
'status': 'absent',
}
# when
actual_content = converters.from_content(content_input)
# then
self.assertEqual(actual_content, expected_content)

@istest
def from_person(self):
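# bytes fields are decoded while the other keys are kept as-is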
person_input = {
'id': 10,
'anything': 'else',
'name': b'bob',
'fullname': b'bob bob@alice.net',
'email': b'bob@foo.alice',
}
expected_person = {
'id': 10,
'anything': 'else',
'name': 'bob',
'fullname': 'bob bob@alice.net',
'email': 'bob@foo.alice',
}
# when
actual_person = converters.from_person(person_input)
# then
self.assertEqual(actual_person, expected_person)

@istest
def from_directory_entries(self):
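# hash fields are grouped under a 'checksums' key and a 'hidden'
# status is exposed as 'absent'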
dir_entries_input = {
'sha1': hashutil.hash_to_bytes(
'5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5'),
'sha256': hashutil.hash_to_bytes(
'39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hashutil.hash_to_bytes(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'),
'blake2s256': hashutil.hash_to_bytes(
'685395c5dc57cada459364f0946d3dd45bad5fcbab'
'c1048edb44380f1d31d0aa'),
'target': hashutil.hash_to_bytes(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'),
'dir_id': hashutil.hash_to_bytes(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'),
'name': b'bob',
'type': 10,
'status': 'hidden',
}
expected_dir_entries = {
'checksums': {
'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98'
'930e7e0afa4d2747d3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'blake2s256': '685395c5dc57cada459364f0946d3dd45bad5f'
'cbabc1048edb44380f1d31d0aa',
},
'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'name': 'bob',
'type': 10,
'status': 'absent',
}
# when
actual_dir_entries = converters.from_directory_entry(dir_entries_input)
# then
self.assertEqual(actual_dir_entries, expected_dir_entries)

@istest
def from_filetype(self):
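# the id is hexlified and the encoding and mimetype bytes decoded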
content_filetype = {
'id': hashutil.hash_to_bytes(
'5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5'),
'encoding': b'utf-8',
'mimetype': b'text/plain',
}
expected_content_filetype = {
'id': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
'encoding': 'utf-8',
'mimetype': 'text/plain',
}
# when
actual_content_filetype = converters.from_filetype(content_filetype)
# then
self.assertEqual(actual_content_filetype, expected_content_filetype)
