Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9339876
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
141 KB
Subscribers
None
View Options
diff --git a/swh/web/ui/converters.py b/swh/web/ui/converters.py
index 71f33078..f462e9f7 100644
--- a/swh/web/ui/converters.py
+++ b/swh/web/ui/converters.py
@@ -1,301 +1,312 @@
# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import json
from swh.core import hashutil
from swh.core.utils import decode_with_escape
from swh.web.ui import utils
def from_swh(dict_swh, hashess={}, bytess={}, dates={}, blacklist={},
             removables_if_empty={}, empty_dict={}, empty_list={},
             convert={}, convert_fn=lambda x: x):
    """Convert from an swh dictionary to something reasonably json
    serializable.

    The key collections below are only ever read (membership tests), never
    mutated, so the shared `{}` defaults are safe.

    Args:
        - dict_swh: the origin dictionary needed to be transformed
        - hashess: list/set of keys representing hashes values (sha1, sha256,
          sha1_git, etc...) as bytes. Those need to be transformed in
          hexadecimal string
        - bytess: list/set of keys representing bytes values which needs to
          be decoded
        - dates: set of keys whose values are swh dates (see convert_date)
          and need to be rendered as ISO 8601 strings
        - blacklist: set of keys to filter out from the conversion
        - removables_if_empty: set of keys dropped from the output when their
          value is falsy
        - empty_dict: set of keys whose falsy value is replaced by {}
        - empty_list: set of keys whose falsy value is replaced by []
        - convert: set of keys whose associated values need to be converted
          using convert_fn
        - convert_fn: the conversion function to apply on the value of key
          in 'convert'

        The remaining keys are copied as is in the output.

    Returns:
        dictionary equivalent as dict_swh only with its keys `converted`.
        A falsy dict_swh (None, {}) is returned unchanged.

    """
    def convert_hashes_bytes(v):
        """v is supposedly a hash as bytes, returns it converted in hex.

        """
        if isinstance(v, bytes):
            return hashutil.hash_to_hex(v)
        return v

    def convert_bytes(v):
        """v is supposedly a bytes string, decode as utf-8.

        FIXME: Improve decoding policy.
        If not utf-8, break!

        """
        if isinstance(v, bytes):
            return v.decode('utf-8')
        return v

    def convert_date(v):
        """v is either:
        - None (no date known), returned as is
        - a dict with three keys:
          - timestamp (dict or integer timestamp)
          - offset
          - negative_utc
        - a datetime

        We convert it to a human-readable string

        """
        # A missing date must not abort the conversion of the whole object.
        if v is None:
            return None

        if isinstance(v, datetime.datetime):
            return v.isoformat()

        tz = datetime.timezone(datetime.timedelta(minutes=v['offset']))
        swh_timestamp = v['timestamp']
        if isinstance(swh_timestamp, dict):
            date = datetime.datetime.fromtimestamp(
                swh_timestamp['seconds'], tz=tz)
        else:
            date = datetime.datetime.fromtimestamp(
                swh_timestamp, tz=tz)

        datestr = date.isoformat()

        if v['offset'] == 0 and v['negative_utc']:
            # remove the rightmost + and replace it with a -
            return '-'.join(datestr.rsplit('+', 1))

        return datestr

    if not dict_swh:
        return dict_swh

    new_dict = {}
    # The order of the tests below is significant: a key listed in several
    # collections is handled by the first matching branch.
    for key, value in dict_swh.items():
        if key in blacklist or (key in removables_if_empty and not value):
            continue

        if key in dates:
            new_dict[key] = convert_date(value)
        elif key in convert:
            new_dict[key] = convert_fn(value)
        elif isinstance(value, dict):
            # Nested dictionaries are converted with the very same rules.
            new_dict[key] = from_swh(value,
                                     hashess=hashess, bytess=bytess,
                                     dates=dates, blacklist=blacklist,
                                     removables_if_empty=removables_if_empty,
                                     empty_dict=empty_dict,
                                     empty_list=empty_list,
                                     convert=convert,
                                     convert_fn=convert_fn)
        elif key in hashess:
            new_dict[key] = utils.fmap(convert_hashes_bytes, value)
        elif key in bytess:
            try:
                new_dict[key] = utils.fmap(convert_bytes, value)
            except UnicodeDecodeError:
                # Record which keys could not be decoded as utf-8 and fall
                # back on an escaped representation of the raw bytes.
                new_dict.setdefault('decoding_failures', []).append(key)
                new_dict[key] = utils.fmap(decode_with_escape, value)
        elif key in empty_dict and not value:
            new_dict[key] = {}
        elif key in empty_list and not value:
            new_dict[key] = []
        else:
            new_dict[key] = value

    return new_dict
def from_provenance(provenance):
    """Turn a provenance information into a serializable dictionary.

    Args:
        provenance: Dictionary with the following keys:
          content (sha1_git)  : the content's identifier
          revision (sha1_git) : the revision the content was seen
          origin (int)        : the origin the content was seen
          visit (int)         : the visit it occurred
          path (bytes)        : the path the content was seen at

    Returns:
        The result of from_swh with 'content' and 'revision' handled as
        hashes and 'path' handled as bytes to decode.

    """
    hash_keys = {'content', 'revision'}
    bytes_keys = {'path'}
    return from_swh(provenance, hashess=hash_keys, bytess=bytes_keys)
def from_origin(origin):
    """Turn an SWH origin into a serializable origin dictionary.

    The 'lister' and 'project' keys are dropped from the result when their
    value is empty.

    """
    droppable_keys = {'lister', 'project'}
    return from_swh(origin, removables_if_empty=droppable_keys)
def from_release(release):
    """Turn an SWH release into a json serializable release dictionary.

    Args:
        release: release dictionary. The keys this converter acts upon are:
        - id, target: identifiers handled as hashes (bytes to hexadecimal)
        - message, name, fullname, email: bytes values decoded to strings
          ('fullname' and 'email' are reached through nested dictionaries
          such as 'author')
        - date: swh date rendered as a string

    Returns:
        Release dictionary with the same keys as the input, the keys listed
        above being converted and the remaining ones copied as is.

    """
    hash_keys = {'id', 'target'}
    bytes_keys = {'message', 'name', 'fullname', 'email'}
    date_keys = {'date'}
    return from_swh(release,
                    hashess=hash_keys,
                    bytess=bytes_keys,
                    dates=date_keys)
class SWHMetadataEncoder(json.JSONEncoder):
    """Special json encoder for metadata field which can contain bytes
    encoded value.

    Bytes values are decoded as utf-8. Bytes which are not valid utf-8 are
    rendered with their invalid parts escaped (\\xNN) instead of making the
    whole serialization fail.

    """
    def default(self, obj):
        """Return a json serializable version of obj.

        Args:
            obj: value the standard encoder does not know how to serialize

        Returns:
            The decoded string when obj is bytes.

        Raises:
            TypeError: for any other unsupported type (raised by the base
            class).

        """
        if isinstance(obj, bytes):
            # One undecodable value (e.g. a binary signature) must not
            # prevent the rest of the metadata from being rendered.
            return obj.decode('utf-8', errors='backslashreplace')
        # Let the base class default method raise the TypeError
        return super().default(obj)
def convert_revision_metadata(metadata):
    """Make a metadata dictionary json serializable.

    The metadata is dumped to json with SWHMetadataEncoder and loaded
    back. Empty metadata (None, {}, ...) yields an empty dictionary.

    """
    if metadata:
        serialized = json.dumps(metadata, cls=SWHMetadataEncoder)
        return json.loads(serialized)
    return {}
def from_revision(revision):
    """Convert from an SWH revision to a json serializable revision dictionary.

    Args:
        revision: revision dictionary. The keys this converter acts upon
        are:
        - id, directory, parents, children: identifiers handled as hashes
          (bytes to hexadecimal)
        - name, fullname, email: bytes values decoded to strings (reached
          through nested dictionaries such as 'author' and 'committer')
        - date, committer_date: swh dates rendered as strings
        - metadata: converted with convert_revision_metadata
        - message: revision's message (bytes)

    Returns:
        Revision dictionary with the same keys as inputs, only:
        - the keys listed above are converted
        - 'merge' is added when 'parents' is present (True if the revision
          has more than one parent)
        - 'message' is decoded as utf-8; when decoding fails, 'message' is
          set to None and 'message_decoding_failed' to True
        - remaining keys are left as is

    """
    revision = from_swh(revision,
                        hashess={'id', 'directory', 'parents', 'children'},
                        bytess={'name', 'fullname', 'email'},
                        convert={'metadata'},
                        convert_fn=convert_revision_metadata,
                        dates={'date', 'committer_date'})

    if revision:
        if 'parents' in revision:
            revision['merge'] = len(revision['parents']) > 1

        message = revision.get('message')
        # Only bytes need decoding: a missing (None) or already decoded
        # message is left untouched instead of raising AttributeError.
        if isinstance(message, bytes):
            try:
                revision['message'] = message.decode('utf-8')
            except UnicodeDecodeError:
                revision['message_decoding_failed'] = True
                revision['message'] = None

    return revision
def from_content(content):
    """Turn an swh content into a serializable content dictionary.

    The sha1, sha1_git and sha256 keys are handled as hashes, 'ctime' is
    filtered out and a 'hidden' status is reported as 'absent'.

    """
    def mask_hidden(status):
        # 'hidden' is reported as 'absent'; other statuses are kept as is.
        if status == 'hidden':
            return 'absent'
        return status

    return from_swh(content,
                    hashess={'sha1', 'sha1_git', 'sha256'},
                    blacklist={'ctime'},
                    convert={'status'},
                    convert_fn=mask_hidden)
def from_person(person):
    """Turn an swh person into a serializable person dictionary.

    The 'name', 'fullname' and 'email' values are decoded from bytes.

    """
    bytes_keys = {'name', 'fullname', 'email'}
    return from_swh(person, bytess=bytes_keys)
def from_origin_visit(visit):
    """Convert swh origin_visit to serializable origin_visit dictionary.

    - 'target' values are handled as hashes and 'branch' values as bytes
    - 'date' is rendered as a string through the date conversion of from_swh
    - an empty 'metadata' is replaced by {}
    - the keys of the 'occurrences' dictionary, when present, are decoded
      with decode_with_escape

    """
    ov = from_swh(visit,
                  hashess={'target'},
                  bytess={'branch'},
                  dates={'date'},
                  empty_dict={'metadata'})

    if ov and 'occurrences' in ov:
        # from_swh converts dictionary values only: the occurrences keys
        # still have to be decoded here.
        ov['occurrences'] = {
            decode_with_escape(k): v
            for k, v in ov['occurrences'].items()
        }

    return ov
def from_directory_entry(dir_entry):
    """Turn an swh directory entry into a serializable dictionary.

    The dir_id, sha1_git, sha1, sha256 and target keys are handled as
    hashes and 'name' as bytes. Empty sha1, sha1_git, sha256 and status
    values are dropped, and a 'hidden' status is reported as 'absent'.

    """
    def mask_hidden(status):
        # 'hidden' is reported as 'absent'; other statuses are kept as is.
        if status == 'hidden':
            return 'absent'
        return status

    hash_keys = {'dir_id', 'sha1_git', 'sha1', 'sha256', 'target'}
    droppable_keys = {'sha1', 'sha1_git', 'sha256', 'status'}
    return from_swh(dir_entry,
                    hashess=hash_keys,
                    bytess={'name'},
                    removables_if_empty=droppable_keys,
                    convert={'status'},
                    convert_fn=mask_hidden)
def from_filetype(content_entry):
    """Turn an swh content filetype entry into a serializable dictionary.

    'id' is handled as a hash; 'mimetype' and 'encoding' are decoded from
    bytes.

    """
    bytes_keys = {'mimetype', 'encoding'}
    return from_swh(content_entry, hashess={'id'}, bytess=bytes_keys)
diff --git a/swh/web/ui/tests/test_converters.py b/swh/web/ui/tests/test_converters.py
index 10f108a5..b154f716 100644
--- a/swh/web/ui/tests/test_converters.py
+++ b/swh/web/ui/tests/test_converters.py
@@ -1,725 +1,735 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import unittest
from nose.tools import istest
from swh.core import hashutil
from swh.web.ui import converters
class ConvertersTestCase(unittest.TestCase):
    """Unit tests of the swh.web.ui.converters functions."""

    @istest
    def from_swh(self):
        some_input = {
            'a': 'something',
            'b': 'someone',
            'c': b'sharp-0.3.4.tgz',
            'd': hashutil.hex_to_hash(
                'b04caf10e9535160d90e874b45aa426de762f19f'),
            'e': b'sharp.html/doc_002dS_005fISREG.html',
            'g': [b'utf-8-to-decode', b'another-one'],
            'h': 'something filtered',
            'i': {'e': b'something'},
            'j': {
                'k': {
                    'l': [b'bytes thing', b'another thingy', b''],
                    'n': 'dont care either'
                },
                'm': 'dont care'
            },
            'o': 'something',
            'p': b'foo',
            'q': {'extra-headers': [['a', b'intact']]},
            'w': None,
            'r': {'p': 'also intact',
                  'q': 'bar'},
            # date as a dict with an integer timestamp
            's': {
                'timestamp': 42,
                'offset': -420,
                'negative_utc': None,
            },
            # date as a dict with a dict timestamp
            's1': {
                'timestamp': {'seconds': 42, 'microseconds': 0},
                'offset': -420,
                'negative_utc': None,
            },
            # date as a datetime
            's2': datetime.datetime(
                2013, 7, 1, 20, 0, 0,
                tzinfo=datetime.timezone.utc),
            't': None,
            'u': None,
            'v': None,
            'x': None,
        }

        expected_output = {
            'a': 'something',
            'b': 'someone',
            'c': 'sharp-0.3.4.tgz',
            'd': 'b04caf10e9535160d90e874b45aa426de762f19f',
            'e': 'sharp.html/doc_002dS_005fISREG.html',
            'g': ['utf-8-to-decode', 'another-one'],
            'i': {'e': 'something'},
            'j': {
                'k': {
                    'l': ['bytes thing', 'another thingy', '']
                }
            },
            'p': 'foo',
            'q': {'extra-headers': [['a', 'intact']]},
            'w': {},
            'r': {'p': 'also intact',
                  'q': 'bar'},
            's': '1969-12-31T17:00:42-07:00',
            's1': '1969-12-31T17:00:42-07:00',
            's2': '2013-07-01T20:00:00+00:00',
            'u': {},
            'v': [],
            'x': None,
        }

        actual_output = converters.from_swh(
            some_input,
            hashess={'d', 'o', 'x'},
            bytess={'c', 'e', 'g', 'l'},
            dates={'s', 's1', 's2'},
            blacklist={'h', 'm', 'n', 'o'},
            removables_if_empty={'t'},
            empty_dict={'u'},
            empty_list={'v'},
            convert={'p', 'q', 'w'},
            convert_fn=converters.convert_revision_metadata)

        self.assertEqual(expected_output, actual_output)

    @istest
    def from_swh_edge_cases_do_no_conversion_if_none_or_not_bytes(self):
        some_input = {
            'a': 'something',
            'b': None,
            'c': 'someone',
            'd': None,
        }

        expected_output = {
            'a': 'something',
            'b': None,
            'c': 'someone',
            'd': None,
        }

        actual_output = converters.from_swh(some_input,
                                            hashess={'a', 'b'},
                                            bytess={'c', 'd'})

        self.assertEqual(expected_output, actual_output)

    @istest
    def from_swh_edge_cases_convert_invalid_utf8_bytes(self):
        some_input = {
            'a': 'something',
            'b': 'someone',
            'c': b'a name \xff',
            'd': b'an email \xff',
        }

        expected_output = {
            'a': 'something',
            'b': 'someone',
            'c': 'a name \\xff',
            'd': 'an email \\xff',
            'decoding_failures': ['c', 'd']
        }

        actual_output = converters.from_swh(some_input,
                                            hashess={'a', 'b'},
                                            bytess={'c', 'd'})
        for v in ['a', 'b', 'c', 'd']:
            self.assertEqual(expected_output[v], actual_output[v])
        # The order of the reported failures is not checked, only their
        # number and content.
        self.assertEqual(len(expected_output['decoding_failures']),
                         len(actual_output['decoding_failures']))
        for v in expected_output['decoding_failures']:
            self.assertIn(v, actual_output['decoding_failures'])

    @istest
    def from_swh_empty(self):
        # when
        self.assertEqual({}, converters.from_swh({}))

    @istest
    def from_swh_none(self):
        # when
        self.assertIsNone(converters.from_swh(None))

    @istest
    def from_provenance(self):
        # given
        input_provenance = {
            'origin': 10,
            'visit': 1,
            'content': hashutil.hex_to_hash(
                '321caf10e9535160d90e874b45aa426de762f19f'),
            'revision': hashutil.hex_to_hash(
                '123caf10e9535160d90e874b45aa426de762f19f'),
            'path': b'octave-3.4.0/doc/interpreter/octave/doc_002dS_005fISREG'
        }

        expected_provenance = {
            'origin': 10,
            'visit': 1,
            'content': '321caf10e9535160d90e874b45aa426de762f19f',
            'revision': '123caf10e9535160d90e874b45aa426de762f19f',
            'path': 'octave-3.4.0/doc/interpreter/octave/doc_002dS_005fISREG'
        }

        # when
        actual_provenance = converters.from_provenance(input_provenance)

        # then
        self.assertEqual(actual_provenance, expected_provenance)

    @istest
    def from_origin(self):
        # given
        origin_input = {
            'id': 9,
            'type': 'ftp',
            'url': 'rsync://ftp.gnu.org/gnu/octave',
            'project': None,
            'lister': None,
        }

        expected_origin = {
            'id': 9,
            'type': 'ftp',
            'url': 'rsync://ftp.gnu.org/gnu/octave',
        }

        # when
        actual_origin = converters.from_origin(origin_input)

        # then
        self.assertEqual(actual_origin, expected_origin)

    @istest
    def from_release(self):
        release_input = {
            'id': hashutil.hex_to_hash(
                'aad23fa492a0c5fed0708a6703be875448c86884'),
            'target': hashutil.hex_to_hash(
                '5e46d564378afc44b31bb89f99d5675195fbdf67'),
            'target_type': 'revision',
            'date': {
                'timestamp': datetime.datetime(
                    2015, 1, 1, 22, 0, 0,
                    tzinfo=datetime.timezone.utc).timestamp(),
                'offset': 0,
                'negative_utc': False,
            },
            'author': {
                'name': b'author name',
                'fullname': b'Author Name author@email',
                'email': b'author@email',
            },
            'name': b'v0.0.1',
            'message': b'some comment on release',
            'synthetic': True,
        }

        expected_release = {
            'id': 'aad23fa492a0c5fed0708a6703be875448c86884',
            'target': '5e46d564378afc44b31bb89f99d5675195fbdf67',
            'target_type': 'revision',
            'date': '2015-01-01T22:00:00+00:00',
            'author': {
                'name': 'author name',
                'fullname': 'Author Name author@email',
                'email': 'author@email',
            },
            'name': 'v0.0.1',
            'message': 'some comment on release',
            'synthetic': True,
        }

        # when
        actual_release = converters.from_release(release_input)

        # then
        self.assertEqual(actual_release, expected_release)

    @istest
    def from_release_no_revision(self):
        release_input = {
            'id': hashutil.hex_to_hash(
                'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e'),
            'target': None,
            'date': {
                'timestamp': datetime.datetime(
                    2016, 3, 2, 10, 0, 0,
                    tzinfo=datetime.timezone.utc).timestamp(),
                'offset': 0,
                'negative_utc': True,
            },
            'name': b'v0.1.1',
            'message': b'comment on release',
            'synthetic': False,
            'author': {
                'name': b'bob',
                'fullname': b'Bob bob@alice.net',
                'email': b'bob@alice.net',
            },
        }

        expected_release = {
            'id': 'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e',
            'target': None,
            # negative_utc with a null offset is rendered as -00:00
            'date': '2016-03-02T10:00:00-00:00',
            'name': 'v0.1.1',
            'message': 'comment on release',
            'synthetic': False,
            'author': {
                'name': 'bob',
                'fullname': 'Bob bob@alice.net',
                'email': 'bob@alice.net',
            },
        }

        # when
        actual_release = converters.from_release(release_input)

        # then
        self.assertEqual(actual_release, expected_release)

    @istest
    def from_revision(self):
        revision_input = {
            'id': hashutil.hex_to_hash(
                '18d8be353ed3480476f032475e7c233eff7371d5'),
            'directory': hashutil.hex_to_hash(
                '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
            'author': {
                'name': b'Software Heritage',
                'fullname': b'robot robot@softwareheritage.org',
                'email': b'robot@softwareheritage.org',
            },
            'committer': {
                'name': b'Software Heritage',
                'fullname': b'robot robot@softwareheritage.org',
                'email': b'robot@softwareheritage.org',
            },
            'message': b'synthetic revision message',
            'date': {
                'timestamp': datetime.datetime(
                    2000, 1, 17, 11, 23, 54,
                    tzinfo=datetime.timezone.utc).timestamp(),
                'offset': 0,
                'negative_utc': False,
            },
            'committer_date': {
                'timestamp': datetime.datetime(
                    2000, 1, 17, 11, 23, 54,
                    tzinfo=datetime.timezone.utc).timestamp(),
                'offset': 0,
                'negative_utc': False,
            },
            'synthetic': True,
            'type': 'tar',
            'parents': [
                hashutil.hex_to_hash(
                    '29d8be353ed3480476f032475e7c244eff7371d5'),
                hashutil.hex_to_hash(
                    '30d8be353ed3480476f032475e7c244eff7371d5')
            ],
            'children': [
                hashutil.hex_to_hash(
                    '123546353ed3480476f032475e7c244eff7371d5'),
            ],
            'metadata': {
                'extra_headers': [['gpgsig', b'some-signature']],
                'original_artifact': [{
                    'archive_type': 'tar',
                    'name': 'webbase-5.7.0.tar.gz',
                    'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
                    'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
                    'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
                    '309d36484e7edf7bb912',

                }]
            },
        }

        expected_revision = {
            'id': '18d8be353ed3480476f032475e7c233eff7371d5',
            'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
            'author': {
                'name': 'Software Heritage',
                'fullname': 'robot robot@softwareheritage.org',
                'email': 'robot@softwareheritage.org',
            },
            'committer': {
                'name': 'Software Heritage',
                'fullname': 'robot robot@softwareheritage.org',
                'email': 'robot@softwareheritage.org',
            },
            'message': 'synthetic revision message',
            'date': "2000-01-17T11:23:54+00:00",
            'committer_date': "2000-01-17T11:23:54+00:00",
            'children': [
                '123546353ed3480476f032475e7c244eff7371d5'
            ],
            'parents': [
                '29d8be353ed3480476f032475e7c244eff7371d5',
                '30d8be353ed3480476f032475e7c244eff7371d5'
            ],
            'type': 'tar',
            'synthetic': True,
            'metadata': {
                'extra_headers': [['gpgsig', 'some-signature']],
                'original_artifact': [{
                    'archive_type': 'tar',
                    'name': 'webbase-5.7.0.tar.gz',
                    'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
                    'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
                    'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
                    '309d36484e7edf7bb912'
                }]
            },
            'merge': True
        }

        # when
        actual_revision = converters.from_revision(revision_input)

        # then
        self.assertEqual(actual_revision, expected_revision)

    @istest
    def from_revision_nomerge(self):
        revision_input = {
            'id': hashutil.hex_to_hash(
                '18d8be353ed3480476f032475e7c233eff7371d5'),
            'parents': [
                hashutil.hex_to_hash(
                    '29d8be353ed3480476f032475e7c244eff7371d5')
            ]
        }

        expected_revision = {
            'id': '18d8be353ed3480476f032475e7c233eff7371d5',
            'parents': [
                '29d8be353ed3480476f032475e7c244eff7371d5'
            ],
            'merge': False
        }

        # when
        actual_revision = converters.from_revision(revision_input)

        # then
        self.assertEqual(actual_revision, expected_revision)

    @istest
    def from_revision_noparents(self):
        revision_input = {
            'id': hashutil.hex_to_hash(
                '18d8be353ed3480476f032475e7c233eff7371d5'),
            'directory': hashutil.hex_to_hash(
                '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
            'author': {
                'name': b'Software Heritage',
                'fullname': b'robot robot@softwareheritage.org',
                'email': b'robot@softwareheritage.org',
            },
            'committer': {
                'name': b'Software Heritage',
                'fullname': b'robot robot@softwareheritage.org',
                'email': b'robot@softwareheritage.org',
            },
            'message': b'synthetic revision message',
            'date': {
                'timestamp': datetime.datetime(
                    2000, 1, 17, 11, 23, 54,
                    tzinfo=datetime.timezone.utc).timestamp(),
                'offset': 0,
                'negative_utc': False,
            },
            'committer_date': {
                'timestamp': datetime.datetime(
                    2000, 1, 17, 11, 23, 54,
                    tzinfo=datetime.timezone.utc).timestamp(),
                'offset': 0,
                'negative_utc': False,
            },
            'synthetic': True,
            'type': 'tar',
            'children': [
                hashutil.hex_to_hash(
                    '123546353ed3480476f032475e7c244eff7371d5'),
            ],
            'metadata': {
                'original_artifact': [{
                    'archive_type': 'tar',
                    'name': 'webbase-5.7.0.tar.gz',
                    'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
                    'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
                    'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
                    '309d36484e7edf7bb912',

                }]
            },
        }

        # no 'parents' in the input, so no 'merge' in the output
        expected_revision = {
            'id': '18d8be353ed3480476f032475e7c233eff7371d5',
            'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
            'author': {
                'name': 'Software Heritage',
                'fullname': 'robot robot@softwareheritage.org',
                'email': 'robot@softwareheritage.org',
            },
            'committer': {
                'name': 'Software Heritage',
                'fullname': 'robot robot@softwareheritage.org',
                'email': 'robot@softwareheritage.org',
            },
            'message': 'synthetic revision message',
            'date': "2000-01-17T11:23:54+00:00",
            'committer_date': "2000-01-17T11:23:54+00:00",
            'children': [
                '123546353ed3480476f032475e7c244eff7371d5'
            ],
            'type': 'tar',
            'synthetic': True,
            'metadata': {
                'original_artifact': [{
                    'archive_type': 'tar',
                    'name': 'webbase-5.7.0.tar.gz',
                    'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
                    'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
                    'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
                    '309d36484e7edf7bb912'
                }]
            }
        }

        # when
        actual_revision = converters.from_revision(revision_input)

        # then
        self.assertEqual(actual_revision, expected_revision)

    @istest
    def from_revision_invalid(self):
        revision_input = {
            'id': hashutil.hex_to_hash(
                '18d8be353ed3480476f032475e7c233eff7371d5'),
            'directory': hashutil.hex_to_hash(
                '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
            'author': {
                'name': b'Software Heritage',
                'fullname': b'robot robot@softwareheritage.org',
                'email': b'robot@softwareheritage.org',
            },
            'committer': {
                'name': b'Software Heritage',
                'fullname': b'robot robot@softwareheritage.org',
                'email': b'robot@softwareheritage.org',
            },
            'message': b'invalid message \xff',
            'date': {
                'timestamp': datetime.datetime(
                    2000, 1, 17, 11, 23, 54,
                    tzinfo=datetime.timezone.utc).timestamp(),
                'offset': 0,
                'negative_utc': False,
            },
            'committer_date': {
                'timestamp': datetime.datetime(
                    2000, 1, 17, 11, 23, 54,
                    tzinfo=datetime.timezone.utc).timestamp(),
                'offset': 0,
                'negative_utc': False,
            },
            'synthetic': True,
            'type': 'tar',
            'parents': [
                hashutil.hex_to_hash(
                    '29d8be353ed3480476f032475e7c244eff7371d5'),
                hashutil.hex_to_hash(
                    '30d8be353ed3480476f032475e7c244eff7371d5')
            ],
            'children': [
                hashutil.hex_to_hash(
                    '123546353ed3480476f032475e7c244eff7371d5'),
            ],
            'metadata': {
                'original_artifact': [{
                    'archive_type': 'tar',
                    'name': 'webbase-5.7.0.tar.gz',
                    'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
                    'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
                    'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
                    '309d36484e7edf7bb912',

                }]
            },
        }

        expected_revision = {
            'id': '18d8be353ed3480476f032475e7c233eff7371d5',
            'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
            'author': {
                'name': 'Software Heritage',
                'fullname': 'robot robot@softwareheritage.org',
                'email': 'robot@softwareheritage.org',
            },
            'committer': {
                'name': 'Software Heritage',
                'fullname': 'robot robot@softwareheritage.org',
                'email': 'robot@softwareheritage.org',
            },
            # the message is not valid utf-8
            'message': None,
            'message_decoding_failed': True,
            'date': "2000-01-17T11:23:54+00:00",
            'committer_date': "2000-01-17T11:23:54+00:00",
            'children': [
                '123546353ed3480476f032475e7c244eff7371d5'
            ],
            'parents': [
                '29d8be353ed3480476f032475e7c244eff7371d5',
                '30d8be353ed3480476f032475e7c244eff7371d5'
            ],
            'type': 'tar',
            'synthetic': True,
            'metadata': {
                'original_artifact': [{
                    'archive_type': 'tar',
                    'name': 'webbase-5.7.0.tar.gz',
                    'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
                    'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
                    'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
                    '309d36484e7edf7bb912'
                }]
            },
            'merge': True
        }

        # when
        actual_revision = converters.from_revision(revision_input)

        # then
        self.assertEqual(actual_revision, expected_revision)

    @istest
    def from_content_none(self):
        self.assertIsNone(converters.from_content(None))

    @istest
    def from_content(self):
        content_input = {
            'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0'
                                         '2ebda5'),
            'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e'
                                           'e76c98930e7e0afa4d2747d3bf96c926'),
            'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
                                             'c5b00a6d03'),
            'ctime': 'something-which-is-filtered-out',
            'data': b'data in bytes',
            'length': 10,
            'status': 'hidden',
        }

        # 'ctime' is filtered out and the 'hidden' status becomes 'absent'
        expected_content = {
            'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
            'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274'
            '7d3bf96c926',
            'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
            'data': b'data in bytes',
            'length': 10,
            'status': 'absent',
        }

        # when
        actual_content = converters.from_content(content_input)

        # then
        self.assertEqual(actual_content, expected_content)

    @istest
    def from_person(self):
        person_input = {
            'id': 10,
            'anything': 'else',
            'name': b'bob',
            'fullname': b'bob bob@alice.net',
            'email': b'bob@foo.alice',
        }

        expected_person = {
            'id': 10,
            'anything': 'else',
            'name': 'bob',
            'fullname': 'bob bob@alice.net',
            'email': 'bob@foo.alice',
        }

        # when
        actual_person = converters.from_person(person_input)

        # then
        self.assertEqual(actual_person, expected_person)

    @istest
    def from_directory_entries(self):
        dir_entries_input = {
            'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0'
                                         '2ebda5'),
            'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e'
                                           'e76c98930e7e0afa4d2747d3bf96c926'),
            'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
                                             'c5b00a6d03'),
            'target': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
                                           'c5b00a6d03'),
            'dir_id': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
                                           'c5b00a6d03'),
            'name': b'bob',
            'type': 10,
            'status': 'hidden',
        }

        expected_dir_entries = {
            'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
            'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2747'
            'd3bf96c926',
            'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
            'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
            'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
            'name': 'bob',
            'type': 10,
            'status': 'absent',
        }

        # when
        actual_dir_entries = converters.from_directory_entry(dir_entries_input)

        # then
        self.assertEqual(actual_dir_entries, expected_dir_entries)

    @istest
    def from_filetype(self):
        content_filetype = {
            'id': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebd'
                                       'a5'),
            'encoding': b'utf-8',
            'mimetype': b'text/plain',
        }

        expected_content_filetype = {
            'id': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
            'encoding': 'utf-8',
            'mimetype': 'text/plain',
        }

        # when
        actual_content_filetype = converters.from_filetype(content_filetype)

        # then
        self.assertEqual(actual_content_filetype, expected_content_filetype)
diff --git a/swh/web/ui/tests/test_service.py b/swh/web/ui/tests/test_service.py
index 88df5085..ed02be7f 100644
--- a/swh/web/ui/tests/test_service.py
+++ b/swh/web/ui/tests/test_service.py
@@ -1,2043 +1,2043 @@
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from nose.tools import istest
from unittest.mock import MagicMock, patch, call
from swh.core.hashutil import hex_to_hash, hash_to_hex
from swh.web.ui import service
from swh.web.ui.exc import BadInputExc, NotFoundExc
from swh.web.ui.tests import test_app
class ServiceTestCase(test_app.SWHApiTestCase):
def setUp(self):
self.SHA1_SAMPLE = '18d8be353ed3480476f032475e7c233eff7371d5'
self.SHA1_SAMPLE_BIN = hex_to_hash(self.SHA1_SAMPLE)
self.SHA256_SAMPLE = ('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926')
self.SHA256_SAMPLE_BIN = hex_to_hash(self.SHA256_SAMPLE)
self.SHA1GIT_SAMPLE = '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
self.SHA1GIT_SAMPLE_BIN = hex_to_hash(self.SHA1GIT_SAMPLE)
self.DIRECTORY_ID = '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'
self.DIRECTORY_ID_BIN = hex_to_hash(self.DIRECTORY_ID)
self.AUTHOR_ID_BIN = {
'name': b'author',
'email': b'author@company.org',
}
self.AUTHOR_ID = {
'name': 'author',
'email': 'author@company.org',
}
self.COMMITTER_ID_BIN = {
'name': b'committer',
'email': b'committer@corp.org',
}
self.COMMITTER_ID = {
'name': 'committer',
'email': 'committer@corp.org',
}
self.SAMPLE_DATE_RAW = {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc,
).timestamp(),
'offset': 0,
'negative_utc': False,
}
self.SAMPLE_DATE = '2000-01-17T11:23:54+00:00'
self.SAMPLE_MESSAGE_BIN = b'elegant fix for bug 31415957'
self.SAMPLE_MESSAGE = 'elegant fix for bug 31415957'
self.SAMPLE_REVISION = {
'id': self.SHA1_SAMPLE,
'directory': self.DIRECTORY_ID,
'author': self.AUTHOR_ID,
'committer': self.COMMITTER_ID,
'message': self.SAMPLE_MESSAGE,
'date': self.SAMPLE_DATE,
'committer_date': self.SAMPLE_DATE,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': {},
'merge': False
}
self.SAMPLE_REVISION_RAW = {
'id': self.SHA1_SAMPLE_BIN,
'directory': self.DIRECTORY_ID_BIN,
'author': self.AUTHOR_ID_BIN,
'committer': self.COMMITTER_ID_BIN,
'message': self.SAMPLE_MESSAGE_BIN,
'date': self.SAMPLE_DATE_RAW,
'committer_date': self.SAMPLE_DATE_RAW,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}
self.SAMPLE_CONTENT = {
'sha1': self.SHA1_SAMPLE,
'sha256': self.SHA256_SAMPLE,
'sha1_git': self.SHA1GIT_SAMPLE,
'length': 190,
'status': 'absent'
}
self.SAMPLE_CONTENT_RAW = {
'sha1': self.SHA1_SAMPLE_BIN,
'sha256': self.SHA256_SAMPLE_BIN,
'sha1_git': self.SHA1GIT_SAMPLE_BIN,
'length': 190,
'status': 'hidden'
}
self.date_origin_visit1 = datetime.datetime(
2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc)
self.origin_visit1 = {
'date': self.date_origin_visit1,
'origin': 1,
'visit': 1
}
@patch('swh.web.ui.service.backend')
@istest
def lookup_multiple_hashes_ball_missing(self, mock_backend):
# given
mock_backend.content_missing_per_sha1 = MagicMock(return_value=[])
# when
actual_lookup = service.lookup_multiple_hashes(
[{'filename': 'a',
'sha1': '456caf10e9535160d90e874b45aa426de762f19f'},
{'filename': 'b',
'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865'}])
# then
self.assertEquals(actual_lookup, [
{'filename': 'a',
'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
'found': True},
{'filename': 'b',
'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865',
'found': True}
])
@patch('swh.web.ui.service.backend')
@istest
def lookup_multiple_hashes_some_missing(self, mock_backend):
# given
mock_backend.content_missing_per_sha1 = MagicMock(return_value=[
hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
])
# when
actual_lookup = service.lookup_multiple_hashes(
[{'filename': 'a',
'sha1': '456caf10e9535160d90e874b45aa426de762f19f'},
{'filename': 'b',
'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865'}])
# then
self.assertEquals(actual_lookup, [
{'filename': 'a',
'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
'found': False},
{'filename': 'b',
'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865',
'found': True}
])
@patch('swh.web.ui.service.backend')
@istest
def lookup_hash_does_not_exist(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value=None)
# when
actual_lookup = service.lookup_hash(
'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': None,
'algo': 'sha1_git'}, actual_lookup)
# check the function has been called with parameters
mock_backend.content_find.assert_called_with(
'sha1_git',
hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_hash_exist(self, mock_backend):
# given
stub_content = {
'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
}
mock_backend.content_find = MagicMock(return_value=stub_content)
# when
actual_lookup = service.lookup_hash(
'sha1:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': stub_content,
'algo': 'sha1'}, actual_lookup)
mock_backend.content_find.assert_called_with(
'sha1',
hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'),
)
@patch('swh.web.ui.service.backend')
@istest
def search_hash_does_not_exist(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value=None)
# when
actual_lookup = service.search_hash(
'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': False}, actual_lookup)
# check the function has been called with parameters
mock_backend.content_find.assert_called_with(
'sha1_git',
hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def search_hash_exist(self, mock_backend):
    """search_hash answers 'found': True when the backend has the content."""
    # given
    stub_content = {
        'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
    }
    mock_backend.content_find = MagicMock(return_value=stub_content)

    # when
    actual_lookup = service.search_hash(
        'sha1:456caf10e9535160d90e874b45aa426de762f19f')

    # then
    # assertEqual rather than the deprecated assertEquals alias
    self.assertEqual({'found': True}, actual_lookup)
    mock_backend.content_find.assert_called_with(
        'sha1',
        hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'),
    )
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_ctags(self, mock_backend):
    """Ctags rows are returned with their binary id rendered as hex."""
    # given
    hex_id = '123caf10e9535160d90e874b45aa426de762f19f'
    raw_ctag = {
        'id': hex_to_hash(hex_id),
        'line': 100,
        'name': 'hello',
        'kind': 'function',
        'tool_name': 'ctags',
        'tool_version': 'some-version',
    }
    # Same row as the raw one, except that the id is the hex string.
    expected_ctags = [dict(raw_ctag, id=hex_id)]
    mock_backend.content_ctags_get = MagicMock(return_value=[raw_ctag])

    # when
    found = service.lookup_content_ctags('sha1:' + hex_id)
    actual_ctags = list(found)

    # then
    self.assertEqual(actual_ctags, expected_ctags)
    mock_backend.content_ctags_get.assert_called_with(hex_to_hash(hex_id))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_ctags_no_hash(self, mock_backend):
    """No ctags are produced when the content itself cannot be found."""
    # given
    hex_id = '123caf10e9535160d90e874b45aa426de762f19f'
    mock_backend.content_ctags_get = MagicMock(return_value=None)
    mock_backend.content_find.return_value = None

    # when
    found = service.lookup_content_ctags('sha1_git:' + hex_id)
    actual_ctags = list(found)

    # then
    self.assertEqual(actual_ctags, [])
    mock_backend.content_find.assert_called_once_with(
        'sha1_git', hex_to_hash(hex_id))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_filetype(self, mock_backend):
    """Filetype info is returned with hex id and decoded byte fields."""
    # given
    hex_id = '123caf10e9535160d90e874b45aa426de762f19f'
    raw_filetype = {
        'id': hex_to_hash(hex_id),
        'mimetype': b'text/x-c++',
        'encoding': b'us-ascii',
    }
    mock_backend.content_filetype_get = MagicMock(
        return_value=raw_filetype)
    expected_filetype = {
        'id': hex_id,
        'mimetype': 'text/x-c++',
        'encoding': 'us-ascii',
    }

    # when
    actual_filetype = service.lookup_content_filetype('sha1:' + hex_id)

    # then
    self.assertEqual(actual_filetype, expected_filetype)
    mock_backend.content_filetype_get.assert_called_with(
        hex_to_hash(hex_id))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_filetype_2(self, mock_backend):
    """Filetype lookup by sha1_git resolves the sha1 via content_find."""
    # given
    mock_backend.content_find = MagicMock(
        return_value={
            'sha1': hex_to_hash(
                '123caf10e9535160d90e874b45aa426de762f19f')
        }
    )
    mock_backend.content_filetype_get = MagicMock(
        return_value={
            'id': hex_to_hash(
                '123caf10e9535160d90e874b45aa426de762f19f'),
            'mimetype': b'text/x-python',
            'encoding': b'us-ascii',
        }
    )
    expected_filetype = {
        'id': '123caf10e9535160d90e874b45aa426de762f19f',
        'mimetype': 'text/x-python',
        'encoding': 'us-ascii',
    }

    # when
    actual_filetype = service.lookup_content_filetype(
        'sha1_git:456caf10e9535160d90e874b45aa426de762f19f')

    # then
    self.assertEqual(actual_filetype, expected_filetype)
    # Assert on the mock: merely calling content_find(...) here, as the
    # previous code did, verifies nothing.
    mock_backend.content_find.assert_called_with(
        'sha1_git', hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
    )
    mock_backend.content_filetype_get.assert_called_with(
        hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_language(self, mock_backend):
    """Language info is returned with its binary id rendered as hex."""
    # given
    hex_id = '123caf10e9535160d90e874b45aa426de762f19f'
    raw_language = {
        'id': hex_to_hash(hex_id),
        'lang': 'python',
    }
    mock_backend.content_language_get = MagicMock(
        return_value=raw_language)
    expected_language = {
        'id': hex_id,
        'lang': 'python',
    }

    # when
    actual_language = service.lookup_content_language('sha1:' + hex_id)

    # then
    self.assertEqual(actual_language, expected_language)
    mock_backend.content_language_get.assert_called_with(
        hex_to_hash(hex_id))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_language_2(self, mock_backend):
    """Language lookup by sha1_git resolves the sha1 via content_find."""
    # given
    mock_backend.content_find = MagicMock(
        return_value={
            'sha1': hex_to_hash(
                '123caf10e9535160d90e874b45aa426de762f19f')
        }
    )
    mock_backend.content_language_get = MagicMock(
        return_value={
            'id': hex_to_hash(
                '123caf10e9535160d90e874b45aa426de762f19f'),
            'lang': 'haskell',
        }
    )
    expected_language = {
        'id': '123caf10e9535160d90e874b45aa426de762f19f',
        'lang': 'haskell',
    }

    # when
    actual_language = service.lookup_content_language(
        'sha1_git:456caf10e9535160d90e874b45aa426de762f19f')

    # then
    self.assertEqual(actual_language, expected_language)
    # Assert on the mock: merely calling content_find(...) here, as the
    # previous code did, verifies nothing.
    mock_backend.content_find.assert_called_with(
        'sha1_git', hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
    )
    mock_backend.content_language_get.assert_called_with(
        hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_expression(self, mock_backend):
    """Ctags search results expose the content hash under 'sha1'."""
    # given
    hex_id = '123caf10e9535160d90e874b45aa426de762f19f'
    symbol = {
        'name': 'foobar',
        'kind': 'variable',
        'lang': 'C',
        'line': 10
    }
    # The backend keys the hash as binary 'id'; the expected output
    # carries it as hexadecimal 'sha1'.
    mock_backend.content_ctags_search = MagicMock(
        return_value=[dict(symbol, id=hex_to_hash(hex_id))])
    expected_ctags = [dict(symbol, sha1=hex_id)]

    # when
    results = service.lookup_expression(
        'foobar', last_sha1='hash', per_page=10)
    actual_ctags = list(results)

    # then
    self.assertEqual(actual_ctags, expected_ctags)
    mock_backend.content_ctags_search.assert_called_with(
        'foobar', 'hash', 10)
@patch('swh.web.ui.service.backend')
@istest
def lookup_expression_no_result(self, mock_backend):
    """An empty backend search result yields an empty result list."""
    # given
    expected_ctags = []
    mock_backend.content_ctags_search = MagicMock(return_value=[])

    # when
    results = service.lookup_expression(
        'barfoo', last_sha1='hash', per_page=10)
    actual_ctags = list(results)

    # then
    self.assertEqual(actual_ctags, expected_ctags)
    mock_backend.content_ctags_search.assert_called_with(
        'barfoo', 'hash', 10)
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_license(self, mock_backend):
    """License info is returned with its binary id rendered as hex."""
    # given
    hex_id = '123caf10e9535160d90e874b45aa426de762f19f'
    # NOTE(review): the stub carries a 'lang' key, like the language
    # tests do -- confirm that this is the intended license payload.
    raw_license = {
        'id': hex_to_hash(hex_id),
        'lang': 'python',
    }
    mock_backend.content_license_get = MagicMock(return_value=raw_license)
    expected_license = {
        'id': hex_id,
        'lang': 'python',
    }

    # when
    actual_license = service.lookup_content_license('sha1:' + hex_id)

    # then
    self.assertEqual(actual_license, expected_license)
    mock_backend.content_license_get.assert_called_with(
        hex_to_hash(hex_id))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_license_2(self, mock_backend):
    """License lookup by sha1_git resolves the sha1 via content_find."""
    # given
    mock_backend.content_find = MagicMock(
        return_value={
            'sha1': hex_to_hash(
                '123caf10e9535160d90e874b45aa426de762f19f')
        }
    )
    mock_backend.content_license_get = MagicMock(
        return_value={
            'id': hex_to_hash(
                '123caf10e9535160d90e874b45aa426de762f19f'),
            'lang': 'haskell',
        }
    )
    expected_license = {
        'id': '123caf10e9535160d90e874b45aa426de762f19f',
        'lang': 'haskell',
    }

    # when
    actual_license = service.lookup_content_license(
        'sha1_git:456caf10e9535160d90e874b45aa426de762f19f')

    # then
    self.assertEqual(actual_license, expected_license)
    # Assert on the mock: merely calling content_find(...) here, as the
    # previous code did, verifies nothing.
    mock_backend.content_find.assert_called_with(
        'sha1_git', hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
    )
    mock_backend.content_license_get.assert_called_with(
        hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_provenance(self, mock_backend):
    """Provenance rows come back with hex hashes and a decoded path."""
    # given
    content_hex = '123caf10e9535160d90e874b45aa426de762f19f'
    revision_hex = '456caf10e9535160d90e874b45aa426de762f19f'
    raw_provenances = [{
        'content': hex_to_hash(content_hex),
        'revision': hex_to_hash(revision_hex),
        'origin': 100,
        'visit': 1,
        'path': b'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html'
    }]
    # The backend stub hands out a generator, not a list.
    mock_backend.content_find_provenance = MagicMock(
        return_value=(p for p in raw_provenances))
    expected_provenances = [{
        'content': content_hex,
        'revision': revision_hex,
        'origin': 100,
        'visit': 1,
        'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html'
    }]

    # when
    actual_provenances = service.lookup_content_provenance(
        'sha1_git:' + content_hex)

    # then
    self.assertEqual(list(actual_provenances), expected_provenances)
    mock_backend.content_find_provenance.assert_called_with(
        'sha1_git', hex_to_hash(content_hex))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_provenance_not_found(self, mock_backend):
    """None is returned when the backend knows no provenance."""
    # given
    content_hex = '456caf10e9535160d90e874b45aa426de762f19f'
    mock_backend.content_find_provenance = MagicMock(return_value=None)

    # when
    actual_provenances = service.lookup_content_provenance(
        'sha1_git:' + content_hex)

    # then
    self.assertIsNone(actual_provenances)
    mock_backend.content_find_provenance.assert_called_with(
        'sha1_git', hex_to_hash(content_hex))
@patch('swh.web.ui.service.backend')
@istest
def stat_counters(self, mock_backend):
    """The backend counters are handed back as they are."""
    # given
    input_stats = {
        "content": 1770830,
        "directory": 211683,
        "directory_entry_dir": 209167,
        "directory_entry_file": 1807094,
        "directory_entry_rev": 0,
        "entity": 0,
        "entity_history": 0,
        "occurrence": 0,
        "occurrence_history": 19600,
        "origin": 1096,
        "person": 0,
        "release": 8584,
        "revision": 7792,
        "revision_history": 0,
        "skipped_content": 0
    }
    expected_stats = input_stats
    mock_backend.stat_counters = MagicMock(return_value=input_stats)

    # when
    actual_stats = service.stat_counters()

    # then
    self.assertEqual(actual_stats, expected_stats)
    mock_backend.stat_counters.assert_called_with()
# Checks that service.lookup_origin_visits converts every visit returned by
# the backend and forwards its default paging arguments.
# NOTE(review): the lines starting with '-' / '+' below are patch markers kept
# from the diff this text comes from; the change swaps the expected 'date'
# from .timestamp() to .isoformat().
@patch('swh.web.ui.service.backend')
@istest
def lookup_origin_visits(self, mock_backend):
# given
date_origin_visit2 = datetime.datetime(
2013, 7, 1, 20, 0, 0,
tzinfo=datetime.timezone.utc)
date_origin_visit3 = datetime.datetime(
2015, 1, 1, 21, 0, 0,
tzinfo=datetime.timezone.utc)
# Three backend visits: the shared fixture, a fully populated one and a
# minimal one.
stub_result = [self.origin_visit1, {
'date': date_origin_visit2,
'origin': 1,
'visit': 2,
'target': hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'),
'branch': b'master',
'target_type': 'release',
'metadata': None,
}, {
'date': date_origin_visit3,
'origin': 1,
'visit': 3
}]
mock_backend.lookup_origin_visits.return_value = stub_result
# when
# Expected output: hex target, decoded branch, and {} where the stub's
# metadata was None.
expected_origin_visits = [{
- 'date': self.origin_visit1['date'].timestamp(),
+ 'date': self.origin_visit1['date'].isoformat(),
'origin': self.origin_visit1['origin'],
'visit': self.origin_visit1['visit']
}, {
- 'date': date_origin_visit2.timestamp(),
+ 'date': date_origin_visit2.isoformat(),
'origin': 1,
'visit': 2,
'target': '65a55bbdf3629f916219feb3dcc7393ded1bc8db',
'branch': 'master',
'target_type': 'release',
'metadata': {},
}, {
- 'date': date_origin_visit3.timestamp(),
+ 'date': date_origin_visit3.isoformat(),
'origin': 1,
'visit': 3
}]
actual_origin_visits = service.lookup_origin_visits(6)
# then
self.assertEqual(list(actual_origin_visits), expected_origin_visits)
# The backend must be called with last_visit=None and limit=10.
mock_backend.lookup_origin_visits.assert_called_once_with(
6, last_visit=None, limit=10)
# Checks that service.lookup_origin_visit converts the single visit returned
# by the backend.
# NOTE(review): the lines starting with '-' / '+' below are patch markers kept
# from the diff this text comes from; the change swaps the expected 'date'
# from .timestamp() to .isoformat().
@patch('swh.web.ui.service.backend')
@istest
def lookup_origin_visit(self, mock_backend):
# given
stub_result = self.origin_visit1
mock_backend.lookup_origin_visit.return_value = stub_result
expected_origin_visit = {
- 'date': self.origin_visit1['date'].timestamp(),
+ 'date': self.origin_visit1['date'].isoformat(),
'origin': self.origin_visit1['origin'],
'visit': self.origin_visit1['visit']
}
# when
actual_origin_visit = service.lookup_origin_visit(1, 1)
# then
self.assertEqual(actual_origin_visit, expected_origin_visit)
# Both arguments are passed to the backend positionally.
mock_backend.lookup_origin_visit.assert_called_once_with(1, 1)
@patch('swh.web.ui.service.backend')
@istest
def lookup_origin(self, mock_backend):
    """An origin made of plain strings comes back unchanged."""
    # given
    stub_origin = {
        'id': 'origin-id',
        'lister': 'uuid-lister',
        'project': 'uuid-project',
        'url': 'ftp://some/url/to/origin',
        'type': 'ftp'}
    # Copy taken before the call so the expectation is independent of the
    # dict handed to the service.
    expected_origin = dict(stub_origin)
    mock_backend.origin_get = MagicMock(return_value=stub_origin)

    # when
    actual_origin = service.lookup_origin({'id': 'origin-id'})

    # then
    self.assertEqual(actual_origin, expected_origin)
    mock_backend.origin_get.assert_called_with({'id': 'origin-id'})
@patch('swh.web.ui.service.backend')
@istest
def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self,
                                                            mock_backend):
    """A malformed checksum is rejected before the backend is queried."""
    # given
    mock_backend.release_get = MagicMock()

    # when
    with self.assertRaises(BadInputExc) as cm:
        service.lookup_release('not-a-sha1')

    # then
    # Checked after the `with` block so that it actually runs.
    # NOTE(review): compared case-insensitively because the exact
    # capitalization of the service message is not visible from this test
    # -- confirm against the service.
    self.assertIn('invalid checksum', cm.exception.args[0].lower())
    # The previous `release_get.called = False` assigned to the mock
    # instead of asserting anything.
    self.assertFalse(mock_backend.release_get.called)
@patch('swh.web.ui.service.backend')
@istest
def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self, mock_backend):
    """A valid checksum of the wrong kind is rejected without a lookup."""
    # given
    mock_backend.release_get = MagicMock()

    # when
    with self.assertRaises(BadInputExc) as cm:
        service.lookup_release(
            '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5'
            '1aea892abe')

    # then
    # Checked after the `with` block so that it actually runs.
    # NOTE(review): the former needle 'sha1_git supported' is not a
    # substring of the 'Only sha1_git is supported.' message the other
    # tests of this suite expect; assumes lookup_release raises that same
    # message -- confirm against the service.
    self.assertIn('Only sha1_git is supported', cm.exception.args[0])
    # The previous `release_get.called = False` assigned to the mock
    # instead of asserting anything.
    self.assertFalse(mock_backend.release_get.called)
@patch('swh.web.ui.service.backend')
@istest
def lookup_directory_with_path_not_found(self, mock_backend):
    """The stubbed directory lookup reports a missing path as None."""
    # given
    directory_id = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
    mock_backend.lookup_directory_with_path = MagicMock(return_value=None)

    # when
    # NOTE(review): this calls the mocked backend attribute directly, so
    # no code from `service` runs here -- confirm whether
    # service.lookup_directory_with_path was the intended target.
    actual_directory = mock_backend.lookup_directory_with_path(
        directory_id, 'some/path/here')

    # then
    self.assertIsNone(actual_directory)
@patch('swh.web.ui.service.backend')
@istest
def lookup_directory_with_path_found(self, mock_backend):
    """The stubbed directory lookup returns the configured entry."""
    # given
    directory_id = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
    entry = {'id': 'dir-id',
             'type': 'dir',
             'name': 'some/path/foo'}
    mock_backend.lookup_directory_with_path = MagicMock(return_value=entry)

    # when
    # NOTE(review): this calls the mocked backend attribute directly, so
    # no code from `service` runs here -- confirm whether
    # service.lookup_directory_with_path was the intended target.
    actual_directory = mock_backend.lookup_directory_with_path(
        directory_id, 'some/path/here')

    # then
    self.assertEqual(entry, actual_directory)
@patch('swh.web.ui.service.backend')
@istest
def lookup_release(self, mock_backend):
    """A release is returned with hex id, decoded text and a date string."""
    # given
    release_hex = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
    release_ts = datetime.datetime(
        2015, 1, 1, 22, 0, 0,
        tzinfo=datetime.timezone.utc).timestamp()
    raw_release = {
        'id': hex_to_hash(release_hex),
        'target': None,
        'date': {
            'timestamp': release_ts,
            'offset': 0,
            'negative_utc': True,
        },
        'name': b'v0.0.1',
        'message': b'synthetic release',
        'synthetic': True,
    }
    mock_backend.release_get = MagicMock(return_value=raw_release)
    # With negative_utc set, the expected date carries a '-00:00' offset.
    expected_release = {
        'id': release_hex,
        'target': None,
        'date': '2015-01-01T22:00:00-00:00',
        'name': 'v0.0.1',
        'message': 'synthetic release',
        'synthetic': True,
    }

    # when
    actual_release = service.lookup_release(release_hex)

    # then
    self.assertEqual(actual_release, expected_release)
    mock_backend.release_get.assert_called_with(hex_to_hash(release_hex))
@istest
def lookup_revision_with_context_ko_not_a_sha1_1(self):
    """A 64-hex-digit revision id is rejected as bad input."""
    # given
    root_id = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
    too_long_id = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4' \
                  'daf51aea892abe'

    # when
    with self.assertRaises(BadInputExc) as cm:
        service.lookup_revision_with_context(root_id, too_long_id)

    # then
    error_message = cm.exception.args[0]
    self.assertIn('Only sha1_git is supported', error_message)
@istest
def lookup_revision_with_context_ko_not_a_sha1_2(self):
    """A 64-hex-digit revision id is rejected as bad input."""
    # given
    # NOTE(review): these inputs equal those of the `_1` variant of this
    # test -- confirm whether another case (e.g. a bad root) was meant.
    too_long_id = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f6' \
                  '2d4daf51aea892abe'
    root_id = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'

    # when
    with self.assertRaises(BadInputExc) as cm:
        service.lookup_revision_with_context(root_id, too_long_id)

    # then
    error_message = cm.exception.args[0]
    self.assertIn('Only sha1_git is supported', error_message)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_ko_sha1_git_does_not_exist(
        self,
        mock_backend):
    """An unknown revision makes the lookup raise NotFoundExc."""
    # given
    root_id = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
    missing_id = '777777bdf3629f916219feb3dcc7393ded1bc8db'
    missing_id_bin = hex_to_hash(missing_id)
    mock_backend.revision_get.return_value = None

    # when
    with self.assertRaises(NotFoundExc) as cm:
        service.lookup_revision_with_context(root_id, missing_id)

    # then
    error_message = cm.exception.args[0]
    self.assertIn('Revision 777777bdf3629f916219feb3dcc7393ded1bc8db'
                  ' not found', error_message)
    mock_backend.revision_get.assert_called_once_with(missing_id_bin)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_ko_root_sha1_git_does_not_exist(
        self,
        mock_backend):
    """An unknown root revision makes the lookup raise NotFoundExc."""
    # given
    root_id = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
    revision_id = '777777bdf3629f916219feb3dcc7393ded1bc8db'
    expected_calls = [call(hex_to_hash(revision_id)),
                      call(hex_to_hash(root_id))]
    # First backend answer is a hit, the second one is a miss.
    mock_backend.revision_get.side_effect = ['foo', None]

    # when
    with self.assertRaises(NotFoundExc) as cm:
        service.lookup_revision_with_context(root_id, revision_id)

    # then
    error_message = cm.exception.args[0]
    self.assertIn('Revision 65a55bbdf3629f916219feb3dcc7393ded1bc8db'
                  ' not found', error_message)
    mock_backend.revision_get.assert_has_calls(expected_calls)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_revision_with_context(self, mock_query, mock_backend):
    """A revision is enriched with its children found in the root's log."""
    # given
    sha1_git_root = '666'
    sha1_git = '883'

    sha1_git_root_bin = b'666'
    sha1_git_bin = b'883'

    sha1_git_root_dict = {
        'id': sha1_git_root_bin,
        'parents': [b'999'],
    }
    sha1_git_dict = {
        'id': sha1_git_bin,
        'parents': [],
        'directory': b'278',
    }

    # Log of the root: b'999' and b'777' both list b'883' as a parent.
    stub_revisions = [
        sha1_git_root_dict,
        {
            'id': b'999',
            'parents': [b'777', b'883', b'888'],
        },
        {
            'id': b'777',
            'parents': [b'883'],
        },
        sha1_git_dict,
        {
            'id': b'888',
            'parents': [b'889'],
        },
        {
            'id': b'889',
            'parents': [],
        },
    ]

    # inputs ok
    mock_query.parse_hash_with_algorithms_or_throws.side_effect = [
        ('sha1', sha1_git_bin),
        ('sha1', sha1_git_root_bin)
    ]

    # lookup revision first 883, then 666 (both exists)
    mock_backend.revision_get.side_effect = [
        sha1_git_dict,
        sha1_git_root_dict
    ]

    mock_backend.revision_log = MagicMock(
        return_value=stub_revisions)

    # when
    actual_revision = service.lookup_revision_with_context(
        sha1_git_root,
        sha1_git)

    # then
    # assertEqual rather than the deprecated assertEquals alias
    self.assertEqual(actual_revision, {
        'id': hash_to_hex(sha1_git_bin),
        'parents': [],
        'children': [hash_to_hex(b'999'), hash_to_hex(b'777')],
        'directory': hash_to_hex(b'278'),
        'merge': False
    })

    mock_query.parse_hash_with_algorithms_or_throws.assert_has_calls(
        [call(sha1_git, ['sha1'], 'Only sha1_git is supported.'),
         call(sha1_git_root, ['sha1'], 'Only sha1_git is supported.')])

    mock_backend.revision_log.assert_called_with(
        sha1_git_root_bin, 100)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_revision_with_context_sha1_git_root_already_retrieved_as_dict(
        self, mock_query, mock_backend):
    """A root passed as a dict is used as is, without another lookup."""
    # given
    sha1_git = '883'

    sha1_git_root_bin = b'666'
    sha1_git_bin = b'883'

    sha1_git_root_dict = {
        'id': sha1_git_root_bin,
        'parents': [b'999'],
    }

    sha1_git_dict = {
        'id': sha1_git_bin,
        'parents': [],
        'directory': b'278',
    }

    # Log of the root: b'999' and b'777' both list b'883' as a parent.
    stub_revisions = [
        sha1_git_root_dict,
        {
            'id': b'999',
            'parents': [b'777', b'883', b'888'],
        },
        {
            'id': b'777',
            'parents': [b'883'],
        },
        sha1_git_dict,
        {
            'id': b'888',
            'parents': [b'889'],
        },
        {
            'id': b'889',
            'parents': [],
        },
    ]

    # inputs ok
    mock_query.parse_hash_with_algorithms_or_throws.return_value = (
        'sha1', sha1_git_bin)

    # lookup only on sha1
    mock_backend.revision_get.return_value = sha1_git_dict

    mock_backend.revision_log.return_value = stub_revisions

    # when
    actual_revision = service.lookup_revision_with_context(
        {'id': sha1_git_root_bin},
        sha1_git)

    # then
    # assertEqual rather than the deprecated assertEquals alias
    self.assertEqual(actual_revision, {
        'id': hash_to_hex(sha1_git_bin),
        'parents': [],
        'children': [hash_to_hex(b'999'), hash_to_hex(b'777')],
        'directory': hash_to_hex(b'278'),
        'merge': False
    })

    mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with(  # noqa
        sha1_git, ['sha1'], 'Only sha1_git is supported.')

    mock_backend.revision_get.assert_called_once_with(sha1_git_bin)

    mock_backend.revision_log.assert_called_with(
        sha1_git_root_bin, 100)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_ko_revision_not_found(self,
                                                         mock_query,
                                                         mock_backend):
    """An unknown revision makes the directory lookup raise NotFoundExc."""
    # given
    parse_hash = mock_query.parse_hash_with_algorithms_or_throws
    parse_hash.return_value = ('sha1', b'123')
    mock_backend.revision_get.return_value = None

    # when
    with self.assertRaises(NotFoundExc) as cm:
        service.lookup_directory_with_revision('123')

    # then
    self.assertIn('Revision 123 not found', cm.exception.args[0])
    # The assertion used to be split over two lines, leaving an uncalled
    # attribute access followed by a bare tuple: nothing was checked.
    parse_hash.assert_called_once_with(
        '123', ['sha1'], 'Only sha1_git is supported.')
    mock_backend.revision_get.assert_called_once_with(b'123')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_ko_revision_with_path_to_nowhere(
        self,
        mock_query,
        mock_backend):
    """A path missing from the revision's directory raises NotFoundExc."""
    # given
    parse_hash = mock_query.parse_hash_with_algorithms_or_throws
    parse_hash.return_value = ('sha1', b'123')

    dir_id = b'dir-id-as-sha1'
    mock_backend.revision_get.return_value = {
        'directory': dir_id,
    }

    mock_backend.directory_entry_get_by_path.return_value = None

    # when
    with self.assertRaises(NotFoundExc) as cm:
        service.lookup_directory_with_revision(
            '123',
            'path/to/something/unknown')

    # then
    # NOTE(review): checked after the `with` block so that it runs; the
    # exact wording has to match the service message -- confirm.
    self.assertIn("Directory/File 'path/to/something/unknown' " +
                  "pointed to by revision 123 not found",
                  cm.exception.args[0])
    # The assertion used to be split over two lines, leaving an uncalled
    # attribute access followed by a bare tuple: nothing was checked.
    parse_hash.assert_called_once_with(
        '123', ['sha1'], 'Only sha1_git is supported.')
    mock_backend.revision_get.assert_called_once_with(b'123')
    mock_backend.directory_entry_get_by_path.assert_called_once_with(
        b'dir-id-as-sha1', 'path/to/something/unknown')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_ko_type_not_implemented(
        self,
        mock_query,
        mock_backend):
    """An entry of type 'rev' raises NotImplementedError."""
    # given
    parse_hash = mock_query.parse_hash_with_algorithms_or_throws
    parse_hash.return_value = ('sha1', b'123')

    dir_id = b'dir-id-as-sha1'
    mock_backend.revision_get.return_value = {
        'directory': dir_id,
    }

    mock_backend.directory_entry_get_by_path.return_value = {
        'type': 'rev',
        'name': b'some/path/to/rev',
        'target': b'456'
    }

    stub_content = {
        'id': b'12',
        'type': 'file'
    }

    mock_backend.content_get.return_value = stub_content

    # when
    with self.assertRaises(NotImplementedError) as cm:
        service.lookup_directory_with_revision(
            '123',
            'some/path/to/rev')

    # then
    self.assertIn("Entity of type 'rev' not implemented.",
                  cm.exception.args[0])
    # The assertion used to be split over two lines, leaving an uncalled
    # attribute access followed by a bare tuple: nothing was checked.
    parse_hash.assert_called_once_with(
        '123', ['sha1'], 'Only sha1_git is supported.')
    mock_backend.revision_get.assert_called_once_with(b'123')
    mock_backend.directory_entry_get_by_path.assert_called_once_with(
        b'dir-id-as-sha1', 'some/path/to/rev')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_without_path(self,
                                                         mock_query,
                                                         mock_backend):
    """Without a path, the revision's root directory is listed."""
    # given
    parse_hash = mock_query.parse_hash_with_algorithms_or_throws
    parse_hash.return_value = ('sha1', b'123')

    dir_id = b'dir-id-as-sha1'
    mock_backend.revision_get.return_value = {
        'directory': dir_id,
    }

    stub_dir_entries = [{
        'id': b'123',
        'type': 'dir'
    }, {
        'id': b'456',
        'type': 'file'
    }]

    mock_backend.directory_ls.return_value = stub_dir_entries

    # when
    actual_directory_entries = service.lookup_directory_with_revision(
        '123')

    # then
    self.assertEqual(actual_directory_entries['type'], 'dir')
    self.assertEqual(list(actual_directory_entries['content']),
                     stub_dir_entries)

    # The assertion used to be split over two lines, leaving an uncalled
    # attribute access followed by a bare tuple: nothing was checked.
    parse_hash.assert_called_once_with(
        '123', ['sha1'], 'Only sha1_git is supported.')
    mock_backend.revision_get.assert_called_once_with(b'123')
    mock_backend.directory_ls.assert_called_once_with(dir_id)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_with_path_to_dir(self,
                                                             mock_query,
                                                             mock_backend):
    """A path resolving to a directory yields that directory's listing."""
    # given
    parse_hash = mock_query.parse_hash_with_algorithms_or_throws
    parse_hash.return_value = ('sha1', b'123')

    dir_id = b'dir-id-as-sha1'
    mock_backend.revision_get.return_value = {
        'directory': dir_id,
    }

    stub_dir_entries = [{
        'id': b'12',
        'type': 'dir'
    }, {
        'id': b'34',
        'type': 'file'
    }]

    mock_backend.directory_entry_get_by_path.return_value = {
        'type': 'dir',
        'name': b'some/path',
        'target': b'456'
    }
    mock_backend.directory_ls.return_value = stub_dir_entries

    # when
    actual_directory_entries = service.lookup_directory_with_revision(
        '123',
        'some/path')

    # then
    self.assertEqual(actual_directory_entries['type'], 'dir')
    self.assertEqual(actual_directory_entries['revision'], '123')
    self.assertEqual(actual_directory_entries['path'], 'some/path')
    self.assertEqual(list(actual_directory_entries['content']),
                     stub_dir_entries)

    # The assertion used to be split over two lines, leaving an uncalled
    # attribute access followed by a bare tuple: nothing was checked.
    parse_hash.assert_called_once_with(
        '123', ['sha1'], 'Only sha1_git is supported.')
    mock_backend.revision_get.assert_called_once_with(b'123')
    mock_backend.directory_entry_get_by_path.assert_called_once_with(
        dir_id,
        'some/path')
    # The listing is made on the entry's target, not on the root directory.
    mock_backend.directory_ls.assert_called_once_with(b'456')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_with_path_to_file_without_data(
        self,
        mock_query,
        mock_backend):
    """A path resolving to a file yields the content found by sha1_git."""
    # given
    parse_hash = mock_query.parse_hash_with_algorithms_or_throws
    parse_hash.return_value = ('sha1', b'123')

    dir_id = b'dir-id-as-sha1'
    mock_backend.revision_get.return_value = {
        'directory': dir_id,
    }

    mock_backend.directory_entry_get_by_path.return_value = {
        'type': 'file',
        'name': b'some/path/to/file',
        'target': b'789'
    }

    stub_content = {
        'status': 'visible',
    }

    mock_backend.content_find.return_value = stub_content

    # when
    actual_content = service.lookup_directory_with_revision(
        '123',
        'some/path/to/file')

    # then
    self.assertEqual(actual_content, {'type': 'file',
                                      'revision': '123',
                                      'path': 'some/path/to/file',
                                      'content': stub_content})

    # The assertion used to be split over two lines, leaving an uncalled
    # attribute access followed by a bare tuple: nothing was checked.
    parse_hash.assert_called_once_with(
        '123', ['sha1'], 'Only sha1_git is supported.')
    mock_backend.revision_get.assert_called_once_with(b'123')
    mock_backend.directory_entry_get_by_path.assert_called_once_with(
        b'dir-id-as-sha1', 'some/path/to/file')
    mock_backend.content_find.assert_called_once_with('sha1_git', b'789')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_with_path_to_file_with_data(
        self,
        mock_query,
        mock_backend):
    """With with_data=True the raw data is attached to the file content."""
    # given
    parse_hash = mock_query.parse_hash_with_algorithms_or_throws
    parse_hash.return_value = ('sha1', b'123')

    dir_id = b'dir-id-as-sha1'
    mock_backend.revision_get.return_value = {
        'directory': dir_id,
    }

    mock_backend.directory_entry_get_by_path.return_value = {
        'type': 'file',
        'name': b'some/path/to/file',
        'target': b'789'
    }

    stub_content = {
        'status': 'visible',
        'sha1': b'content-sha1'
    }

    mock_backend.content_find.return_value = stub_content
    mock_backend.content_get.return_value = {
        'sha1': b'content-sha1',
        'data': b'some raw data'
    }

    # The sha1 is expected as hex while the data stays raw bytes.
    expected_content = {
        'status': 'visible',
        'sha1': hash_to_hex(b'content-sha1'),
        'data': b'some raw data'
    }

    # when
    actual_content = service.lookup_directory_with_revision(
        '123',
        'some/path/to/file',
        with_data=True)

    # then
    self.assertEqual(actual_content, {'type': 'file',
                                      'revision': '123',
                                      'path': 'some/path/to/file',
                                      'content': expected_content})

    # The assertion used to be split over two lines, leaving an uncalled
    # attribute access followed by a bare tuple: nothing was checked.
    parse_hash.assert_called_once_with(
        '123', ['sha1'], 'Only sha1_git is supported.')
    mock_backend.revision_get.assert_called_once_with(b'123')
    mock_backend.directory_entry_get_by_path.assert_called_once_with(
        b'dir-id-as-sha1', 'some/path/to/file')
    mock_backend.content_find.assert_called_once_with('sha1_git', b'789')
    mock_backend.content_get.assert_called_once_with(b'content-sha1')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision(self, mock_backend):
    """The raw sample revision is converted to the expected sample."""
    # given
    raw_revision = self.SAMPLE_REVISION_RAW
    mock_backend.revision_get = MagicMock(return_value=raw_revision)

    # when
    actual_revision = service.lookup_revision(self.SHA1_SAMPLE)

    # then
    self.assertEqual(actual_revision, self.SAMPLE_REVISION)
    mock_backend.revision_get.assert_called_with(self.SHA1_SAMPLE_BIN)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_invalid_msg(self, mock_backend):
    """An undecodable message is dropped and flagged as such."""
    # given
    # Work on shallow copies: the fixtures are read from `self` by other
    # tests too, so they must not be modified in place here.
    stub_rev = dict(self.SAMPLE_REVISION_RAW)
    stub_rev['message'] = b'elegant fix for bug \xff'

    expected_revision = dict(self.SAMPLE_REVISION)
    expected_revision['message'] = None
    expected_revision['message_decoding_failed'] = True
    mock_backend.revision_get = MagicMock(return_value=stub_rev)

    # when
    actual_revision = service.lookup_revision(
        self.SHA1_SAMPLE)

    # then
    self.assertEqual(actual_revision, expected_revision)

    mock_backend.revision_get.assert_called_with(
        self.SHA1_SAMPLE_BIN)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_msg_ok(self, mock_backend):
    """The revision message is returned under the 'message' key."""
    # given
    mock_backend.revision_get.return_value = self.SAMPLE_REVISION_RAW

    # when
    rv = service.lookup_revision_message(
        self.SHA1_SAMPLE)

    # then
    # assertEqual rather than the deprecated assertEquals alias
    self.assertEqual(rv, {'message': self.SAMPLE_MESSAGE_BIN})
    mock_backend.revision_get.assert_called_with(
        self.SHA1_SAMPLE_BIN)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_msg_absent(self, mock_backend):
    """A revision without a message makes the lookup raise NotFoundExc."""
    # given
    # Work on a shallow copy: the fixture is read from `self` by other
    # tests too, so its 'message' must not be deleted in place.
    stub_revision = dict(self.SAMPLE_REVISION_RAW)
    del stub_revision['message']
    mock_backend.revision_get.return_value = stub_revision

    # when
    with self.assertRaises(NotFoundExc) as cm:
        service.lookup_revision_message(
            self.SHA1_SAMPLE)

    # then
    mock_backend.revision_get.assert_called_with(
        self.SHA1_SAMPLE_BIN)
    self.assertEqual(cm.exception.args[0], 'No message for revision '
                     'with sha1_git '
                     '18d8be353ed3480476f032475e7c233eff7371d5.')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_msg_norev(self, mock_backend):
    """An unknown revision makes the message lookup raise NotFoundExc."""
    # given
    mock_backend.revision_get.return_value = None

    # when
    with self.assertRaises(NotFoundExc) as cm:
        service.lookup_revision_message(self.SHA1_SAMPLE)

    # then
    mock_backend.revision_get.assert_called_with(self.SHA1_SAMPLE_BIN)
    error_message = cm.exception.args[0]
    self.assertEqual(error_message, 'Revision with sha1_git '
                     '18d8be353ed3480476f032475e7c233eff7371d5 '
                     'not found.')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_multiple(self, mock_backend):
    """Several revisions are fetched in one call and each is converted."""
    # given
    sha1 = self.SHA1_SAMPLE
    sha1_other = 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'
    commit_ts = datetime.datetime(
        2000, 1, 12, 5, 23, 54,
        tzinfo=datetime.timezone.utc).timestamp()

    other_revision_raw = {
        'id': hex_to_hash(sha1_other),
        'directory': 'abcdbe353ed3480476f032475e7c233eff7371d5',
        'author': {
            'name': b'name',
            'email': b'name@surname.org',
        },
        'committer': {
            'name': b'name',
            'email': b'name@surname.org',
        },
        'message': b'ugly fix for bug 42',
        'date': {
            'timestamp': commit_ts,
            'offset': 0,
            'negative_utc': False
        },
        'date_offset': 0,
        'committer_date': {
            'timestamp': commit_ts,
            'offset': 0,
            'negative_utc': False
        },
        'committer_date_offset': 0,
        'synthetic': False,
        'type': 'git',
        'parents': [],
        'metadata': [],
    }
    # Expected conversion: decoded text, date strings, {} for the empty
    # metadata and an added 'merge' flag.
    other_revision = {
        'id': sha1_other,
        'directory': 'abcdbe353ed3480476f032475e7c233eff7371d5',
        'author': {
            'name': 'name',
            'email': 'name@surname.org',
        },
        'committer': {
            'name': 'name',
            'email': 'name@surname.org',
        },
        'message': 'ugly fix for bug 42',
        'date': '2000-01-12T05:23:54+00:00',
        'date_offset': 0,
        'committer_date': '2000-01-12T05:23:54+00:00',
        'committer_date_offset': 0,
        'synthetic': False,
        'type': 'git',
        'parents': [],
        'metadata': {},
        'merge': False
    }
    stub_revisions = [self.SAMPLE_REVISION_RAW, other_revision_raw]
    mock_backend.revision_get_multiple.return_value = stub_revisions

    # when
    actual_revisions = service.lookup_revision_multiple(
        [sha1, sha1_other])

    # then
    self.assertEqual(list(actual_revisions),
                     [self.SAMPLE_REVISION, other_revision])

    # The first positional argument is wrapped in list() before the
    # comparison, so any iterable of ids is accepted here.
    requested_ids = mock_backend.revision_get_multiple.call_args[0][0]
    self.assertEqual(
        list(requested_ids),
        [hex_to_hash(sha1),
         hex_to_hash(sha1_other)])
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_multiple_none_found(self, mock_backend):
    """An empty backend answer yields no revision at all."""
    # given
    sha1_hex = self.SHA1_SAMPLE
    sha1_other = 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'
    mock_backend.revision_get_multiple.return_value = []

    # when
    actual_revisions = service.lookup_revision_multiple(
        [sha1_hex, sha1_other])

    # then
    self.assertEqual(list(actual_revisions), [])

    requested_ids = mock_backend.revision_get_multiple.call_args[0][0]
    self.assertEqual(
        list(requested_ids),
        [hex_to_hash(self.SHA1_SAMPLE),
         hex_to_hash(sha1_other)])
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_log(self, mock_backend):
    """Each revision of the log is converted; the limit is forwarded."""
    # given
    revision_hex = 'abcdbe353ed3480476f032475e7c233eff7371d5'
    mock_backend.revision_log = MagicMock(
        return_value=[self.SAMPLE_REVISION_RAW])

    # when
    actual_revision = service.lookup_revision_log(revision_hex, limit=25)

    # then
    self.assertEqual(list(actual_revision), [self.SAMPLE_REVISION])
    mock_backend.revision_log.assert_called_with(
        hex_to_hash(revision_hex), 25)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_log_by(self, mock_backend):
    # The log lookup by origin/branch/timestamp forwards its criteria and
    # the limit to the backend.

    # given: a backend log made of the single raw sample revision
    mock_backend.revision_log_by = MagicMock(
        return_value=[self.SAMPLE_REVISION_RAW])

    # when
    log_entries = list(service.lookup_revision_log_by(
        1, 'refs/heads/master', None, limit=100))

    # then
    self.assertEqual(log_entries, [self.SAMPLE_REVISION])
    mock_backend.revision_log_by.assert_called_with(
        1, 'refs/heads/master', None, 100)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_log_by_nolog(self, mock_backend):
    # When the backend has no log for the criteria, the lookup result
    # compares equal to None.

    # given
    criteria = (1, 'refs/heads/master', None)
    mock_backend.revision_log_by = MagicMock(return_value=None)

    # when
    res = service.lookup_revision_log_by(*criteria, limit=100)

    # then
    self.assertEqual(res, None)
    mock_backend.revision_log_by.assert_called_with(
        1, 'refs/heads/master', None, 100)
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_raw_not_found(self, mock_backend):
    # Raw content lookup returns None when the backend cannot find the
    # content.

    # given
    content_find = MagicMock(return_value=None)
    mock_backend.content_find = content_find

    # when
    raw_content = service.lookup_content_raw(
        'sha1:18d8be353ed3480476f032475e7c233eff7371d5')

    # then
    self.assertIsNone(raw_content)
    content_find.assert_called_with(
        'sha1', hex_to_hash(self.SHA1_SAMPLE))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_raw(self, mock_backend):
    # Raw content lookup first finds the content from the queried hash,
    # then fetches the data using the sha1 of what was found.

    # given
    content_find = MagicMock(return_value={'sha1': self.SHA1_SAMPLE})
    content_get = MagicMock(return_value={'data': b'binary data'})
    mock_backend.content_find = content_find
    mock_backend.content_get = content_get

    # when
    raw_content = service.lookup_content_raw(
        'sha256:%s' % self.SHA256_SAMPLE)

    # then
    self.assertEqual(raw_content, {'data': b'binary data'})
    content_find.assert_called_once_with(
        'sha256', self.SHA256_SAMPLE_BIN)
    content_get.assert_called_once_with(self.SHA1_SAMPLE)
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_not_found(self, mock_backend):
    # Content lookup returns None when the backend finds nothing.

    # given
    content_find = MagicMock(return_value=None)
    mock_backend.content_find = content_find

    # when
    found = service.lookup_content('sha1:%s' % self.SHA1_SAMPLE)

    # then
    self.assertIsNone(found)
    content_find.assert_called_with('sha1', self.SHA1_SAMPLE_BIN)
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_with_sha1(self, mock_backend):
    # Content lookup through a sha1 query returns the sample content.

    # given: the backend finds the raw sample content
    content_find = MagicMock(return_value=self.SAMPLE_CONTENT_RAW)
    mock_backend.content_find = content_find

    # when
    found = service.lookup_content('sha1:%s' % self.SHA1_SAMPLE)

    # then
    self.assertEqual(found, self.SAMPLE_CONTENT)
    content_find.assert_called_with(
        'sha1', hex_to_hash(self.SHA1_SAMPLE))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_with_sha256(self, mock_backend):
    # Content lookup through a sha256 query returns the sample content,
    # here flagged with a 'visible' status.

    # given: both fixtures are updated in place with the status
    self.SAMPLE_CONTENT_RAW['status'] = 'visible'
    self.SAMPLE_CONTENT['status'] = 'visible'
    content_find = MagicMock(return_value=self.SAMPLE_CONTENT_RAW)
    mock_backend.content_find = content_find

    # when
    found = service.lookup_content('sha256:%s' % self.SHA256_SAMPLE)

    # then
    self.assertEqual(found, self.SAMPLE_CONTENT)
    content_find.assert_called_with('sha256', self.SHA256_SAMPLE_BIN)
@patch('swh.web.ui.service.backend')
@istest
def lookup_person(self, mock_backend):
    # Person lookup returns the backend's person with its bytes name and
    # email as strings.

    # given
    raw_person = {
        'id': 'person_id',
        'name': b'some_name',
        'email': b'some-email',
    }
    expected_person = {
        'id': 'person_id',
        'name': 'some_name',
        'email': 'some-email',
    }
    mock_backend.person_get = MagicMock(return_value=raw_person)

    # when
    person = service.lookup_person('person_id')

    # then
    self.assertEqual(person, expected_person)
    mock_backend.person_get.assert_called_with('person_id')
@patch('swh.web.ui.service.backend')
@istest
def lookup_directory_bad_checksum(self, mock_backend):
    # Looking up a directory with an invalid identifier raises BadInputExc
    # and never lists the directory through the backend.

    # given
    mock_backend.directory_ls = MagicMock()

    # when
    with self.assertRaises(BadInputExc):
        service.lookup_directory('directory_id')

    # then: actually verify the backend was left untouched (assigning
    # False to `called` would only overwrite the flag, checking nothing)
    self.assertFalse(mock_backend.directory_ls.called)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_not_found(self, mock_query, mock_backend):
    # Looking up a directory unknown to the backend returns None without
    # listing its entries.

    # given: the identifier parses fine but the backend has no directory
    mock_query.parse_hash_with_algorithms_or_throws.return_value = (
        'sha1',
        'directory-id-bin')
    mock_backend.directory_get.return_value = None

    # when
    actual_dir = service.lookup_directory('directory_id')

    # then
    self.assertIsNone(actual_dir)
    mock_query.parse_hash_with_algorithms_or_throws.assert_called_with(
        'directory_id', ['sha1'], 'Only sha1_git is supported.')
    mock_backend.directory_get.assert_called_with('directory-id-bin')
    # Real check that no listing took place (assigning False to `called`
    # would only overwrite the flag, checking nothing).
    self.assertFalse(mock_backend.directory_ls.called)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory(self, mock_query, mock_backend):
    # Looking up an existing directory lists its entries, with hashes as
    # hexadecimal strings and the name as a string.

    # given: the identifier parses to a binary id known to the backend
    mock_query.parse_hash_with_algorithms_or_throws.return_value = (
        'sha1',
        'directory-sha1-bin')
    # something that exists is all that matters here
    mock_backend.directory_get.return_value = {'id': b'directory-sha1-bin'}

    raw_entry = {
        'sha1': self.SHA1_SAMPLE_BIN,
        'sha256': self.SHA256_SAMPLE_BIN,
        'sha1_git': self.SHA1GIT_SAMPLE_BIN,
        'target': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
                              'c5b00a6d03'),
        'dir_id': self.DIRECTORY_ID_BIN,
        'name': b'bob',
        'type': 10,
    }
    expected_entry = {
        'sha1': self.SHA1_SAMPLE,
        'sha256': self.SHA256_SAMPLE,
        'sha1_git': self.SHA1GIT_SAMPLE,
        'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
        'dir_id': self.DIRECTORY_ID,
        'name': 'bob',
        'type': 10,
    }
    mock_backend.directory_ls.return_value = [raw_entry]

    # when
    listed_entries = list(service.lookup_directory('directory-sha1'))

    # then
    self.assertEqual(listed_entries, [expected_entry])
    mock_query.parse_hash_with_algorithms_or_throws.assert_called_with(
        'directory-sha1', ['sha1'], 'Only sha1_git is supported.')
    mock_backend.directory_ls.assert_called_with(
        'directory-sha1-bin')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by_nothing_found(self, mock_backend):
    # Looking up a revision by origin returns None when the backend finds
    # nothing.

    # given
    mock_backend.revision_get_by.return_value = None

    # when
    actual_revisions = service.lookup_revision_by(1)

    # then
    self.assertIsNone(actual_revisions)
    # The backend must have been queried. Calling the mock here (as was
    # done before) verifies nothing: it just records one more call.
    # NOTE(review): the default branch name and timestamp used by
    # service.lookup_revision_by are not visible from this test; once
    # confirmed, tighten this into an assert_called_with(...) check.
    self.assertTrue(mock_backend.revision_get_by.called)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by(self, mock_backend):
    # Looking up a revision by origin, branch and timestamp returns the
    # sample revision when the backend returns its raw counterpart.

    # given
    stub_rev = self.SAMPLE_REVISION_RAW
    expected_rev = self.SAMPLE_REVISION
    mock_backend.revision_get_by.return_value = stub_rev

    # when
    actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts')

    # then
    self.assertEqual(actual_revision, expected_rev)
    # Verify the backend call instead of invoking the mock, using the
    # origin id the service was actually given (10).
    # NOTE(review): assumes the service forwards its three arguments
    # positionally to backend.revision_get_by - confirm.
    mock_backend.revision_get_by.assert_called_with(
        10, 'master2', 'some-ts')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by_nomerge(self, mock_backend):
    # A revision with a single parent is returned with that parent as a
    # hexadecimal string.

    # given: the fixtures are updated in place with one parent
    stub_rev = self.SAMPLE_REVISION_RAW
    stub_rev['parents'] = [
        hex_to_hash('adc83b19e793491b1c6ea0fd8b46cd9f32e592fc')]
    expected_rev = self.SAMPLE_REVISION
    expected_rev['parents'] = ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc']
    mock_backend.revision_get_by.return_value = stub_rev

    # when
    actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts')

    # then
    self.assertEqual(actual_revision, expected_rev)
    # Verify the backend call instead of invoking the mock, using the
    # origin id the service was actually given (10).
    # NOTE(review): assumes the service forwards its three arguments
    # positionally to backend.revision_get_by - confirm.
    mock_backend.revision_get_by.assert_called_with(
        10, 'master2', 'some-ts')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by_merge(self, mock_backend):
    # A revision with two parents is returned with both parents as
    # hexadecimal strings and flagged as a merge.

    # given: the fixtures are updated in place with two parents
    stub_rev = self.SAMPLE_REVISION_RAW
    stub_rev['parents'] = [
        hex_to_hash('adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'),
        hex_to_hash('ffff3b19e793491b1c6db0fd8b46cd9f32e592fc')
    ]
    expected_rev = self.SAMPLE_REVISION
    expected_rev['parents'] = [
        'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
        'ffff3b19e793491b1c6db0fd8b46cd9f32e592fc'
    ]
    expected_rev['merge'] = True
    mock_backend.revision_get_by.return_value = stub_rev

    # when
    actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts')

    # then
    self.assertEqual(actual_revision, expected_rev)
    # Verify the backend call instead of invoking the mock, using the
    # origin id the service was actually given (10).
    # NOTE(review): assumes the service forwards its three arguments
    # positionally to backend.revision_get_by - confirm.
    mock_backend.revision_get_by.assert_called_with(
        10, 'master2', 'some-ts')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_by_ko(self, mock_backend):
    # Looking up a revision with context fails with NotFoundExc when the
    # root revision cannot be found from origin, branch and timestamp.

    # given
    origin_id = 1
    branch_name = 'master3'
    ts = None
    mock_backend.revision_get_by.return_value = None

    # when
    with self.assertRaises(NotFoundExc) as cm:
        service.lookup_revision_with_context_by(origin_id, branch_name, ts,
                                                'sha1')

    # then
    expected_message = ('Revision with (origin_id: %s, branch_name: %s'
                        ', ts: %s) not found.' % (origin_id,
                                                  branch_name,
                                                  ts))
    self.assertIn(expected_message, cm.exception.args[0])
    mock_backend.revision_get_by.assert_called_once_with(
        origin_id, branch_name, ts)
@patch('swh.web.ui.service.lookup_revision_with_context')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_by(self, mock_backend,
                                    mock_lookup_revision_with_context):
    # The lookup resolves the root revision through the backend, then
    # delegates to lookup_revision_with_context, returning both results.

    # given
    origin_id, branch_name, ts, sha1_git = 1, 'master3', None, 'sha1'
    expected_root_rev = {'id': 'root-rev-id'}
    expected_rev = {'id': 'rev-found'}
    mock_backend.revision_get_by.return_value = {'id': 'root-rev-id'}
    mock_lookup_revision_with_context.return_value = expected_rev

    # when
    result = service.lookup_revision_with_context_by(
        origin_id, branch_name, ts, sha1_git)
    actual_root_rev, actual_rev = result

    # then
    self.assertEqual(actual_root_rev, expected_root_rev)
    self.assertEqual(actual_rev, expected_rev)
    mock_backend.revision_get_by.assert_called_once_with(
        origin_id, branch_name, ts)
    mock_lookup_revision_with_context.assert_called_once_with(
        expected_root_rev, sha1_git, 100)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_entity_by_uuid(self, mock_query, mock_backend):
    # Entity lookup parses the uuid then returns the backend's entities.

    # given
    uuid_test = 'correct-uuid'
    expected_entities = [{'uuid': uuid_test}]
    mock_query.parse_uuid4.return_value = uuid_test
    mock_backend.entity_get.return_value = expected_entities

    # when
    entities = list(service.lookup_entity_by_uuid(uuid_test))

    # then
    self.assertEqual(entities, expected_entities)
    mock_query.parse_uuid4.assert_called_once_with(uuid_test)
    mock_backend.entity_get.assert_called_once_with(uuid_test)
@istest
def lookup_revision_through_ko_not_implemented(self):
    # Criteria with no recognized key make the lookup raise
    # NotImplementedError.
    unknown_criteria = {
        'something-unknown': 10,
    }

    # then
    with self.assertRaises(NotImplementedError):
        service.lookup_revision_through(unknown_criteria)
@patch('swh.web.ui.service.lookup_revision_with_context_by')
@istest
def lookup_revision_through_with_context_by(self, mock_lookup):
    # Criteria made of origin_id, branch_name, ts and sha1_git are routed
    # to lookup_revision_with_context_by, along with the limit.

    # given
    expected_rev = {'id': 'rev'}
    mock_lookup.return_value = expected_rev
    criteria = {
        'origin_id': 1,
        'branch_name': 'master',
        'ts': None,
        'sha1_git': 'sha1-git'
    }

    # when
    revision = service.lookup_revision_through(criteria, limit=1000)

    # then
    self.assertEqual(revision, expected_rev)
    mock_lookup.assert_called_once_with(
        1, 'master', None, 'sha1-git', 1000)
@patch('swh.web.ui.service.lookup_revision_by')
@istest
def lookup_revision_through_with_revision_by(self, mock_lookup):
    # Criteria made of origin_id, branch_name and ts are routed to
    # lookup_revision_by, which is called without the limit.

    # given
    expected_rev = {'id': 'rev'}
    mock_lookup.return_value = expected_rev
    criteria = {
        'origin_id': 2,
        'branch_name': 'master2',
        'ts': 'some-ts',
    }

    # when
    revision = service.lookup_revision_through(criteria, limit=10)

    # then
    self.assertEqual(revision, expected_rev)
    mock_lookup.assert_called_once_with(
        2, 'master2', 'some-ts')
@patch('swh.web.ui.service.lookup_revision_with_context')
@istest
def lookup_revision_through_with_context(self, mock_lookup):
    # Criteria made of sha1_git_root and sha1_git are routed to
    # lookup_revision_with_context; no limit given, 100 is expected.

    # given
    expected_rev = {'id': 'rev'}
    mock_lookup.return_value = expected_rev
    criteria = {
        'sha1_git_root': 'some-sha1-root',
        'sha1_git': 'some-sha1',
    }

    # when
    revision = service.lookup_revision_through(criteria)

    # then
    self.assertEqual(revision, expected_rev)
    mock_lookup.assert_called_once_with(
        'some-sha1-root', 'some-sha1', 100)
@patch('swh.web.ui.service.lookup_revision')
@istest
def lookup_revision_through_with_revision(self, mock_lookup):
    # Criteria holding only sha1_git are routed to lookup_revision.

    # given
    expected_rev = {'id': 'rev'}
    mock_lookup.return_value = expected_rev
    criteria = {
        'sha1_git': 'some-sha1',
    }

    # when
    revision = service.lookup_revision_through(criteria)

    # then
    self.assertEqual(revision, expected_rev)
    mock_lookup.assert_called_once_with(
        'some-sha1')
@patch('swh.web.ui.service.lookup_revision_through')
@istest
def lookup_directory_through_revision_ko_not_found(
        self, mock_lookup_rev):
    # Looking up a directory through a revision that cannot be found
    # raises NotFoundExc.

    # given
    criteria = {'id': 'rev'}
    mock_lookup_rev.return_value = None

    # when
    with self.assertRaises(NotFoundExc):
        service.lookup_directory_through_revision(
            criteria, 'some/path', 100)

    # then
    mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 100)
@patch('swh.web.ui.service.lookup_revision_through')
@patch('swh.web.ui.service.lookup_directory_with_revision')
@istest
def lookup_directory_through_revision_ok_with_data(
        self, mock_lookup_dir, mock_lookup_rev):
    # The lookup returns the id of the revision found and the directory
    # lookup result; with_data is left unset here and False is expected
    # to reach lookup_directory_with_revision.

    # given
    criteria = {'id': 'rev'}
    mock_lookup_rev.return_value = {'id': 'rev-id'}
    mock_lookup_dir.return_value = {'type': 'dir',
                                    'content': []}

    # when
    result = service.lookup_directory_through_revision(
        criteria, 'some/path', 100)
    rev_id, dir_result = result

    # then
    self.assertEqual(rev_id, 'rev-id')
    self.assertEqual(dir_result, {'type': 'dir',
                                  'content': []})
    mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 100)
    mock_lookup_dir.assert_called_once_with('rev-id', 'some/path', False)
@patch('swh.web.ui.service.lookup_revision_through')
@patch('swh.web.ui.service.lookup_directory_with_revision')
@istest
def lookup_directory_through_revision_ok_with_content(
        self, mock_lookup_dir, mock_lookup_rev):
    # With with_data=True, the flag is expected to reach
    # lookup_directory_with_revision and its result is returned as is.

    # given
    criteria = {'id': 'rev'}
    expected_result = {'type': 'file',
                       'revision': 'rev-id',
                       'content': {'data': b'blah',
                                   'sha1': 'sha1'}}
    mock_lookup_rev.return_value = {'id': 'rev-id'}
    mock_lookup_dir.return_value = expected_result

    # when
    result = service.lookup_directory_through_revision(
        criteria, 'some/path', 10, with_data=True)
    rev_id, dir_result = result

    # then
    self.assertEqual(rev_id, 'rev-id')
    self.assertEqual(dir_result, expected_result)
    mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 10)
    mock_lookup_dir.assert_called_once_with('rev-id', 'some/path', True)
diff --git a/swh/web/ui/views/browse.py b/swh/web/ui/views/browse.py
index 792ad932..ae13aa6f 100644
--- a/swh/web/ui/views/browse.py
+++ b/swh/web/ui/views/browse.py
@@ -1,1009 +1,1019 @@
-# Copyright (C) 2015-2016 The Software Heritage developers
+# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import dateutil.parser
+
from encodings.aliases import aliases
from flask import render_template, request, url_for, redirect
from swh.core.hashutil import ALGORITHMS
from swh.core.utils import grouper
from .. import service, utils
from ..exc import BadInputExc, NotFoundExc
from ..main import app
from . import api
hash_filter_keys = ALGORITHMS
def api_lookup(api_fn, query):
    """Call api_fn with query, turning lookup failures into None.

    Args:
        api_fn: callable taking the query as its sole argument
        query: value passed as is to api_fn

    Returns:
        The result of api_fn(query), or None if the call raised
        NotFoundExc or BadInputExc.

    Example:
        filetype = api_lookup(api.api_content_filetype, 'sha1:blah')
        if filetype:
            content['mimetype'] = filetype['mimetype']

    """
    try:
        outcome = api_fn(query)
    except (NotFoundExc, BadInputExc):
        outcome = None
    return outcome
@app.route('/origin/search/')
def search_origin():
    """Redirect an origin search made with GET parameters to the
    fragmented URI scheme.

    Query parameters read:
        - origin_id: takes precedence when provided
        - origin_type and origin_url: used when both are provided

    Returns:
        A redirection to the browse_origin endpoint when the parameters
        allow it, None otherwise.

    """
    # NOTE(review): the fall-through cases return None, which is not a
    # response - confirm what the user should get in that case.
    if request.method != 'GET':
        return None

    params = request.args
    origin_id = params.get('origin_id')
    if origin_id:
        return redirect(url_for('browse_origin', origin_id=origin_id))

    origin_type = params.get('origin_type')
    origin_url = params.get('origin_url')
    if origin_type and origin_url:
        return redirect(url_for('browse_origin',
                                origin_type=origin_type,
                                origin_url=origin_url))
    return None
@app.route('/directory/search/')
def search_directory():
    """Redirect a directory search made with GET parameters to the
    fragmented URI scheme.

    Query parameters read:
        - sha1_git, with either dir_path (revision's directory) or path
        - origin_id, with optional branch_name, ts and path

    Returns:
        A redirection to the matching browse endpoint when sha1_git or
        origin_id is provided, None otherwise.

    """
    def url_for_filtered(endpoint, **kwargs):
        """Make url_for ignore keyword args whose value is falsy (empty
        string, None...)

        """
        kept = {}
        for key, value in kwargs.items():
            if value:
                kept[key] = value
        return url_for(endpoint, **kept)

    if request.method != 'GET':
        return None

    params = request.args
    sha1_git = params.get('sha1_git')
    if sha1_git:
        if 'dir_path' in params:
            # dir_path exists only in requests for a revision's directory
            target = url_for_filtered(
                'browse_revision_directory',
                sha1_git=sha1_git,
                dir_path=params.get('dir_path')
            )
        else:
            target = url_for_filtered(
                'browse_directory',
                sha1_git=sha1_git,
                path=params.get('path')
            )
        return redirect(target)

    values = {}
    for name in ('origin_id', 'branch_name', 'ts', 'path'):
        value = params.get(name)
        if value:
            values[name] = value
    if 'origin_id' in values:
        return redirect(url_for('browse_revision_directory_through_origin',
                                **values))
    return None
@app.route('/revision/search/')
def search_revision():
    """Redirect a revision search made with GET parameters to the
    fragmented URI scheme.

    Query parameters read:
        - sha1_git: takes precedence when provided
        - origin_id, with optional branch_name and ts

    Returns:
        A redirection to the matching browse endpoint when sha1_git or
        origin_id is provided, None otherwise.

    """
    if request.method != 'GET':
        return None

    params = request.args
    sha1_git = params.get('sha1_git')
    if sha1_git:
        return redirect(url_for('browse_revision', sha1_git=sha1_git))

    values = {}
    for name in ('origin_id', 'branch_name', 'ts'):
        value = params.get(name)
        if value:
            values[name] = value
    if 'origin_id' in values:
        return redirect(url_for('browse_revision_with_origin', **values))
    return None
@app.route('/content/symbol/', methods=['GET'])
def search_symbol():
    """Search for symbols in contents.

    Query parameters read:
        - q: the symbol query; no search happens without it
        - per_page: optional, forwarded to the next page link

    Returns:
        The 'symbols.html' template rendered with the keys result,
        per_page, message, linknext, linkprev and q.

    """
    # Read form or get information
    params = request.args
    q = params.get('q')
    per_page = params.get('per_page')

    env = {
        'result': None,
        'per_page': per_page if per_page else None,
        'message': '',
        'linknext': None,
        'linkprev': None,
        'q': q,
    }

    if not q:
        return render_template('symbols.html', **env)

    try:
        found = api.api_content_symbol(q)
        if found:
            headers = found.get('headers')
            symbols = utils.prepare_data_for_view(found['results'])
            env['result'] = symbols

            if headers and 'link-next' in headers:
                # the next page starts after the last sha1 of this one
                link_params = {
                    'q': q,
                    'last_sha1': symbols[-1]['sha1'],
                }
                if per_page:
                    link_params['per_page'] = per_page
                env['linknext'] = url_for('search_symbol', **link_params)
    except BadInputExc as e:
        env['message'] = str(e)

    return render_template('symbols.html', **env)
@app.route('/content/search/', methods=['GET', 'POST'])
def search_content():
    """Search for hashes in swh-storage.

    One form to submit either:
    - hash query to look up in swh storage
    - file hashes calculated client-side to be queried in swh storage
    - both

    Returns:
        The 'search.html' template rendered with the following keys:
        - search_stats: {'nbfiles': X, 'pct': Y} the number of total
        queried files and percentage of files not in storage respectively
        - search_res: the search result, None when the search failed
        - message: the error message if any, empty otherwise

    TODO:
        Batch-process with all checksums, not just sha1

    """
    if request.method == 'POST':
        # Post form submission with many hash requests
        q = None
    else:
        # Get with a single hash request
        q = request.args.get('q')

    search_res = None
    search_stats = {'nbfiles': 0, 'pct': 0}
    message = ''
    try:
        search = api.api_check_content_known(q)
        search_res = search['search_res']
        search_stats = search['search_stats']
    except BadInputExc as e:
        message = str(e)

    return render_template('search.html',
                           search_res=search_res,
                           search_stats=search_stats,
                           message=message)
@app.route('/browse/')
def browse():
    """Render the user-facing browse view ('browse.html' template).

    """
    page = render_template('browse.html')
    return page
@app.route('/browse/content/<string:q>/')
def browse_content(q):
    """Given a hash and a checksum, display the content's meta-data.

    The metadata is completed with the content's mimetype, encoding,
    language, licenses, raw data and ctags links, each set to None when
    its lookup yields nothing.

    Args:
        q is of the form algo_hash:hash with algo_hash in
        (sha1, sha1_git, sha256)

    Query parameters read:
        - encoding: encoding used to prepare the data for the view
        (default to utf8)

    Returns:
        The 'content.html' template rendered with the keys q, message and
        content. NotFoundExc and BadInputExc raised during the lookup end
        up as the message.

    """
    env = {'q': q,
           'message': None,
           'content': None}

    encoding = request.args.get('encoding', 'utf8')
    if encoding not in aliases:
        unsupported = ('Encoding %s not supported.'
                       'Supported Encodings: %s')
        env['message'] = unsupported % (encoding, list(aliases.keys()))
        return render_template('content.html', **env)

    try:
        content = api.api_content_metadata(q)

        filetype = api_lookup(api.api_content_filetype, q)
        content['mimetype'] = filetype.get('mimetype') if filetype else None
        content['encoding'] = filetype.get('encoding') if filetype else None

        language = api_lookup(api.api_content_language, q)
        content['language'] = language.get('lang') if language else None

        licenses = api_lookup(api.api_content_license, q)
        content['licenses'] = (', '.join(licenses.get('licenses', []))
                               if licenses else None)

        content_raw = service.lookup_content_raw(q)
        content['data'] = content_raw['data'] if content_raw else None

        ctags = api_lookup(api.api_content_ctags, q)
        if ctags:
            # one link per ctag towards its line, grouped by 20
            url = url_for('browse_content', q=q)
            links = ('<a href="%s#l-%s">%s</a>' % (url,
                                                   ctag['line'],
                                                   ctag['line'])
                     for ctag in ctags)
            content['ctags'] = grouper(links, 20)
        else:
            content['ctags'] = None

        env['content'] = utils.prepare_data_for_view(content,
                                                     encoding=encoding)
    except (NotFoundExc, BadInputExc) as e:
        env['message'] = str(e)

    return render_template('content.html', **env)
@app.route('/browse/content/<string:q>/raw/')
def browse_content_raw(q):
    """Given a hash and a checksum, redirect to the content's raw data.

    Args:
        q is of the form algo_hash:hash with algo_hash in
        (sha1, sha1_git, sha256)

    Returns:
        A redirection to the 'api_content_raw' endpoint for q.

    """
    raw_url = url_for('api_content_raw', q=q)
    return redirect(raw_url)
def _origin_seen(q, data):
"""Given an origin, compute a message string with the right information.
Args:
origin: a dictionary with keys:
- origin: a dictionary with type and url keys
- occurrence: a dictionary with a validity range
Returns:
Message as a string
"""
origin_type = data['origin_type']
origin_url = data['origin_url']
revision = data['revision']
branch = data['branch']
path = data['path']
return """The content with hash %s has been seen on origin with type '%s'
at url '%s'. The revision was identified at '%s' on branch '%s'.
The file's path referenced was '%s'.""" % (q,
origin_type,
origin_url,
revision,
branch,
path)
# @app.route('/browse/content/<string:q>/origin/')
def browse_content_with_origin(q):
    """Show information on an origin where the content has been seen.

    Args:
        - q: query string of the form <algo_hash:hash> with
        `algo_hash` in sha1, sha1_git, sha256.

        This means that several different URLs (at least one per
        HASH_ALGO) will point to the same content sha: the sha with
        'hash' format

    Returns:
        The 'content-with-origin.html' template rendered with the keys q
        and message, the message being either the origin's description or
        the error raised during the lookup.

    """
    try:
        message = _origin_seen(q, api.api_content_checksum_to_origin(q))
    except (NotFoundExc, BadInputExc) as e:
        message = str(e)

    return render_template('content-with-origin.html',
                           q=q,
                           message=message)
@app.route('/browse/directory/<string:sha1_git>/')
@app.route('/browse/directory/<string:sha1_git>/<path:path>/')
def browse_directory(sha1_git, path=None):
    """Show directory information.

    Args:
        - sha1_git: the directory's sha1 git identifier. If path
        is set, the base directory for the relative path to the entry
        - path: the path to the requested entry, relative to
        the directory pointed by sha1_git

    Returns:
        The 'directory.html' template listing the files at sha1_git (or
        at sha1_git/path), or the 'content.html' template when path
        points to a file.

    """
    env = {'sha1_git': sha1_git,
           'files': []}

    try:
        if not path:
            env['message'] = "Listing for directory %s:" % sha1_git
            entries = api.api_directory(sha1_git)
            env['files'] = utils.prepare_data_for_view(entries)
            return render_template('directory.html', **env)

        env['message'] = ('Listing for directory with path %s from %s:'
                          % (path, sha1_git))
        entry = service.lookup_directory_with_path(sha1_git, path)

        if entry['type'] != 'file':
            entries = api.api_directory(entry['target'])
            env['files'] = utils.prepare_data_for_view(entries)
            return render_template('directory.html', **env)

        # the path leads to a file, display it as a content
        content_query = 'sha256:%s' % entry['sha256']
        content = api.api_content_metadata(content_query)
        content_raw = service.lookup_content_raw(content_query)
        if content_raw:  # FIXME: currently assuming utf8 encoding
            content['data'] = content_raw['data']
        env['content'] = utils.prepare_data_for_view(
            content, encoding='utf-8')
        return render_template('content.html', **env)
    except (NotFoundExc, BadInputExc) as e:
        env['message'] = str(e)

    return render_template('directory.html', **env)
@app.route('/browse/origin/<string:origin_type>/url/<path:origin_url>/')
@app.route('/browse/origin/<int:origin_id>/')
def browse_origin(origin_id=None, origin_type=None, origin_url=None):
"""Browse origin matching given criteria - either origin_id or
origin_type and origin_path.
Args:
- origin_id: origin's swh identifier
- origin_type: origin's type
- origin_url: origin's URL
"""
# URLs for the calendar JS plugin
env = {'browse_url': None,
'visit_url': None,
'origin': None}
try:
origin = api.api_origin(origin_id, origin_type, origin_url)
env['origin'] = origin
env['browse_url'] = url_for('browse_revision_with_origin',
origin_id=origin['id'])
- env['visit_url'] = url_for('api_origin_visits',
+ env['visit_url'] = url_for('browse_origin_visits',
origin_id=origin['id'])
except (NotFoundExc, BadInputExc) as e:
env['message'] = str(e)
return render_template('origin.html', **env)
+@app.route('/browse/origin/<int:origin_id>/visits/')
+def browse_origin_visits(origin_id):
+ origin = api.api_origin_visits(origin_id)['results']
+ for v in origin:
+ v['date'] = dateutil.parser.parse(v['date']).timestamp()
+ return origin
+
+
@app.route('/browse/person/<int:person_id>/')
def browse_person(person_id):
    """Browse the person identified by person_id.

    Args:
        person_id: the person's identifier

    Returns:
        The 'person.html' template rendered with the keys person_id,
        person and message (set when the lookup failed).

    """
    person, message = None, None
    try:
        person = api.api_person(person_id)
    except (NotFoundExc, BadInputExc) as e:
        message = str(e)

    return render_template('person.html',
                           person_id=person_id,
                           person=person,
                           message=message)
@app.route('/browse/release/<string:sha1_git>/')
def browse_release(sha1_git):
    """Browse the release identified by sha1_git.

    Args:
        sha1_git: the release's identifier

    Returns:
        The 'release.html' template rendered with the keys sha1_git,
        release and message (set when the lookup failed).

    """
    release, message = None, None
    try:
        release = utils.prepare_data_for_view(api.api_release(sha1_git))
    except (NotFoundExc, BadInputExc) as e:
        message = str(e)

    return render_template('release.html',
                           sha1_git=sha1_git,
                           message=message,
                           release=release)
@app.route('/browse/revision/<string:sha1_git>/')
@app.route('/browse/revision/<string:sha1_git>/prev/<path:prev_sha1s>/')
def browse_revision(sha1_git, prev_sha1s=None):
    """Browse the revision with git SHA1 sha1_git, while optionally keeping
    the context from which we came as a list of previous (i.e. later)
    revisions' sha1s.

    Args:
        sha1_git: the requested revision's sha1_git.
        prev_sha1s: an optional string of /-separated sha1s representing our
        context, ordered by descending revision date.

    Returns:
        The 'revision.html' template rendered with the keys sha1_git,
        revision and message (set when the lookup failed).

    """
    revision, message = None, None
    try:
        revision = utils.prepare_data_for_view(
            api.api_revision(sha1_git, prev_sha1s))
    except (NotFoundExc, BadInputExc) as e:
        message = str(e)

    return render_template('revision.html',
                           sha1_git=sha1_git,
                           message=message,
                           revision=revision)
@app.route('/browse/revision/<string:sha1_git>/raw/')
def browse_revision_raw_message(sha1_git):
    """Given a sha1_git, redirect to the corresponding revision's raw
    message ('api_revision_raw_message' endpoint).

    """
    raw_message_url = url_for('api_revision_raw_message', sha1_git=sha1_git)
    return redirect(raw_message_url)
@app.route('/browse/revision/<string:sha1_git>/log/')
@app.route('/browse/revision/<string:sha1_git>/prev/<path:prev_sha1s>/log/')
def browse_revision_log(sha1_git, prev_sha1s=None):
    """Browse revision with sha1_git's log. If the navigation path through the
    commit tree is specified, we intersect the earliest revision's log with the
    revisions the user browsed through - ie the path taken to the specified
    revision.

    Args:
        sha1_git: the current revision's SHA1_git checksum
        prev_sha1s: optionally, the path through which we want log information

    Returns:
        The 'revision-log.html' template rendered with the keys sha1_git,
        sha1_url, message, revisions and, when the lookup succeeded,
        next_revs_url.

    """
    env = {'sha1_git': sha1_git,
           'sha1_url': '/browse/revision/%s/' % sha1_git,
           'message': None,
           'revisions': []}

    try:
        log = api.api_revision_log(sha1_git, prev_sha1s)
        revisions = log['revisions']
        next_revs_url = log['next_revs_url']
        # revisions are prepared lazily, as the template consumes them
        env['revisions'] = (utils.prepare_data_for_view(revision)
                            for revision in revisions)
        env['next_revs_url'] = utils.prepare_data_for_view(next_revs_url)
    except (NotFoundExc, BadInputExc) as e:
        env['message'] = str(e)

    return render_template('revision-log.html', **env)
@app.route('/browse/revision'
           '/origin/<int:origin_id>/log/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>/log/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>/log/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/ts/<string:ts>/log/')
def browse_revision_log_by(origin_id,
                           branch_name='refs/heads/master',
                           timestamp=None,
                           ts=None):
    """Browse the revision described by origin, branch name and timestamp's
    log

    Args:
        origin_id: the revision's origin
        branch_name: the revision's branch
        timestamp: the requested timeframe for the revision
        ts: same as timestamp, under the name used by the URL rules above;
        only used when timestamp is not provided

    Returns:
        The 'revision-log.html' template rendered with the revision log of
        the described revision as a list of revisions if it is found, with
        the error message otherwise.

    """
    # The URL rules name their variable `ts` and URL variables reach the
    # view as keyword arguments of the same name: without a `ts` parameter,
    # every URL holding a /ts/ part failed with an unexpected keyword
    # argument error.
    if timestamp is None:
        timestamp = ts

    env = {'sha1_git': None,
           'origin_id': origin_id,
           'origin_url': '/browse/origin/%d/' % origin_id,
           'branch_name': branch_name,
           'timestamp': timestamp,
           'message': None,
           'revisions': []}

    try:
        revisions = api.api_revision_log_by(
            origin_id, branch_name, timestamp)
        env['revisions'] = map(utils.prepare_data_for_view, revisions)
    except (NotFoundExc, BadInputExc) as e:
        env['message'] = str(e)

    return render_template('revision-log.html', **env)
@app.route('/browse/revision/<string:sha1_git_cur>/prev/<path:sha1s>/')
def browse_with_rev_context(sha1_git_cur, sha1s):
    """Browse the revision with git SHA1 sha1_git_cur, while keeping the context
    from which we came as a list of previous (i.e. later) revisions' sha1s.

    Args:
        sha1_git_cur: the requested revision's sha1_git.
        sha1s: a string of /-separated sha1s representing our context, ordered
        by descending revision date.

    Returns:
        The 'revision.html' template rendered with the keys sha1_git,
        revision and message (set when the lookup failed).

    """
    # NOTE(review): browse_revision declares a URL rule of the same shape
    # (/browse/revision/<sha1_git>/prev/<prev_sha1s>/) - confirm which of
    # the two views actually serves those URLs.
    revision, message = None, None
    try:
        revision = utils.prepare_data_for_view(
            api.api_revision(sha1_git_cur, sha1s))
    except (BadInputExc, NotFoundExc) as e:
        message = str(e)

    return render_template('revision.html',
                           sha1_git=sha1_git_cur,
                           message=message,
                           revision=revision)
@app.route('/browse/revision/<string:sha1_git_root>/history/<sha1_git>/')
def browse_revision_history(sha1_git_root, sha1_git):
    """Display information about revision sha1_git, limited to the
    sub-graph of all transitive parents of sha1_git_root.

    In other words, sha1_git is an ancestor of sha1_git_root.

    Args:
        sha1_git_root: latest revision of the browsed history.
        sha1_git: one of sha1_git_root's ancestors.

    Returns:
        A redirection to the 'browse_revision' endpoint when both
        identifiers are the same. Otherwise, the 'revision.html' template
        rendered with the keys sha1_git_root, sha1_git, message, keys and
        revision.

    """
    if sha1_git_root == sha1_git:
        # nothing to limit, this is a plain revision lookup
        return redirect(url_for('browse_revision',
                                sha1_git=sha1_git))

    revision, message = None, None
    try:
        revision = utils.prepare_data_for_view(
            api.api_revision_history(sha1_git_root, sha1_git))
    except (BadInputExc, NotFoundExc) as e:
        message = str(e)

    return render_template('revision.html',
                           sha1_git_root=sha1_git_root,
                           sha1_git=sha1_git,
                           message=message,
                           keys=[],
                           revision=revision)
@app.route('/browse/revision/<string:sha1_git>/directory/')
@app.route('/browse/revision/<string:sha1_git>/directory/<path:dir_path>/')
def browse_revision_directory(sha1_git, dir_path=None):
    """Browse directory from revision with sha1_git.

    Args:
        sha1_git: the revision's identifier
        dir_path: optional path inside the revision's directory

    Query parameters read:
        - encoding: encoding used to prepare the content for the view
        (default to utf8)

    Returns:
        The 'revision-directory.html' template rendered with the keys
        sha1_git, path, message, result and, when the lookup succeeded,
        revision.

    """
    env = {
        'sha1_git': sha1_git,
        'path': dir_path if dir_path else '.',
        'message': None,
        'result': None
    }

    encoding = request.args.get('encoding', 'utf8')
    if encoding not in aliases:
        unsupported = ('Encoding %s not supported.'
                       'Supported Encodings: %s')
        env['message'] = unsupported % (encoding, list(aliases.keys()))
        return render_template('revision-directory.html', **env)

    try:
        result = api.api_revision_directory(sha1_git, dir_path, with_data=True)
        # the prepared content replaces the raw one inside the result
        prepared = utils.prepare_data_for_view(result['content'],
                                               encoding=encoding)
        result['content'] = prepared
        env.update(revision=result['revision'], result=result)
    except (BadInputExc, NotFoundExc) as e:
        env['message'] = str(e)

    return render_template('revision-directory.html', **env)
@app.route('/browse/revision/<string:sha1_git_root>'
           '/history/<sha1_git>'
           '/directory/')
@app.route('/browse/revision/<string:sha1_git_root>'
           '/history/<sha1_git>'
           '/directory/<path:path>/')
def browse_revision_history_directory(sha1_git_root, sha1_git, path=None):
    """Return information about directory pointed to by the revision
    defined as: revision sha1_git, limited to the sub-graph of all
    transitive parents of sha1_git_root.

    Args:
        sha1_git_root: latest revision of the browsed history.
        sha1_git: one of sha1_git_root's ancestors.
        path: optional directory pointed to by that revision.

    Query parameters read:
        - encoding: encoding used to prepare the content for the view
        (default to utf8)

    Returns:
        A permanent redirection (301) to the 'browse_revision_directory'
        endpoint when both identifiers are the same. Otherwise, the
        'revision-directory.html' template rendered with information on
        the directory pointed to by that revision, or with the error
        message when the lookup raised BadInputExc or NotFoundExc.

    """
    env = {
        'sha1_git_root': sha1_git_root,
        'sha1_git': sha1_git,
        'path': '.' if not path else path,
        'message': None,
        'result': None
    }

    encoding = request.args.get('encoding', 'utf8')
    if encoding not in aliases:
        env['message'] = 'Encoding %s not supported.' \
                         'Supported Encodings: %s' % (
                             encoding, list(aliases.keys()))
        return render_template('revision-directory.html', **env)

    if sha1_git == sha1_git_root:
        # The target endpoint names its URL variable `dir_path`: passing
        # the path under any other name drops it from the URL and turns
        # it into a query argument instead.
        return redirect(url_for('browse_revision_directory',
                                sha1_git=sha1_git,
                                dir_path=path,
                                encoding=encoding),
                        code=301)

    try:
        result = api.api_revision_history_directory(sha1_git_root,
                                                    sha1_git,
                                                    path,
                                                    with_data=True)
        env['revision'] = result['revision']
        # NOTE(review): browse_revision_directory stores the prepared
        # content back into result['content'] while it goes to
        # env['content'] here - confirm which one the template reads.
        env['content'] = utils.prepare_data_for_view(result['content'],
                                                     encoding=encoding)
        env['result'] = result
    except (BadInputExc, NotFoundExc) as e:
        env['message'] = str(e)

    return render_template('revision-directory.html', **env)
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/history/<sha1_git>'
           '/directory/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/history/<sha1_git>'
           '/directory/<path:path>/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/history/<sha1_git>'
           '/directory/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/history/<sha1_git>'
           '/directory/<path:path>/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/ts/<string:ts>'
           '/history/<sha1_git>'
           '/directory/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/ts/<string:ts>'
           '/history/<sha1_git>'
           '/directory/<path:path>/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>'
           '/history/<sha1_git>'
           '/directory/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>'
           '/history/<sha1_git>'
           '/directory/<path:path>/')
def browse_directory_through_revision_with_origin_history(
        origin_id,
        branch_name="refs/heads/master",
        ts=None,
        sha1_git=None,
        path=None):
    """Browse the directory of revision sha1_git, the root revision of
    the browsed history being designated by (origin_id, branch_name, ts).

    Args:
        origin_id: origin's identifier.
        branch_name: the optional branch for the given origin (default
            to refs/heads/master).
        ts: optional timestamp, passed as is to the api.
        sha1_git: revision whose directory is browsed.
        path: optional path below that revision's directory; the
            template receives '.' when it is not provided.
        encoding: optional query parameter forwarded to
            utils.prepare_data_for_view (default to utf8).

    Returns:
        The rendered 'revision-directory.html' template. Its 'message'
        entry carries the error text when the encoding is not supported
        or when the api raises BadInputExc or NotFoundExc.

    """
    env = {
        'origin_id': origin_id,
        'branch_name': branch_name,
        'ts': ts,
        'sha1_git': sha1_git,
        'path': path if path else '.',
        'message': None,
        'result': None
    }

    encoding = request.args.get('encoding', 'utf8')
    if encoding not in aliases:
        # The trailing space keeps the two sentences apart once the
        # adjacent literals are concatenated.
        env['message'] = (('Encoding %s not supported. '
                           'Supported Encodings: %s') % (
                               encoding, list(aliases.keys())))
        return render_template('revision-directory.html', **env)

    try:
        result = api.api_directory_through_revision_with_origin_history(
            origin_id, branch_name, ts, sha1_git, path, with_data=True)
        env['revision'] = result['revision']
        env['content'] = utils.prepare_data_for_view(result['content'],
                                                     encoding=encoding)
        env['result'] = result
    except (BadInputExc, NotFoundExc) as e:
        env['message'] = str(e)

    return render_template('revision-directory.html', **env)
@app.route('/browse/revision'
           '/origin/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/ts/<string:ts>/')
def browse_revision_with_origin(origin_id=1,
                                branch_name="refs/heads/master",
                                ts=None):
    """Instead of having to specify a (root) revision by SHA1_GIT, users
    might want to specify a place and a time. In SWH a "place" is an
    origin; a "time" is a timestamp at which some place has been
    observed by SWH crawlers.

    Args:
        origin_id: origin's identifier (default to 1). The default is
            what serves the bare '/browse/revision/origin/' route, which
            carries no origin_id in its URL.
        branch_name: the optional branch for the given origin (default
            to refs/heads/master).
        ts: optional timestamp, passed as is to the api.

    Returns:
        The rendered 'revision.html' template holding the revision if
        found. Its 'message' entry carries the error text when the api
        raises ValueError, NotFoundExc or BadInputExc.

    """
    env = {'message': None,
           'revision': None}

    try:
        revision = api.api_revision_with_origin(origin_id,
                                                branch_name,
                                                ts)
        env['revision'] = utils.prepare_data_for_view(revision)
    except (ValueError, NotFoundExc, BadInputExc) as e:
        env['message'] = str(e)

    return render_template('revision.html', **env)
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/history/<sha1_git>/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/history/<sha1_git>/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>'
           '/history/<sha1_git>/')
def browse_revision_history_through_origin(origin_id,
                                           branch_name='refs/heads/master',
                                           ts=None,
                                           sha1_git=None):
    """Show revision sha1_git within the history of the root revision
    designated by (origin_id, branch_name, ts).

    Calling sha1_git_root the identifier of that root revision, sha1_git
    is expected to be one of sha1_git_root's ancestors.

    Args:
        origin_id: origin's identifier.
        branch_name: the optional branch for the given origin (default
            to refs/heads/master).
        ts: optional timestamp, passed as is to the api.
        sha1_git: one of sha1_git_root's ancestors.
        limit: optional query parameter to limit the revisions log.
            It is not read in this view; presumably it is handled by
            the api layer (TODO confirm).

    Returns:
        The rendered 'revision.html' template. Its 'revision' entry is
        the prepared api result; its 'message' entry carries the error
        text when the api raises ValueError, BadInputExc or NotFoundExc.

    """
    error_message = None
    prepared_revision = None

    try:
        raw_revision = api.api_revision_history_through_origin(
            origin_id, branch_name, ts, sha1_git)
        prepared_revision = utils.prepare_data_for_view(raw_revision)
    except (ValueError, BadInputExc, NotFoundExc) as exc:
        error_message = str(exc)

    return render_template('revision.html',
                           message=error_message,
                           revision=prepared_revision)
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/directory/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/directory/<path:path>/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/directory/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/directory/<path:path>/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>'
           '/directory/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>'
           '/directory/<path:path>/')
def browse_revision_directory_through_origin(origin_id,
                                             branch_name='refs/heads/master',
                                             ts=None,
                                             path=None):
    """Browse the directory of the revision designated by
    (origin_id, branch_name, ts).

    Every '<path:path>' route ends with a slash, so that a request made
    without the trailing slash is redirected by the framework to the
    canonical URL instead of being matched differently.

    Args:
        origin_id: origin's identifier.
        branch_name: the optional branch for the given origin (default
            to refs/heads/master).
        ts: optional timestamp, passed as is to the api.
        path: optional path below the revision's directory; the
            template receives '.' when it is not provided.
        encoding: optional query parameter forwarded to
            utils.prepare_data_for_view (default to utf8).

    Returns:
        The rendered 'revision-directory.html' template. Its 'message'
        entry carries the error text when the encoding is not supported
        or when the api raises ValueError, BadInputExc or NotFoundExc.

    """
    env = {'message': None,
           'origin_id': origin_id,
           'ts': ts,
           'path': path if path else '.',
           'result': None}

    encoding = request.args.get('encoding', 'utf8')
    if encoding not in aliases:
        # The trailing space keeps the two sentences apart once the
        # adjacent literals are concatenated.
        env['message'] = ('Encoding %s not supported. '
                          'Supported Encodings: %s' % (
                              encoding, list(aliases.keys())))
        return render_template('revision-directory.html', **env)

    try:
        result = api.api_directory_through_revision_origin(
            origin_id,
            branch_name,
            ts,
            path,
            with_data=True)
        # The prepared content replaces the raw one inside result itself.
        result['content'] = utils.prepare_data_for_view(result['content'],
                                                        encoding=encoding)
        env['revision'] = result['revision']
        env['result'] = result
    except (ValueError, BadInputExc, NotFoundExc) as e:
        env['message'] = str(e)

    return render_template('revision-directory.html', **env)
@app.route('/browse/entity/')
@app.route('/browse/entity/<string:uuid>/')
def browse_entity(uuid=None):
    """Browse the entities returned by the api for the given uuid.

    Args:
        uuid: optional entity identifier. It is absent when the bare
            '/browse/entity/' route is requested, in which case the
            page is rendered empty without querying the api.

    Returns:
        The rendered 'entity.html' template. Its 'entities' entry holds
        the api result; its 'message' entry carries the error text when
        the api raises NotFoundExc or BadInputExc.

    """
    env = {'entities': [],
           'message': None}

    # The bare route carries no uuid: without the default value the view
    # could not even be called, and there is nothing to look up anyway.
    if uuid is None:
        return render_template('entity.html', **env)

    try:
        entities = api.api_entity_by_uuid(uuid)
        env['entities'] = entities
    except (NotFoundExc, BadInputExc) as e:
        env['message'] = str(e)

    return render_template('entity.html', **env)
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Jul 4 2025, 9:59 AM (5 w, 20 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3235828
Attached To
rDWAPPS Web applications
Event Timeline
Log In to Comment