diff --git a/PKG-INFO b/PKG-INFO index fe855da9..0c11bc99 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,112 +1,112 @@ Metadata-Version: 2.1 Name: swh.web -Version: 0.0.208 +Version: 0.0.209 Summary: Software Heritage Web UI Home-page: https://forge.softwareheritage.org/diffusion/DWUI/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-web Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Description: # swh-web This repository holds the development of Software Heritage web applications: * swh-web API (https://archive.softwareheritage.org/api): enables to query the content of the archive through HTTP requests and get responses in JSON or YAML. * swh-web browse (https://archive.softwareheritage.org/browse): graphical interface that eases the navigation in the archive. Documentation about how to use these components but also the details of their URI schemes can be found in the docs folder. The produced HTML documentation can be read and browsed at https://docs.softwareheritage.org/devel/swh-web/index.html. ## Technical details Those applications are powered by: * [Django Web Framework](https://www.djangoproject.com/) on the backend side with the following extensions enabled: * [django-rest-framework](http://www.django-rest-framework.org/) * [django-webpack-loader](https://github.com/owais/django-webpack-loader) * [django-js-reverse](http://django-js-reverse.readthedocs.io/en/latest/) * [webpack](https://webpack.js.org/) on the frontend side for better static assets management, including: * assets dependencies management and retrieval through [yarn](https://yarnpkg.com/en/) * linting of custom javascript code (through [eslint](https://eslint.org/)) and stylesheets (through [stylelint](https://stylelint.io/)) * use of [es6](http://es6-features.org) syntax and advanced javascript feature like [async/await](https://javascript.info/async-await) or [fetch](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) thanks to [babel](https://babeljs.io/) (es6 to es5 transpiler and polyfills provider) * assets minification (using [terser](https://github.com/terser-js/terser) and [cssnano](http://cssnano.co/)) but also dead code elimination for production use ## How to build and run ### Requirements First you will need [Python 3](https://www.python.org) and a complete [swh development environment](https://forge.softwareheritage.org/source/swh-environment/) installed. To run the backend, you need to have the following Python 3 modules installed: * beautifulsoup4 * django >= 1.11.0 * djangorestframework >= 3.4.0 * django_webpack_loader * django_js_reverse * docutils * file_magic >= 0.3.0 * htmlmin * lxml * pygments * pypandoc * python-dateutil * pyyaml * requests To compile the frontend assets, you need to have [nodejs](https://nodejs.org/en/) >= 8.x and [yarn](https://yarnpkg.com/en/) installed. If you are on Debian stretch, you can easily install an up to date nodejs from the [stretch-backports](https://backports.debian.org/Instructions/) repository. Packages for yarn can be installed by following [these instructions](https://yarnpkg.com/en/docs/install#debian-stable). Alternatively, you can install yarn with `npm install yarn`, and add `YARN=node_modules/yarn/bin/yarn` as argument whenever you run `make`. Please note that the static assets bundles generated by webpack are not stored in the git repository. Follow the instructions below in order to generate them in order to be able to run the frontend part of the web applications. ### Make targets Below is the list of available make targets that can be executed from the root directory of swh-web in order to build and/or execute the web applications under various configurations: * **run-django-webpack-devserver**: Compile and serve not optimized (without mignification and dead code elimination) frontend static assets using [webpack-dev-server](https://github.com/webpack/webpack-dev-server) and run django server with development settings. This is the recommended target to use when developing swh-web as it enables automatic reloading of backend and frontend part of the applications when modifying source files (*.py, *.js, *.css, *.html). * **run-django-webpack-dev**: Compile not optimized (no minification, no dead code elimination) frontend static assets using webpack and run django server with development settings. This is the recommended target when one only wants to develop the backend side of the application. * **run-django-webpack-prod**: Compile optimized (with minification and dead code elimination) frontend static assets using webpack and run django server with production settings. This is useful to test the applications in production mode (with the difference that static assets are served by django). Production settings notably enable advanced django caching and you will need to have [memcached](https://memcached.org/) installed for that feature to work. * **run-django-server-dev**: Run the django server with development settings but without compiling frontend static assets through webpack. * **run-django-server-prod**: Run the django server with production settings but without compiling frontend static assets through webpack. * **run-gunicorn-server**: Run the web applications with production settings in a [gunicorn](http://gunicorn.org/) worker as they will be in real production environment. Once one of these targets executed, the web applications can be executed by pointing your browser to http://localhost:5004. ### Yarn targets Below is a list of available yarn targets in order to only execute the frontend static assets compilation (no web server will be executed): * **build-dev**: compile not optimized (without mignification and dead code elimination) frontend static assets and store the results in the `swh/web/static` folder. * **build**: compile optimized (with mignification and dead code elimination) frontend static assets and store the results in the `swh/web/static` folder. **The build target must be executed prior performing the Debian packaging of swh-web** in order for the package to contain the optimized assets dedicated to production environment. To execute these targets, issue the following command: ``` $ yarn ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Classifier: Framework :: Django Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.web.egg-info/PKG-INFO b/swh.web.egg-info/PKG-INFO index fe855da9..0c11bc99 100644 --- a/swh.web.egg-info/PKG-INFO +++ b/swh.web.egg-info/PKG-INFO @@ -1,112 +1,112 @@ Metadata-Version: 2.1 Name: swh.web -Version: 0.0.208 +Version: 0.0.209 Summary: Software Heritage Web UI Home-page: https://forge.softwareheritage.org/diffusion/DWUI/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-web Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Description: # swh-web This repository holds the development of Software Heritage web applications: * swh-web API (https://archive.softwareheritage.org/api): enables to query the content of the archive through HTTP requests and get responses in JSON or YAML. * swh-web browse (https://archive.softwareheritage.org/browse): graphical interface that eases the navigation in the archive. Documentation about how to use these components but also the details of their URI schemes can be found in the docs folder. The produced HTML documentation can be read and browsed at https://docs.softwareheritage.org/devel/swh-web/index.html. ## Technical details Those applications are powered by: * [Django Web Framework](https://www.djangoproject.com/) on the backend side with the following extensions enabled: * [django-rest-framework](http://www.django-rest-framework.org/) * [django-webpack-loader](https://github.com/owais/django-webpack-loader) * [django-js-reverse](http://django-js-reverse.readthedocs.io/en/latest/) * [webpack](https://webpack.js.org/) on the frontend side for better static assets management, including: * assets dependencies management and retrieval through [yarn](https://yarnpkg.com/en/) * linting of custom javascript code (through [eslint](https://eslint.org/)) and stylesheets (through [stylelint](https://stylelint.io/)) * use of [es6](http://es6-features.org) syntax and advanced javascript feature like [async/await](https://javascript.info/async-await) or [fetch](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) thanks to [babel](https://babeljs.io/) (es6 to es5 transpiler and polyfills provider) * assets minification (using [terser](https://github.com/terser-js/terser) and [cssnano](http://cssnano.co/)) but also dead code elimination for production use ## How to build and run ### Requirements First you will need [Python 3](https://www.python.org) and a complete [swh development environment](https://forge.softwareheritage.org/source/swh-environment/) installed. To run the backend, you need to have the following Python 3 modules installed: * beautifulsoup4 * django >= 1.11.0 * djangorestframework >= 3.4.0 * django_webpack_loader * django_js_reverse * docutils * file_magic >= 0.3.0 * htmlmin * lxml * pygments * pypandoc * python-dateutil * pyyaml * requests To compile the frontend assets, you need to have [nodejs](https://nodejs.org/en/) >= 8.x and [yarn](https://yarnpkg.com/en/) installed. If you are on Debian stretch, you can easily install an up to date nodejs from the [stretch-backports](https://backports.debian.org/Instructions/) repository. Packages for yarn can be installed by following [these instructions](https://yarnpkg.com/en/docs/install#debian-stable). Alternatively, you can install yarn with `npm install yarn`, and add `YARN=node_modules/yarn/bin/yarn` as argument whenever you run `make`. Please note that the static assets bundles generated by webpack are not stored in the git repository. Follow the instructions below in order to generate them in order to be able to run the frontend part of the web applications. ### Make targets Below is the list of available make targets that can be executed from the root directory of swh-web in order to build and/or execute the web applications under various configurations: * **run-django-webpack-devserver**: Compile and serve not optimized (without mignification and dead code elimination) frontend static assets using [webpack-dev-server](https://github.com/webpack/webpack-dev-server) and run django server with development settings. This is the recommended target to use when developing swh-web as it enables automatic reloading of backend and frontend part of the applications when modifying source files (*.py, *.js, *.css, *.html). * **run-django-webpack-dev**: Compile not optimized (no minification, no dead code elimination) frontend static assets using webpack and run django server with development settings. This is the recommended target when one only wants to develop the backend side of the application. * **run-django-webpack-prod**: Compile optimized (with minification and dead code elimination) frontend static assets using webpack and run django server with production settings. This is useful to test the applications in production mode (with the difference that static assets are served by django). Production settings notably enable advanced django caching and you will need to have [memcached](https://memcached.org/) installed for that feature to work. * **run-django-server-dev**: Run the django server with development settings but without compiling frontend static assets through webpack. * **run-django-server-prod**: Run the django server with production settings but without compiling frontend static assets through webpack. * **run-gunicorn-server**: Run the web applications with production settings in a [gunicorn](http://gunicorn.org/) worker as they will be in real production environment. Once one of these targets executed, the web applications can be executed by pointing your browser to http://localhost:5004. ### Yarn targets Below is a list of available yarn targets in order to only execute the frontend static assets compilation (no web server will be executed): * **build-dev**: compile not optimized (without mignification and dead code elimination) frontend static assets and store the results in the `swh/web/static` folder. * **build**: compile optimized (with mignification and dead code elimination) frontend static assets and store the results in the `swh/web/static` folder. **The build target must be executed prior performing the Debian packaging of swh-web** in order for the package to contain the optimized assets dedicated to production environment. To execute these targets, issue the following command: ``` $ yarn ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Classifier: Framework :: Django Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh/web/api/views/origin.py b/swh/web/api/views/origin.py index 2e908a34..bbf7f096 100644 --- a/swh/web/api/views/origin.py +++ b/swh/web/api/views/origin.py @@ -1,638 +1,639 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from distutils.util import strtobool from functools import partial from swh.web.common import service from swh.web.common.exc import BadInputExc from swh.web.common.origin_visits import get_origin_visits from swh.web.common.utils import reverse from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api.apiurls import api_route from swh.web.api.views.utils import api_lookup DOC_RETURN_ORIGIN = ''' :>json string origin_visits_url: link to in order to get information about the visits for that origin :>json string url: the origin canonical url :>json string type: the type of software origin (deprecated value; types are now associated to visits instead of origins) :>json number id: the origin unique identifier (deprecated value; you should only refer to origins based on their URL) ''' DOC_RETURN_ORIGIN_ARRAY = \ DOC_RETURN_ORIGIN.replace(':>json', ':>jsonarr') DOC_RETURN_ORIGIN_VISIT = ''' :>json string date: ISO representation of the visit date (in UTC) :>json str origin: the origin canonical url :>json string origin_url: link to get information about the origin :>jsonarr string snapshot: the snapshot identifier of the visit :>jsonarr string snapshot_url: link to :http:get:`/api/1/snapshot/(snapshot_id)/` in order to get information about the snapshot of the visit :>json string status: status of the visit (either **full**, **partial** or **ongoing**) :>json number visit: the unique identifier of the visit ''' DOC_RETURN_ORIGIN_VISIT_ARRAY = \ DOC_RETURN_ORIGIN_VISIT.replace(':>json', ':>jsonarr') DOC_RETURN_ORIGIN_VISIT_ARRAY += ''' :>jsonarr number id: the unique identifier of the origin :>jsonarr string origin_visit_url: link to :http:get:`/api/1/origin/(origin_url)/visit/(visit_id)/` in order to get information about the visit ''' def _enrich_origin(origin): if 'url' in origin: o = origin.copy() o['origin_visits_url'] = reverse( 'api-1-origin-visits', url_args={'origin_url': origin['url']}) return o return origin def _enrich_origin_visit(origin_visit, *, with_origin_link, with_origin_visit_link): ov = origin_visit.copy() if with_origin_link: ov['origin_url'] = reverse('api-1-origin', url_args={'origin_url': ov['origin']}) if with_origin_visit_link: ov['origin_visit_url'] = reverse('api-1-origin-visit', url_args={'origin_url': ov['origin'], 'visit_id': ov['visit']}) snapshot = ov['snapshot'] if snapshot: ov['snapshot_url'] = reverse('api-1-snapshot', url_args={'snapshot_id': snapshot}) else: ov['snapshot_url'] = None return ov @api_route(r'/origins/', 'api-1-origins') @api_doc('/origins/', noargs=True) @format_docstring(return_origin_array=DOC_RETURN_ORIGIN_ARRAY) def api_origins(request): """ .. http:get:: /api/1/origins/ Get list of archived software origins. Origins are sorted by ids before returning them. :query int origin_from: The first origin id that will be included in returned results (default to 1) :query int origin_count: The maximum number of origins to return (default to 100, can not exceed 10000) {return_origin_array} {common_headers} {resheader_link} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, - :http:method:`options` + :http:method:`options` :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`origins?origin_from=50000&origin_count=500` """ origin_from = int(request.query_params.get('origin_from', '1')) origin_count = int(request.query_params.get('origin_count', '100')) origin_count = min(origin_count, 10000) results = api_lookup( service.lookup_origins, origin_from, origin_count+1, enrich_fn=_enrich_origin) response = {'results': results, 'headers': {}} if len(results) > origin_count: origin_from = results.pop()['id'] response['headers']['link-next'] = reverse( 'api-1-origins', query_params={'origin_from': origin_from, 'origin_count': origin_count}) return response @api_route(r'/origin/(?P[a-z]+)/url/(?P.+)/', 'api-1-origin') @api_route(r'/origin/(?P.+)/get/', 'api-1-origin') @api_route(r'/origin/(?P[0-9]+)/', 'api-1-origin') @api_doc('/origin/') @format_docstring(return_origin=DOC_RETURN_ORIGIN) def api_origin(request, origin_id=None, origin_type=None, origin_url=None): """ .. http:get:: /api/1/origin/(origin_url)/get/ Get information about a software origin. :param string origin_url: the origin url {return_origin} {common_headers} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, - :http:method:`options` + :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/git/url/https://github.com/python/cpython/` .. http:get:: /api/1/origin/(origin_id)/ Get information about a software origin. .. warning:: All endpoints using an ``origin_id`` or an ``origin_type`` are deprecated and will be removed in the near future. Only those using an ``origin_url`` will remain available. You should use :http:get:`/api/1/origin/(origin_url)/get/` instead. :param int origin_id: a software origin identifier {return_origin} {common_headers} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, - :http:method:`options` + :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/1/` .. http:get:: /api/1/origin/(origin_type)/url/(origin_url)/ Get information about a software origin. .. warning:: All endpoints using an ``origin_id`` or an ``origin_type`` are deprecated and will be removed in the near future. Only those using an ``origin_url`` will remain available. You should use :http:get:`/api/1/origin/(origin_url)/get/` instead. :param string origin_type: the origin type (possible values are ``git``, ``svn``, ``hg``, ``deb``, ``pypi``, ``npm``, ``ftp`` or ``deposit``) :param string origin_url: the origin url {return_origin} {common_headers} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, - :http:method:`options` + :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/git/url/https://github.com/python/cpython/` """ ori_dict = { 'id': int(origin_id) if origin_id else None, 'type': origin_type, 'url': origin_url } ori_dict = {k: v for k, v in ori_dict.items() if ori_dict[k]} error_msg = 'Origin %s not found.' % \ (ori_dict.get('id') or ori_dict['url']) return api_lookup( service.lookup_origin, ori_dict, notfound_msg=error_msg, enrich_fn=_enrich_origin) @api_route(r'/origin/search/(?P.+)/', 'api-1-origin-search') @api_doc('/origin/search/') @format_docstring(return_origin_array=DOC_RETURN_ORIGIN_ARRAY) def api_origin_search(request, url_pattern): """ .. http:get:: /api/1/origin/search/(url_pattern)/ Search for software origins whose urls contain a provided string pattern or match a provided regular expression. The search is performed in a case insensitive way. :param string url_pattern: a string pattern or a regular expression :query int offset: the number of found origins to skip before returning results :query int limit: the maximum number of found origins to return :query boolean regexp: if true, consider provided pattern as a regular expression and search origins whose urls match it :query boolean with_visit: if true, only return origins with at least one visit by Software heritage {return_origin_array} {common_headers} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, - :http:method:`options` + :http:method:`options` :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`origin/search/python/?limit=2` """ result = {} offset = int(request.query_params.get('offset', '0')) limit = int(request.query_params.get('limit', '70')) regexp = request.query_params.get('regexp', 'false') with_visit = request.query_params.get('with_visit', 'false') results = api_lookup(service.search_origin, url_pattern, offset, limit, bool(strtobool(regexp)), bool(strtobool(with_visit)), enrich_fn=_enrich_origin) nb_results = len(results) if nb_results == limit: query_params = {} query_params['offset'] = offset + limit query_params['limit'] = limit query_params['regexp'] = regexp result['headers'] = { 'link-next': reverse('api-1-origin-search', url_args={'url_pattern': url_pattern}, query_params=query_params) } result.update({ 'results': results }) return result @api_route(r'/origin/metadata-search/', 'api-1-origin-metadata-search') @api_doc('/origin/metadata-search/', noargs=True, need_params=True) @format_docstring(return_origin_array=DOC_RETURN_ORIGIN_ARRAY) def api_origin_metadata_search(request): """ .. http:get:: /api/1/origin/metadata-search/ Search for software origins whose metadata (expressed as a JSON-LD/CodeMeta dictionary) match the provided criteria. For now, only full-text search on this dictionary is supported. :query str fulltext: a string that will be matched against origin metadata; results are ranked and ordered starting with the best ones. :query int limit: the maximum number of found origins to return (bounded to 100) {return_origin_array} {common_headers} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, - :http:method:`options` + :http:method:`options` :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`origin/metadata-search/?limit=2&fulltext=Jane%20Doe` """ fulltext = request.query_params.get('fulltext', None) limit = min(int(request.query_params.get('limit', '70')), 100) if not fulltext: content = '"fulltext" must be provided and non-empty.' raise BadInputExc(content) results = api_lookup(service.search_origin_metadata, fulltext, limit) return { 'results': results, } @api_route(r'/origin/(?P.*)/visits/', 'api-1-origin-visits') @api_route(r'/origin/(?P[0-9]+)/visits/', 'api-1-origin-visits') @api_doc('/origin/visits/') @format_docstring( return_origin_visit_array=DOC_RETURN_ORIGIN_VISIT_ARRAY) def api_origin_visits(request, origin_id=None, origin_url=None): """ .. http:get:: /api/1/origin/(origin_url)/visits/ Get information about all visits of a software origin. Visits are returned sorted in descending order according to their date. :param str origin_url: a software origin URL :query int per_page: specify the number of visits to list, for pagination purposes :query int last_visit: visit to start listing from, for pagination purposes {common_headers} {resheader_link} {return_origin_visit_array} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, - :http:method:`options` + :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/hylang/hy/visits/` .. http:get:: /api/1/origin/(origin_id)/visits/ Get information about all visits of a software origin. Visits are returned sorted in descending order according to their date. .. warning:: All endpoints using an ``origin_id`` are deprecated and will be removed in the near future. Only those using an ``origin_url`` will remain available. Use :http:get:`/api/1/origin/(origin_url)/visits/` instead. :param int origin_id: a software origin identifier :query int per_page: specify the number of visits to list, for pagination purposes :query int last_visit: visit to start listing from, for pagination purposes {common_headers} {resheader_link} {return_origin_visit_array} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, - :http:method:`options` + :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/1/visits/` """ result = {} if origin_url: origin_query = {'url': origin_url} notfound_msg = 'No origin {} found'.format(origin_url) url_args_next = {'origin_url': origin_url} else: origin_query = {'id': int(origin_id)} notfound_msg = 'No origin {} found'.format(origin_id) url_args_next = {'origin_id': origin_id} per_page = int(request.query_params.get('per_page', '10')) last_visit = request.query_params.get('last_visit') if last_visit: last_visit = int(last_visit) def _lookup_origin_visits( origin_query, last_visit=last_visit, per_page=per_page): all_visits = get_origin_visits(origin_query) all_visits.reverse() visits = [] if not last_visit: visits = all_visits[:per_page] else: for i, v in enumerate(all_visits): if v['visit'] == last_visit: visits = all_visits[i+1:i+1+per_page] break for v in visits: yield v results = api_lookup(_lookup_origin_visits, origin_query, notfound_msg=notfound_msg, enrich_fn=partial(_enrich_origin_visit, with_origin_link=False, with_origin_visit_link=True)) if results: nb_results = len(results) if nb_results == per_page: new_last_visit = results[-1]['visit'] query_params = {} query_params['last_visit'] = new_last_visit if request.query_params.get('per_page'): query_params['per_page'] = per_page result['headers'] = { 'link-next': reverse('api-1-origin-visits', url_args=url_args_next, query_params=query_params) } result.update({ 'results': results }) return result @api_route(r'/origin/(?P.*)/visit/latest/', 'api-1-origin-visit-latest', throttle_scope='swh_api_origin_visit_latest') @api_doc('/origin/visit/') @format_docstring(return_origin_visit=DOC_RETURN_ORIGIN_VISIT) def api_origin_visit_latest(request, origin_url=None): """ .. http:get:: /api/1/origin/(origin_url)/visit/latest/ Get information about a specific visit of a software origin. :param str origin_url: a software origin URL :query boolean require_snapshot: if true, only return a visit with a snapshot {common_headers} {return_origin_visit} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, - :http:method:`options` + :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin or visit can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/hylang/hy/visit/latest/` """ require_snapshot = request.query_params.get('require_snapshot', 'false') return api_lookup( service.lookup_origin_visit_latest, origin_url, bool(strtobool(require_snapshot)), notfound_msg=('No visit for origin {} found' .format(origin_url)), enrich_fn=partial(_enrich_origin_visit, with_origin_link=True, with_origin_visit_link=False)) @api_route(r'/origin/(?P.*)/visit/(?P[0-9]+)/', 'api-1-origin-visit') @api_route(r'/origin/(?P[0-9]+)/visit/(?P[0-9]+)/', 'api-1-origin-visit') @api_doc('/origin/visit/') @format_docstring(return_origin_visit=DOC_RETURN_ORIGIN_VISIT) def api_origin_visit(request, visit_id, origin_url=None, origin_id=None): """ .. http:get:: /api/1/origin/(origin_url)/visit/(visit_id)/ Get information about a specific visit of a software origin. :param str origin_url: a software origin URL :param int visit_id: a visit identifier {common_headers} {return_origin_visit} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, - :http:method:`options` + :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin or visit can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/hylang/hy/visit/1/` .. http:get:: /api/1/origin/(origin_id)/visit/(visit_id)/ Get information about a specific visit of a software origin. .. warning:: All endpoints using an ``origin_id`` are deprecated and will be removed in the near future. Only those using an ``origin_url`` will remain available. Use :http:get:`/api/1/origin/(origin_url)/visit/(visit_id)` instead. :param int origin_id: a software origin identifier :param int visit_id: a visit identifier {common_headers} {return_origin_visit} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, - :http:method:`options` + :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin or visit can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/1500/visit/1/` """ if not origin_url: origin_url = service.lookup_origin({'id': int(origin_id)})['url'] return api_lookup( service.lookup_origin_visit, origin_url, int(visit_id), notfound_msg=('No visit {} for origin {} found' .format(visit_id, origin_url)), enrich_fn=partial(_enrich_origin_visit, with_origin_link=True, with_origin_visit_link=False)) @api_route(r'/origin/(?P[a-z]+)/url/(?P.+)' '/intrinsic-metadata', 'api-origin-intrinsic-metadata') @api_doc('/origin/intrinsic-metadata/') @format_docstring() def api_origin_intrinsic_metadata(request, origin_type, origin_url): """ .. http:get:: /api/1/origin/(origin_type)/url/(origin_url)/intrinsic-metadata Get intrinsic metadata of a software origin (as a JSON-LD/CodeMeta dictionary). :param string origin_type: the origin type (possible values are ``git``, ``svn``, ``hg``, ``deb``, ``pypi``, ``npm``, ``ftp`` or ``deposit``) :param string origin_url: the origin url :>json string ???: intrinsic metadata field of the origin {common_headers} - **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` + **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, + :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/git/url/https://github.com/python/cpython/intrinsic-metadata` """ # noqa ori_dict = { 'type': origin_type, 'url': origin_url } error_msg = 'Origin with URL %s not found' % ori_dict['url'] return api_lookup( service.lookup_origin_intrinsic_metadata, ori_dict, notfound_msg=error_msg, enrich_fn=_enrich_origin) diff --git a/swh/web/api/views/origin_save.py b/swh/web/api/views/origin_save.py index 34e5ea05..6ac43fe1 100644 --- a/swh/web/api/views/origin_save.py +++ b/swh/web/api/views/origin_save.py @@ -1,87 +1,87 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django.views.decorators.cache import never_cache from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api.apiurls import api_route from swh.web.common.origin_save import ( create_save_origin_request, get_save_origin_requests ) @api_route(r'/origin/save/(?P.+)/url/(?P.+)/', 'api-1-save-origin', methods=['GET', 'POST'], throttle_scope='swh_save_origin') @never_cache @api_doc('/origin/save/') @format_docstring() def api_save_origin(request, origin_type, origin_url): """ .. http:get:: /api/1/origin/save/(origin_type)/url/(origin_url)/ .. http:post:: /api/1/origin/save/(origin_type)/url/(origin_url)/ Request the saving of a software origin into the archive or check the status of previously created save requests. That endpoint enables to create a saving task for a software origin through a POST request. Depending of the provided origin url, the save request can either be: * immediately **accepted**, for well known code hosting providers like for instance GitHub or GitLab * **rejected**, in case the url is blacklisted by Software Heritage * **put in pending state** until a manual check is done in order to determine if it can be loaded or not Once a saving request has been accepted, its associated saving task status can then be checked through a GET request on the same url. Returned status can either be: * **not created**: no saving task has been created * **not yet scheduled**: saving task has been created but its - execution has not yet been scheduled + execution has not yet been scheduled * **scheduled**: the task execution has been scheduled * **succeed**: the saving task has been successfully executed * **failed**: the saving task has been executed but it failed When issuing a POST request an object will be returned while a GET request will return an array of objects (as multiple save requests might have been submitted for the same origin). :param string origin_type: the type of origin to save (currently the supported types are ``git``, ``hg`` and ``svn``) :param string origin_url: the url of the origin to save {common_headers} :>json string origin_url: the url of the origin to save :>json string origin_type: the type of the origin to save :>json string save_request_date: the date (in iso format) the save request was issued :>json string save_request_status: the status of the save request, either **accepted**, **rejected** or **pending** :>json string save_task_status: the status of the origin saving task, either **not created**, **not yet scheduled**, **scheduled**, **succeed** or **failed** **Allowed HTTP Methods:** :http:method:`get`, :http:method:`post`, - :http:method:`head`, :http:method:`options` + :http:method:`head`, :http:method:`options` :statuscode 200: no error :statuscode 400: an invalid origin type or url has been provided :statuscode 403: the provided origin url is blacklisted :statuscode 404: no save requests have been found for a given origin """ if request.method == 'POST': sor = create_save_origin_request(origin_type, origin_url) del sor['id'] else: sor = get_save_origin_requests(origin_type, origin_url) for s in sor: del s['id'] # noqa return sor diff --git a/swh/web/api/views/release.py b/swh/web/api/views/release.py index b43ecdc3..ca765fb7 100644 --- a/swh/web/api/views/release.py +++ b/swh/web/api/views/release.py @@ -1,59 +1,59 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.web.common import service from swh.web.api import utils from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api.apiurls import api_route from swh.web.api.views.utils import api_lookup @api_route(r'/release/(?P[0-9a-f]+)/', 'api-1-release', checksum_args=['sha1_git']) @api_doc('/release/') @format_docstring() def api_release(request, sha1_git): """ .. http:get:: /api/1/release/(sha1_git)/ Get information about a release in the archive. Releases are identified by **sha1** checksums, compatible with Git tag identifiers. See :func:`swh.model.identifiers.release_identifier` in our data model module for details about how they are computed. :param string sha1_git: hexadecimal representation of the release **sha1_git** identifier {common_headers} :>json object author: information about the author of the release :>json string date: ISO representation of the release date (in UTC) :>json string id: the release unique identifier :>json string message: the message associated to the release :>json string name: the name of the release :>json string target: the target identifier of the release :>json string target_type: the type of the target, can be either **release**, **revision**, **content**, **directory** :>json string target_url: a link to the adequate api url based on the target type **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, - :http:method:`options` + :http:method:`options` :statuscode 200: no error :statuscode 400: an invalid **sha1_git** value has been provided :statuscode 404: requested release can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`release/208f61cc7a5dbc9879ae6e5c2f95891e270f09ef/` """ error_msg = 'Release with sha1_git %s not found.' % sha1_git return api_lookup( service.lookup_release, sha1_git, notfound_msg=error_msg, enrich_fn=utils.enrich_release) diff --git a/swh/web/api/views/revision.py b/swh/web/api/views/revision.py index e7f676b9..bb9aab9f 100644 --- a/swh/web/api/views/revision.py +++ b/swh/web/api/views/revision.py @@ -1,473 +1,477 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django.http import HttpResponse from swh.web.common import service from swh.web.common.utils import reverse from swh.web.common.utils import parse_timestamp from swh.web.api import utils from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api.apiurls import api_route from swh.web.api.views.utils import api_lookup DOC_RETURN_REVISION = ''' :>json object author: information about the author of the revision :>json object committer: information about the committer of the revision :>json string committer_date: ISO representation of the commit date (in UTC) :>json string date: ISO representation of the revision date (in UTC) :>json string directory: the unique identifier that revision points to :>json string directory_url: link to :http:get:`/api/1/directory/(sha1_git)/[(path)/]` to get information about the directory associated to the revision :>json string id: the revision unique identifier :>json boolean merge: whether or not the revision corresponds to a merge commit :>json string message: the message associated to the revision :>json array parents: the parents of the revision, i.e. the previous revisions that head directly to it, each entry of that array contains an unique parent revision identifier but also a link to :http:get:`/api/1/revision/(sha1_git)/` to get more information about it :>json string type: the type of the revision ''' # noqa DOC_RETURN_REVISION_ARRAY = \ DOC_RETURN_REVISION.replace(':>json', ':>jsonarr') def _revision_directory_by(revision, path, request_path, limit=100, with_data=False): """ Compute the revision matching criterion's directory or content data. Args: revision: dictionary of criterions representing a revision to lookup path: directory's path to lookup request_path: request path which holds the original context to limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of with_data: indicate to retrieve the content's raw data if path resolves to a content. """ def enrich_directory_local(dir, context_url=request_path): return utils.enrich_directory(dir, context_url) rev_id, result = service.lookup_directory_through_revision( revision, path, limit=limit, with_data=with_data) content = result['content'] if result['type'] == 'dir': # dir_entries result['content'] = list(map(enrich_directory_local, content)) elif result['type'] == 'file': # content result['content'] = utils.enrich_content(content) elif result['type'] == 'rev': # revision result['content'] = utils.enrich_revision(content) return result @api_route(r'/revision/origin/(?P[0-9]+)' r'/branch/(?P.+)/log/', 'api-1-revision-origin-log') @api_route(r'/revision/origin/(?P[0-9]+)/log/', 'api-1-revision-origin-log') @api_route(r'/revision/origin/(?P[0-9]+)' r'/ts/(?P.+)/log/', 'api-1-revision-origin-log') @api_route(r'/revision/origin/(?P[0-9]+)' r'/branch/(?P.+)' r'/ts/(?P.+)/log/', 'api-1-revision-origin-log') @api_doc('/revision/origin/log/') @format_docstring(return_revision_array=DOC_RETURN_REVISION_ARRAY) def api_revision_log_by(request, origin_id, branch_name='HEAD', ts=None): """ .. http:get:: /api/1/revision/origin/(origin_id)[/branch/(branch_name)][/ts/(timestamp)]/log Show the commit log for a revision, searching for it based on software origin, branch name, and/or visit timestamp. This endpoint behaves like :http:get:`/api/1/revision/(sha1_git)[/prev/(prev_sha1s)]/log/`, but operates on the revision that has been found at a given software origin, close to a given point in time, pointed by a given branch. .. warning:: All endpoints using an ``origin_id`` are deprecated and will be removed in the near future. Only those using an ``origin_url`` will remain available. You should instead use successively :http:get:`/api/1/origin/(origin_url)/visits/`, :http:get:`/api/1/snapshot/(snapshot_id)/`, and :http:get:`/api/1/revision/(sha1_git)[/prev/(prev_sha1s)]/log/`. :param int origin_id: a software origin identifier :param string branch_name: optional parameter specifying a fully-qualified branch name associated to the software origin, e.g., "refs/heads/master". Defaults to the HEAD branch. :param string timestamp: optional parameter specifying a timestamp close to which the revision pointed by the given branch should be looked up. The timestamp can be expressed either as an ISO date or as a Unix one (in UTC). Defaults to now. {common_headers} {return_revision_array} - **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` + **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, + :http:method:`options` :statuscode 200: no error :statuscode 404: no revision matching the given criteria could be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`revision/origin/723566/ts/2016-01-17T00:00:00+00:00/log/` """ # noqa result = {} per_page = int(request.query_params.get('per_page', '10')) def lookup_revision_log_by_with_limit(o_id, br, ts, limit=per_page+1): return service.lookup_revision_log_by(o_id, br, ts, limit) error_msg = 'No revision matching origin %s ' % origin_id error_msg += ', branch name %s' % branch_name error_msg += (' and time stamp %s.' % ts) if ts else '.' rev_get = api_lookup( lookup_revision_log_by_with_limit, int(origin_id), branch_name, ts, notfound_msg=error_msg, enrich_fn=utils.enrich_revision) nb_rev = len(rev_get) if nb_rev == per_page+1: revisions = rev_get[:-1] last_sha1_git = rev_get[-1]['id'] params = {k: v for k, v in {'origin_id': origin_id, 'branch_name': branch_name, 'ts': ts, }.items() if v is not None} query_params = {} query_params['sha1_git'] = last_sha1_git if request.query_params.get('per_page'): query_params['per_page'] = per_page result['headers'] = { 'link-next': reverse('api-1-revision-origin-log', url_args=params, query_params=query_params) } else: revisions = rev_get result.update({'results': revisions}) return result @api_route(r'/revision/origin/(?P[0-9]+)/directory/', 'api-1-revision-origin-directory') @api_route(r'/revision/origin/(?P[0-9]+)/directory/(?P.+)/', 'api-1-revision-origin-directory') @api_route(r'/revision/origin/(?P[0-9]+)' r'/branch/(?P.+)/directory/', 'api-1-revision-origin-directory') @api_route(r'/revision/origin/(?P[0-9]+)' r'/branch/(?P.+)/ts/(?P.+)/directory/', 'api-1-revision-origin-directory') @api_route(r'/revision/origin/(?P[0-9]+)' r'/branch/(?P.+)/directory/(?P.+)/', 'api-1-revision-origin-directory') @api_route(r'/revision/origin/(?P[0-9]+)' r'/branch/(?P.+)/ts/(?P.+)' r'/directory/(?P.+)/', 'api-1-revision-origin-directory') @api_doc('/revision/origin/directory/', tags=['hidden']) def api_directory_through_revision_origin(request, origin_id, branch_name='HEAD', ts=None, path=None, with_data=False): """ Display directory or content information through a revision identified by origin/branch/timestamp. .. warning:: All endpoints using an ``origin_id`` are deprecated and will be removed in the near future. Only those using an ``origin_url`` will remain available. You should instead use successively :http:get:`/api/1/origin/(origin_url)/visits/`, :http:get:`/api/1/snapshot/(snapshot_id)/`, :http:get:`/api/1/revision/(sha1_git)/`, :http:get:`/api/1/directory/(sha1_git)/[(path)/]` """ if ts: ts = parse_timestamp(ts) return _revision_directory_by({'origin_id': int(origin_id), 'branch_name': branch_name, 'ts': ts }, path, request.path, with_data=with_data) @api_route(r'/revision/origin/(?P[0-9]+)/', 'api-1-revision-origin') @api_route(r'/revision/origin/(?P[0-9]+)' r'/branch/(?P.+)/', 'api-1-revision-origin') @api_route(r'/revision/origin/(?P[0-9]+)' r'/branch/(?P.+)/ts/(?P.+)/', 'api-1-revision-origin') @api_route(r'/revision/origin/(?P[0-9]+)/ts/(?P.+)/', 'api-1-revision-origin') @api_doc('/revision/origin/') @format_docstring(return_revision=DOC_RETURN_REVISION) def api_revision_with_origin(request, origin_id, branch_name='HEAD', ts=None): """ .. http:get:: /api/1/revision/origin/(origin_id)/[branch/(branch_name)/][ts/(timestamp)/] Get information about a revision, searching for it based on software origin, branch name, and/or visit timestamp. This endpoint behaves like :http:get:`/api/1/revision/(sha1_git)/`, but operates on the revision that has been found at a given software origin, close to a given point in time, pointed by a given branch. .. warning:: All endpoints using an ``origin_id`` are deprecated and will be removed in the near future. Only those using an ``origin_url`` will remain available. You should instead use successively :http:get:`/api/1/origin/(origin_url)/visits/`, :http:get:`/api/1/snapshot/(snapshot_id)/`, and :http:get:`/api/1/revision/(sha1_git)/`. :param int origin_id: a software origin identifier :param string branch_name: optional parameter specifying a fully-qualified branch name associated to the software origin, e.g., "refs/heads/master". Defaults to the HEAD branch. :param string timestamp: optional parameter specifying a timestamp close to which the revision pointed by the given branch should be looked up. The timestamp can be expressed either as an ISO date or as a Unix one (in UTC). Defaults to now. {common_headers} {return_revision} - **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` + **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, + :http:method:`options` :statuscode 200: no error :statuscode 404: no revision matching the given criteria could be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`revision/origin/13706355/branch/refs/heads/2.7/` """ # noqa return api_lookup( service.lookup_revision_by, int(origin_id), branch_name, ts, notfound_msg=('Revision with (origin_id: {}, branch_name: {}' ', ts: {}) not found.'.format(origin_id, branch_name, ts)), enrich_fn=utils.enrich_revision) @api_route(r'/revision/(?P[0-9a-f]+)/', 'api-1-revision', checksum_args=['sha1_git']) @api_doc('/revision/') @format_docstring(return_revision=DOC_RETURN_REVISION) def api_revision(request, sha1_git): """ .. http:get:: /api/1/revision/(sha1_git)/ Get information about a revision in the archive. Revisions are identified by **sha1** checksums, compatible with Git commit identifiers. See :func:`swh.model.identifiers.revision_identifier` in our data model module for details about how they are computed. :param string sha1_git: hexadecimal representation of the revision **sha1_git** identifier {common_headers} {return_revision} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, - :http:method:`options` + :http:method:`options` :statuscode 200: no error :statuscode 400: an invalid **sha1_git** value has been provided :statuscode 404: requested revision can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`revision/aafb16d69fd30ff58afdd69036a26047f3aebdc6/` """ # noqa return api_lookup( service.lookup_revision, sha1_git, notfound_msg='Revision with sha1_git {} not found.'.format(sha1_git), enrich_fn=utils.enrich_revision) @api_route(r'/revision/(?P[0-9a-f]+)/raw/', 'api-1-revision-raw-message', checksum_args=['sha1_git']) @api_doc('/revision/raw/', tags=['hidden'], handle_response=True) def api_revision_raw_message(request, sha1_git): """Return the raw data of the message of revision identified by sha1_git """ raw = service.lookup_revision_message(sha1_git) response = HttpResponse(raw['message'], content_type='application/octet-stream') response['Content-disposition'] = \ 'attachment;filename=rev_%s_raw' % sha1_git return response @api_route(r'/revision/(?P[0-9a-f]+)/directory/', 'api-1-revision-directory', checksum_args=['sha1_git']) @api_route(r'/revision/(?P[0-9a-f]+)/directory/(?P.+)/', 'api-1-revision-directory', checksum_args=['sha1_git']) @api_doc('/revision/directory/') @format_docstring() def api_revision_directory(request, sha1_git, dir_path=None, with_data=False): """ .. http:get:: /api/1/revision/(sha1_git)/directory/[(path)/] Get information about directory (entry) objects associated to revisions. Each revision is associated to a single "root" directory. This endpoint behaves like :http:get:`/api/1/directory/(sha1_git)/[(path)/]`, but operates on the root directory associated to a given revision. :param string sha1_git: hexadecimal representation of the revision **sha1_git** identifier :param string path: optional parameter to get information about the directory entry pointed by that relative path {common_headers} :>json array content: directory entries as returned by :http:get:`/api/1/directory/(sha1_git)/[(path)/]` :>json string path: path of directory from the revision root one :>json string revision: the unique revision identifier :>json string type: the type of the directory - **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` + **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, + :http:method:`options` :statuscode 200: no error :statuscode 400: an invalid **sha1_git** value has been provided :statuscode 404: requested revision can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`revision/f1b94134a4b879bc55c3dacdb496690c8ebdc03f/directory/` """ # noqa return _revision_directory_by({'sha1_git': sha1_git}, dir_path, request.path, with_data=with_data) @api_route(r'/revision/(?P[0-9a-f]+)/log/', 'api-1-revision-log', checksum_args=['sha1_git']) @api_route(r'/revision/(?P[0-9a-f]+)' r'/prev/(?P[0-9a-f]*/*)/log/', 'api-1-revision-log', checksum_args=['sha1_git', 'prev_sha1s']) @api_doc('/revision/log/') @format_docstring(return_revision_array=DOC_RETURN_REVISION_ARRAY) def api_revision_log(request, sha1_git, prev_sha1s=None): """ .. http:get:: /api/1/revision/(sha1_git)[/prev/(prev_sha1s)]/log/ Get a list of all revisions heading to a given one, in other words show the commit log. :param string sha1_git: hexadecimal representation of the revision **sha1_git** identifier :param string prev_sha1s: optional parameter representing the navigation breadcrumbs (descendant revisions previously visited). If multiple values, use / as delimiter. If provided, revisions information will be added at the beginning of the returned list. :query int per_page: number of elements in the returned list, for pagination purpose {common_headers} {resheader_link} {return_revision_array} - **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` + **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, + :http:method:`options` :statuscode 200: no error :statuscode 400: an invalid **sha1_git** value has been provided :statuscode 404: requested revision can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`revision/e1a315fa3fa734e2a6154ed7b5b9ae0eb8987aad/log/` """ # noqa result = {} per_page = int(request.query_params.get('per_page', '10')) def lookup_revision_log_with_limit(s, limit=per_page+1): return service.lookup_revision_log(s, limit) error_msg = 'Revision with sha1_git %s not found.' % sha1_git rev_get = api_lookup(lookup_revision_log_with_limit, sha1_git, notfound_msg=error_msg, enrich_fn=utils.enrich_revision) nb_rev = len(rev_get) if nb_rev == per_page+1: rev_backward = rev_get[:-1] new_last_sha1 = rev_get[-1]['id'] query_params = {} if request.query_params.get('per_page'): query_params['per_page'] = per_page result['headers'] = { 'link-next': reverse('api-1-revision-log', url_args={'sha1_git': new_last_sha1}, query_params=query_params) } else: rev_backward = rev_get if not prev_sha1s: # no nav breadcrumbs, so we're done revisions = rev_backward else: rev_forward_ids = prev_sha1s.split('/') rev_forward = api_lookup( service.lookup_revision_multiple, rev_forward_ids, notfound_msg=error_msg, enrich_fn=utils.enrich_revision) revisions = rev_forward + rev_backward result.update({ 'results': revisions }) return result diff --git a/swh/web/api/views/snapshot.py b/swh/web/api/views/snapshot.py index fcf85e21..5a88d710 100644 --- a/swh/web/api/views/snapshot.py +++ b/swh/web/api/views/snapshot.py @@ -1,119 +1,119 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.web.common import service from swh.web.common.utils import reverse from swh.web.config import get_config from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api import utils from swh.web.api.apiurls import api_route from swh.web.api.views.utils import api_lookup @api_route(r'/snapshot/(?P[0-9a-f]+)/', 'api-1-snapshot', checksum_args=['snapshot_id']) @api_doc('/snapshot/') @format_docstring() def api_snapshot(request, snapshot_id): """ .. http:get:: /api/1/snapshot/(snapshot_id)/ Get information about a snapshot in the archive. A snapshot is a set of named branches, which are pointers to objects at any level of the Software Heritage DAG. It represents a full picture of an origin at a given time. As well as pointing to other objects in the Software Heritage DAG, branches can also be aliases, in which case their target is the name of another branch in the same snapshot, or dangling, in which case the target is unknown. A snapshot identifier is a salted sha1. See :func:`swh.model.identifiers.snapshot_identifier` in our data model module for details about how they are computed. :param sha1 snapshot_id: a snapshot identifier :query str branches_from: optional parameter used to skip branches whose name is lesser than it before returning them :query int branches_count: optional parameter used to restrain the amount of returned branches (default to 1000) :query str target_types: optional comma separated list parameter used to filter the target types of branch to return (possible values that can be contained in that list are ``content``, ``directory``, ``revision``, ``release``, ``snapshot`` or ``alias``) {common_headers} {resheader_link} :>json object branches: object containing all branches associated to the snapshot,for each of them the associated target type and id are given but also a link to get information about that target :>json string id: the unique identifier of the snapshot **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, - :http:method:`options` + :http:method:`options` :statuscode 200: no error :statuscode 400: an invalid snapshot identifier has been provided :statuscode 404: requested snapshot can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`snapshot/6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a/` """ def _enrich_snapshot(snapshot): s = snapshot.copy() if 'branches' in s: s['branches'] = { k: utils.enrich_object(v) if v else None for k, v in s['branches'].items() } for k, v in s['branches'].items(): if v and v['target_type'] == 'alias': if v['target'] in s['branches']: branch_alias = s['branches'][v['target']] if branch_alias: v['target_url'] = branch_alias['target_url'] else: snp = \ service.lookup_snapshot(s['id'], branches_from=v['target'], branches_count=1) if snp and v['target'] in snp['branches']: branch = snp['branches'][v['target']] branch = utils.enrich_object(branch) v['target_url'] = branch['target_url'] return s snapshot_content_max_size = get_config()['snapshot_content_max_size'] branches_from = request.GET.get('branches_from', '') branches_count = int(request.GET.get('branches_count', snapshot_content_max_size)) target_types = request.GET.get('target_types', None) target_types = target_types.split(',') if target_types else None results = api_lookup( service.lookup_snapshot, snapshot_id, branches_from, branches_count, target_types, notfound_msg='Snapshot with id {} not found.'.format(snapshot_id), enrich_fn=_enrich_snapshot) response = {'results': results, 'headers': {}} if results['next_branch'] is not None: response['headers']['link-next'] = \ reverse('api-1-snapshot', url_args={'snapshot_id': snapshot_id}, query_params={'branches_from': results['next_branch'], 'branches_count': branches_count, 'target_types': target_types}) return response diff --git a/swh/web/api/views/stat.py b/swh/web/api/views/stat.py index 608c2e25..edddcc96 100644 --- a/swh/web/api/views/stat.py +++ b/swh/web/api/views/stat.py @@ -1,53 +1,53 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.web.common import service from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api.apiurls import api_route @api_route(r'/stat/counters/', 'api-1-stat-counters') @api_doc('/stat/counters/', noargs=True) @format_docstring() def api_stats(request): """ .. http:get:: /api/1/stat/counters/ Get statistics about the content of the archive. :>json number content: current number of content objects (aka files) in the archive :>json number directory: current number of directory objects in the archive :>json number origin: current number of software origins (an origin is a "place" where code source can be found, e.g. a git repository, a tarball, ...) in the archive :>json number origin_visit: current number of visits on software origins to fill the archive :>json number person: current number of persons (code source authors or committers) in the archive :>json number release: current number of releases objects in the archive :>json number revision: current number of revision objects (aka commits) in the archive :>json number skipped_content: current number of content objects (aka files) which where not inserted in the archive :>json number snapshot: current number of snapshot objects (aka set of named branches) in the archive {common_headers} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, - :http:method:`options` + :http:method:`options` :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`stat/counters/` """ return service.stat_counters() diff --git a/swh/web/api/views/vault.py b/swh/web/api/views/vault.py index 75c12fde..0b62daba 100644 --- a/swh/web/api/views/vault.py +++ b/swh/web/api/views/vault.py @@ -1,240 +1,240 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django.http import HttpResponse from django.shortcuts import redirect from django.views.decorators.cache import never_cache from swh.model import hashutil from swh.web.common import service, query from swh.web.common.utils import reverse from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api.apiurls import api_route from swh.web.api.views.utils import api_lookup # XXX: a bit spaghetti. Would be better with class-based views. def _dispatch_cook_progress(request, obj_type, obj_id): hex_id = hashutil.hash_to_hex(obj_id) object_name = obj_type.split('_')[0].title() if request.method == 'GET': return api_lookup( service.vault_progress, obj_type, obj_id, notfound_msg=("{} '{}' was never requested." .format(object_name, hex_id))) elif request.method == 'POST': email = request.POST.get('email', request.GET.get('email', None)) return api_lookup( service.vault_cook, obj_type, obj_id, email, notfound_msg=("{} '{}' not found." .format(object_name, hex_id))) @api_route(r'/vault/directory/(?P[0-9a-f]+)/', 'api-1-vault-cook-directory', methods=['GET', 'POST'], checksum_args=['dir_id'], throttle_scope='swh_vault_cooking') @never_cache @api_doc('/vault/directory/') @format_docstring() def api_vault_cook_directory(request, dir_id): """ .. http:get:: /api/1/vault/directory/(dir_id)/ .. http:post:: /api/1/vault/directory/(dir_id)/ Request the cooking of an archive for a directory or check its cooking status. That endpoint enables to create a vault cooking task for a directory through a POST request or check the status of a previously created one through a GET request. Once the cooking task has been executed, the resulting archive can be downloaded using the dedicated endpoint - :http:get:`/api/1/vault/directory/(dir_id)/raw/`. + :http:get:`/api/1/vault/directory/(dir_id)/raw/`. Then to extract the cooked directory in the current one, use:: $ tar xvf path/to/directory.tar.gz :param string dir_id: the directory's sha1 identifier :query string email: e-mail to notify when the archive is ready {common_headers} :>json string fetch_url: the url from which to download the archive once it has been cooked (see :http:get:`/api/1/vault/directory/(dir_id)/raw/`) :>json string obj_type: the type of object to cook (directory or revision) :>json string progress_message: message describing the cooking task progress :>json number id: the cooking task id :>json string status: the cooking task status (either **new**, **pending**, **done** or **failed**) :>json string obj_id: the identifier of the object to cook **Allowed HTTP Methods:** :http:method:`get`, :http:method:`post`, - :http:method:`head`, :http:method:`options` + :http:method:`head`, :http:method:`options` :statuscode 200: no error :statuscode 400: an invalid directory identifier has been provided :statuscode 404: requested directory can not be found in the archive """ _, obj_id = query.parse_hash_with_algorithms_or_throws( dir_id, ['sha1'], 'Only sha1_git is supported.') res = _dispatch_cook_progress(request, 'directory', obj_id) res['fetch_url'] = reverse('api-1-vault-fetch-directory', url_args={'dir_id': dir_id}) return res @api_route(r'/vault/directory/(?P[0-9a-f]+)/raw/', 'api-1-vault-fetch-directory', checksum_args=['dir_id']) @api_doc('/vault/directory/raw/', handle_response=True) def api_vault_fetch_directory(request, dir_id): """ .. http:get:: /api/1/vault/directory/(dir_id)/raw/ Fetch the cooked archive for a directory. See :http:get:`/api/1/vault/directory/(dir_id)/` to get more details on directory cooking. :param string dir_id: the directory's sha1 identifier :resheader Content-Type: application/octet-stream **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, - :http:method:`options` + :http:method:`options` :statuscode 200: no error :statuscode 400: an invalid directory identifier has been provided :statuscode 404: requested directory can not be found in the archive """ _, obj_id = query.parse_hash_with_algorithms_or_throws( dir_id, ['sha1'], 'Only sha1_git is supported.') res = api_lookup( service.vault_fetch, 'directory', obj_id, notfound_msg="Directory with ID '{}' not found.".format(dir_id)) fname = '{}.tar.gz'.format(dir_id) response = HttpResponse(res, content_type='application/gzip') response['Content-disposition'] = 'attachment; filename={}'.format(fname) return response @api_route(r'/vault/revision/(?P[0-9a-f]+)/gitfast/', 'api-1-vault-cook-revision_gitfast', methods=['GET', 'POST'], checksum_args=['rev_id'], throttle_scope='swh_vault_cooking') @never_cache @api_doc('/vault/revision/gitfast/') @format_docstring() def api_vault_cook_revision_gitfast(request, rev_id): """ .. http:get:: /api/1/vault/revision/(rev_id)/gitfast/ .. http:post:: /api/1/vault/revision/(rev_id)/gitfast/ Request the cooking of a gitfast archive for a revision or check its cooking status. That endpoint enables to create a vault cooking task for a revision through a POST request or check the status of a previously created one through a GET request. Once the cooking task has been executed, the resulting gitfast archive can be downloaded using the dedicated endpoint - :http:get:`/api/1/vault/revision/(rev_id)/gitfast/raw/`. + :http:get:`/api/1/vault/revision/(rev_id)/gitfast/raw/`. Then to import the revision in the current directory, use:: $ git init $ zcat path/to/revision.gitfast.gz | git fast-import $ git checkout HEAD :param string rev_id: the revision's sha1 identifier :query string email: e-mail to notify when the gitfast archive is ready {common_headers} :>json string fetch_url: the url from which to download the archive once it has been cooked (see :http:get:`/api/1/vault/revision/(rev_id)/gitfast/raw/`) :>json string obj_type: the type of object to cook (directory or revision) :>json string progress_message: message describing the cooking task progress :>json number id: the cooking task id :>json string status: the cooking task status (new/pending/done/failed) :>json string obj_id: the identifier of the object to cook **Allowed HTTP Methods:** :http:method:`get`, :http:method:`post`, - :http:method:`head`, :http:method:`options` + :http:method:`head`, :http:method:`options` :statuscode 200: no error :statuscode 400: an invalid revision identifier has been provided :statuscode 404: requested revision can not be found in the archive """ _, obj_id = query.parse_hash_with_algorithms_or_throws( rev_id, ['sha1'], 'Only sha1_git is supported.') res = _dispatch_cook_progress(request, 'revision_gitfast', obj_id) res['fetch_url'] = reverse('api-1-vault-fetch-revision_gitfast', url_args={'rev_id': rev_id}) return res @api_route(r'/vault/revision/(?P[0-9a-f]+)/gitfast/raw/', 'api-1-vault-fetch-revision_gitfast', checksum_args=['rev_id']) @api_doc('/vault/revision/gitfast/raw/', handle_response=True) def api_vault_fetch_revision_gitfast(request, rev_id): """ .. http:get:: /api/1/vault/revision/(rev_id)/gitfast/raw/ Fetch the cooked gitfast archive for a revision. See :http:get:`/api/1/vault/revision/(rev_id)/gitfast/` to get more details on directory cooking. :param string rev_id: the revision's sha1 identifier :resheader Content-Type: application/octet-stream **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, - :http:method:`options` + :http:method:`options` :statuscode 200: no error :statuscode 400: an invalid revision identifier has been provided :statuscode 404: requested revision can not be found in the archive """ _, obj_id = query.parse_hash_with_algorithms_or_throws( rev_id, ['sha1'], 'Only sha1_git is supported.') res = api_lookup( service.vault_fetch, 'revision_gitfast', obj_id, notfound_msg="Revision with ID '{}' not found.".format(rev_id)) fname = '{}.gitfast.gz'.format(rev_id) response = HttpResponse(res, content_type='application/gzip') response['Content-disposition'] = 'attachment; filename={}'.format(fname) return response @api_route(r'/vault/revision_gitfast/(?P[0-9a-f]+)/raw/', 'api-1-vault-revision_gitfast-raw', checksum_args=['rev_id']) @api_doc('/vault/revision_gitfast/raw/', tags=['hidden'], handle_response=True) def _api_vault_revision_gitfast_raw(request, rev_id): """ The vault backend sends an email containing an invalid url to fetch a gitfast archive. So setup a redirection to the correct one as a temporary workaround. """ rev_gitfast_raw_url = reverse('api-1-vault-fetch-revision_gitfast', url_args={'rev_id': rev_id}) return redirect(rev_gitfast_raw_url) diff --git a/swh/web/browse/utils.py b/swh/web/browse/utils.py index 45e5369c..0962f604 100644 --- a/swh/web/browse/utils.py +++ b/swh/web/browse/utils.py @@ -1,1111 +1,1111 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 import magic import pypandoc import stat import textwrap from collections import defaultdict from threading import Lock from django.core.cache import cache from django.utils.safestring import mark_safe from django.utils.html import escape from swh.model.identifiers import persistent_identifier from swh.web.common import highlightjs, service from swh.web.common.exc import NotFoundExc, http_status_code_message from swh.web.common.origin_visits import get_origin_visit from swh.web.common.utils import ( reverse, format_utc_iso_date, get_swh_persistent_id, swh_object_icons ) from swh.web.config import get_config def get_directory_entries(sha1_git): """Function that retrieves the content of a directory from the archive. The directories entries are first sorted in lexicographical order. Sub-directories and regular files are then extracted. Args: sha1_git: sha1_git identifier of the directory Returns: A tuple whose first member corresponds to the sub-directories list and second member the regular files list Raises: NotFoundExc if the directory is not found """ cache_entry_id = 'directory_entries_%s' % sha1_git cache_entry = cache.get(cache_entry_id) if cache_entry: return cache_entry entries = list(service.lookup_directory(sha1_git)) for e in entries: e['perms'] = stat.filemode(e['perms']) if e['type'] == 'rev': # modify dir entry name to explicitly show it points # to a revision e['name'] = '%s @ %s' % (e['name'], e['target'][:7]) dirs = [e for e in entries if e['type'] in ('dir', 'rev')] files = [e for e in entries if e['type'] == 'file'] dirs = sorted(dirs, key=lambda d: d['name']) files = sorted(files, key=lambda f: f['name']) cache.set(cache_entry_id, (dirs, files)) return dirs, files _lock = Lock() def get_mimetype_and_encoding_for_content(content): """Function that returns the mime type and the encoding associated to a content buffer using the magic module under the hood. Args: content (bytes): a content buffer Returns: A tuple (mimetype, encoding), for instance ('text/plain', 'us-ascii'), associated to the provided content. """ # https://pypi.org/project/python-magic/ # packaged as python3-magic in debian buster if hasattr(magic, 'from_buffer'): m = magic.Magic(mime=True, mime_encoding=True) mime_encoding = m.from_buffer(content) mime_type, encoding = mime_encoding.split(';') encoding = encoding.replace(' charset=', '') # https://pypi.org/project/file-magic/ # packaged as python3-magic in debian stretch else: # TODO: Remove that code when production environment is upgraded # to debian buster # calls to the file-magic API are not thread-safe so they must # be protected with a Lock to guarantee they will succeed _lock.acquire() magic_result = magic.detect_from_content(content) _lock.release() mime_type = magic_result.mime_type encoding = magic_result.encoding return mime_type, encoding # maximum authorized content size in bytes for HTML display # with code highlighting content_display_max_size = get_config()['content_display_max_size'] snapshot_content_max_size = get_config()['snapshot_content_max_size'] def _re_encode_content(mimetype, encoding, content_data): # encode textual content to utf-8 if needed if mimetype.startswith('text/'): # probably a malformed UTF-8 content, re-encode it # by replacing invalid chars with a substitution one if encoding == 'unknown-8bit': content_data = content_data.decode('utf-8', 'replace')\ .encode('utf-8') elif encoding not in ['utf-8', 'binary']: content_data = content_data.decode(encoding, 'replace')\ .encode('utf-8') elif mimetype.startswith('application/octet-stream'): # file may detect a text content as binary # so try to decode it for display encodings = ['us-ascii'] encodings += ['iso-8859-%s' % i for i in range(1, 17)] for encoding in encodings: try: content_data = content_data.decode(encoding)\ .encode('utf-8') except Exception: pass else: # ensure display in content view mimetype = 'text/plain' break return mimetype, content_data def request_content(query_string, max_size=content_display_max_size, raise_if_unavailable=True, re_encode=True): """Function that retrieves a content from the archive. Raw bytes content is first retrieved, then the content mime type. If the mime type is not stored in the archive, it will be computed using Python magic module. Args: query_string: a string of the form "[ALGO_HASH:]HASH" where optional ALGO_HASH can be either ``sha1``, ``sha1_git``, ``sha256``, or ``blake2s256`` (default to ``sha1``) and HASH the hexadecimal representation of the hash value max_size: the maximum size for a content to retrieve (default to 1MB, no size limit if None) Returns: A tuple whose first member corresponds to the content raw bytes and second member the content mime type Raises: NotFoundExc if the content is not found """ content_data = service.lookup_content(query_string) filetype = None language = None license = None # requests to the indexer db may fail so properly handle # those cases in order to avoid content display errors try: filetype = service.lookup_content_filetype(query_string) language = service.lookup_content_language(query_string) license = service.lookup_content_license(query_string) except Exception: pass mimetype = 'unknown' encoding = 'unknown' if filetype: mimetype = filetype['mimetype'] encoding = filetype['encoding'] # workaround when encountering corrupted data due to implicit # conversion from bytea to text in the indexer db (see T818) # TODO: Remove that code when all data have been correctly converted if mimetype.startswith('\\'): filetype = None content_data['error_code'] = 200 content_data['error_message'] = '' content_data['error_description'] = '' if not max_size or content_data['length'] < max_size: try: content_raw = service.lookup_content_raw(query_string) except Exception as e: if raise_if_unavailable: raise e else: content_data['raw_data'] = None content_data['error_code'] = 404 content_data['error_description'] = \ 'The bytes of the content are currently not available in the archive.' # noqa content_data['error_message'] = \ http_status_code_message[content_data['error_code']] else: content_data['raw_data'] = content_raw['data'] if not filetype: mimetype, encoding = \ get_mimetype_and_encoding_for_content(content_data['raw_data']) # noqa if re_encode: mimetype, raw_data = _re_encode_content( mimetype, encoding, content_data['raw_data']) content_data['raw_data'] = raw_data else: content_data['raw_data'] = None content_data['mimetype'] = mimetype content_data['encoding'] = encoding if language: content_data['language'] = language['lang'] else: content_data['language'] = 'not detected' if license: content_data['licenses'] = ', '.join(license['facts'][0]['licenses']) else: content_data['licenses'] = 'not detected' return content_data _browsers_supported_image_mimes = set(['image/gif', 'image/png', 'image/jpeg', 'image/bmp', 'image/webp', 'image/svg', 'image/svg+xml']) def prepare_content_for_display(content_data, mime_type, path): """Function that prepares a content for HTML display. The function tries to associate a programming language to a content in order to perform syntax highlighting client-side using highlightjs. The language is determined using either the content filename or its mime type. If the mime type corresponds to an image format supported by web browsers, the content will be encoded in base64 for displaying the image. Args: content_data (bytes): raw bytes of the content mime_type (string): mime type of the content path (string): path of the content including filename Returns: A dict containing the content bytes (possibly different from the one provided as parameter if it is an image) under the key 'content_data and the corresponding highlightjs language class under the key 'language'. """ language = highlightjs.get_hljs_language_from_filename(path) if not language: language = highlightjs.get_hljs_language_from_mime_type(mime_type) if not language: language = 'nohighlight' elif mime_type.startswith('application/'): mime_type = mime_type.replace('application/', 'text/') if mime_type.startswith('image/'): if mime_type in _browsers_supported_image_mimes: content_data = base64.b64encode(content_data) content_data = content_data.decode('utf-8') else: content_data = None if mime_type.startswith('image/svg'): mime_type = 'image/svg+xml' return {'content_data': content_data, 'language': language, 'mimetype': mime_type} def process_snapshot_branches(snapshot): """ Process a dictionary describing snapshot branches: extract those targeting revisions and releases, put them in two different lists, then sort those lists in lexicographical order of the branches' names. Args: snapshot_branches (dict): A dict describing the branches of a snapshot as returned for instance by :func:`swh.web.common.service.lookup_snapshot` Returns: tuple: A tuple whose first member is the sorted list of branches targeting revisions and second member the sorted list of branches targeting releases """ snapshot_branches = snapshot['branches'] branches = {} branch_aliases = {} releases = {} revision_to_branch = defaultdict(set) revision_to_release = defaultdict(set) release_to_branch = defaultdict(set) for branch_name, target in snapshot_branches.items(): if not target: # FIXME: display branches with an unknown target anyway continue target_id = target['target'] target_type = target['target_type'] if target_type == 'revision': branches[branch_name] = { 'name': branch_name, 'revision': target_id, } revision_to_branch[target_id].add(branch_name) elif target_type == 'release': release_to_branch[target_id].add(branch_name) elif target_type == 'alias': branch_aliases[branch_name] = target_id # FIXME: handle pointers to other object types def _enrich_release_branch(branch, release): releases[branch] = { 'name': release['name'], 'branch_name': branch, 'date': format_utc_iso_date(release['date']), 'id': release['id'], 'message': release['message'], 'target_type': release['target_type'], 'target': release['target'], } def _enrich_revision_branch(branch, revision): branches[branch].update({ 'revision': revision['id'], 'directory': revision['directory'], 'date': format_utc_iso_date(revision['date']), 'message': revision['message'] }) releases_info = service.lookup_release_multiple( release_to_branch.keys() ) for release in releases_info: branches_to_update = release_to_branch[release['id']] for branch in branches_to_update: _enrich_release_branch(branch, release) if release['target_type'] == 'revision': revision_to_release[release['target']].update( branches_to_update ) revisions = service.lookup_revision_multiple( set(revision_to_branch.keys()) | set(revision_to_release.keys()) ) for revision in revisions: if not revision: continue for branch in revision_to_branch[revision['id']]: _enrich_revision_branch(branch, revision) for release in revision_to_release[revision['id']]: releases[release]['directory'] = revision['directory'] for branch_alias, branch_target in branch_aliases.items(): if branch_target in branches: branches[branch_alias] = dict(branches[branch_target]) else: snp = service.lookup_snapshot(snapshot['id'], branches_from=branch_target, branches_count=1) if snp and branch_target in snp['branches']: if snp['branches'][branch_target] is None: continue target_type = snp['branches'][branch_target]['target_type'] target = snp['branches'][branch_target]['target'] if target_type == 'revision': branches[branch_alias] = snp['branches'][branch_target] revision = service.lookup_revision(target) _enrich_revision_branch(branch_alias, revision) elif target_type == 'release': release = service.lookup_release(target) _enrich_release_branch(branch_alias, release) if branch_alias in branches: branches[branch_alias]['name'] = branch_alias ret_branches = list(sorted(branches.values(), key=lambda b: b['name'])) ret_releases = list(sorted(releases.values(), key=lambda b: b['name'])) return ret_branches, ret_releases def get_snapshot_content(snapshot_id): """Returns the lists of branches and releases associated to a swh snapshot. That list is put in cache in order to speedup the navigation in the swh-web/browse ui. .. warning:: At most 1000 branches contained in the snapshot will be returned for performance reasons. Args: snapshot_id (str): hexadecimal representation of the snapshot identifier Returns: A tuple with two members. The first one is a list of dict describing the snapshot branches. The second one is a list of dict describing the snapshot releases. Raises: NotFoundExc if the snapshot does not exist """ cache_entry_id = 'swh_snapshot_%s' % snapshot_id cache_entry = cache.get(cache_entry_id) if cache_entry: return cache_entry['branches'], cache_entry['releases'] branches = [] releases = [] if snapshot_id: snapshot = service.lookup_snapshot( snapshot_id, branches_count=snapshot_content_max_size) branches, releases = process_snapshot_branches(snapshot) cache.set(cache_entry_id, { 'branches': branches, 'releases': releases, }) return branches, releases def get_origin_visit_snapshot(origin_info, visit_ts=None, visit_id=None, snapshot_id=None): """Returns the lists of branches and releases associated to a swh origin for a given visit. The visit is expressed by a timestamp. In the latter case, the closest visit from the provided timestamp will be used. If no visit parameter is provided, it returns the list of branches found for the latest visit. That list is put in cache in order to speedup the navigation in the swh-web/browse ui. .. warning:: At most 1000 branches contained in the snapshot will be returned for performance reasons. Args: origin_info (dict): a dict filled with origin information (id, url, type) visit_ts (int or str): an ISO date string or Unix timestamp to parse visit_id (int): optional visit id for disambiguation in case several visits have the same timestamp Returns: A tuple with two members. The first one is a list of dict describing the origin branches for the given visit. The second one is a list of dict describing the origin releases for the given visit. Raises: NotFoundExc if the origin or its visit are not found """ visit_info = get_origin_visit(origin_info, visit_ts, visit_id, snapshot_id) return get_snapshot_content(visit_info['snapshot']) def gen_link(url, link_text=None, link_attrs=None): """ Utility function for generating an HTML link to insert in Django templates. Args: url (str): an url link_text (str): optional text for the produced link, if not provided the url will be used link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form 'link_text' """ attrs = ' ' if link_attrs: for k, v in link_attrs.items(): attrs += '%s="%s" ' % (k, v) if not link_text: link_text = url link = '%s' \ % (attrs, escape(url), escape(link_text)) return mark_safe(link) def _snapshot_context_query_params(snapshot_context): query_params = None if snapshot_context and snapshot_context['origin_info']: origin_info = snapshot_context['origin_info'] query_params = {'origin': origin_info['url']} if 'timestamp' in snapshot_context['url_args']: query_params['timestamp'] = \ snapshot_context['url_args']['timestamp'] if 'visit_id' in snapshot_context['query_params']: query_params['visit_id'] = \ snapshot_context['query_params']['visit_id'] elif snapshot_context: query_params = {'snapshot_id': snapshot_context['snapshot_id']} return query_params def gen_revision_url(revision_id, snapshot_context=None): """ Utility function for generating an url to a revision. Args: revision_id (str): a revision id snapshot_context (dict): if provided, generate snapshot-dependent browsing url Returns: str: The url to browse the revision """ query_params = _snapshot_context_query_params(snapshot_context) return reverse('browse-revision', url_args={'sha1_git': revision_id}, query_params=query_params) def gen_revision_link(revision_id, shorten_id=False, snapshot_context=None, link_text='Browse', link_attrs={'class': 'btn btn-default btn-sm', 'role': 'button'}): """ Utility function for generating a link to a revision HTML view to insert in Django templates. Args: revision_id (str): a revision id shorten_id (boolean): whether to shorten the revision id to 7 characters for the link text snapshot_context (dict): if provided, generate snapshot-dependent browsing link link_text (str): optional text for the generated link (the revision id will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: str: An HTML link in the form 'revision_id' """ if not revision_id: return None revision_url = gen_revision_url(revision_id, snapshot_context) if shorten_id: return gen_link(revision_url, revision_id[:7], link_attrs) else: if not link_text: link_text = revision_id return gen_link(revision_url, link_text, link_attrs) def gen_directory_link(sha1_git, snapshot_context=None, link_text='Browse', link_attrs={'class': 'btn btn-default btn-sm', 'role': 'button'}): """ Utility function for generating a link to a directory HTML view to insert in Django templates. Args: sha1_git (str): directory identifier link_text (str): optional text for the generated link (the directory id will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form 'link_text' """ if not sha1_git: return None query_params = _snapshot_context_query_params(snapshot_context) directory_url = reverse('browse-directory', url_args={'sha1_git': sha1_git}, query_params=query_params) if not link_text: link_text = sha1_git return gen_link(directory_url, link_text, link_attrs) def gen_snapshot_link(snapshot_id, snapshot_context=None, link_text='Browse', link_attrs={'class': 'btn btn-default btn-sm', 'role': 'button'}): """ Utility function for generating a link to a snapshot HTML view to insert in Django templates. Args: snapshot_id (str): snapshot identifier link_text (str): optional text for the generated link (the snapshot id will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form 'link_text' """ query_params = _snapshot_context_query_params(snapshot_context) snapshot_url = reverse('browse-snapshot', url_args={'snapshot_id': snapshot_id}, query_params=query_params) if not link_text: link_text = snapshot_id return gen_link(snapshot_url, link_text, link_attrs) def gen_content_link(sha1_git, snapshot_context=None, link_text='Browse', link_attrs={'class': 'btn btn-default btn-sm', 'role': 'button'}): """ Utility function for generating a link to a content HTML view to insert in Django templates. Args: sha1_git (str): content identifier link_text (str): optional text for the generated link (the content sha1_git will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form 'link_text' """ if not sha1_git: return None query_params = _snapshot_context_query_params(snapshot_context) content_url = reverse('browse-content', url_args={'query_string': 'sha1_git:' + sha1_git}, query_params=query_params) if not link_text: link_text = sha1_git return gen_link(content_url, link_text, link_attrs) def get_revision_log_url(revision_id, snapshot_context=None): """ Utility function for getting the URL for a revision log HTML view (possibly in the context of an origin). Args: revision_id (str): revision identifier the history heads to snapshot_context (dict): if provided, generate snapshot-dependent browsing link Returns: The revision log view URL """ query_params = {'revision': revision_id} if snapshot_context and snapshot_context['origin_info']: origin_info = snapshot_context['origin_info'] url_args = {'origin_url': origin_info['url']} if 'timestamp' in snapshot_context['url_args']: url_args['timestamp'] = \ snapshot_context['url_args']['timestamp'] if 'visit_id' in snapshot_context['query_params']: query_params['visit_id'] = \ snapshot_context['query_params']['visit_id'] revision_log_url = reverse('browse-origin-log', url_args=url_args, query_params=query_params) elif snapshot_context: url_args = {'snapshot_id': snapshot_context['snapshot_id']} revision_log_url = reverse('browse-snapshot-log', url_args=url_args, query_params=query_params) else: revision_log_url = reverse('browse-revision-log', url_args={'sha1_git': revision_id}) return revision_log_url def gen_revision_log_link(revision_id, snapshot_context=None, link_text='Browse', link_attrs={'class': 'btn btn-default btn-sm', 'role': 'button'}): """ Utility function for generating a link to a revision log HTML view (possibly in the context of an origin) to insert in Django templates. Args: revision_id (str): revision identifier the history heads to snapshot_context (dict): if provided, generate snapshot-dependent browsing link link_text (str): optional text to use for the generated link (the revision id will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form 'link_text' """ if not revision_id: return None revision_log_url = get_revision_log_url(revision_id, snapshot_context) if not link_text: link_text = revision_id return gen_link(revision_log_url, link_text, link_attrs) def gen_release_link(sha1_git, snapshot_context=None, link_text='Browse', link_attrs={'class': 'btn btn-default btn-sm', 'role': 'button'}): """ Utility function for generating a link to a release HTML view to insert in Django templates. Args: sha1_git (str): release identifier link_text (str): optional text for the generated link (the release id will be used by default) link_attrs (dict): optional attributes (e.g. class) to add to the link Returns: An HTML link in the form 'link_text' """ query_params = _snapshot_context_query_params(snapshot_context) release_url = reverse('browse-release', url_args={'sha1_git': sha1_git}, query_params=query_params) if not link_text: link_text = sha1_git return gen_link(release_url, link_text, link_attrs) def format_log_entries(revision_log, per_page, snapshot_context=None): """ Utility functions that process raw revision log data for HTML display. Its purpose is to: * add links to relevant browse views * format date in human readable format * truncate the message log Args: revision_log (list): raw revision log as returned by the swh-web api per_page (int): number of log entries per page snapshot_context (dict): if provided, generate snapshot-dependent browsing link """ revision_log_data = [] for i, rev in enumerate(revision_log): if i == per_page: break author_name = 'None' author_fullname = 'None' committer_fullname = 'None' if rev['author']: author_name = rev['author']['name'] or rev['author']['fullname'] author_fullname = rev['author']['fullname'] if rev['committer']: committer_fullname = rev['committer']['fullname'] author_date = format_utc_iso_date(rev['date']) committer_date = format_utc_iso_date(rev['committer_date']) tooltip = 'revision %s\n' % rev['id'] tooltip += 'author: %s\n' % author_fullname tooltip += 'author date: %s\n' % author_date tooltip += 'committer: %s\n' % committer_fullname tooltip += 'committer date: %s\n\n' % committer_date if rev['message']: tooltip += textwrap.indent(rev['message'], ' '*4) revision_log_data.append({ 'author': author_name, 'id': rev['id'][:7], 'message': rev['message'], 'date': author_date, 'commit_date': committer_date, 'url': gen_revision_url(rev['id'], snapshot_context), 'tooltip': tooltip }) return revision_log_data # list of origin types that can be found in the swh archive # TODO: retrieve it dynamically in an efficient way instead # of hardcoding it _swh_origin_types = ['git', 'svn', 'deb', 'hg', 'ftp', 'deposit', 'pypi', 'npm'] def get_origin_info(origin_url, origin_type=None): """ Get info about a software origin. Its main purpose is to automatically find an origin type when it is not provided as parameter. Args: origin_url (str): complete url of a software origin origin_type (str): optional origin type Returns: A dict with the following entries: * type: the origin type * url: the origin url * id: the internal id of the origin """ if origin_type: return service.lookup_origin({'type': origin_type, 'url': origin_url}) else: for origin_type in _swh_origin_types: try: origin_info = service.lookup_origin({'type': origin_type, 'url': origin_url}) return origin_info except Exception: pass raise NotFoundExc('Origin with url %s not found!' % escape(origin_url)) def get_snapshot_context(snapshot_id=None, origin_type=None, origin_url=None, timestamp=None, visit_id=None): """ Utility function to compute relevant information when navigating the archive in a snapshot context. The snapshot is either referenced by its id or it will be retrieved from an origin visit. Args: snapshot_id (str): hexadecimal representation of a snapshot identifier, all other parameters will be ignored if it is provided origin_type (str): the origin type (git, svn, deposit, ...) origin_url (str): the origin_url (e.g. https://github.com/(user)/(repo)/) timestamp (str): a datetime string for retrieving the closest visit of the origin visit_id (int): optional visit id for disambiguation in case of several visits with the same timestamp Returns: A dict with the following entries: * origin_info: dict containing origin information * visit_info: dict containing visit information * branches: the list of branches for the origin found during the visit * releases: the list of releases for the origin found during the visit * origin_browse_url: the url to browse the origin * origin_branches_url: the url to browse the origin branches * origin_releases_url': the url to browse the origin releases * origin_visit_url: the url to browse the snapshot of the origin found during the visit * url_args: dict containing url arguments to use when browsing in the context of the origin and its visit Raises: NotFoundExc: if no snapshot is found for the visit of an origin. """ origin_info = None visit_info = None url_args = None query_params = {} branches = [] releases = [] browse_url = None visit_url = None branches_url = None releases_url = None swh_type = 'snapshot' if origin_url: swh_type = 'origin' origin_info = get_origin_info(origin_url, origin_type) visit_info = get_origin_visit(origin_info, timestamp, visit_id, snapshot_id) fmt_date = format_utc_iso_date(visit_info['date']) visit_info['fmt_date'] = fmt_date snapshot_id = visit_info['snapshot'] if not snapshot_id: raise NotFoundExc('No snapshot associated to the visit of origin ' '%s on %s' % (escape(origin_url), fmt_date)) # provided timestamp is not necessarily equals to the one # of the retrieved visit, so get the exact one in order # use it in the urls generated below if timestamp: timestamp = visit_info['date'] branches, releases = \ get_origin_visit_snapshot(origin_info, timestamp, visit_id, snapshot_id) url_args = {'origin_type': origin_type, 'origin_url': origin_info['url']} query_params = {'visit_id': visit_id} browse_url = reverse('browse-origin-visits', url_args=url_args) if timestamp: url_args['timestamp'] = format_utc_iso_date(timestamp, '%Y-%m-%dT%H:%M:%S') visit_url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) visit_info['url'] = visit_url branches_url = reverse('browse-origin-branches', url_args=url_args, query_params=query_params) releases_url = reverse('browse-origin-releases', url_args=url_args, query_params=query_params) elif snapshot_id: branches, releases = get_snapshot_content(snapshot_id) url_args = {'snapshot_id': snapshot_id} browse_url = reverse('browse-snapshot', url_args=url_args) branches_url = reverse('browse-snapshot-branches', url_args=url_args) releases_url = reverse('browse-snapshot-releases', url_args=url_args) releases = list(reversed(releases)) snapshot_size = service.lookup_snapshot_size(snapshot_id) is_empty = sum(snapshot_size.values()) == 0 swh_snp_id = persistent_identifier('snapshot', snapshot_id) return { 'swh_type': swh_type, 'swh_object_id': swh_snp_id, 'snapshot_id': snapshot_id, 'snapshot_size': snapshot_size, 'is_empty': is_empty, 'origin_info': origin_info, # keep track if the origin type was provided as url argument 'origin_type': origin_type, 'visit_info': visit_info, 'branches': branches, 'releases': releases, 'branch': None, 'release': None, 'browse_url': browse_url, 'branches_url': branches_url, 'releases_url': releases_url, 'url_args': url_args, 'query_params': query_params } # list of common readme names ordered by preference # (lower indices have higher priority) _common_readme_names = [ "readme.markdown", "readme.md", "readme.rst", "readme.txt", "readme" ] def get_readme_to_display(readmes): """ Process a list of readme files found in a directory in order to find the adequate one to display. Args: readmes: a list of dict where keys are readme file names and values are readme sha1s Returns: A tuple (readme_name, readme_sha1) """ readme_name = None readme_url = None readme_sha1 = None readme_html = None lc_readmes = {k.lower(): {'orig_name': k, 'sha1': v} for k, v in readmes.items()} # look for readme names according to the preference order # defined by the _common_readme_names list for common_readme_name in _common_readme_names: if common_readme_name in lc_readmes: readme_name = lc_readmes[common_readme_name]['orig_name'] readme_sha1 = lc_readmes[common_readme_name]['sha1'] readme_url = reverse('browse-content-raw', url_args={'query_string': readme_sha1}, query_params={'re_encode': 'true'}) break # otherwise pick the first readme like file if any if not readme_name and len(readmes.items()) > 0: readme_name = next(iter(readmes)) readme_sha1 = readmes[readme_name] readme_url = reverse('browse-content-raw', url_args={'query_string': readme_sha1}, query_params={'re_encode': 'true'}) # convert rst README to html server side as there is # no viable solution to perform that task client side if readme_name and readme_name.endswith('.rst'): cache_entry_id = 'readme_%s' % readme_sha1 cache_entry = cache.get(cache_entry_id) if cache_entry: readme_html = cache_entry else: try: rst_doc = request_content(readme_sha1) readme_html = pypandoc.convert_text(rst_doc['raw_data'], 'html', format='rst') cache.set(cache_entry_id, readme_html) except Exception: readme_html = 'Readme bytes are not available' return readme_name, readme_url, readme_html def get_swh_persistent_ids(swh_objects, snapshot_context=None): """ Returns a list of dict containing info related to persistent identifiers of swh objects. Args: swh_objects (list): a list of dict with the following keys: * type: swh object type - (content/directory/release/revision/snapshot) + (content/directory/release/revision/snapshot) * id: swh object id snapshot_context (dict): optional parameter describing the snapshot in which the object has been found Returns: list: a list of dict with the following keys: * object_type: the swh object type - (content/directory/release/revision/snapshot) + (content/directory/release/revision/snapshot) * object_icon: the swh object icon to use in HTML views * swh_id: the computed swh object persistent identifier * swh_id_url: the url resolving the persistent identifier * show_options: boolean indicating if the persistent id options must be displayed in persistent ids HTML view """ swh_ids = [] for swh_object in swh_objects: if not swh_object['id']: continue swh_id = get_swh_persistent_id(swh_object['type'], swh_object['id']) show_options = swh_object['type'] == 'content' or \ (snapshot_context and snapshot_context['origin_info'] is not None) object_icon = swh_object_icons[swh_object['type']] swh_ids.append({ 'object_type': swh_object['type'], 'object_icon': object_icon, 'swh_id': swh_id, 'swh_id_url': reverse('browse-swh-id', url_args={'swh_id': swh_id}), 'show_options': show_options }) return swh_ids diff --git a/swh/web/browse/views/content.py b/swh/web/browse/views/content.py index b8af32b8..f90550b0 100644 --- a/swh/web/browse/views/content.py +++ b/swh/web/browse/views/content.py @@ -1,331 +1,331 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import difflib import json from distutils.util import strtobool from django.http import HttpResponse from django.shortcuts import render from django.template.defaultfilters import filesizeformat from swh.model.hashutil import hash_to_hex from swh.web.common import query, service, highlightjs from swh.web.common.utils import ( reverse, gen_path_info, swh_object_icons ) from swh.web.common.exc import NotFoundExc, handle_view_exception from swh.web.browse.utils import ( request_content, prepare_content_for_display, content_display_max_size, get_snapshot_context, get_swh_persistent_ids, gen_link, gen_directory_link ) from swh.web.browse.browseurls import browse_route @browse_route(r'content/(?P[0-9a-z_:]*[0-9a-f]+.)/raw/', view_name='browse-content-raw', checksum_args=['query_string']) def content_raw(request, query_string): """Django view that produces a raw display of a content identified by its hash value. The url that points to it is - :http:get:`/browse/content/[(algo_hash):](hash)/raw/` + :http:get:`/browse/content/[(algo_hash):](hash)/raw/` """ try: re_encode = bool(strtobool(request.GET.get('re_encode', 'false'))) algo, checksum = query.parse_hash(query_string) checksum = hash_to_hex(checksum) content_data = request_content(query_string, max_size=None, re_encode=re_encode) except Exception as exc: return handle_view_exception(request, exc) filename = request.GET.get('filename', None) if not filename: filename = '%s_%s' % (algo, checksum) if content_data['mimetype'].startswith('text/') or \ content_data['mimetype'] == 'inode/x-empty': response = HttpResponse(content_data['raw_data'], content_type="text/plain") response['Content-disposition'] = 'filename=%s' % filename else: response = HttpResponse(content_data['raw_data'], content_type='application/octet-stream') response['Content-disposition'] = 'attachment; filename=%s' % filename return response _auto_diff_size_limit = 20000 @browse_route(r'content/(?P.*)/diff/(?P.*)', # noqa view_name='diff-contents') def _contents_diff(request, from_query_string, to_query_string): """ Browse endpoint used to compute unified diffs between two contents. Diffs are generated only if the two contents are textual. By default, diffs whose size are greater than 20 kB will not be generated. To force the generation of large diffs, the 'force' boolean query parameter must be used. Args: request: input django http request from_query_string: a string of the form "[ALGO_HASH:]HASH" where optional ALGO_HASH can be either ``sha1``, ``sha1_git``, ``sha256``, or ``blake2s256`` (default to ``sha1``) and HASH the hexadecimal representation of the hash value identifying the first content to_query_string: same as above for identifying the second content Returns: A JSON object containing the unified diff. """ diff_data = {} content_from = None content_to = None content_from_size = 0 content_to_size = 0 content_from_lines = [] content_to_lines = [] force = request.GET.get('force', 'false') path = request.GET.get('path', None) language = 'nohighlight' force = bool(strtobool(force)) if from_query_string == to_query_string: diff_str = 'File renamed without changes' else: try: text_diff = True if from_query_string: content_from = \ request_content(from_query_string, max_size=None) content_from_display_data = \ prepare_content_for_display(content_from['raw_data'], content_from['mimetype'], path) language = content_from_display_data['language'] content_from_size = content_from['length'] if not (content_from['mimetype'].startswith('text/') or content_from['mimetype'] == 'inode/x-empty'): text_diff = False if text_diff and to_query_string: content_to = request_content(to_query_string, max_size=None) content_to_display_data = prepare_content_for_display( content_to['raw_data'], content_to['mimetype'], path) language = content_to_display_data['language'] content_to_size = content_to['length'] if not (content_to['mimetype'].startswith('text/') or content_to['mimetype'] == 'inode/x-empty'): text_diff = False diff_size = abs(content_to_size - content_from_size) if not text_diff: diff_str = 'Diffs are not generated for non textual content' language = 'nohighlight' elif not force and diff_size > _auto_diff_size_limit: diff_str = 'Large diffs are not automatically computed' language = 'nohighlight' else: if content_from: content_from_lines = \ content_from['raw_data'].decode('utf-8')\ .splitlines(True) if content_from_lines and \ content_from_lines[-1][-1] != '\n': content_from_lines[-1] += '[swh-no-nl-marker]\n' if content_to: content_to_lines = content_to['raw_data'].decode('utf-8')\ .splitlines(True) if content_to_lines and content_to_lines[-1][-1] != '\n': content_to_lines[-1] += '[swh-no-nl-marker]\n' diff_lines = difflib.unified_diff(content_from_lines, content_to_lines) diff_str = ''.join(list(diff_lines)[2:]) except Exception as e: diff_str = str(e) diff_data['diff_str'] = diff_str diff_data['language'] = language diff_data_json = json.dumps(diff_data, separators=(',', ': ')) return HttpResponse(diff_data_json, content_type='application/json') @browse_route(r'content/(?P[0-9a-z_:]*[0-9a-f]+.)/', view_name='browse-content', checksum_args=['query_string']) def content_display(request, query_string): """Django view that produces an HTML display of a content identified by its hash value. The url that points to it is - :http:get:`/browse/content/[(algo_hash):](hash)/` + :http:get:`/browse/content/[(algo_hash):](hash)/` """ try: algo, checksum = query.parse_hash(query_string) checksum = hash_to_hex(checksum) content_data = request_content(query_string, raise_if_unavailable=False) origin_type = request.GET.get('origin_type', None) origin_url = request.GET.get('origin_url', None) selected_language = request.GET.get('language', None) if not origin_url: origin_url = request.GET.get('origin', None) snapshot_context = None if origin_url: try: snapshot_context = get_snapshot_context(None, origin_type, origin_url) except Exception: raw_cnt_url = reverse('browse-content', url_args={'query_string': query_string}) error_message = \ ('The Software Heritage archive has a content ' 'with the hash you provided but the origin ' 'mentioned in your request appears broken: %s. ' 'Please check the URL and try again.\n\n' 'Nevertheless, you can still browse the content ' 'without origin information: %s' % (gen_link(origin_url), gen_link(raw_cnt_url))) raise NotFoundExc(error_message) if snapshot_context: snapshot_context['visit_info'] = None except Exception as exc: return handle_view_exception(request, exc) path = request.GET.get('path', None) content = None language = None mimetype = None if content_data['raw_data'] is not None: content_display_data = prepare_content_for_display( content_data['raw_data'], content_data['mimetype'], path) content = content_display_data['content_data'] language = content_display_data['language'] mimetype = content_display_data['mimetype'] # Override language with user-selected language if selected_language is not None: language = selected_language available_languages = None if mimetype and 'text/' in mimetype: available_languages = highlightjs.get_supported_languages() root_dir = None filename = None path_info = None directory_id = None directory_url = None query_params = {'origin': origin_url} breadcrumbs = [] if path: split_path = path.split('/') root_dir = split_path[0] filename = split_path[-1] if root_dir != path: path = path.replace(root_dir + '/', '') path = path[:-len(filename)] path_info = gen_path_info(path) dir_url = reverse('browse-directory', url_args={'sha1_git': root_dir}, query_params=query_params) breadcrumbs.append({'name': root_dir[:7], 'url': dir_url}) for pi in path_info: dir_url = reverse('browse-directory', url_args={'sha1_git': root_dir, 'path': pi['path']}, query_params=query_params) breadcrumbs.append({'name': pi['name'], 'url': dir_url}) breadcrumbs.append({'name': filename, 'url': None}) if path and root_dir != path: dir_info = service.lookup_directory_with_path(root_dir, path) directory_id = dir_info['target'] elif root_dir != path: directory_id = root_dir if directory_id: directory_url = gen_directory_link(directory_id) query_params = {'filename': filename} content_raw_url = reverse('browse-content-raw', url_args={'query_string': query_string}, query_params=query_params) content_metadata = { 'sha1': content_data['checksums']['sha1'], 'sha1_git': content_data['checksums']['sha1_git'], 'sha256': content_data['checksums']['sha256'], 'blake2s256': content_data['checksums']['blake2s256'], 'mimetype': content_data['mimetype'], 'encoding': content_data['encoding'], 'size': filesizeformat(content_data['length']), 'language': content_data['language'], 'licenses': content_data['licenses'], 'filename': filename, 'directory': directory_id, 'context-independent directory': directory_url } if filename: content_metadata['filename'] = filename sha1_git = content_data['checksums']['sha1_git'] swh_ids = get_swh_persistent_ids([{'type': 'content', 'id': sha1_git}]) heading = 'Content - %s' % sha1_git if breadcrumbs: content_path = '/'.join([bc['name'] for bc in breadcrumbs]) heading += ' - %s' % content_path return render(request, 'browse/content.html', {'heading': heading, 'swh_object_id': swh_ids[0]['swh_id'], 'swh_object_name': 'Content', 'swh_object_metadata': content_metadata, 'content': content, 'content_size': content_data['length'], 'max_content_size': content_display_max_size, 'mimetype': mimetype, 'language': language, 'available_languages': available_languages, 'breadcrumbs': breadcrumbs, 'top_right_link': { 'url': content_raw_url, 'icon': swh_object_icons['content'], 'text': 'Raw File' }, 'snapshot_context': snapshot_context, 'vault_cooking': None, 'show_actions_menu': True, 'swh_ids': swh_ids, 'error_code': content_data['error_code'], 'error_message': content_data['error_message'], 'error_description': content_data['error_description']}, status=content_data['error_code']) diff --git a/swh/web/browse/views/directory.py b/swh/web/browse/views/directory.py index 7561ba39..8bc37d2a 100644 --- a/swh/web/browse/views/directory.py +++ b/swh/web/browse/views/directory.py @@ -1,178 +1,178 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os from django.http import HttpResponse from django.shortcuts import render, redirect from django.template.defaultfilters import filesizeformat from swh.web.common import service from swh.web.common.utils import ( reverse, gen_path_info ) from swh.web.common.exc import handle_view_exception, NotFoundExc from swh.web.browse.utils import ( get_directory_entries, get_snapshot_context, get_readme_to_display, get_swh_persistent_ids, gen_link ) from swh.web.browse.browseurls import browse_route @browse_route(r'directory/(?P[0-9a-f]+)/', r'directory/(?P[0-9a-f]+)/(?P.+)/', view_name='browse-directory', checksum_args=['sha1_git']) def directory_browse(request, sha1_git, path=None): """Django view for browsing the content of a directory identified by its sha1_git value. The url that points to it is - :http:get:`/browse/directory/(sha1_git)/[(path)/]` + :http:get:`/browse/directory/(sha1_git)/[(path)/]` """ root_sha1_git = sha1_git try: if path: dir_info = service.lookup_directory_with_path(sha1_git, path) sha1_git = dir_info['target'] dirs, files = get_directory_entries(sha1_git) origin_type = request.GET.get('origin_type', None) origin_url = request.GET.get('origin_url', None) if not origin_url: origin_url = request.GET.get('origin', None) snapshot_context = None if origin_url: try: snapshot_context = get_snapshot_context(None, origin_type, origin_url) except Exception: raw_dir_url = reverse('browse-directory', url_args={'sha1_git': sha1_git}) error_message = \ ('The Software Heritage archive has a directory ' 'with the hash you provided but the origin ' 'mentioned in your request appears broken: %s. ' 'Please check the URL and try again.\n\n' 'Nevertheless, you can still browse the directory ' 'without origin information: %s' % (gen_link(origin_url), gen_link(raw_dir_url))) raise NotFoundExc(error_message) if snapshot_context: snapshot_context['visit_info'] = None except Exception as exc: return handle_view_exception(request, exc) path_info = gen_path_info(path) query_params = {'origin': origin_url} breadcrumbs = [] breadcrumbs.append({'name': root_sha1_git[:7], 'url': reverse('browse-directory', url_args={'sha1_git': root_sha1_git}, query_params=query_params)}) for pi in path_info: breadcrumbs.append({'name': pi['name'], 'url': reverse('browse-directory', url_args={'sha1_git': root_sha1_git, 'path': pi['path']}, query_params=query_params)}) path = '' if path is None else (path + '/') for d in dirs: if d['type'] == 'rev': d['url'] = reverse('browse-revision', url_args={'sha1_git': d['target']}, query_params=query_params) else: d['url'] = reverse('browse-directory', url_args={'sha1_git': root_sha1_git, 'path': path + d['name']}, query_params=query_params) sum_file_sizes = 0 readmes = {} for f in files: query_string = 'sha1_git:' + f['target'] f['url'] = reverse('browse-content', url_args={'query_string': query_string}, query_params={'path': root_sha1_git + '/' + path + f['name'], 'origin': origin_url}) if f['length'] is not None: sum_file_sizes += f['length'] f['length'] = filesizeformat(f['length']) if f['name'].lower().startswith('readme'): readmes[f['name']] = f['checksums']['sha1'] readme_name, readme_url, readme_html = get_readme_to_display(readmes) sum_file_sizes = filesizeformat(sum_file_sizes) dir_metadata = {'directory': sha1_git, 'number of regular files': len(files), 'number of subdirectories': len(dirs), 'sum of regular file sizes': sum_file_sizes} vault_cooking = { 'directory_context': True, 'directory_id': sha1_git, 'revision_context': False, 'revision_id': None } swh_ids = get_swh_persistent_ids([{'type': 'directory', 'id': sha1_git}]) heading = 'Directory - %s' % sha1_git if breadcrumbs: dir_path = '/'.join([bc['name'] for bc in breadcrumbs]) + '/' heading += ' - %s' % dir_path return render(request, 'browse/directory.html', {'heading': heading, 'swh_object_id': swh_ids[0]['swh_id'], 'swh_object_name': 'Directory', 'swh_object_metadata': dir_metadata, 'dirs': dirs, 'files': files, 'breadcrumbs': breadcrumbs, 'top_right_link': None, 'readme_name': readme_name, 'readme_url': readme_url, 'readme_html': readme_html, 'snapshot_context': snapshot_context, 'vault_cooking': vault_cooking, 'show_actions_menu': True, 'swh_ids': swh_ids}) @browse_route(r'directory/resolve/content-path/(?P[0-9a-f]+)/(?P.+)/', # noqa view_name='browse-directory-resolve-content-path', checksum_args=['sha1_git']) def _directory_resolve_content_path(request, sha1_git, path): """ Internal endpoint redirecting to data url for a specific file path relative to a root directory. """ try: path = os.path.normpath(path) if not path.startswith('../'): dir_info = service.lookup_directory_with_path(sha1_git, path) if dir_info['type'] == 'file': sha1 = dir_info['checksums']['sha1'] data_url = reverse('browse-content-raw', url_args={'query_string': sha1}) return redirect(data_url) except Exception: pass return HttpResponse(status=404) diff --git a/swh/web/browse/views/origin.py b/swh/web/browse/views/origin.py index 2e5f926e..9dec75ab 100644 --- a/swh/web/browse/views/origin.py +++ b/swh/web/browse/views/origin.py @@ -1,241 +1,241 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from distutils.util import strtobool from django.http import HttpResponse from django.shortcuts import render, redirect from swh.web.common import service from swh.web.common.origin_visits import get_origin_visits from swh.web.common.utils import ( reverse, format_utc_iso_date, parse_timestamp ) from swh.web.common.exc import handle_view_exception from swh.web.browse.utils import ( get_origin_info, get_snapshot_context ) from swh.web.browse.browseurls import browse_route from .utils.snapshot_context import ( browse_snapshot_directory, browse_snapshot_content, browse_snapshot_log, browse_snapshot_branches, browse_snapshot_releases ) @browse_route(r'origin/(?P[a-z]+)/url/(?P.+)' '/visit/(?P.+)/directory/', r'origin/(?P[a-z]+)/url/(?P.+)' '/visit/(?P.+)/directory/(?P.+)/', r'origin/(?P[a-z]+)/url/(?P.+)' '/directory/', r'origin/(?P[a-z]+)/url/(?P.+)' '/directory/(?P.+)/', r'origin/(?P.+)/visit/(?P.+)/directory/', r'origin/(?P.+)/visit/(?P.+)' '/directory/(?P.+)/', r'origin/(?P.+)/directory/', r'origin/(?P.+)/directory/(?P.+)/', view_name='browse-origin-directory') def origin_directory_browse(request, origin_url, origin_type=None, timestamp=None, path=None): """Django view for browsing the content of a directory associated to an origin for a given visit. The url scheme that points to it is the following: * :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/directory/[(path)/]` * :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/visit/(timestamp)/directory/[(path)/]` """ # noqa return browse_snapshot_directory( request, origin_type=origin_type, origin_url=origin_url, timestamp=timestamp, path=path) @browse_route(r'origin/(?P[a-z]+)/url/(?P.+)' '/visit/(?P.+)/content/(?P.+)/', r'origin/(?P[a-z]+)/url/(?P.+)' '/content/(?P.+)/', r'origin/(?P.+)/visit/(?P.+)' '/content/(?P.+)/', r'origin/(?P.+)/content/(?P.+)/', view_name='browse-origin-content') def origin_content_browse(request, origin_url, origin_type=None, path=None, timestamp=None): """Django view that produces an HTML display of a content associated to an origin for a given visit. The url scheme that points to it is the following: * :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/content/(path)/` * :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/visit/(timestamp)/content/(path)/` """ # noqa language = request.GET.get('language', None) return browse_snapshot_content(request, origin_type=origin_type, origin_url=origin_url, timestamp=timestamp, path=path, selected_language=language) PER_PAGE = 20 @browse_route(r'origin/(?P[a-z]+)/url/(?P.+)' '/visit/(?P.+)/log/', r'origin/(?P[a-z]+)/url/(?P.+)/log/', r'origin/(?P.+)/visit/(?P.+)/log/', r'origin/(?P.+)/log/', view_name='browse-origin-log') def origin_log_browse(request, origin_url, origin_type=None, timestamp=None): """Django view that produces an HTML display of revisions history (aka the commit log) associated to a software origin. The url scheme that points to it is the following: * :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/log/` * :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/visit/(timestamp)/log/` """ # noqa return browse_snapshot_log(request, origin_type=origin_type, origin_url=origin_url, timestamp=timestamp) @browse_route(r'origin/(?P[a-z]+)/url/(?P.+)' '/visit/(?P.+)/branches/', r'origin/(?P[a-z]+)/url/(?P.+)' '/branches/', r'origin/(?P.+)/visit/(?P.+)/branches/', r'origin/(?P.+)/branches/', view_name='browse-origin-branches') def origin_branches_browse(request, origin_url, origin_type=None, timestamp=None): """Django view that produces an HTML display of the list of branches associated to an origin for a given visit. The url scheme that points to it is the following: * :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/branches/` * :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/visit/(timestamp)/branches/` """ # noqa return browse_snapshot_branches(request, origin_type=origin_type, origin_url=origin_url, timestamp=timestamp) @browse_route(r'origin/(?P[a-z]+)/url/(?P.+)' '/visit/(?P.+)/releases/', r'origin/(?P[a-z]+)/url/(?P.+)' '/releases/', r'origin/(?P.+)/visit/(?P.+)/releases/', r'origin/(?P.+)/releases/', view_name='browse-origin-releases') def origin_releases_browse(request, origin_url, origin_type=None, timestamp=None): """Django view that produces an HTML display of the list of releases associated to an origin for a given visit. The url scheme that points to it is the following: * :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/releases/` * :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/visit/(timestamp)/releases/` """ # noqa return browse_snapshot_releases(request, origin_type=origin_type, origin_url=origin_url, timestamp=timestamp) @browse_route(r'origin/(?P[a-z]+)/url/(?P.+)/visits/', r'origin/(?P.+)/visits/', view_name='browse-origin-visits') def origin_visits_browse(request, origin_url, origin_type=None): """Django view that produces an HTML display of visits reporting for a swh origin identified by its id or its url. The url that points to it is - :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/visits/`. + :http:get:`/browse/origin/[(origin_type)/url/](origin_url)/visits/`. """ try: origin_info = get_origin_info(origin_url, origin_type) origin_visits = get_origin_visits(origin_info) snapshot_context = get_snapshot_context(origin_type=origin_type, origin_url=origin_url) except Exception as exc: return handle_view_exception(request, exc) for i, visit in enumerate(origin_visits): url_date = format_utc_iso_date(visit['date'], '%Y-%m-%dT%H:%M:%SZ') visit['fmt_date'] = format_utc_iso_date(visit['date']) query_params = {} if i < len(origin_visits) - 1: if visit['date'] == origin_visits[i+1]['date']: query_params = {'visit_id': visit['visit']} if i > 0: if visit['date'] == origin_visits[i-1]['date']: query_params = {'visit_id': visit['visit']} snapshot = visit['snapshot'] if visit['snapshot'] else '' visit['browse_url'] = reverse('browse-origin-directory', url_args={'origin_type': origin_type, 'origin_url': origin_url, 'timestamp': url_date}, query_params=query_params) if not snapshot: visit['snapshot'] = '' visit['date'] = parse_timestamp(visit['date']).timestamp() heading = 'Origin visits - %s' % origin_url return render(request, 'browse/origin-visits.html', {'heading': heading, 'swh_object_name': 'Visits', 'swh_object_metadata': origin_info, 'origin_visits': origin_visits, 'origin_info': origin_info, 'snapshot_context': snapshot_context, 'vault_cooking': None, 'show_actions_menu': False}) @browse_route(r'origin/search/(?P.+)/', view_name='browse-origin-search') def _origin_search(request, url_pattern): """Internal browse endpoint to search for origins whose urls contain a provided string pattern or match a provided regular expression. The search is performed in a case insensitive way. """ offset = int(request.GET.get('offset', '0')) limit = int(request.GET.get('limit', '50')) regexp = request.GET.get('regexp', 'false') with_visit = request.GET.get('with_visit', 'false') url_pattern = url_pattern.replace('///', '\\') try: results = service.search_origin(url_pattern, offset, limit, bool(strtobool(regexp)), bool(strtobool(with_visit))) results = json.dumps(list(results), sort_keys=True, indent=4, separators=(',', ': ')) except Exception as exc: return handle_view_exception(request, exc, html_response=False) return HttpResponse(results, content_type='application/json') @browse_route(r'origin/(?P[a-z]+)/url/(?P.+)/', r'origin/(?P.+)/', view_name='browse-origin') def origin_browse(request, origin_url, origin_type=None): """Django view that redirects to the display of the latest archived snapshot for a given software origin. """ last_snapshot_url = reverse('browse-origin-directory', url_args={'origin_type': origin_type, 'origin_url': origin_url}) return redirect(last_snapshot_url) diff --git a/swh/web/browse/views/snapshot.py b/swh/web/browse/views/snapshot.py index e40a5096..f42fe399 100644 --- a/swh/web/browse/views/snapshot.py +++ b/swh/web/browse/views/snapshot.py @@ -1,104 +1,104 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django.shortcuts import redirect from swh.web.browse.browseurls import browse_route from swh.web.common.utils import reverse from .utils.snapshot_context import ( browse_snapshot_directory, browse_snapshot_content, browse_snapshot_log, browse_snapshot_branches, browse_snapshot_releases ) @browse_route(r'snapshot/(?P[0-9a-f]+)/', view_name='browse-snapshot', checksum_args=['snapshot_id']) def snapshot_browse(request, snapshot_id): """Django view for browsing the content of a snapshot. The url that points to it is :http:get:`/browse/snapshot/(snapshot_id)/` """ browse_snapshot_url = reverse('browse-snapshot-directory', url_args={'snapshot_id': snapshot_id}, query_params=request.GET) return redirect(browse_snapshot_url) @browse_route(r'snapshot/(?P[0-9a-f]+)/directory/', r'snapshot/(?P[0-9a-f]+)/directory/(?P.+)/', view_name='browse-snapshot-directory', checksum_args=['snapshot_id']) def snapshot_directory_browse(request, snapshot_id, path=None): """Django view for browsing the content of a directory collected in a snapshot. The url that points to it is - :http:get:`/browse/snapshot/(snapshot_id)/directory/[(path)/]` + :http:get:`/browse/snapshot/(snapshot_id)/directory/[(path)/]` """ origin_type = request.GET.get('origin_type', None) origin_url = request.GET.get('origin_url', None) if not origin_url: origin_url = request.GET.get('origin', None) return browse_snapshot_directory(request, snapshot_id=snapshot_id, path=path, origin_type=origin_type, origin_url=origin_url) @browse_route(r'snapshot/(?P[0-9a-f]+)/content/(?P.+)/', view_name='browse-snapshot-content', checksum_args=['snapshot_id']) def snapshot_content_browse(request, snapshot_id, path): """Django view that produces an HTML display of a content collected in a snapshot. The url that points to it is - :http:get:`/browse/snapshot/(snapshot_id)/content/(path)/` + :http:get:`/browse/snapshot/(snapshot_id)/content/(path)/` """ language = request.GET.get('language', None) return browse_snapshot_content(request, snapshot_id=snapshot_id, path=path, selected_language=language) @browse_route(r'snapshot/(?P[0-9a-f]+)/log/', view_name='browse-snapshot-log', checksum_args=['snapshot_id']) def snapshot_log_browse(request, snapshot_id): """Django view that produces an HTML display of revisions history (aka the commit log) collected in a snapshot. The url that points to it is - :http:get:`/browse/snapshot/(snapshot_id)/log/` + :http:get:`/browse/snapshot/(snapshot_id)/log/` """ return browse_snapshot_log(request, snapshot_id=snapshot_id) @browse_route(r'snapshot/(?P[0-9a-f]+)/branches/', view_name='browse-snapshot-branches', checksum_args=['snapshot_id']) def snapshot_branches_browse(request, snapshot_id): """Django view that produces an HTML display of the list of releases collected in a snapshot. The url that points to it is - :http:get:`/browse/snapshot/(snapshot_id)/branches/` + :http:get:`/browse/snapshot/(snapshot_id)/branches/` """ return browse_snapshot_branches(request, snapshot_id=snapshot_id) @browse_route(r'snapshot/(?P[0-9a-f]+)/releases/', view_name='browse-snapshot-releases', checksum_args=['snapshot_id']) def snapshot_releases_browse(request, snapshot_id): """Django view that produces an HTML display of the list of releases collected in a snapshot. The url that points to it is - :http:get:`/browse/snapshot/(snapshot_id)/releases/` + :http:get:`/browse/snapshot/(snapshot_id)/releases/` """ return browse_snapshot_releases(request, snapshot_id=snapshot_id) diff --git a/swh/web/common/service.py b/swh/web/common/service.py index 08f8369c..75df1b1c 100644 --- a/swh/web/common/service.py +++ b/swh/web/common/service.py @@ -1,1109 +1,1122 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os from collections import defaultdict from swh.model import hashutil from swh.storage.algos import diff, revisions_walker from swh.web.common import converters from swh.web.common import query from swh.web.common.exc import NotFoundExc from swh.web.common.origin_visits import get_origin_visit from swh.web import config storage = config.storage() vault = config.vault() idx_storage = config.indexer_storage() MAX_LIMIT = 50 # Top limit the users can ask for def _first_element(l): """Returns the first element in the provided list or None if it is empty or None""" return next(iter(l or []), None) def lookup_multiple_hashes(hashes): """Lookup the passed hashes in a single DB connection, using batch processing. Args: An array of {filename: X, sha1: Y}, string X, hex sha1 string Y. Returns: The same array with elements updated with elem['found'] = true if the hash is present in storage, elem['found'] = false if not. """ hashlist = [hashutil.hash_to_bytes(elem['sha1']) for elem in hashes] content_missing = storage.content_missing_per_sha1(hashlist) missing = [hashutil.hash_to_hex(x) for x in content_missing] for x in hashes: x.update({'found': True}) for h in hashes: if h['sha1'] in missing: h['found'] = False return hashes def lookup_expression(expression, last_sha1, per_page): """Lookup expression in raw content. Args: expression (str): An expression to lookup through raw indexed content last_sha1 (str): Last sha1 seen per_page (int): Number of results per page Yields: ctags whose content match the expression """ limit = min(per_page, MAX_LIMIT) ctags = idx_storage.content_ctags_search(expression, last_sha1=last_sha1, limit=limit) for ctag in ctags: ctag = converters.from_swh(ctag, hashess={'id'}) ctag['sha1'] = ctag['id'] ctag.pop('id') yield ctag def lookup_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found containing the hash info if the hash is present, None if not. """ algo, hash = query.parse_hash(q) found = _first_element(storage.content_find({algo: hash})) return {'found': converters.from_content(found), 'algo': algo} def search_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found to True or False, according to whether the checksum is present or not """ algo, hash = query.parse_hash(q) found = _first_element(storage.content_find({algo: hash})) return {'found': found is not None} def _lookup_content_sha1(q): """Given a possible input, query for the content's sha1. Args: q: query string of the form Returns: binary sha1 if found or None """ algo, hash = query.parse_hash(q) if algo != 'sha1': hashes = _first_element(storage.content_find({algo: hash})) if not hashes: return None return hashes['sha1'] return hash def lookup_content_ctags(q): """Return ctags information from a specified content. Args: q: query string of the form Yields: ctags information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None ctags = list(idx_storage.content_ctags_get([sha1])) if not ctags: return None for ctag in ctags: yield converters.from_swh(ctag, hashess={'id'}) def lookup_content_filetype(q): """Return filetype information from a specified content. Args: q: query string of the form Yields: filetype information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None filetype = _first_element(list(idx_storage.content_mimetype_get([sha1]))) if not filetype: return None return converters.from_filetype(filetype) def lookup_content_language(q): """Return language information from a specified content. Args: q: query string of the form Yields: language information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None lang = _first_element(list(idx_storage.content_language_get([sha1]))) if not lang: return None return converters.from_swh(lang, hashess={'id'}) def lookup_content_license(q): """Return license information from a specified content. Args: q: query string of the form Yields: license information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None lic = _first_element(idx_storage.content_fossology_license_get([sha1])) if not lic: return None return converters.from_swh({'id': sha1, 'facts': lic[sha1]}, hashess={'id'}) def lookup_origin(origin): """Return information about the origin matching dict origin. Args: origin: origin's dict with keys either 'id' or 'url' Returns: origin information as dict. """ origin_info = storage.origin_get(origin) if not origin_info: msg = 'Origin %s not found!' % \ (origin.get('id') or origin['url']) raise NotFoundExc(msg) return converters.from_origin(origin_info) def lookup_origins(origin_from=1, origin_count=100): """Get list of archived software origins in a paginated way. Origins are sorted by id before returning them Args: origin_from (int): The minimum id of the origins to return origin_count (int): The maximum number of origins to return Yields: origins information as dicts """ origins = storage.origin_get_range(origin_from, origin_count) return map(converters.from_origin, origins) def search_origin(url_pattern, offset=0, limit=50, regexp=False, with_visit=False): """Search for origins whose urls contain a provided string pattern or match a provided regular expression. Args: url_pattern: the string pattern to search for in origin urls offset: number of found origins to skip before returning results limit: the maximum number of found origins to return Returns: list of origin information as dict. """ origins = storage.origin_search(url_pattern, offset, limit, regexp, with_visit) return map(converters.from_origin, origins) def search_origin_metadata(fulltext, limit=50): """Search for origins whose metadata match a provided string pattern. Args: fulltext: the string pattern to search for in origin metadata offset: number of found origins to skip before returning results limit: the maximum number of found origins to return Returns: list of origin metadata as dict. """ matches = idx_storage.origin_intrinsic_metadata_search_fulltext( conjunction=[fulltext], limit=limit) results = [] + for match in matches: match['from_revision'] = hashutil.hash_to_hex(match['from_revision']) - result = converters.from_origin( - storage.origin_get({'url': match.pop('origin_url')})) - result['metadata'] = match - results.append(result) + + if match['origin_url']: + origin = storage.origin_get({'url': match['origin_url']}) + else: + # Fallback to origin-id for idx-storage with outdated db + origin = storage.origin_get({'id': match['id']}) + + del match['origin_url'] + if 'id' in match: + del match['id'] + + result = converters.from_origin(origin) + if result: + result['metadata'] = match + results.append(result) + return results def lookup_origin_intrinsic_metadata(origin_dict): """Return intrinsic metadata for origin whose origin matches given origin. Args: origin_dict: origin's dict with keys ('type' AND 'url') Returns: origin metadata. """ origin_info = storage.origin_get(origin_dict) if not origin_info: msg = 'Origin with type %s and url %s not found!' % \ (origin_dict['type'], origin_dict['url']) raise NotFoundExc(msg) origins = [origin_info['url']] match = _first_element( idx_storage.origin_intrinsic_metadata_get(origins)) result = {} if match: result = match['metadata'] return result def _to_sha1_bin(sha1_hex): _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_hex, ['sha1'], # HACK: sha1_git really 'Only sha1_git is supported.') return sha1_git_bin def _check_directory_exists(sha1_git, sha1_git_bin): if len(list(storage.directory_missing([sha1_git_bin]))): raise NotFoundExc('Directory with sha1_git %s not found' % sha1_git) def lookup_directory(sha1_git): """Return information about the directory with id sha1_git. Args: sha1_git as string Returns: directory information as dict. """ empty_dir_sha1 = '4b825dc642cb6eb9a060e54bf8d69288fbee4904' if sha1_git == empty_dir_sha1: return [] sha1_git_bin = _to_sha1_bin(sha1_git) _check_directory_exists(sha1_git, sha1_git_bin) directory_entries = storage.directory_ls(sha1_git_bin) return map(converters.from_directory_entry, directory_entries) def lookup_directory_with_path(sha1_git, path_string): """Return directory information for entry with path path_string w.r.t. root directory pointed by directory_sha1_git Args: - directory_sha1_git: sha1_git corresponding to the directory to which we append paths to (hopefully) find the entry - the relative path to the entry starting from the directory pointed by directory_sha1_git Raises: NotFoundExc if the directory entry is not found """ sha1_git_bin = _to_sha1_bin(sha1_git) _check_directory_exists(sha1_git, sha1_git_bin) paths = path_string.strip(os.path.sep).split(os.path.sep) queried_dir = storage.directory_entry_get_by_path( sha1_git_bin, list(map(lambda p: p.encode('utf-8'), paths))) if not queried_dir: raise NotFoundExc(('Directory entry with path %s from %s not found') % (path_string, sha1_git)) return converters.from_directory_entry(queried_dir) def lookup_release(release_sha1_git): """Return information about the release with sha1 release_sha1_git. Args: release_sha1_git: The release's sha1 as hexadecimal Returns: Release information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ sha1_git_bin = _to_sha1_bin(release_sha1_git) release = _first_element(storage.release_get([sha1_git_bin])) if not release: raise NotFoundExc('Release with sha1_git %s not found.' % release_sha1_git) return converters.from_release(release) def lookup_release_multiple(sha1_git_list): """Return information about the revisions identified with their sha1_git identifiers. Args: sha1_git_list: A list of revision sha1_git identifiers Returns: Release information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ sha1_bin_list = (_to_sha1_bin(sha1_git) for sha1_git in sha1_git_list) releases = storage.release_get(sha1_bin_list) or [] return (converters.from_release(r) for r in releases) def lookup_revision(rev_sha1_git): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. NotFoundExc if there is no revision with the provided sha1_git. """ sha1_git_bin = _to_sha1_bin(rev_sha1_git) revision = _first_element(storage.revision_get([sha1_git_bin])) if not revision: raise NotFoundExc('Revision with sha1_git %s not found.' % rev_sha1_git) return converters.from_revision(revision) def lookup_revision_multiple(sha1_git_list): """Return information about the revisions identified with their sha1_git identifiers. Args: sha1_git_list: A list of revision sha1_git identifiers Returns: Generator of revisions information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ sha1_bin_list = (_to_sha1_bin(sha1_git) for sha1_git in sha1_git_list) revisions = storage.revision_get(sha1_bin_list) or [] return (converters.from_revision(r) for r in revisions) def lookup_revision_message(rev_sha1_git): """Return the raw message of the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Decoded revision message as dict {'message': } Raises: ValueError if the identifier provided is not of sha1 nature. NotFoundExc if the revision is not found, or if it has no message """ sha1_git_bin = _to_sha1_bin(rev_sha1_git) revision = _first_element(storage.revision_get([sha1_git_bin])) if not revision: raise NotFoundExc('Revision with sha1_git %s not found.' % rev_sha1_git) if 'message' not in revision: raise NotFoundExc('No message for revision with sha1_git %s.' % rev_sha1_git) res = {'message': revision['message']} return res def _lookup_revision_id_by(origin, branch_name, timestamp): def _get_snapshot_branch(snapshot, branch_name): snapshot = lookup_snapshot(visit['snapshot'], branches_from=branch_name, branches_count=10) branch = None if branch_name in snapshot['branches']: branch = snapshot['branches'][branch_name] return branch if isinstance(origin, int): origin = {'id': origin} elif isinstance(origin, str): origin = {'url': origin} else: raise TypeError('"origin" must be an int or a string.') visit = get_origin_visit(origin, visit_ts=timestamp) branch = _get_snapshot_branch(visit['snapshot'], branch_name) rev_id = None if branch and branch['target_type'] == 'revision': rev_id = branch['target'] elif branch and branch['target_type'] == 'alias': branch = _get_snapshot_branch(visit['snapshot'], branch['target']) if branch and branch['target_type'] == 'revision': rev_id = branch['target'] if not rev_id: raise NotFoundExc('Revision for origin %s and branch %s not found.' % (origin.get('url') or origin['id'], branch_name)) return rev_id def lookup_revision_by(origin, branch_name='HEAD', timestamp=None): """Lookup revision by origin, snapshot branch name and visit timestamp. If branch_name is not provided, lookup using 'HEAD' as default. If timestamp is not provided, use the most recent. Args: origin (Union[int,str]): origin of the revision branch_name (str): snapshot branch name timestamp (str/int): origin visit time frame Returns: dict: The revision matching the criterions Raises: NotFoundExc if no revision corresponds to the criterion """ rev_id = _lookup_revision_id_by(origin, branch_name, timestamp) return lookup_revision(rev_id) def lookup_revision_log(rev_sha1_git, limit): """Lookup revision log by revision id. Args: rev_sha1_git (str): The revision's sha1 as hexadecimal limit (int): the maximum number of revisions returned Returns: list: Revision log as list of revision dicts Raises: ValueError: if the identifier provided is not of sha1 nature. NotFoundExc: if there is no revision with the provided sha1_git. """ lookup_revision(rev_sha1_git) sha1_git_bin = _to_sha1_bin(rev_sha1_git) revision_entries = storage.revision_log([sha1_git_bin], limit) return map(converters.from_revision, revision_entries) def lookup_revision_log_by(origin, branch_name, timestamp, limit): """Lookup revision by origin, snapshot branch name and visit timestamp. Args: origin (Union[int,str]): origin of the revision branch_name (str): snapshot branch timestamp (str/int): origin visit time frame limit (int): the maximum number of revisions returned Returns: list: Revision log as list of revision dicts Raises: NotFoundExc: if no revision corresponds to the criterion """ rev_id = _lookup_revision_id_by(origin, branch_name, timestamp) return lookup_revision_log(rev_id, limit) def lookup_revision_with_context_by(origin, branch_name, timestamp, sha1_git, limit=100): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. sha1_git_root being resolved through the lookup of a revision by origin, branch_name and ts. In other words, sha1_git is an ancestor of sha1_git_root. Args: - origin: origin of the revision. - branch_name: revision's branch. - timestamp: revision's time frame. - sha1_git: one of sha1_git_root's ancestors. - limit: limit the lookup to 100 revisions back. Returns: Pair of (root_revision, revision). Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: - BadInputExc in case of unknown algo_hash or bad hash. - NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root. """ rev_root_id = _lookup_revision_id_by(origin, branch_name, timestamp) rev_root_id_bin = hashutil.hash_to_bytes(rev_root_id) rev_root = _first_element(storage.revision_get([rev_root_id_bin])) return (converters.from_revision(rev_root), lookup_revision_with_context(rev_root, sha1_git, limit)) def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. In other words, sha1_git is an ancestor of sha1_git_root. Args: sha1_git_root: latest revision. The type is either a sha1 (as an hex string) or a non converted dict. sha1_git: one of sha1_git_root's ancestors limit: limit the lookup to 100 revisions back Returns: Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root """ sha1_git_bin = _to_sha1_bin(sha1_git) revision = _first_element(storage.revision_get([sha1_git_bin])) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) if isinstance(sha1_git_root, str): sha1_git_root_bin = _to_sha1_bin(sha1_git_root) revision_root = _first_element(storage.revision_get([sha1_git_root_bin])) # noqa if not revision_root: raise NotFoundExc('Revision root %s not found' % sha1_git_root) else: sha1_git_root_bin = sha1_git_root['id'] revision_log = storage.revision_log([sha1_git_root_bin], limit) parents = {} children = defaultdict(list) for rev in revision_log: rev_id = rev['id'] parents[rev_id] = [] for parent_id in rev['parents']: parents[rev_id].append(parent_id) children[parent_id].append(rev_id) if revision['id'] not in parents: raise NotFoundExc('Revision %s is not an ancestor of %s' % (sha1_git, sha1_git_root)) revision['children'] = children[revision['id']] return converters.from_revision(revision) def lookup_directory_with_revision(sha1_git, dir_path=None, with_data=False): """Return information on directory pointed by revision with sha1_git. If dir_path is not provided, display top level directory. Otherwise, display the directory pointed by dir_path (if it exists). Args: sha1_git: revision's hash. dir_path: optional directory pointed to by that revision. with_data: boolean that indicates to retrieve the raw data if the path resolves to a content. Default to False (for the api) Returns: Information on the directory pointed to by that revision. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc either if the revision is not found or the path referenced does not exist. NotImplementedError in case of dir_path exists but do not reference a type 'dir' or 'file'. """ sha1_git_bin = _to_sha1_bin(sha1_git) revision = _first_element(storage.revision_get([sha1_git_bin])) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) dir_sha1_git_bin = revision['directory'] if dir_path: paths = dir_path.strip(os.path.sep).split(os.path.sep) entity = storage.directory_entry_get_by_path( dir_sha1_git_bin, list(map(lambda p: p.encode('utf-8'), paths))) if not entity: raise NotFoundExc( "Directory or File '%s' pointed to by revision %s not found" % (dir_path, sha1_git)) else: entity = {'type': 'dir', 'target': dir_sha1_git_bin} if entity['type'] == 'dir': directory_entries = storage.directory_ls(entity['target']) or [] return {'type': 'dir', 'path': '.' if not dir_path else dir_path, 'revision': sha1_git, 'content': list(map(converters.from_directory_entry, directory_entries))} elif entity['type'] == 'file': # content content = _first_element( storage.content_find({'sha1_git': entity['target']})) if not content: raise NotFoundExc('Content not found for revision %s' % sha1_git) if with_data: c = _first_element(storage.content_get([content['sha1']])) content['data'] = c['data'] return {'type': 'file', 'path': '.' if not dir_path else dir_path, 'revision': sha1_git, 'content': converters.from_content(content)} elif entity['type'] == 'rev': # revision revision = next(storage.revision_get([entity['target']])) return {'type': 'rev', 'path': '.' if not dir_path else dir_path, 'revision': sha1_git, 'content': converters.from_revision(revision)} else: raise NotImplementedError('Entity of type %s not implemented.' % entity['type']) def lookup_content(q): """Lookup the content designed by q. Args: q: The release's sha1 as hexadecimal Raises: NotFoundExc if the requested content is not found """ algo, hash = query.parse_hash(q) c = _first_element(storage.content_find({algo: hash})) if not c: raise NotFoundExc('Content with %s checksum equals to %s not found!' % (algo, hashutil.hash_to_hex(hash))) return converters.from_content(c) def lookup_content_raw(q): """Lookup the content defined by q. Args: q: query string of the form Returns: dict with 'sha1' and 'data' keys. data representing its raw data decoded. Raises: NotFoundExc if the requested content is not found or if the content bytes are not available in the storage """ c = lookup_content(q) content_sha1_bytes = hashutil.hash_to_bytes(c['checksums']['sha1']) content = _first_element(storage.content_get([content_sha1_bytes])) if not content: algo, hash = query.parse_hash(q) raise NotFoundExc('Bytes of content with %s checksum equals to %s ' 'are not available!' % (algo, hashutil.hash_to_hex(hash))) return converters.from_content(content) def stat_counters(): """Return the stat counters for Software Heritage Returns: A dict mapping textual labels to integer values. """ return storage.stat_counters() def _lookup_origin_visits(origin_url, last_visit=None, limit=10): """Yields the origin origins' visits. Args: origin_url (str): origin to list visits for last_visit (int): last visit to lookup from limit (int): Number of elements max to display Yields: Dictionaries of origin_visit for that origin """ limit = min(limit, MAX_LIMIT) for visit in storage.origin_visit_get( origin_url, last_visit=last_visit, limit=limit): visit['origin'] = origin_url yield visit def lookup_origin_visits(origin, last_visit=None, per_page=10): """Yields the origin origins' visits. Args: origin: origin to list visits for Yields: Dictionaries of origin_visit for that origin """ visits = _lookup_origin_visits(origin, last_visit=last_visit, limit=per_page) for visit in visits: yield converters.from_origin_visit(visit) def lookup_origin_visit_latest(origin_url, require_snapshot): """Return the origin's latest visit Args: origin_url (str): origin to list visits for require_snapshot (bool): filter out origins without a snapshot Returns: dict: The origin_visit concerned """ visit = storage.origin_visit_get_latest( origin_url, require_snapshot=require_snapshot) if isinstance(visit['origin'], int): # soon-to-be-legacy origin ids visit['origin'] = storage.origin_get({'id': visit['origin']})['url'] return converters.from_origin_visit(visit) def lookup_origin_visit(origin_url, visit_id): """Return information about visit visit_id with origin origin. Args: origin (str): origin concerned by the visit visit_id: the visit identifier to lookup Yields: The dict origin_visit concerned """ visit = storage.origin_visit_get_by(origin_url, visit_id) if not visit: raise NotFoundExc('Origin %s or its visit ' 'with id %s not found!' % (origin_url, visit_id)) visit['origin'] = origin_url return converters.from_origin_visit(visit) def lookup_snapshot_size(snapshot_id): """Count the number of branches in the snapshot with the given id Args: snapshot_id (str): sha1 identifier of the snapshot Returns: dict: A dict whose keys are the target types of branches and values their corresponding amount """ snapshot_id_bin = _to_sha1_bin(snapshot_id) snapshot_size = storage.snapshot_count_branches(snapshot_id_bin) if 'revision' not in snapshot_size: snapshot_size['revision'] = 0 if 'release' not in snapshot_size: snapshot_size['release'] = 0 return snapshot_size def lookup_snapshot(snapshot_id, branches_from='', branches_count=1000, target_types=None): """Return information about a snapshot, aka the list of named branches found during a specific visit of an origin. Args: snapshot_id (str): sha1 identifier of the snapshot branches_from (str): optional parameter used to skip branches whose name is lesser than it before returning them branches_count (int): optional parameter used to restrain the amount of returned branches target_types (list): optional parameter used to filter the target types of branch to return (possible values that can be contained in that list are `'content', 'directory', 'revision', 'release', 'snapshot', 'alias'`) Returns: A dict filled with the snapshot content. """ snapshot_id_bin = _to_sha1_bin(snapshot_id) snapshot = storage.snapshot_get_branches(snapshot_id_bin, branches_from.encode(), branches_count, target_types) if not snapshot: raise NotFoundExc('Snapshot with id %s not found!' % snapshot_id) return converters.from_snapshot(snapshot) def lookup_latest_origin_snapshot(origin, allowed_statuses=None): """Return information about the latest snapshot of an origin. .. warning:: At most 1000 branches contained in the snapshot will be returned for performance reasons. Args: origin: URL or integer identifier of the origin allowed_statuses: list of visit statuses considered to find the latest snapshot for the visit. For instance, ``allowed_statuses=['full']`` will only consider visits that have successfully run to completion. Returns: A dict filled with the snapshot content. """ snapshot = storage.snapshot_get_latest(origin, allowed_statuses) return converters.from_snapshot(snapshot) def lookup_revision_through(revision, limit=100): """Retrieve a revision from the criterion stored in revision dictionary. Args: revision: Dictionary of criterion to lookup the revision with. Here are the supported combination of possible values: - origin_id, branch_name, ts, sha1_git - origin_id, branch_name, ts - origin_url, branch_name, ts, sha1_git - origin_url, branch_name, ts - sha1_git_root, sha1_git - sha1_git Returns: None if the revision is not found or the actual revision. """ if ( 'origin_url' in revision and 'branch_name' in revision and 'ts' in revision and 'sha1_git' in revision): return lookup_revision_with_context_by(revision['origin_url'], revision['branch_name'], revision['ts'], revision['sha1_git'], limit) if ( 'origin_id' in revision and 'branch_name' in revision and 'ts' in revision and 'sha1_git' in revision): return lookup_revision_with_context_by(revision['origin_id'], revision['branch_name'], revision['ts'], revision['sha1_git'], limit) if ( 'origin_url' in revision and 'branch_name' in revision and 'ts' in revision): return lookup_revision_by(revision['origin_url'], revision['branch_name'], revision['ts']) if ( 'origin_id' in revision and 'branch_name' in revision and 'ts' in revision): return lookup_revision_by(revision['origin_id'], revision['branch_name'], revision['ts']) if ( 'sha1_git_root' in revision and 'sha1_git' in revision): return lookup_revision_with_context(revision['sha1_git_root'], revision['sha1_git'], limit) if 'sha1_git' in revision: return lookup_revision(revision['sha1_git']) # this should not happen raise NotImplementedError('Should not happen!') def lookup_directory_through_revision(revision, path=None, limit=100, with_data=False): """Retrieve the directory information from the revision. Args: revision: dictionary of criterion representing a revision to lookup path: directory's path to lookup. limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of. with_data: indicate to retrieve the content's raw data if path resolves to a content. Returns: The directory pointing to by the revision criterions at path. """ rev = lookup_revision_through(revision, limit) if not rev: raise NotFoundExc('Revision with criterion %s not found!' % revision) return (rev['id'], lookup_directory_with_revision(rev['id'], path, with_data)) def vault_cook(obj_type, obj_id, email=None): """Cook a vault bundle. """ return vault.cook(obj_type, obj_id, email=email) def vault_fetch(obj_type, obj_id): """Fetch a vault bundle. """ return vault.fetch(obj_type, obj_id) def vault_progress(obj_type, obj_id): """Get the current progress of a vault bundle. """ return vault.progress(obj_type, obj_id) def diff_revision(rev_id): """Get the list of file changes (insertion / deletion / modification / renaming) for a particular revision. """ rev_sha1_git_bin = _to_sha1_bin(rev_id) changes = diff.diff_revision(storage, rev_sha1_git_bin, track_renaming=True) for change in changes: change['from'] = converters.from_directory_entry(change['from']) change['to'] = converters.from_directory_entry(change['to']) if change['from_path']: change['from_path'] = change['from_path'].decode('utf-8') if change['to_path']: change['to_path'] = change['to_path'].decode('utf-8') return changes class _RevisionsWalkerProxy(object): """ Proxy class wrapping a revisions walker iterator from swh-storage and performing needed conversions. """ def __init__(self, rev_walker_type, rev_start, *args, **kwargs): rev_start_bin = hashutil.hash_to_bytes(rev_start) self.revisions_walker = \ revisions_walker.get_revisions_walker(rev_walker_type, storage, rev_start_bin, *args, **kwargs) def export_state(self): return self.revisions_walker.export_state() def __next__(self): return converters.from_revision(next(self.revisions_walker)) def __iter__(self): return self def get_revisions_walker(rev_walker_type, rev_start, *args, **kwargs): """ Utility function to instantiate a revisions walker of a given type, see :mod:`swh.storage.algos.revisions_walker`. Args: rev_walker_type (str): the type of revisions walker to return, possible values are: ``committer_date``, ``dfs``, ``dfs_post``, ``bfs`` and ``path`` rev_start (str): hexadecimal representation of a revision identifier args (list): position arguments to pass to the revisions walker constructor kwargs (dict): keyword arguments to pass to the revisions walker constructor """ # first check if the provided revision is valid lookup_revision(rev_start) return _RevisionsWalkerProxy(rev_walker_type, rev_start, *args, **kwargs) diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py index f0cbf86e..b6094e37 100644 --- a/swh/web/common/utils.py +++ b/swh/web/common/utils.py @@ -1,338 +1,338 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import docutils.parsers.rst import docutils.utils import re from datetime import datetime, timezone from dateutil import parser as date_parser from dateutil import tz from django.urls import reverse as django_reverse from django.http import QueryDict from rest_framework.authentication import SessionAuthentication from swh.model.exceptions import ValidationError from swh.model.identifiers import ( persistent_identifier, parse_persistent_identifier, CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT ) from swh.web.common.exc import BadInputExc swh_object_icons = { 'branch': 'fa fa-code-fork', 'branches': 'fa fa-code-fork', 'content': 'fa fa-file-text', 'directory': 'fa fa-folder', 'person': 'fa fa-user', 'revisions history': 'fa fa-history', 'release': 'fa fa-tag', 'releases': 'fa fa-tag', 'revision': 'octicon-git-commit', 'snapshot': 'fa fa-camera', 'visits': 'fa fa-calendar', } def reverse(viewname, url_args=None, query_params=None, current_app=None, urlconf=None): """An override of django reverse function supporting query parameters. Args: viewname (str): the name of the django view from which to compute a url url_args (dict): dictionary of url arguments indexed by their names query_params (dict): dictionary of query parameters to append to the reversed url current_app (str): the name of the django app tighten to the view urlconf (str): url configuration module Returns: str: the url of the requested view with processed arguments and query parameters """ if url_args: url_args = {k: v for k, v in url_args.items() if v is not None} url = django_reverse(viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app) if query_params: query_params = {k: v for k, v in query_params.items() if v} if query_params and len(query_params) > 0: query_dict = QueryDict('', mutable=True) for k in sorted(query_params.keys()): query_dict[k] = query_params[k] url += ('?' + query_dict.urlencode(safe='/;:')) return url def datetime_to_utc(date): """Returns datetime in UTC without timezone info Args: date (datetime.datetime): input datetime with timezone info Returns: datetime.datetime: datetime in UTC without timezone info """ if date.tzinfo: return date.astimezone(tz.gettz('UTC')).replace(tzinfo=timezone.utc) else: return date def parse_timestamp(timestamp): """Given a time or timestamp (as string), parse the result as UTC datetime. Returns: datetime.datetime: a timezone-aware datetime representing the parsed value or None if the parsing fails. Samples: - 2016-01-12 - 2016-01-12T09:19:12+0100 - Today is January 1, 2047 at 8:21:00AM - 1452591542 """ if not timestamp: return None try: date = date_parser.parse(timestamp, ignoretz=False, fuzzy=True) return datetime_to_utc(date) except Exception: try: return datetime.utcfromtimestamp(float(timestamp)).replace( tzinfo=timezone.utc) except (ValueError, OverflowError) as e: raise BadInputExc(e) def shorten_path(path): """Shorten the given path: for each hash present, only return the first 8 characters followed by an ellipsis""" sha256_re = r'([0-9a-f]{8})[0-9a-z]{56}' sha1_re = r'([0-9a-f]{8})[0-9a-f]{32}' ret = re.sub(sha256_re, r'\1...', path) return re.sub(sha1_re, r'\1...', ret) def format_utc_iso_date(iso_date, fmt='%d %B %Y, %H:%M UTC'): """Turns a string representation of an ISO 8601 date string to UTC and format it into a more human readable one. For instance, from the following input string: '2017-05-04T13:27:13+02:00' the following one is returned: '04 May 2017, 11:27 UTC'. Custom format string may also be provided as parameter Args: iso_date (str): a string representation of an ISO 8601 date fmt (str): optional date formatting string Returns: str: a formatted string representation of the input iso date """ if not iso_date: return iso_date date = parse_timestamp(iso_date) return date.strftime(fmt) def gen_path_info(path): """Function to generate path data navigation for use with a breadcrumb in the swh web ui. For instance, from a path /folder1/folder2/folder3, it returns the following list:: [{'name': 'folder1', 'path': 'folder1'}, {'name': 'folder2', 'path': 'folder1/folder2'}, {'name': 'folder3', 'path': 'folder1/folder2/folder3'}] Args: path: a filesystem path Returns: list: a list of path data for navigation as illustrated above. """ path_info = [] if path: sub_paths = path.strip('/').split('/') path_from_root = '' for p in sub_paths: path_from_root += '/' + p path_info.append({'name': p, 'path': path_from_root.strip('/')}) return path_info def get_swh_persistent_id(object_type, object_id, scheme_version=1): """ Returns the persistent identifier for a swh object based on: * the object type * the object id * the swh identifiers scheme version Args: object_type (str): the swh object type (content/directory/release/revision/snapshot) object_id (str): the swh object id (hexadecimal representation of its hash value) scheme_version (int): the scheme version of the swh persistent identifiers Returns: str: the swh object persistent identifier Raises: BadInputExc: if the provided parameters do not enable to generate a valid identifier """ try: swh_id = persistent_identifier(object_type, object_id, scheme_version) except ValidationError as e: raise BadInputExc('Invalid object (%s) for swh persistent id. %s' % (object_id, e)) else: return swh_id def resolve_swh_persistent_id(swh_id, query_params=None): """ Try to resolve a Software Heritage persistent id into an url for browsing the pointed object. Args: swh_id (str): a Software Heritage persistent identifier query_params (django.http.QueryDict): optional dict filled with query parameters to append to the browse url Returns: dict: a dict with the following keys: * **swh_id_parsed (swh.model.identifiers.PersistentId)**: - the parsed identifier + the parsed identifier * **browse_url (str)**: the url for browsing the pointed object Raises: BadInputExc: if the provided identifier can not be parsed """ try: swh_id_parsed = parse_persistent_identifier(swh_id) object_type = swh_id_parsed.object_type object_id = swh_id_parsed.object_id browse_url = None query_dict = QueryDict('', mutable=True) if query_params and len(query_params) > 0: for k in sorted(query_params.keys()): query_dict[k] = query_params[k] if 'origin' in swh_id_parsed.metadata: query_dict['origin'] = swh_id_parsed.metadata['origin'] if object_type == CONTENT: query_string = 'sha1_git:' + object_id fragment = '' if 'lines' in swh_id_parsed.metadata: lines = swh_id_parsed.metadata['lines'].split('-') fragment += '#L' + lines[0] if len(lines) > 1: fragment += '-L' + lines[1] browse_url = reverse('browse-content', url_args={'query_string': query_string}, query_params=query_dict) + fragment elif object_type == DIRECTORY: browse_url = reverse('browse-directory', url_args={'sha1_git': object_id}, query_params=query_dict) elif object_type == RELEASE: browse_url = reverse('browse-release', url_args={'sha1_git': object_id}, query_params=query_dict) elif object_type == REVISION: browse_url = reverse('browse-revision', url_args={'sha1_git': object_id}, query_params=query_dict) elif object_type == SNAPSHOT: browse_url = reverse('browse-snapshot', url_args={'snapshot_id': object_id}, query_params=query_dict) except ValidationError as ve: raise BadInputExc('Error when parsing identifier. %s' % ' '.join(ve.messages)) else: return {'swh_id_parsed': swh_id_parsed, 'browse_url': browse_url} def parse_rst(text, report_level=2): """ Parse a reStructuredText string with docutils. Args: text (str): string with reStructuredText markups in it report_level (int): level of docutils report messages to print (1 info 2 warning 3 error 4 severe 5 none) Returns: docutils.nodes.document: a parsed docutils document """ parser = docutils.parsers.rst.Parser() components = (docutils.parsers.rst.Parser,) settings = docutils.frontend.OptionParser( components=components).get_default_values() settings.report_level = report_level document = docutils.utils.new_document('rst-doc', settings=settings) parser.parse(text, document) return document def get_client_ip(request): """ Return the client IP address from an incoming HTTP request. Args: request (django.http.HttpRequest): the incoming HTTP request Returns: str: The client IP address """ x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') if x_forwarded_for: ip = x_forwarded_for.split(',')[0] else: ip = request.META.get('REMOTE_ADDR') return ip def context_processor(request): """ Django context processor used to inject variables in all swh-web templates. """ return {'swh_object_icons': swh_object_icons, 'available_languages': None} class EnforceCSRFAuthentication(SessionAuthentication): """ Helper class to enforce CSRF validation on a DRF view when a user is not authenticated. """ def authenticate(self, request): user = getattr(request._request, 'user', None) self.enforce_csrf(request) return (user, None) diff --git a/swh/web/tests/api/views/test_origin.py b/swh/web/tests/api/views/test_origin.py index ddb46a85..a88840da 100644 --- a/swh/web/tests/api/views/test_origin.py +++ b/swh/web/tests/api/views/test_origin.py @@ -1,614 +1,671 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given import pytest from rest_framework.test import APITestCase from unittest.mock import patch from swh.storage.exc import StorageDBError, StorageAPIError from swh.web.common.utils import reverse from swh.web.common.origin_visits import get_origin_visits from swh.web.tests.strategies import ( origin, new_origin, new_origins, visit_dates, new_snapshots ) from swh.web.tests.testcase import WebTestCase class OriginApiTestCase(WebTestCase, APITestCase): @patch('swh.web.api.views.origin.get_origin_visits') def test_api_lookup_origin_visits_raise_error( self, mock_get_origin_visits, ): err_msg = 'voluntary error to check the bad request middleware.' mock_get_origin_visits.side_effect = ValueError(err_msg) url = reverse( 'api-1-origin-visits', url_args={'origin_url': 'http://foo'}) rv = self.client.get(url) self.assertEqual(rv.status_code, 400, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'ValueError', 'reason': err_msg}) @patch('swh.web.api.views.origin.get_origin_visits') def test_api_lookup_origin_visits_raise_swh_storage_error_db( self, mock_get_origin_visits): err_msg = 'Storage exploded! Will be back online shortly!' mock_get_origin_visits.side_effect = StorageDBError(err_msg) url = reverse( 'api-1-origin-visits', url_args={'origin_url': 'http://foo'}) rv = self.client.get(url) self.assertEqual(rv.status_code, 503, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'StorageDBError', 'reason': 'An unexpected error occurred in the backend: %s' % err_msg}) @patch('swh.web.api.views.origin.get_origin_visits') def test_api_lookup_origin_visits_raise_swh_storage_error_api( self, mock_get_origin_visits): err_msg = 'Storage API dropped dead! Will resurrect asap!' mock_get_origin_visits.side_effect = StorageAPIError(err_msg) url = reverse( 'api-1-origin-visits', url_args={'origin_url': 'http://foo'}) rv = self.client.get(url) self.assertEqual(rv.status_code, 503, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'StorageAPIError', 'reason': 'An unexpected error occurred in the api backend: %s' % err_msg }) @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visits(self, new_origin, visit_dates, new_snapshots): origin_id = self.storage.origin_add_one(new_origin) new_origin['id'] = origin_id for i, visit_date in enumerate(visit_dates): origin_visit = self.storage.origin_visit_add(origin_id, visit_date) self.storage.snapshot_add([new_snapshots[i]]) self.storage.origin_visit_update( origin_id, origin_visit['visit'], snapshot=new_snapshots[i]['id']) all_visits = list(reversed(get_origin_visits(new_origin))) for last_visit, expected_visits in ( (None, all_visits[:2]), (all_visits[1]['visit'], all_visits[2:4])): url = reverse('api-1-origin-visits', url_args={'origin_url': new_origin['url']}, query_params={'per_page': 2, 'last_visit': last_visit}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') for expected_visit in expected_visits: origin_visit_url = reverse( 'api-1-origin-visit', url_args={'origin_url': new_origin['url'], 'visit_id': expected_visit['visit']}) snapshot_url = reverse( 'api-1-snapshot', url_args={'snapshot_id': expected_visit['snapshot']}) expected_visit['origin'] = new_origin['url'] expected_visit['origin_visit_url'] = origin_visit_url expected_visit['snapshot_url'] = snapshot_url self.assertEqual(rv.data, expected_visits) @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visits_by_id(self, new_origin, visit_dates, new_snapshots): origin_id = self.storage.origin_add_one(new_origin) new_origin['id'] = origin_id for i, visit_date in enumerate(visit_dates): origin_visit = self.storage.origin_visit_add(origin_id, visit_date) self.storage.snapshot_add([new_snapshots[i]]) self.storage.origin_visit_update( origin_id, origin_visit['visit'], snapshot=new_snapshots[i]['id']) all_visits = list(reversed(get_origin_visits(new_origin))) for last_visit, expected_visits in ( (None, all_visits[:2]), (all_visits[1]['visit'], all_visits[2:4])): url = reverse('api-1-origin-visits', url_args={'origin_url': new_origin['url']}, query_params={'per_page': 2, 'last_visit': last_visit}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') for expected_visit in expected_visits: origin_visit_url = reverse( 'api-1-origin-visit', url_args={'origin_url': new_origin['url'], 'visit_id': expected_visit['visit']}) snapshot_url = reverse( 'api-1-snapshot', url_args={'snapshot_id': expected_visit['snapshot']}) expected_visit['origin'] = new_origin['url'] expected_visit['origin_visit_url'] = origin_visit_url expected_visit['snapshot_url'] = snapshot_url self.assertEqual(rv.data, expected_visits) @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visit(self, new_origin, visit_dates, new_snapshots): origin_id = self.storage.origin_add_one(new_origin) new_origin['id'] = origin_id for i, visit_date in enumerate(visit_dates): origin_visit = self.storage.origin_visit_add(origin_id, visit_date) visit_id = origin_visit['visit'] self.storage.snapshot_add([new_snapshots[i]]) self.storage.origin_visit_update( origin_id, origin_visit['visit'], snapshot=new_snapshots[i]['id']) url = reverse('api-1-origin-visit', url_args={'origin_url': new_origin['url'], 'visit_id': visit_id}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') expected_visit = self.origin_visit_get_by(origin_id, visit_id) origin_url = reverse('api-1-origin', url_args={'origin_url': new_origin['url']}) snapshot_url = reverse( 'api-1-snapshot', url_args={'snapshot_id': expected_visit['snapshot']}) expected_visit['origin'] = new_origin['url'] expected_visit['origin_url'] = origin_url expected_visit['snapshot_url'] = snapshot_url self.assertEqual(rv.data, expected_visit) @given(new_origin(), visit_dates(2), new_snapshots(1)) def test_api_lookup_origin_visit_latest( self, new_origin, visit_dates, new_snapshots): origin_id = self.storage.origin_add_one(new_origin) new_origin['id'] = origin_id visit_dates.sort() visit_ids = [] for i, visit_date in enumerate(visit_dates): origin_visit = self.storage.origin_visit_add(origin_id, visit_date) visit_ids.append(origin_visit['visit']) self.storage.snapshot_add([new_snapshots[0]]) self.storage.origin_visit_update( origin_id, visit_ids[0], snapshot=new_snapshots[0]['id']) url = reverse('api-1-origin-visit-latest', url_args={'origin_url': new_origin['url']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') expected_visit = self.origin_visit_get_by(origin_id, visit_ids[1]) origin_url = reverse('api-1-origin', url_args={'origin_url': new_origin['url']}) expected_visit['origin'] = new_origin['url'] expected_visit['origin_url'] = origin_url expected_visit['snapshot_url'] = None self.assertEqual(rv.data, expected_visit) @given(new_origin(), visit_dates(2), new_snapshots(1)) def test_api_lookup_origin_visit_latest_with_snapshot( self, new_origin, visit_dates, new_snapshots): origin_id = self.storage.origin_add_one(new_origin) new_origin['id'] = origin_id visit_dates.sort() visit_ids = [] for i, visit_date in enumerate(visit_dates): origin_visit = self.storage.origin_visit_add(origin_id, visit_date) visit_ids.append(origin_visit['visit']) self.storage.snapshot_add([new_snapshots[0]]) self.storage.origin_visit_update( origin_id, visit_ids[0], snapshot=new_snapshots[0]['id']) url = reverse('api-1-origin-visit-latest', url_args={'origin_url': new_origin['url']}) url += '?require_snapshot=true' rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') expected_visit = self.origin_visit_get_by(origin_id, visit_ids[0]) origin_url = reverse('api-1-origin', url_args={'origin_url': new_origin['url']}) snapshot_url = reverse( 'api-1-snapshot', url_args={'snapshot_id': expected_visit['snapshot']}) expected_visit['origin'] = new_origin['url'] expected_visit['origin_url'] = origin_url expected_visit['snapshot_url'] = snapshot_url self.assertEqual(rv.data, expected_visit) @pytest.mark.origin_id @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visit_by_id(self, new_origin, visit_dates, new_snapshots): origin_id = self.storage.origin_add_one(new_origin) new_origin['id'] = origin_id for i, visit_date in enumerate(visit_dates): origin_visit = self.storage.origin_visit_add(origin_id, visit_date) visit_id = origin_visit['visit'] self.storage.snapshot_add([new_snapshots[i]]) self.storage.origin_visit_update( origin_id, origin_visit['visit'], snapshot=new_snapshots[i]['id']) url = reverse('api-1-origin-visit', url_args={'origin_id': origin_id, 'visit_id': visit_id}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') expected_visit = self.origin_visit_get_by(origin_id, visit_id) origin_url = reverse('api-1-origin', url_args={'origin_url': new_origin['url']}) snapshot_url = reverse( 'api-1-snapshot', url_args={'snapshot_id': expected_visit['snapshot']}) expected_visit['origin'] = new_origin['url'] expected_visit['origin_url'] = origin_url expected_visit['snapshot_url'] = snapshot_url self.assertEqual(rv.data, expected_visit) @given(origin()) def test_api_lookup_origin_visit_not_found(self, origin): all_visits = list(reversed(get_origin_visits(origin))) max_visit_id = max([v['visit'] for v in all_visits]) url = reverse('api-1-origin-visit', url_args={'origin_url': origin['url'], 'visit_id': max_visit_id + 1}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Origin %s or its visit with id %s not found!' % (origin['url'], max_visit_id+1) }) @pytest.mark.origin_id @given(origin()) def test_api_lookup_origin_visit_not_found_by_id(self, origin): all_visits = list(reversed(get_origin_visits(origin))) max_visit_id = max([v['visit'] for v in all_visits]) url = reverse('api-1-origin-visit', url_args={'origin_id': origin['id'], 'visit_id': max_visit_id + 1}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Origin %s or its visit with id %s not found!' % (origin['url'], max_visit_id+1) }) @pytest.mark.origin_id @given(origin()) def test_api_origin_by_id(self, origin): url = reverse('api-1-origin', url_args={'origin_id': origin['id']}) rv = self.client.get(url) expected_origin = self.origin_get(origin) origin_visits_url = reverse('api-1-origin-visits', url_args={'origin_url': origin['url']}) expected_origin['origin_visits_url'] = origin_visits_url self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_origin) @given(origin()) def test_api_origin_by_url(self, origin): url = reverse('api-1-origin', url_args={'origin_url': origin['url']}) rv = self.client.get(url) expected_origin = self.origin_get(origin) origin_visits_url = reverse('api-1-origin-visits', url_args={'origin_url': origin['url']}) expected_origin['origin_visits_url'] = origin_visits_url self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_origin) @given(origin()) def test_api_origin_by_type_url(self, origin): url = reverse('api-1-origin', url_args={'origin_type': origin['type'], 'origin_url': origin['url']}) rv = self.client.get(url) expected_origin = self.origin_get(origin) origin_visits_url = reverse('api-1-origin-visits', url_args={'origin_url': origin['url']}) expected_origin['origin_visits_url'] = origin_visits_url self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_origin) @given(new_origin()) def test_api_origin_not_found(self, new_origin): url = reverse('api-1-origin', url_args={'origin_type': new_origin['type'], 'origin_url': new_origin['url']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Origin %s not found!' % new_origin['url'] }) @given(origin()) def test_api_origin_metadata_search(self, origin): with patch('swh.web.common.service.idx_storage') as mock_idx_storage: mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .side_effect = lambda conjunction, limit: [{ 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'origin_url': origin['url'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe'}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') expected_data = [{ 'type': origin['type'], 'url': origin['url'], 'metadata': { 'metadata': {'author': 'Jane Doe'}, 'from_revision': ( '7026b7c1a2af56521e951c01ed20f255fa054238'), 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1', } } }] actual_data = rv.data for d in actual_data: if 'id' in d: del d['id'] self.assertEqual(rv.data, expected_data) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=70) + @pytest.mark.origin_id + @given(origin()) + def test_api_origin_metadata_search_missing_url(self, origin): + """indexer-storage with outdated db will return origin_url: None.""" + with patch('swh.web.common.service.idx_storage') as mock_idx_storage: + mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ + .side_effect = lambda conjunction, limit: [{ + 'id': origin['id'], + 'from_revision': ( + b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' + b'\xf2U\xfa\x05B8'), + 'metadata': {'author': 'Jane Doe'}, + 'origin_url': None, + 'tool': { + 'configuration': { + 'context': ['NpmMapping', 'CodemetaMapping'], + 'type': 'local' + }, + 'id': 3, + 'name': 'swh-metadata-detector', + 'version': '0.0.1' + } + }] + + url = reverse('api-1-origin-metadata-search', + query_params={'fulltext': 'Jane Doe'}) + rv = self.client.get(url) + + self.assertEqual(rv.status_code, 200, rv.content) + self.assertEqual(rv['Content-Type'], 'application/json') + expected_data = [{ + 'type': origin['type'], + 'url': origin['url'], + 'metadata': { + 'metadata': {'author': 'Jane Doe'}, + 'from_revision': ( + '7026b7c1a2af56521e951c01ed20f255fa054238'), + 'tool': { + 'configuration': { + 'context': ['NpmMapping', 'CodemetaMapping'], + 'type': 'local' + }, + 'id': 3, + 'name': 'swh-metadata-detector', + 'version': '0.0.1', + } + } + }] + actual_data = rv.data + for d in actual_data: + if 'id' in d: + del d['id'] + self.assertEqual(rv.data, expected_data) + mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ + .assert_called_with(conjunction=['Jane Doe'], limit=70) + @given(origin()) def test_api_origin_metadata_search_limit(self, origin): with patch('swh.web.common.service.idx_storage') as mock_idx_storage: mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .side_effect = lambda conjunction, limit: [{ 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'origin_url': origin['url'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe'}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(len(rv.data), 1) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=70) url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe', 'limit': 10}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(len(rv.data), 1) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=10) url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe', 'limit': 987}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(len(rv.data), 1) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=100) @given(origin()) def test_api_origin_intrinsic_metadata(self, origin): with patch('swh.web.common.service.idx_storage') as mock_idx_storage: mock_idx_storage.origin_intrinsic_metadata_get \ .side_effect = lambda origin_ids: [{ 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'origin_url': origin['url'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-origin-intrinsic-metadata', url_args={'origin_type': origin['type'], 'origin_url': origin['url']}) rv = self.client.get(url) mock_idx_storage.origin_intrinsic_metadata_get \ .assert_called_once_with([origin['url']]) self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') expected_data = {'author': 'Jane Doe'} self.assertEqual(rv.data, expected_data) @patch('swh.web.common.service.idx_storage') def test_api_origin_metadata_search_invalid(self, mock_idx_storage): url = reverse('api-1-origin-metadata-search') rv = self.client.get(url) self.assertEqual(rv.status_code, 400, rv.content) mock_idx_storage.assert_not_called() @pytest.mark.origin_id @given(new_origins(10)) def test_api_lookup_origins(self, new_origins): nb_origins = len(new_origins) expected_origins = self.storage.origin_add(new_origins) + expected_origins.sort(key=lambda orig: orig['id']) origin_from_idx = random.randint(1, nb_origins-1) - 1 origin_from = expected_origins[origin_from_idx]['id'] max_origin_id = expected_origins[-1]['id'] origin_count = random.randint(1, max_origin_id - origin_from) url = reverse('api-1-origins', query_params={'origin_from': origin_from, 'origin_count': origin_count}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.data) start = origin_from_idx end = origin_from_idx + origin_count expected_origins = expected_origins[start:end] for expected_origin in expected_origins: expected_origin['origin_visits_url'] = reverse( 'api-1-origin-visits', url_args={'origin_url': expected_origin['url']}) self.assertEqual(rv.data, expected_origins) next_origin_id = expected_origins[-1]['id']+1 if self.storage.origin_get({'id': next_origin_id}): self.assertIn('Link', rv) next_url = reverse('api-1-origins', query_params={'origin_from': next_origin_id, 'origin_count': origin_count}) self.assertIn(next_url, rv['Link']) diff --git a/swh/web/tests/browse/views/test_origin.py b/swh/web/tests/browse/views/test_origin.py index bc66c206..d6b554c0 100644 --- a/swh/web/tests/browse/views/test_origin.py +++ b/swh/web/tests/browse/views/test_origin.py @@ -1,902 +1,904 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from unittest.mock import patch from django.utils.html import escape from hypothesis import given from swh.model.hashutil import hash_to_bytes from swh.web.browse.utils import process_snapshot_branches from swh.web.common.exc import NotFoundExc from swh.web.common.utils import ( reverse, gen_path_info, format_utc_iso_date, parse_timestamp, get_swh_persistent_id ) from swh.web.tests.data import get_content from swh.web.tests.strategies import ( origin, origin_with_multiple_visits, new_origin, new_snapshot, visit_dates, revisions ) from swh.web.tests.testcase import WebTestCase class SwhBrowseOriginTest(WebTestCase): @given(origin_with_multiple_visits()) def test_origin_visits_browse(self, origin): url = reverse('browse-origin-visits', url_args={'origin_type': origin['type'], 'origin_url': origin['url']}) resp = self.client.get(url) self.assertEqual(resp.status_code, 200) self.assertTemplateUsed('origin-visits.html') url = reverse('browse-origin-visits', url_args={'origin_url': origin['url']}) resp = self.client.get(url) self.assertEqual(resp.status_code, 200) self.assertTemplateUsed('origin-visits.html') visits = self.origin_visit_get(origin['url']) for v in visits: vdate = format_utc_iso_date(v['date'], '%Y-%m-%dT%H:%M:%SZ') browse_dir_url = reverse('browse-origin-directory', url_args={'origin_url': origin['url'], 'timestamp': vdate}) self.assertContains(resp, browse_dir_url) def origin_content_view_helper(self, origin_info, origin_visits, origin_branches, origin_releases, root_dir_sha1, content, visit_id=None, timestamp=None): content_path = '/'.join(content['path'].split('/')[1:]) url_args = {'origin_type': origin_info['type'], 'origin_url': origin_info['url'], 'path': content_path} if not visit_id: visit_id = origin_visits[-1]['visit'] query_params = {} if timestamp: url_args['timestamp'] = timestamp if visit_id: query_params['visit_id'] = visit_id url = reverse('browse-origin-content', url_args=url_args, query_params=query_params) resp = self.client.get(url) self.assertEqual(resp.status_code, 200) self.assertTemplateUsed('content.html') self.assertContains(resp, '' % content['hljs_language']) self.assertContains(resp, escape(content['data'])) split_path = content_path.split('/') filename = split_path[-1] path = content_path.replace(filename, '')[:-1] path_info = gen_path_info(path) del url_args['path'] if timestamp: url_args['timestamp'] = \ format_utc_iso_date(parse_timestamp(timestamp).isoformat(), '%Y-%m-%dT%H:%M:%S') root_dir_url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) self.assertContains(resp, '
  • ', count=len(path_info)+1) self.assertContains(resp, '%s' % (root_dir_url, root_dir_sha1[:7])) for p in path_info: url_args['path'] = p['path'] dir_url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) self.assertContains(resp, '%s' % (dir_url, p['name'])) self.assertContains(resp, '
  • %s
  • ' % filename) query_string = 'sha1_git:' + content['sha1_git'] url_raw = reverse('browse-content-raw', url_args={'query_string': query_string}, query_params={'filename': filename}) self.assertContains(resp, url_raw) if 'args' in url_args: del url_args['path'] origin_branches_url = reverse('browse-origin-branches', url_args=url_args, query_params=query_params) self.assertContains(resp, 'Branches (%s)' % (origin_branches_url, len(origin_branches))) origin_releases_url = reverse('browse-origin-releases', url_args=url_args, query_params=query_params) self.assertContains(resp, 'Releases (%s)' % (origin_releases_url, len(origin_releases))) self.assertContains(resp, '
  • ', count=len(origin_branches)) url_args['path'] = content_path for branch in origin_branches: query_params['branch'] = branch['name'] root_dir_branch_url = reverse('browse-origin-content', url_args=url_args, query_params=query_params) self.assertContains(resp, '' % root_dir_branch_url) self.assertContains(resp, '
  • ', count=len(origin_releases)) query_params['branch'] = None for release in origin_releases: query_params['release'] = release['name'] root_dir_release_url = reverse('browse-origin-content', url_args=url_args, query_params=query_params) self.assertContains(resp, '' % root_dir_release_url) del url_args['origin_type'] url = reverse('browse-origin-content', url_args=url_args, query_params=query_params) resp = self.client.get(url) self.assertEqual(resp.status_code, 200) self.assertTemplateUsed('content.html') swh_cnt_id = get_swh_persistent_id('content', content['sha1_git']) swh_cnt_id_url = reverse('browse-swh-id', url_args={'swh_id': swh_cnt_id}) self.assertContains(resp, swh_cnt_id) self.assertContains(resp, swh_cnt_id_url) self.assertContains(resp, 'swh-take-new-snapshot') @given(origin_with_multiple_visits()) def test_origin_content_view(self, origin): origin_visits = self.origin_visit_get(origin['url']) def _get_test_data(visit_idx): snapshot = self.snapshot_get(origin_visits[visit_idx]['snapshot']) head_rev_id = snapshot['branches']['HEAD']['target'] head_rev = self.revision_get(head_rev_id) dir_content = self.directory_ls(head_rev['directory']) dir_files = [e for e in dir_content if e['type'] == 'file'] dir_file = random.choice(dir_files) branches, releases = process_snapshot_branches(snapshot) return { 'branches': branches, 'releases': releases, 'root_dir_sha1': head_rev['directory'], 'content': get_content(dir_file['checksums']['sha1']), 'visit': origin_visits[visit_idx] } test_data = _get_test_data(-1) self.origin_content_view_helper(origin, origin_visits, test_data['branches'], test_data['releases'], test_data['root_dir_sha1'], test_data['content']) self.origin_content_view_helper(origin, origin_visits, test_data['branches'], test_data['releases'], test_data['root_dir_sha1'], test_data['content'], timestamp=test_data['visit']['date']) visit_unix_ts = parse_timestamp(test_data['visit']['date']).timestamp() visit_unix_ts = int(visit_unix_ts) self.origin_content_view_helper(origin, origin_visits, test_data['branches'], test_data['releases'], test_data['root_dir_sha1'], test_data['content'], timestamp=visit_unix_ts) test_data = _get_test_data(0) self.origin_content_view_helper(origin, origin_visits, test_data['branches'], test_data['releases'], test_data['root_dir_sha1'], test_data['content'], visit_id=test_data['visit']['visit']) def origin_directory_view_helper(self, origin_info, origin_visits, origin_branches, origin_releases, root_directory_sha1, directory_entries, visit_id=None, timestamp=None, path=None): dirs = [e for e in directory_entries if e['type'] in ('dir', 'rev')] files = [e for e in directory_entries if e['type'] == 'file'] if not visit_id: visit_id = origin_visits[-1]['visit'] url_args = {'origin_url': origin_info['url']} query_params = {} if timestamp: url_args['timestamp'] = timestamp else: query_params['visit_id'] = visit_id if path: url_args['path'] = path url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) resp = self.client.get(url) self.assertEqual(resp.status_code, 200) self.assertTemplateUsed('directory.html') self.assertEqual(resp.status_code, 200) self.assertTemplateUsed('directory.html') self.assertContains(resp, '', count=len(dirs)) self.assertContains(resp, '', count=len(files)) if timestamp: url_args['timestamp'] = \ format_utc_iso_date(parse_timestamp(timestamp).isoformat(), '%Y-%m-%dT%H:%M:%S') for d in dirs: if d['type'] == 'rev': dir_url = reverse('browse-revision', url_args={'sha1_git': d['target']}) else: dir_path = d['name'] if path: dir_path = "%s/%s" % (path, d['name']) dir_url_args = dict(url_args) dir_url_args['path'] = dir_path dir_url = reverse('browse-origin-directory', url_args=dir_url_args, query_params=query_params) self.assertContains(resp, dir_url) for f in files: file_path = f['name'] if path: file_path = "%s/%s" % (path, f['name']) file_url_args = dict(url_args) file_url_args['path'] = file_path file_url = reverse('browse-origin-content', url_args=file_url_args, query_params=query_params) self.assertContains(resp, file_url) if 'path' in url_args: del url_args['path'] root_dir_branch_url = \ reverse('browse-origin-directory', url_args=url_args, query_params=query_params) nb_bc_paths = 1 if path: nb_bc_paths = len(path.split('/')) + 1 self.assertContains(resp, '
  • ', count=nb_bc_paths) self.assertContains(resp, '%s' % (root_dir_branch_url, root_directory_sha1[:7])) origin_branches_url = reverse('browse-origin-branches', url_args=url_args, query_params=query_params) self.assertContains(resp, 'Branches (%s)' % (origin_branches_url, len(origin_branches))) origin_releases_url = reverse('browse-origin-releases', url_args=url_args, query_params=query_params) nb_releases = len(origin_releases) if nb_releases > 0: self.assertContains(resp, 'Releases (%s)' % (origin_releases_url, nb_releases)) if path: url_args['path'] = path self.assertContains(resp, '
  • ', count=len(origin_branches)) for branch in origin_branches: query_params['branch'] = branch['name'] root_dir_branch_url = \ reverse('browse-origin-directory', url_args=url_args, query_params=query_params) self.assertContains(resp, '' % root_dir_branch_url) self.assertContains(resp, '
  • ', count=len(origin_releases)) query_params['branch'] = None for release in origin_releases: query_params['release'] = release['name'] root_dir_release_url = \ reverse('browse-origin-directory', url_args=url_args, query_params=query_params) self.assertContains(resp, '' % root_dir_release_url) self.assertContains(resp, 'vault-cook-directory') self.assertContains(resp, 'vault-cook-revision') swh_dir_id = get_swh_persistent_id('directory', directory_entries[0]['dir_id']) # noqa swh_dir_id_url = reverse('browse-swh-id', url_args={'swh_id': swh_dir_id}) self.assertContains(resp, swh_dir_id) self.assertContains(resp, swh_dir_id_url) self.assertContains(resp, 'swh-take-new-snapshot') @given(origin()) def test_origin_root_directory_view(self, origin): origin_visits = self.origin_visit_get(origin['url']) visit = origin_visits[-1] snapshot = self.snapshot_get(visit['snapshot']) head_rev_id = snapshot['branches']['HEAD']['target'] head_rev = self.revision_get(head_rev_id) root_dir_sha1 = head_rev['directory'] dir_content = self.directory_ls(root_dir_sha1) branches, releases = process_snapshot_branches(snapshot) visit_unix_ts = parse_timestamp(visit['date']).timestamp() visit_unix_ts = int(visit_unix_ts) self.origin_directory_view_helper(origin, origin_visits, branches, releases, root_dir_sha1, dir_content) self.origin_directory_view_helper(origin, origin_visits, branches, releases, root_dir_sha1, dir_content, visit_id=visit['visit']) self.origin_directory_view_helper(origin, origin_visits, branches, releases, root_dir_sha1, dir_content, timestamp=visit_unix_ts) self.origin_directory_view_helper(origin, origin_visits, branches, releases, root_dir_sha1, dir_content, timestamp=visit['date']) origin = dict(origin) del origin['type'] self.origin_directory_view_helper(origin, origin_visits, branches, releases, root_dir_sha1, dir_content) self.origin_directory_view_helper(origin, origin_visits, branches, releases, root_dir_sha1, dir_content, visit_id=visit['visit']) self.origin_directory_view_helper(origin, origin_visits, branches, releases, root_dir_sha1, dir_content, timestamp=visit_unix_ts) self.origin_directory_view_helper(origin, origin_visits, branches, releases, root_dir_sha1, dir_content, timestamp=visit['date']) @given(origin()) def test_origin_sub_directory_view(self, origin): origin_visits = self.origin_visit_get(origin['url']) visit = origin_visits[-1] snapshot = self.snapshot_get(visit['snapshot']) head_rev_id = snapshot['branches']['HEAD']['target'] head_rev = self.revision_get(head_rev_id) root_dir_sha1 = head_rev['directory'] subdirs = [e for e in self.directory_ls(root_dir_sha1) if e['type'] == 'dir'] branches, releases = process_snapshot_branches(snapshot) visit_unix_ts = parse_timestamp(visit['date']).timestamp() visit_unix_ts = int(visit_unix_ts) if len(subdirs) == 0: return subdir = random.choice(subdirs) subdir_content = self.directory_ls(subdir['target']) subdir_path = subdir['name'] self.origin_directory_view_helper(origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path) self.origin_directory_view_helper(origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, visit_id=visit['visit']) self.origin_directory_view_helper(origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit_unix_ts) self.origin_directory_view_helper(origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit['date']) origin = dict(origin) del origin['type'] self.origin_directory_view_helper(origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path) self.origin_directory_view_helper(origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, visit_id=visit['visit']) self.origin_directory_view_helper(origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit_unix_ts) self.origin_directory_view_helper(origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit['date']) def origin_branches_helper(self, origin_info, origin_snapshot): url_args = {'origin_type': origin_info['type'], 'origin_url': origin_info['url']} url = reverse('browse-origin-branches', url_args=url_args) resp = self.client.get(url) self.assertEqual(resp.status_code, 200) self.assertTemplateUsed('branches.html') origin_branches = origin_snapshot[0] origin_releases = origin_snapshot[1] origin_branches_url = reverse('browse-origin-branches', url_args=url_args) self.assertContains(resp, 'Branches (%s)' % (origin_branches_url, len(origin_branches))) origin_releases_url = reverse('browse-origin-releases', url_args=url_args) nb_releases = len(origin_releases) if nb_releases > 0: self.assertContains(resp, 'Releases (%s)' % (origin_releases_url, nb_releases)) self.assertContains(resp, '' % escape(browse_branch_url)) browse_revision_url = reverse( 'browse-revision', url_args={'sha1_git': branch['revision']}, query_params={'origin_type': origin_info['type'], 'origin': origin_info['url']}) self.assertContains(resp, '' % escape(browse_revision_url)) @given(origin()) def test_origin_branches(self, origin): origin_visits = self.origin_visit_get(origin['url']) visit = origin_visits[-1] snapshot = self.snapshot_get(visit['snapshot']) snapshot_content = process_snapshot_branches(snapshot) self.origin_branches_helper(origin, snapshot_content) origin = dict(origin) origin['type'] = None self.origin_branches_helper(origin, snapshot_content) def origin_releases_helper(self, origin_info, origin_snapshot): url_args = {'origin_type': origin_info['type'], 'origin_url': origin_info['url']} url = reverse('browse-origin-releases', url_args=url_args) resp = self.client.get(url) self.assertEqual(resp.status_code, 200) self.assertTemplateUsed('releases.html') origin_branches = origin_snapshot[0] origin_releases = origin_snapshot[1] origin_branches_url = reverse('browse-origin-branches', url_args=url_args) self.assertContains(resp, 'Branches (%s)' % (origin_branches_url, len(origin_branches))) origin_releases_url = reverse('browse-origin-releases', url_args=url_args) nb_releases = len(origin_releases) if nb_releases > 0: self.assertContains(resp, 'Releases (%s)' % (origin_releases_url, nb_releases)) self.assertContains(resp, '' % escape(browse_release_url)) self.assertContains(resp, '' % escape(browse_revision_url)) @given(origin()) def test_origin_releases(self, origin): origin_visits = self.origin_visit_get(origin['url']) visit = origin_visits[-1] snapshot = self.snapshot_get(visit['snapshot']) snapshot_content = process_snapshot_branches(snapshot) self.origin_releases_helper(origin, snapshot_content) origin = dict(origin) origin['type'] = None self.origin_releases_helper(origin, snapshot_content) @given(new_origin(), new_snapshot(min_size=4, max_size=4), visit_dates(), revisions(min_size=3, max_size=3)) def test_origin_snapshot_null_branch(self, new_origin, new_snapshot, visit_dates, revisions): snp_dict = new_snapshot.to_dict() new_origin = self.storage.origin_add([new_origin])[0] for i, branch in enumerate(snp_dict['branches'].keys()): if i == 0: snp_dict['branches'][branch] = None else: - snp_dict['branches'][branch]['target_type'] = 'revision' - snp_dict['branches'][branch]['target'] = hash_to_bytes( - revisions[i-1]) + snp_dict['branches'][branch] = { + 'target_type': 'revision', + 'target': hash_to_bytes(revisions[i-1]), + } + self.storage.snapshot_add([snp_dict]) visit = self.storage.origin_visit_add( new_origin['url'], visit_dates[0]) self.storage.origin_visit_update(new_origin['url'], visit['visit'], status='partial', snapshot=snp_dict['id']) url = reverse('browse-origin-directory', url_args={'origin_url': new_origin['url']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200) @patch('swh.web.browse.views.utils.snapshot_context.request_content') @patch('swh.web.common.origin_visits.get_origin_visits') @patch('swh.web.browse.utils.get_origin_visit_snapshot') @patch('swh.web.browse.utils.service') @patch('swh.web.browse.views.origin.service') @patch('swh.web.browse.views.utils.snapshot_context.service') @patch('swh.web.browse.views.origin.get_origin_info') def test_origin_request_errors(self, mock_get_origin_info, mock_snapshot_service, mock_origin_service, mock_utils_service, mock_get_origin_visit_snapshot, mock_get_origin_visits, mock_request_content): mock_get_origin_info.side_effect = \ NotFoundExc('origin not found') url = reverse('browse-origin-visits', url_args={'origin_type': 'foo', 'origin_url': 'bar'}) resp = self.client.get(url) self.assertEqual(resp.status_code, 404) self.assertTemplateUsed('error.html') self.assertContains(resp, 'origin not found', status_code=404) mock_utils_service.lookup_origin.side_effect = None mock_utils_service.lookup_origin.return_value = {'type': 'foo', 'url': 'bar', 'id': 457} mock_get_origin_visits.return_value = [] url = reverse('browse-origin-directory', url_args={'origin_type': 'foo', 'origin_url': 'bar'}) resp = self.client.get(url) self.assertEqual(resp.status_code, 404) self.assertTemplateUsed('error.html') self.assertContains(resp, "No visit", status_code=404) mock_get_origin_visits.return_value = [{'visit': 1}] mock_get_origin_visit_snapshot.side_effect = \ NotFoundExc('visit not found') url = reverse('browse-origin-directory', url_args={'origin_type': 'foo', 'origin_url': 'bar'}, query_params={'visit_id': 2}) resp = self.client.get(url) self.assertEqual(resp.status_code, 404) self.assertTemplateUsed('error.html') self.assertRegex(resp.content.decode('utf-8'), 'Visit.*not found') mock_get_origin_visits.return_value = [{ 'date': '2015-09-26T09:30:52.373449+00:00', 'metadata': {}, 'origin': 457, 'snapshot': 'bdaf9ac436488a8c6cda927a0f44e172934d3f65', 'status': 'full', 'visit': 1 }] mock_get_origin_visit_snapshot.side_effect = None mock_get_origin_visit_snapshot.return_value = ( [{'directory': 'ae59ceecf46367e8e4ad800e231fc76adc3afffb', 'name': 'HEAD', 'revision': '7bc08e1aa0b08cb23e18715a32aa38517ad34672', 'date': '04 May 2017, 13:27 UTC', 'message': ''}], [] ) mock_utils_service.lookup_snapshot_size.return_value = { 'revision': 1, 'release': 0 } mock_utils_service.lookup_directory.side_effect = \ NotFoundExc('Directory not found') url = reverse('browse-origin-directory', url_args={'origin_type': 'foo', 'origin_url': 'bar'}) resp = self.client.get(url) self.assertEqual(resp.status_code, 404) self.assertTemplateUsed('error.html') self.assertContains(resp, 'Directory not found', status_code=404) with patch('swh.web.browse.views.utils.snapshot_context.' 'get_snapshot_context') as mock_get_snapshot_context: mock_get_snapshot_context.side_effect = \ NotFoundExc('Snapshot not found') url = reverse('browse-origin-directory', url_args={'origin_type': 'foo', 'origin_url': 'bar'}) resp = self.client.get(url) self.assertEqual(resp.status_code, 404) self.assertTemplateUsed('error.html') self.assertContains(resp, 'Snapshot not found', status_code=404) mock_origin_service.lookup_origin.side_effect = None mock_origin_service.lookup_origin.return_value = {'type': 'foo', 'url': 'bar', 'id': 457} mock_get_origin_visits.return_value = [] url = reverse('browse-origin-content', url_args={'origin_type': 'foo', 'origin_url': 'bar', 'path': 'foo'}) resp = self.client.get(url) self.assertEqual(resp.status_code, 404) self.assertTemplateUsed('error.html') self.assertContains(resp, "No visit", status_code=404) mock_get_origin_visits.return_value = [{'visit': 1}] mock_get_origin_visit_snapshot.side_effect = \ NotFoundExc('visit not found') url = reverse('browse-origin-content', url_args={'origin_type': 'foo', 'origin_url': 'bar', 'path': 'foo'}, query_params={'visit_id': 2}) resp = self.client.get(url) self.assertEqual(resp.status_code, 404) self.assertTemplateUsed('error.html') self.assertRegex(resp.content.decode('utf-8'), 'Visit.*not found') mock_get_origin_visits.return_value = [{ 'date': '2015-09-26T09:30:52.373449+00:00', 'metadata': {}, 'origin': 457, 'snapshot': 'bdaf9ac436488a8c6cda927a0f44e172934d3f65', 'status': 'full', 'visit': 1 }] mock_get_origin_visit_snapshot.side_effect = None mock_get_origin_visit_snapshot.return_value = ([], []) url = reverse('browse-origin-content', url_args={'origin_type': 'foo', 'origin_url': 'bar', 'path': 'baz'}) resp = self.client.get(url) self.assertEqual(resp.status_code, 404) self.assertTemplateUsed('error.html') self.assertRegex(resp.content.decode('utf-8'), 'Origin.*has an empty list of branches') mock_get_origin_visit_snapshot.return_value = ( [{'directory': 'ae59ceecf46367e8e4ad800e231fc76adc3afffb', 'name': 'HEAD', 'revision': '7bc08e1aa0b08cb23e18715a32aa38517ad34672', 'date': '04 May 2017, 13:27 UTC', 'message': ''}], [] ) mock_snapshot_service.lookup_directory_with_path.return_value = \ {'target': '5ecd9f37b7a2d2e9980d201acd6286116f2ba1f1'} mock_request_content.side_effect = \ NotFoundExc('Content not found') url = reverse('browse-origin-content', url_args={'origin_type': 'foo', 'origin_url': 'bar', 'path': 'baz'}) resp = self.client.get(url) self.assertEqual(resp.status_code, 404) self.assertTemplateUsed('error.html') self.assertContains(resp, 'Content not found', status_code=404) @patch('swh.web.common.origin_visits.get_origin_visits') @patch('swh.web.browse.utils.get_origin_visit_snapshot') @patch('swh.web.browse.utils.service') def test_origin_empty_snapshot(self, mock_utils_service, mock_get_origin_visit_snapshot, mock_get_origin_visits): mock_get_origin_visits.return_value = [{ 'date': '2015-09-26T09:30:52.373449+00:00', 'metadata': {}, 'origin': 457, 'snapshot': 'bdaf9ac436488a8c6cda927a0f44e172934d3f65', 'status': 'full', 'visit': 1 }] mock_get_origin_visit_snapshot.return_value = ([], []) mock_utils_service.lookup_snapshot_size.return_value = { 'revision': 0, 'release': 0 } mock_utils_service.lookup_origin.return_value = { 'id': 457, 'type': 'git', 'url': 'https://github.com/foo/bar' } url = reverse('browse-origin-directory', url_args={'origin_type': 'foo', 'origin_url': 'bar'}) resp = self.client.get(url) self.assertEqual(resp.status_code, 200) self.assertTemplateUsed('content.html') self.assertRegex(resp.content.decode('utf-8'), 'snapshot.*is empty') diff --git a/swh/web/tests/common/test_service.py b/swh/web/tests/common/test_service.py index cc2af791..05f11fb1 100644 --- a/swh/web/tests/common/test_service.py +++ b/swh/web/tests/common/test_service.py @@ -1,877 +1,878 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import itertools import pytest import random from collections import defaultdict from hypothesis import given from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.from_disk import DentryPerms from swh.web.common import service from swh.web.common.exc import BadInputExc, NotFoundExc from swh.web.tests.data import random_sha1, random_content from swh.web.tests.strategies import ( content, contents, unknown_contents, contents_with_ctags, origin, new_origin, visit_dates, directory, release, revision, unknown_revision, revisions, ancestor_revisions, non_ancestor_revisions, invalid_sha1, sha256, revision_with_submodules, empty_directory, new_revision, new_origins ) from swh.web.tests.testcase import ( WebTestCase, ctags_json_missing, fossology_missing ) class ServiceTestCase(WebTestCase): @given(contents()) def test_lookup_multiple_hashes_all_present(self, contents): input_data = [] expected_output = [] for cnt in contents: input_data.append({'sha1': cnt['sha1']}) expected_output.append({'sha1': cnt['sha1'], 'found': True}) self.assertEqual(service.lookup_multiple_hashes(input_data), expected_output) @given(contents(), unknown_contents()) def test_lookup_multiple_hashes_some_missing(self, contents, unknown_contents): input_contents = list(itertools.chain(contents, unknown_contents)) random.shuffle(input_contents) input_data = [] expected_output = [] for cnt in input_contents: input_data.append({'sha1': cnt['sha1']}) expected_output.append({'sha1': cnt['sha1'], 'found': cnt in contents}) self.assertEqual(service.lookup_multiple_hashes(input_data), expected_output) def test_lookup_hash_does_not_exist(self): unknown_content_ = random_content() actual_lookup = service.lookup_hash('sha1_git:%s' % unknown_content_['sha1_git']) self.assertEqual(actual_lookup, {'found': None, 'algo': 'sha1_git'}) @given(content()) def test_lookup_hash_exist(self, content): actual_lookup = service.lookup_hash('sha1:%s' % content['sha1']) content_metadata = self.content_get_metadata(content['sha1']) self.assertEqual({'found': content_metadata, 'algo': 'sha1'}, actual_lookup) def test_search_hash_does_not_exist(self): unknown_content_ = random_content() actual_lookup = service.search_hash('sha1_git:%s' % unknown_content_['sha1_git']) self.assertEqual({'found': False}, actual_lookup) @given(content()) def test_search_hash_exist(self, content): actual_lookup = service.search_hash('sha1:%s' % content['sha1']) self.assertEqual({'found': True}, actual_lookup) @pytest.mark.skipif(ctags_json_missing, reason="requires ctags with json output support") @given(contents_with_ctags()) def test_lookup_content_ctags(self, contents_with_ctags): content_sha1 = random.choice(contents_with_ctags['sha1s']) self.content_add_ctags(content_sha1) actual_ctags = \ list(service.lookup_content_ctags('sha1:%s' % content_sha1)) expected_data = list(self.content_get_ctags(content_sha1)) for ctag in expected_data: ctag['id'] = content_sha1 self.assertEqual(actual_ctags, expected_data) def test_lookup_content_ctags_no_hash(self): unknown_content_ = random_content() actual_ctags = \ list(service.lookup_content_ctags('sha1:%s' % unknown_content_['sha1'])) self.assertEqual(actual_ctags, []) @given(content()) def test_lookup_content_filetype(self, content): self.content_add_mimetype(content['sha1']) actual_filetype = service.lookup_content_filetype(content['sha1']) expected_filetype = self.content_get_mimetype(content['sha1']) self.assertEqual(actual_filetype, expected_filetype) @pytest.mark.xfail # Language indexer is disabled. @given(content()) def test_lookup_content_language(self, content): self.content_add_language(content['sha1']) actual_language = service.lookup_content_language(content['sha1']) expected_language = self.content_get_language(content['sha1']) self.assertEqual(actual_language, expected_language) @given(contents_with_ctags()) def test_lookup_expression(self, contents_with_ctags): per_page = 10 expected_ctags = [] for content_sha1 in contents_with_ctags['sha1s']: if len(expected_ctags) == per_page: break self.content_add_ctags(content_sha1) for ctag in self.content_get_ctags(content_sha1): if len(expected_ctags) == per_page: break if ctag['name'] == contents_with_ctags['symbol_name']: del ctag['id'] ctag['sha1'] = content_sha1 expected_ctags.append(ctag) actual_ctags = \ list(service.lookup_expression(contents_with_ctags['symbol_name'], last_sha1=None, per_page=10)) self.assertEqual(actual_ctags, expected_ctags) def test_lookup_expression_no_result(self): expected_ctags = [] actual_ctags = \ list(service.lookup_expression('barfoo', last_sha1=None, per_page=10)) self.assertEqual(actual_ctags, expected_ctags) @pytest.mark.skipif(fossology_missing, reason="requires fossology-nomossa installed") @given(content()) def test_lookup_content_license(self, content): self.content_add_license(content['sha1']) actual_license = service.lookup_content_license(content['sha1']) expected_license = self.content_get_license(content['sha1']) self.assertEqual(actual_license, expected_license) def test_stat_counters(self): actual_stats = service.stat_counters() self.assertEqual(actual_stats, self.storage.stat_counters()) @given(new_origin(), visit_dates()) def test_lookup_origin_visits(self, new_origin, visit_dates): origin_id = self.storage.origin_add_one(new_origin) for ts in visit_dates: self.storage.origin_visit_add(origin_id, ts) actual_origin_visits = list( service.lookup_origin_visits(origin_id, per_page=100)) expected_visits = self.origin_visit_get(origin_id) self.assertEqual(actual_origin_visits, expected_visits) @given(new_origin(), visit_dates()) def test_lookup_origin_visit(self, new_origin, visit_dates): origin_id = self.storage.origin_add_one(new_origin) visits = [] for ts in visit_dates: visits.append(self.storage.origin_visit_add(origin_id, ts)) visit = random.choice(visits)['visit'] actual_origin_visit = service.lookup_origin_visit(origin_id, visit) expected_visit = dict(self.storage.origin_visit_get_by(origin_id, visit)) expected_visit['date'] = expected_visit['date'].isoformat() expected_visit['metadata'] = {} self.assertEqual(actual_origin_visit, expected_visit) @pytest.mark.origin_id @given(new_origin()) def test_lookup_origin_by_id(self, new_origin): origin_id = self.storage.origin_add_one(new_origin) actual_origin = service.lookup_origin({'id': origin_id}) expected_origin = self.storage.origin_get({'id': origin_id}) self.assertEqual(actual_origin, expected_origin) @given(new_origin()) def test_lookup_origin(self, new_origin): self.storage.origin_add_one(new_origin) actual_origin = service.lookup_origin({'type': new_origin['type'], 'url': new_origin['url']}) expected_origin = self.storage.origin_get({'type': new_origin['type'], 'url': new_origin['url']}) self.assertEqual(actual_origin, expected_origin) @given(invalid_sha1()) def test_lookup_release_ko_id_checksum_not_a_sha1(self, invalid_sha1): with self.assertRaises(BadInputExc) as cm: service.lookup_release(invalid_sha1) self.assertIn('invalid checksum', cm.exception.args[0].lower()) @given(sha256()) def test_lookup_release_ko_id_checksum_too_long(self, sha256): with self.assertRaises(BadInputExc) as cm: service.lookup_release(sha256) self.assertEqual('Only sha1_git is supported.', cm.exception.args[0]) @given(directory()) def test_lookup_directory_with_path_not_found(self, directory): path = 'some/invalid/path/here' with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_path(directory, path) self.assertEqual('Directory entry with path %s from %s ' 'not found' % (path, directory), cm.exception.args[0]) @given(directory()) def test_lookup_directory_with_path_found(self, directory): directory_content = self.directory_ls(directory) directory_entry = random.choice(directory_content) path = directory_entry['name'] actual_result = service.lookup_directory_with_path(directory, path) self.assertEqual(actual_result, directory_entry) @given(release()) def test_lookup_release(self, release): actual_release = service.lookup_release(release) self.assertEqual(actual_release, self.release_get(release)) @given(revision(), invalid_sha1(), sha256()) def test_lookup_revision_with_context_ko_not_a_sha1(self, revision, invalid_sha1, sha256): sha1_git_root = revision sha1_git = invalid_sha1 with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Invalid checksum query string', cm.exception.args[0]) sha1_git = sha256 with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Only sha1_git is supported', cm.exception.args[0]) @given(revision(), unknown_revision()) def test_lookup_revision_with_context_ko_sha1_git_does_not_exist( self, revision, unknown_revision): sha1_git_root = revision sha1_git = unknown_revision with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision %s not found' % sha1_git, cm.exception.args[0]) @given(revision(), unknown_revision()) def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist( self, revision, unknown_revision): sha1_git_root = unknown_revision sha1_git = revision with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision root %s not found' % sha1_git_root, cm.exception.args[0]) @given(ancestor_revisions()) def test_lookup_revision_with_context(self, ancestor_revisions): sha1_git = ancestor_revisions['sha1_git'] root_sha1_git = ancestor_revisions['sha1_git_root'] for sha1_git_root in (root_sha1_git, {'id': hash_to_bytes(root_sha1_git)}): actual_revision = \ service.lookup_revision_with_context(sha1_git_root, sha1_git) children = [] for rev in self.revision_log(root_sha1_git): for p_rev in rev['parents']: p_rev_hex = hash_to_hex(p_rev) if p_rev_hex == sha1_git: children.append(rev['id']) expected_revision = self.revision_get(sha1_git) expected_revision['children'] = children self.assertEqual(actual_revision, expected_revision) @given(non_ancestor_revisions()) def test_lookup_revision_with_context_ko(self, non_ancestor_revisions): sha1_git = non_ancestor_revisions['sha1_git'] root_sha1_git = non_ancestor_revisions['sha1_git_root'] with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(root_sha1_git, sha1_git) self.assertIn('Revision %s is not an ancestor of %s' % (sha1_git, root_sha1_git), cm.exception.args[0]) def test_lookup_directory_with_revision_not_found(self): unknown_revision_ = random_sha1() with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_revision(unknown_revision_) self.assertIn('Revision %s not found' % unknown_revision_, cm.exception.args[0]) def test_lookup_directory_with_revision_unknown_content(self): unknown_content_ = random_content() unknown_revision_ = random_sha1() unknown_directory_ = random_sha1() dir_path = 'README.md' # Create a revision that points to a directory # Which points to unknown content revision = { 'author': { 'name': b'abcd', 'email': b'abcd@company.org', 'fullname': b'abcd abcd' }, 'committer': { 'email': b'aaaa@company.org', 'fullname': b'aaaa aaa', 'name': b'aaa' }, 'committer_date': { 'negative_utc': False, 'offset': 0, 'timestamp': 1437511651 }, 'date': { 'negative_utc': False, 'offset': 0, 'timestamp': 1437511651 }, 'message': b'bleh', 'metadata': [], 'parents': [], 'synthetic': False, 'type': 'file', 'id': hash_to_bytes(unknown_revision_), 'directory': hash_to_bytes(unknown_directory_) } # A directory that points to unknown content dir = { 'id': hash_to_bytes(unknown_directory_), 'entries': [{ 'name': bytes(dir_path.encode('utf-8')), 'type': 'file', 'target': hash_to_bytes(unknown_content_['sha1_git']), 'perms': DentryPerms.content }] } # Add the directory and revision in mem self.storage.directory_add([dir]) self.storage.revision_add([revision]) with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_revision( unknown_revision_, dir_path) self.assertIn('Content not found for revision %s' % unknown_revision_, cm.exception.args[0]) @given(revision()) def test_lookup_directory_with_revision_ko_path_to_nowhere( self, revision): invalid_path = 'path/to/something/unknown' with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_revision(revision, invalid_path) exception_text = cm.exception.args[0].lower() self.assertIn('directory or file', exception_text) self.assertIn(invalid_path, exception_text) self.assertIn('revision %s' % revision, exception_text) self.assertIn('not found', exception_text) @given(revision_with_submodules()) def test_lookup_directory_with_revision_submodules( self, revision_with_submodules): rev_sha1_git = revision_with_submodules['rev_sha1_git'] rev_dir_path = revision_with_submodules['rev_dir_rev_path'] actual_data = service.lookup_directory_with_revision( rev_sha1_git, rev_dir_path) revision = self.revision_get(revision_with_submodules['rev_sha1_git']) directory = self.directory_ls(revision['directory']) rev_entry = next(e for e in directory if e['name'] == rev_dir_path) expected_data = { 'content': self.revision_get(rev_entry['target']), 'path': rev_dir_path, 'revision': rev_sha1_git, 'type': 'rev' } self.assertEqual(actual_data, expected_data) @given(revision()) def test_lookup_directory_with_revision_without_path(self, revision): actual_directory_entries = \ service.lookup_directory_with_revision(revision) revision_data = self.revision_get(revision) expected_directory_entries = \ self.directory_ls(revision_data['directory']) self.assertEqual(actual_directory_entries['type'], 'dir') self.assertEqual(actual_directory_entries['content'], expected_directory_entries) @given(revision()) def test_lookup_directory_with_revision_with_path(self, revision): revision_data = self.revision_get(revision) dir_entries = [e for e in self.directory_ls(revision_data['directory']) if e['type'] in ('file', 'dir')] expected_dir_entry = random.choice(dir_entries) actual_dir_entry = \ service.lookup_directory_with_revision(revision, expected_dir_entry['name']) self.assertEqual(actual_dir_entry['type'], expected_dir_entry['type']) self.assertEqual(actual_dir_entry['revision'], revision) self.assertEqual(actual_dir_entry['path'], expected_dir_entry['name']) if actual_dir_entry['type'] == 'file': del actual_dir_entry['content']['checksums']['blake2s256'] for key in ('checksums', 'status', 'length'): self.assertEqual(actual_dir_entry['content'][key], expected_dir_entry[key]) else: sub_dir_entries = self.directory_ls(expected_dir_entry['target']) self.assertEqual(actual_dir_entry['content'], sub_dir_entries) @given(revision()) def test_lookup_directory_with_revision_with_path_to_file_and_data( self, revision): revision_data = self.revision_get(revision) dir_entries = [e for e in self.directory_ls(revision_data['directory']) if e['type'] == 'file'] expected_dir_entry = random.choice(dir_entries) expected_data = \ self.content_get(expected_dir_entry['checksums']['sha1']) actual_dir_entry = \ service.lookup_directory_with_revision(revision, expected_dir_entry['name'], with_data=True) self.assertEqual(actual_dir_entry['type'], expected_dir_entry['type']) self.assertEqual(actual_dir_entry['revision'], revision) self.assertEqual(actual_dir_entry['path'], expected_dir_entry['name']) del actual_dir_entry['content']['checksums']['blake2s256'] for key in ('checksums', 'status', 'length'): self.assertEqual(actual_dir_entry['content'][key], expected_dir_entry[key]) self.assertEqual(actual_dir_entry['content']['data'], expected_data['data']) @given(revision()) def test_lookup_revision(self, revision): actual_revision = service.lookup_revision(revision) self.assertEqual(actual_revision, self.revision_get(revision)) @given(new_revision()) def test_lookup_revision_invalid_msg(self, new_revision): new_revision['message'] = b'elegant fix for bug \xff' self.storage.revision_add([new_revision]) revision = service.lookup_revision(hash_to_hex(new_revision['id'])) self.assertEqual(revision['message'], None) self.assertEqual(revision['message_decoding_failed'], True) @given(new_revision()) def test_lookup_revision_msg_ok(self, new_revision): self.storage.revision_add([new_revision]) revision_message = service.lookup_revision_message( hash_to_hex(new_revision['id'])) self.assertEqual(revision_message, {'message': new_revision['message']}) @given(new_revision()) def test_lookup_revision_msg_absent(self, new_revision): del new_revision['message'] self.storage.revision_add([new_revision]) new_revision_id = hash_to_hex(new_revision['id']) with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_message(new_revision_id) self.assertEqual( cm.exception.args[0], 'No message for revision with sha1_git %s.' % new_revision_id ) def test_lookup_revision_msg_no_rev(self): unknown_revision_ = random_sha1() with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_message(unknown_revision_) self.assertEqual( cm.exception.args[0], 'Revision with sha1_git %s not found.' % unknown_revision_ ) @given(revisions()) def test_lookup_revision_multiple(self, revisions): actual_revisions = list(service.lookup_revision_multiple(revisions)) expected_revisions = [] for rev in revisions: expected_revisions.append(self.revision_get(rev)) self.assertEqual(actual_revisions, expected_revisions) def test_lookup_revision_multiple_none_found(self): unknown_revisions_ = [random_sha1(), random_sha1(), random_sha1()] actual_revisions = \ list(service.lookup_revision_multiple(unknown_revisions_)) self.assertEqual(actual_revisions, [None] * len(unknown_revisions_)) @given(revision()) def test_lookup_revision_log(self, revision): actual_revision_log = \ list(service.lookup_revision_log(revision, limit=25)) expected_revision_log = self.revision_log(revision, limit=25) self.assertEqual(actual_revision_log, expected_revision_log) def _get_origin_branches(self, origin): origin_visit = self.origin_visit_get(origin['url'])[-1] snapshot = self.snapshot_get(origin_visit['snapshot']) branches = {k: v for (k, v) in snapshot['branches'].items() if v['target_type'] == 'revision'} return branches @given(origin()) def test_lookup_revision_log_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) actual_log = \ list(service.lookup_revision_log_by(origin['url'], branch_name, None, limit=25)) expected_log = \ self.revision_log(branches[branch_name]['target'], limit=25) self.assertEqual(actual_log, expected_log) @given(origin()) def test_lookup_revision_log_by_notfound(self, origin): with self.assertRaises(NotFoundExc): service.lookup_revision_log_by( origin['url'], 'unknown_branch_name', None, limit=100) def test_lookup_content_raw_not_found(self): unknown_content_ = random_content() with self.assertRaises(NotFoundExc) as cm: service.lookup_content_raw('sha1:' + unknown_content_['sha1']) self.assertIn(cm.exception.args[0], 'Content with %s checksum equals to %s not found!' % ('sha1', unknown_content_['sha1'])) @given(content()) def test_lookup_content_raw(self, content): actual_content = service.lookup_content_raw( 'sha256:%s' % content['sha256']) expected_content = self.content_get(content['sha1']) self.assertEqual(actual_content, expected_content) def test_lookup_content_not_found(self): unknown_content_ = random_content() with self.assertRaises(NotFoundExc) as cm: service.lookup_content('sha1:%s' % unknown_content_['sha1']) self.assertIn(cm.exception.args[0], 'Content with %s checksum equals to %s not found!' % ('sha1', unknown_content_['sha1'])) @given(content()) def test_lookup_content_with_sha1(self, content): actual_content = service.lookup_content( 'sha1:%s' % content['sha1']) expected_content = self.content_get_metadata(content['sha1']) self.assertEqual(actual_content, expected_content) @given(content()) def test_lookup_content_with_sha256(self, content): actual_content = service.lookup_content( 'sha256:%s' % content['sha256']) expected_content = self.content_get_metadata(content['sha1']) self.assertEqual(actual_content, expected_content) def test_lookup_directory_bad_checksum(self): with self.assertRaises(BadInputExc): service.lookup_directory('directory_id') def test_lookup_directory_not_found(self): unknown_directory_ = random_sha1() with self.assertRaises(NotFoundExc) as cm: service.lookup_directory(unknown_directory_) self.assertIn('Directory with sha1_git %s not found' % unknown_directory_, cm.exception.args[0]) @given(directory()) def test_lookup_directory(self, directory): actual_directory_ls = list(service.lookup_directory( directory)) expected_directory_ls = self.directory_ls(directory) self.assertEqual(actual_directory_ls, expected_directory_ls) @given(empty_directory()) def test_lookup_directory_empty(self, empty_directory): actual_directory_ls = list(service.lookup_directory(empty_directory)) self.assertEqual(actual_directory_ls, []) @given(origin()) def test_lookup_revision_by_nothing_found(self, origin): with self.assertRaises(NotFoundExc): service.lookup_revision_by( origin['url'], 'invalid-branch-name') @given(origin()) def test_lookup_revision_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) actual_revision = \ service.lookup_revision_by(origin['url'], branch_name, None) expected_revision = \ self.revision_get(branches[branch_name]['target']) self.assertEqual(actual_revision, expected_revision) @given(origin(), revision()) def test_lookup_revision_with_context_by_ko(self, origin, revision): with self.assertRaises(NotFoundExc): service.lookup_revision_with_context_by(origin['url'], 'invalid-branch-name', None, revision) @given(origin()) def test_lookup_revision_with_context_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) root_rev = branches[branch_name]['target'] root_rev_log = self.revision_log(root_rev) children = defaultdict(list) for rev in root_rev_log: for rev_p in rev['parents']: children[rev_p].append(rev['id']) rev = root_rev_log[-1]['id'] actual_root_rev, actual_rev = service.lookup_revision_with_context_by( origin['url'], branch_name, None, rev) expected_root_rev = self.revision_get(root_rev) expected_rev = self.revision_get(rev) expected_rev['children'] = children[rev] self.assertEqual(actual_root_rev, expected_root_rev) self.assertEqual(actual_rev, expected_rev) def test_lookup_revision_through_ko_not_implemented(self): with self.assertRaises(NotImplementedError): service.lookup_revision_through({ 'something-unknown': 10, }) @given(origin()) def test_lookup_revision_through_with_context_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) root_rev = branches[branch_name]['target'] root_rev_log = self.revision_log(root_rev) rev = root_rev_log[-1]['id'] self.assertEqual(service.lookup_revision_through({ 'origin_url': origin['url'], 'branch_name': branch_name, 'ts': None, 'sha1_git': rev }), service.lookup_revision_with_context_by( origin['url'], branch_name, None, rev) ) @given(origin()) def test_lookup_revision_through_with_revision_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) self.assertEqual(service.lookup_revision_through({ 'origin_url': origin['url'], 'branch_name': branch_name, 'ts': None, }), service.lookup_revision_by( origin['url'], branch_name, None) ) @given(ancestor_revisions()) def test_lookup_revision_through_with_context(self, ancestor_revisions): sha1_git = ancestor_revisions['sha1_git'] sha1_git_root = ancestor_revisions['sha1_git_root'] self.assertEqual(service.lookup_revision_through({ 'sha1_git_root': sha1_git_root, 'sha1_git': sha1_git, }), service.lookup_revision_with_context( sha1_git_root, sha1_git) ) @given(revision()) def test_lookup_revision_through_with_revision(self, revision): self.assertEqual(service.lookup_revision_through({ 'sha1_git': revision }), service.lookup_revision(revision) ) @given(revision()) def test_lookup_directory_through_revision_ko_not_found(self, revision): with self.assertRaises(NotFoundExc): service.lookup_directory_through_revision( {'sha1_git': revision}, 'some/invalid/path') @given(revision()) def test_lookup_directory_through_revision_ok(self, revision): revision_data = self.revision_get(revision) dir_entries = [e for e in self.directory_ls(revision_data['directory']) if e['type'] == 'file'] dir_entry = random.choice(dir_entries) self.assertEqual( service.lookup_directory_through_revision({'sha1_git': revision}, dir_entry['name']), (revision, service.lookup_directory_with_revision( revision, dir_entry['name'])) ) @given(revision()) def test_lookup_directory_through_revision_ok_with_data(self, revision): revision_data = self.revision_get(revision) dir_entries = [e for e in self.directory_ls(revision_data['directory']) if e['type'] == 'file'] dir_entry = random.choice(dir_entries) self.assertEqual( service.lookup_directory_through_revision({'sha1_git': revision}, dir_entry['name'], with_data=True), (revision, service.lookup_directory_with_revision( revision, dir_entry['name'], with_data=True)) ) @pytest.mark.origin_id @given(new_origins(20)) def test_lookup_origins(self, new_origins): nb_origins = len(new_origins) expected_origins = self.storage.origin_add(new_origins) + expected_origins.sort(key=lambda orig: orig['id']) origin_from_idx = random.randint(1, nb_origins-1) - 1 origin_from = expected_origins[origin_from_idx]['id'] max_origin_idx = expected_origins[-1]['id'] origin_count = random.randint(1, max_origin_idx - origin_from) actual_origins = list(service.lookup_origins(origin_from, origin_count)) expected_origins = list(self.storage.origin_get_range(origin_from, origin_count)) self.assertEqual(actual_origins, expected_origins) diff --git a/swh/web/tests/data.py b/swh/web/tests/data.py index 876050e8..604e6940 100644 --- a/swh/web/tests/data.py +++ b/swh/web/tests/data.py @@ -1,469 +1,464 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import deepcopy import os import random from rest_framework.decorators import api_view from rest_framework.response import Response from swh.indexer.fossology_license import FossologyLicenseIndexer from swh.indexer.mimetype import MimetypeIndexer from swh.indexer.ctags import CtagsIndexer from swh.indexer.storage import get_indexer_storage from swh.model.from_disk import Directory from swh.model.hashutil import hash_to_hex, hash_to_bytes, DEFAULT_ALGORITHMS from swh.model.identifiers import directory_identifier from swh.loader.git.from_disk import GitLoaderFromArchive from swh.storage.algos.dir_iterators import dir_iterator from swh.web import config from swh.web.browse.utils import ( get_mimetype_and_encoding_for_content, prepare_content_for_display ) from swh.web.common import service from swh.web.common.highlightjs import get_hljs_language_from_filename # Module used to initialize data that will be provided as tests input # Configuration for git loader _TEST_LOADER_CONFIG = { 'storage': { 'cls': 'memory', 'args': {} }, 'send_contents': True, 'send_directories': True, 'send_revisions': True, 'send_releases': True, 'send_snapshot': True, 'content_size_limit': 100 * 1024 * 1024, 'content_packet_size': 10, 'content_packet_size_bytes': 100 * 1024 * 1024, 'directory_packet_size': 10, 'revision_packet_size': 10, 'release_packet_size': 10, 'save_data': False, } # Base content indexer configuration _TEST_INDEXER_BASE_CONFIG = { 'storage': { 'cls': 'memory', 'args': {}, }, 'objstorage': { 'cls': 'memory', 'args': {}, }, 'indexer_storage': { 'cls': 'memory', 'args': {}, } } def random_sha1(): return hash_to_hex(bytes(random.randint(0, 255) for _ in range(20))) def random_sha256(): return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32))) def random_blake2s256(): return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32))) def random_content(): return { 'sha1': random_sha1(), 'sha1_git': random_sha1(), 'sha256': random_sha256(), 'blake2s256': random_blake2s256(), } # MimetypeIndexer with custom configuration for tests class _MimetypeIndexer(MimetypeIndexer): def parse_config_file(self, *args, **kwargs): return { **_TEST_INDEXER_BASE_CONFIG, 'tools': { 'name': 'file', 'version': '1:5.30-1+deb9u1', 'configuration': { "type": "library", "debian-package": "python3-magic" } } } # FossologyLicenseIndexer with custom configuration for tests class _FossologyLicenseIndexer(FossologyLicenseIndexer): def parse_config_file(self, *args, **kwargs): return { **_TEST_INDEXER_BASE_CONFIG, 'workdir': '/tmp/swh/indexer.fossology.license', 'tools': { 'name': 'nomos', 'version': '3.1.0rc2-31-ga2cbb8c', 'configuration': { 'command_line': 'nomossa ', }, } } # CtagsIndexer with custom configuration for tests class _CtagsIndexer(CtagsIndexer): def parse_config_file(self, *args, **kwargs): return { **_TEST_INDEXER_BASE_CONFIG, 'workdir': '/tmp/swh/indexer.ctags', 'languages': {'c': 'c'}, 'tools': { 'name': 'universal-ctags', 'version': '~git7859817b', 'configuration': { 'command_line': '''ctags --fields=+lnz --sort=no --links=no ''' # noqa '''--output-format=json ''' }, } } # Lightweight git repositories that will be loaded to generate # input data for tests _TEST_ORIGINS = [ { 'type': 'git', 'url': 'https://github.com/wcoder/highlightjs-line-numbers.js', 'archives': ['highlightjs-line-numbers.js.zip', 'highlightjs-line-numbers.js_visit2.zip'], 'visit_date': ['Dec 1 2018, 01:00 UTC', 'Jan 20 2019, 15:00 UTC'] }, { 'type': 'git', 'url': 'https://github.com/memononen/libtess2', 'archives': ['libtess2.zip'], 'visit_date': ['May 25 2018, 01:00 UTC'] }, { 'type': 'git', 'url': 'repo_with_submodules', 'archives': ['repo_with_submodules.tgz'], 'visit_date': ['Jan 1 2019, 01:00 UTC'] } ] _contents = {} # Tests data initialization def _init_tests_data(): # Load git repositories from archives loader = GitLoaderFromArchive(config=_TEST_LOADER_CONFIG) # Get reference to the memory storage storage = loader.storage for origin in _TEST_ORIGINS: for i, archive in enumerate(origin['archives']): origin_repo_archive = \ os.path.join(os.path.dirname(__file__), 'resources/repos/%s' % archive) loader.load(origin['url'], origin_repo_archive, origin['visit_date'][i]) origin.update(storage.origin_get(origin)) # add an 'id' key if enabled contents = set() directories = set() revisions = set() releases = set() snapshots = set() - persons = set() content_path = {} # Get all objects loaded into the test archive for origin in _TEST_ORIGINS: snp = storage.snapshot_get_latest(origin['url']) snapshots.add(hash_to_hex(snp['id'])) for branch_name, branch_data in snp['branches'].items(): if branch_data['target_type'] == 'revision': revisions.add(branch_data['target']) elif branch_data['target_type'] == 'release': release = next(storage.release_get([branch_data['target']])) revisions.add(release['target']) releases.add(hash_to_hex(branch_data['target'])) - persons.add(release['author']['id']) for rev_log in storage.revision_shortlog(set(revisions)): rev_id = rev_log[0] revisions.add(rev_id) for rev in storage.revision_get(revisions): dir_id = rev['directory'] - persons.add(rev['author']['id']) - persons.add(rev['committer']['id']) directories.add(hash_to_hex(dir_id)) for entry in dir_iterator(storage, dir_id): content_path[entry['sha1']] = '/'.join( [hash_to_hex(dir_id), entry['path'].decode('utf-8')]) if entry['type'] == 'file': contents.add(entry['sha1']) elif entry['type'] == 'dir': directories.add(hash_to_hex(entry['target'])) # Get all checksums for each content contents_metadata = storage.content_get_metadata(contents) contents = [] for content_metadata in contents_metadata: contents.append({ algo: hash_to_hex(content_metadata[algo]) for algo in DEFAULT_ALGORITHMS }) path = content_path[content_metadata['sha1']] cnt = next(storage.content_get([content_metadata['sha1']])) mimetype, encoding = get_mimetype_and_encoding_for_content(cnt['data']) content_display_data = prepare_content_for_display( cnt['data'], mimetype, path) contents[-1]['path'] = path contents[-1]['mimetype'] = mimetype contents[-1]['encoding'] = encoding contents[-1]['hljs_language'] = content_display_data['language'] contents[-1]['data'] = content_display_data['content_data'] _contents[contents[-1]['sha1']] = contents[-1] # Create indexer storage instance that will be shared by indexers idx_storage = get_indexer_storage('memory', {}) # Add the empty directory to the test archive empty_dir_id = directory_identifier({'entries': []}) empty_dir_id_bin = hash_to_bytes(empty_dir_id) storage.directory_add([{'id': empty_dir_id_bin, 'entries': []}]) # Return tests data return { 'storage': storage, 'idx_storage': idx_storage, 'origins': _TEST_ORIGINS, 'contents': contents, 'directories': list(directories), - 'persons': list(persons), 'releases': list(releases), 'revisions': list(map(hash_to_hex, revisions)), 'snapshots': list(snapshots), 'generated_checksums': set(), } def _init_indexers(tests_data): # Instantiate content indexers that will be used in tests # and force them to use the memory storages indexers = {} for idx_name, idx_class in (('mimetype_indexer', _MimetypeIndexer), ('license_indexer', _FossologyLicenseIndexer), ('ctags_indexer', _CtagsIndexer)): idx = idx_class() idx.storage = tests_data['storage'] idx.objstorage = tests_data['storage'].objstorage idx.idx_storage = tests_data['idx_storage'] idx.register_tools(idx.config['tools']) indexers[idx_name] = idx return indexers def get_content(content_sha1): return _contents.get(content_sha1) _tests_data = None _current_tests_data = None _indexer_loggers = {} def get_tests_data(reset=False): """ Initialize tests data and return them in a dict. """ global _tests_data, _current_tests_data if _tests_data is None: _tests_data = _init_tests_data() indexers = _init_indexers(_tests_data) for (name, idx) in indexers.items(): # pytest makes the loggers use a temporary file; and deepcopy # requires serializability. So we remove them, and add them # back after the copy. _indexer_loggers[name] = idx.log del idx.log _tests_data.update(indexers) if reset or _current_tests_data is None: _current_tests_data = deepcopy(_tests_data) for (name, logger) in _indexer_loggers.items(): _current_tests_data[name].log = logger return _current_tests_data def override_storages(storage, idx_storage): """ Helper function to replace the storages from which archive data are fetched. """ swh_config = config.get_config() swh_config.update({'storage': storage}) service.storage = storage swh_config.update({'indexer_storage': idx_storage}) service.idx_storage = idx_storage # Implement some special endpoints used to provide input tests data # when executing end to end tests with cypress _content_code_data_exts = {} _content_code_data_filenames = {} _content_other_data_exts = {} def _init_content_tests_data(data_path, data_dict, ext_key): """ Helper function to read the content of a directory, store it into a test archive and add some files metadata (sha1 and/or expected programming language) in a dict. Args: data_path (str): path to a directory relative to the tests folder of swh-web data_dict (dict): the dict that will store files metadata ext_key (bool): whether to use file extensions or filenames as dict keys """ test_contents_dir = os.path.join( os.path.dirname(__file__), data_path).encode('utf-8') directory = Directory.from_disk(path=test_contents_dir, data=True, save_path=True) objects = directory.collect() for c in objects['content'].values(): c['status'] = 'visible' sha1 = hash_to_hex(c['sha1']) if ext_key: key = c['path'].decode('utf-8').split('.')[-1] filename = 'test.' + key else: filename = c['path'].decode('utf-8').split('/')[-1] key = filename language = get_hljs_language_from_filename(filename) data_dict[key] = {'sha1': sha1, 'language': language} storage = get_tests_data()['storage'] storage.content_add(objects['content'].values()) def _init_content_code_data_exts(): """ Fill a global dictionary which maps source file extension to a code content example. """ global _content_code_data_exts _init_content_tests_data('resources/contents/code/extensions', _content_code_data_exts, True) def _init_content_other_data_exts(): """ Fill a global dictionary which maps a file extension to a content example. """ global _content_other_data_exts _init_content_tests_data('resources/contents/other/extensions', _content_other_data_exts, True) def _init_content_code_data_filenames(): """ Fill a global dictionary which maps a filename to a content example. """ global _content_code_data_filenames _init_content_tests_data('resources/contents/code/filenames', _content_code_data_filenames, False) if config.get_config()['e2e_tests_mode']: _init_content_code_data_exts() _init_content_other_data_exts() _init_content_code_data_filenames() @api_view(['GET']) def get_content_code_data_all_exts(request): """ Endpoint implementation returning a list of all source file extensions to test for highlighting using cypress. """ return Response(sorted(_content_code_data_exts.keys()), status=200, content_type='application/json') @api_view(['GET']) def get_content_code_data_by_ext(request, ext): """ Endpoint implementation returning metadata of a code content example based on the source file extension. """ data = None status = 404 if ext in _content_code_data_exts: data = _content_code_data_exts[ext] status = 200 return Response(data, status=status, content_type='application/json') @api_view(['GET']) def get_content_other_data_by_ext(request, ext): """ Endpoint implementation returning metadata of a content example based on the file extension. """ _init_content_other_data_exts() data = None status = 404 if ext in _content_other_data_exts: data = _content_other_data_exts[ext] status = 200 return Response(data, status=status, content_type='application/json') @api_view(['GET']) def get_content_code_data_all_filenames(request): """ Endpoint implementation returning a list of all source filenames to test for highlighting using cypress. """ return Response(sorted(_content_code_data_filenames.keys()), status=200, content_type='application/json') @api_view(['GET']) def get_content_code_data_by_filename(request, filename): """ Endpoint implementation returning metadata of a code content example based on the source filename. """ data = None status = 404 if filename in _content_code_data_filenames: data = _content_code_data_filenames[filename] status = 200 return Response(data, status=status, content_type='application/json') diff --git a/swh/web/tests/strategies.py b/swh/web/tests/strategies.py index e79874dc..74d9d28b 100644 --- a/swh/web/tests/strategies.py +++ b/swh/web/tests/strategies.py @@ -1,536 +1,520 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from collections import defaultdict from datetime import datetime from hypothesis import settings, assume from hypothesis.strategies import ( just, sampled_from, lists, composite, datetimes, integers, binary, text, characters ) from swh.model.hashutil import hash_to_hex, hash_to_bytes from swh.model.identifiers import directory_identifier from swh.storage.algos.revisions_walker import get_revisions_walker from swh.model.hypothesis_strategies import ( origins as new_origin_strategy, snapshots as new_snapshot ) from swh.web.tests.data import get_tests_data # Module dedicated to the generation of input data for tests through # the use of hypothesis. # Some of these data are sampled from a test archive created and populated # in the swh.web.tests.data module. # Set the swh-web hypothesis profile if none has been explicitly set hypothesis_default_settings = settings.get_profile('default') if repr(settings()) == repr(hypothesis_default_settings): settings.load_profile('swh-web') # The following strategies exploit the hypothesis capabilities def _filter_checksum(cs): generated_checksums = get_tests_data()['generated_checksums'] if not int.from_bytes(cs, byteorder='little') or \ cs in generated_checksums: return False generated_checksums.add(cs) return True def _known_swh_object(object_type): return sampled_from(get_tests_data()[object_type]) def sha1(): """ Hypothesis strategy returning a valid hexadecimal sha1 value. """ return binary( min_size=20, max_size=20).filter(_filter_checksum).map(hash_to_hex) def invalid_sha1(): """ Hypothesis strategy returning an invalid sha1 representation. """ return binary( min_size=50, max_size=50).filter(_filter_checksum).map(hash_to_hex) def sha256(): """ Hypothesis strategy returning a valid hexadecimal sha256 value. """ return binary( min_size=32, max_size=32).filter(_filter_checksum).map(hash_to_hex) def content(): """ Hypothesis strategy returning a random content ingested into the test archive. """ return _known_swh_object('contents') def contents(): """ Hypothesis strategy returning random contents ingested into the test archive. """ return lists(content(), min_size=2, max_size=8) def content_text(): """ Hypothesis strategy returning random textual contents ingested into the test archive. """ return content().filter(lambda c: c['mimetype'].startswith('text/')) def content_text_non_utf8(): """ Hypothesis strategy returning random textual contents not encoded to UTF-8 ingested into the test archive. """ return content().filter(lambda c: c['mimetype'].startswith('text/') and c['encoding'] not in ('utf-8', 'us-ascii')) def content_text_no_highlight(): """ Hypothesis strategy returning random textual contents with no detected programming language to highlight ingested into the test archive. """ return content().filter(lambda c: c['mimetype'].startswith('text/') and c['hljs_language'] == 'nohighlight') def content_image_type(): """ Hypothesis strategy returning random image contents ingested into the test archive. """ return content().filter(lambda c: c['mimetype'].startswith('image/')) @composite def new_content(draw): blake2s256_hex = draw(sha256()) sha1_hex = draw(sha1()) sha1_git_hex = draw(sha1()) sha256_hex = draw(sha256()) assume(sha1_hex != sha1_git_hex) assume(blake2s256_hex != sha256_hex) return { 'blake2S256': blake2s256_hex, 'sha1': sha1_hex, 'sha1_git': sha1_git_hex, 'sha256': sha256_hex } def unknown_content(): """ Hypothesis strategy returning a random content not ingested into the test archive. """ return new_content().filter( lambda c: next(get_tests_data()['storage'].content_get( [hash_to_bytes(c['sha1'])])) is None) def unknown_contents(): """ Hypothesis strategy returning random contents not ingested into the test archive. """ return lists(unknown_content(), min_size=2, max_size=8) def directory(): """ Hypothesis strategy returning a random directory ingested into the test archive. """ return _known_swh_object('directories') def directory_with_subdirs(): """ Hypothesis strategy returning a random directory containing sub directories ingested into the test archive. """ storage = get_tests_data()['storage'] return directory().filter( lambda d: any([e['type'] == 'dir' for e in list(storage.directory_ls(hash_to_bytes(d)))])) def empty_directory(): """ Hypothesis strategy returning the empty directory ingested into the test archive. """ return just(directory_identifier({'entries': []})) def unknown_directory(): """ Hypothesis strategy returning a random directory not ingested into the test archive. """ storage = get_tests_data()['storage'] return sha1().filter( lambda s: len(list(storage.directory_missing([hash_to_bytes(s)]))) > 0) def origin(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ return _known_swh_object('origins') def origin_with_multiple_visits(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ ret = [] tests_data = get_tests_data() for origin in tests_data['origins']: visits = list(tests_data['storage'].origin_visit_get(origin['url'])) if len(visits) > 1: ret.append(origin) return sampled_from(ret) def origin_with_release(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ ret = [] tests_data = get_tests_data() for origin in tests_data['origins']: snapshot = tests_data['storage'].snapshot_get_latest(origin['url']) if any([b['target_type'] == 'release' for b in snapshot['branches'].values()]): ret.append(origin) return sampled_from(ret) def unknown_origin_id(): """ Hypothesis strategy returning a random origin id not ingested into the test archive. """ return integers(min_value=1000000) def new_origin(): """ Hypothesis strategy returning a random origin not ingested into the test archive. """ storage = get_tests_data()['storage'] return new_origin_strategy().map(lambda origin: origin.to_dict()).filter( lambda origin: storage.origin_get([origin])[0] is None) def new_origins(nb_origins=None): """ Hypothesis strategy returning random origins not ingested into the test archive. """ min_size = nb_origins if nb_origins is not None else 2 max_size = nb_origins if nb_origins is not None else 8 size = random.randint(min_size, max_size) return lists(new_origin(), min_size=size, max_size=size, unique_by=lambda o: tuple(sorted(o.items()))) def visit_dates(nb_dates=None): """ Hypothesis strategy returning a list of visit dates. """ min_size = nb_dates if nb_dates else 2 max_size = nb_dates if nb_dates else 8 return lists(datetimes(min_value=datetime(2015, 1, 1, 0, 0), max_value=datetime(2018, 12, 31, 0, 0)), min_size=min_size, max_size=max_size, unique=True).map(sorted) def release(): """ Hypothesis strategy returning a random release ingested into the test archive. """ return _known_swh_object('releases') def unknown_release(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ return sha1().filter( lambda s: next(get_tests_data()['storage'].release_get([s])) is None) def revision(): """ Hypothesis strategy returning a random revision ingested into the test archive. """ return _known_swh_object('revisions') def unknown_revision(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ storage = get_tests_data()['storage'] return sha1().filter( lambda s: next(storage.revision_get([hash_to_bytes(s)])) is None) @composite def new_person(draw): """ Hypothesis strategy returning random raw swh person data. """ name = draw(text(min_size=5, max_size=30, alphabet=characters(min_codepoint=0, max_codepoint=255))) email = '%s@company.org' % name return { 'name': name.encode(), 'email': email.encode(), 'fullname': ('%s <%s>' % (name, email)).encode() } @composite def new_swh_date(draw): """ Hypothesis strategy returning random raw swh date data. """ timestamp = draw( datetimes(min_value=datetime(2015, 1, 1, 0, 0), max_value=datetime(2018, 12, 31, 0, 0)).map( lambda d: int(d.timestamp()))) return { 'timestamp': timestamp, 'offset': 0, 'negative_utc': False, } @composite def new_revision(draw): """ Hypothesis strategy returning random raw swh revision data not ingested into the test archive. """ return { 'id': draw(unknown_revision().map(hash_to_bytes)), 'directory': draw(sha1().map(hash_to_bytes)), 'author': draw(new_person()), 'committer': draw(new_person()), 'message': draw( text(min_size=20, max_size=100).map(lambda t: t.encode())), 'date': draw(new_swh_date()), 'committer_date': draw(new_swh_date()), 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], } def revisions(min_size=2, max_size=8): """ Hypothesis strategy returning random revisions ingested into the test archive. """ return lists(revision(), min_size=min_size, max_size=max_size) def unknown_revisions(min_size=2, max_size=8): """ Hypothesis strategy returning random revisions not ingested into the test archive. """ return lists(unknown_revision(), min_size=min_size, max_size=max_size) def snapshot(): """ Hypothesis strategy returning a random snapshot ingested into the test archive. """ return _known_swh_object('snapshots') def new_snapshots(nb_snapshots=None): min_size = nb_snapshots if nb_snapshots else 2 max_size = nb_snapshots if nb_snapshots else 8 return lists(new_snapshot(min_size=2, max_size=10, only_objects=True) .map(lambda snp: snp.to_dict()), min_size=min_size, max_size=max_size) def unknown_snapshot(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ storage = get_tests_data()['storage'] return sha1().filter( lambda s: storage.snapshot_get(hash_to_bytes(s)) is None) -def person(): - """ - Hypothesis strategy returning a random person ingested - into the test archive. - """ - return _known_swh_object('persons') - - -def unknown_person(): - """ - Hypothesis strategy returning a random person not ingested - into the test archive. - """ - return integers(min_value=1000000) - - def _get_origin_dfs_revisions_walker(): tests_data = get_tests_data() storage = tests_data['storage'] origin = random.choice(tests_data['origins'][:-1]) snapshot = storage.snapshot_get_latest(origin['url']) head = snapshot['branches'][b'HEAD']['target'] return get_revisions_walker('dfs', storage, head) def ancestor_revisions(): """ Hypothesis strategy returning a pair of revisions ingested into the test archive with an ancestor relation. """ # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker() master_revisions = [] children = defaultdict(list) init_rev_found = False # get revisions only authored in the master branch for rev in revisions_walker: for rev_p in rev['parents']: children[rev_p].append(rev['id']) if not init_rev_found: master_revisions.append(rev) if not rev['parents']: init_rev_found = True # head revision root_rev = master_revisions[0] # pick a random revision, different from head, only authored # in the master branch ancestor_rev_idx = random.choice(list(range(1, len(master_revisions)-1))) ancestor_rev = master_revisions[ancestor_rev_idx] ancestor_child_revs = children[ancestor_rev['id']] return just({ 'sha1_git_root': hash_to_hex(root_rev['id']), 'sha1_git': hash_to_hex(ancestor_rev['id']), 'children': [hash_to_hex(r) for r in ancestor_child_revs] }) def non_ancestor_revisions(): """ Hypothesis strategy returning a pair of revisions ingested into the test archive with no ancestor relation. """ # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker() merge_revs = [] children = defaultdict(list) # get all merge revisions for rev in revisions_walker: if len(rev['parents']) > 1: merge_revs.append(rev) for rev_p in rev['parents']: children[rev_p].append(rev['id']) # find a merge revisions whose parents have a unique child revision random.shuffle(merge_revs) selected_revs = None for merge_rev in merge_revs: if all(len(children[rev_p]) == 1 for rev_p in merge_rev['parents']): selected_revs = merge_rev['parents'] return just({ 'sha1_git_root': hash_to_hex(selected_revs[0]), 'sha1_git': hash_to_hex(selected_revs[1]) }) # The following strategies returns data specific to some tests # that can not be generated and thus are hardcoded. def contents_with_ctags(): """ Hypothesis strategy returning contents ingested into the test archive. Those contents are ctags compatible, that is running ctags on those lay results. """ return just({ 'sha1s': ['0ab37c02043ebff946c1937523f60aadd0844351', '15554cf7608dde6bfefac7e3d525596343a85b6f', '2ce837f1489bdfb8faf3ebcc7e72421b5bea83bd', '30acd0b47fc25e159e27a980102ddb1c4bea0b95', '4f81f05aaea3efb981f9d90144f746d6b682285b', '5153aa4b6e4455a62525bc4de38ed0ff6e7dd682', '59d08bafa6a749110dfb65ba43a61963d5a5bf9f', '7568285b2d7f31ae483ae71617bd3db873deaa2c', '7ed3ee8e94ac52ba983dd7690bdc9ab7618247b4', '8ed7ef2e7ff9ed845e10259d08e4145f1b3b5b03', '9b3557f1ab4111c8607a4f2ea3c1e53c6992916c', '9c20da07ed14dc4fcd3ca2b055af99b2598d8bdd', 'c20ceebd6ec6f7a19b5c3aebc512a12fbdc9234b', 'e89e55a12def4cd54d5bff58378a3b5119878eb7', 'e8c0654fe2d75ecd7e0b01bee8a8fc60a130097e', 'eb6595e559a1d34a2b41e8d4835e0e4f98a5d2b5'], 'symbol_name': 'ABS' }) def revision_with_submodules(): """ Hypothesis strategy returning a revision that is known to point to a directory with revision entries (aka git submodule) """ return just({ 'rev_sha1_git': 'ffcb69001f3f6745dfd5b48f72ab6addb560e234', 'rev_dir_sha1_git': 'd92a21446387fa28410e5a74379c934298f39ae2', 'rev_dir_rev_path': 'libtess2' }) diff --git a/version.txt b/version.txt index d49ae8fc..984197b9 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.208-0-g0c348aa8 \ No newline at end of file +v0.0.209-0-g0edb017f \ No newline at end of file