Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/web/ui/converters.py b/swh/web/ui/converters.py
index 71f33078..f462e9f7 100644
--- a/swh/web/ui/converters.py
+++ b/swh/web/ui/converters.py
@@ -1,301 +1,312 @@
# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import json
from swh.core import hashutil
from swh.core.utils import decode_with_escape
from swh.web.ui import utils
def from_swh(dict_swh, hashess=frozenset(), bytess=frozenset(),
             dates=frozenset(), blacklist=frozenset(),
             removables_if_empty=frozenset(), empty_dict=frozenset(),
             empty_list=frozenset(), convert=frozenset(),
             convert_fn=lambda x: x):
    """Convert from an swh dictionary to something reasonably json
    serializable.

    Args:
        dict_swh: the origin dictionary needed to be transformed
        hashess: list/set of keys representing hashes values (sha1,
            sha256, sha1_git, etc...) as bytes. Those need to be
            transformed in hexadecimal string
        bytess: list/set of keys representing bytes values which need
            to be decoded (utf-8; escaped on decoding failure, with the
            key recorded under 'decoding_failures')
        dates: set of keys whose values are swh date dicts (or
            datetimes) to render as iso8601 strings
        blacklist: set of keys to filter out from the conversion
        removables_if_empty: set of keys dropped when their value is
            falsy
        empty_dict: set of keys whose falsy values are normalized to {}
        empty_list: set of keys whose falsy values are normalized to []
        convert: set of keys whose associated values need to be
            converted using convert_fn
        convert_fn: the conversion function to apply on the value of
            key in 'convert'

    The remaining keys are copied as is in the output.

    Returns:
        dictionary equivalent as dict_swh only with its keys `converted`.

    """
    def convert_hashes_bytes(v):
        """v is supposedly a hash as bytes, returns it converted in hex.

        """
        if isinstance(v, bytes):
            return hashutil.hash_to_hex(v)
        return v

    def convert_bytes(v):
        """v is supposedly a bytes string, decode as utf-8.

        FIXME: Improve decoding policy.
        If not utf-8, break!

        """
        if isinstance(v, bytes):
            return v.decode('utf-8')
        return v

    def convert_date(v):
        """v is either:
        - a dict with three keys:
          - timestamp (dict or integer timestamp)
          - offset
          - negative_utc
        - a datetime

        We convert it to a human-readable string

        """
        if isinstance(v, datetime.datetime):
            return v.isoformat()

        tz = datetime.timezone(datetime.timedelta(minutes=v['offset']))
        swh_timestamp = v['timestamp']
        if isinstance(swh_timestamp, dict):
            date = datetime.datetime.fromtimestamp(
                swh_timestamp['seconds'], tz=tz)
        else:
            date = datetime.datetime.fromtimestamp(
                swh_timestamp, tz=tz)

        datestr = date.isoformat()

        if v['offset'] == 0 and v['negative_utc']:
            # remove the rightmost + and replace it with a -
            return '-'.join(datestr.rsplit('+', 1))

        return datestr

    if not dict_swh:
        return dict_swh

    new_dict = {}
    for key, value in dict_swh.items():
        if key in blacklist or (key in removables_if_empty and not value):
            continue

        # NOTE: order matters; e.g. a key in `dates` holds a dict but must
        # be date-converted, not recursed into.
        if key in dates:
            new_dict[key] = convert_date(value)
        elif key in convert:
            new_dict[key] = convert_fn(value)
        elif isinstance(value, dict):
            new_dict[key] = from_swh(value,
                                     hashess=hashess, bytess=bytess,
                                     dates=dates, blacklist=blacklist,
                                     removables_if_empty=removables_if_empty,
                                     empty_dict=empty_dict,
                                     empty_list=empty_list,
                                     convert=convert,
                                     convert_fn=convert_fn)
        elif key in hashess:
            new_dict[key] = utils.fmap(convert_hashes_bytes, value)
        elif key in bytess:
            try:
                new_dict[key] = utils.fmap(convert_bytes, value)
            except UnicodeDecodeError:
                if 'decoding_failures' not in new_dict:
                    new_dict['decoding_failures'] = [key]
                else:
                    new_dict['decoding_failures'].append(key)
                new_dict[key] = utils.fmap(decode_with_escape, value)
        elif key in empty_dict and not value:
            new_dict[key] = {}
        elif key in empty_list and not value:
            new_dict[key] = []
        else:
            new_dict[key] = value

    return new_dict
def from_provenance(provenance):
    """Convert a provenance information dict to a serializable one.

    Args:
        provenance: Dictionary with the following keys:
            content (sha1_git): the content's identifier
            revision (sha1_git): the revision the content was seen
            origin (int): the origin the content was seen
            visit (int): the visit it occurred
            path (bytes): the path the content was seen at

    """
    hash_keys = {'content', 'revision'}
    byte_keys = {'path'}
    return from_swh(provenance, hashess=hash_keys, bytess=byte_keys)
def from_origin(origin):
    """Convert from an SWH origin to an origin dictionary.

    """
    # lister/project entries are dropped when they carry no value
    drop_when_empty = {'lister', 'project'}
    return from_swh(origin, removables_if_empty=drop_when_empty)
def from_release(release):
    """Convert from an SWH release to a json serializable release dictionary.

    Args:
        release: Dict with the following keys
        - id: identifier of the release (sha1 in bytes)
        - target: identifier of the object the release points to (sha1 in
          bytes)
        - message: release's message (bytes)
        - name: release's name (bytes)
        - author: release's author person dict (name/fullname/email as
          bytes)
        - date: release's date (swh date dict)
        - synthetic: the synthetic property (boolean)

    Returns:
        Release dictionary with the same keys, only:
        - id and target as hexadecimal sha1 (string)
        - message, name and author fields decoded to strings
        - date rendered as an iso8601 string
        - remaining keys left as is

    """
    return from_swh(
        release,
        hashess={'id', 'target'},
        bytess={'message', 'name', 'fullname', 'email'},
        dates={'date'},
    )
class SWHMetadataEncoder(json.JSONEncoder):
    """Special json encoder for metadata field which can contain bytes
    encoded value.

    """
    def default(self, obj):
        if not isinstance(obj, bytes):
            # delegate to the base class, whose default raises TypeError
            return super().default(obj)
        return obj.decode('utf-8')
def convert_revision_metadata(metadata):
    """Convert json specific dict to a json serializable one.

    """
    if metadata:
        # round-trip through json so bytes values become strings
        return json.loads(json.dumps(metadata, cls=SWHMetadataEncoder))
    return {}
def from_revision(revision):
    """Convert from an SWH revision to a json serializable revision dictionary.

    Args:
        revision: Dict with the following keys
        - id: identifier of the revision (sha1 in bytes)
        - directory: identifier of the directory the revision points to (sha1
          in bytes)
        - author, committer: person dicts with bytes name/fullname/email
        - message: revision's message (bytes)
        - date, committer_date: revision's author/commit dates (swh date
          dicts)
        - parents, children: lists of revision identifiers (sha1 in bytes)
        - synthetic: revision's property nature
        - type: revision's type (git, tar or dsc at the moment)
        - metadata: if the revision is synthetic, this can reference dynamic
          properties.

    Returns:
        Revision dictionary with the same keys as inputs, only:
        - sha1s are in hexadecimal strings (id, directory, parents, children)
        - bytes are decoded in string (author/committer name, fullname,
          email)
        - dates are rendered as iso8601 strings
        - an extra 'merge' boolean key is added when 'parents' is present
          (True when there is more than one parent)
        - when 'message' is not valid utf-8, it is set to None and
          'message_decoding_failed' is set to True
        - remaining keys are left as is

    """
    revision = from_swh(revision,
                        hashess={'id', 'directory', 'parents', 'children'},
                        bytess={'name', 'fullname', 'email'},
                        convert={'metadata'},
                        convert_fn=convert_revision_metadata,
                        dates={'date', 'committer_date'})

    if revision:
        if 'parents' in revision:
            revision['merge'] = len(revision['parents']) > 1
        if 'message' in revision:
            try:
                revision['message'] = revision['message'].decode('utf-8')
            except UnicodeDecodeError:
                revision['message_decoding_failed'] = True
                revision['message'] = None
    return revision
def from_content(content):
    """Convert swh content to serializable content dictionary.

    """
    def convert_status(status):
        # hidden contents are exposed as 'absent'
        if status == 'hidden':
            return 'absent'
        return status

    return from_swh(content,
                    hashess={'sha1', 'sha1_git', 'sha256'},
                    blacklist={'ctime'},
                    convert={'status'},
                    convert_fn=convert_status)
def from_person(person):
    """Convert swh person to serializable person dictionary.

    """
    byte_fields = {'name', 'fullname', 'email'}
    return from_swh(person, bytess=byte_fields)
def from_origin_visit(visit):
    """Convert swh origin_visit to serializable origin_visit dictionary.

    """
    ov = from_swh(visit,
                  hashess={'target'},
                  bytess={'branch'},
                  dates={'date'},
                  empty_dict={'metadata'})

    if ov and 'occurrences' in ov:
        # occurrence keys are branch names as bytes; escape-decode them
        ov['occurrences'] = {
            decode_with_escape(k): v
            for k, v in ov['occurrences'].items()
        }

    return ov
def from_directory_entry(dir_entry):
    """Convert swh directory entry to serializable directory entry dictionary.

    """
    return from_swh(dir_entry,
                    hashess={'dir_id', 'sha1_git', 'sha1', 'sha256',
                             'target'},
                    bytess={'name'},
                    removables_if_empty={
                        'sha1', 'sha1_git', 'sha256', 'status'},
                    convert={'status'},
                    convert_fn=lambda v: 'absent' if v == 'hidden' else v)
def from_filetype(content_entry):
    """Convert swh content filetype to serializable filetype dictionary.

    """
    return from_swh(content_entry,
                    hashess={'id'},
                    bytess={'mimetype', 'encoding'})
diff --git a/swh/web/ui/tests/test_converters.py b/swh/web/ui/tests/test_converters.py
index 10f108a5..b154f716 100644
--- a/swh/web/ui/tests/test_converters.py
+++ b/swh/web/ui/tests/test_converters.py
@@ -1,725 +1,735 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import unittest
from nose.tools import istest
from swh.core import hashutil
from swh.web.ui import converters
class ConvertersTestCase(unittest.TestCase):
@istest
def from_swh(self):
some_input = {
'a': 'something',
'b': 'someone',
'c': b'sharp-0.3.4.tgz',
'd': hashutil.hex_to_hash(
'b04caf10e9535160d90e874b45aa426de762f19f'),
'e': b'sharp.html/doc_002dS_005fISREG.html',
'g': [b'utf-8-to-decode', b'another-one'],
'h': 'something filtered',
'i': {'e': b'something'},
'j': {
'k': {
'l': [b'bytes thing', b'another thingy', b''],
'n': 'dont care either'
},
'm': 'dont care'
},
'o': 'something',
'p': b'foo',
'q': {'extra-headers': [['a', b'intact']]},
'w': None,
'r': {'p': 'also intact',
'q': 'bar'},
's': {
'timestamp': 42,
'offset': -420,
'negative_utc': None,
},
+ 's1': {
+ 'timestamp': {'seconds': 42, 'microseconds': 0},
+ 'offset': -420,
+ 'negative_utc': None,
+ },
+ 's2': datetime.datetime(
+ 2013, 7, 1, 20, 0, 0,
+ tzinfo=datetime.timezone.utc),
't': None,
'u': None,
'v': None,
'x': None,
}
expected_output = {
'a': 'something',
'b': 'someone',
'c': 'sharp-0.3.4.tgz',
'd': 'b04caf10e9535160d90e874b45aa426de762f19f',
'e': 'sharp.html/doc_002dS_005fISREG.html',
'g': ['utf-8-to-decode', 'another-one'],
'i': {'e': 'something'},
'j': {
'k': {
'l': ['bytes thing', 'another thingy', '']
}
},
'p': 'foo',
'q': {'extra-headers': [['a', 'intact']]},
'w': {},
'r': {'p': 'also intact',
'q': 'bar'},
's': '1969-12-31T17:00:42-07:00',
+ 's1': '1969-12-31T17:00:42-07:00',
+ 's2': '2013-07-01T20:00:00+00:00',
'u': {},
'v': [],
'x': None,
}
actual_output = converters.from_swh(
some_input,
hashess={'d', 'o', 'x'},
bytess={'c', 'e', 'g', 'l'},
- dates={'s'},
+ dates={'s', 's1', 's2'},
blacklist={'h', 'm', 'n', 'o'},
removables_if_empty={'t'},
empty_dict={'u'},
empty_list={'v'},
convert={'p', 'q', 'w'},
convert_fn=converters.convert_revision_metadata)
self.assertEquals(expected_output, actual_output)
@istest
def from_swh_edge_cases_do_no_conversion_if_none_or_not_bytes(self):
some_input = {
'a': 'something',
'b': None,
'c': 'someone',
'd': None,
}
expected_output = {
'a': 'something',
'b': None,
'c': 'someone',
'd': None,
}
actual_output = converters.from_swh(some_input,
hashess={'a', 'b'},
bytess={'c', 'd'})
self.assertEquals(expected_output, actual_output)
@istest
def from_swh_edge_cases_convert_invalid_utf8_bytes(self):
some_input = {
'a': 'something',
'b': 'someone',
'c': b'a name \xff',
'd': b'an email \xff',
}
expected_output = {
'a': 'something',
'b': 'someone',
'c': 'a name \\xff',
'd': 'an email \\xff',
'decoding_failures': ['c', 'd']
}
actual_output = converters.from_swh(some_input,
hashess={'a', 'b'},
bytess={'c', 'd'})
for v in ['a', 'b', 'c', 'd']:
self.assertEqual(expected_output[v], actual_output[v])
self.assertEqual(len(expected_output['decoding_failures']),
len(actual_output['decoding_failures']))
for v in expected_output['decoding_failures']:
self.assertTrue(v in actual_output['decoding_failures'])
@istest
def from_swh_empty(self):
# when
self.assertEquals({}, converters.from_swh({}))
@istest
def from_swh_none(self):
# when
self.assertIsNone(converters.from_swh(None))
@istest
def from_provenance(self):
# given
input_provenance = {
'origin': 10,
'visit': 1,
'content': hashutil.hex_to_hash(
'321caf10e9535160d90e874b45aa426de762f19f'),
'revision': hashutil.hex_to_hash(
'123caf10e9535160d90e874b45aa426de762f19f'),
'path': b'octave-3.4.0/doc/interpreter/octave/doc_002dS_005fISREG'
}
expected_provenance = {
'origin': 10,
'visit': 1,
'content': '321caf10e9535160d90e874b45aa426de762f19f',
'revision': '123caf10e9535160d90e874b45aa426de762f19f',
'path': 'octave-3.4.0/doc/interpreter/octave/doc_002dS_005fISREG'
}
# when
actual_provenance = converters.from_provenance(input_provenance)
# then
self.assertEqual(actual_provenance, expected_provenance)
@istest
def from_origin(self):
# given
origin_input = {
'id': 9,
'type': 'ftp',
'url': 'rsync://ftp.gnu.org/gnu/octave',
'project': None,
'lister': None,
}
expected_origin = {
'id': 9,
'type': 'ftp',
'url': 'rsync://ftp.gnu.org/gnu/octave',
}
# when
actual_origin = converters.from_origin(origin_input)
# then
self.assertEqual(actual_origin, expected_origin)
@istest
def from_release(self):
release_input = {
'id': hashutil.hex_to_hash(
'aad23fa492a0c5fed0708a6703be875448c86884'),
'target': hashutil.hex_to_hash(
'5e46d564378afc44b31bb89f99d5675195fbdf67'),
'target_type': 'revision',
'date': {
'timestamp': datetime.datetime(
2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'author': {
'name': b'author name',
'fullname': b'Author Name author@email',
'email': b'author@email',
},
'name': b'v0.0.1',
'message': b'some comment on release',
'synthetic': True,
}
expected_release = {
'id': 'aad23fa492a0c5fed0708a6703be875448c86884',
'target': '5e46d564378afc44b31bb89f99d5675195fbdf67',
'target_type': 'revision',
'date': '2015-01-01T22:00:00+00:00',
'author': {
'name': 'author name',
'fullname': 'Author Name author@email',
'email': 'author@email',
},
'name': 'v0.0.1',
'message': 'some comment on release',
'target_type': 'revision',
'synthetic': True,
}
# when
actual_release = converters.from_release(release_input)
# then
self.assertEqual(actual_release, expected_release)
@istest
def from_release_no_revision(self):
release_input = {
'id': hashutil.hex_to_hash(
'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e'),
'target': None,
'date': {
'timestamp': datetime.datetime(
2016, 3, 2, 10, 0, 0,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': True,
},
'name': b'v0.1.1',
'message': b'comment on release',
'synthetic': False,
'author': {
'name': b'bob',
'fullname': b'Bob bob@alice.net',
'email': b'bob@alice.net',
},
}
expected_release = {
'id': 'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e',
'target': None,
'date': '2016-03-02T10:00:00-00:00',
'name': 'v0.1.1',
'message': 'comment on release',
'synthetic': False,
'author': {
'name': 'bob',
'fullname': 'Bob bob@alice.net',
'email': 'bob@alice.net',
},
}
# when
actual_release = converters.from_release(release_input)
# then
self.assertEqual(actual_release, expected_release)
@istest
def from_revision(self):
revision_input = {
'id': hashutil.hex_to_hash(
'18d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hashutil.hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'Software Heritage',
'fullname': b'robot robot@softwareheritage.org',
'email': b'robot@softwareheritage.org',
},
'committer': {
'name': b'Software Heritage',
'fullname': b'robot robot@softwareheritage.org',
'email': b'robot@softwareheritage.org',
},
'message': b'synthetic revision message',
'date': {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'committer_date': {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'synthetic': True,
'type': 'tar',
'parents': [
hashutil.hex_to_hash(
'29d8be353ed3480476f032475e7c244eff7371d5'),
hashutil.hex_to_hash(
'30d8be353ed3480476f032475e7c244eff7371d5')
],
'children': [
hashutil.hex_to_hash(
'123546353ed3480476f032475e7c244eff7371d5'),
],
'metadata': {
'extra_headers': [['gpgsig', b'some-signature']],
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912',
}]
},
}
expected_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'Software Heritage',
'fullname': 'robot robot@softwareheritage.org',
'email': 'robot@softwareheritage.org',
},
'committer': {
'name': 'Software Heritage',
'fullname': 'robot robot@softwareheritage.org',
'email': 'robot@softwareheritage.org',
},
'message': 'synthetic revision message',
'date': "2000-01-17T11:23:54+00:00",
'committer_date': "2000-01-17T11:23:54+00:00",
'children': [
'123546353ed3480476f032475e7c244eff7371d5'
],
'parents': [
'29d8be353ed3480476f032475e7c244eff7371d5',
'30d8be353ed3480476f032475e7c244eff7371d5'
],
'type': 'tar',
'synthetic': True,
'metadata': {
'extra_headers': [['gpgsig', 'some-signature']],
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912'
}]
},
'merge': True
}
# when
actual_revision = converters.from_revision(revision_input)
# then
self.assertEqual(actual_revision, expected_revision)
@istest
def from_revision_nomerge(self):
revision_input = {
'id': hashutil.hex_to_hash(
'18d8be353ed3480476f032475e7c233eff7371d5'),
'parents': [
hashutil.hex_to_hash(
'29d8be353ed3480476f032475e7c244eff7371d5')
]
}
expected_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'parents': [
'29d8be353ed3480476f032475e7c244eff7371d5'
],
'merge': False
}
# when
actual_revision = converters.from_revision(revision_input)
# then
self.assertEqual(actual_revision, expected_revision)
@istest
def from_revision_noparents(self):
revision_input = {
'id': hashutil.hex_to_hash(
'18d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hashutil.hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'Software Heritage',
'fullname': b'robot robot@softwareheritage.org',
'email': b'robot@softwareheritage.org',
},
'committer': {
'name': b'Software Heritage',
'fullname': b'robot robot@softwareheritage.org',
'email': b'robot@softwareheritage.org',
},
'message': b'synthetic revision message',
'date': {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'committer_date': {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'synthetic': True,
'type': 'tar',
'children': [
hashutil.hex_to_hash(
'123546353ed3480476f032475e7c244eff7371d5'),
],
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912',
}]
},
}
expected_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'Software Heritage',
'fullname': 'robot robot@softwareheritage.org',
'email': 'robot@softwareheritage.org',
},
'committer': {
'name': 'Software Heritage',
'fullname': 'robot robot@softwareheritage.org',
'email': 'robot@softwareheritage.org',
},
'message': 'synthetic revision message',
'date': "2000-01-17T11:23:54+00:00",
'committer_date': "2000-01-17T11:23:54+00:00",
'children': [
'123546353ed3480476f032475e7c244eff7371d5'
],
'type': 'tar',
'synthetic': True,
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912'
}]
}
}
# when
actual_revision = converters.from_revision(revision_input)
# then
self.assertEqual(actual_revision, expected_revision)
@istest
def from_revision_invalid(self):
revision_input = {
'id': hashutil.hex_to_hash(
'18d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hashutil.hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'Software Heritage',
'fullname': b'robot robot@softwareheritage.org',
'email': b'robot@softwareheritage.org',
},
'committer': {
'name': b'Software Heritage',
'fullname': b'robot robot@softwareheritage.org',
'email': b'robot@softwareheritage.org',
},
'message': b'invalid message \xff',
'date': {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'committer_date': {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False,
},
'synthetic': True,
'type': 'tar',
'parents': [
hashutil.hex_to_hash(
'29d8be353ed3480476f032475e7c244eff7371d5'),
hashutil.hex_to_hash(
'30d8be353ed3480476f032475e7c244eff7371d5')
],
'children': [
hashutil.hex_to_hash(
'123546353ed3480476f032475e7c244eff7371d5'),
],
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912',
}]
},
}
expected_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'Software Heritage',
'fullname': 'robot robot@softwareheritage.org',
'email': 'robot@softwareheritage.org',
},
'committer': {
'name': 'Software Heritage',
'fullname': 'robot robot@softwareheritage.org',
'email': 'robot@softwareheritage.org',
},
'message': None,
'message_decoding_failed': True,
'date': "2000-01-17T11:23:54+00:00",
'committer_date': "2000-01-17T11:23:54+00:00",
'children': [
'123546353ed3480476f032475e7c244eff7371d5'
],
'parents': [
'29d8be353ed3480476f032475e7c244eff7371d5',
'30d8be353ed3480476f032475e7c244eff7371d5'
],
'type': 'tar',
'synthetic': True,
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912'
}]
},
'merge': True
}
# when
actual_revision = converters.from_revision(revision_input)
# then
self.assertEqual(actual_revision, expected_revision)
@istest
def from_content_none(self):
self.assertIsNone(converters.from_content(None))
@istest
def from_content(self):
content_input = {
'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0'
'2ebda5'),
'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'ctime': 'something-which-is-filtered-out',
'data': b'data in bytes',
'length': 10,
'status': 'hidden',
}
# 'status' is filtered
expected_content = {
'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274'
'7d3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'data': b'data in bytes',
'length': 10,
'status': 'absent',
}
# when
actual_content = converters.from_content(content_input)
# then
self.assertEqual(actual_content, expected_content)
@istest
def from_person(self):
person_input = {
'id': 10,
'anything': 'else',
'name': b'bob',
'fullname': b'bob bob@alice.net',
'email': b'bob@foo.alice',
}
expected_person = {
'id': 10,
'anything': 'else',
'name': 'bob',
'fullname': 'bob bob@alice.net',
'email': 'bob@foo.alice',
}
# when
actual_person = converters.from_person(person_input)
# then
self.assertEqual(actual_person, expected_person)
@istest
def from_directory_entries(self):
dir_entries_input = {
'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0'
'2ebda5'),
'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'target': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'dir_id': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'name': b'bob',
'type': 10,
'status': 'hidden',
}
expected_dir_entries = {
'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2747'
'd3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'name': 'bob',
'type': 10,
'status': 'absent',
}
# when
actual_dir_entries = converters.from_directory_entry(dir_entries_input)
# then
self.assertEqual(actual_dir_entries, expected_dir_entries)
@istest
def from_filetype(self):
content_filetype = {
'id': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebd'
'a5'),
'encoding': b'utf-8',
'mimetype': b'text/plain',
}
expected_content_filetype = {
'id': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
'encoding': 'utf-8',
'mimetype': 'text/plain',
}
# when
actual_content_filetype = converters.from_filetype(content_filetype)
# then
self.assertEqual(actual_content_filetype, expected_content_filetype)
diff --git a/swh/web/ui/tests/test_service.py b/swh/web/ui/tests/test_service.py
index 88df5085..ed02be7f 100644
--- a/swh/web/ui/tests/test_service.py
+++ b/swh/web/ui/tests/test_service.py
@@ -1,2043 +1,2043 @@
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from nose.tools import istest
from unittest.mock import MagicMock, patch, call
from swh.core.hashutil import hex_to_hash, hash_to_hex
from swh.web.ui import service
from swh.web.ui.exc import BadInputExc, NotFoundExc
from swh.web.ui.tests import test_app
class ServiceTestCase(test_app.SWHApiTestCase):
def setUp(self):
self.SHA1_SAMPLE = '18d8be353ed3480476f032475e7c233eff7371d5'
self.SHA1_SAMPLE_BIN = hex_to_hash(self.SHA1_SAMPLE)
self.SHA256_SAMPLE = ('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926')
self.SHA256_SAMPLE_BIN = hex_to_hash(self.SHA256_SAMPLE)
self.SHA1GIT_SAMPLE = '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
self.SHA1GIT_SAMPLE_BIN = hex_to_hash(self.SHA1GIT_SAMPLE)
self.DIRECTORY_ID = '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'
self.DIRECTORY_ID_BIN = hex_to_hash(self.DIRECTORY_ID)
self.AUTHOR_ID_BIN = {
'name': b'author',
'email': b'author@company.org',
}
self.AUTHOR_ID = {
'name': 'author',
'email': 'author@company.org',
}
self.COMMITTER_ID_BIN = {
'name': b'committer',
'email': b'committer@corp.org',
}
self.COMMITTER_ID = {
'name': 'committer',
'email': 'committer@corp.org',
}
self.SAMPLE_DATE_RAW = {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc,
).timestamp(),
'offset': 0,
'negative_utc': False,
}
self.SAMPLE_DATE = '2000-01-17T11:23:54+00:00'
self.SAMPLE_MESSAGE_BIN = b'elegant fix for bug 31415957'
self.SAMPLE_MESSAGE = 'elegant fix for bug 31415957'
self.SAMPLE_REVISION = {
'id': self.SHA1_SAMPLE,
'directory': self.DIRECTORY_ID,
'author': self.AUTHOR_ID,
'committer': self.COMMITTER_ID,
'message': self.SAMPLE_MESSAGE,
'date': self.SAMPLE_DATE,
'committer_date': self.SAMPLE_DATE,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': {},
'merge': False
}
self.SAMPLE_REVISION_RAW = {
'id': self.SHA1_SAMPLE_BIN,
'directory': self.DIRECTORY_ID_BIN,
'author': self.AUTHOR_ID_BIN,
'committer': self.COMMITTER_ID_BIN,
'message': self.SAMPLE_MESSAGE_BIN,
'date': self.SAMPLE_DATE_RAW,
'committer_date': self.SAMPLE_DATE_RAW,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}
self.SAMPLE_CONTENT = {
'sha1': self.SHA1_SAMPLE,
'sha256': self.SHA256_SAMPLE,
'sha1_git': self.SHA1GIT_SAMPLE,
'length': 190,
'status': 'absent'
}
self.SAMPLE_CONTENT_RAW = {
'sha1': self.SHA1_SAMPLE_BIN,
'sha256': self.SHA256_SAMPLE_BIN,
'sha1_git': self.SHA1GIT_SAMPLE_BIN,
'length': 190,
'status': 'hidden'
}
self.date_origin_visit1 = datetime.datetime(
2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc)
self.origin_visit1 = {
'date': self.date_origin_visit1,
'origin': 1,
'visit': 1
}
@patch('swh.web.ui.service.backend')
@istest
def lookup_multiple_hashes_ball_missing(self, mock_backend):
# given
mock_backend.content_missing_per_sha1 = MagicMock(return_value=[])
# when
actual_lookup = service.lookup_multiple_hashes(
[{'filename': 'a',
'sha1': '456caf10e9535160d90e874b45aa426de762f19f'},
{'filename': 'b',
'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865'}])
# then
self.assertEquals(actual_lookup, [
{'filename': 'a',
'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
'found': True},
{'filename': 'b',
'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865',
'found': True}
])
@patch('swh.web.ui.service.backend')
@istest
def lookup_multiple_hashes_some_missing(self, mock_backend):
# given
mock_backend.content_missing_per_sha1 = MagicMock(return_value=[
hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
])
# when
actual_lookup = service.lookup_multiple_hashes(
[{'filename': 'a',
'sha1': '456caf10e9535160d90e874b45aa426de762f19f'},
{'filename': 'b',
'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865'}])
# then
self.assertEquals(actual_lookup, [
{'filename': 'a',
'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
'found': False},
{'filename': 'b',
'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865',
'found': True}
])
@patch('swh.web.ui.service.backend')
@istest
def lookup_hash_does_not_exist(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value=None)
# when
actual_lookup = service.lookup_hash(
'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': None,
'algo': 'sha1_git'}, actual_lookup)
# check the function has been called with parameters
mock_backend.content_find.assert_called_with(
'sha1_git',
hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_hash_exist(self, mock_backend):
# given
stub_content = {
'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
}
mock_backend.content_find = MagicMock(return_value=stub_content)
# when
actual_lookup = service.lookup_hash(
'sha1:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': stub_content,
'algo': 'sha1'}, actual_lookup)
mock_backend.content_find.assert_called_with(
'sha1',
hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'),
)
@patch('swh.web.ui.service.backend')
@istest
def search_hash_does_not_exist(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value=None)
# when
actual_lookup = service.search_hash(
'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': False}, actual_lookup)
# check the function has been called with parameters
mock_backend.content_find.assert_called_with(
'sha1_git',
hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
    @patch('swh.web.ui.service.backend')
    @istest
    def search_hash_exist(self, mock_backend):
        """search_hash returns {'found': True} when the backend finds the
        content (it does not return the content itself)."""
        # given
        stub_content = {
            'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
        }
        mock_backend.content_find = MagicMock(return_value=stub_content)
        # when
        actual_lookup = service.search_hash(
            'sha1:456caf10e9535160d90e874b45aa426de762f19f')
        # then
        self.assertEquals({'found': True}, actual_lookup)
        mock_backend.content_find.assert_called_with(
            'sha1',
            hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'),
        )
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_content_ctags(self, mock_backend):
        """lookup_content_ctags converts the backend's binary 'id' field to a
        hex string and passes the other ctags fields through unchanged."""
        # given
        mock_backend.content_ctags_get = MagicMock(
            return_value=[{
                'id': hex_to_hash(
                    '123caf10e9535160d90e874b45aa426de762f19f'),
                'line': 100,
                'name': 'hello',
                'kind': 'function',
                'tool_name': 'ctags',
                'tool_version': 'some-version',
            }])
        expected_ctags = [{
            'id': '123caf10e9535160d90e874b45aa426de762f19f',
            'line': 100,
            'name': 'hello',
            'kind': 'function',
            'tool_name': 'ctags',
            'tool_version': 'some-version',
        }]
        # when
        actual_ctags = list(service.lookup_content_ctags(
            'sha1:123caf10e9535160d90e874b45aa426de762f19f'))
        # then
        self.assertEqual(actual_ctags, expected_ctags)
        mock_backend.content_ctags_get.assert_called_with(
            hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_content_ctags_no_hash(self, mock_backend):
        """lookup_content_ctags yields nothing when a non-sha1 query cannot be
        resolved to a content via content_find."""
        # given
        mock_backend.content_find.return_value = None
        mock_backend.content_ctags_get = MagicMock(
            return_value=None)
        # when
        actual_ctags = list(service.lookup_content_ctags(
            'sha1_git:123caf10e9535160d90e874b45aa426de762f19f'))
        # then
        self.assertEqual(actual_ctags, [])
        mock_backend.content_find.assert_called_once_with(
            'sha1_git', hex_to_hash(
                '123caf10e9535160d90e874b45aa426de762f19f'))
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_content_filetype(self, mock_backend):
        """lookup_content_filetype decodes the backend's bytes fields
        (mimetype, encoding) and hex-encodes the id."""
        # given
        mock_backend.content_filetype_get = MagicMock(
            return_value={
                'id': hex_to_hash(
                    '123caf10e9535160d90e874b45aa426de762f19f'),
                'mimetype': b'text/x-c++',
                'encoding': b'us-ascii',
            })
        expected_filetype = {
            'id': '123caf10e9535160d90e874b45aa426de762f19f',
            'mimetype': 'text/x-c++',
            'encoding': 'us-ascii',
        }
        # when
        actual_filetype = service.lookup_content_filetype(
            'sha1:123caf10e9535160d90e874b45aa426de762f19f')
        # then
        self.assertEqual(actual_filetype, expected_filetype)
        mock_backend.content_filetype_get.assert_called_with(
            hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_filetype_2(self, mock_backend):
# given
mock_backend.content_find = MagicMock(
return_value={
'sha1': hex_to_hash(
'123caf10e9535160d90e874b45aa426de762f19f')
}
)
mock_backend.content_filetype_get = MagicMock(
return_value={
'id': hex_to_hash(
'123caf10e9535160d90e874b45aa426de762f19f'),
'mimetype': b'text/x-python',
'encoding': b'us-ascii',
}
)
expected_filetype = {
'id': '123caf10e9535160d90e874b45aa426de762f19f',
'mimetype': 'text/x-python',
'encoding': 'us-ascii',
}
# when
actual_filetype = service.lookup_content_filetype(
'sha1_git:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEqual(actual_filetype, expected_filetype)
mock_backend.content_find(
'sha1_git', hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
)
mock_backend.content_filetype_get.assert_called_with(
hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_content_language(self, mock_backend):
        """lookup_content_language hex-encodes the id and keeps the 'lang'
        field as-is."""
        # given
        mock_backend.content_language_get = MagicMock(
            return_value={
                'id': hex_to_hash(
                    '123caf10e9535160d90e874b45aa426de762f19f'),
                'lang': 'python',
            })
        expected_language = {
            'id': '123caf10e9535160d90e874b45aa426de762f19f',
            'lang': 'python',
        }
        # when
        actual_language = service.lookup_content_language(
            'sha1:123caf10e9535160d90e874b45aa426de762f19f')
        # then
        self.assertEqual(actual_language, expected_language)
        mock_backend.content_language_get.assert_called_with(
            hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_language_2(self, mock_backend):
# given
mock_backend.content_find = MagicMock(
return_value={
'sha1': hex_to_hash(
'123caf10e9535160d90e874b45aa426de762f19f')
}
)
mock_backend.content_language_get = MagicMock(
return_value={
'id': hex_to_hash(
'123caf10e9535160d90e874b45aa426de762f19f'),
'lang': 'haskell',
}
)
expected_language = {
'id': '123caf10e9535160d90e874b45aa426de762f19f',
'lang': 'haskell',
}
# when
actual_language = service.lookup_content_language(
'sha1_git:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEqual(actual_language, expected_language)
mock_backend.content_find(
'sha1_git', hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
)
mock_backend.content_language_get.assert_called_with(
hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_expression(self, mock_backend):
        """lookup_expression renames the backend's 'id' key to 'sha1' (as a
        hex string) and forwards pagination arguments to ctags_search."""
        # given
        mock_backend.content_ctags_search = MagicMock(
            return_value=[{
                'id': hex_to_hash(
                    '123caf10e9535160d90e874b45aa426de762f19f'),
                'name': 'foobar',
                'kind': 'variable',
                'lang': 'C',
                'line': 10
            }])
        expected_ctags = [{
            'sha1': '123caf10e9535160d90e874b45aa426de762f19f',
            'name': 'foobar',
            'kind': 'variable',
            'lang': 'C',
            'line': 10
        }]
        # when
        actual_ctags = list(service.lookup_expression(
            'foobar', last_sha1='hash', per_page=10))
        # then
        self.assertEqual(actual_ctags, expected_ctags)
        mock_backend.content_ctags_search.assert_called_with(
            'foobar', 'hash', 10)
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_expression_no_result(self, mock_backend):
        """lookup_expression yields nothing when the ctags search matches no
        content."""
        # given
        mock_backend.content_ctags_search = MagicMock(
            return_value=[])
        expected_ctags = []
        # when
        actual_ctags = list(service.lookup_expression(
            'barfoo', last_sha1='hash', per_page=10))
        # then
        self.assertEqual(actual_ctags, expected_ctags)
        mock_backend.content_ctags_search.assert_called_with(
            'barfoo', 'hash', 10)
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_content_license(self, mock_backend):
        """lookup_content_license hex-encodes the id and passes the remaining
        fields through."""
        # given
        mock_backend.content_license_get = MagicMock(
            return_value={
                'id': hex_to_hash(
                    '123caf10e9535160d90e874b45aa426de762f19f'),
                'lang': 'python',
            })
        expected_license = {
            'id': '123caf10e9535160d90e874b45aa426de762f19f',
            'lang': 'python',
        }
        # when
        actual_license = service.lookup_content_license(
            'sha1:123caf10e9535160d90e874b45aa426de762f19f')
        # then
        self.assertEqual(actual_license, expected_license)
        mock_backend.content_license_get.assert_called_with(
            hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_license_2(self, mock_backend):
# given
mock_backend.content_find = MagicMock(
return_value={
'sha1': hex_to_hash(
'123caf10e9535160d90e874b45aa426de762f19f')
}
)
mock_backend.content_license_get = MagicMock(
return_value={
'id': hex_to_hash(
'123caf10e9535160d90e874b45aa426de762f19f'),
'lang': 'haskell',
}
)
expected_license = {
'id': '123caf10e9535160d90e874b45aa426de762f19f',
'lang': 'haskell',
}
# when
actual_license = service.lookup_content_license(
'sha1_git:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEqual(actual_license, expected_license)
mock_backend.content_find(
'sha1_git', hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
)
mock_backend.content_license_get.assert_called_with(
hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_content_provenance(self, mock_backend):
        """lookup_content_provenance hex-encodes the content and revision
        hashes and decodes the bytes path; the backend result is a generator,
        exercising lazy consumption."""
        # given
        mock_backend.content_find_provenance = MagicMock(
            return_value=(p for p in [{
                'content': hex_to_hash(
                    '123caf10e9535160d90e874b45aa426de762f19f'),
                'revision': hex_to_hash(
                    '456caf10e9535160d90e874b45aa426de762f19f'),
                'origin': 100,
                'visit': 1,
                'path': b'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html'
            }]))
        expected_provenances = [{
            'content': '123caf10e9535160d90e874b45aa426de762f19f',
            'revision': '456caf10e9535160d90e874b45aa426de762f19f',
            'origin': 100,
            'visit': 1,
            'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html'
        }]
        # when
        actual_provenances = service.lookup_content_provenance(
            'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')
        # then
        self.assertEqual(list(actual_provenances), expected_provenances)
        mock_backend.content_find_provenance.assert_called_with(
            'sha1_git',
            hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_content_provenance_not_found(self, mock_backend):
        """lookup_content_provenance returns None when the backend has no
        provenance information."""
        # given
        mock_backend.content_find_provenance = MagicMock(return_value=None)
        # when
        actual_provenances = service.lookup_content_provenance(
            'sha1_git:456caf10e9535160d90e874b45aa426de762f19f')
        # then
        self.assertIsNone(actual_provenances)
        mock_backend.content_find_provenance.assert_called_with(
            'sha1_git',
            hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'))
    @patch('swh.web.ui.service.backend')
    @istest
    def stat_counters(self, mock_backend):
        """stat_counters passes the backend's counter dict through
        unmodified."""
        # given
        input_stats = {
            "content": 1770830,
            "directory": 211683,
            "directory_entry_dir": 209167,
            "directory_entry_file": 1807094,
            "directory_entry_rev": 0,
            "entity": 0,
            "entity_history": 0,
            "occurrence": 0,
            "occurrence_history": 19600,
            "origin": 1096,
            "person": 0,
            "release": 8584,
            "revision": 7792,
            "revision_history": 0,
            "skipped_content": 0
        }
        mock_backend.stat_counters = MagicMock(return_value=input_stats)
        # when
        actual_stats = service.stat_counters()
        # then
        expected_stats = input_stats
        self.assertEqual(actual_stats, expected_stats)
        mock_backend.stat_counters.assert_called_with()
@patch('swh.web.ui.service.backend')
@istest
def lookup_origin_visits(self, mock_backend):
# given
date_origin_visit2 = datetime.datetime(
2013, 7, 1, 20, 0, 0,
tzinfo=datetime.timezone.utc)
date_origin_visit3 = datetime.datetime(
2015, 1, 1, 21, 0, 0,
tzinfo=datetime.timezone.utc)
stub_result = [self.origin_visit1, {
'date': date_origin_visit2,
'origin': 1,
'visit': 2,
'target': hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'),
'branch': b'master',
'target_type': 'release',
'metadata': None,
}, {
'date': date_origin_visit3,
'origin': 1,
'visit': 3
}]
mock_backend.lookup_origin_visits.return_value = stub_result
# when
expected_origin_visits = [{
- 'date': self.origin_visit1['date'].timestamp(),
+ 'date': self.origin_visit1['date'].isoformat(),
'origin': self.origin_visit1['origin'],
'visit': self.origin_visit1['visit']
}, {
- 'date': date_origin_visit2.timestamp(),
+ 'date': date_origin_visit2.isoformat(),
'origin': 1,
'visit': 2,
'target': '65a55bbdf3629f916219feb3dcc7393ded1bc8db',
'branch': 'master',
'target_type': 'release',
'metadata': {},
}, {
- 'date': date_origin_visit3.timestamp(),
+ 'date': date_origin_visit3.isoformat(),
'origin': 1,
'visit': 3
}]
actual_origin_visits = service.lookup_origin_visits(6)
# then
self.assertEqual(list(actual_origin_visits), expected_origin_visits)
mock_backend.lookup_origin_visits.assert_called_once_with(
6, last_visit=None, limit=10)
@patch('swh.web.ui.service.backend')
@istest
def lookup_origin_visit(self, mock_backend):
# given
stub_result = self.origin_visit1
mock_backend.lookup_origin_visit.return_value = stub_result
expected_origin_visit = {
- 'date': self.origin_visit1['date'].timestamp(),
+ 'date': self.origin_visit1['date'].isoformat(),
'origin': self.origin_visit1['origin'],
'visit': self.origin_visit1['visit']
}
# when
actual_origin_visit = service.lookup_origin_visit(1, 1)
# then
self.assertEqual(actual_origin_visit, expected_origin_visit)
mock_backend.lookup_origin_visit.assert_called_once_with(1, 1)
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_origin(self, mock_backend):
        """lookup_origin passes the backend's origin dict through
        unmodified."""
        # given
        mock_backend.origin_get = MagicMock(return_value={
            'id': 'origin-id',
            'lister': 'uuid-lister',
            'project': 'uuid-project',
            'url': 'ftp://some/url/to/origin',
            'type': 'ftp'})
        # when
        actual_origin = service.lookup_origin({'id': 'origin-id'})
        # then
        self.assertEqual(actual_origin, {'id': 'origin-id',
                                         'lister': 'uuid-lister',
                                         'project': 'uuid-project',
                                         'url': 'ftp://some/url/to/origin',
                                         'type': 'ftp'})
        mock_backend.origin_get.assert_called_with({'id': 'origin-id'})
@patch('swh.web.ui.service.backend')
@istest
def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self,
mock_backend):
# given
mock_backend.release_get = MagicMock()
with self.assertRaises(BadInputExc) as cm:
# when
service.lookup_release('not-a-sha1')
self.assertIn('invalid checksum', cm.exception.args[0])
mock_backend.release_get.called = False
@patch('swh.web.ui.service.backend')
@istest
def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self, mock_backend):
# given
mock_backend.release_get = MagicMock()
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_release(
'13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5'
'1aea892abe')
self.assertIn('sha1_git supported', cm.exception.args[0])
mock_backend.release_get.called = False
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_directory_with_path_not_found(self, mock_backend):
        """Backend returns None when no entry matches the path.

        NOTE(review): this test calls the *mock* directly instead of going
        through the service layer, so it only exercises MagicMock behavior —
        confirm whether it should call service.lookup_directory_with_path.
        """
        # given
        mock_backend.lookup_directory_with_path = MagicMock(return_value=None)
        sha1_git = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
        # when
        actual_directory = mock_backend.lookup_directory_with_path(
            sha1_git, 'some/path/here')
        self.assertIsNone(actual_directory)
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_directory_with_path_found(self, mock_backend):
        """Backend returns the matching directory entry.

        NOTE(review): this test calls the *mock* directly instead of going
        through the service layer, so it only exercises MagicMock behavior —
        confirm whether it should call service.lookup_directory_with_path.
        """
        # given
        sha1_git = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
        entry = {'id': 'dir-id',
                 'type': 'dir',
                 'name': 'some/path/foo'}
        mock_backend.lookup_directory_with_path = MagicMock(return_value=entry)
        # when
        actual_directory = mock_backend.lookup_directory_with_path(
            sha1_git, 'some/path/here')
        self.assertEqual(entry, actual_directory)
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_release(self, mock_backend):
        """lookup_release hex-encodes the id, decodes bytes fields and
        serializes the swh date dict (timestamp/offset/negative_utc) to an
        ISO-8601 string; negative_utc=True yields the '-00:00' suffix."""
        # given
        mock_backend.release_get = MagicMock(return_value={
            'id': hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'),
            'target': None,
            'date': {
                'timestamp': datetime.datetime(
                    2015, 1, 1, 22, 0, 0,
                    tzinfo=datetime.timezone.utc).timestamp(),
                'offset': 0,
                'negative_utc': True,
            },
            'name': b'v0.0.1',
            'message': b'synthetic release',
            'synthetic': True,
        })
        # when
        actual_release = service.lookup_release(
            '65a55bbdf3629f916219feb3dcc7393ded1bc8db')
        # then
        self.assertEqual(actual_release, {
            'id': '65a55bbdf3629f916219feb3dcc7393ded1bc8db',
            'target': None,
            'date': '2015-01-01T22:00:00-00:00',
            'name': 'v0.0.1',
            'message': 'synthetic release',
            'synthetic': True,
        })
        mock_backend.release_get.assert_called_with(
            hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'))
    @istest
    def lookup_revision_with_context_ko_not_a_sha1_1(self):
        """lookup_revision_with_context rejects a sha256-length child hash."""
        # given
        sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4' \
                   'daf51aea892abe'
        sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
        # when
        with self.assertRaises(BadInputExc) as cm:
            service.lookup_revision_with_context(sha1_git_root, sha1_git)
            self.assertIn('Only sha1_git is supported', cm.exception.args[0])
    @istest
    def lookup_revision_with_context_ko_not_a_sha1_2(self):
        """lookup_revision_with_context rejects a sha256-length root hash."""
        # given
        sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
        sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f6' \
                   '2d4daf51aea892abe'
        # when
        with self.assertRaises(BadInputExc) as cm:
            service.lookup_revision_with_context(sha1_git_root, sha1_git)
            self.assertIn('Only sha1_git is supported', cm.exception.args[0])
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_revision_with_context_ko_sha1_git_does_not_exist(
            self,
            mock_backend):
        """lookup_revision_with_context raises NotFoundExc when the child
        revision does not exist."""
        # given
        sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
        sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db'
        sha1_git_bin = hex_to_hash(sha1_git)
        mock_backend.revision_get.return_value = None
        # when
        with self.assertRaises(NotFoundExc) as cm:
            service.lookup_revision_with_context(sha1_git_root, sha1_git)
            self.assertIn('Revision 777777bdf3629f916219feb3dcc7393ded1bc8db'
                          ' not found', cm.exception.args[0])
        mock_backend.revision_get.assert_called_once_with(
            sha1_git_bin)
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_revision_with_context_ko_root_sha1_git_does_not_exist(
            self,
            mock_backend):
        """lookup_revision_with_context raises NotFoundExc when the child
        exists but the root revision does not (child is looked up first)."""
        # given
        sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
        sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db'
        sha1_git_root_bin = hex_to_hash(sha1_git_root)
        sha1_git_bin = hex_to_hash(sha1_git)
        # first call (child) succeeds, second call (root) fails
        mock_backend.revision_get.side_effect = ['foo', None]
        # when
        with self.assertRaises(NotFoundExc) as cm:
            service.lookup_revision_with_context(sha1_git_root, sha1_git)
            self.assertIn('Revision 65a55bbdf3629f916219feb3dcc7393ded1bc8db'
                          ' not found', cm.exception.args[0])
        mock_backend.revision_get.assert_has_calls([call(sha1_git_bin),
                                                    call(sha1_git_root_bin)])
    @patch('swh.web.ui.service.backend')
    @patch('swh.web.ui.service.query')
    @istest
    def lookup_revision_with_context(self, mock_query, mock_backend):
        """lookup_revision_with_context walks the root's revision log and
        computes the children of the target revision from the parent links
        (999 and 777 both list 883 as parent, so both are its children)."""
        # given
        sha1_git_root = '666'
        sha1_git = '883'
        sha1_git_root_bin = b'666'
        sha1_git_bin = b'883'
        sha1_git_root_dict = {
            'id': sha1_git_root_bin,
            'parents': [b'999'],
        }
        sha1_git_dict = {
            'id': sha1_git_bin,
            'parents': [],
            'directory': b'278',
        }
        stub_revisions = [
            sha1_git_root_dict,
            {
                'id': b'999',
                'parents': [b'777', b'883', b'888'],
            },
            {
                'id': b'777',
                'parents': [b'883'],
            },
            sha1_git_dict,
            {
                'id': b'888',
                'parents': [b'889'],
            },
            {
                'id': b'889',
                'parents': [],
            },
        ]
        # inputs ok
        mock_query.parse_hash_with_algorithms_or_throws.side_effect = [
            ('sha1', sha1_git_bin),
            ('sha1', sha1_git_root_bin)
        ]
        # lookup revision first 883, then 666 (both exists)
        mock_backend.revision_get.side_effect = [
            sha1_git_dict,
            sha1_git_root_dict
        ]
        mock_backend.revision_log = MagicMock(
            return_value=stub_revisions)
        # when
        actual_revision = service.lookup_revision_with_context(
            sha1_git_root,
            sha1_git)
        # then
        self.assertEquals(actual_revision, {
            'id': hash_to_hex(sha1_git_bin),
            'parents': [],
            'children': [hash_to_hex(b'999'), hash_to_hex(b'777')],
            'directory': hash_to_hex(b'278'),
            'merge': False
        })
        mock_query.parse_hash_with_algorithms_or_throws.assert_has_calls(
            [call(sha1_git, ['sha1'], 'Only sha1_git is supported.'),
             call(sha1_git_root, ['sha1'], 'Only sha1_git is supported.')])
        mock_backend.revision_log.assert_called_with(
            sha1_git_root_bin, 100)
    @patch('swh.web.ui.service.backend')
    @patch('swh.web.ui.service.query')
    @istest
    def lookup_revision_with_context_sha1_git_root_already_retrieved_as_dict(
            self, mock_query, mock_backend):
        """When the root is passed as an already-resolved dict, only the
        child hash is parsed and only one revision_get lookup happens."""
        # given
        sha1_git = '883'
        sha1_git_root_bin = b'666'
        sha1_git_bin = b'883'
        sha1_git_root_dict = {
            'id': sha1_git_root_bin,
            'parents': [b'999'],
        }
        sha1_git_dict = {
            'id': sha1_git_bin,
            'parents': [],
            'directory': b'278',
        }
        stub_revisions = [
            sha1_git_root_dict,
            {
                'id': b'999',
                'parents': [b'777', b'883', b'888'],
            },
            {
                'id': b'777',
                'parents': [b'883'],
            },
            sha1_git_dict,
            {
                'id': b'888',
                'parents': [b'889'],
            },
            {
                'id': b'889',
                'parents': [],
            },
        ]
        # inputs ok
        mock_query.parse_hash_with_algorithms_or_throws.return_value = (
            'sha1', sha1_git_bin)
        # lookup only on sha1
        mock_backend.revision_get.return_value = sha1_git_dict
        mock_backend.revision_log.return_value = stub_revisions
        # when
        actual_revision = service.lookup_revision_with_context(
            {'id': sha1_git_root_bin},
            sha1_git)
        # then
        self.assertEquals(actual_revision, {
            'id': hash_to_hex(sha1_git_bin),
            'parents': [],
            'children': [hash_to_hex(b'999'), hash_to_hex(b'777')],
            'directory': hash_to_hex(b'278'),
            'merge': False
        })
        mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with( # noqa
            sha1_git, ['sha1'], 'Only sha1_git is supported.')
        mock_backend.revision_get.assert_called_once_with(sha1_git_bin)
        mock_backend.revision_log.assert_called_with(
            sha1_git_root_bin, 100)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_ko_revision_not_found(self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
mock_backend.revision_get.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_directory_with_revision('123')
self.assertIn('Revision 123 not found', cm.exception.args[0])
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_ko_revision_with_path_to_nowhere(
self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
mock_backend.directory_entry_get_by_path.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_directory_with_revision(
'123',
'path/to/something/unknown')
self.assertIn("Directory/File 'path/to/something/unknown' " +
"pointed to by revision 123 not found",
cm.exception.args[0])
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_entry_get_by_path.assert_called_once_with(
b'dir-id-as-sha1', 'path/to/something/unknown')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_ko_type_not_implemented(
self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
mock_backend.directory_entry_get_by_path.return_value = {
'type': 'rev',
'name': b'some/path/to/rev',
'target': b'456'
}
stub_content = {
'id': b'12',
'type': 'file'
}
mock_backend.content_get.return_value = stub_content
# when
with self.assertRaises(NotImplementedError) as cm:
service.lookup_directory_with_revision(
'123',
'some/path/to/rev')
self.assertIn("Entity of type 'rev' not implemented.",
cm.exception.args[0])
# then
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_entry_get_by_path.assert_called_once_with(
b'dir-id-as-sha1', 'some/path/to/rev')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_without_path(self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
stub_dir_entries = [{
'id': b'123',
'type': 'dir'
}, {
'id': b'456',
'type': 'file'
}]
mock_backend.directory_ls.return_value = stub_dir_entries
# when
actual_directory_entries = service.lookup_directory_with_revision(
'123')
self.assertEqual(actual_directory_entries['type'], 'dir')
self.assertEqual(list(actual_directory_entries['content']),
stub_dir_entries)
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_ls.assert_called_once_with(dir_id)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_with_path_to_dir(self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
stub_dir_entries = [{
'id': b'12',
'type': 'dir'
}, {
'id': b'34',
'type': 'file'
}]
mock_backend.directory_entry_get_by_path.return_value = {
'type': 'dir',
'name': b'some/path',
'target': b'456'
}
mock_backend.directory_ls.return_value = stub_dir_entries
# when
actual_directory_entries = service.lookup_directory_with_revision(
'123',
'some/path')
self.assertEqual(actual_directory_entries['type'], 'dir')
self.assertEqual(actual_directory_entries['revision'], '123')
self.assertEqual(actual_directory_entries['path'], 'some/path')
self.assertEqual(list(actual_directory_entries['content']),
stub_dir_entries)
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_entry_get_by_path.assert_called_once_with(
dir_id,
'some/path')
mock_backend.directory_ls.assert_called_once_with(b'456')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_with_path_to_file_without_data(
self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
mock_backend.directory_entry_get_by_path.return_value = {
'type': 'file',
'name': b'some/path/to/file',
'target': b'789'
}
stub_content = {
'status': 'visible',
}
mock_backend.content_find.return_value = stub_content
# when
actual_content = service.lookup_directory_with_revision(
'123',
'some/path/to/file')
# then
self.assertEqual(actual_content, {'type': 'file',
'revision': '123',
'path': 'some/path/to/file',
'content': stub_content})
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_entry_get_by_path.assert_called_once_with(
b'dir-id-as-sha1', 'some/path/to/file')
mock_backend.content_find.assert_called_once_with('sha1_git', b'789')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_with_path_to_file_with_data(
self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
mock_backend.directory_entry_get_by_path.return_value = {
'type': 'file',
'name': b'some/path/to/file',
'target': b'789'
}
stub_content = {
'status': 'visible',
'sha1': b'content-sha1'
}
mock_backend.content_find.return_value = stub_content
mock_backend.content_get.return_value = {
'sha1': b'content-sha1',
'data': b'some raw data'
}
expected_content = {
'status': 'visible',
'sha1': hash_to_hex(b'content-sha1'),
'data': b'some raw data'
}
# when
actual_content = service.lookup_directory_with_revision(
'123',
'some/path/to/file',
with_data=True)
# then
self.assertEqual(actual_content, {'type': 'file',
'revision': '123',
'path': 'some/path/to/file',
'content': expected_content})
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_entry_get_by_path.assert_called_once_with(
b'dir-id-as-sha1', 'some/path/to/file')
mock_backend.content_find.assert_called_once_with('sha1_git', b'789')
mock_backend.content_get.assert_called_once_with(b'content-sha1')
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_revision(self, mock_backend):
        """lookup_revision converts the raw backend revision to the
        serializable SAMPLE_REVISION form."""
        # given
        mock_backend.revision_get = MagicMock(
            return_value=self.SAMPLE_REVISION_RAW)
        # when
        actual_revision = service.lookup_revision(
            self.SHA1_SAMPLE)
        # then
        self.assertEqual(actual_revision, self.SAMPLE_REVISION)
        mock_backend.revision_get.assert_called_with(
            self.SHA1_SAMPLE_BIN)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_invalid_msg(self, mock_backend):
# given
stub_rev = self.SAMPLE_REVISION_RAW
stub_rev['message'] = b'elegant fix for bug \xff'
expected_revision = self.SAMPLE_REVISION
expected_revision['message'] = None
expected_revision['message_decoding_failed'] = True
mock_backend.revision_get = MagicMock(return_value=stub_rev)
# when
actual_revision = service.lookup_revision(
self.SHA1_SAMPLE)
# then
self.assertEqual(actual_revision, expected_revision)
mock_backend.revision_get.assert_called_with(
self.SHA1_SAMPLE_BIN)
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_revision_msg_ok(self, mock_backend):
        """lookup_revision_message returns the raw (bytes) message of an
        existing revision."""
        # given
        mock_backend.revision_get.return_value = self.SAMPLE_REVISION_RAW
        # when
        rv = service.lookup_revision_message(
            self.SHA1_SAMPLE)
        # then
        self.assertEquals(rv, {'message': self.SAMPLE_MESSAGE_BIN})
        mock_backend.revision_get.assert_called_with(
            self.SHA1_SAMPLE_BIN)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_msg_absent(self, mock_backend):
# given
stub_revision = self.SAMPLE_REVISION_RAW
del stub_revision['message']
mock_backend.revision_get.return_value = stub_revision
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_message(
self.SHA1_SAMPLE)
# then
mock_backend.revision_get.assert_called_with(
self.SHA1_SAMPLE_BIN)
self.assertEqual(cm.exception.args[0], 'No message for revision '
'with sha1_git '
'18d8be353ed3480476f032475e7c233eff7371d5.')
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_revision_msg_norev(self, mock_backend):
        """lookup_revision_message raises NotFoundExc when the revision does
        not exist at all."""
        # given
        mock_backend.revision_get.return_value = None
        # when
        with self.assertRaises(NotFoundExc) as cm:
            service.lookup_revision_message(
                self.SHA1_SAMPLE)
        # then
        mock_backend.revision_get.assert_called_with(
            self.SHA1_SAMPLE_BIN)
        self.assertEqual(cm.exception.args[0], 'Revision with sha1_git '
                         '18d8be353ed3480476f032475e7c233eff7371d5 '
                         'not found.')
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_revision_multiple(self, mock_backend):
        """lookup_revision_multiple converts each raw revision (hashes to
        hex, bytes to str, swh date dicts to ISO-8601) and passes the binary
        hashes to the backend as an iterable."""
        # given
        sha1 = self.SHA1_SAMPLE
        sha1_other = 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'
        stub_revisions = [
            self.SAMPLE_REVISION_RAW,
            {
                'id': hex_to_hash(sha1_other),
                'directory': 'abcdbe353ed3480476f032475e7c233eff7371d5',
                'author': {
                    'name': b'name',
                    'email': b'name@surname.org',
                },
                'committer': {
                    'name': b'name',
                    'email': b'name@surname.org',
                },
                'message': b'ugly fix for bug 42',
                'date': {
                    'timestamp': datetime.datetime(
                        2000, 1, 12, 5, 23, 54,
                        tzinfo=datetime.timezone.utc).timestamp(),
                    'offset': 0,
                    'negative_utc': False
                },
                'date_offset': 0,
                'committer_date': {
                    'timestamp': datetime.datetime(
                        2000, 1, 12, 5, 23, 54,
                        tzinfo=datetime.timezone.utc).timestamp(),
                    'offset': 0,
                    'negative_utc': False
                },
                'committer_date_offset': 0,
                'synthetic': False,
                'type': 'git',
                'parents': [],
                'metadata': [],
            }
        ]
        mock_backend.revision_get_multiple.return_value = stub_revisions
        # when
        actual_revisions = service.lookup_revision_multiple(
            [sha1, sha1_other])
        # then
        self.assertEqual(list(actual_revisions), [
            self.SAMPLE_REVISION,
            {
                'id': sha1_other,
                'directory': 'abcdbe353ed3480476f032475e7c233eff7371d5',
                'author': {
                    'name': 'name',
                    'email': 'name@surname.org',
                },
                'committer': {
                    'name': 'name',
                    'email': 'name@surname.org',
                },
                'message': 'ugly fix for bug 42',
                'date': '2000-01-12T05:23:54+00:00',
                'date_offset': 0,
                'committer_date': '2000-01-12T05:23:54+00:00',
                'committer_date_offset': 0,
                'synthetic': False,
                'type': 'git',
                'parents': [],
                'metadata': {},
                'merge': False
            }
        ])
        self.assertEqual(
            list(mock_backend.revision_get_multiple.call_args[0][0]),
            [hex_to_hash(sha1),
             hex_to_hash(sha1_other)])
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_revision_multiple_none_found(self, mock_backend):
        """lookup_revision_multiple yields nothing when the backend returns
        no revisions."""
        # given
        sha1_bin = self.SHA1_SAMPLE
        sha1_other = 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'
        mock_backend.revision_get_multiple.return_value = []
        # then
        actual_revisions = service.lookup_revision_multiple(
            [sha1_bin, sha1_other])
        self.assertEqual(list(actual_revisions), [])
        self.assertEqual(
            list(mock_backend.revision_get_multiple.call_args[0][0]),
            [hex_to_hash(self.SHA1_SAMPLE),
             hex_to_hash(sha1_other)])
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_revision_log(self, mock_backend):
        """lookup_revision_log converts each revision of the backend log and
        forwards the limit."""
        # given
        stub_revision_log = [self.SAMPLE_REVISION_RAW]
        mock_backend.revision_log = MagicMock(return_value=stub_revision_log)
        # when
        actual_revision = service.lookup_revision_log(
            'abcdbe353ed3480476f032475e7c233eff7371d5',
            limit=25)
        # then
        self.assertEqual(list(actual_revision), [self.SAMPLE_REVISION])
        mock_backend.revision_log.assert_called_with(
            hex_to_hash('abcdbe353ed3480476f032475e7c233eff7371d5'), 25)
    @patch('swh.web.ui.service.backend')
    @istest
    def lookup_revision_log_by(self, mock_backend):
        """lookup_revision_log_by converts the log fetched by origin/branch
        and forwards origin id, branch, timestamp and limit."""
        # given
        stub_revision_log = [self.SAMPLE_REVISION_RAW]
        mock_backend.revision_log_by = MagicMock(
            return_value=stub_revision_log)
        # when
        actual_log = service.lookup_revision_log_by(
            1, 'refs/heads/master', None, limit=100)
        # then
        self.assertEqual(list(actual_log), [self.SAMPLE_REVISION])
        mock_backend.revision_log_by.assert_called_with(
            1, 'refs/heads/master', None, 100)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_log_by_nolog(self, mock_backend):
# given
mock_backend.revision_log_by = MagicMock(return_value=None)
# when
res = service.lookup_revision_log_by(
1, 'refs/heads/master', None, limit=100)
# then
self.assertEquals(res, None)
mock_backend.revision_log_by.assert_called_with(
1, 'refs/heads/master', None, 100)
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_raw_not_found(self, mock_backend):
    """lookup_content_raw returns None when the content is unknown."""
    # given
    mock_backend.content_find = MagicMock(return_value=None)
    # when
    actual_content = service.lookup_content_raw(
        'sha1:18d8be353ed3480476f032475e7c233eff7371d5')
    # then
    self.assertIsNone(actual_content)
    mock_backend.content_find.assert_called_with(
        'sha1', hex_to_hash(self.SHA1_SAMPLE))


@patch('swh.web.ui.service.backend')
@istest
def lookup_content_raw(self, mock_backend):
    """lookup_content_raw finds by sha256, then fetches data by sha1."""
    # given
    mock_backend.content_find = MagicMock(return_value={
        'sha1': self.SHA1_SAMPLE,
    })
    mock_backend.content_get = MagicMock(return_value={
        'data': b'binary data'})
    # when
    actual_content = service.lookup_content_raw(
        'sha256:%s' % self.SHA256_SAMPLE)
    # then
    self.assertEquals(actual_content, {'data': b'binary data'})
    # content_find resolves the checksum; content_get is keyed on sha1.
    mock_backend.content_find.assert_called_once_with(
        'sha256', self.SHA256_SAMPLE_BIN)
    mock_backend.content_get.assert_called_once_with(
        self.SHA1_SAMPLE)


@patch('swh.web.ui.service.backend')
@istest
def lookup_content_not_found(self, mock_backend):
    """lookup_content returns None when the content is unknown."""
    # given
    mock_backend.content_find = MagicMock(return_value=None)
    # when
    actual_content = service.lookup_content(
        'sha1:%s' % self.SHA1_SAMPLE)
    # then
    self.assertIsNone(actual_content)
    mock_backend.content_find.assert_called_with(
        'sha1', self.SHA1_SAMPLE_BIN)


@patch('swh.web.ui.service.backend')
@istest
def lookup_content_with_sha1(self, mock_backend):
    """lookup_content resolves a sha1 query and converts the raw result."""
    # given
    mock_backend.content_find = MagicMock(
        return_value=self.SAMPLE_CONTENT_RAW)
    # when
    actual_content = service.lookup_content(
        'sha1:%s' % self.SHA1_SAMPLE)
    # then
    self.assertEqual(actual_content, self.SAMPLE_CONTENT)
    mock_backend.content_find.assert_called_with(
        'sha1', hex_to_hash(self.SHA1_SAMPLE))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_with_sha256(self, mock_backend):
    """lookup_content resolves a sha256 query and keeps the status field."""
    # given
    # Copy the shared fixtures: the original mutated the class-level
    # samples in place, leaking 'status' into every later test that
    # compares against SAMPLE_CONTENT / SAMPLE_CONTENT_RAW.
    stub_content = dict(self.SAMPLE_CONTENT_RAW)
    stub_content['status'] = 'visible'
    expected_content = dict(self.SAMPLE_CONTENT)
    expected_content['status'] = 'visible'
    mock_backend.content_find = MagicMock(
        return_value=stub_content)
    # when
    actual_content = service.lookup_content(
        'sha256:%s' % self.SHA256_SAMPLE)
    # then
    self.assertEqual(actual_content, expected_content)
    mock_backend.content_find.assert_called_with(
        'sha256', self.SHA256_SAMPLE_BIN)
@patch('swh.web.ui.service.backend')
@istest
def lookup_person(self, mock_backend):
    """lookup_person decodes the backend's byte fields to strings."""
    # given
    mock_backend.person_get = MagicMock(return_value={
        'id': 'person_id',
        'name': b'some_name',
        'email': b'some-email',
    })
    # when
    actual_person = service.lookup_person('person_id')
    # then
    self.assertEqual(actual_person, {
        'id': 'person_id',
        'name': 'some_name',
        'email': 'some-email',
    })
    mock_backend.person_get.assert_called_with('person_id')
@patch('swh.web.ui.service.backend')
@istest
def lookup_directory_bad_checksum(self, mock_backend):
    """A malformed checksum raises BadInputExc before reaching storage."""
    # given
    mock_backend.directory_ls = MagicMock()
    # when
    with self.assertRaises(BadInputExc):
        service.lookup_directory('directory_id')
    # then
    # Bug fix: the original did `mock_backend.directory_ls.called = False`,
    # which *assigns* the flag (a no-op check) instead of asserting that
    # the backend was never queried.
    self.assertFalse(mock_backend.directory_ls.called)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_not_found(self, mock_query, mock_backend):
    """An unknown (but valid) directory id yields None without listing."""
    # given
    mock_query.parse_hash_with_algorithms_or_throws.return_value = (
        'sha1',
        'directory-id-bin')
    mock_backend.directory_get.return_value = None
    # when
    actual_dir = service.lookup_directory('directory_id')
    # then
    self.assertIsNone(actual_dir)
    mock_query.parse_hash_with_algorithms_or_throws.assert_called_with(
        'directory_id', ['sha1'], 'Only sha1_git is supported.')
    mock_backend.directory_get.assert_called_with('directory-id-bin')
    # Bug fix: the original assigned `.called = False` instead of asserting
    # that directory_ls was never invoked.
    self.assertFalse(mock_backend.directory_ls.called)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory(self, mock_query, mock_backend):
    """lookup_directory lists entries with hashes converted to hex."""
    mock_query.parse_hash_with_algorithms_or_throws.return_value = (
        'sha1',
        'directory-sha1-bin')
    # something that exists is all that matters here
    mock_backend.directory_get.return_value = {'id': b'directory-sha1-bin'}
    # given
    stub_dir_entries = [{
        'sha1': self.SHA1_SAMPLE_BIN,
        'sha256': self.SHA256_SAMPLE_BIN,
        'sha1_git': self.SHA1GIT_SAMPLE_BIN,
        'target': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
                              'c5b00a6d03'),
        'dir_id': self.DIRECTORY_ID_BIN,
        'name': b'bob',
        'type': 10,
    }]
    # Same entries, binary hashes hexlified and the name decoded.
    expected_dir_entries = [{
        'sha1': self.SHA1_SAMPLE,
        'sha256': self.SHA256_SAMPLE,
        'sha1_git': self.SHA1GIT_SAMPLE,
        'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
        'dir_id': self.DIRECTORY_ID,
        'name': 'bob',
        'type': 10,
    }]
    mock_backend.directory_ls.return_value = stub_dir_entries
    # when
    actual_directory_ls = list(service.lookup_directory(
        'directory-sha1'))
    # then
    self.assertEqual(actual_directory_ls, expected_dir_entries)
    mock_query.parse_hash_with_algorithms_or_throws.assert_called_with(
        'directory-sha1', ['sha1'], 'Only sha1_git is supported.')
    mock_backend.directory_ls.assert_called_with(
        'directory-sha1-bin')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by_nothing_found(self, mock_backend):
    """lookup_revision_by returns None when the backend finds nothing."""
    # given
    mock_backend.revision_get_by.return_value = None
    # when
    actual_revisions = service.lookup_revision_by(1)
    # then
    self.assertIsNone(actual_revisions)
    # Bug fix: the original line `mock_backend.revision_get_by(1, 'master',
    # None)` merely *called* the mock (a no-op), asserting nothing.  Assert
    # the backend was queried exactly once for this origin.  The exact
    # default branch/ts forwarded by the service is not visible here, so
    # only the origin id argument is pinned.
    self.assertEqual(mock_backend.revision_get_by.call_count, 1)
    self.assertEqual(mock_backend.revision_get_by.call_args[0][0], 1)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by(self, mock_backend):
    """lookup_revision_by returns the converted revision from the backend."""
    # given
    stub_rev = self.SAMPLE_REVISION_RAW
    expected_rev = self.SAMPLE_REVISION
    mock_backend.revision_get_by.return_value = stub_rev
    # when
    actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts')
    # then
    self.assertEquals(actual_revision, expected_rev)
    # Bug fix: the original merely *called* the mock here (and with the
    # wrong origin id, 1); assert that the backend was queried once with
    # the arguments actually passed to the service.
    mock_backend.revision_get_by.assert_called_once_with(
        10, 'master2', 'some-ts')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by_nomerge(self, mock_backend):
    """A single-parent revision is converted without the merge flag set."""
    # given
    # Copy the shared fixtures so mutating 'parents' cannot leak into
    # other tests (the original mutated the class-level samples in place).
    stub_rev = dict(self.SAMPLE_REVISION_RAW)
    stub_rev['parents'] = [
        hex_to_hash('adc83b19e793491b1c6ea0fd8b46cd9f32e592fc')]
    expected_rev = dict(self.SAMPLE_REVISION)
    expected_rev['parents'] = ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc']
    mock_backend.revision_get_by.return_value = stub_rev
    # when
    actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts')
    # then
    self.assertEquals(actual_revision, expected_rev)
    # Bug fix: the original merely *called* the mock instead of asserting.
    mock_backend.revision_get_by.assert_called_once_with(
        10, 'master2', 'some-ts')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by_merge(self, mock_backend):
    """A multi-parent revision is converted with merge set to True."""
    # given
    # Copy the shared fixtures so mutating 'parents'/'merge' cannot leak
    # into other tests (the original mutated the class-level samples).
    stub_rev = dict(self.SAMPLE_REVISION_RAW)
    stub_rev['parents'] = [
        hex_to_hash('adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'),
        hex_to_hash('ffff3b19e793491b1c6db0fd8b46cd9f32e592fc')
    ]
    expected_rev = dict(self.SAMPLE_REVISION)
    expected_rev['parents'] = [
        'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
        'ffff3b19e793491b1c6db0fd8b46cd9f32e592fc'
    ]
    expected_rev['merge'] = True
    mock_backend.revision_get_by.return_value = stub_rev
    # when
    actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts')
    # then
    self.assertEquals(actual_revision, expected_rev)
    # Bug fix: the original merely *called* the mock instead of asserting.
    mock_backend.revision_get_by.assert_called_once_with(
        10, 'master2', 'some-ts')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_by_ko(self, mock_backend):
    """An unknown origin/branch/ts triplet raises NotFoundExc."""
    # given
    mock_backend.revision_get_by.return_value = None
    # when
    with self.assertRaises(NotFoundExc) as cm:
        origin_id = 1
        branch_name = 'master3'
        ts = None
        service.lookup_revision_with_context_by(origin_id, branch_name, ts,
                                                'sha1')
    # then
    self.assertIn(
        'Revision with (origin_id: %s, branch_name: %s'
        ', ts: %s) not found.' % (origin_id,
                                  branch_name,
                                  ts), cm.exception.args[0])
    mock_backend.revision_get_by.assert_called_once_with(
        origin_id, branch_name, ts)


@patch('swh.web.ui.service.lookup_revision_with_context')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_by(self, mock_backend,
                                    mock_lookup_revision_with_context):
    """Resolves the root revision by origin, then delegates the context
    lookup with the default limit of 100."""
    # given
    stub_root_rev = {'id': 'root-rev-id'}
    mock_backend.revision_get_by.return_value = {'id': 'root-rev-id'}
    stub_rev = {'id': 'rev-found'}
    mock_lookup_revision_with_context.return_value = stub_rev
    # when
    origin_id = 1
    branch_name = 'master3'
    ts = None
    sha1_git = 'sha1'
    actual_root_rev, actual_rev = service.lookup_revision_with_context_by(
        origin_id, branch_name, ts, sha1_git)
    # then
    self.assertEquals(actual_root_rev, stub_root_rev)
    self.assertEquals(actual_rev, stub_rev)
    mock_backend.revision_get_by.assert_called_once_with(
        origin_id, branch_name, ts)
    mock_lookup_revision_with_context.assert_called_once_with(
        stub_root_rev, sha1_git, 100)


@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_entity_by_uuid(self, mock_query, mock_backend):
    """lookup_entity_by_uuid validates the uuid then queries the backend."""
    # given
    uuid_test = 'correct-uuid'
    mock_query.parse_uuid4.return_value = uuid_test
    stub_entities = [{'uuid': uuid_test}]
    mock_backend.entity_get.return_value = stub_entities
    # when
    actual_entities = list(service.lookup_entity_by_uuid(uuid_test))
    # then
    self.assertEquals(actual_entities, stub_entities)
    mock_query.parse_uuid4.assert_called_once_with(uuid_test)
    mock_backend.entity_get.assert_called_once_with(uuid_test)
@istest
def lookup_revision_through_ko_not_implemented(self):
    """Criteria matching no known lookup raise NotImplementedError."""
    # then
    with self.assertRaises(NotImplementedError):
        service.lookup_revision_through({
            'something-unknown': 10,
        })


@patch('swh.web.ui.service.lookup_revision_with_context_by')
@istest
def lookup_revision_through_with_context_by(self, mock_lookup):
    """Origin criteria plus sha1_git dispatch to
    lookup_revision_with_context_by, forwarding the limit."""
    # given
    stub_rev = {'id': 'rev'}
    mock_lookup.return_value = stub_rev
    # when
    actual_revision = service.lookup_revision_through({
        'origin_id': 1,
        'branch_name': 'master',
        'ts': None,
        'sha1_git': 'sha1-git'
    }, limit=1000)
    # then
    self.assertEquals(actual_revision, stub_rev)
    mock_lookup.assert_called_once_with(
        1, 'master', None, 'sha1-git', 1000)


@patch('swh.web.ui.service.lookup_revision_by')
@istest
def lookup_revision_through_with_revision_by(self, mock_lookup):
    """Origin criteria without sha1_git dispatch to lookup_revision_by;
    note the limit is not forwarded for this case."""
    # given
    stub_rev = {'id': 'rev'}
    mock_lookup.return_value = stub_rev
    # when
    actual_revision = service.lookup_revision_through({
        'origin_id': 2,
        'branch_name': 'master2',
        'ts': 'some-ts',
    }, limit=10)
    # then
    self.assertEquals(actual_revision, stub_rev)
    mock_lookup.assert_called_once_with(
        2, 'master2', 'some-ts')


@patch('swh.web.ui.service.lookup_revision_with_context')
@istest
def lookup_revision_through_with_context(self, mock_lookup):
    """sha1_git_root + sha1_git dispatch to lookup_revision_with_context
    with the default limit of 100."""
    # given
    stub_rev = {'id': 'rev'}
    mock_lookup.return_value = stub_rev
    # when
    actual_revision = service.lookup_revision_through({
        'sha1_git_root': 'some-sha1-root',
        'sha1_git': 'some-sha1',
    })
    # then
    self.assertEquals(actual_revision, stub_rev)
    mock_lookup.assert_called_once_with(
        'some-sha1-root', 'some-sha1', 100)


@patch('swh.web.ui.service.lookup_revision')
@istest
def lookup_revision_through_with_revision(self, mock_lookup):
    """A lone sha1_git dispatches to plain lookup_revision."""
    # given
    stub_rev = {'id': 'rev'}
    mock_lookup.return_value = stub_rev
    # when
    actual_revision = service.lookup_revision_through({
        'sha1_git': 'some-sha1',
    })
    # then
    self.assertEquals(actual_revision, stub_rev)
    mock_lookup.assert_called_once_with(
        'some-sha1')
@patch('swh.web.ui.service.lookup_revision_through')
@istest
def lookup_directory_through_revision_ko_not_found(
        self, mock_lookup_rev):
    """An unresolvable revision raises NotFoundExc."""
    # given
    mock_lookup_rev.return_value = None
    # when
    with self.assertRaises(NotFoundExc):
        service.lookup_directory_through_revision(
            {'id': 'rev'}, 'some/path', 100)
    mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 100)


@patch('swh.web.ui.service.lookup_revision_through')
@patch('swh.web.ui.service.lookup_directory_with_revision')
@istest
def lookup_directory_through_revision_ok_with_data(
        self, mock_lookup_dir, mock_lookup_rev):
    """Without with_data, the directory lookup is delegated with
    with_data=False."""
    # given
    mock_lookup_rev.return_value = {'id': 'rev-id'}
    mock_lookup_dir.return_value = {'type': 'dir',
                                    'content': []}
    # when
    rev_id, dir_result = service.lookup_directory_through_revision(
        {'id': 'rev'}, 'some/path', 100)
    # then
    self.assertEquals(rev_id, 'rev-id')
    self.assertEquals(dir_result, {'type': 'dir',
                                   'content': []})
    mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 100)
    mock_lookup_dir.assert_called_once_with('rev-id', 'some/path', False)


@patch('swh.web.ui.service.lookup_revision_through')
@patch('swh.web.ui.service.lookup_directory_with_revision')
@istest
def lookup_directory_through_revision_ok_with_content(
        self, mock_lookup_dir, mock_lookup_rev):
    """With with_data=True, the directory lookup is delegated with
    with_data=True and the raw content is returned."""
    # given
    mock_lookup_rev.return_value = {'id': 'rev-id'}
    stub_result = {'type': 'file',
                   'revision': 'rev-id',
                   'content': {'data': b'blah',
                               'sha1': 'sha1'}}
    mock_lookup_dir.return_value = stub_result
    # when
    rev_id, dir_result = service.lookup_directory_through_revision(
        {'id': 'rev'}, 'some/path', 10, with_data=True)
    # then
    self.assertEquals(rev_id, 'rev-id')
    self.assertEquals(dir_result, stub_result)
    mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 10)
    mock_lookup_dir.assert_called_once_with('rev-id', 'some/path', True)
diff --git a/swh/web/ui/views/browse.py b/swh/web/ui/views/browse.py
index 792ad932..ae13aa6f 100644
--- a/swh/web/ui/views/browse.py
+++ b/swh/web/ui/views/browse.py
@@ -1,1009 +1,1019 @@
-# Copyright (C) 2015-2016 The Software Heritage developers
+# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import dateutil.parser
+
from encodings.aliases import aliases
from flask import render_template, request, url_for, redirect
from swh.core.hashutil import ALGORITHMS
from swh.core.utils import grouper
from .. import service, utils
from ..exc import BadInputExc, NotFoundExc
from ..main import app
from . import api
hash_filter_keys = ALGORITHMS
def api_lookup(api_fn, query):
    """Call *api_fn* with *query*, mapping lookup failures to None.

    Example:
        filetype = api_lookup('api_content_filetype', 'sha1:blah')
        if filetype:
            content['mimetype'] = filetype['mimetype']
    """
    try:
        result = api_fn(query)
    except (NotFoundExc, BadInputExc):
        return None
    return result
@app.route('/origin/search/')
def search_origin():
    """
    Redirect request with GET params for an origin to our fragmented URI scheme
    """
    if request.method == 'GET':
        data = request.args
        origin_id = data.get('origin_id')
        if origin_id:
            # An explicit id takes precedence over type/url lookup.
            return redirect(url_for('browse_origin', origin_id=origin_id))
        args = ['origin_type', 'origin_url']
        values = {arg: data.get(arg) for arg in args if data.get(arg)}
        if 'origin_type' in values and 'origin_url' in values:
            return redirect(url_for('browse_origin', **values))
    # NOTE(review): falls through to an implicit None (an error in Flask)
    # when neither origin_id nor type+url are supplied — confirm intended.
@app.route('/directory/search/')
def search_directory():
    """
    Redirect request with GET params for a directory to our fragmented
    URI scheme
    """
    def url_for_filtered(endpoint, **kwargs):
        """Make url_for ignore keyword args that have an empty string for value
        """
        return url_for(endpoint,
                       **{key: val for key, val in kwargs.items() if val})

    if request.method == 'GET':
        data = request.args
        sha1_git = data.get('sha1_git')
        if sha1_git:
            if 'dir_path' in data:
                # dir_path exists only in requests for a revision's directory
                return redirect(url_for_filtered(
                    'browse_revision_directory',
                    sha1_git=sha1_git,
                    dir_path=data.get('dir_path')))
            return redirect(url_for_filtered(
                'browse_directory',
                sha1_git=sha1_git,
                path=data.get('path')))
        values = {arg: data.get(arg)
                  for arg in ('origin_id', 'branch_name', 'ts', 'path')
                  if data.get(arg)}
        if 'origin_id' in values:
            return redirect(
                url_for('browse_revision_directory_through_origin',
                        **values))
@app.route('/revision/search/')
def search_revision():
    """
    Redirect request with GET params for a revision to our fragmented
    URI scheme
    """
    if request.method == 'GET':
        data = request.args
        sha1_git = data.get('sha1_git')
        if sha1_git:
            # A direct sha1_git short-circuits the origin-based lookup.
            return redirect(url_for('browse_revision', sha1_git=sha1_git))
        args = ['origin_id', 'branch_name', 'ts']
        values = {arg: data.get(arg) for arg in args if data.get(arg)}
        if 'origin_id' in values:
            return redirect(url_for('browse_revision_with_origin', **values))
    # NOTE(review): implicit None return when nothing matches — confirm
    # this is the intended behavior (Flask reports it as an error).
@app.route('/content/symbol/', methods=['GET'])
def search_symbol():
    """Search for symbols in contents.

    Returns:
        dict representing data to look for in swh storage.
    """
    env = {
        'result': None,
        'per_page': None,
        'message': '',
        'linknext': None,
        'linkprev': None,
    }
    # Read form or get information
    data = request.args
    q = data.get('q')
    per_page = data.get('per_page')
    env['q'] = q
    if per_page:
        env['per_page'] = per_page
    if q:
        try:
            result = api.api_content_symbol(q)
            if result:
                headers = result.get('headers')
                result = utils.prepare_data_for_view(result['results'])
                env['result'] = result
                if headers:
                    if 'link-next' in headers:
                        # Forward pagination: continue after the last sha1
                        # of the current page.
                        next_last_sha1 = result[-1]['sha1']
                        params = {
                            'q': q,
                            'last_sha1': next_last_sha1,
                        }
                        if per_page:
                            params['per_page'] = per_page
                        env['linknext'] = url_for('search_symbol', **params)
        except BadInputExc as e:
            env['message'] = str(e)
    # NOTE(review): 'linkprev' is initialized but never set here.
    return render_template('symbols.html', **env)
@app.route('/content/search/', methods=['GET', 'POST'])
def search_content():
    """Search for hashes in swh-storage.

    One form to submit either:
    - hash query to look up in swh storage
    - file hashes calculated client-side to be queried in swh storage
    - both

    Returns:
        dict representing data to look for in swh storage.
        The following keys are returned:
        - search_stats: {'nbfiles': X, 'pct': Y} the number of total
          queried files and percentage of files not in storage respectively
        - responses: array of {'filename': X, 'sha1': Y, 'found': Z}
        - messages: General messages.

    TODO:
        Batch-process with all checksums, not just sha1
    """
    # NOTE(review): 'message' starts as a list but is always overwritten
    # with a string before rendering.
    env = {'search_res': None,
           'search_stats': None,
           'message': []}
    search_stats = {'nbfiles': 0, 'pct': 0}
    search_res = None
    message = ''
    # Get with a single hash request
    if request.method == 'POST':
        # Post form submission with many hash requests
        q = None
    else:
        data = request.args
        q = data.get('q')
    try:
        search = api.api_check_content_known(q)
        search_res = search['search_res']
        search_stats = search['search_stats']
    except BadInputExc as e:
        message = str(e)
    env['search_stats'] = search_stats
    env['search_res'] = search_res
    env['message'] = message
    return render_template('search.html', **env)


@app.route('/browse/')
def browse():
    """Render the user-facing browse view
    """
    return render_template('browse.html')
@app.route('/browse/content/<string:q>/')
def browse_content(q):
    """Given a hash and a checksum, display the content's meta-data.

    Args:
        q is of the form algo_hash:hash with algo_hash in
        (sha1, sha1_git, sha256)

    Returns:
        Information on one possible origin for such content.

    Raises:
        BadInputExc in case of unknown algo_hash or bad hash
        NotFoundExc if the content is not found.
    """
    env = {'q': q,
           'message': None,
           'content': None}
    encoding = request.args.get('encoding', 'utf8')
    if encoding not in aliases:
        env['message'] = 'Encoding %s not supported.' \
                         'Supported Encodings: %s' % (
                             encoding, list(aliases.keys()))
        return render_template('content.html', **env)
    try:
        content = api.api_content_metadata(q)
        # Enrich with indexer data (filetype, language, licenses); each
        # lookup is best-effort and yields None on failure.
        filetype = api_lookup(api.api_content_filetype, q)
        if filetype:
            content['mimetype'] = filetype.get('mimetype')
            content['encoding'] = filetype.get('encoding')
        else:
            content['mimetype'] = None
            content['encoding'] = None
        language = api_lookup(api.api_content_language, q)
        if language:
            content['language'] = language.get('lang')
        else:
            content['language'] = None
        licenses = api_lookup(api.api_content_license, q)
        if licenses:
            content['licenses'] = ', '.join(licenses.get('licenses', []))
        else:
            content['licenses'] = None
        content_raw = service.lookup_content_raw(q)
        if content_raw:
            content['data'] = content_raw['data']
        else:
            content['data'] = None
        ctags = api_lookup(api.api_content_ctags, q)
        if ctags:
            url = url_for('browse_content', q=q)
            # Render each ctag as a link to its line anchor, 20 per group.
            content['ctags'] = grouper((
                '<a href="%s#l-%s">%s</a>' % (
                    url,
                    ctag['line'],
                    ctag['line'])
                for ctag in ctags
            ), 20)
        else:
            content['ctags'] = None
        env['content'] = utils.prepare_data_for_view(content,
                                                     encoding=encoding)
    except (NotFoundExc, BadInputExc) as e:
        env['message'] = str(e)
    return render_template('content.html', **env)
@app.route('/browse/content/<string:q>/raw/')
def browse_content_raw(q):
    """Given a hash and a checksum, display the content's raw data.

    Args:
        q is of the form algo_hash:hash with algo_hash in
        (sha1, sha1_git, sha256)

    Returns:
        Information on one possible origin for such content.

    Raises:
        BadInputExc in case of unknown algo_hash or bad hash
        NotFoundExc if the content is not found.
    """
    # Delegate entirely to the API endpoint that serves the raw bytes.
    return redirect(url_for('api_content_raw', q=q))


def _origin_seen(q, data):
    """Given an origin, compute a message string with the right information.

    Args:
        origin: a dictionary with keys:
          - origin: a dictionary with type and url keys
          - occurrence: a dictionary with a validity range

    Returns:
        Message as a string
    """
    origin_type = data['origin_type']
    origin_url = data['origin_url']
    revision = data['revision']
    branch = data['branch']
    path = data['path']
    return """The content with hash %s has been seen on origin with type '%s'
at url '%s'. The revision was identified at '%s' on branch '%s'.
The file's path referenced was '%s'.""" % (q,
                                           origin_type,
                                           origin_url,
                                           revision,
                                           branch,
                                           path)


# @app.route('/browse/content/<string:q>/origin/')
def browse_content_with_origin(q):
    """Show content information.

    Args:
        - q: query string of the form <algo_hash:hash> with
          `algo_hash` in sha1, sha1_git, sha256.

          This means that several different URLs (at least one per
          HASH_ALGO) will point to the same content sha: the sha with
          'hash' format

    Returns:
        The content's information at for a given checksum.
    """
    # NOTE(review): the route decorator above is commented out, so this
    # view is currently unreachable.
    env = {'q': q}
    try:
        origin = api.api_content_checksum_to_origin(q)
        message = _origin_seen(q, origin)
    except (NotFoundExc, BadInputExc) as e:
        message = str(e)
    env['message'] = message
    return render_template('content-with-origin.html', **env)
@app.route('/browse/directory/<string:sha1_git>/')
@app.route('/browse/directory/<string:sha1_git>/<path:path>/')
def browse_directory(sha1_git, path=None):
    """Show directory information.

    Args:
        - sha1_git: the directory's sha1 git identifier. If path
          is set, the base directory for the relative path to the entry
        - path: the path to the requested entry, relative to
          the directory pointed by sha1_git

    Returns:
        The content's information at sha1_git, or at sha1_git/path if
        path is set.
    """
    env = {'sha1_git': sha1_git,
           'files': []}
    try:
        if path:
            env['message'] = ('Listing for directory with path %s from %s:'
                              % (path, sha1_git))
            dir_or_file = service.lookup_directory_with_path(
                sha1_git, path)
            if dir_or_file['type'] == 'file':
                # The path resolved to a file: render it as content instead.
                fsha = 'sha256:%s' % dir_or_file['sha256']
                content = api.api_content_metadata(fsha)
                content_raw = service.lookup_content_raw(fsha)
                if content_raw:  # FIXME: currently assuming utf8 encoding
                    content['data'] = content_raw['data']
                env['content'] = utils.prepare_data_for_view(
                    content, encoding='utf-8')
                return render_template('content.html', **env)
            else:
                directory_files = api.api_directory(dir_or_file['target'])
                env['files'] = utils.prepare_data_for_view(directory_files)
        else:
            env['message'] = "Listing for directory %s:" % sha1_git
            directory_files = api.api_directory(sha1_git)
            env['files'] = utils.prepare_data_for_view(directory_files)
    except (NotFoundExc, BadInputExc) as e:
        env['message'] = str(e)
    return render_template('directory.html', **env)
@app.route('/browse/origin/<string:origin_type>/url/<path:origin_url>/')
@app.route('/browse/origin/<int:origin_id>/')
def browse_origin(origin_id=None, origin_type=None, origin_url=None):
"""Browse origin matching given criteria - either origin_id or
origin_type and origin_path.
Args:
- origin_id: origin's swh identifier
- origin_type: origin's type
- origin_url: origin's URL
"""
# URLs for the calendar JS plugin
env = {'browse_url': None,
'visit_url': None,
'origin': None}
try:
origin = api.api_origin(origin_id, origin_type, origin_url)
env['origin'] = origin
env['browse_url'] = url_for('browse_revision_with_origin',
origin_id=origin['id'])
- env['visit_url'] = url_for('api_origin_visits',
+ env['visit_url'] = url_for('browse_origin_visits',
origin_id=origin['id'])
except (NotFoundExc, BadInputExc) as e:
env['message'] = str(e)
return render_template('origin.html', **env)
+@app.route('/browse/origin/<int:origin_id>/visits/')
+def browse_origin_visits(origin_id):
+ origin = api.api_origin_visits(origin_id)['results']
+ for v in origin:
+ v['date'] = dateutil.parser.parse(v['date']).timestamp()
+ return origin
+
+
@app.route('/browse/person/<int:person_id>/')
def browse_person(person_id):
    """Browse the person with the given swh identifier.
    """
    env = {'person_id': person_id,
           'person': None,
           'message': None}
    try:
        env['person'] = api.api_person(person_id)
    except (NotFoundExc, BadInputExc) as e:
        env['message'] = str(e)
    return render_template('person.html', **env)


@app.route('/browse/release/<string:sha1_git>/')
def browse_release(sha1_git):
    """Browse release with sha1_git.
    """
    env = {'sha1_git': sha1_git,
           'message': None,
           'release': None}
    try:
        rel = api.api_release(sha1_git)
        env['release'] = utils.prepare_data_for_view(rel)
    except (NotFoundExc, BadInputExc) as e:
        env['message'] = str(e)
    return render_template('release.html', **env)
@app.route('/browse/revision/<string:sha1_git>/')
@app.route('/browse/revision/<string:sha1_git>/prev/<path:prev_sha1s>/')
def browse_revision(sha1_git, prev_sha1s=None):
    """Browse the revision with git SHA1 sha1_git_cur, while optionally keeping
    the context from which we came as a list of previous (i.e. later)
    revisions' sha1s.

    Args:
        sha1_git: the requested revision's sha1_git.
        prev_sha1s: an optional string of /-separated sha1s representing our
        context, ordered by descending revision date.

    Returns:
        Information about revision of git SHA1 sha1_git_cur, with relevant URLS
        pointing to the context augmented with sha1_git_cur.

    Example:
        GET /browse/revision/
    """
    env = {'sha1_git': sha1_git,
           'message': None,
           'revision': None}
    try:
        rev = api.api_revision(sha1_git, prev_sha1s)
        env['revision'] = utils.prepare_data_for_view(rev)
    except (NotFoundExc, BadInputExc) as e:
        # Render the error message in place of the revision data.
        env['message'] = str(e)
    return render_template('revision.html', **env)


@app.route('/browse/revision/<string:sha1_git>/raw/')
def browse_revision_raw_message(sha1_git):
    """Given a sha1_git, display the corresponding revision's raw message.
    """
    return redirect(url_for('api_revision_raw_message', sha1_git=sha1_git))
@app.route('/browse/revision/<string:sha1_git>/log/')
@app.route('/browse/revision/<string:sha1_git>/prev/<path:prev_sha1s>/log/')
def browse_revision_log(sha1_git, prev_sha1s=None):
    """Browse revision with sha1_git's log. If the navigation path through the
    commit tree is specified, we intersect the earliest revision's log with the
    revisions the user browsed through - ie the path taken to the specified
    revision.

    Args:
        sha1_git: the current revision's SHA1_git checksum
        prev_sha1s: optionally, the path through which we want log information
    """
    env = {'sha1_git': sha1_git,
           'sha1_url': '/browse/revision/%s/' % sha1_git,
           'message': None,
           'revisions': []}
    try:
        revision_data = api.api_revision_log(sha1_git, prev_sha1s)
        revisions = revision_data['revisions']
        next_revs_url = revision_data['next_revs_url']
        # A lazy map: the template consumes it while iterating.
        env['revisions'] = map(utils.prepare_data_for_view, revisions)
        env['next_revs_url'] = utils.prepare_data_for_view(next_revs_url)
    except (NotFoundExc, BadInputExc) as e:
        env['message'] = str(e)
    return render_template('revision-log.html', **env)
@app.route('/browse/revision'
           '/origin/<int:origin_id>/log/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>/log/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>/log/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/ts/<string:ts>/log/')
def browse_revision_log_by(origin_id,
                           branch_name='refs/heads/master',
                           ts=None):
    """Browse the revision described by origin, branch name and timestamp's
    log

    Args:
        origin_id: the revision's origin
        branch_name: the revision's branch (defaults to the master branch)
        ts: the requested timeframe for the revision

    Returns:
        The revision log of the described revision as a list of revisions
        if it is found.
    """
    # Bug fix: the routes above capture a 'ts' URL parameter, but the
    # function previously declared it as 'timestamp', so every '/ts/...'
    # route failed with TypeError (unexpected keyword argument 'ts').
    env = {'sha1_git': None,
           'origin_id': origin_id,
           'origin_url': '/browse/origin/%d/' % origin_id,
           'branch_name': branch_name,
           'timestamp': ts,
           'message': None,
           'revisions': []}
    try:
        revisions = api.api_revision_log_by(
            origin_id, branch_name, ts)
        env['revisions'] = map(utils.prepare_data_for_view, revisions)
    except (NotFoundExc, BadInputExc) as e:
        env['message'] = str(e)
    return render_template('revision-log.html', **env)
@app.route('/browse/revision/<string:sha1_git_cur>/prev/<path:sha1s>/')
def browse_with_rev_context(sha1_git_cur, sha1s):
    """Browse the revision with git SHA1 sha1_git_cur, while keeping the context
    from which we came as a list of previous (i.e. later) revisions' sha1s.

    Args:
        sha1_git_cur: the requested revision's sha1_git.
        sha1s: a string of /-separated sha1s representing our context, ordered
        by descending revision date.

    Returns:
        Information about revision of git SHA1 sha1_git_cur, with relevant URLS
        pointing to the context augmented with sha1_git_cur.

    Example:
        GET /browse/revision/
    """
    env = {'sha1_git': sha1_git_cur,
           'message': None,
           'revision': None}
    try:
        revision = api.api_revision(
            sha1_git_cur, sha1s)
        env['revision'] = utils.prepare_data_for_view(revision)
    except (BadInputExc, NotFoundExc) as e:
        env['message'] = str(e)
    return render_template('revision.html', **env)
@app.route('/browse/revision/<string:sha1_git_root>/history/<sha1_git>/')
def browse_revision_history(sha1_git_root, sha1_git):
    """Display information about revision sha1_git, limited to the
    sub-graph of all transitive parents of sha1_git_root.
    In other words, sha1_git is an ancestor of sha1_git_root.

    Args:
        sha1_git_root: latest revision of the browsed history.
        sha1_git: one of sha1_git_root's ancestors.
        limit: optional query parameter to limit the revisions log
        (default to 100). For now, note that this limit could impede the
        transitivity conclusion about sha1_git not being an ancestor of
        sha1_git_root (even if it is).

    Returns:
        Information on sha1_git if it is an ancestor of sha1_git_root
        including children leading to sha1_git_root.
    """
    env = {'sha1_git_root': sha1_git_root,
           'sha1_git': sha1_git,
           'message': None,
           'keys': [],
           'revision': None}
    if sha1_git == sha1_git_root:
        # History of a revision relative to itself is just the revision.
        return redirect(url_for('browse_revision',
                                sha1_git=sha1_git))
    try:
        revision = api.api_revision_history(sha1_git_root,
                                            sha1_git)
        env['revision'] = utils.prepare_data_for_view(revision)
    except (BadInputExc, NotFoundExc) as e:
        env['message'] = str(e)
    return render_template('revision.html', **env)
@app.route('/browse/revision/<string:sha1_git>/directory/')
@app.route('/browse/revision/<string:sha1_git>/directory/<path:dir_path>/')
def browse_revision_directory(sha1_git, dir_path=None):
    """Browse directory from revision with sha1_git.
    """
    env = {
        'sha1_git': sha1_git,
        'path': '.' if not dir_path else dir_path,
        'message': None,
        'result': None
    }
    encoding = request.args.get('encoding', 'utf8')
    if encoding not in aliases:
        env['message'] = 'Encoding %s not supported.' \
                         'Supported Encodings: %s' % (
                             encoding, list(aliases.keys()))
        return render_template('revision-directory.html', **env)
    try:
        result = api.api_revision_directory(sha1_git, dir_path, with_data=True)
        result['content'] = utils.prepare_data_for_view(result['content'],
                                                        encoding=encoding)
        env['revision'] = result['revision']
        env['result'] = result
    except (BadInputExc, NotFoundExc) as e:
        env['message'] = str(e)
    return render_template('revision-directory.html', **env)
@app.route('/browse/revision/<string:sha1_git_root>'
           '/history/<sha1_git>'
           '/directory/')
@app.route('/browse/revision/<string:sha1_git_root>'
           '/history/<sha1_git>'
           '/directory/<path:path>/')
def browse_revision_history_directory(sha1_git_root, sha1_git, path=None):
    """Return information about directory pointed to by the revision
    defined as: revision sha1_git, limited to the sub-graph of all
    transitive parents of sha1_git_root.

    Args:
        sha1_git_root: latest revision of the browsed history.
        sha1_git: one of sha1_git_root's ancestors.
        path: optional directory pointed to by that revision.
        limit: optional query parameter to limit the revisions log
        (default to 100). For now, note that this limit could impede the
        transitivity conclusion about sha1_git not being an ancestor of
        sha1_git_root (even if it is).

    Returns:
        Information on the directory pointed to by that revision.

    Raises:
        BadInputExc in case of unknown algo_hash or bad hash.
        NotFoundExc if either revision is not found or if sha1_git is not an
        ancestor of sha1_git_root or the path referenced does not exist
    """
    env = {
        'sha1_git_root': sha1_git_root,
        'sha1_git': sha1_git,
        'path': '.' if not path else path,
        'message': None,
        'result': None
    }
    encoding = request.args.get('encoding', 'utf8')
    if encoding not in aliases:
        env['message'] = 'Encoding %s not supported.' \
                         'Supported Encodings: %s' % (
                             encoding, list(aliases.keys()))
        return render_template('revision-directory.html', **env)
    if sha1_git == sha1_git_root:
        # Degenerate history: permanently redirect to the plain
        # revision-directory view.
        return redirect(url_for('browse_revision_directory',
                                sha1_git=sha1_git,
                                path=path,
                                encoding=encoding),
                        code=301)
    try:
        result = api.api_revision_history_directory(sha1_git_root,
                                                    sha1_git,
                                                    path,
                                                    with_data=True)
        env['revision'] = result['revision']
        env['content'] = utils.prepare_data_for_view(result['content'],
                                                     encoding=encoding)
        env['result'] = result
    except (BadInputExc, NotFoundExc) as e:
        env['message'] = str(e)
    return render_template('revision-directory.html', **env)
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/history/<sha1_git>'
           '/directory/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/history/<sha1_git>'
           '/directory/<path:path>/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/history/<sha1_git>'
           '/directory/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/history/<sha1_git>'
           '/directory/<path:path>/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/ts/<string:ts>'
           '/history/<sha1_git>'
           '/directory/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/ts/<string:ts>'
           '/history/<sha1_git>'
           '/directory/<path:path>/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>'
           '/history/<sha1_git>'
           '/directory/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>'
           '/history/<sha1_git>'
           '/directory/<path:path>/')
def browse_directory_through_revision_with_origin_history(
        origin_id,
        branch_name="refs/heads/master",
        ts=None,
        sha1_git=None,
        path=None):
    """Browse a directory reached through the history of a revision found
    via its origin.

    The root revision is located from (origin_id, branch_name, ts);
    sha1_git designates one of its ancestors, and path the directory
    pointed to by that ancestor revision.

    Args:
        origin_id: origin's identifier.
        branch_name: optional branch for the given origin (default to
            master).
        ts: optional timestamp selecting the visit of the origin.
        sha1_git: ancestor revision whose directory is browsed.
        path: optional directory path inside that revision.

    Query args:
        encoding: encoding used to render file content (default: utf8).

    Returns:
        The rendered 'revision-directory.html' template, with either the
        directory content or an error message.
    """
    env = {
        'origin_id': origin_id,
        'branch_name': branch_name,
        'ts': ts,
        'sha1_git': sha1_git,
        'path': '.' if not path else path,
        'message': None,
        'result': None
    }
    encoding = request.args.get('encoding', 'utf8')
    if encoding not in aliases:
        # Fixed message spacing: the adjacent literals previously rendered
        # as "supported.Supported" with no separating space.
        env['message'] = (('Encoding %s not supported.'
                           ' Supported Encodings: %s') % (
                               encoding, list(aliases.keys())))
        return render_template('revision-directory.html', **env)
    try:
        result = api.api_directory_through_revision_with_origin_history(
            origin_id, branch_name, ts, sha1_git, path, with_data=True)
        env['revision'] = result['revision']
        env['content'] = utils.prepare_data_for_view(result['content'],
                                                     encoding=encoding)
        env['result'] = result
    except (BadInputExc, NotFoundExc) as e:
        env['message'] = str(e)
    return render_template('revision-directory.html', **env)
@app.route('/browse/revision'
           '/origin/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/ts/<string:ts>/')
def browse_revision_with_origin(origin_id=1,
                                branch_name="refs/heads/master",
                                ts=None):
    """Instead of having to specify a (root) revision by SHA1_GIT, users
    might want to specify a place and a time. In SWH a "place" is an
    origin; a "time" is a timestamp at which some place has been
    observed by SWH crawlers.

    Args:
        origin_id: origin's identifier (default to 1).
        branch_name: the optional branch for the given origin (default
            to master).
        timestamp: optional timestamp (default to the nearest time
            crawl of timestamp).

    Returns:
        Information on the revision if found.

    Raises:
        BadInputExc in case of unknown algo_hash or bad hash.
        NotFoundExc if the revision is not found.
    """
    # origin_id defaults to 1: the bare '/browse/revision/origin/' route
    # provides no origin_id, and without a default Flask's call to this
    # view raised TypeError (HTTP 500) on that URL. The docstring already
    # documented "default to 1".
    env = {'message': None,
           'revision': None}
    try:
        revision = api.api_revision_with_origin(origin_id,
                                                branch_name,
                                                ts)
        env['revision'] = utils.prepare_data_for_view(revision)
    except (ValueError, NotFoundExc, BadInputExc) as e:
        env['message'] = str(e)
    return render_template('revision.html', **env)
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/history/<sha1_git>/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/history/<sha1_git>/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>'
           '/history/<sha1_git>/')
def browse_revision_history_through_origin(origin_id,
                                           branch_name='refs/heads/master',
                                           ts=None,
                                           sha1_git=None):
    """Return information about revision sha1_git, limited to the
    sub-graph of all transitive parents of the revision root identified
    by (origin_id, branch_name, ts).

    Given sha1_git_root such root revision's identifier, in other words,
    sha1_git is an ancestor of sha1_git_root.

    Args:
        origin_id: origin's identifier (default to 1).
        branch_name: the optional branch for the given origin (default
            to master).
        timestamp: optional timestamp (default to the nearest time
            crawl of timestamp).
        sha1_git: one of sha1_git_root's ancestors.
        limit: optional query parameter to limit the revisions log
            (default to 100). For now, note that this limit could impede
            the transitivity conclusion about sha1_git not being an
            ancestor of sha1_git_root (even if it is).

    Returns:
        Information on sha1_git if it is an ancestor of sha1_git_root
        including children leading to sha1_git_root.
    """
    env = {'message': None, 'revision': None}
    try:
        rev = api.api_revision_history_through_origin(
            origin_id,
            branch_name,
            ts,
            sha1_git)
    except (ValueError, BadInputExc, NotFoundExc) as exc:
        # Surface lookup/validation errors in the template instead of a 500.
        env['message'] = str(exc)
    else:
        env['revision'] = utils.prepare_data_for_view(rev)
    return render_template('revision.html', **env)
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/directory/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/directory/<path:path>')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/directory/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/directory/<path:path>/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>'
           '/directory/')
@app.route('/browse/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>'
           '/directory/<path:path>/')
def browse_revision_directory_through_origin(origin_id,
                                             branch_name='refs/heads/master',
                                             ts=None,
                                             path=None):
    """Browse the directory of the revision located through its origin.

    The revision is located from (origin_id, branch_name, ts); path
    optionally selects a sub-directory within it.

    Args:
        origin_id: origin's identifier.
        branch_name: optional branch for the given origin (default to
            master).
        ts: optional timestamp selecting the visit of the origin.
        path: optional directory path inside the revision.

    Query args:
        encoding: encoding used to render file content (default: utf8).

    Returns:
        The rendered 'revision-directory.html' template, with either the
        directory content or an error message.
    """
    env = {'message': None,
           'origin_id': origin_id,
           'ts': ts,
           'path': '.' if not path else path,
           'result': None}
    encoding = request.args.get('encoding', 'utf8')
    if encoding not in aliases:
        # Fixed message spacing: the adjacent literals previously rendered
        # as "supported.Supported" with no separating space.
        env['message'] = 'Encoding %s not supported.' \
                         ' Supported Encodings: %s' % (
                             encoding, list(aliases.keys()))
        return render_template('revision-directory.html', **env)
    try:
        result = api.api_directory_through_revision_origin(
            origin_id,
            branch_name,
            ts,
            path,
            with_data=True)
        result['content'] = utils.prepare_data_for_view(result['content'],
                                                        encoding=encoding)
        env['revision'] = result['revision']
        env['result'] = result
    except (ValueError, BadInputExc, NotFoundExc) as e:
        env['message'] = str(e)
    return render_template('revision-directory.html', **env)
@app.route('/browse/entity/')
@app.route('/browse/entity/<string:uuid>/')
def browse_entity(uuid=None):
    """Browse the entity (or entities) identified by uuid.

    Args:
        uuid: the entity's identifier; None when the bare
            '/browse/entity/' route is hit.

    Returns:
        The rendered 'entity.html' template with the entities found, or
        an error message.
    """
    # uuid defaults to None: the bare '/browse/entity/' route provides no
    # uuid, and without a default Flask's call to this view raised
    # TypeError (HTTP 500) on that URL. With None, api_entity_by_uuid is
    # presumably expected to reject the missing value via
    # BadInputExc/NotFoundExc, which are handled below — TODO confirm.
    env = {'entities': [],
           'message': None}
    try:
        entities = api.api_entity_by_uuid(uuid)
        env['entities'] = entities
    except (NotFoundExc, BadInputExc) as e:
        env['message'] = str(e)
    return render_template('entity.html', **env)

File Metadata

Mime Type
text/x-diff
Expires
Jul 4 2025, 9:59 AM (5 w, 20 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3235828

Event Timeline