diff --git a/docs/uri-scheme-api-origin.rst b/docs/uri-scheme-api-origin.rst index f239cd37..bbc3f867 100644 --- a/docs/uri-scheme-api-origin.rst +++ b/docs/uri-scheme-api-origin.rst @@ -1,276 +1,278 @@ Origin ------ .. http:get:: /api/1/origin/(origin_id)/ Get information about a software origin from its unique (but otherwise meaningless) identifier. :param int origin_id: a SWH origin identifier :>json number id: the origin unique identifier :>json string origin_visits_url: link to in order to get information about the SWH visits for that origin :>json string type: the type of software origin (*git*, *svn*, *hg*, *deb*, *ftp*, ...) :>json string url: the origin canonical url :reqheader Accept: the requested response content type, either *application/json* (default) or *application/yaml* :resheader Content-Type: this depends on :http:header:`Accept` header of request :statuscode 200: no error :statuscode 404: requested origin can not be found in the SWH archive **Request:** .. parsed-literal:: $ curl -i :swh_web_api:`origin/1/` **Response:** .. sourcecode:: http HTTP/1.1 200 OK Content-Type: application/json { "id": 1, "origin_visits_url": "/api/1/origin/1/visits/", "type": "git", "url": "https://github.com/hylang/hy" } .. http:get:: /api/1/origin/(origin_type)/url/(origin_url)/ Get information about a software origin from its type and canonical url. :param string origin_type: the origin type (*git*, *svn*, *hg*, *deb*, *ftp*, ...) :param string origin_url: the origin url :>json number id: the origin unique identifier :>json string origin_visits_url: link to in order to get information about the SWH visits for that origin :>json string type: the type of software origin (*git*, *svn*, *hg*, *deb*, *ftp*, ...) :>json string url: the origin canonical url :reqheader Accept: the requested response content type, either *application/json* (default) or *application/yaml* :resheader Content-Type: this depends on :http:header:`Accept` header of request **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin can not be found in the SWH archive **Request:** .. parsed-literal:: $ curl -i :swh_web_api:`origin/git/url/https://github.com/python/cpython/` **Response:** .. sourcecode:: http HTTP/1.1 200 OK Content-Type: application/json { "id": 13706355, "origin_visits_url": "/api/1/origin/13706355/visits/", "type": "git", "url": "https://github.com/python/cpython" } .. http:get:: /api/1/origin/search/(url_pattern)/ Search for software origins whose urls contain a provided string pattern or match a provided regular expression. The search is performed in a case insensitive way. :param string url_pattern: a string pattern or a regular expression :query int offset: the number of found origins to skip before returning results :query int limit: the maximum number of found origins to return :query boolean regexp: if true, consider provided pattern as a regular expression and search origins whose urls match it :>jsonarr number id: the origin unique identifier :>jsonarr string origin_visits_url: link to in order to get information about the SWH visits for that origin :>jsonarr string type: the type of software origin (*git*, *svn*, *hg*, *deb*, *ftp*, ...) 
:>jsonarr string url: the origin canonical url :reqheader Accept: the requested response content type, either *application/json* (default) or *application/yaml* :resheader Content-Type: this depends on :http:header:`Accept` header of request **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` :statuscode 200: no error **Request:** .. parsed-literal:: $ curl -i :swh_web_api:`origin/search/python/?limit=2` **Response:** .. sourcecode:: http HTTP/1.1 200 OK Content-Type: application/json [ { "type": "git", "origin_visits_url": "/api/1/origin/220/visits/", "id": 220, "url": "https://github.com/neon670/python.dev" }, { "type": "git", "origin_visits_url": "/api/1/origin/328/visits/", "id": 328, "url": "https://github.com/aur-archive/python-werkzeug" } ] .. http:get:: /api/1/origin/(origin_id)/visits/ Get information about all visits of a software origin. + Visits are returned sorted in descending order according + to their date. :param int origin_id: a SWH origin identifier :query int per_page: specify the number of visits to list, for pagination purposes :query int last_visit: visit to start listing from, for pagination purposes :reqheader Accept: the requested response content type, either *application/json* (default) or *application/yaml* :resheader Content-Type: this depends on :http:header:`Accept` header of request :resheader Link: indicates that a subsequent result page is available and contains the url pointing to it :>jsonarr string date: ISO representation of the visit date (in UTC) :>jsonarr number id: the unique identifier of the origin :>jsonarr string origin_visit_url: link to :http:get:`/api/1/origin/(origin_id)/visit/(visit_id)/` in order to get information about the visit :>jsonarr string snapshot: the snapshot identifier of the visit :>jsonarr string snapshot_url: link to :http:get:`/api/1/snapshot/(snapshot_id)/` in order to get information about the snapshot of the visit :>jsonarr string status: status of the visit (either *full*, *partial* or *ongoing*) :>jsonarr number visit: the unique identifier of the visit **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin can not be found in the SWH archive **Request:** .. parsed-literal:: $ curl -i :swh_web_api:`origin/1/visits/` **Response:** .. sourcecode:: http HTTP/1.1 200 OK Link: ; rel="next" Content-Type: application/json [ { "date": "2015-08-04T22:26:14.804009+00:00", "origin": 1, "origin_visit_url": "/api/1/origin/1/visit/1/", "snapshot": "584b2fe3ce6218a96892e73bd76c2966bbc2a797", "snapshot_url": "/api/1/snapshot/584b2fe3ce6218a96892e73bd76c2966bbc2a797/", "status": "full", "visit": 1 }, { "date": "2016-02-22T16:56:16.725068+00:00", "metadata": {}, "origin": 1, "origin_visit_url": "/api/1/origin/1/visit/2/", "snapshot": "9d502b5a193c33b59b22be3c75f73472867b0f8e", "snapshot_url": "/api/1/snapshot/9d502b5a193c33b59b22be3c75f73472867b0f8e/", "status": "full", "visit": 2 }, ] .. http:get:: /api/1/origin/(origin_id)/visit/(visit_id)/ Get information about a specific visit of a software origin. 
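A client can walk every result page of the visits listing documented above by following the ``Link`` header's ``next`` relation. A minimal sketch, assuming the public archive base URL and the third-party ``requests`` library (``iter_origin_visits`` is a hypothetical helper, not part of the API):

.. sourcecode:: python

    import requests

    def iter_origin_visits(origin_id, per_page=10,
                           base='https://archive.softwareheritage.org/api/1'):
        url = '%s/origin/%s/visits/?per_page=%s' % (base, origin_id, per_page)
        while url:
            response = requests.get(url)
            response.raise_for_status()
            yield from response.json()
            # requests exposes the parsed Link header as response.links;
            # the 'next' relation disappears once the last page is reached
            url = response.links.get('next', {}).get('url')

    for visit in iter_origin_visits(1):
        print(visit['visit'], visit['date'], visit['status'])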
:param int origin_id: a SWH origin identifier :param int visit_id: a visit identifier :reqheader Accept: the requested response content type, either *application/json* (default) or *application/yaml* :resheader Content-Type: this depends on :http:header:`Accept` header of request :>json string date: ISO representation of the visit date (in UTC) :>json object occurrences: object containing all branches associated to the origin found during the visit; for each of them, the associated SWH target type and id are given, along with a link to get information about that target :>json number origin: the origin unique identifier :>json string origin_url: link to get information about the origin :>json string snapshot: the snapshot identifier of the visit :>json string snapshot_url: link to :http:get:`/api/1/snapshot/(snapshot_id)/` in order to get information about the snapshot of the visit :>json string status: status of the visit (either *full*, *partial* or *ongoing*) :>json number visit: the unique identifier of the visit **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin or visit cannot be found in the SWH archive **Request:** .. parsed-literal:: $ curl -i :swh_web_api:`origin/1500/visit/1/` **Response:** .. sourcecode:: http HTTP/1.1 200 OK Content-Type: application/json { "date": "2015-08-23T17:48:46.800813+00:00", "occurrences": { "refs/heads/master": { "target": "83c20a6a63a7ebc1a549d367bc07a61b926cecf3", "target_type": "revision", "target_url": "/api/1/revision/83c20a6a63a7ebc1a549d367bc07a61b926cecf3/" }, "refs/heads/wiki": { "target": "71f667aeb5d02562f2fa0941ad91df69c474ff3b", "target_type": "revision", "target_url": "/api/1/revision/71f667aeb5d02562f2fa0941ad91df69c474ff3b/" }, "refs/tags/dpkt-1.6": { "target": "7fc0fd582812af36064d1c85fe51e33227920479", "target_type": "revision", "target_url": "/api/1/revision/7fc0fd582812af36064d1c85fe51e33227920479/" }, "refs/tags/dpkt-1.7": { "target": "0c9dbfbc0974ec8ac1d8253aa1092366a03633a8", "target_type": "revision", "target_url": "/api/1/revision/0c9dbfbc0974ec8ac1d8253aa1092366a03633a8/" } }, "origin": 1500, "origin_url": "/api/1/origin/1500/", "snapshot": "6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a", "snapshot_url": "/api/1/snapshot/6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a/", "status": "full", "visit": 1 } diff --git a/swh/web/api/views/origin.py b/swh/web/api/views/origin.py index dce5d436..80b7826d 100644 --- a/swh/web/api/views/origin.py +++ b/swh/web/api/views/origin.py @@ -1,254 +1,267 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from distutils.util import strtobool from swh.web.common import service -from swh.web.common.utils import reverse +from swh.web.common.utils import ( + reverse, get_origin_visits +) from swh.web.api import utils from swh.web.api import apidoc as api_doc from swh.web.api.apiurls import api_route from swh.web.api.views.utils import ( api_lookup, doc_exc_id_not_found, doc_header_link, doc_arg_last_elt, doc_arg_per_page ) def _enrich_origin(origin): if 'id' in origin: o = origin.copy() o['origin_visits_url'] = \ reverse('origin-visits', kwargs={'origin_id': origin['id']}) return o return origin @api_route(r'/origin/(?P<origin_id>[0-9]+)/', 'origin') @api_route(r'/origin/(?P<origin_type>[a-z]+)/url/(?P<origin_url>.+)', 'origin')
@api_doc.route('/origin/') @api_doc.arg('origin_id', default=1, argtype=api_doc.argtypes.int, argdoc='origin identifier (when looking up by ID)') @api_doc.arg('origin_type', default='git', argtype=api_doc.argtypes.str, argdoc='origin type (when looking up by type+URL)') @api_doc.arg('origin_url', default='https://github.com/hylang/hy', argtype=api_doc.argtypes.path, argdoc='origin URL (when looking up by type+URL)') @api_doc.raises(exc=api_doc.excs.notfound, doc=doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""The metadata of the origin corresponding to the given criteria""") def api_origin(request, origin_id=None, origin_type=None, origin_url=None): """Get information about a software origin. Software origins might be looked up by origin type and canonical URL (e.g., "git" + a "git clone" URL), or by their unique (but otherwise meaningless) identifier. """ ori_dict = { 'id': origin_id, 'type': origin_type, 'url': origin_url } ori_dict = {k: v for k, v in ori_dict.items() if v} if 'id' in ori_dict: error_msg = 'Origin with id %s not found.' % ori_dict['id'] else: error_msg = 'Origin with type %s and URL %s not found.' % ( ori_dict['type'], ori_dict['url']) return api_lookup( service.lookup_origin, ori_dict, notfound_msg=error_msg, enrich_fn=_enrich_origin) @api_route(r'/origin/search/(?P<url_pattern>.+)/', 'origin-search') @api_doc.route('/origin/search/') @api_doc.arg('url_pattern', default='python', argtype=api_doc.argtypes.str, argdoc='string pattern to search for in origin urls') @api_doc.header('Link', doc=doc_header_link) @api_doc.param('offset', default=0, argtype=api_doc.argtypes.int, doc='number of found origins to skip before returning results') # noqa @api_doc.param('limit', default=70, argtype=api_doc.argtypes.int, doc='the maximum number of found origins to return') @api_doc.param('regexp', default='false', argtype=api_doc.argtypes.str, doc="""if this query parameter is set to 'true', consider the provided pattern as a regular expression and search for origins whose urls match it""") @api_doc.returns(rettype=api_doc.rettypes.list, retdoc="""The metadata of the origins whose urls match the provided string pattern""") def api_origin_search(request, url_pattern): """Search for origins whose urls contain a provided string pattern or match a provided regular expression. The search is performed in a case insensitive way.
""" result = {} offset = int(request.query_params.get('offset', '0')) limit = int(request.query_params.get('limit', '70')) regexp = request.query_params.get('regexp', 'false') results = api_lookup(service.search_origin, url_pattern, offset, limit, bool(strtobool(regexp)), enrich_fn=_enrich_origin) nb_results = len(results) if nb_results == limit: query_params = {} query_params['offset'] = offset + limit query_params['limit'] = limit query_params['regexp'] = regexp result['headers'] = { 'link-next': reverse('origin-search', kwargs={'url_pattern': url_pattern}, query_params=query_params) } result.update({ 'results': results }) return result @api_route(r'/origin/(?P[0-9]+)/visits/', 'origin-visits') @api_doc.route('/origin/visits/') @api_doc.arg('origin_id', default=1, argtype=api_doc.argtypes.int, argdoc='software origin identifier') @api_doc.header('Link', doc=doc_header_link) @api_doc.param('last_visit', default=None, argtype=api_doc.argtypes.int, doc=doc_arg_last_elt) @api_doc.param('per_page', default=10, argtype=api_doc.argtypes.int, doc=doc_arg_per_page) @api_doc.raises(exc=api_doc.excs.notfound, doc=doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.list, retdoc="""a list of dictionaries describing individual visits. For each visit, its identifier, timestamp (as UNIX time), - outcome, and visit-specific URL for more information are - given.""") + outcome, snapshot id, and visit-specific URL for more + information are given. Visits are sorted in descending + order according to their timestamp.""") def api_origin_visits(request, origin_id): """Get information about all visits of a given software origin. """ result = {} per_page = int(request.query_params.get('per_page', '10')) last_visit = request.query_params.get('last_visit') if last_visit: last_visit = int(last_visit) def _lookup_origin_visits( origin_id, last_visit=last_visit, per_page=per_page): - return service.lookup_origin_visits( - origin_id, last_visit=last_visit, per_page=per_page) + all_visits = get_origin_visits({'id': origin_id}) + all_visits.reverse() + visits = [] + if not last_visit: + visits = all_visits[:per_page] + else: + for i, v in enumerate(all_visits): + if v['visit'] == last_visit: + visits = all_visits[i+1:i+1+per_page] + break + for v in visits: + yield v def _enrich_origin_visit(origin_visit): ov = origin_visit.copy() ov['origin_visit_url'] = reverse('origin-visit', kwargs={'origin_id': origin_id, 'visit_id': ov['visit']}) snapshot = ov['snapshot'] if snapshot: ov['snapshot_url'] = reverse('snapshot', kwargs={'snapshot_id': snapshot}) else: ov['snapshot_url'] = None return ov results = api_lookup(_lookup_origin_visits, origin_id, notfound_msg='No origin {} found'.format(origin_id), enrich_fn=_enrich_origin_visit) if results: nb_results = len(results) if nb_results == per_page: new_last_visit = results[-1]['visit'] query_params = {} query_params['last_visit'] = new_last_visit if request.query_params.get('per_page'): query_params['per_page'] = per_page result['headers'] = { 'link-next': reverse('origin-visits', kwargs={'origin_id': origin_id}, query_params=query_params) } result.update({ 'results': results }) return result @api_route(r'/origin/(?P[0-9]+)/visit/(?P[0-9]+)/', 'origin-visit') @api_doc.route('/origin/visit/') @api_doc.arg('origin_id', default=1, argtype=api_doc.argtypes.int, argdoc='software origin identifier') @api_doc.arg('visit_id', default=1, argtype=api_doc.argtypes.int, argdoc="""visit identifier, relative to the origin identified by origin_id""") 
@api_doc.raises(exc=api_doc.excs.notfound, doc=doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""dictionary containing both metadata for the entire visit (e.g., timestamp as UNIX time, visit outcome, etc.) and what was at the software origin during the visit (i.e., a mapping from branches to other archive objects)""") def api_origin_visit(request, origin_id, visit_id): """Get information about a specific visit of a software origin. """ def _enrich_origin_visit(origin_visit): ov = origin_visit.copy() ov['origin_url'] = reverse('origin', kwargs={'origin_id': ov['origin']}) snapshot = ov['snapshot'] if snapshot: ov['snapshot_url'] = reverse('snapshot', kwargs={'snapshot_id': snapshot}) else: ov['snapshot_url'] = None # TODO: remove that piece of code once the snapshot migration # is totally effective in storage (no more occurrences) if 'occurrences' in ov: ov['occurrences'] = { k: utils.enrich_object(v) if v else None for k, v in ov['occurrences'].items() } return ov return api_lookup( service.lookup_origin_visit, origin_id, visit_id, notfound_msg=('No visit {} for origin {} found' .format(visit_id, origin_id)), enrich_fn=_enrich_origin_visit) diff --git a/swh/web/browse/utils.py b/swh/web/browse/utils.py index ab60c762..b936069d 100644 --- a/swh/web/browse/utils.py +++ b/swh/web/browse/utils.py @@ -1,1056 +1,996 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 import magic import math import pypandoc import stat from django.core.cache import cache from django.utils.safestring import mark_safe from importlib import reload from swh.web.common import highlightjs, service from swh.web.common.exc import NotFoundExc from swh.web.common.utils import ( - reverse, format_utc_iso_date, parse_timestamp + reverse, format_utc_iso_date, parse_timestamp, + get_origin_visits ) from swh.web.config import get_config def get_directory_entries(sha1_git): """Function that retrieves the content of a SWH directory from the SWH archive. The directories entries are first sorted in lexicographical order. Sub-directories and regular files are then extracted. Args: sha1_git: sha1_git identifier of the directory Returns: A tuple whose first member corresponds to the sub-directories list and second member the regular files list Raises: NotFoundExc if the directory is not found """ cache_entry_id = 'directory_entries_%s' % sha1_git cache_entry = cache.get(cache_entry_id) if cache_entry: return cache_entry entries = list(service.lookup_directory(sha1_git)) entries = sorted(entries, key=lambda e: e['name']) for entry in entries: entry['perms'] = stat.filemode(entry['perms']) dirs = [e for e in entries if e['type'] == 'dir'] files = [e for e in entries if e['type'] == 'file'] cache.set(cache_entry_id, (dirs, files)) return dirs, files def get_mimetype_and_encoding_for_content(content): """Function that returns the mime type and the encoding associated to a content buffer using the magic module under the hood. Args: content (bytes): a content buffer Returns: A tuple (mimetype, encoding), for instance ('text/plain', 'us-ascii'), associated to the provided content. 
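The cache-then-compute pattern used by ``get_directory_entries`` above recurs throughout this module; a condensed sketch, assuming a configured Django cache backend (``cached_directory_entries`` and ``compute_entries`` are hypothetical):

.. sourcecode:: python

    from django.core.cache import cache

    # Check the cache, compute on miss, store the result, return it.
    def cached_directory_entries(sha1_git, compute_entries):
        cache_entry_id = 'directory_entries_%s' % sha1_git
        cache_entry = cache.get(cache_entry_id)
        if cache_entry:
            return cache_entry
        result = compute_entries(sha1_git)
        cache.set(cache_entry_id, result)
        return result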
""" while True: try: magic_result = magic.detect_from_content(content) mime_type = magic_result.mime_type encoding = magic_result.encoding break except Exception as exc: # workaround an issue with the magic module who can fail # if detect_from_content is called multiple times in # a short amount of time reload(magic) return mime_type, encoding # maximum authorized content size in bytes for HTML display # with code highlighting content_display_max_size = get_config()['content_display_max_size'] def request_content(query_string, max_size=content_display_max_size): """Function that retrieves a SWH content from the SWH archive. Raw bytes content is first retrieved, then the content mime type. If the mime type is not stored in the archive, it will be computed using Python magic module. Args: query_string: a string of the form "[ALGO_HASH:]HASH" where optional ALGO_HASH can be either *sha1*, *sha1_git*, *sha256*, or *blake2s256* (default to *sha1*) and HASH the hexadecimal representation of the hash value max_size: the maximum size for a content to retrieve (default to 1MB, no size limit if None) Returns: A tuple whose first member corresponds to the content raw bytes and second member the content mime type Raises: NotFoundExc if the content is not found """ content_data = service.lookup_content(query_string) filetype = None language = None license = None # requests to the indexer db may fail so properly handle # those cases in order to avoid content display errors try: filetype = service.lookup_content_filetype(query_string) language = service.lookup_content_language(query_string) license = service.lookup_content_license(query_string) except Exception as e: pass mimetype = 'unknown' encoding = 'unknown' if filetype: mimetype = filetype['mimetype'] encoding = filetype['encoding'] if not max_size or content_data['length'] < max_size: content_raw = service.lookup_content_raw(query_string) content_data['raw_data'] = content_raw['data'] if not filetype: mimetype, encoding = \ get_mimetype_and_encoding_for_content(content_data['raw_data']) # encode textual content to utf-8 if needed if mimetype.startswith('text/'): # probably a malformed UTF-8 content, reencode it # by replacing invalid chars with a substitution one if encoding == 'unknown-8bit': content_data['raw_data'] = \ content_data['raw_data'].decode('utf-8', 'replace')\ .encode('utf-8') elif 'ascii' not in encoding and encoding not in ['utf-8', 'binary']: # noqa content_data['raw_data'] = \ content_data['raw_data'].decode(encoding, 'replace')\ .encode('utf-8') else: content_data['raw_data'] = None content_data['mimetype'] = mimetype content_data['encoding'] = encoding if language: content_data['language'] = language['lang'] else: content_data['language'] = 'not detected' if license: content_data['licenses'] = ', '.join(license['licenses']) else: content_data['licenses'] = 'not detected' return content_data _browsers_supported_image_mimes = set(['image/gif', 'image/png', 'image/jpeg', 'image/bmp', 'image/webp']) def prepare_content_for_display(content_data, mime_type, path): """Function that prepares a content for HTML display. The function tries to associate a programming language to a content in order to perform syntax highlighting client-side using highlightjs. The language is determined using either the content filename or its mime type. If the mime type corresponds to an image format supported by web browsers, the content will be encoded in base64 for displaying the image. 
Args: content_data (bytes): raw bytes of the content mime_type (string): mime type of the content path (string): path of the content including filename Returns: A dict containing the content bytes (possibly different from the one provided as parameter if it is an image) under the key 'content_data and the corresponding highlightjs language class under the key 'language'. """ language = highlightjs.get_hljs_language_from_filename(path) if not language: language = highlightjs.get_hljs_language_from_mime_type(mime_type) if not language: language = 'nohighlight-swh' elif mime_type.startswith('application/'): mime_type = mime_type.replace('application/', 'text/') if mime_type.startswith('image/'): if mime_type in _browsers_supported_image_mimes: content_data = base64.b64encode(content_data) else: content_data = None return {'content_data': content_data, 'language': language} -def get_origin_visits(origin_info): - """Function that returns the list of visits for a swh origin. - That list is put in cache in order to speedup the navigation - in the swh web browse ui. - - Args: - origin_id (int): the id of the swh origin to fetch visits from - - Returns: - A list of dict describing the origin visits:: - - [{'date': , - 'origin': , - 'status': <'full' | 'partial'>, - 'visit': - }, - ... - ] - - Raises: - NotFoundExc if the origin is not found - """ - cache_entry_id = 'origin_%s_visits' % origin_info['id'] - cache_entry = cache.get(cache_entry_id) - - if cache_entry: - return cache_entry - - origin_visits = [] - - per_page = service.MAX_LIMIT - last_visit = None - while 1: - visits = list(service.lookup_origin_visits(origin_info['id'], - last_visit=last_visit, - per_page=per_page)) - origin_visits += visits - if len(visits) < per_page: - break - else: - if not last_visit: - last_visit = per_page - else: - last_visit += per_page - - def _visit_sort_key(visit): - ts = parse_timestamp(visit['date']).timestamp() - return ts + (float(visit['visit']) / 10e3) - - for v in origin_visits: - if 'metadata' in v: - del v['metadata'] - origin_visits = [dict(t) for t in set([tuple(d.items()) - for d in origin_visits])] - origin_visits = sorted(origin_visits, key=lambda v: _visit_sort_key(v)) - - cache.set(cache_entry_id, origin_visits) - - return origin_visits - - def get_origin_visit(origin_info, visit_ts=None, visit_id=None, snapshot_id=None): """Function that returns information about a SWH visit for a given origin. The visit is retrieved from a provided timestamp. The closest visit from that timestamp is selected. Args: origin_info (dict): a dict filled with origin information (id, url, type) visit_ts (int or str): an ISO date string or Unix timestamp to parse Returns: A dict containing the visit info as described below:: {'origin': 2, 'date': '2017-10-08T11:54:25.582463+00:00', 'metadata': {}, 'visit': 25, 'status': 'full'} """ visits = get_origin_visits(origin_info) if not visits: raise NotFoundExc('No SWH visit associated to origin with' ' type %s and url %s!' % (origin_info['type'], origin_info['url'])) if snapshot_id: visit = [v for v in visits if v['snapshot'] == snapshot_id] if len(visit) == 0: raise NotFoundExc( 'Visit for snapshot with id %s for origin with type %s' ' and url %s not found!' % (snapshot_id, origin_info['type'], origin_info['url'])) return visit[0] if visit_id: visit = [v for v in visits if v['visit'] == int(visit_id)] if len(visit) == 0: raise NotFoundExc( 'Visit with id %s for origin with type %s' ' and url %s not found!' 
% (visit_id, origin_info['type'], origin_info['url'])) return visit[0] if not visit_ts: # returns the latest full visit when no timestamp is provided for v in reversed(visits): if v['status'] == 'full': return v return visits[-1] parsed_visit_ts = math.floor(parse_timestamp(visit_ts).timestamp()) visit_idx = None for i, visit in enumerate(visits): ts = math.floor(parse_timestamp(visit['date']).timestamp()) if i == 0 and parsed_visit_ts <= ts: return visit elif i == len(visits) - 1: if parsed_visit_ts >= ts: return visit else: next_ts = math.floor( parse_timestamp(visits[i+1]['date']).timestamp()) if parsed_visit_ts >= ts and parsed_visit_ts < next_ts: if (parsed_visit_ts - ts) < (next_ts - parsed_visit_ts): visit_idx = i break else: visit_idx = i+1 break if visit_idx: visit = visits[visit_idx] while visit_idx < len(visits) - 1 and \ visit['date'] == visits[visit_idx+1]['date']: visit_idx = visit_idx + 1 visit = visits[visit_idx] return visit else: raise NotFoundExc( 'Visit with timestamp %s for origin with type %s and url %s not found!' % # noqa (visit_ts, origin_info['type'], origin_info['url'])) def get_snapshot_content(snapshot_id): """Returns the lists of branches and releases associated to a swh snapshot. That list is put in cache in order to speedup the navigation in the swh-web/browse ui. Args: snapshot_id (str): hexadecimal representation of the snapshot identifier Returns: A tuple with two members. The first one is a list of dict describing the snapshot branches. The second one is a list of dict describing the snapshot releases. Raises: NotFoundExc if the snapshot does not exist """ cache_entry_id = 'swh_snapshot_%s' % snapshot_id cache_entry = cache.get(cache_entry_id) if cache_entry: return cache_entry['branches'], cache_entry['releases'] branches = [] releases = [] if snapshot_id: revision_ids = [] releases_ids = [] snapshot = service.lookup_snapshot(snapshot_id) snapshot_branches = snapshot['branches'] for key in sorted(snapshot_branches.keys()): if not snapshot_branches[key]: continue if snapshot_branches[key]['target_type'] == 'revision': branches.append({'name': key, 'revision': snapshot_branches[key]['target']}) revision_ids.append(snapshot_branches[key]['target']) elif snapshot_branches[key]['target_type'] == 'release': releases_ids.append(snapshot_branches[key]['target']) releases_info = service.lookup_release_multiple(releases_ids) for release in releases_info: releases.append({'name': release['name'], 'date': format_utc_iso_date(release['date']), 'id': release['id'], 'message': release['message'], 'target_type': release['target_type'], 'target': release['target']}) revision_ids.append(release['target']) revisions = service.lookup_revision_multiple(revision_ids) branches_to_remove = [] for idx, revision in enumerate(revisions): if idx < len(branches): if revision: branches[idx]['directory'] = revision['directory'] branches[idx]['date'] = format_utc_iso_date(revision['date']) # noqa branches[idx]['message'] = revision['message'] else: branches_to_remove.append(branches[idx]) else: rel_idx = idx - len(branches) if revision: releases[rel_idx]['directory'] = revision['directory'] for b in branches_to_remove: branches.remove(b) cache.set(cache_entry_id, {'branches': branches, 'releases': releases}) return branches, releases def get_origin_visit_snapshot(origin_info, visit_ts=None, visit_id=None, snapshot_id=None): """Returns the lists of branches and releases associated to a swh origin for a given visit. The visit is expressed by a timestamp. 
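As an aside, the closest-visit scan in ``get_origin_visit`` above can be expressed with ``bisect``, assuming visit timestamps sorted in ascending order; ``closest_visit_index`` is a hypothetical helper that keeps the same tie-breaking towards the later visit:

.. sourcecode:: python

    import bisect

    # Return the index of the visit whose timestamp is closest to `wanted`.
    def closest_visit_index(timestamps, wanted):
        i = bisect.bisect_left(timestamps, wanted)
        if i == 0:
            return 0
        if i == len(timestamps):
            return len(timestamps) - 1
        before, after = timestamps[i - 1], timestamps[i]
        return i - 1 if wanted - before < after - wanted else i

    assert closest_visit_index([100, 200, 400], 290) == 1  # 200 is nearer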
The closest visit from the provided timestamp will be used. If no visit parameter is provided, it returns the list of branches found for the latest visit. That list is put in cache in order to speed up the navigation in the swh-web/browse ui. Args: origin_info (dict): a dict filled with origin information (id, url, type) visit_ts (int or str): an ISO date string or Unix timestamp to parse visit_id (int): optional visit id for disambiguation in case several visits have the same timestamp Returns: A tuple with two members. The first one is a list of dict describing the origin branches for the given visit. The second one is a list of dict describing the origin releases for the given visit. Raises: NotFoundExc if the origin or its visit is not found """ visit_info = get_origin_visit(origin_info, visit_ts, visit_id, snapshot_id) return get_snapshot_content(visit_info['snapshot']) def gen_link(url, link_text, link_attrs={}): """ Utility function for generating an HTML link to insert in Django templates. Args: url (str): a url link_text (str): the text for the produced link link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form '<a href="url">link_text</a>' """ attrs = ' ' for k, v in link_attrs.items(): attrs += '%s="%s" ' % (k, v) link = '<a%s href="%s">%s</a>' % (attrs, url, link_text) return mark_safe(link) def gen_person_link(person_id, person_name, snapshot_context=None, link_attrs={}): """ Utility function for generating a link to a SWH person HTML view to insert in Django templates. Args: person_id (int): a SWH person id person_name (str): the associated person name link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form '<a href="person_view_url">person_name</a>' """ query_params = None if snapshot_context and snapshot_context['origin_info']: origin_info = snapshot_context['origin_info'] query_params = {'origin_type': origin_info['type'], 'origin_url': origin_info['url']} if 'timestamp' in snapshot_context['url_args']: query_params['timestamp'] = \ snapshot_context['url_args']['timestamp'] if 'visit_id' in snapshot_context['query_params']: query_params['visit_id'] = \ snapshot_context['query_params']['visit_id'] elif snapshot_context: query_params = {'snapshot_id': snapshot_context['snapshot_id']} person_url = reverse('browse-person', kwargs={'person_id': person_id}, query_params=query_params) return gen_link(person_url, person_name, link_attrs) def gen_revision_link(revision_id, shorten_id=False, snapshot_context=None, link_text=None, link_attrs={}): """ Utility function for generating a link to a SWH revision HTML view to insert in Django templates. Args: revision_id (str): a SWH revision id shorten_id (boolean): whether to shorten the revision id to 7 characters for the link text snapshot_context (dict): if provided, generate snapshot-dependent browsing link link_attrs (dict): optional attributes (e.g.
class) to add to the link Returns: An HTML link in the form '<a href="revision_view_url">revision_id</a>' """ query_params = None if snapshot_context and snapshot_context['origin_info']: origin_info = snapshot_context['origin_info'] query_params = {'origin_type': origin_info['type'], 'origin_url': origin_info['url']} if 'timestamp' in snapshot_context['url_args']: query_params['timestamp'] = \ snapshot_context['url_args']['timestamp'] if 'visit_id' in snapshot_context['query_params']: query_params['visit_id'] = \ snapshot_context['query_params']['visit_id'] elif snapshot_context: query_params = {'snapshot_id': snapshot_context['snapshot_id']} revision_url = reverse('browse-revision', kwargs={'sha1_git': revision_id}, query_params=query_params) if shorten_id: return gen_link(revision_url, revision_id[:7], link_attrs) else: if not link_text: link_text = revision_id return gen_link(revision_url, link_text, link_attrs) def gen_origin_link(origin_info, link_attrs={}): """ Utility function for generating a link to a SWH origin HTML view to insert in Django templates. Args: origin_info (dict): a dict filled with origin information (id, type, url) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form '<a href="origin_browse_url">Origin: origin_url</a>' """ # noqa origin_browse_url = reverse('browse-origin', kwargs={'origin_type': origin_info['type'], 'origin_url': origin_info['url']}) return gen_link(origin_browse_url, 'Origin: ' + origin_info['url'], link_attrs) def gen_directory_link(sha1_git, link_text=None, link_attrs={}): """ Utility function for generating a link to a SWH directory HTML view to insert in Django templates. Args: sha1_git (str): directory identifier link_text (str): optional text for the generated link (the generated url will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form '<a href="directory_view_url">link_text</a>' """ directory_url = reverse('browse-directory', kwargs={'sha1_git': sha1_git}) if not link_text: link_text = directory_url return gen_link(directory_url, link_text, link_attrs) def gen_snapshot_link(snapshot_id, link_text=None, link_attrs={}): """ Utility function for generating a link to a SWH snapshot HTML view to insert in Django templates. Args: snapshot_id (str): snapshot identifier link_text (str): optional text for the generated link (the generated url will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form '<a href="snapshot_view_url">link_text</a>' """ snapshot_url = reverse('browse-snapshot', kwargs={'snapshot_id': snapshot_id}) if not link_text: link_text = snapshot_url return gen_link(snapshot_url, link_text, link_attrs) def gen_snapshot_directory_link(snapshot_context, revision_id=None, link_text=None, link_attrs={}): """ Utility function for generating a link to a SWH directory HTML view in the context of a snapshot to insert in Django templates. Args: snapshot_context (dict): the snapshot information revision_id (str): optional revision identifier in order to use the associated directory link_text (str): optional text to use for the generated link link_attrs (dict): optional attributes (e.g.
class) to add to the link Returns: An HTML link in the form 'origin_directory_view_url' """ query_params = {'revision': revision_id} if snapshot_context['origin_info']: origin_info = snapshot_context['origin_info'] url_args = {'origin_type': origin_info['type'], 'origin_url': origin_info['url']} if 'timestamp' in snapshot_context['url_args']: url_args['timestamp'] = \ snapshot_context['url_args']['timestamp'] if 'visit_id' in snapshot_context['query_params']: query_params['visit_id'] = \ snapshot_context['query_params']['visit_id'] directory_url = reverse('browse-origin-directory', kwargs=url_args, query_params=query_params) else: url_args = {'snapshot_id': snapshot_context['snapshot_id']} directory_url = reverse('browse-snapshot-directory', kwargs=url_args, query_params=query_params) if not link_text: link_text = directory_url return gen_link(directory_url, link_text, link_attrs) def gen_content_link(sha1_git, link_text=None, link_attrs={}): """ Utility function for generating a link to a SWH content HTML view to insert in Django templates. Args: sha1_git (str): content identifier link_text (str): optional text for the generated link (the generated url will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form 'link_text' """ content_url = reverse('browse-content', kwargs={'query_string': 'sha1_git:' + sha1_git}) if not link_text: link_text = content_url return gen_link(content_url, link_text, link_attrs) def get_revision_log_url(revision_id, snapshot_context=None): """ Utility function for getting the URL for a SWH revision log HTML view (possibly in the context of an origin). Args: revision_id (str): revision identifier the history heads to snapshot_context (dict): if provided, generate snapshot-dependent browsing link Returns: The SWH revision log view URL """ query_params = {'revision': revision_id} if snapshot_context and snapshot_context['origin_info']: origin_info = snapshot_context['origin_info'] url_args = {'origin_type': origin_info['type'], 'origin_url': origin_info['url']} if 'timestamp' in snapshot_context['url_args']: url_args['timestamp'] = \ snapshot_context['url_args']['timestamp'] if 'visit_id' in snapshot_context['query_params']: query_params['visit_id'] = \ snapshot_context['query_params']['visit_id'] revision_log_url = reverse('browse-origin-log', kwargs=url_args, query_params=query_params) elif snapshot_context: url_args = {'snapshot_id': snapshot_context['snapshot_id']} revision_log_url = reverse('browse-snapshot-log', kwargs=url_args, query_params=query_params) else: revision_log_url = reverse('browse-revision-log', kwargs={'sha1_git': revision_id}) return revision_log_url def gen_revision_log_link(revision_id, snapshot_context=None, link_text=None, link_attrs={}): """ Utility function for generating a link to a SWH revision log HTML view (possibly in the context of an origin) to insert in Django templates. Args: revision_id (str): revision identifier the history heads to snapshot_context (dict): if provided, generate snapshot-dependent browsing link link_text (str): optional text to use for the generated link link_attrs (dict): optional attributes (e.g. 
class) to add to the link Returns: An HTML link in the form '<a href="revision_log_view_url">link_text</a>' """ revision_log_url = get_revision_log_url(revision_id, snapshot_context) if not link_text: link_text = revision_log_url return gen_link(revision_log_url, link_text, link_attrs) def _format_log_entries(revision_log, per_page, snapshot_context=None): revision_log_data = [] for i, log in enumerate(revision_log): if i == per_page: break revision_log_data.append( {'author': gen_person_link(log['author']['id'], log['author']['name'], snapshot_context), 'revision': gen_revision_link(log['id'], True, snapshot_context), 'message': log['message'], 'date': format_utc_iso_date(log['date']), 'directory': log['directory']}) return revision_log_data def prepare_revision_log_for_display(revision_log, per_page, revs_breadcrumb, snapshot_context=None): """ Utility function that processes raw revision log data for HTML display. Its purpose is to: * add links to relevant SWH browse views * format dates in a human-readable format * truncate the message log It also computes the data needed to generate the links for navigating back and forth in the history log. Args: revision_log (list): raw revision log as returned by the SWH web api per_page (int): number of log entries per page revs_breadcrumb (str): breadcrumbs of revisions navigated so far, in the form 'rev1[/rev2/../revN]'. Each revision corresponds to the first one displayed in the HTML view for history log. snapshot_context (dict): if provided, generate snapshot-dependent browsing link """ current_rev = revision_log[0]['id'] next_rev = None prev_rev = None next_revs_breadcrumb = None prev_revs_breadcrumb = None if len(revision_log) == per_page + 1: prev_rev = revision_log[-1]['id'] prev_rev_bc = current_rev if snapshot_context: prev_rev_bc = prev_rev if revs_breadcrumb: revs = revs_breadcrumb.split('/') next_rev = revs[-1] if len(revs) > 1: next_revs_breadcrumb = '/'.join(revs[:-1]) if len(revision_log) == per_page + 1: prev_revs_breadcrumb = revs_breadcrumb + '/' + prev_rev_bc else: prev_revs_breadcrumb = prev_rev_bc return {'revision_log_data': _format_log_entries(revision_log, per_page, snapshot_context), 'prev_rev': prev_rev, 'prev_revs_breadcrumb': prev_revs_breadcrumb, 'next_rev': next_rev, 'next_revs_breadcrumb': next_revs_breadcrumb} # list of origin types that can be found in the swh archive # TODO: retrieve it dynamically in an efficient way instead # of hardcoding it _swh_origin_types = ['git', 'svn', 'deb', 'hg', 'ftp', 'deposit'] def get_origin_info(origin_url, origin_type=None): """ Get info about a SWH origin. Its main purpose is to automatically find an origin type when it is not provided as a parameter. Args: origin_url (str): complete url of a SWH origin origin_type (str): optional origin type Returns: A dict with the following entries: * type: the origin type * url: the origin url * id: the SWH internal id of the origin """ if origin_type: return service.lookup_origin({'type': origin_type, 'url': origin_url}) else: for origin_type in _swh_origin_types: origin_info = service.lookup_origin({'type': origin_type, 'url': origin_url}) if origin_info: return origin_info return None def get_snapshot_context(snapshot_id=None, origin_type=None, origin_url=None, timestamp=None, visit_id=None): """ Utility function to compute relevant information when navigating the SWH archive in a snapshot context. The snapshot is either referenced by its id or it will be retrieved from an origin visit.
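The type-probing loop of ``get_origin_info`` above deserves a stand-alone illustration; a sketch where ``lookup`` stands in for ``service.lookup_origin`` and ``fake_archive`` is made-up data:

.. sourcecode:: python

    # When no origin type is given, each known type is tried in turn
    # until a lookup succeeds.
    _types = ['git', 'svn', 'deb', 'hg', 'ftp', 'deposit']

    def guess_origin(lookup, origin_url, origin_type=None):
        for t in ([origin_type] if origin_type else _types):
            origin_info = lookup({'type': t, 'url': origin_url})
            if origin_info:
                return origin_info
        return None

    fake_archive = {('git', 'https://github.com/hylang/hy'): {'id': 1}}
    assert guess_origin(
        lambda o: fake_archive.get((o['type'], o['url'])),
        'https://github.com/hylang/hy') == {'id': 1}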
Args: snapshot_id (str): hexadecimal representation of a snapshot identifier, all other parameters will be ignored if it is provided origin_type (str): the origin type (git, svn, deposit, ...) origin_url (str): the origin_url (e.g. https://github.com/(user)/(repo)/) timestamp (str): a datetime string for retrieving the closest SWH visit of the origin visit_id (int): optional visit id for disambiguation in case of several visits with the same timestamp Returns: A dict with the following entries: * origin_info: dict containing origin information * visit_info: dict containing SWH visit information * branches: the list of branches for the origin found during the visit * releases: the list of releases for the origin found during the visit * origin_browse_url: the url to browse the origin * origin_branches_url: the url to browse the origin branches * origin_releases_url: the url to browse the origin releases * origin_visit_url: the url to browse the snapshot of the origin found during the visit * url_args: dict containing url arguments to use when browsing in the context of the origin and its visit """ # noqa origin_info = None visit_info = None url_args = None query_params = {} branches = [] releases = [] browse_url = None visit_url = None branches_url = None releases_url = None swh_type = 'snapshot' if origin_url: swh_type = 'origin' origin_info = get_origin_info(origin_url, origin_type) origin_info['type'] = origin_type visit_info = get_origin_visit(origin_info, timestamp, visit_id, snapshot_id) visit_info['fmt_date'] = format_utc_iso_date(visit_info['date']) snapshot_id = visit_info['snapshot'] # the provided timestamp is not necessarily equal to the one # of the retrieved visit, so get the exact one in order # to use it in the urls generated below if timestamp: timestamp = visit_info['date'] branches, releases = \ get_origin_visit_snapshot(origin_info, timestamp, visit_id, snapshot_id) url_args = {'origin_type': origin_info['type'], 'origin_url': origin_info['url']} query_params = {'visit_id': visit_id} browse_url = reverse('browse-origin-visits', kwargs=url_args) if timestamp: url_args['timestamp'] = format_utc_iso_date(timestamp, '%Y-%m-%dT%H:%M:%S') visit_url = reverse('browse-origin-directory', kwargs=url_args, query_params=query_params) visit_info['url'] = visit_url branches_url = reverse('browse-origin-branches', kwargs=url_args, query_params=query_params) releases_url = reverse('browse-origin-releases', kwargs=url_args, query_params=query_params) elif snapshot_id: branches, releases = get_snapshot_content(snapshot_id) url_args = {'snapshot_id': snapshot_id} browse_url = reverse('browse-snapshot', kwargs=url_args) branches_url = reverse('browse-snapshot-branches', kwargs=url_args) releases_url = reverse('browse-snapshot-releases', kwargs=url_args) releases = list(reversed(releases)) return { 'swh_type': swh_type, 'snapshot_id': snapshot_id, 'origin_info': origin_info, 'visit_info': visit_info, 'branches': branches, 'releases': releases, 'branch': None, 'release': None, 'browse_url': browse_url, 'branches_url': branches_url, 'releases_url': releases_url, 'url_args': url_args, 'query_params': query_params } # list of common readme names ordered by preference # (lower indices have higher priority) _common_readme_names = [ "readme.markdown", "readme.md", "readme.rst", "readme.txt", "readme" ] def get_readme_to_display(readmes): """ Process a list of readme files found in a directory in order to find the adequate one to display.
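A usage sketch for the preference order defined in ``_common_readme_names`` above (the actual implementation follows below); ``pick_readme`` is a hypothetical distillation of the selection logic:

.. sourcecode:: python

    _common_readme_names = ["readme.markdown", "readme.md", "readme.rst",
                            "readme.txt", "readme"]

    # Matching is case-insensitive and lower indices win; otherwise fall
    # back to the first readme-like file found, if any.
    def pick_readme(readmes):
        lc_readmes = {name.lower(): name for name in readmes}
        for candidate in _common_readme_names:
            if candidate in lc_readmes:
                return lc_readmes[candidate]
        return next(iter(readmes), None)

    assert pick_readme({'README.rst': 'sha1-a', 'README.md': 'sha1-b'}) == \
        'README.md'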
Args: readmes: a list of dict where keys are readme file names and values are readme sha1s Returns: A tuple (readme_name, readme_sha1) """ readme_name = None readme_url = None readme_sha1 = None readme_html = None lc_readmes = {k.lower(): {'orig_name': k, 'sha1': v} for k, v in readmes.items()} # look for readme names according to the preference order # defined by the _common_readme_names list for common_readme_name in _common_readme_names: if common_readme_name in lc_readmes: readme_name = lc_readmes[common_readme_name]['orig_name'] readme_sha1 = lc_readmes[common_readme_name]['sha1'] readme_url = reverse('browse-content-raw', kwargs={'query_string': readme_sha1}) break # otherwise pick the first readme like file if any if not readme_name and len(readmes.items()) > 0: readme_name = next(iter(readmes)) readme_sha1 = readmes[readme_name] readme_url = reverse('browse-content-raw', kwargs={'query_string': readme_sha1}) # convert rst README to html server side as there is # no viable solution to perform that task client side if readme_name and readme_name.endswith('.rst'): cache_entry_id = 'readme_%s' % readme_sha1 cache_entry = cache.get(cache_entry_id) if cache_entry: readme_html = cache_entry else: rst_doc = request_content(readme_sha1) readme_html = pypandoc.convert_text(rst_doc['raw_data'], 'html', format='rst') cache.set(cache_entry_id, readme_html) return readme_name, readme_url, readme_html diff --git a/swh/web/browse/views/origin.py b/swh/web/browse/views/origin.py index f6b31484..2fa7533f 100644 --- a/swh/web/browse/views/origin.py +++ b/swh/web/browse/views/origin.py @@ -1,234 +1,235 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from distutils.util import strtobool from django.http import HttpResponse from django.shortcuts import render, redirect from swh.web.common import service from swh.web.common.utils import ( - reverse, format_utc_iso_date, parse_timestamp + reverse, format_utc_iso_date, parse_timestamp, + get_origin_visits ) from swh.web.common.exc import handle_view_exception from swh.web.browse.utils import ( - get_origin_info, get_origin_visits + get_origin_info ) from swh.web.browse.browseurls import browse_route from .utils.snapshot_context import ( browse_snapshot_directory, browse_snapshot_content, browse_snapshot_log, browse_snapshot_branches, browse_snapshot_releases ) @browse_route(r'origin/(?P[a-z]+)/url/(?P.+)/visit/(?P.+)/directory/', # noqa r'origin/(?P[a-z]+)/url/(?P.+)/visit/(?P.+)/directory/(?P.+)/', # noqa r'origin/(?P[a-z]+)/url/(?P.+)/directory/', # noqa r'origin/(?P[a-z]+)/url/(?P.+)/directory/(?P.+)/', # noqa r'origin/(?P.+)/visit/(?P.+)/directory/', # noqa r'origin/(?P.+)/visit/(?P.+)/directory/(?P.+)/', # noqa r'origin/(?P.+)/directory/', # noqa r'origin/(?P.+)/directory/(?P.+)/', # noqa view_name='browse-origin-directory') def origin_directory_browse(request, origin_url, origin_type=None, timestamp=None, path=None): """Django view for browsing the content of a SWH directory associated to an origin for a given visit. 
The url scheme that points to it is the following: * :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/directory/[(path)/]` * :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/visit/(timestamp)/directory/[(path)/]` """ # noqa return browse_snapshot_directory( request, origin_type=origin_type, origin_url=origin_url, timestamp=timestamp, path=path) @browse_route(r'origin/(?P[a-z]+)/url/(?P.+)/visit/(?P.+)/content/(?P.+)/', # noqa r'origin/(?P[a-z]+)/url/(?P.+)/content/(?P.+)/', # noqa r'origin/(?P.+)/visit/(?P.+)/content/(?P.+)/', # noqa r'origin/(?P.+)/content/(?P.+)/', # noqa view_name='browse-origin-content') def origin_content_browse(request, origin_url, origin_type=None, path=None, timestamp=None): """Django view that produces an HTML display of a SWH content associated to an origin for a given visit. The url scheme that points to it is the following: * :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/content/(path)/` * :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/visit/(timestamp)/content/(path)/` """ # noqa return browse_snapshot_content(request, origin_type=origin_type, origin_url=origin_url, timestamp=timestamp, path=path) PER_PAGE = 20 @browse_route(r'origin/(?P[a-z]+)/url/(?P.+)/visit/(?P.+)/log/', # noqa r'origin/(?P[a-z]+)/url/(?P.+)/log/', r'origin/(?P.+)/visit/(?P.+)/log/', # noqa r'origin/(?P.+)/log/', view_name='browse-origin-log') def origin_log_browse(request, origin_url, origin_type=None, timestamp=None): """Django view that produces an HTML display of revisions history (aka the commit log) associated to a SWH origin. The url scheme that points to it is the following: * :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/log/` * :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/visit/(timestamp)/log/` """ # noqa return browse_snapshot_log(request, origin_type=origin_type, origin_url=origin_url, timestamp=timestamp) @browse_route(r'origin/(?P[a-z]+)/url/(?P.+)/visit/(?P.+)/branches/', # noqa r'origin/(?P[a-z]+)/url/(?P.+)/branches/', # noqa r'origin/(?P.+)/visit/(?P.+)/branches/', # noqa r'origin/(?P.+)/branches/', # noqa view_name='browse-origin-branches') def origin_branches_browse(request, origin_url, origin_type=None, timestamp=None): """Django view that produces an HTML display of the list of branches associated to an origin for a given visit. The url scheme that points to it is the following: * :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/branches/` * :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/visit/(timestamp)/branches/` """ # noqa return browse_snapshot_branches(request, origin_type=origin_type, origin_url=origin_url, timestamp=timestamp) @browse_route(r'origin/(?P[a-z]+)/url/(?P.+)/visit/(?P.+)/releases/', # noqa r'origin/(?P[a-z]+)/url/(?P.+)/releases/', # noqa r'origin/(?P.+)/visit/(?P.+)/releases/', # noqa r'origin/(?P.+)/releases/', # noqa view_name='browse-origin-releases') def origin_releases_browse(request, origin_url, origin_type=None, timestamp=None): """Django view that produces an HTML display of the list of releases associated to an origin for a given visit. 
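The optional segments of the route patterns above compose into URLs as the following string-building sketch shows; ``origin_directory_url`` is purely illustrative, the real views being resolved through Django routing:

.. sourcecode:: python

    def origin_directory_url(origin_url, origin_type=None,
                             timestamp=None, path=None):
        url = '/browse/origin/'
        if origin_type:
            url += '%s/url/' % origin_type  # optional type prefix
        url += '%s/' % origin_url
        if timestamp:
            url += 'visit/%s/' % timestamp  # optional visit selector
        url += 'directory/'
        if path:
            url += '%s/' % path  # optional path inside the directory
        return url

    print(origin_directory_url('https://github.com/hylang/hy',
                               origin_type='git', path='docs'))
    # /browse/origin/git/url/https://github.com/hylang/hy/directory/docs/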
The url scheme that points to it is the following: * :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/releases/` * :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/visit/(timestamp)/releases/` """ # noqa return browse_snapshot_releases(request, origin_type=origin_type, origin_url=origin_url, timestamp=timestamp) @browse_route(r'origin/(?P[a-z]+)/url/(?P.+)/visits/', r'origin/(?P.+)/visits/', view_name='browse-origin-visits') def origin_visits_browse(request, origin_url, origin_type=None): """Django view that produces an HTML display of visits reporting for a swh origin identified by its id or its url. The url that points to it is :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/visits/`. """ # noqa try: origin_info = get_origin_info(origin_url, origin_type) origin_visits = get_origin_visits(origin_info) except Exception as exc: return handle_view_exception(request, exc) origin_info['last swh visit browse url'] = \ reverse('browse-origin-directory', kwargs={'origin_url': origin_url, 'origin_type': origin_type}) for i, visit in enumerate(origin_visits): url_date = format_utc_iso_date(visit['date'], '%Y-%m-%dT%H:%M:%SZ') visit['fmt_date'] = format_utc_iso_date(visit['date']) query_params = {} if i < len(origin_visits) - 1: if visit['date'] == origin_visits[i+1]['date']: query_params = {'visit_id': visit['visit']} if i > 0: if visit['date'] == origin_visits[i-1]['date']: query_params = {'visit_id': visit['visit']} snapshot = visit['snapshot'] if visit['snapshot'] else '' visit['browse_url'] = reverse('browse-origin-directory', kwargs={'origin_type': origin_type, 'origin_url': origin_url, 'timestamp': url_date}, query_params=query_params) if not snapshot: visit['snapshot'] = '' visit['date'] = parse_timestamp(visit['date']).timestamp() return render(request, 'origin-visits.html', {'heading': 'Origin visits', 'swh_object_name': 'Visits', 'swh_object_metadata': origin_info, 'origin_visits': origin_visits, 'origin_info': origin_info, 'browse_url_base': '/browse/origin/%s/url/%s/' % (origin_type, origin_url), 'vault_cooking': None, 'show_actions_menu': False}) @browse_route(r'origin/search/(?P.+)/', view_name='browse-origin-search') def _origin_search(request, url_pattern): """Internal browse endpoint to search for origins whose urls contain a provided string pattern or match a provided regular expression. The search is performed in a case insensitive way. """ offset = int(request.GET.get('offset', '0')) limit = int(request.GET.get('limit', '50')) regexp = request.GET.get('regexp', 'false') results = service.search_origin(url_pattern, offset, limit, bool(strtobool(regexp))) results = json.dumps(list(results), sort_keys=True, indent=4, separators=(',', ': ')) return HttpResponse(results, content_type='application/json') @browse_route(r'origin/(?P[0-9]+)/latest_snapshot/', view_name='browse-origin-latest-snapshot') def _origin_latest_snapshot(request, origin_id): """ Internal browse endpoint used to check if an origin has already been visited by Software Heritage and has at least one full visit. 
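The date-collision rule used by ``origin_visits_browse`` above, restated in isolation: a ``visit_id`` query parameter is only needed when a neighbouring visit shares the exact same date (``needs_visit_id`` is a hypothetical name):

.. sourcecode:: python

    # The timestamp alone is ambiguous only when a neighbour has the
    # same date; the visit id disambiguates in that case.
    def needs_visit_id(visits, i):
        same_as_next = (i < len(visits) - 1 and
                        visits[i]['date'] == visits[i + 1]['date'])
        same_as_prev = i > 0 and visits[i]['date'] == visits[i - 1]['date']
        return same_as_next or same_as_prev

    visits = [{'date': 'd1'}, {'date': 'd2'}, {'date': 'd2'}]
    assert [needs_visit_id(visits, i) for i in range(3)] == \
        [False, True, True]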
""" result = service.lookup_latest_origin_snapshot(origin_id, allowed_statuses=['full']) result = json.dumps(result, sort_keys=True, indent=4, separators=(',', ': ')) return HttpResponse(result, content_type='application/json') @browse_route(r'origin/(?P[a-z]+)/url/(?P.+)/', r'origin/(?P.+)/', view_name='browse-origin') def origin_browse(request, origin_url, origin_type=None): """Django view that redirects to the display of the latest archived snapshot for a given software origin. """ # noqa last_snapshot_url = reverse('browse-origin-directory', kwargs={'origin_type': origin_type, 'origin_url': origin_url}) return redirect(last_snapshot_url) diff --git a/swh/web/common/converters.py b/swh/web/common/converters.py index d97a26fd..a151fd00 100644 --- a/swh/web/common/converters.py +++ b/swh/web/common/converters.py @@ -1,362 +1,362 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import json from swh.model import hashutil from swh.core.utils import decode_with_escape -from swh.web.api import utils +from swh.web.common import utils def _group_checksums(data): """Groups checksums values computed from hash functions used in swh and stored in data dict under a single entry 'checksums' """ if data: checksums = {} for hash in hashutil.ALGORITHMS: if hash in data and data[hash]: checksums[hash] = data[hash] del data[hash] if len(checksums) > 0: data['checksums'] = checksums def from_swh(dict_swh, hashess={}, bytess={}, dates={}, blacklist={}, removables_if_empty={}, empty_dict={}, empty_list={}, convert={}, convert_fn=lambda x: x): """Convert from an swh dictionary to something reasonably json serializable. Args: dict_swh: the origin dictionary needed to be transformed hashess: list/set of keys representing hashes values (sha1, sha256, sha1_git, etc...) as bytes. Those need to be transformed in hexadecimal string bytess: list/set of keys representing bytes values which needs to be decoded blacklist: set of keys to filter out from the conversion convert: set of keys whose associated values need to be converted using convert_fn convert_fn: the conversion function to apply on the value of key in 'convert' The remaining keys are copied as is in the output. Returns: dictionary equivalent as dict_swh only with its keys converted. """ def convert_hashes_bytes(v): """v is supposedly a hash as bytes, returns it converted in hex. """ if isinstance(v, bytes): return hashutil.hash_to_hex(v) return v def convert_bytes(v): """v is supposedly a bytes string, decode as utf-8. FIXME: Improve decoding policy. If not utf-8, break! 
""" if isinstance(v, bytes): return v.decode('utf-8') return v def convert_date(v): """ Args: v (dict or datatime): either: - a dict with three keys: - timestamp (dict or integer timestamp) - offset - negative_utc - or, a datetime We convert it to a human-readable string """ if not v: return v if isinstance(v, datetime.datetime): return v.isoformat() tz = datetime.timezone(datetime.timedelta(minutes=v['offset'])) swh_timestamp = v['timestamp'] if isinstance(swh_timestamp, dict): date = datetime.datetime.fromtimestamp( swh_timestamp['seconds'], tz=tz) else: date = datetime.datetime.fromtimestamp( swh_timestamp, tz=tz) datestr = date.isoformat() if v['offset'] == 0 and v['negative_utc']: # remove the rightmost + and replace it with a - return '-'.join(datestr.rsplit('+', 1)) return datestr if not dict_swh: return dict_swh new_dict = {} for key, value in dict_swh.items(): if key in blacklist or (key in removables_if_empty and not value): continue if key in dates: new_dict[key] = convert_date(value) elif key in convert: new_dict[key] = convert_fn(value) elif isinstance(value, dict): new_dict[key] = from_swh(value, hashess=hashess, bytess=bytess, dates=dates, blacklist=blacklist, removables_if_empty=removables_if_empty, empty_dict=empty_dict, empty_list=empty_list, convert=convert, convert_fn=convert_fn) elif key in hashess: new_dict[key] = utils.fmap(convert_hashes_bytes, value) elif key in bytess: try: new_dict[key] = utils.fmap(convert_bytes, value) except UnicodeDecodeError: if 'decoding_failures' not in new_dict: new_dict['decoding_failures'] = [key] else: new_dict['decoding_failures'].append(key) new_dict[key] = utils.fmap(decode_with_escape, value) elif key in empty_dict and not value: new_dict[key] = {} elif key in empty_list and not value: new_dict[key] = [] else: new_dict[key] = value _group_checksums(new_dict) return new_dict def from_provenance(provenance): """Convert from a provenance information to a provenance dictionary. Args: provenance (dict): Dictionary with the following keys: - content (sha1_git): the content's identifier - revision (sha1_git): the revision the content was seen - origin (int): the origin the content was seen - visit (int): the visit it occurred - path (bytes): the path the content was seen at """ return from_swh(provenance, hashess={'content', 'revision'}, bytess={'path'}) def from_origin(origin): """Convert from an SWH origin to an origin dictionary. """ return from_swh(origin, removables_if_empty={'lister', 'project'}) def from_release(release): """Convert from an SWH release to a json serializable release dictionary. Args: release (dict): dictionary with keys: - id: identifier of the revision (sha1 in bytes) - revision: identifier of the revision the release points to (sha1 in bytes) comment: release's comment message (bytes) name: release's name (string) author: release's author identifier (swh's id) synthetic: the synthetic property (boolean) Returns: dict: Release dictionary with the following keys: - id: hexadecimal sha1 (string) - revision: hexadecimal sha1 (string) - comment: release's comment message (string) - name: release's name (string) - author: release's author identifier (swh's id) - synthetic: the synthetic property (boolean) """ return from_swh( release, hashess={'id', 'target'}, bytess={'message', 'name', 'fullname', 'email'}, dates={'date'}, ) class SWHMetadataEncoder(json.JSONEncoder): """Special json encoder for metadata field which can contain bytes encoded value. 
""" def default(self, obj): if isinstance(obj, bytes): return obj.decode('utf-8') # Let the base class default method raise the TypeError return json.JSONEncoder.default(self, obj) def convert_revision_metadata(metadata): """Convert json specific dict to a json serializable one. """ if not metadata: return {} return json.loads(json.dumps(metadata, cls=SWHMetadataEncoder)) def from_revision(revision): """Convert from an SWH revision to a json serializable revision dictionary. Args: revision (dict): dict with keys: - id: identifier of the revision (sha1 in bytes) - directory: identifier of the directory the revision points to (sha1 in bytes) - author_name, author_email: author's revision name and email - committer_name, committer_email: committer's revision name and email - message: revision's message - date, date_offset: revision's author date - committer_date, committer_date_offset: revision's commit date - parents: list of parents for such revision - synthetic: revision's property nature - type: revision's type (git, tar or dsc at the moment) - metadata: if the revision is synthetic, this can reference dynamic properties. Returns: dict: Revision dictionary with the same keys as inputs, except: - sha1s are in hexadecimal strings (id, directory) - bytes are decoded in string (author_name, committer_name, author_email, committer_email) Remaining keys are left as is """ revision = from_swh(revision, hashess={'id', 'directory', 'parents', 'children'}, bytess={'name', 'fullname', 'email'}, convert={'metadata'}, convert_fn=convert_revision_metadata, dates={'date', 'committer_date'}) if revision: if 'parents' in revision: revision['merge'] = len(revision['parents']) > 1 if 'message' in revision: try: revision['message'] = revision['message'].decode('utf-8') except UnicodeDecodeError: revision['message_decoding_failed'] = True revision['message'] = None return revision def from_content(content): """Convert swh content to serializable content dictionary. """ return from_swh(content, hashess={'sha1', 'sha1_git', 'sha256', 'blake2s256'}, blacklist={'ctime'}, convert={'status'}, convert_fn=lambda v: 'absent' if v == 'hidden' else v) def from_person(person): """Convert swh person to serializable person dictionary. """ return from_swh(person, bytess={'name', 'fullname', 'email'}) def from_origin_visit(visit): """Convert swh origin_visit to serializable origin_visit dictionary. """ ov = from_swh(visit, hashess={'target', 'snapshot'}, bytess={'branch'}, dates={'date'}, empty_dict={'metadata'}) # TODO: remove that piece of code once snapshot migration # is totally effective in storage (no more occurrences) if ov and 'occurrences' in ov: ov['occurrences'] = { decode_with_escape(k): v for k, v in ov['occurrences'].items() } return ov def from_snapshot(snapshot): """Convert swh snapshot to serializable snapshot dictionary. """ sv = from_swh(snapshot, hashess={'id', 'target'}) if sv and 'branches' in sv: sv['branches'] = { decode_with_escape(k): v for k, v in sv['branches'].items() } return sv def from_directory_entry(dir_entry): """Convert swh person to serializable person dictionary. """ return from_swh(dir_entry, hashess={'dir_id', 'sha1_git', 'sha1', 'sha256', 'blake2s256', 'target'}, bytess={'name'}, removables_if_empty={ 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'status'}, convert={'status'}, convert_fn=lambda v: 'absent' if v == 'hidden' else v) def from_filetype(content_entry): """Convert swh person to serializable person dictionary. 
""" return from_swh(content_entry, hashess={'id'}, bytess={'mimetype', 'encoding'}) diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py index 9ea969ef..0fc91dfa 100644 --- a/swh/web/common/utils.py +++ b/swh/web/common/utils.py @@ -1,187 +1,250 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import re from datetime import datetime, timezone from dateutil import parser as date_parser from dateutil import tz -from swh.web.common.exc import BadInputExc - +from django.core.cache import cache from django.core import urlresolvers from django.http import QueryDict +from swh.web.common import service +from swh.web.common.exc import BadInputExc + def reverse(viewname, args=None, kwargs=None, query_params=None, current_app=None, urlconf=None): """An override of django reverse function supporting query parameters. Args: viewname: the name of the django view from which to compute a url args: list of url arguments ordered according to their position it kwargs: dictionary of url arguments indexed by their names query_params: dictionary of query parameters to append to the reversed url current_app: the name of the django app tighted to the view urlconf: url configuration module Returns: The url of the requested view with processed arguments and query parameters """ if kwargs: kwargs = {k: v for k, v in kwargs.items() if v is not None} url = urlresolvers.reverse( viewname, urlconf=urlconf, args=args, kwargs=kwargs, current_app=current_app) if query_params: query_params = {k: v for k, v in query_params.items() if v is not None} if query_params and len(query_params) > 0: query_dict = QueryDict('', mutable=True) for k in sorted(query_params.keys()): query_dict[k] = query_params[k] url += ('?' + query_dict.urlencode(safe='/')) return url def fmap(f, data): """Map f to data at each level. This must keep the origin data structure type: - map -> map - dict -> dict - list -> list - None -> None Args: f: function that expects one argument. data: data to traverse to apply the f function. list, map, dict or bare value. Returns: The same data-structure with modified values by the f function. """ if data is None: return data if isinstance(data, map): return map(lambda y: fmap(f, y), (x for x in data)) if isinstance(data, list): return [fmap(f, x) for x in data] if isinstance(data, dict): return {k: fmap(f, v) for (k, v) in data.items()} return f(data) def datetime_to_utc(date): """Returns datetime in UTC without timezone info Args: date (datetime.datetime): input datetime with timezone info Returns: datetime.datime: datetime in UTC without timezone info """ if date.tzinfo: return date.astimezone(tz.gettz('UTC')).replace(tzinfo=timezone.utc) else: return date def parse_timestamp(timestamp): """Given a time or timestamp (as string), parse the result as UTC datetime. Returns: a timezone-aware datetime representing the parsed value. None if the parsing fails. 
def datetime_to_utc(date):
    """Returns a datetime in UTC.

    Args:
        date (datetime.datetime): input datetime with timezone info

    Returns:
        datetime.datetime: the datetime converted to UTC, or the input
        date unchanged if it carries no timezone info

    """
    if date.tzinfo:
        return date.astimezone(tz.gettz('UTC')).replace(tzinfo=timezone.utc)
    else:
        return date


def parse_timestamp(timestamp):
    """Given a time or timestamp (as string), parse the result as UTC
    datetime.

    Returns:
        a timezone-aware datetime representing the parsed value, or None
        for an empty input. Raises BadInputExc if the parsing fails.

    Samples:
        - 2016-01-12
        - 2016-01-12T09:19:12+0100
        - Today is January 1, 2047 at 8:21:00AM
        - 1452591542

    """
    if not timestamp:
        return None

    try:
        date = date_parser.parse(timestamp, ignoretz=False, fuzzy=True)
        return datetime_to_utc(date)
    except Exception:
        try:
            return datetime.utcfromtimestamp(float(timestamp)).replace(
                tzinfo=timezone.utc)
        except (ValueError, OverflowError) as e:
            raise BadInputExc(e)


def shorten_path(path):
    """Shorten the given path: for each hash present, only return the first
    8 characters followed by an ellipsis.

    """
    sha256_re = r'([0-9a-f]{8})[0-9a-z]{56}'
    sha1_re = r'([0-9a-f]{8})[0-9a-f]{32}'

    ret = re.sub(sha256_re, r'\1...', path)
    return re.sub(sha1_re, r'\1...', ret)
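
# shorten_path in action: a 40-character sha1 embedded in a url is cut
# down to its first 8 characters plus an ellipsis.
print(shorten_path(
    '/api/1/content/sha1:aafb16d69fd30ff58afdd69036a26047f3aebdc6/'))
# /api/1/content/sha1:aafb16d6.../
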
def format_utc_iso_date(iso_date, fmt='%d %B %Y, %H:%M UTC'):
    """Turns a string representation of an ISO 8601 date into UTC and
    formats it into a more human readable one.

    For instance, from the following input string:
    '2017-05-04T13:27:13+02:00' the following one is returned:
    '04 May 2017, 11:27 UTC'. A custom format string may also be provided
    as a parameter.

    Args:
        iso_date (str): a string representation of an ISO 8601 date
        fmt (str): optional date formatting string

    Returns:
        A formatted string representation of the input iso date

    """
    if not iso_date:
        return iso_date
    date = parse_timestamp(iso_date)
    return date.strftime(fmt)


def gen_path_info(path):
    """Function to generate path data navigation for use with a breadcrumb
    in the swh web ui.

    For instance, from a path /folder1/folder2/folder3, it returns
    the following list::

        [{'name': 'folder1', 'path': 'folder1'},
         {'name': 'folder2', 'path': 'folder1/folder2'},
         {'name': 'folder3', 'path': 'folder1/folder2/folder3'}]

    Args:
        path: a filesystem path

    Returns:
        A list of path data for navigation as illustrated above.

    """
    path_info = []
    if path:
        sub_paths = path.strip('/').split('/')
        path_from_root = ''
        for p in sub_paths:
            path_from_root += '/' + p
            path_info.append({'name': p,
                              'path': path_from_root.strip('/')})
    return path_info
+
+
+def get_origin_visits(origin_info):
+    """Function that returns the list of visits for a swh origin.
+    That list is put in cache in order to speedup the navigation
+    in the swh web browse ui.
+
+    Args:
+        origin_info (dict): dict describing the swh origin to fetch
+            visits from
+
+    Returns:
+        A list of dict describing the origin visits::
+
+            [{'date': <UTC visit date in ISO format>,
+              'origin': <origin id>,
+              'status': <'full' | 'partial'>,
+              'visit': <visit id>
+             },
+             ...
+            ]
+
+    Raises:
+        NotFoundExc if the origin is not found
+    """
+    cache_entry_id = 'origin_%s_visits' % origin_info['id']
+    cache_entry = cache.get(cache_entry_id)
+
+    if cache_entry:
+        return cache_entry
+
+    origin_visits = []
+
+    per_page = service.MAX_LIMIT
+    last_visit = None
+    while 1:
+        visits = list(service.lookup_origin_visits(origin_info['id'],
+                                                   last_visit=last_visit,
+                                                   per_page=per_page))
+        origin_visits += visits
+        if len(visits) < per_page:
+            break
+        else:
+            if not last_visit:
+                last_visit = per_page
+            else:
+                last_visit += per_page
+
+    def _visit_sort_key(visit):
+        ts = parse_timestamp(visit['date']).timestamp()
+        return ts + (float(visit['visit']) / 10e3)
+
+    for v in origin_visits:
+        if 'metadata' in v:
+            del v['metadata']
+    origin_visits = [dict(t) for t in set([tuple(d.items())
+                                           for d in origin_visits])]
+    origin_visits = sorted(origin_visits, key=lambda v: _visit_sort_key(v))
+
+    cache.set(cache_entry_id, origin_visits)
+
+    return origin_visits

diff --git a/swh/web/tests/api/views/test_origin.py b/swh/web/tests/api/views/test_origin.py
index 5190466a..fa88bd0d 100644
--- a/swh/web/tests/api/views/test_origin.py
+++ b/swh/web/tests/api/views/test_origin.py
@@ -1,259 +1,269 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

from nose.tools import istest
from rest_framework.test import APITestCase
from unittest.mock import patch

from swh.storage.exc import StorageDBError, StorageAPIError

from swh.web.tests.testbase import SWHWebTestBase


class OriginApiTestCase(SWHWebTestBase, APITestCase):

    def setUp(self):
        self.origin_visit1 = {
            'date': 1104616800.0,
            'origin': 10,
            'visit': 100,
            'metadata': None,
            'status': 'full',
        }

        self.origin1 = {
            'id': 1234,
            'lister': 'uuid-lister-0',
            'project': 'uuid-project-0',
            'url': 'ftp://some/url/to/origin/0',
            'type': 'ftp'
        }

-    @patch('swh.web.api.views.origin.service')
+    @patch('swh.web.api.views.origin.get_origin_visits')
    @istest
-    def api_1_lookup_origin_visits_raise_error(self, mock_service):
+    def api_1_lookup_origin_visits_raise_error(self, mock_get_origin_visits):
        # given
-        mock_service.lookup_origin_visits.side_effect = ValueError(
+        mock_get_origin_visits.side_effect = ValueError(
            'voluntary error to check the bad request middleware.')

        # when
        rv = self.client.get('/api/1/origin/2/visits/')

        # then
        self.assertEquals(rv.status_code, 400)
        self.assertEquals(rv['Content-Type'], 'application/json')
        self.assertEquals(rv.data, {
            'exception': 'ValueError',
            'reason': 'voluntary error to check the bad request middleware.'})

-    @patch('swh.web.api.views.origin.service')
+    @patch('swh.web.common.utils.service')
    @istest
    def api_1_lookup_origin_visits_raise_swh_storage_error_db(
            self, mock_service):
        # given
        mock_service.lookup_origin_visits.side_effect = StorageDBError(
            'SWH Storage exploded! Will be back online shortly!')

        # when
        rv = self.client.get('/api/1/origin/2/visits/')

        # then
        self.assertEquals(rv.status_code, 503)
        self.assertEquals(rv['Content-Type'], 'application/json')
        self.assertEquals(rv.data, {
            'exception': 'StorageDBError',
            'reason':
            'An unexpected error occurred in the backend: '
            'SWH Storage exploded!
Will be back online shortly!'}) - @patch('swh.web.api.views.origin.service') + @patch('swh.web.common.utils.service') @istest def api_1_lookup_origin_visits_raise_swh_storage_error_api( self, mock_service): # given mock_service.lookup_origin_visits.side_effect = StorageAPIError( 'SWH Storage API dropped dead! Will resurrect from its ashes asap!' ) # when rv = self.client.get('/api/1/origin/2/visits/') # then self.assertEquals(rv.status_code, 503) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'exception': 'StorageAPIError', 'reason': 'An unexpected error occurred in the api backend: ' 'SWH Storage API dropped dead! Will resurrect from its ashes asap!' }) - @patch('swh.web.api.views.origin.service') + @patch('swh.web.api.views.origin.get_origin_visits') @istest - def api_1_lookup_origin_visits(self, mock_service): + def api_1_lookup_origin_visits(self, mock_get_origin_visits): # given stub_visits = [ + { + 'date': 1293919200.0, + 'origin': 2, + 'snapshot': '1234', + 'visit': 1 + }, { 'date': 1293919200.0, 'origin': 2, 'snapshot': '1234', 'visit': 2 }, { 'date': 1420149600.0, 'origin': 2, 'snapshot': '5678', 'visit': 3 + }, + { + 'date': 1420149600.0, + 'origin': 2, + 'snapshot': '5678', + 'visit': 4 } ] - mock_service.lookup_origin_visits.return_value = stub_visits + mock_get_origin_visits.return_value = stub_visits # when - rv = self.client.get('/api/1/origin/2/visits/?per_page=2&last_visit=1') + rv = self.client.get('/api/1/origin/2/visits/?per_page=2&last_visit=3') self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, [ { 'date': 1293919200.0, 'origin': 2, 'snapshot': '1234', 'visit': 2, 'origin_visit_url': '/api/1/origin/2/visit/2/', 'snapshot_url': '/api/1/snapshot/1234/' }, { - 'date': 1420149600.0, + 'date': 1293919200.0, 'origin': 2, - 'snapshot': '5678', - 'visit': 3, - 'origin_visit_url': '/api/1/origin/2/visit/3/', - 'snapshot_url': '/api/1/snapshot/5678/' - } - ]) + 'snapshot': '1234', + 'visit': 1, + 'origin_visit_url': '/api/1/origin/2/visit/1/', + 'snapshot_url': '/api/1/snapshot/1234/' + }, - mock_service.lookup_origin_visits.assert_called_once_with( - '2', last_visit=1, per_page=2) + ]) @patch('swh.web.api.views.origin.service') @istest def api_1_lookup_origin_visit(self, mock_service): # given origin_visit = self.origin_visit1.copy() origin_visit.update({ 'occurrences': { 'master': { 'target_type': 'revision', 'target': '98564', } }, 'snapshot': '57478754' }) mock_service.lookup_origin_visit.return_value = origin_visit expected_origin_visit = self.origin_visit1.copy() expected_origin_visit.update({ 'origin_url': '/api/1/origin/10/', 'occurrences': { 'master': { 'target_type': 'revision', 'target': '98564', 'target_url': '/api/1/revision/98564/' } }, 'snapshot': '57478754', 'snapshot_url': '/api/1/snapshot/57478754/' }) # when rv = self.client.get('/api/1/origin/10/visit/100/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, expected_origin_visit) mock_service.lookup_origin_visit.assert_called_once_with('10', '100') @patch('swh.web.api.views.origin.service') @istest def api_1_lookup_origin_visit_not_found(self, mock_service): # given mock_service.lookup_origin_visit.return_value = None # when rv = self.client.get('/api/1/origin/1/visit/1000/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'exception': 
'NotFoundExc', 'reason': 'No visit 1000 for origin 1 found' }) mock_service.lookup_origin_visit.assert_called_once_with('1', '1000') @patch('swh.web.api.views.origin.service') @istest def api_origin_by_id(self, mock_service): # given mock_service.lookup_origin.return_value = self.origin1 expected_origin = self.origin1.copy() expected_origin.update({ 'origin_visits_url': '/api/1/origin/1234/visits/' }) # when rv = self.client.get('/api/1/origin/1234/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, expected_origin) mock_service.lookup_origin.assert_called_with({'id': '1234'}) @patch('swh.web.api.views.origin.service') @istest def api_origin_by_type_url(self, mock_service): # given stub_origin = self.origin1.copy() stub_origin.update({ 'id': 987 }) mock_service.lookup_origin.return_value = stub_origin expected_origin = stub_origin.copy() expected_origin.update({ 'origin_visits_url': '/api/1/origin/987/visits/' }) # when rv = self.client.get('/api/1/origin/ftp/url' '/ftp://some/url/to/origin/0/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, expected_origin) mock_service.lookup_origin.assert_called_with( {'url': 'ftp://some/url/to/origin/0/', 'type': 'ftp'}) @patch('swh.web.api.views.origin.service') @istest def api_origin_not_found(self, mock_service): # given mock_service.lookup_origin.return_value = None # when rv = self.client.get('/api/1/origin/4321/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Origin with id 4321 not found.' }) mock_service.lookup_origin.assert_called_with({'id': '4321'}) diff --git a/swh/web/tests/browse/test_utils.py b/swh/web/tests/browse/test_utils.py index 9f515ddf..9038904c 100644 --- a/swh/web/tests/browse/test_utils.py +++ b/swh/web/tests/browse/test_utils.py @@ -1,429 +1,397 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information # flake8: noqa import unittest from unittest.mock import patch from nose.tools import istest from swh.web.browse import utils from swh.web.common.exc import NotFoundExc from swh.web.common.utils import reverse from swh.web.tests.testbase import SWHWebTestBase from .views.data.revision_test_data import revision_history_log_test class SwhBrowseUtilsTestCase(SWHWebTestBase, unittest.TestCase): @istest def get_mimetype_and_encoding_for_content(self): text = b'Hello world!' 
self.assertEqual(utils.get_mimetype_and_encoding_for_content(text),
                         ('text/plain', 'us-ascii'))

-    @patch('swh.web.browse.utils.service')
-    @istest
-    def get_origin_visits(self, mock_service):
-        mock_service.MAX_LIMIT = 2
-
-        def _lookup_origin_visits(*args, **kwargs):
-            if kwargs['last_visit'] is None:
-                return [{'visit': 1,
-                         'date': '2017-05-06T00:59:10+00:00',
-                         'metadata': {}},
-                        {'visit': 2,
-                         'date': '2017-08-06T00:59:10+00:00',
-                         'metadata': {}}
-                        ]
-            else:
-                return [{'visit': 3,
-                         'date': '2017-09-06T00:59:10+00:00',
-                         'metadata': {}}
-                        ]
-
-        mock_service.lookup_origin_visits.side_effect = _lookup_origin_visits
-
-        origin_info = {
-            'id': 1,
-            'type': 'git',
-            'url': 'https://github.com/foo/bar',
-        }
-
-        origin_visits = utils.get_origin_visits(origin_info)
-
-        self.assertEqual(len(origin_visits), 3)
-
    @patch('swh.web.browse.utils.get_origin_visits')
    @istest
    def get_origin_visit(self, mock_origin_visits):
        origin_info = {
            'id': 2,
            'type': 'git',
            'url': 'https://github.com/foo/bar',
        }
        visits = \
            [{'status': 'full',
              'date': '2015-07-09T21:09:24+00:00',
              'visit': 1,
              'origin': origin_info['id']
              },
             {'status': 'full',
              'date': '2016-02-23T18:05:23.312045+00:00',
              'visit': 2,
              'origin': origin_info['id']
              },
             {'status': 'full',
              'date': '2016-03-28T01:35:06.554111+00:00',
              'visit': 3,
              'origin': origin_info['id']
              },
             {'status': 'full',
              'date': '2016-06-18T01:22:24.808485+00:00',
              'visit': 4,
              'origin': origin_info['id']
              },
             {'status': 'full',
              'date': '2016-08-14T12:10:00.536702+00:00',
              'visit': 5,
              'origin': origin_info['id']
              }]

        mock_origin_visits.return_value = visits

        with self.assertRaises(NotFoundExc) as cm:
            visit_id = 12
            visit = utils.get_origin_visit(origin_info,
                                           visit_id=visit_id)
        self.assertIn('Visit with id %s for origin with id %s not found' %
                      (visit_id, origin_info['id']),
                      cm.exception.args[0])

        visit = utils.get_origin_visit(origin_info, visit_id=2)
        self.assertEqual(visit, visits[1])

        visit = utils.get_origin_visit(
            origin_info, visit_ts='2016-02-23T18:05:23.312045+00:00')
        self.assertEqual(visit, visits[1])

        visit = utils.get_origin_visit(
            origin_info, visit_ts='2016-02-20')
        self.assertEqual(visit, visits[1])

        visit = utils.get_origin_visit(
            origin_info, visit_ts='2016-06-18T01:22')
        self.assertEqual(visit, visits[3])

        visit = utils.get_origin_visit(
            origin_info, visit_ts='2016-06-18 01:22')
        self.assertEqual(visit, visits[3])

        visit = utils.get_origin_visit(
            origin_info, visit_ts=1466208000)
        self.assertEqual(visit, visits[3])

        visit = utils.get_origin_visit(
            origin_info, visit_ts='2014-01-01')
        self.assertEqual(visit, visits[0])

        visit = utils.get_origin_visit(
            origin_info, visit_ts='2018-01-01')
        self.assertEqual(visit, visits[-1])

    @patch('swh.web.browse.utils.service')
    @patch('swh.web.browse.utils.get_origin_visit')
    @istest
    def get_origin_visit_snapshot(self, mock_get_origin_visit,
                                  mock_service):

        mock_get_origin_visit.return_value = \
            {'status': 'full',
             'date': '2015-08-04T22:26:14.804009+00:00',
             'visit': 1,
             'origin': 1,
             'snapshot': '584b2fe3ce6218a96892e73bd76c2966bbc2a797'}

        mock_service.lookup_snapshot.return_value = \
            {'branches': {
                'refs/heads/master': {
                    'target': '9fbd21adbac36be869514e82e2e98505dc47219c',
                    'target_type': 'revision',
                    'target_url': '/api/1/revision/9fbd21adbac36be869514e82e2e98505dc47219c/'
                },
                'refs/tags/0.10.0': {
                    'target': '6072557b6c10cd9a21145781e26ad1f978ed14b9',
                    'target_type': 'release',
                    'target_url': '/api/1/release/6072557b6c10cd9a21145781e26ad1f978ed14b9/'
                },
                'refs/tags/0.10.1': {
                    'target': 'ecc003b43433e5b46511157598e4857a761007bf',
                    'target_type': 'release',
                    'target_url':
'/api/1/release/ecc003b43433e5b46511157598e4857a761007bf/' } }, 'id': '584b2fe3ce6218a96892e73bd76c2966bbc2a797'} mock_service.lookup_release_multiple.return_value = \ [{'name': '0.10.0', 'message': 'release 0.10.0', 'id': '6072557b6c10cd9a21145781e26ad1f978ed14b9', 'date': '2015-08-04T13:16:54+03:00', 'target_type': 'revision', 'target': 'e9c6243371087d04848b7686888f6dd29dfaef0e'}, {'name': '0.10.1', 'message': 'release 0.10.1', 'id': 'ecc003b43433e5b46511157598e4857a761007bf', 'date': '2017-08-04T13:16:54+03:00', 'target_type': 'revision', 'target': '6072557b6c10cd9a21145781e26ad1f978ed14b9'}] mock_service.lookup_revision_multiple.return_value = \ [{'date': '2015-08-04T13:16:54+03:00', 'directory': '828da2b80e41aa958b2c98526f4a1d2cc7d298b7', 'id': '9fbd21adbac36be869514e82e2e98505dc47219c', 'message': 'Merge pull request #678 from algernon'}, {'date': '2014-04-10T23:01:11-04:00', 'directory': '2df4cd84ecc65b50b1d5318d3727e02a39b8a4cf', 'id': '6072557b6c10cd9a21145781e26ad1f978ed14b9', 'message': '0.10: The "Oh fuck it\'s PyCon" release\n'}, {'date': '2014-10-10T09:45:23-04:00', 'directory': '28ba64f97ef709e54838ae482c2da2619a74a0bd', 'id': 'ecc003b43433e5b46511157598e4857a761007bf', 'message': '0.10.1\n'}] expected_result = ( [{'name': 'refs/heads/master', 'message': 'Merge pull request #678 from algernon', 'date': '04 August 2015, 10:16 UTC', 'revision': '9fbd21adbac36be869514e82e2e98505dc47219c', 'directory': '828da2b80e41aa958b2c98526f4a1d2cc7d298b7'}], [{'name': '0.10.0', 'id': '6072557b6c10cd9a21145781e26ad1f978ed14b9', 'message': 'release 0.10.0', 'date': '04 August 2015, 10:16 UTC', 'target_type': 'revision', 'target': 'e9c6243371087d04848b7686888f6dd29dfaef0e', 'directory': '2df4cd84ecc65b50b1d5318d3727e02a39b8a4cf'}, {'name': '0.10.1', 'id': 'ecc003b43433e5b46511157598e4857a761007bf', 'message': 'release 0.10.1', 'date': '04 August 2017, 10:16 UTC', 'target_type': 'revision', 'target': '6072557b6c10cd9a21145781e26ad1f978ed14b9', 'directory': '28ba64f97ef709e54838ae482c2da2619a74a0bd'}] ) origin_info = { 'id': 1, 'type': 'git', 'url': 'https://github.com/hylang/hy' } origin_visit_branches = \ utils.get_origin_visit_snapshot(origin_info, visit_id=1) self.assertEqual(origin_visit_branches, expected_result) @istest def gen_link(self): self.assertEqual(utils.gen_link('https://www.softwareheritage.org/', 'SWH'), 'SWH') @istest def gen_person_link(self): person_id = 8221896 person_name = 'Antoine Lambert' person_url = reverse('browse-person', kwargs={'person_id': person_id}) self.assertEqual(utils.gen_person_link(person_id, person_name), '%s' % (person_url, person_name)) @istest def gen_revision_link(self): revision_id = '28a0bc4120d38a394499382ba21d6965a67a3703' revision_url = reverse('browse-revision', kwargs={'sha1_git': revision_id}) self.assertEqual(utils.gen_revision_link(revision_id), '%s' % (revision_url, revision_id)) self.assertEqual(utils.gen_revision_link(revision_id, shorten_id=True), '%s' % (revision_url, revision_id[:7])) @istest def prepare_revision_log_for_display_no_contex(self): per_page = 10 first_page_logs_data = revision_history_log_test[:per_page+1] second_page_logs_data = revision_history_log_test[per_page:2*per_page+1] third_page_logs_data = revision_history_log_test[2*per_page:3*per_page+1] last_page_logs_data = revision_history_log_test[3*per_page:3*per_page+5] revision_log_display_data = utils.prepare_revision_log_for_display( first_page_logs_data, per_page, None) self.assertEqual(revision_log_display_data['revision_log_data'], 
utils._format_log_entries(first_page_logs_data, per_page)) self.assertEqual(revision_log_display_data['prev_rev'], first_page_logs_data[-1]['id']) self.assertEqual(revision_log_display_data['prev_revs_breadcrumb'], first_page_logs_data[0]['id']) self.assertEqual(revision_log_display_data['next_rev'], None) self.assertEqual(revision_log_display_data['next_revs_breadcrumb'], None) old_prev_revs_bc = str(revision_log_display_data['prev_revs_breadcrumb']) revision_log_display_data = utils.prepare_revision_log_for_display( second_page_logs_data, per_page, old_prev_revs_bc) self.assertEqual(revision_log_display_data['revision_log_data'], utils._format_log_entries(second_page_logs_data, per_page)) self.assertEqual(revision_log_display_data['prev_rev'], second_page_logs_data[-1]['id']) self.assertEqual(revision_log_display_data['prev_revs_breadcrumb'], old_prev_revs_bc + '/' + second_page_logs_data[0]['id']) self.assertEqual(revision_log_display_data['next_rev'], old_prev_revs_bc) self.assertEqual(revision_log_display_data['next_revs_breadcrumb'], None) old_prev_revs_bc = str(revision_log_display_data['prev_revs_breadcrumb']) revision_log_display_data = utils.prepare_revision_log_for_display( third_page_logs_data, per_page, old_prev_revs_bc) self.assertEqual(revision_log_display_data['revision_log_data'], utils._format_log_entries(third_page_logs_data, per_page)) self.assertEqual(revision_log_display_data['prev_rev'], third_page_logs_data[-1]['id']) self.assertEqual(revision_log_display_data['prev_revs_breadcrumb'], old_prev_revs_bc + '/' + third_page_logs_data[0]['id']) self.assertEqual(revision_log_display_data['next_rev'], old_prev_revs_bc.split('/')[-1]) self.assertEqual(revision_log_display_data['next_revs_breadcrumb'], '/'.join(old_prev_revs_bc.split('/')[:-1])) old_prev_revs_bc = str(revision_log_display_data['prev_revs_breadcrumb']) revision_log_display_data = utils.prepare_revision_log_for_display( last_page_logs_data, per_page, old_prev_revs_bc) self.assertEqual(revision_log_display_data['revision_log_data'], utils._format_log_entries(last_page_logs_data, per_page)) self.assertEqual(revision_log_display_data['prev_rev'], None) self.assertEqual(revision_log_display_data['prev_revs_breadcrumb'], None) self.assertEqual(revision_log_display_data['next_rev'], old_prev_revs_bc.split('/')[-1]) self.assertEqual(revision_log_display_data['next_revs_breadcrumb'], '/'.join(old_prev_revs_bc.split('/')[:-1])) @istest def prepare_revision_log_for_display_snapshot_context(self): per_page = 10 first_page_logs_data = revision_history_log_test[:per_page+1] second_page_logs_data = revision_history_log_test[per_page:2*per_page+1] third_page_logs_data = revision_history_log_test[2*per_page:3*per_page+1] last_page_logs_data = revision_history_log_test[3*per_page:3*per_page+5] snapshot_context = { 'origin_info': {'type': 'git', 'url': 'https://github.com/git/git'}, 'url_args': {}, 'query_params': {} } revision_log_display_data = utils.prepare_revision_log_for_display( first_page_logs_data, per_page, None, snapshot_context=snapshot_context) self.assertEqual(revision_log_display_data['revision_log_data'], utils._format_log_entries(first_page_logs_data, per_page, snapshot_context=snapshot_context)) self.assertEqual(revision_log_display_data['prev_rev'], first_page_logs_data[-1]['id']) self.assertEqual(revision_log_display_data['prev_revs_breadcrumb'], first_page_logs_data[-1]['id']) self.assertEqual(revision_log_display_data['next_rev'], None) self.assertEqual(revision_log_display_data['next_revs_breadcrumb'], 
None) old_prev_revs_bc = str(revision_log_display_data['prev_revs_breadcrumb']) revision_log_display_data = utils.prepare_revision_log_for_display( second_page_logs_data, per_page, old_prev_revs_bc, snapshot_context=snapshot_context) self.assertEqual(revision_log_display_data['revision_log_data'], utils._format_log_entries(second_page_logs_data, per_page, snapshot_context=snapshot_context)) self.assertEqual(revision_log_display_data['prev_rev'], second_page_logs_data[-1]['id']) self.assertEqual(revision_log_display_data['prev_revs_breadcrumb'], old_prev_revs_bc + '/' + second_page_logs_data[-1]['id']) self.assertEqual(revision_log_display_data['next_rev'], old_prev_revs_bc) self.assertEqual(revision_log_display_data['next_revs_breadcrumb'], None) old_prev_revs_bc = str(revision_log_display_data['prev_revs_breadcrumb']) revision_log_display_data = utils.prepare_revision_log_for_display( third_page_logs_data, per_page, old_prev_revs_bc, snapshot_context=snapshot_context) self.assertEqual(revision_log_display_data['revision_log_data'], utils._format_log_entries(third_page_logs_data, per_page, snapshot_context=snapshot_context)) self.assertEqual(revision_log_display_data['prev_rev'], third_page_logs_data[-1]['id']) self.assertEqual(revision_log_display_data['prev_revs_breadcrumb'], old_prev_revs_bc + '/' + third_page_logs_data[-1]['id']) self.assertEqual(revision_log_display_data['next_rev'], old_prev_revs_bc.split('/')[-1]) self.assertEqual(revision_log_display_data['next_revs_breadcrumb'], '/'.join(old_prev_revs_bc.split('/')[:-1])) old_prev_revs_bc = str(revision_log_display_data['prev_revs_breadcrumb']) revision_log_display_data = utils.prepare_revision_log_for_display( last_page_logs_data, per_page, old_prev_revs_bc, snapshot_context=snapshot_context) self.assertEqual(revision_log_display_data['revision_log_data'], utils._format_log_entries(last_page_logs_data, per_page, snapshot_context=snapshot_context)) self.assertEqual(revision_log_display_data['prev_rev'], None) self.assertEqual(revision_log_display_data['prev_revs_breadcrumb'], None) self.assertEqual(revision_log_display_data['next_rev'], old_prev_revs_bc.split('/')[-1]) self.assertEqual(revision_log_display_data['next_revs_breadcrumb'], '/'.join(old_prev_revs_bc.split('/')[:-1])) diff --git a/swh/web/tests/browse/views/test_release.py b/swh/web/tests/browse/views/test_release.py index 1d3a4bcc..9299e37c 100644 --- a/swh/web/tests/browse/views/test_release.py +++ b/swh/web/tests/browse/views/test_release.py @@ -1,106 +1,109 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information # flake8: noqa from unittest.mock import patch from nose.tools import istest from django.test import TestCase from swh.web.common.exc import NotFoundExc from swh.web.common.utils import reverse, format_utc_iso_date from swh.web.tests.testbase import SWHWebTestBase from .data.release_test_data import ( stub_release ) from .data.origin_test_data import stub_origin_visits class SwhBrowseReleaseTest(SWHWebTestBase, TestCase): @patch('swh.web.browse.views.release.service') @patch('swh.web.browse.utils.service') + @patch('swh.web.common.utils.service') @istest - def release_browse(self, mock_service_utils, mock_service): + def release_browse(self, mock_service_common, mock_service_utils, + mock_service): mock_service.lookup_release.return_value = stub_release url = 
reverse('browse-release',
                      kwargs={'sha1_git': stub_release['id']})

        release_id = stub_release['id']
        release_name = stub_release['name']
        author_id = stub_release['author']['id']
        author_name = stub_release['author']['name']
        author_url = reverse('browse-person',
                             kwargs={'person_id': author_id})

        release_date = stub_release['date']
        message = stub_release['message']
        target_type = stub_release['target_type']
        target = stub_release['target']
        target_url = reverse('browse-revision', kwargs={'sha1_git': target})
        message_lines = stub_release['message'].split('\n')

        resp = self.client.get(url)
        self.assertEquals(resp.status_code, 200)
        self.assertTemplateUsed('release.html')
        self.assertContains(resp, '<a href="%s">%s</a>' %
                            (author_url, author_name))
        self.assertContains(resp, format_utc_iso_date(release_date))
        self.assertContains(resp, '%s%s' %
                            (message_lines[0],
                             '\n'.join(message_lines[1:])))
        self.assertContains(resp, release_id)
        self.assertContains(resp, release_name)
        self.assertContains(resp, target_type)
        self.assertContains(resp, '<a href="%s">%s</a>' % (target_url, target))

        origin_info = {
            'id': 13706355,
            'type': 'git',
            'url': 'https://github.com/python/cpython'
        }

        mock_service_utils.lookup_origin.return_value = origin_info
-        mock_service_utils.lookup_origin_visits.return_value = stub_origin_visits
-        mock_service_utils.MAX_LIMIT = 20
+        mock_service_common.lookup_origin_visits.return_value = stub_origin_visits
+        mock_service_common.MAX_LIMIT = 20

        url = reverse('browse-release',
                      kwargs={'sha1_git': stub_release['id']},
                      query_params={'origin_type': origin_info['type'],
                                    'origin_url': origin_info['url']})

        resp = self.client.get(url)
        self.assertEquals(resp.status_code, 200)
        self.assertTemplateUsed('release.html')
        self.assertContains(resp, author_url)
        self.assertContains(resp, author_name)
        self.assertContains(resp, format_utc_iso_date(release_date))
        self.assertContains(resp, '%s%s' %
                            (message_lines[0],
                             '\n'.join(message_lines[1:])))
        self.assertContains(resp, release_id)
        self.assertContains(resp, release_name)
        self.assertContains(resp, target_type)

        target_url = reverse('browse-revision', kwargs={'sha1_git': target},
                             query_params={'origin_type': origin_info['type'],
                                           'origin_url': origin_info['url']})
        self.assertContains(resp, '<a href="%s">%s</a>' % (target_url, target))

        mock_service.lookup_release.side_effect = \
            NotFoundExc('Release not found')
        url = reverse('browse-release',
                      kwargs={'sha1_git': 'ffff'})
        resp = self.client.get(url)
        self.assertEquals(resp.status_code, 404)
        self.assertTemplateUsed('error.html')
        self.assertContains(resp, 'Release not found', status_code=404)

diff --git a/swh/web/tests/browse/views/test_revision.py b/swh/web/tests/browse/views/test_revision.py
index d7e2e85a..ab83a062 100644
--- a/swh/web/tests/browse/views/test_revision.py
+++ b/swh/web/tests/browse/views/test_revision.py
@@ -1,262 +1,264 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

# flake8: noqa

from unittest.mock import patch

from nose.tools import istest

from django.test import TestCase
from django.utils.html import escape

from swh.web.common.exc import NotFoundExc
from swh.web.common.utils import reverse, format_utc_iso_date
from swh.web.tests.testbase import SWHWebTestBase

from .data.revision_test_data import (
    revision_id_test, revision_metadata_test, revision_history_log_test
)

from .data.origin_test_data import stub_origin_visits


class SwhBrowseRevisionTest(SWHWebTestBase, TestCase):

    @patch('swh.web.browse.views.revision.service')
    @patch('swh.web.browse.utils.service')
+    @patch('swh.web.common.utils.service')
    @istest
-    def revision_browse(self, mock_service_utils, mock_service):
+    def revision_browse(self, mock_service_common, mock_service_utils,
+                        mock_service):
        mock_service.lookup_revision.return_value = revision_metadata_test

        url = reverse('browse-revision',
                      kwargs={'sha1_git': revision_id_test})

        author_id = revision_metadata_test['author']['id']
        author_name = revision_metadata_test['author']['name']
        committer_id = revision_metadata_test['committer']['id']
        committer_name = revision_metadata_test['committer']['name']
        dir_id = revision_metadata_test['directory']

        author_url = reverse('browse-person',
                             kwargs={'person_id': author_id})
        committer_url = reverse('browse-person',
                                kwargs={'person_id': committer_id})

        directory_url = reverse('browse-directory',
                                kwargs={'sha1_git': dir_id})

        history_url = reverse('browse-revision-log',
                              kwargs={'sha1_git': revision_id_test})

        resp = self.client.get(url)

        self.assertEquals(resp.status_code, 200)
        self.assertTemplateUsed('revision.html')
        self.assertContains(resp, '<a href="%s">%s</a>' %
                            (author_url, author_name))
        self.assertContains(resp, '<a href="%s">%s</a>' %
                            (committer_url, committer_name))
        self.assertContains(resp, directory_url)
        self.assertContains(resp, history_url)

        for parent in revision_metadata_test['parents']:
            parent_url = reverse('browse-revision',
                                 kwargs={'sha1_git': parent})
            self.assertContains(resp, '<a href="%s">%s</a>' %
                                (parent_url, parent))

        author_date = revision_metadata_test['date']
        committer_date = revision_metadata_test['committer_date']

        message_lines = revision_metadata_test['message'].split('\n')

        self.assertContains(resp, format_utc_iso_date(author_date))
        self.assertContains(resp, format_utc_iso_date(committer_date))
        self.assertContains(resp, message_lines[0])
self.assertContains(resp, '\n'.join(message_lines[1:])) origin_info = { 'id': '7416001', 'type': 'git', 'url': 'https://github.com/webpack/webpack' } mock_service_utils.lookup_origin.return_value = origin_info - mock_service_utils.lookup_origin_visits.return_value = stub_origin_visits - mock_service_utils.MAX_LIMIT = 20 + mock_service_common.lookup_origin_visits.return_value = stub_origin_visits + mock_service_common.MAX_LIMIT = 20 origin_directory_url = reverse('browse-origin-directory', kwargs={'origin_type': origin_info['type'], 'origin_url': origin_info['url']}, query_params={'revision': revision_id_test}) origin_revision_log_url = reverse('browse-origin-log', kwargs={'origin_type': origin_info['type'], 'origin_url': origin_info['url']}, query_params={'revision': revision_id_test}) url = reverse('browse-revision', kwargs={'sha1_git': revision_id_test}, query_params={'origin_type': origin_info['type'], 'origin_url': origin_info['url']}) resp = self.client.get(url) self.assertContains(resp, origin_directory_url) self.assertContains(resp, origin_revision_log_url) for parent in revision_metadata_test['parents']: parent_url = reverse('browse-revision', kwargs={'sha1_git': parent}, query_params={'origin_type': origin_info['type'], 'origin_url': origin_info['url']}) self.assertContains(resp, '%s' % (parent_url, parent)) self.assertContains(resp, 'vault-cook-directory') self.assertContains(resp, 'vault-cook-revision') @patch('swh.web.browse.views.revision.service') @istest def revision_log_browse(self, mock_service): per_page = 10 mock_service.lookup_revision_log.return_value = \ revision_history_log_test[:per_page+1] url = reverse('browse-revision-log', kwargs={'sha1_git': revision_id_test}, query_params={'per_page': per_page}) resp = self.client.get(url) prev_rev = revision_history_log_test[per_page]['id'] next_page_url = reverse('browse-revision-log', kwargs={'sha1_git': prev_rev}, query_params={'revs_breadcrumb': revision_id_test, 'per_page': per_page}) self.assertEquals(resp.status_code, 200) self.assertTemplateUsed('revision-log.html') self.assertContains(resp, '', count=per_page) self.assertContains(resp, '
Newer')
        self.assertContains(resp, '<a href="%s">Older</a>' %
                            escape(next_page_url))

        for log in revision_history_log_test[:per_page]:
            author_url = reverse('browse-person',
                                 kwargs={'person_id': log['author']['id']})
            revision_url = reverse('browse-revision',
                                   kwargs={'sha1_git': log['id']})
            directory_url = reverse('browse-directory',
                                    kwargs={'sha1_git': log['directory']})
            self.assertContains(resp, '<a href="%s">%s</a>' %
                                (author_url, log['author']['name']))
            self.assertContains(resp, '<a href="%s">%s</a>' %
                                (revision_url, log['id'][:7]))
            self.assertContains(resp, directory_url)

        mock_service.lookup_revision_log.return_value = \
            revision_history_log_test[per_page:2*per_page+1]

        resp = self.client.get(next_page_url)

        prev_prev_rev = revision_history_log_test[2*per_page]['id']
        prev_page_url = reverse('browse-revision-log',
                                kwargs={'sha1_git': revision_id_test},
                                query_params={'per_page': per_page})
        next_page_url = reverse('browse-revision-log',
                                kwargs={'sha1_git': prev_prev_rev},
                                query_params={'revs_breadcrumb':
                                              revision_id_test + '/' + prev_rev,
                                              'per_page': per_page})

        self.assertEquals(resp.status_code, 200)
        self.assertTemplateUsed('revision-log.html')
        self.assertContains(resp, '', count=per_page)
        self.assertContains(resp, '<a href="%s">Newer</a>' %
                            escape(prev_page_url))
        self.assertContains(resp, '<a href="%s">Older</a>' %
                            escape(next_page_url))

        mock_service.lookup_revision_log.return_value = \
            revision_history_log_test[2*per_page:3*per_page+1]

        resp = self.client.get(next_page_url)

        prev_prev_prev_rev = revision_history_log_test[3*per_page]['id']
        prev_page_url = reverse('browse-revision-log',
                                kwargs={'sha1_git': prev_rev},
                                query_params={'revs_breadcrumb':
                                              revision_id_test,
                                              'per_page': per_page})
        next_page_url = reverse('browse-revision-log',
                                kwargs={'sha1_git': prev_prev_prev_rev},
                                query_params={'revs_breadcrumb':
                                              revision_id_test + '/' + prev_rev + '/' + prev_prev_rev,
                                              'per_page': per_page})

        self.assertEquals(resp.status_code, 200)
        self.assertTemplateUsed('revision-log.html')
        self.assertContains(resp, '', count=per_page)
        self.assertContains(resp, '<a href="%s">Newer</a>' %
                            escape(prev_page_url))
        self.assertContains(resp, '<a href="%s">Older</a>' %
                            escape(next_page_url))

        mock_service.lookup_revision_log.return_value = \
            revision_history_log_test[3*per_page:3*per_page+per_page//2]

        resp = self.client.get(next_page_url)

        prev_page_url = reverse('browse-revision-log',
                                kwargs={'sha1_git': prev_prev_rev},
                                query_params={'revs_breadcrumb':
                                              revision_id_test + '/' + prev_rev,
                                              'per_page': per_page})

        self.assertEquals(resp.status_code, 200)
        self.assertTemplateUsed('revision-log.html')
        self.assertContains(resp, '', count=per_page//2)
        self.assertContains(resp, 'Older')
        self.assertContains(resp, '<a href="%s">Newer</a>' %
                            escape(prev_page_url))

    @patch('swh.web.browse.utils.service')
    @patch('swh.web.browse.views.revision.service')
    @istest
    def revision_request_errors(self, mock_service, mock_utils_service):

        mock_service.lookup_revision.side_effect = \
            NotFoundExc('Revision not found')

        url = reverse('browse-revision',
                      kwargs={'sha1_git': revision_id_test})
        resp = self.client.get(url)
        self.assertEquals(resp.status_code, 404)
        self.assertTemplateUsed('error.html')
        self.assertContains(resp, 'Revision not found', status_code=404)

        mock_service.lookup_revision_log.side_effect = \
            NotFoundExc('Revision not found')

        url = reverse('browse-revision-log',
                      kwargs={'sha1_git': revision_id_test})
        resp = self.client.get(url)
        self.assertEquals(resp.status_code, 404)
        self.assertTemplateUsed('error.html')
        self.assertContains(resp, 'Revision not found', status_code=404)

        url = reverse('browse-revision',
                      kwargs={'sha1_git': revision_id_test},
                      query_params={'origin_type': 'git',
                                    'origin_url': 'https://github.com/foo/bar'})

        mock_service.lookup_revision.side_effect = None
        mock_utils_service.lookup_origin.side_effect = \
            NotFoundExc('Origin not found')
        resp = self.client.get(url)
        self.assertEquals(resp.status_code, 404)
        self.assertTemplateUsed('error.html')
        self.assertContains(resp, 'Origin not found', status_code=404)

diff --git a/swh/web/tests/common/test_utils.py b/swh/web/tests/common/test_utils.py
index 8393b934..751a975d 100644
--- a/swh/web/tests/common/test_utils.py
+++ b/swh/web/tests/common/test_utils.py
@@ -1,107 +1,140 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
import unittest

from nose.tools import istest
+from unittest.mock import patch

from swh.web.common import utils


class UtilsTestCase(unittest.TestCase):
    @istest
    def shorten_path_noop(self):
        noops = [
            '/api/',
            '/browse/',
            '/content/symbol/foobar/'
        ]

        for noop in noops:
            self.assertEqual(
                utils.shorten_path(noop),
                noop
            )

    @istest
    def shorten_path_sha1(self):
        sha1 = 'aafb16d69fd30ff58afdd69036a26047f3aebdc6'
        short_sha1 = sha1[:8] + '...'

        templates = [
            '/api/1/content/sha1:%s/',
            '/api/1/content/sha1_git:%s/',
            '/api/1/directory/%s/',
            '/api/1/content/sha1:%s/ctags/',
        ]

        for template in templates:
            self.assertEqual(
                utils.shorten_path(template % sha1),
                template % short_sha1
            )

    @istest
    def shorten_path_sha256(self):
        sha256 = ('aafb16d69fd30ff58afdd69036a26047'
                  '213add102934013a014dfca031c41aef')
        short_sha256 = sha256[:8] + '...'
templates = [ '/api/1/content/sha256:%s/', '/api/1/directory/%s/', '/api/1/content/sha256:%s/filetype/', ] for template in templates: self.assertEqual( utils.shorten_path(template % sha256), template % short_sha256 ) @istest def parse_timestamp(self): input_timestamps = [ None, '2016-01-12', '2016-01-12T09:19:12+0100', 'Today is January 1, 2047 at 8:21:00AM', '1452591542', ] output_dates = [ None, datetime.datetime(2016, 1, 12, 0, 0), datetime.datetime(2016, 1, 12, 8, 19, 12, tzinfo=datetime.timezone.utc), datetime.datetime(2047, 1, 1, 8, 21), datetime.datetime(2016, 1, 12, 9, 39, 2, tzinfo=datetime.timezone.utc), ] for ts, exp_date in zip(input_timestamps, output_dates): self.assertEquals(utils.parse_timestamp(ts), exp_date) @istest def format_utc_iso_date(self): self.assertEqual(utils.format_utc_iso_date('2017-05-04T13:27:13+02:00'), # noqa '04 May 2017, 11:27 UTC') @istest def gen_path_info(self): input_path = '/home/user/swh-environment/swh-web/' expected_result = [ {'name': 'home', 'path': 'home'}, {'name': 'user', 'path': 'home/user'}, {'name': 'swh-environment', 'path': 'home/user/swh-environment'}, {'name': 'swh-web', 'path': 'home/user/swh-environment/swh-web'} ] path_info = utils.gen_path_info(input_path) self.assertEquals(path_info, expected_result) input_path = 'home/user/swh-environment/swh-web' path_info = utils.gen_path_info(input_path) self.assertEquals(path_info, expected_result) + + @patch('swh.web.common.utils.service') + @istest + def get_origin_visits(self, mock_service): + mock_service.MAX_LIMIT = 2 + + def _lookup_origin_visits(*args, **kwargs): + if kwargs['last_visit'] is None: + return [{'visit': 1, + 'date': '2017-05-06T00:59:10+00:00', + 'metadata': {}}, + {'visit': 2, + 'date': '2017-08-06T00:59:10+00:00', + 'metadata': {}} + ] + else: + return [{'visit': 3, + 'date': '2017-09-06T00:59:10+00:00', + 'metadata': {}} + ] + + mock_service.lookup_origin_visits.side_effect = _lookup_origin_visits + + origin_info = { + 'id': 1, + 'type': 'git', + 'url': 'https://github.com/foo/bar', + } + + origin_visits = utils.get_origin_visits(origin_info) + + self.assertEqual(len(origin_visits), 3)
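
# A standalone sketch of the pagination contract exercised by the test
# above: `get_origin_visits` keeps asking `lookup_origin_visits` for
# MAX_LIMIT-sized pages until a short page signals the end. The fake
# lookup below is illustrative only.
MAX_LIMIT = 2

def fake_lookup_origin_visits(origin_id, last_visit=None, per_page=MAX_LIMIT):
    all_visits = [{'visit': i, 'date': '2017-0%d-06T00:59:10+00:00' % i}
                  for i in range(1, 4)]
    start = last_visit if last_visit else 0
    return all_visits[start:start + per_page]

visits = []
last_visit = None
while True:
    page = fake_lookup_origin_visits(1, last_visit=last_visit)
    visits += page
    if len(page) < MAX_LIMIT:
        break
    last_visit = MAX_LIMIT if not last_visit else last_visit + MAX_LIMIT

assert len(visits) == 3  # two pages of 2 and a final short page of 1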