diff --git a/swh/web/api/utils.py b/swh/web/api/utils.py
index 33a23450..09f3cbdf 100644
--- a/swh/web/api/utils.py
+++ b/swh/web/api/utils.py
@@ -1,353 +1,306 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import re
-from swh.web.common.utils import reverse, fmap
+from swh.web.common.utils import reverse
from swh.web.common.query import parse_hash
-def filter_endpoints(url_map, prefix_url_rule, blacklist=[]):
- """Filter endpoints by prefix url rule.
-
- Args:
- - url_map: Url Werkzeug.Map of rules
- - prefix_url_rule: prefix url string
- - blacklist: blacklist of some url
-
- Returns:
- Dictionary of url_rule with values methods and endpoint.
-
- The key is the url, the associated value is a dictionary of
- 'methods' (possible http methods) and 'endpoint' (python function)
-
- """
- out = {}
- for r in url_map:
- rule = r['rule']
- if rule == prefix_url_rule or rule in blacklist:
- continue
-
- if rule.startswith(prefix_url_rule):
- out[rule] = {'methods': sorted(map(str, r['methods'])),
- 'endpoint': r['endpoint']}
- return out
-
-
-def prepare_data_for_view(data, encoding='utf-8'):
- def prepare_data(s):
- # Note: can only be 'data' key with bytes of raw content
- if isinstance(s, bytes):
- try:
- return s.decode(encoding)
- except Exception:
- return "Cannot decode the data bytes, try and set another " \
- "encoding in the url (e.g. ?encoding=utf8) or " \
- "download directly the " \
- "content's raw data."
- if isinstance(s, str):
- return re.sub(r'/api/1/', r'/browse/', s)
-
- return s
-
- return fmap(prepare_data, data)
-
-
def filter_field_keys(data, field_keys):
"""Given an object instance (directory or list), and a csv field keys
to filter on.
Return the object instance with filtered keys.
Note: Returns obj as is if it's an instance of types not in (dictionary,
list)
Args:
- data: one object (dictionary, list...) to filter.
- field_keys: csv or set of keys to filter the object on
Returns:
obj filtered on field_keys
"""
if isinstance(data, map):
return map(lambda x: filter_field_keys(x, field_keys), data)
if isinstance(data, list):
return [filter_field_keys(x, field_keys) for x in data]
if isinstance(data, dict):
return {k: v for (k, v) in data.items() if k in field_keys}
return data
def person_to_string(person):
"""Map a person (person, committer, tagger, etc...) to a string.
"""
return ''.join([person['name'], ' <', person['email'], '>'])
def enrich_object(object):
"""Enrich an object (revision, release) with link to the 'target' of
type 'target_type'.
Args:
object: An object with target and target_type keys
(e.g. release, revision)
Returns:
        Object enriched with a target_url pointing to the right
        swh.web.ui.api url for the pointed object (revision,
        release, content, directory)
"""
obj = object.copy()
if 'target' in obj and 'target_type' in obj:
if obj['target_type'] == 'revision':
obj['target_url'] = reverse('revision',
kwargs={'sha1_git': obj['target']})
elif obj['target_type'] == 'release':
obj['target_url'] = reverse('release',
kwargs={'sha1_git': obj['target']})
elif obj['target_type'] == 'content':
obj['target_url'] = \
reverse('content', kwargs={'q': 'sha1_git:' + obj['target']})
elif obj['target_type'] == 'directory':
obj['target_url'] = reverse('directory',
kwargs={'sha1_git': obj['target']})
if 'author' in obj:
author = obj['author']
obj['author_url'] = reverse('person',
kwargs={'person_id': author['id']})
return obj
enrich_release = enrich_object
def enrich_directory(directory, context_url=None):
"""Enrich directory with url to content or directory.
"""
if 'type' in directory:
target_type = directory['type']
target = directory['target']
if target_type == 'file':
directory['target_url'] = \
reverse('content', kwargs={'q': 'sha1_git:%s' % target})
if context_url:
directory['file_url'] = context_url + directory['name'] + '/'
elif target_type == 'dir':
directory['target_url'] = reverse('directory',
kwargs={'sha1_git': target})
if context_url:
directory['dir_url'] = context_url + directory['name'] + '/'
else:
directory['target_url'] = reverse('revision',
kwargs={'sha1_git': target})
if context_url:
directory['rev_url'] = context_url + directory['name'] + '/'
return directory
def enrich_metadata_endpoint(content):
"""Enrich metadata endpoint with link to the upper metadata endpoint.
"""
c = content.copy()
c['content_url'] = reverse('content', args=['sha1:%s' % c['id']])
return c
def enrich_content(content, top_url=False, query_string=None):
"""Enrich content with links to:
- data_url: its raw data
- filetype_url: its filetype information
- language_url: its programming language information
- license_url: its licensing information
Args:
content: dict of data associated to a swh content object
top_url: whether or not to include the content url in
the enriched data
query_string: optional query string of type '<algo>:<hash>'
            used when requesting the content; it acts as a hint
            for picking the same hash method when computing
            the urls listed above
Returns:
An enriched content dict filled with additional urls
"""
checksums = content
if 'checksums' in content:
checksums = content['checksums']
hash_algo = 'sha1'
if query_string:
hash_algo = parse_hash(query_string)[0]
if hash_algo in checksums:
q = '%s:%s' % (hash_algo, checksums[hash_algo])
if top_url:
content['content_url'] = reverse('content', kwargs={'q': q})
content['data_url'] = reverse('content-raw', kwargs={'q': q})
content['filetype_url'] = reverse('content-filetype',
kwargs={'q': q})
content['language_url'] = reverse('content-language',
kwargs={'q': q})
content['license_url'] = reverse('content-license',
kwargs={'q': q})
return content
def enrich_entity(entity):
"""Enrich entity with
"""
if 'uuid' in entity:
entity['uuid_url'] = reverse('entity',
kwargs={'uuid': entity['uuid']})
if 'parent' in entity and entity['parent']:
entity['parent_url'] = reverse('entity',
kwargs={'uuid': entity['parent']})
return entity
def _get_path_list(path_string):
"""Helper for enrich_revision: get a list of the sha1 id of the navigation
breadcrumbs, ordered from the oldest to the most recent.
Args:
path_string: the path as a '/'-separated string
Returns:
The navigation context as a list of sha1 revision ids
"""
return path_string.split('/')
def _get_revision_contexts(rev_id, context):
"""Helper for enrich_revision: retrieve for the revision id and potentially
the navigation breadcrumbs the context to pass to parents and children of
of the revision.
Args:
rev_id: the revision's sha1 id
context: the current navigation context
Returns:
The context for parents, children and the url of the direct child as a
tuple in that order.
"""
context_for_parents = None
context_for_children = None
url_direct_child = None
if not context:
return (rev_id, None, None)
path_list = _get_path_list(context)
context_for_parents = '%s/%s' % (context, rev_id)
prev_for_children = path_list[:-1]
if len(prev_for_children) > 0:
context_for_children = '/'.join(prev_for_children)
child_id = path_list[-1]
# This commit is not the first commit in the path
if context_for_children:
url_direct_child = reverse('revision-context',
kwargs={'sha1_git': child_id,
'context': context_for_children})
# This commit is the first commit in the path
else:
url_direct_child = reverse('revision', kwargs={'sha1_git': child_id})
return (context_for_parents, context_for_children, url_direct_child)
def _make_child_url(rev_children, context):
"""Helper for enrich_revision: retrieve the list of urls corresponding
to the children of the current revision according to the navigation
breadcrumbs.
Args:
        rev_children: a list of revision ids
context: the '/'-separated navigation breadcrumbs
Returns:
the list of the children urls according to the context
"""
children = []
for child in rev_children:
if context and child != _get_path_list(context)[-1]:
children.append(reverse('revision',
kwargs={'sha1_git': child}))
elif not context:
children.append(reverse('revision', kwargs={'sha1_git': child}))
return children
def enrich_revision(revision, context=None):
"""Enrich revision with links where it makes sense (directory, parents).
Keep track of the navigation breadcrumbs if they are specified.
Args:
revision: the revision as a dict
context: the navigation breadcrumbs as a /-separated string of revision
sha1_git
"""
ctx_parents, ctx_children, url_direct_child = _get_revision_contexts(
revision['id'], context)
revision['url'] = reverse('revision', kwargs={'sha1_git': revision['id']})
revision['history_url'] = reverse('revision-log',
kwargs={'sha1_git': revision['id']})
if context:
revision['history_context_url'] = reverse(
'revision-log', kwargs={'sha1_git': revision['id'],
'prev_sha1s': context})
if 'author' in revision:
author = revision['author']
revision['author_url'] = reverse('person',
kwargs={'person_id': author['id']})
if 'committer' in revision:
committer = revision['committer']
revision['committer_url'] = \
reverse('person', kwargs={'person_id': committer['id']})
if 'directory' in revision:
revision['directory_url'] = \
reverse('directory', kwargs={'sha1_git': revision['directory']})
if 'parents' in revision:
parents = []
for parent in revision['parents']:
parents.append({
'id': parent,
'url': reverse('revision', kwargs={'sha1_git': parent})
})
revision['parents'] = parents
if 'children' in revision:
children = _make_child_url(revision['children'], context)
if url_direct_child:
children.append(url_direct_child)
revision['children_urls'] = children
else:
if url_direct_child:
revision['children_urls'] = [url_direct_child]
if 'message_decoding_failed' in revision:
revision['message_url'] = reverse('revision-raw-message',
kwargs={'sha1_git': revision['id']})
return revision
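
A hedged usage sketch of filter_field_keys as kept above, mirroring the list case exercised by the test suite further below (assumes swh.web is importable in the current environment):

    from swh.web.api import utils

    # filter_field_keys recurses into maps and lists and, at the dict level,
    # keeps only the requested keys.
    data = [{'directory': 1, 'file': 2, 'link': 3},
            {'dir': 1, 'fil': 2, 'lin': 3}]
    print(utils.filter_field_keys(data, {'directory', 'dir'}))
    # expected: [{'directory': 1}, {'dir': 1}]
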
diff --git a/swh/web/common/converters.py b/swh/web/common/converters.py
index a151fd00..986193a9 100644
--- a/swh/web/common/converters.py
+++ b/swh/web/common/converters.py
@@ -1,362 +1,390 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import json
from swh.model import hashutil
from swh.core.utils import decode_with_escape
-from swh.web.common import utils
def _group_checksums(data):
"""Groups checksums values computed from hash functions used in swh
and stored in data dict under a single entry 'checksums'
"""
if data:
checksums = {}
for hash in hashutil.ALGORITHMS:
if hash in data and data[hash]:
checksums[hash] = data[hash]
del data[hash]
if len(checksums) > 0:
data['checksums'] = checksums
+def fmap(f, data):
+ """Map f to data at each level.
+
+ This must keep the original data structure type:
+ - map -> map
+ - dict -> dict
+ - list -> list
+ - None -> None
+
+ Args:
+ f: function that expects one argument.
+ data: data to traverse to apply the f function.
+ list, map, dict or bare value.
+
+ Returns:
+ The same data structure with its values modified by the f function.
+
+ """
+ if data is None:
+ return data
+ if isinstance(data, map):
+ return map(lambda y: fmap(f, y), (x for x in data))
+ if isinstance(data, list):
+ return [fmap(f, x) for x in data]
+ if isinstance(data, dict):
+ return {k: fmap(f, v) for (k, v) in data.items()}
+ return f(data)
+
+
def from_swh(dict_swh, hashess={}, bytess={}, dates={}, blacklist={},
removables_if_empty={}, empty_dict={}, empty_list={},
convert={}, convert_fn=lambda x: x):
"""Convert from an swh dictionary to something reasonably json
serializable.
Args:
dict_swh: the origin dictionary needed to be transformed
        hashess: list/set of keys representing hash values (sha1, sha256,
            sha1_git, etc...) as bytes. Those need to be transformed into
            hexadecimal strings
        bytess: list/set of keys representing bytes values which need to be
            decoded
blacklist: set of keys to filter out from the conversion
convert: set of keys whose associated values need to be converted using
convert_fn
convert_fn: the conversion function to apply on the value of key in
'convert'
The remaining keys are copied as is in the output.
Returns:
        a dictionary equivalent to dict_swh, with its keys converted.
"""
def convert_hashes_bytes(v):
"""v is supposedly a hash as bytes, returns it converted in hex.
"""
if isinstance(v, bytes):
return hashutil.hash_to_hex(v)
return v
def convert_bytes(v):
"""v is supposedly a bytes string, decode as utf-8.
FIXME: Improve decoding policy.
If not utf-8, break!
"""
if isinstance(v, bytes):
return v.decode('utf-8')
return v
def convert_date(v):
"""
Args:
            v (dict or datetime): either:
- a dict with three keys:
- timestamp (dict or integer timestamp)
- offset
- negative_utc
- or, a datetime
We convert it to a human-readable string
"""
if not v:
return v
if isinstance(v, datetime.datetime):
return v.isoformat()
tz = datetime.timezone(datetime.timedelta(minutes=v['offset']))
swh_timestamp = v['timestamp']
if isinstance(swh_timestamp, dict):
date = datetime.datetime.fromtimestamp(
swh_timestamp['seconds'], tz=tz)
else:
date = datetime.datetime.fromtimestamp(
swh_timestamp, tz=tz)
datestr = date.isoformat()
if v['offset'] == 0 and v['negative_utc']:
# remove the rightmost + and replace it with a -
return '-'.join(datestr.rsplit('+', 1))
return datestr
if not dict_swh:
return dict_swh
new_dict = {}
for key, value in dict_swh.items():
if key in blacklist or (key in removables_if_empty and not value):
continue
if key in dates:
new_dict[key] = convert_date(value)
elif key in convert:
new_dict[key] = convert_fn(value)
elif isinstance(value, dict):
new_dict[key] = from_swh(value,
hashess=hashess, bytess=bytess,
dates=dates, blacklist=blacklist,
removables_if_empty=removables_if_empty,
empty_dict=empty_dict,
empty_list=empty_list,
convert=convert,
convert_fn=convert_fn)
elif key in hashess:
- new_dict[key] = utils.fmap(convert_hashes_bytes, value)
+ new_dict[key] = fmap(convert_hashes_bytes, value)
elif key in bytess:
try:
- new_dict[key] = utils.fmap(convert_bytes, value)
+ new_dict[key] = fmap(convert_bytes, value)
except UnicodeDecodeError:
if 'decoding_failures' not in new_dict:
new_dict['decoding_failures'] = [key]
else:
new_dict['decoding_failures'].append(key)
- new_dict[key] = utils.fmap(decode_with_escape, value)
+ new_dict[key] = fmap(decode_with_escape, value)
elif key in empty_dict and not value:
new_dict[key] = {}
elif key in empty_list and not value:
new_dict[key] = []
else:
new_dict[key] = value
_group_checksums(new_dict)
return new_dict
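
To make the key-set driven conversion above concrete, a minimal hedged sketch (the input dict is invented for illustration; only the bytess behaviour shown is taken from the code above, and swh.web is assumed importable):

    from swh.web.common.converters import from_swh

    # Keys listed in bytess are utf-8 decoded; other keys are copied as is.
    print(from_swh({'name': b'v0.0.1', 'note': 'kept as is'},
                   bytess={'name'}))
    # expected: {'name': 'v0.0.1', 'note': 'kept as is'}
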
def from_provenance(provenance):
"""Convert from a provenance information to a provenance dictionary.
Args:
provenance (dict): Dictionary with the following keys:
            - content (sha1_git): the content's identifier
            - revision (sha1_git): the revision in which the content was seen
            - origin (int): the origin in which the content was seen
            - visit (int): the visit during which the content was seen
            - path (bytes): the path the content was seen at
"""
return from_swh(provenance,
hashess={'content', 'revision'},
bytess={'path'})
def from_origin(origin):
"""Convert from an SWH origin to an origin dictionary.
"""
return from_swh(origin,
removables_if_empty={'lister', 'project'})
def from_release(release):
"""Convert from an SWH release to a json serializable release dictionary.
Args:
release (dict): dictionary with keys:
            - id: identifier of the release (sha1 in bytes)
            - target: identifier of the target the release points to (sha1
              in bytes)
            - message: release's comment message (bytes)
            - name: release's name (string)
            - author: release's author identifier (swh's id)
            - synthetic: the synthetic property (boolean)
Returns:
dict: Release dictionary with the following keys:
- id: hexadecimal sha1 (string)
            - target: hexadecimal sha1 (string)
            - message: release's comment message (string)
- name: release's name (string)
- author: release's author identifier (swh's id)
- synthetic: the synthetic property (boolean)
"""
return from_swh(
release,
hashess={'id', 'target'},
bytess={'message', 'name', 'fullname', 'email'},
dates={'date'},
)
class SWHMetadataEncoder(json.JSONEncoder):
"""Special json encoder for metadata field which can contain bytes
encoded value.
"""
def default(self, obj):
if isinstance(obj, bytes):
return obj.decode('utf-8')
# Let the base class default method raise the TypeError
return json.JSONEncoder.default(self, obj)
def convert_revision_metadata(metadata):
"""Convert json specific dict to a json serializable one.
"""
if not metadata:
return {}
return json.loads(json.dumps(metadata, cls=SWHMetadataEncoder))
def from_revision(revision):
"""Convert from an SWH revision to a json serializable revision dictionary.
Args:
revision (dict): dict with keys:
- id: identifier of the revision (sha1 in bytes)
- directory: identifier of the directory the revision points to
(sha1 in bytes)
            - author_name, author_email: the author's name and email
            - committer_name, committer_email: the committer's name and
              email
- message: revision's message
- date, date_offset: revision's author date
- committer_date, committer_date_offset: revision's commit date
            - parents: list of parents of the revision
            - synthetic: whether the revision is synthetic
- type: revision's type (git, tar or dsc at the moment)
- metadata: if the revision is synthetic, this can reference
dynamic properties.
Returns:
dict: Revision dictionary with the same keys as inputs, except:
- sha1s are in hexadecimal strings (id, directory)
            - bytes are decoded into strings (author_name, committer_name,
              author_email, committer_email)
Remaining keys are left as is
"""
revision = from_swh(revision,
hashess={'id', 'directory', 'parents', 'children'},
bytess={'name', 'fullname', 'email'},
convert={'metadata'},
convert_fn=convert_revision_metadata,
dates={'date', 'committer_date'})
if revision:
if 'parents' in revision:
revision['merge'] = len(revision['parents']) > 1
if 'message' in revision:
try:
revision['message'] = revision['message'].decode('utf-8')
except UnicodeDecodeError:
revision['message_decoding_failed'] = True
revision['message'] = None
return revision
def from_content(content):
"""Convert swh content to serializable content dictionary.
"""
return from_swh(content,
hashess={'sha1', 'sha1_git', 'sha256', 'blake2s256'},
blacklist={'ctime'},
convert={'status'},
convert_fn=lambda v: 'absent' if v == 'hidden' else v)
def from_person(person):
"""Convert swh person to serializable person dictionary.
"""
return from_swh(person,
bytess={'name', 'fullname', 'email'})
def from_origin_visit(visit):
"""Convert swh origin_visit to serializable origin_visit dictionary.
"""
ov = from_swh(visit,
hashess={'target', 'snapshot'},
bytess={'branch'},
dates={'date'},
empty_dict={'metadata'})
# TODO: remove that piece of code once snapshot migration
# is totally effective in storage (no more occurrences)
if ov and 'occurrences' in ov:
ov['occurrences'] = {
decode_with_escape(k): v
for k, v in ov['occurrences'].items()
}
return ov
def from_snapshot(snapshot):
"""Convert swh snapshot to serializable snapshot dictionary.
"""
sv = from_swh(snapshot,
hashess={'id', 'target'})
if sv and 'branches' in sv:
sv['branches'] = {
decode_with_escape(k): v
for k, v in sv['branches'].items()
}
return sv
def from_directory_entry(dir_entry):
"""Convert swh person to serializable person dictionary.
"""
return from_swh(dir_entry,
hashess={'dir_id', 'sha1_git', 'sha1', 'sha256',
'blake2s256', 'target'},
bytess={'name'},
removables_if_empty={
'sha1', 'sha1_git', 'sha256', 'blake2s256', 'status'},
convert={'status'},
convert_fn=lambda v: 'absent' if v == 'hidden' else v)
def from_filetype(content_entry):
"""Convert swh person to serializable person dictionary.
"""
return from_swh(content_entry,
hashess={'id'},
bytess={'mimetype', 'encoding'})
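
The notable change in this file is that fmap now lives in swh.web.common.converters (moved out of swh.web.common.utils, see the next diff). A hedged usage sketch mirroring the unit tests added in test_converters.py further below (assumes swh.web is importable):

    from swh.web.common.converters import fmap

    # fmap preserves the container type (map, list, dict) and applies the
    # function to leaf values only; None is passed through unchanged.
    print(fmap(lambda x: x + 1, [1, 2, None, 3]))        # [2, 3, None, 4]
    print(fmap(lambda x: x * 2, {'a': [1, 3], 'b': 2}))  # {'a': [2, 6], 'b': 4}
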
diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py
index c4977055..6dfca325 100644
--- a/swh/web/common/utils.py
+++ b/swh/web/common/utils.py
@@ -1,376 +1,347 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import docutils.parsers.rst
import docutils.utils
import re
from datetime import datetime, timezone
from dateutil import parser as date_parser
from dateutil import tz
from django.core.cache import cache
from django.core import urlresolvers
from django.http import QueryDict
from swh.model.exceptions import ValidationError
from swh.model.identifiers import (
persistent_identifier, parse_persistent_identifier,
CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
)
from swh.web.common import service
from swh.web.common.exc import BadInputExc
def reverse(viewname, args=None, kwargs=None, query_params=None,
current_app=None, urlconf=None):
"""An override of django reverse function supporting query parameters.
Args:
viewname: the name of the django view from which to compute a url
        args: list of url arguments ordered according to their position in the url
kwargs: dictionary of url arguments indexed by their names
query_params: dictionary of query parameters to append to the
reversed url
        current_app: the name of the django app tied to the view
urlconf: url configuration module
Returns:
The url of the requested view with processed arguments and
query parameters
"""
if kwargs:
kwargs = {k: v for k, v in kwargs.items() if v is not None}
url = urlresolvers.reverse(
viewname, urlconf=urlconf, args=args,
kwargs=kwargs, current_app=current_app)
if query_params:
query_params = {k: v for k, v in query_params.items() if v is not None}
if query_params and len(query_params) > 0:
query_dict = QueryDict('', mutable=True)
for k in sorted(query_params.keys()):
query_dict[k] = query_params[k]
url += ('?' + query_dict.urlencode(safe='/;:'))
return url
-def fmap(f, data):
- """Map f to data at each level.
-
- This must keep the origin data structure type:
- - map -> map
- - dict -> dict
- - list -> list
- - None -> None
-
- Args:
- f: function that expects one argument.
- data: data to traverse to apply the f function.
- list, map, dict or bare value.
-
- Returns:
- The same data-structure with modified values by the f function.
-
- """
- if data is None:
- return data
- if isinstance(data, map):
- return map(lambda y: fmap(f, y), (x for x in data))
- if isinstance(data, list):
- return [fmap(f, x) for x in data]
- if isinstance(data, dict):
- return {k: fmap(f, v) for (k, v) in data.items()}
- return f(data)
-
-
def datetime_to_utc(date):
"""Returns datetime in UTC without timezone info
Args:
date (datetime.datetime): input datetime with timezone info
Returns:
        datetime.datetime: datetime in UTC without timezone info
"""
if date.tzinfo:
return date.astimezone(tz.gettz('UTC')).replace(tzinfo=timezone.utc)
else:
return date
def parse_timestamp(timestamp):
"""Given a time or timestamp (as string), parse the result as UTC datetime.
Returns:
a timezone-aware datetime representing the parsed value.
None if the parsing fails.
Samples:
- 2016-01-12
- 2016-01-12T09:19:12+0100
- Today is January 1, 2047 at 8:21:00AM
- 1452591542
"""
if not timestamp:
return None
try:
date = date_parser.parse(timestamp, ignoretz=False, fuzzy=True)
return datetime_to_utc(date)
except Exception:
try:
return datetime.utcfromtimestamp(float(timestamp)).replace(
tzinfo=timezone.utc)
except (ValueError, OverflowError) as e:
raise BadInputExc(e)
def shorten_path(path):
"""Shorten the given path: for each hash present, only return the first
8 characters followed by an ellipsis"""
sha256_re = r'([0-9a-f]{8})[0-9a-z]{56}'
sha1_re = r'([0-9a-f]{8})[0-9a-f]{32}'
ret = re.sub(sha256_re, r'\1...', path)
return re.sub(sha1_re, r'\1...', ret)
def format_utc_iso_date(iso_date, fmt='%d %B %Y, %H:%M UTC'):
"""Turns a string reprensation of an ISO 8601 date string
to UTC and format it into a more human readable one.
For instance, from the following input
string: '2017-05-04T13:27:13+02:00' the following one
is returned: '04 May 2017, 11:27 UTC'.
    A custom format string may also be provided
    as a parameter
Args:
iso_date (str): a string representation of an ISO 8601 date
fmt (str): optional date formatting string
Returns:
A formatted string representation of the input iso date
"""
if not iso_date:
return iso_date
date = parse_timestamp(iso_date)
return date.strftime(fmt)
def gen_path_info(path):
"""Function to generate path data navigation for use
with a breadcrumb in the swh web ui.
For instance, from a path /folder1/folder2/folder3,
it returns the following list::
[{'name': 'folder1', 'path': 'folder1'},
{'name': 'folder2', 'path': 'folder1/folder2'},
{'name': 'folder3', 'path': 'folder1/folder2/folder3'}]
Args:
path: a filesystem path
Returns:
A list of path data for navigation as illustrated above.
"""
path_info = []
if path:
sub_paths = path.strip('/').split('/')
path_from_root = ''
for p in sub_paths:
path_from_root += '/' + p
path_info.append({'name': p,
'path': path_from_root.strip('/')})
return path_info
def get_origin_visits(origin_info):
"""Function that returns the list of visits for a swh origin.
That list is put in cache in order to speedup the navigation
in the swh web browse ui.
Args:
origin_id (int): the id of the swh origin to fetch visits from
Returns:
A list of dict describing the origin visits::
[{'date': <UTC visit date in ISO format>,
'origin': <origin id>,
'status': <'full' | 'partial'>,
'visit': <visit id>
},
...
]
Raises:
NotFoundExc if the origin is not found
"""
cache_entry_id = 'origin_%s_visits' % origin_info['id']
cache_entry = cache.get(cache_entry_id)
if cache_entry:
return cache_entry
origin_visits = []
per_page = service.MAX_LIMIT
last_visit = None
while 1:
visits = list(service.lookup_origin_visits(origin_info['id'],
last_visit=last_visit,
per_page=per_page))
origin_visits += visits
if len(visits) < per_page:
break
else:
if not last_visit:
last_visit = per_page
else:
last_visit += per_page
def _visit_sort_key(visit):
ts = parse_timestamp(visit['date']).timestamp()
return ts + (float(visit['visit']) / 10e3)
for v in origin_visits:
if 'metadata' in v:
del v['metadata']
origin_visits = [dict(t) for t in set([tuple(d.items())
for d in origin_visits])]
origin_visits = sorted(origin_visits, key=lambda v: _visit_sort_key(v))
cache.set(cache_entry_id, origin_visits)
return origin_visits
def get_swh_persistent_id(object_type, object_id, scheme_version=1):
"""
Returns the persistent identifier for a swh object based on:
* the object type
* the object id
* the swh identifiers scheme version
Args:
object_type (str): the swh object type
(content/directory/release/revision/snapshot)
object_id (str): the swh object id (hexadecimal representation
of its hash value)
scheme_version (int): the scheme version of the swh
persistent identifiers
Returns:
str: the swh object persistent identifier
Raises:
        BadInputExc if the provided parameters do not allow generating
        a valid identifier
"""
try:
swh_id = persistent_identifier(object_type, object_id, scheme_version)
except ValidationError as e:
raise BadInputExc('Invalid object (%s) for swh persistent id. %s' %
(object_id, e))
else:
return swh_id
def resolve_swh_persistent_id(swh_id, query_params=None):
"""
    Try to resolve a SWH persistent id into a url for
    browsing the pointed object.
Args:
swh_id (str): a SWH persistent identifier
query_params (django.http.QueryDict): optional dict filled with
query parameters to append to the browse url
Returns:
dict: a dict with the following keys:
* **swh_id_parsed (swh.model.identifiers.PersistentId)**: the parsed identifier
* **browse_url (str)**: the url for browsing the pointed object
Raises:
BadInputExc: if the provided identifier can not be parsed
""" # noqa
try:
swh_id_parsed = parse_persistent_identifier(swh_id)
object_type = swh_id_parsed.object_type
object_id = swh_id_parsed.object_id
browse_url = None
query_dict = QueryDict('', mutable=True)
if query_params and len(query_params) > 0:
for k in sorted(query_params.keys()):
query_dict[k] = query_params[k]
if 'origin' in swh_id_parsed.metadata:
query_dict['origin'] = swh_id_parsed.metadata['origin']
if object_type == CONTENT:
query_string = 'sha1_git:' + object_id
fragment = ''
if 'lines' in swh_id_parsed.metadata:
lines = swh_id_parsed.metadata['lines'].split('-')
fragment += '#L' + lines[0]
if len(lines) > 1:
fragment += '-L' + lines[1]
browse_url = reverse('browse-content',
kwargs={'query_string': query_string},
query_params=query_dict) + fragment
elif object_type == DIRECTORY:
browse_url = reverse('browse-directory',
kwargs={'sha1_git': object_id},
query_params=query_dict)
elif object_type == RELEASE:
browse_url = reverse('browse-release',
kwargs={'sha1_git': object_id},
query_params=query_dict)
elif object_type == REVISION:
browse_url = reverse('browse-revision',
kwargs={'sha1_git': object_id},
query_params=query_dict)
elif object_type == SNAPSHOT:
browse_url = reverse('browse-snapshot',
kwargs={'snapshot_id': object_id},
query_params=query_dict)
except ValidationError as ve:
raise BadInputExc('Error when parsing identifier. %s' %
' '.join(ve.messages))
else:
return {'swh_id_parsed': swh_id_parsed,
'browse_url': browse_url}
def parse_rst(text, report_level=2):
"""
Parse a reStructuredText string with docutils.
Args:
text (str): string with reStructuredText markups in it
report_level (int): level of docutils report messages to print
(1 info 2 warning 3 error 4 severe 5 none)
Returns:
docutils.nodes.document: a parsed docutils document
"""
parser = docutils.parsers.rst.Parser()
components = (docutils.parsers.rst.Parser,)
settings = docutils.frontend.OptionParser(
components=components).get_default_values()
settings.report_level = report_level
document = docutils.utils.new_document('rst-doc', settings=settings)
parser.parse(text, document)
return document
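
A standalone restatement of the shorten_path substitutions above, for trying the behaviour without a configured Django/swh.web environment (the name shorten_hashes is illustrative, not part of the codebase):

    import re

    def shorten_hashes(path):
        # Same regexes as shorten_path: keep the first 8 hex characters of
        # any sha256 (64 chars) or sha1 (40 chars) and elide the rest.
        path = re.sub(r'([0-9a-f]{8})[0-9a-z]{56}', r'\1...', path)
        return re.sub(r'([0-9a-f]{8})[0-9a-f]{32}', r'\1...', path)

    print(shorten_hashes(
        '/directory/b04caf10e9535160d90e874b45aa426de762f19f/'))
    # expected: '/directory/b04caf10.../'
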
diff --git a/swh/web/tests/api/test_utils.py b/swh/web/tests/api/test_utils.py
index 5f96b83d..ce816a8a 100644
--- a/swh/web/tests/api/test_utils.py
+++ b/swh/web/tests/api/test_utils.py
@@ -1,880 +1,743 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from unittest.mock import patch, call
from nose.tools import istest, nottest
from swh.web.api import utils
class UtilsTestCase(unittest.TestCase):
def setUp(self):
self.maxDiff = None
self.url_map = [dict(rule='/other/<slug>',
methods=set(['GET', 'POST', 'HEAD']),
endpoint='foo'),
dict(rule='/some/old/url/<slug>',
methods=set(['GET', 'POST']),
endpoint='blablafn'),
dict(rule='/other/old/url/<int:id>',
methods=set(['GET', 'HEAD']),
endpoint='bar'),
dict(rule='/other',
methods=set([]),
endpoint=None),
dict(rule='/other2',
methods=set([]),
endpoint=None)]
self.sample_content_hashes = {
'blake2s256': ('791e07fcea240ade6dccd0a9309141673'
'c31242cae9c237cf3855e151abc78e9'),
'sha1': 'dc2830a9e72f23c1dfebef4413003221baa5fb62',
'sha1_git': 'fe95a46679d128ff167b7c55df5d02356c5a1ae1',
'sha256': ('b5c7fe0536f44ef60c8780b6065d30bca74a5cd06'
'd78a4a71ba1ad064770f0c9')
}
- @istest
- def filter_endpoints_1(self):
- # when
- actual_data = utils.filter_endpoints(self.url_map, '/some')
-
- # then
- self.assertEquals(actual_data, {
- '/some/old/url/<slug>': {
- 'methods': ['GET', 'POST'],
- 'endpoint': 'blablafn'
- }
- })
-
- @istest
- def filter_endpoints_2(self):
- # when
- actual_data = utils.filter_endpoints(self.url_map, '/other',
- blacklist=['/other2'])
-
- # then
- # rules /other is skipped because its' exactly the prefix url
- # rules /other2 is skipped because it's blacklisted
- self.assertEquals(actual_data, {
- '/other/<slug>': {
- 'methods': ['GET', 'HEAD', 'POST'],
- 'endpoint': 'foo'
- },
- '/other/old/url/<int:id>': {
- 'methods': ['GET', 'HEAD'],
- 'endpoint': 'bar'
- }
- })
-
- @istest
- def prepare_data_for_view_default_encoding(self):
- self.maxDiff = None
- # given
- inputs = [
- {
- 'data': b'some blah data'
- },
- {
- 'data': 1,
- 'data_url': '/api/1/some/api/call',
- },
- {
- 'blah': 'foobar',
- 'blah_url': '/some/non/changed/api/call'
- }]
-
- # when
- actual_result = utils.prepare_data_for_view(inputs)
-
- # then
- self.assertEquals(actual_result, [
- {
- 'data': 'some blah data',
- },
- {
- 'data': 1,
- 'data_url': '/browse/some/api/call',
- },
- {
- 'blah': 'foobar',
- 'blah_url': '/some/non/changed/api/call'
- }
- ])
-
- @istest
- def prepare_data_for_view(self):
- self.maxDiff = None
- # given
- inputs = [
- {
- 'data': b'some blah data'
- },
- {
- 'data': 1,
- 'data_url': '/api/1/some/api/call',
- },
- {
- 'blah': 'foobar',
- 'blah_url': '/some/non/changed/api/call'
- }]
-
- # when
- actual_result = utils.prepare_data_for_view(inputs, encoding='ascii')
-
- # then
- self.assertEquals(actual_result, [
- {
- 'data': 'some blah data',
- },
- {
- 'data': 1,
- 'data_url': '/browse/some/api/call',
- },
- {
- 'blah': 'foobar',
- 'blah_url': '/some/non/changed/api/call'
- }
- ])
-
- @istest
- def prepare_data_for_view_ko_cannot_decode(self):
- self.maxDiff = None
- # given
- inputs = {
- 'data': 'hé dude!'.encode('utf8'),
- }
-
- actual_result = utils.prepare_data_for_view(inputs, encoding='ascii')
-
- # then
- self.assertEquals(actual_result, {
- 'data': "Cannot decode the data bytes, try and set another "
- "encoding in the url (e.g. ?encoding=utf8) or "
- "download directly the "
- "content's raw data.",
- })
-
@istest
def filter_field_keys_dict_unknown_keys(self):
# when
actual_res = utils.filter_field_keys(
{'directory': 1, 'file': 2, 'link': 3},
{'directory1', 'file2'})
# then
self.assertEqual(actual_res, {})
@istest
def filter_field_keys_dict(self):
# when
actual_res = utils.filter_field_keys(
{'directory': 1, 'file': 2, 'link': 3},
{'directory', 'link'})
# then
self.assertEqual(actual_res, {'directory': 1, 'link': 3})
@istest
def filter_field_keys_list_unknown_keys(self):
# when
actual_res = utils.filter_field_keys(
[{'directory': 1, 'file': 2, 'link': 3},
{'1': 1, '2': 2, 'link': 3}],
{'d'})
# then
self.assertEqual(actual_res, [{}, {}])
@istest
def filter_field_keys_map(self):
# when
actual_res = utils.filter_field_keys(
map(lambda x: {'i': x['i']+1, 'j': x['j']},
[{'i': 1, 'j': None},
{'i': 2, 'j': None},
{'i': 3, 'j': None}]),
{'i'})
# then
self.assertEqual(list(actual_res), [{'i': 2}, {'i': 3}, {'i': 4}])
@istest
def filter_field_keys_list(self):
# when
actual_res = utils.filter_field_keys(
[{'directory': 1, 'file': 2, 'link': 3},
{'dir': 1, 'fil': 2, 'lin': 3}],
{'directory', 'dir'})
# then
self.assertEqual(actual_res, [{'directory': 1}, {'dir': 1}])
@istest
def filter_field_keys_other(self):
# given
input_set = {1, 2}
# when
actual_res = utils.filter_field_keys(input_set, {'a', '1'})
# then
self.assertEqual(actual_res, input_set)
- @istest
- def fmap(self):
- self.assertEquals([2, 3, None, 4],
- utils.fmap(lambda x: x+1, [1, 2, None, 3]))
- self.assertEquals([11, 12, 13],
- list(utils.fmap(lambda x: x+10,
- map(lambda x: x, [1, 2, 3]))))
- self.assertEquals({'a': 2, 'b': 4},
- utils.fmap(lambda x: x*2, {'a': 1, 'b': 2}))
- self.assertEquals(100,
- utils.fmap(lambda x: x*10, 10))
- self.assertEquals({'a': [2, 6], 'b': 4},
- utils.fmap(lambda x: x*2, {'a': [1, 3], 'b': 2}))
-
- self.assertIsNone(utils.fmap(lambda x: x, None))
-
@istest
def person_to_string(self):
self.assertEqual(utils.person_to_string(dict(name='raboof',
email='foo@bar')),
'raboof <foo@bar>')
@istest
def enrich_release_0(self):
# when
actual_release = utils.enrich_release({})
# then
self.assertEqual(actual_release, {})
@patch('swh.web.api.utils.reverse')
@istest
def enrich_release_1(self, mock_django_reverse):
# given
def reverse_test_context(view_name, kwargs):
if view_name == 'content':
id = kwargs['q']
return '/api/1/content/%s/' % id
elif view_name == 'person':
id = kwargs['person_id']
return '/api/1/person/%s/' % id
else:
raise ValueError(
                    'This should not happen so fail if it does.')
mock_django_reverse.side_effect = reverse_test_context
# when
actual_release = utils.enrich_release({
'target': '123',
'target_type': 'content',
'author': {
'id': 100,
'name': 'author release name',
'email': 'author@email',
},
})
# then
self.assertEqual(actual_release, {
'target': '123',
'target_type': 'content',
'target_url': '/api/1/content/sha1_git:123/',
'author_url': '/api/1/person/100/',
'author': {
'id': 100,
'name': 'author release name',
'email': 'author@email',
},
})
mock_django_reverse.assert_has_calls([
call('content', kwargs={'q': 'sha1_git:123'}),
call('person', kwargs={'person_id': 100})
])
@patch('swh.web.api.utils.reverse')
@istest
def enrich_release_2(self, mock_django_reverse):
# given
mock_django_reverse.return_value = '/api/1/dir/23/'
# when
actual_release = utils.enrich_release({'target': '23',
'target_type': 'directory'})
# then
self.assertEqual(actual_release, {
'target': '23',
'target_type': 'directory',
'target_url': '/api/1/dir/23/'
})
mock_django_reverse.assert_called_once_with('directory',
kwargs={'sha1_git': '23'})
@patch('swh.web.api.utils.reverse')
@istest
def enrich_release_3(self, mock_django_reverse):
# given
mock_django_reverse.return_value = '/api/1/rev/3/'
# when
actual_release = utils.enrich_release({'target': '3',
'target_type': 'revision'})
# then
self.assertEqual(actual_release, {
'target': '3',
'target_type': 'revision',
'target_url': '/api/1/rev/3/'
})
mock_django_reverse.assert_called_once_with('revision',
kwargs={'sha1_git': '3'})
@patch('swh.web.api.utils.reverse')
@istest
def enrich_release_4(self, mock_django_reverse):
# given
mock_django_reverse.return_value = '/api/1/rev/4/'
# when
actual_release = utils.enrich_release({'target': '4',
'target_type': 'release'})
# then
self.assertEqual(actual_release, {
'target': '4',
'target_type': 'release',
'target_url': '/api/1/rev/4/'
})
mock_django_reverse.assert_called_once_with('release',
kwargs={'sha1_git': '4'})
@patch('swh.web.api.utils.reverse')
@istest
def enrich_directory_no_type(self, mock_django_reverse):
# when/then
self.assertEqual(utils.enrich_directory({'id': 'dir-id'}),
{'id': 'dir-id'})
# given
mock_django_reverse.return_value = '/api/content/sha1_git:123/'
# when
actual_directory = utils.enrich_directory({
'id': 'dir-id',
'type': 'file',
'target': '123',
})
# then
self.assertEqual(actual_directory, {
'id': 'dir-id',
'type': 'file',
'target': '123',
'target_url': '/api/content/sha1_git:123/',
})
mock_django_reverse.assert_called_once_with(
'content', kwargs={'q': 'sha1_git:123'})
@patch('swh.web.api.utils.reverse')
@istest
def enrich_directory_with_context_and_type_file(self, mock_django_reverse):
# given
mock_django_reverse.return_value = '/api/content/sha1_git:123/'
# when
actual_directory = utils.enrich_directory({
'id': 'dir-id',
'type': 'file',
'name': 'hy',
'target': '789',
}, context_url='/api/revision/revsha1/directory/prefix/path/')
# then
self.assertEqual(actual_directory, {
'id': 'dir-id',
'type': 'file',
'name': 'hy',
'target': '789',
'target_url': '/api/content/sha1_git:123/',
'file_url': '/api/revision/revsha1/directory'
'/prefix/path/hy/'
})
mock_django_reverse.assert_called_once_with(
'content', kwargs={'q': 'sha1_git:789'})
@patch('swh.web.api.utils.reverse')
@istest
def enrich_directory_with_context_and_type_dir(self, mock_django_reverse):
# given
mock_django_reverse.return_value = '/api/directory/456/'
# when
actual_directory = utils.enrich_directory({
'id': 'dir-id',
'type': 'dir',
'name': 'emacs-42',
'target_type': 'file',
'target': '456',
}, context_url='/api/revision/origin/2/directory/some/prefix/path/')
# then
self.assertEqual(actual_directory, {
'id': 'dir-id',
'type': 'dir',
'target_type': 'file',
'name': 'emacs-42',
'target': '456',
'target_url': '/api/directory/456/',
'dir_url': '/api/revision/origin/2/directory'
'/some/prefix/path/emacs-42/'
})
mock_django_reverse.assert_called_once_with('directory',
kwargs={'sha1_git': '456'})
@istest
def enrich_content_without_hashes(self):
# when/then
self.assertEqual(utils.enrich_content({'id': '123'}),
{'id': '123'})
@patch('swh.web.api.utils.reverse')
@istest
def enrich_content_with_hashes(self, mock_django_reverse):
for algo, hash in self.sample_content_hashes.items():
query_string = '%s:%s' % (algo, hash)
# given
mock_django_reverse.side_effect = [
'/api/content/%s/raw/' % query_string,
'/api/filetype/%s/' % query_string,
'/api/language/%s/' % query_string,
'/api/license/%s/' % query_string
]
# when
enriched_content = utils.enrich_content(
{
algo: hash,
},
query_string=query_string
)
# then
self.assertEqual(
enriched_content,
{
algo: hash,
'data_url': '/api/content/%s/raw/' % query_string,
'filetype_url': '/api/filetype/%s/' % query_string,
'language_url': '/api/language/%s/' % query_string,
'license_url': '/api/license/%s/' % query_string,
}
)
mock_django_reverse.assert_has_calls([
call('content-raw', kwargs={'q': query_string}),
call('content-filetype', kwargs={'q': query_string}),
call('content-language', kwargs={'q': query_string}),
call('content-license', kwargs={'q': query_string}),
])
mock_django_reverse.reset()
@patch('swh.web.api.utils.reverse')
@istest
def enrich_content_with_hashes_and_top_level_url(self,
mock_django_reverse):
for algo, hash in self.sample_content_hashes.items():
query_string = '%s:%s' % (algo, hash)
# given
mock_django_reverse.side_effect = [
'/api/content/%s/' % query_string,
'/api/content/%s/raw/' % query_string,
'/api/filetype/%s/' % query_string,
'/api/language/%s/' % query_string,
'/api/license/%s/' % query_string,
]
# when
enriched_content = utils.enrich_content(
{
algo: hash
},
top_url=True,
query_string=query_string
)
# then
self.assertEqual(
enriched_content,
{
algo: hash,
'content_url': '/api/content/%s/' % query_string,
'data_url': '/api/content/%s/raw/' % query_string,
'filetype_url': '/api/filetype/%s/' % query_string,
'language_url': '/api/language/%s/' % query_string,
'license_url': '/api/license/%s/' % query_string,
}
)
mock_django_reverse.assert_has_calls([
call('content', kwargs={'q': query_string}),
call('content-raw', kwargs={'q': query_string}),
call('content-filetype', kwargs={'q': query_string}),
call('content-language', kwargs={'q': query_string}),
call('content-license', kwargs={'q': query_string}),
])
mock_django_reverse.reset()
@istest
def enrich_entity_identity(self):
# when/then
self.assertEqual(utils.enrich_content({'id': '123'}),
{'id': '123'})
@patch('swh.web.api.utils.reverse')
@istest
def enrich_entity_with_sha1(self, mock_django_reverse):
# given
def reverse_test(view_name, kwargs):
return '/api/entity/' + kwargs['uuid'] + '/'
mock_django_reverse.side_effect = reverse_test
# when
actual_entity = utils.enrich_entity({
'uuid': 'uuid-1',
'parent': 'uuid-parent',
'name': 'something'
})
# then
self.assertEqual(actual_entity, {
'uuid': 'uuid-1',
'uuid_url': '/api/entity/uuid-1/',
'parent': 'uuid-parent',
'parent_url': '/api/entity/uuid-parent/',
'name': 'something',
})
mock_django_reverse.assert_has_calls(
[call('entity', kwargs={'uuid': 'uuid-1'}),
call('entity', kwargs={'uuid': 'uuid-parent'})])
@nottest
def _reverse_context_test(self, view_name, kwargs):
if view_name == 'revision':
return '/api/revision/%s/' % kwargs['sha1_git']
elif view_name == 'revision-context':
return '/api/revision/%s/prev/%s/' % (kwargs['sha1_git'], kwargs['context']) # noqa
elif view_name == 'revision-log':
if 'prev_sha1s' in kwargs:
return '/api/revision/%s/prev/%s/log/' % (kwargs['sha1_git'], kwargs['prev_sha1s']) # noqa
else:
return '/api/revision/%s/log/' % kwargs['sha1_git']
@patch('swh.web.api.utils.reverse')
@istest
def enrich_revision_without_children_or_parent(self, mock_django_reverse):
# given
def reverse_test(view_name, kwargs):
if view_name == 'revision':
return '/api/revision/' + kwargs['sha1_git'] + '/'
elif view_name == 'revision-log':
return '/api/revision/' + kwargs['sha1_git'] + '/log/'
elif view_name == 'directory':
return '/api/directory/' + kwargs['sha1_git'] + '/'
elif view_name == 'person':
return '/api/person/' + kwargs['person_id'] + '/'
mock_django_reverse.side_effect = reverse_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'directory': '123',
'author': {'id': '1'},
'committer': {'id': '2'},
})
expected_revision = {
'id': 'rev-id',
'directory': '123',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'directory_url': '/api/directory/123/',
'author': {'id': '1'},
'author_url': '/api/person/1/',
'committer': {'id': '2'},
'committer_url': '/api/person/2/'
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_django_reverse.assert_has_calls(
[call('revision', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id'}),
call('person', kwargs={'person_id': '1'}),
call('person', kwargs={'person_id': '2'}),
call('directory', kwargs={'sha1_git': '123'})])
@patch('swh.web.api.utils.reverse')
@istest
def enrich_revision_with_children_and_parent_no_dir(self,
mock_django_reverse):
# given
mock_django_reverse.side_effect = self._reverse_context_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'parents': ['123'],
'children': ['456'],
}, context='prev-rev')
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': '/api/revision/rev-id/prev/prev-rev/log/',
'parents': [{'id': '123', 'url': '/api/revision/123/'}],
'children': ['456'],
'children_urls': ['/api/revision/456/',
'/api/revision/prev-rev/'],
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_django_reverse.assert_has_calls(
[call('revision', kwargs={'sha1_git': 'prev-rev'}),
call('revision', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id',
'prev_sha1s': 'prev-rev'}),
call('revision', kwargs={'sha1_git': '123'}),
call('revision', kwargs={'sha1_git': '456'})])
@patch('swh.web.api.utils.reverse')
@istest
def enrich_revision_no_context(self, mock_django_reverse):
# given
mock_django_reverse.side_effect = self._reverse_context_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'parents': ['123'],
'children': ['456'],
})
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'parents': [{'id': '123', 'url': '/api/revision/123/'}],
'children': ['456'],
'children_urls': ['/api/revision/456/']
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_django_reverse.assert_has_calls(
[call('revision', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id'}),
call('revision', kwargs={'sha1_git': '123'}),
call('revision', kwargs={'sha1_git': '456'})])
@patch('swh.web.api.utils.reverse')
@istest
def enrich_revision_context_empty_prev_list(self, mock_django_reverse):
# given
mock_django_reverse.side_effect = self._reverse_context_test
# when
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev-rev/log/'),
'parents': [{'id': '123', 'url': '/api/revision/123/'}],
'children': ['456'],
'children_urls': ['/api/revision/456/',
'/api/revision/prev-rev/'],
}
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'parents': ['123'],
'children': ['456']}, context='prev-rev')
# then
self.assertEqual(actual_revision, expected_revision)
mock_django_reverse.assert_has_calls(
[call('revision', kwargs={'sha1_git': 'prev-rev'}),
call('revision', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id',
'prev_sha1s': 'prev-rev'}),
call('revision', kwargs={'sha1_git': '123'}),
call('revision', kwargs={'sha1_git': '456'})])
@patch('swh.web.api.utils.reverse')
@istest
def enrich_revision_context_some_prev_list(self, mock_django_reverse):
# given
mock_django_reverse.side_effect = self._reverse_context_test
# when
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev1-rev/prev0-rev/log/'),
'parents': [{'id': '123', 'url': '/api/revision/123/'}],
'children': ['456'],
'children_urls': ['/api/revision/456/',
'/api/revision/prev0-rev/prev/prev1-rev/'],
}
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'parents': ['123'],
'children': ['456']}, context='prev1-rev/prev0-rev')
# then
self.assertEqual(actual_revision, expected_revision)
mock_django_reverse.assert_has_calls(
[call('revision-context', kwargs={'context': 'prev1-rev',
'sha1_git': 'prev0-rev'}),
call('revision', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'prev_sha1s': 'prev1-rev/prev0-rev',
'sha1_git': 'rev-id'}),
call('revision', kwargs={'sha1_git': '123'}),
call('revision', kwargs={'sha1_git': '456'})])
@nottest
def _reverse_rev_message_test(self, view_name, kwargs):
if view_name == 'revision':
return '/api/revision/%s/' % kwargs['sha1_git']
elif view_name == 'revision-log':
if 'prev_sha1s' in kwargs and kwargs['prev_sha1s'] is not None:
return '/api/revision/%s/prev/%s/log/' % (kwargs['sha1_git'], kwargs['prev_sha1s']) # noqa
else:
return '/api/revision/%s/log/' % kwargs['sha1_git']
elif view_name == 'revision-raw-message':
return '/api/revision/' + kwargs['sha1_git'] + '/raw/'
else:
return '/api/revision/%s/prev/%s/' % (kwargs['sha1_git'], kwargs['context']) # noqa
@patch('swh.web.api.utils.reverse')
@istest
def enrich_revision_with_no_message(self, mock_django_reverse):
# given
mock_django_reverse.side_effect = self._reverse_rev_message_test
# when
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev-rev/log/'),
'message': None,
'parents': [{'id': '123', 'url': '/api/revision/123/'}],
'children': ['456'],
'children_urls': ['/api/revision/456/',
'/api/revision/prev-rev/'],
}
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'message': None,
'parents': ['123'],
'children': ['456'],
}, context='prev-rev')
# then
self.assertEqual(actual_revision, expected_revision)
mock_django_reverse.assert_has_calls(
[call('revision', kwargs={'sha1_git': 'prev-rev'}),
call('revision', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id',
'prev_sha1s': 'prev-rev'}),
call('revision', kwargs={'sha1_git': '123'}),
call('revision', kwargs={'sha1_git': '456'})]
)
@patch('swh.web.api.utils.reverse')
@istest
def enrich_revision_with_invalid_message(self, mock_django_reverse):
# given
mock_django_reverse.side_effect = self._reverse_rev_message_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'message': None,
'message_decoding_failed': True,
'parents': ['123'],
'children': ['456'],
}, context='prev-rev')
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev-rev/log/'),
'message': None,
'message_decoding_failed': True,
'message_url': '/api/revision/rev-id/raw/',
'parents': [{'id': '123', 'url': '/api/revision/123/'}],
'children': ['456'],
'children_urls': ['/api/revision/456/',
'/api/revision/prev-rev/'],
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_django_reverse.assert_has_calls(
[call('revision', kwargs={'sha1_git': 'prev-rev'}),
call('revision', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id'}),
call('revision-log', kwargs={'sha1_git': 'rev-id',
'prev_sha1s': 'prev-rev'}),
call('revision', kwargs={'sha1_git': '123'}),
call('revision', kwargs={'sha1_git': '456'}),
call('revision-raw-message', kwargs={'sha1_git': 'rev-id'})])
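
The test pattern used throughout this file, in miniature: patch the module-level reverse() and assert on the enriched result. A hedged, self-contained sketch mirroring the enrich_release_2 case above (assumes swh.web is importable):

    import unittest
    from unittest.mock import patch

    from swh.web.api import utils

    class EnrichReleaseSketch(unittest.TestCase):
        @patch('swh.web.api.utils.reverse')
        def test_directory_target_url(self, mock_reverse):
            # reverse() is stubbed out, so no Django url configuration is
            # needed to exercise the enrichment logic.
            mock_reverse.return_value = '/api/1/dir/23/'
            enriched = utils.enrich_release({'target': '23',
                                             'target_type': 'directory'})
            self.assertEqual(enriched['target_url'], '/api/1/dir/23/')

    if __name__ == '__main__':
        unittest.main()
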
diff --git a/swh/web/tests/common/test_converters.py b/swh/web/tests/common/test_converters.py
index f33e5102..cc4afdd3 100644
--- a/swh/web/tests/common/test_converters.py
+++ b/swh/web/tests/common/test_converters.py
@@ -1,791 +1,808 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import unittest
from nose.tools import istest
from swh.model import hashutil
from swh.web.common import converters
class ConvertersTestCase(unittest.TestCase):
+
+ @istest
+ def fmap(self):
+ self.assertEquals([2, 3, None, 4],
+ converters.fmap(lambda x: x+1, [1, 2, None, 3]))
+ self.assertEquals([11, 12, 13],
+ list(converters.fmap(lambda x: x+10,
+ map(lambda x: x, [1, 2, 3]))))
+ self.assertEquals({'a': 2, 'b': 4},
+ converters.fmap(lambda x: x*2, {'a': 1, 'b': 2}))
+ self.assertEquals(100,
+ converters.fmap(lambda x: x*10, 10))
+ self.assertEquals({'a': [2, 6], 'b': 4},
+ converters.fmap(lambda x: x*2, {'a': [1, 3], 'b': 2})) # noqa
+
+ self.assertIsNone(converters.fmap(lambda x: x, None))
+
@istest
def from_swh(self):
some_input = {
'a': 'something',
'b': 'someone',
'c': b'sharp-0.3.4.tgz',
'd': hashutil.hash_to_bytes(
'b04caf10e9535160d90e874b45aa426de762f19f'),
'e': b'sharp.html/doc_002dS_005fISREG.html',
'g': [b'utf-8-to-decode', b'another-one'],
'h': 'something filtered',
'i': {'e': b'something'},
'j': {
'k': {
'l': [b'bytes thing', b'another thingy', b''],
'n': 'dont care either'
},
'm': 'dont care'
},
'o': 'something',
'p': b'foo',
'q': {'extra-headers': [['a', b'intact']]},
'w': None,
'r': {'p': 'also intact',
'q': 'bar'},
's': {
'timestamp': 42,
'offset': -420,
'negative_utc': None,
},
's1': {
'timestamp': {'seconds': 42, 'microseconds': 0},
'offset': -420,
'negative_utc': None,
},
's2': datetime.datetime(
2013, 7, 1, 20, 0, 0,
tzinfo=datetime.timezone.utc),
't': None,
'u': None,
'v': None,
'x': None,
}
expected_output = {
'a': 'something',
'b': 'someone',
'c': 'sharp-0.3.4.tgz',
'd': 'b04caf10e9535160d90e874b45aa426de762f19f',
'e': 'sharp.html/doc_002dS_005fISREG.html',
'g': ['utf-8-to-decode', 'another-one'],
'i': {'e': 'something'},
'j': {
'k': {
'l': ['bytes thing', 'another thingy', '']
}
},
'p': 'foo',
'q': {'extra-headers': [['a', 'intact']]},
'w': {},
'r': {'p': 'also intact',
'q': 'bar'},
's': '1969-12-31T17:00:42-07:00',
's1': '1969-12-31T17:00:42-07:00',
's2': '2013-07-01T20:00:00+00:00',
'u': {},
'v': [],
'x': None,
}
actual_output = converters.from_swh(
some_input,
hashess={'d', 'o', 'x'},
bytess={'c', 'e', 'g', 'l'},
dates={'s', 's1', 's2'},
blacklist={'h', 'm', 'n', 'o'},
removables_if_empty={'t'},
empty_dict={'u'},
empty_list={'v'},
convert={'p', 'q', 'w'},
convert_fn=converters.convert_revision_metadata)
self.assertEquals(expected_output, actual_output)
@istest
def from_swh_edge_cases_do_no_conversion_if_none_or_not_bytes(self):
some_input = {
'a': 'something',
'b': None,
'c': 'someone',
'd': None,
'e': None
}
expected_output = {
'a': 'something',
'b': None,
'c': 'someone',
'd': None,
'e': None
}
actual_output = converters.from_swh(some_input,
hashess={'a', 'b'},
bytess={'c', 'd'},
dates={'e'})
self.assertEquals(expected_output, actual_output)
@istest
def from_swh_edge_cases_convert_invalid_utf8_bytes(self):
some_input = {
'a': 'something',
'b': 'someone',
'c': b'a name \xff',
'd': b'an email \xff',
}
expected_output = {
'a': 'something',
'b': 'someone',
'c': 'a name \\xff',
'd': 'an email \\xff',
'decoding_failures': ['c', 'd']
}
actual_output = converters.from_swh(some_input,
hashess={'a', 'b'},
bytess={'c', 'd'})
for v in ['a', 'b', 'c', 'd']:
self.assertEqual(expected_output[v], actual_output[v])
self.assertEqual(len(expected_output['decoding_failures']),
len(actual_output['decoding_failures']))
for v in expected_output['decoding_failures']:
self.assertTrue(v in actual_output['decoding_failures'])
@istest
def from_swh_empty(self):
# when
self.assertEquals({}, converters.from_swh({}))
@istest
def from_swh_none(self):
# when
self.assertIsNone(converters.from_swh(None))
@istest
def from_provenance(self):
# given
input_provenance = {
'origin': 10,
'visit': 1,
'content': hashutil.hash_to_bytes(
'321caf10e9535160d90e874b45aa426de762f19f'),
'revision': hashutil.hash_to_bytes(
'123caf10e9535160d90e874b45aa426de762f19f'),
'path': b'octave-3.4.0/doc/interpreter/octave/doc_002dS_005fISREG'
}
expected_provenance = {
'origin': 10,
'visit': 1,
'content': '321caf10e9535160d90e874b45aa426de762f19f',
'revision': '123caf10e9535160d90e874b45aa426de762f19f',
'path': 'octave-3.4.0/doc/interpreter/octave/doc_002dS_005fISREG'
}
# when
actual_provenance = converters.from_provenance(input_provenance)
# then
self.assertEqual(actual_provenance, expected_provenance)
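
    # from_provenance() appears to hexlify the 'content' and 'revision'
    # hashes and decode the bytes 'path'; the 'origin' and 'visit'
    # identifiers pass through unchanged.
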
@istest
def from_origin(self):
# given
origin_input = {
'id': 9,
'type': 'ftp',
'url': 'rsync://ftp.gnu.org/gnu/octave',
'project': None,
'lister': None,
}
expected_origin = {
'id': 9,
'type': 'ftp',
'url': 'rsync://ftp.gnu.org/gnu/octave',
}
# when
actual_origin = converters.from_origin(origin_input)
# then
self.assertEqual(actual_origin, expected_origin)
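
    # from_origin() seemingly drops the None-valued 'project' and 'lister'
    # keys while keeping the rest of the origin mapping intact.
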
@istest
def from_origin_visit(self):
snap_hash = 'b5f0b7f716735ebffe38505c60145c4fd9da6ca3'
for snap in [snap_hash, None]:
# given
visit = {
'date': {
'timestamp': datetime.datetime(
2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'origin': 10,
'visit': 100,
'metadata': None,
'status': 'full',
'snapshot': hashutil.hash_to_bytes(snap) if snap else snap,
}
expected_visit = {
'date': '2015-01-01T22:00:00+00:00',
'origin': 10,
'visit': 100,
'metadata': {},
'status': 'full',
'snapshot': snap_hash if snap else snap
}
# when
actual_visit = converters.from_origin_visit(visit)
# then
self.assertEqual(actual_visit, expected_visit)
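
    # from_origin_visit() appears to format the visit date as ISO 8601,
    # replace a None 'metadata' with {}, and hexlify the 'snapshot' hash
    # when one is present (leaving None as-is).
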
@istest
def from_release(self):
release_input = {
'id': hashutil.hash_to_bytes(
'aad23fa492a0c5fed0708a6703be875448c86884'),
'target': hashutil.hash_to_bytes(
'5e46d564378afc44b31bb89f99d5675195fbdf67'),
'target_type': 'revision',
'date': {
'timestamp': datetime.datetime(
2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'author': {
'name': b'author name',
'fullname': b'Author Name author@email',
'email': b'author@email',
},
'name': b'v0.0.1',
'message': b'some comment on release',
'synthetic': True,
}
expected_release = {
'id': 'aad23fa492a0c5fed0708a6703be875448c86884',
'target': '5e46d564378afc44b31bb89f99d5675195fbdf67',
'target_type': 'revision',
'date': '2015-01-01T22:00:00+00:00',
'author': {
'name': 'author name',
'fullname': 'Author Name author@email',
'email': 'author@email',
},
'name': 'v0.0.1',
'message': 'some comment on release',
'synthetic': True,
}
# when
actual_release = converters.from_release(release_input)
# then
        self.assertEqual(actual_release, expected_release)

@istest
def from_release_no_revision(self):
release_input = {
'id': hashutil.hash_to_bytes(
'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e'),
'target': None,
'date': {
'timestamp': datetime.datetime(
2016, 3, 2, 10, 0, 0,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': True,
},
'name': b'v0.1.1',
'message': b'comment on release',
'synthetic': False,
'author': {
'name': b'bob',
'fullname': b'Bob bob@alice.net',
'email': b'bob@alice.net',
},
}
expected_release = {
'id': 'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e',
'target': None,
'date': '2016-03-02T10:00:00-00:00',
'name': 'v0.1.1',
'message': 'comment on release',
'synthetic': False,
'author': {
'name': 'bob',
'fullname': 'Bob bob@alice.net',
'email': 'bob@alice.net',
},
}
# when
actual_release = converters.from_release(release_input)
# then
self.assertEqual(actual_release, expected_release)
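
    # The '-00:00' offset in the expected date above looks deliberate: with
    # 'negative_utc': True, the zero UTC offset is apparently rendered as
    # -00:00 rather than +00:00, preserving the git-style "negative UTC" mark.
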
@istest
def from_revision(self):
revision_input = {
'id': hashutil.hash_to_bytes(
'18d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hashutil.hash_to_bytes(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'Software Heritage',
'fullname': b'robot robot@softwareheritage.org',
'email': b'robot@softwareheritage.org',
},
'committer': {
'name': b'Software Heritage',
'fullname': b'robot robot@softwareheritage.org',
'email': b'robot@softwareheritage.org',
},
'message': b'synthetic revision message',
'date': {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'committer_date': {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'synthetic': True,
'type': 'tar',
'parents': [
hashutil.hash_to_bytes(
'29d8be353ed3480476f032475e7c244eff7371d5'),
hashutil.hash_to_bytes(
'30d8be353ed3480476f032475e7c244eff7371d5')
],
'children': [
hashutil.hash_to_bytes(
'123546353ed3480476f032475e7c244eff7371d5'),
],
'metadata': {
'extra_headers': [['gpgsig', b'some-signature']],
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912',
}]
},
}
expected_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'Software Heritage',
'fullname': 'robot robot@softwareheritage.org',
'email': 'robot@softwareheritage.org',
},
'committer': {
'name': 'Software Heritage',
'fullname': 'robot robot@softwareheritage.org',
'email': 'robot@softwareheritage.org',
},
'message': 'synthetic revision message',
'date': "2000-01-17T11:23:54+00:00",
'committer_date': "2000-01-17T11:23:54+00:00",
'children': [
'123546353ed3480476f032475e7c244eff7371d5'
],
'parents': [
'29d8be353ed3480476f032475e7c244eff7371d5',
'30d8be353ed3480476f032475e7c244eff7371d5'
],
'type': 'tar',
'synthetic': True,
'metadata': {
'extra_headers': [['gpgsig', 'some-signature']],
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912'
}]
},
'merge': True
}
# when
actual_revision = converters.from_revision(revision_input)
# then
        self.assertEqual(actual_revision, expected_revision)

@istest
def from_revision_nomerge(self):
revision_input = {
'id': hashutil.hash_to_bytes(
'18d8be353ed3480476f032475e7c233eff7371d5'),
'parents': [
hashutil.hash_to_bytes(
'29d8be353ed3480476f032475e7c244eff7371d5')
]
}
expected_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'parents': [
'29d8be353ed3480476f032475e7c244eff7371d5'
],
'merge': False
}
# when
actual_revision = converters.from_revision(revision_input)
# then
self.assertEqual(actual_revision, expected_revision)
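
    # Taken together with the tests around it, this suggests from_revision()
    # emits a computed 'merge' flag only when 'parents' is present: True when
    # there is more than one parent, False otherwise.
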
@istest
def from_revision_noparents(self):
revision_input = {
'id': hashutil.hash_to_bytes(
'18d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hashutil.hash_to_bytes(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'Software Heritage',
'fullname': b'robot robot@softwareheritage.org',
'email': b'robot@softwareheritage.org',
},
'committer': {
'name': b'Software Heritage',
'fullname': b'robot robot@softwareheritage.org',
'email': b'robot@softwareheritage.org',
},
'message': b'synthetic revision message',
'date': {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'committer_date': {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'synthetic': True,
'type': 'tar',
'children': [
hashutil.hash_to_bytes(
'123546353ed3480476f032475e7c244eff7371d5'),
],
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912',
}]
},
}
expected_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'Software Heritage',
'fullname': 'robot robot@softwareheritage.org',
'email': 'robot@softwareheritage.org',
},
'committer': {
'name': 'Software Heritage',
'fullname': 'robot robot@softwareheritage.org',
'email': 'robot@softwareheritage.org',
},
'message': 'synthetic revision message',
'date': "2000-01-17T11:23:54+00:00",
'committer_date': "2000-01-17T11:23:54+00:00",
'children': [
'123546353ed3480476f032475e7c244eff7371d5'
],
'type': 'tar',
'synthetic': True,
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912'
}]
}
}
# when
actual_revision = converters.from_revision(revision_input)
# then
        self.assertEqual(actual_revision, expected_revision)

@istest
def from_revision_invalid(self):
revision_input = {
'id': hashutil.hash_to_bytes(
'18d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hashutil.hash_to_bytes(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'Software Heritage',
'fullname': b'robot robot@softwareheritage.org',
'email': b'robot@softwareheritage.org',
},
'committer': {
'name': b'Software Heritage',
'fullname': b'robot robot@softwareheritage.org',
'email': b'robot@softwareheritage.org',
},
'message': b'invalid message \xff',
'date': {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'committer_date': {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'synthetic': True,
'type': 'tar',
'parents': [
hashutil.hash_to_bytes(
'29d8be353ed3480476f032475e7c244eff7371d5'),
hashutil.hash_to_bytes(
'30d8be353ed3480476f032475e7c244eff7371d5')
],
'children': [
hashutil.hash_to_bytes(
'123546353ed3480476f032475e7c244eff7371d5'),
],
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912',
}]
},
}
expected_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'Software Heritage',
'fullname': 'robot robot@softwareheritage.org',
'email': 'robot@softwareheritage.org',
},
'committer': {
'name': 'Software Heritage',
'fullname': 'robot robot@softwareheritage.org',
'email': 'robot@softwareheritage.org',
},
'message': None,
'message_decoding_failed': True,
'date': "2000-01-17T11:23:54+00:00",
'committer_date': "2000-01-17T11:23:54+00:00",
'children': [
'123546353ed3480476f032475e7c244eff7371d5'
],
'parents': [
'29d8be353ed3480476f032475e7c244eff7371d5',
'30d8be353ed3480476f032475e7c244eff7371d5'
],
'type': 'tar',
'synthetic': True,
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912'
}]
},
'merge': True
}
# when
actual_revision = converters.from_revision(revision_input)
# then
self.assertEqual(actual_revision, expected_revision)
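
    # Unlike person fields, an undecodable revision message is apparently not
    # backslash-escaped: it is replaced by None and flagged with
    # 'message_decoding_failed': True.
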
@istest
def from_content_none(self):
        self.assertIsNone(converters.from_content(None))

@istest
def from_content(self):
content_input = {
'sha1': hashutil.hash_to_bytes(
'5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5'),
'sha256': hashutil.hash_to_bytes(
'39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'blake2s256': hashutil.hash_to_bytes(
'49007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hashutil.hash_to_bytes(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'),
'ctime': 'something-which-is-filtered-out',
'data': b'data in bytes',
'length': 10,
'status': 'hidden',
}
        # 'ctime' is filtered out; 'status' is mapped from 'hidden' to 'absent'
expected_content = {
'checksums': {
'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98'
'930e7e0afa4d2747d3bf96c926',
'blake2s256': '49007420ca5de7cb3cfc15196335507ee7'
'6c98930e7e0afa4d2747d3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
},
'data': b'data in bytes',
'length': 10,
'status': 'absent',
}
# when
actual_content = converters.from_content(content_input)
# then
self.assertEqual(actual_content, expected_content)
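
    # from_content() appears to group the individual hash fields under a
    # 'checksums' sub-dict, drop 'ctime', keep raw 'data' as bytes, and remap
    # the 'hidden' status to 'absent'.
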
@istest
def from_person(self):
person_input = {
'id': 10,
'anything': 'else',
'name': b'bob',
'fullname': b'bob bob@alice.net',
'email': b'bob@foo.alice',
}
expected_person = {
'id': 10,
'anything': 'else',
'name': 'bob',
'fullname': 'bob bob@alice.net',
'email': 'bob@foo.alice',
}
# when
actual_person = converters.from_person(person_input)
# then
        self.assertEqual(actual_person, expected_person)

@istest
def from_directory_entries(self):
dir_entries_input = {
'sha1': hashutil.hash_to_bytes(
'5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5'),
'sha256': hashutil.hash_to_bytes(
'39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hashutil.hash_to_bytes(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'),
'blake2s256': hashutil.hash_to_bytes(
'685395c5dc57cada459364f0946d3dd45bad5fcbab'
'c1048edb44380f1d31d0aa'),
'target': hashutil.hash_to_bytes(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'),
'dir_id': hashutil.hash_to_bytes(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'),
'name': b'bob',
'type': 10,
'status': 'hidden',
}
expected_dir_entries = {
'checksums': {
'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98'
'930e7e0afa4d2747d3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'blake2s256': '685395c5dc57cada459364f0946d3dd45bad5f'
'cbabc1048edb44380f1d31d0aa',
},
'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'name': 'bob',
'type': 10,
'status': 'absent',
}
# when
actual_dir_entries = converters.from_directory_entry(dir_entries_input)
# then
self.assertEqual(actual_dir_entries, expected_dir_entries)
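
    # from_directory_entry() seemingly applies the same treatment as
    # from_content(): hashes are grouped under 'checksums' and hexlified, the
    # 'hidden' status is remapped to 'absent', and bytes names are decoded.
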
@istest
def from_filetype(self):
content_filetype = {
'id': hashutil.hash_to_bytes(
'5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5'),
'encoding': b'utf-8',
'mimetype': b'text/plain',
}
expected_content_filetype = {
'id': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
'encoding': 'utf-8',
'mimetype': 'text/plain',
}
# when
actual_content_filetype = converters.from_filetype(content_filetype)
# then
self.assertEqual(actual_content_filetype, expected_content_filetype)