diff --git a/docs/uri-scheme-browse-origin.rst b/docs/uri-scheme-browse-origin.rst index eecc8ddf..b299e223 100644 --- a/docs/uri-scheme-browse-origin.rst +++ b/docs/uri-scheme-browse-origin.rst @@ -1,861 +1,848 @@ Origin ^^^^^^ This describes the URI scheme when one wants to browse the Software Heritage archive in the context of an origin (for instance, a repository crawled from GitHub or a Debian source package). All the views pointed by that scheme offer quick links to browse objects as found during the associated crawls performed by Software Heritage: * the root directory of the origin * the list of branches of the origin * the list of releases of the origin Origin visits """"""""""""" .. http:get:: /browse/origin/visits/ HTML view that displays visits reporting for a software origin identified by its type and url. :query string origin_url: mandatory parameter providing the url of the origin (e.g. https://github.com/(user)/(repo)) :statuscode 200: no error :statuscode 400: no origin url has been provided as parameter :statuscode 404: requested origin can not be found in the archive **Examples:** .. parsed-literal:: :swh_web_browse:`origin/visits/?origin_url=https://github.com/torvalds/linux` :swh_web_browse:`origin/visits/?origin_url=https://github.com/python/cpython` :swh_web_browse:`origin/visits/?origin_url=deb://Debian-Security/packages/mediawiki` :swh_web_browse:`origin/visits/?origin_url=https://gitorious.org/qt/qtbase.git` .. http:get:: /browse/origin/(origin_url)/visits/ :deprecated: .. warning:: That endpoint is deprecated, use :http:get:`/browse/origin/visits/` instead. HTML view that displays a visits reporting for a software origin identified by its type and url. :param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/) :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Examples:** .. 
parsed-literal:: :swh_web_browse:`origin/https://github.com/torvalds/linux/visits/` :swh_web_browse:`origin/https://github.com/python/cpython/visits/` :swh_web_browse:`origin/deb://Debian-Security/packages/mediawiki/visits/` :swh_web_browse:`origin/https://gitorious.org/qt/qtbase.git/visits/` Origin directory """""""""""""""" .. http:get:: /browse/origin/directory/ HTML view for browsing the content of a directory reachable from the root directory (including itself) associated to the latest full visit of a software origin. The content of the directory is first sorted in lexicographical order and the sub-directories are displayed before the regular files. The view enables to navigate from the requested directory to directories reachable from it in a recursive way but also up to the origin root directory. A breadcrumb located in the top part of the view allows to keep track of the paths navigated so far. The view also enables to easily switch between the origin branches and releases through a dropdown menu. The origin branch (default to HEAD) from which to retrieve the directory content can also be specified by using the branch query parameter. :query string origin_url: mandatory parameter providing the url of the origin (e.g. https://github.com/(user)/(repo)) :query string path: optional parameter used to specify the path of a directory reachable from the origin root one :query string branch: specify the origin branch name from which to retrieve the root directory :query string release: specify the origin release name from which to retrieve the root directory :query string revision: specify the origin revision, identified by the hexadecimal representation of its **sha1_git** value, from which to retrieve the root directory - :query string timestamp: a date string (any format parsable by `dateutil.parser.parse`_) - or Unix timestamp to parse in order to find the closest visit. 
+ :query string timestamp: an ISO 8601 datetime string to parse in order to find the + closest visit. :query int visit_id: specify a visit id to retrieve the directory from instead of using the latest full visit by default :statuscode 200: no error :statuscode 400: no origin url has been provided as parameter :statuscode 404: requested origin can not be found in the archive or the provided path does not exist from the origin root directory **Examples:** .. parsed-literal:: :swh_web_browse:`origin/directory/?origin_url=https://github.com/torvalds/linux` :swh_web_browse:`origin/directory/?origin_url=https://github.com/torvalds/linux&path=net/ethernet` :swh_web_browse:`origin/directory/?origin_url=https://github.com/python/cpython` :swh_web_browse:`origin/directory/?origin_url=https://github.com/python/cpython&path=Python` :swh_web_browse:`origin/directory/?origin_url=https://github.com/python/cpython&branch=refs/heads/2.7` - :swh_web_browse:`origin/directory/?origin_url=https://github.com/torvalds/linux&timestamp=1493926809` :swh_web_browse:`origin/directory/?origin_url=https://github.com/torvalds/linux&path=net/ethernet&timestamp=2016-09-14T10:36:21Z` - :swh_web_browse:`origin/directory/?origin_url=https://github.com/python/cpython&timestamp=1474620651` :swh_web_browse:`origin/directory/?origin_url=https://github.com/python/cpython&path=Python&timestamp=2017-05-05` :swh_web_browse:`origin/directory/?origin_url=https://github.com/python/cpython&branch=refs/heads/2.7&timestamp=2015-08` .. http:get:: /browse/origin/(origin_url)/directory/[(path)/] :deprecated: .. warning:: That endpoint is deprecated, use :http:get:`/browse/origin/directory/` instead. HTML view for browsing the content of a directory reachable from the root directory (including itself) associated to the latest full visit of a software origin. The content of the directory is first sorted in lexicographical order and the sub-directories are displayed before the regular files. 
The view enables to navigate from the requested directory to directories reachable from it in a recursive way but also up to the origin root directory. A breadcrumb located in the top part of the view allows to keep track of the paths navigated so far. The view also enables to easily switch between the origin branches and releases through a dropdown menu. The origin branch (default to HEAD) from which to retrieve the directory content can also be specified by using the branch query parameter. :param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/) :param string path: optional parameter used to specify the path of a directory reachable from the origin root one :query string branch: specify the origin branch name from which to retrieve the root directory :query string release: specify the origin release name from which to retrieve the root directory :query string revision: specify the origin revision, identified by the hexadecimal representation of its **sha1_git** value, from which to retrieve the root directory :query int visit_id: specify a visit id to retrieve the directory from instead of using the latest full visit by default :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive or the provided path does not exist from the origin root directory **Examples:** .. parsed-literal:: :swh_web_browse:`origin/https://github.com/torvalds/linux/directory/` :swh_web_browse:`origin/https://github.com/torvalds/linux/directory/net/ethernet/` :swh_web_browse:`origin/https://github.com/python/cpython/directory/` :swh_web_browse:`origin/https://github.com/python/cpython/directory/Python/` :swh_web_browse:`origin/https://github.com/python/cpython/directory/?branch=refs/heads/2.7` .. http:get:: /browse/origin/(origin_url)/visit/(timestamp)/directory/[(path)/] :deprecated: .. warning:: That endpoint is deprecated, use :http:get:`/browse/origin/directory/` instead. 
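As a rough illustration of the replacement the warning points to, the sketch below builds a query-parameter URL for ``/browse/origin/directory/`` from the pieces that the deprecated path-based forms embed in the URL path. The base URL ``BROWSE_BASE`` and the helper name ``origin_directory_url`` are assumptions made for this example; neither is defined by this document.

```python
from urllib.parse import urlencode

# Assumption: base URL of the web application. The document only uses the
# :swh_web_browse: role, so adjust this to the deployment you target.
BROWSE_BASE = "https://archive.softwareheritage.org/browse/"


def origin_directory_url(origin_url, path=None, timestamp=None, branch=None):
    """Build a /browse/origin/directory/ URL (hypothetical helper).

    Only parameters that are actually provided are added to the query string.
    """
    params = {"origin_url": origin_url}
    if path:
        params["path"] = path
    if branch:
        params["branch"] = branch
    if timestamp:
        # Expected to be an ISO 8601 datetime string, e.g. 2016-09-14T10:36:21Z
        params["timestamp"] = timestamp
    # Keep ":" and "/" unescaped so the result matches the style of the
    # examples in this document; fully percent-encoded values are also valid.
    return BROWSE_BASE + "origin/directory/?" + urlencode(params, safe=":/")


url = origin_directory_url(
    "https://github.com/torvalds/linux",
    path="net/ethernet",
    timestamp="2016-09-14T10:36:21Z",
)
print(url)
```

The helper keeps the origin URL, path and timestamp as separate arguments, which mirrors the way the query-parameter endpoint separates values that the deprecated endpoints concatenate into one path.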
HTML view for browsing the content of a directory reachable from the root directory (including itself) associated to a visit of a software origin closest to a provided timestamp. The content of the directory is first sorted in lexicographical order and the sub-directories are displayed before the regular files. The view enables to navigate from the requested directory to directories reachable from it in a recursive way but also up to the origin root directory. A breadcrumb located in the top part of the view allows to keep track of the paths navigated so far. The view also enables to easily switch between the origin branches and releases through a dropdown menu. The origin branch (default to HEAD) from which to retrieve the directory content can also be specified by using the branch query parameter. :param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/) - :param string timestamp: a date string (any format parsable by `dateutil.parser.parse`_) - or Unix timestamp to parse in order to find the closest visit. + :param string timestamp: an ISO 8601 datetime string to parse in order to find the + closest visit. :param path: optional parameter used to specify the path of a directory reachable from the origin root one :type path: string :query string branch: specify the origin branch name from which to retrieve the root directory :query string release: specify the origin release name from which to retrieve the root directory :query string revision: specify the origin revision, identified by the hexadecimal representation of its **sha1_git** value, from which to retrieve the directory :query int visit_id: specify a visit id to retrieve the directory from instead of using the provided timestamp :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive, requested visit timestamp does not exist or the provided path does not exist from the origin root directory **Examples:** .. 
parsed-literal:: :swh_web_browse:`origin/https://github.com/torvalds/linux/visit/1493926809/directory/` :swh_web_browse:`origin/https://github.com/torvalds/linux/visit/2016-09-14T10:36:21Z/directory/net/ethernet/` :swh_web_browse:`origin/https://github.com/python/cpython/visit/1474620651/directory/` :swh_web_browse:`origin/https://github.com/python/cpython/visit/2017-05-05/directory/Python/` :swh_web_browse:`origin/https://github.com/python/cpython/visit/2015-08/directory/?branch=refs/heads/2.7` Origin content """""""""""""" .. http:get:: /browse/origin/content/ HTML view that produces a display of a content associated to the latest full visit of a software origin. If the content to display is textual, it will be highlighted client-side if possible using highlightjs_. The procedure to perform that task is described in :http:get:`/browse/content/[(algo_hash):](hash)/`. It is also possible to highlight specific lines of a textual content (not in terms of syntax highlighting but to emphasize some relevant content part) by either: * clicking on line numbers (holding shift to highlight a lines range) * using an url fragment in the form '#Ln' or '#Lm-Ln' The view displays a breadcrumb on top of the rendered content in order to easily navigate up to the origin root directory. The view also enables to easily switch between the origin branches and releases through a dropdown menu. The origin branch (default to HEAD) from which to retrieve the content can also be specified by using the branch query parameter. :query string origin_url: mandatory parameter providing the url of the origin (e.g. 
https://github.com/(user)/(repo)) :query string path: path of a content reachable from the origin root directory :query string branch: specify the origin branch name from which to retrieve the content :query string release: specify the origin release name from which to retrieve the content :query string revision: specify the origin revision, identified by the hexadecimal representation of its **sha1_git** value, from which to retrieve the content - :query string timestamp: a date string (any format parsable by `dateutil.parser.parse`_) - or Unix timestamp to parse in order to find the closest visit. + :query string timestamp: an ISO 8601 datetime string to parse in order to find the + closest visit. :query int visit_id: specify a visit id to retrieve the content from instead of using the latest full visit by default :statuscode 200: no error :statuscode 400: no origin url has been provided as parameter :statuscode 404: requested origin can not be found in the archive, or the provided content path does not exist from the origin root directory **Examples:** .. parsed-literal:: :swh_web_browse:`origin/content/?origin_url=https://github.com/git/git?path=git.c` :swh_web_browse:`origin/content/?origin_url=https://github.com/mozilla/gecko-dev&path=js/src/json.cpp` :swh_web_browse:`origin/content/?origin_url=https://github.com/git/git?path=git.c&branch=refs/heads/next` - :swh_web_browse:`origin/content/?origin_url=https://github.com/git/git&path=git.c&timestamp=1473933564` :swh_web_browse:`origin/content/?origin_url=https://github.com/git/git&path=git.c&timestamp=2016-05-05T00:0:00+00:00Z` - :swh_web_browse:`origin/content/?origin_url=https://github.com/mozilla/gecko-dev&path=js/src/json.cpp&timestamp=1490126182` :swh_web_browse:`origin/content/?origin_url=https://github.com/mozilla/gecko-dev&path=js/src/json.cpp&timestamp=2017-03-21#L904-L931` :swh_web_browse:`origin/content/?origin_url=https://github.com/git/git&path=git.c&branch=refs/heads/next&timestamp=2017-09-15` .. 
http:get:: /browse/origin/(origin_url)/content/ :deprecated: .. warning:: That endpoint is deprecated, use :http:get:`/browse/origin/content/` instead. HTML view that produces a display of a content associated to the latest full visit of a software origin. If the content to display is textual, it will be highlighted client-side if possible using highlightjs_. The procedure to perform that task is described in :http:get:`/browse/content/[(algo_hash):](hash)/`. It is also possible to highlight specific lines of a textual content (not in terms of syntax highlighting but to emphasize some relevant content part) by either: * clicking on line numbers (holding shift to highlight a lines range) * using an url fragment in the form '#Ln' or '#Lm-Ln' The view displays a breadcrumb on top of the rendered content in order to easily navigate up to the origin root directory. The view also enables to easily switch between the origin branches and releases through a dropdown menu. The origin branch (default to HEAD) from which to retrieve the content can also be specified by using the branch query parameter. :param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/) :query string path: path of a content reachable from the origin root directory :query string branch: specify the origin branch name from which to retrieve the content :query string release: specify the origin release name from which to retrieve the content :query string revision: specify the origin revision, identified by the hexadecimal representation of its **sha1_git** value, from which to retrieve the content - :query string timestamp: a date string (any format parsable by `dateutil.parser.parse`_) - or Unix timestamp to parse in order to find the closest visit. + :query string timestamp: an ISO 8601 datetime string to parse in order to find the + closest visit. 
:query int visit_id: specify a visit id to retrieve the content from instead of using the latest full visit by default :statuscode 200: no error :statuscode 400: no origin url has been provided as parameter :statuscode 404: requested origin can not be found in the archive, or the provided content path does not exist from the origin root directory **Examples:** .. parsed-literal:: :swh_web_browse:`origin/https://github.com/git/git/content/?path=git.c` :swh_web_browse:`origin/https://github.com/mozilla/gecko-dev/content/?path=js/src/json.cpp` :swh_web_browse:`origin/https://github.com/git/git/content/?path=git.c&branch=refs/heads/next` - :swh_web_browse:`origin/https://github.com/git/git/content/?path=git.c&timestamp=1473933564` :swh_web_browse:`origin/https://github.com/git/git/content/?path=git.c&timestamp=2016-05-05T00:0:00+00:00Z` - :swh_web_browse:`origin/https://github.com/mozilla/gecko-dev/content?path=js/src/json.cpp&timestamp=1490126182` :swh_web_browse:`origin/https://github.com/mozilla/gecko-dev/content?path=js/src/json.cpp&timestamp=2017-03-21#L904-L931` :swh_web_browse:`origin/https://github.com/git/git/content/git.c/?branch=refs/heads/next&timestamp=2017-09-15` .. http:get:: /browse/origin/(origin_url)/content/(path)/ :deprecated: .. warning:: That endpoint is deprecated, use :http:get:`/browse/origin/content/` instead. HTML view that produces a display of a content associated to the latest full visit of a software origin. If the content to display is textual, it will be highlighted client-side if possible using highlightjs_. The procedure to perform that task is described in :http:get:`/browse/content/[(algo_hash):](hash)/`. 
It is also possible to highlight specific lines of a textual content (not in terms of syntax highlighting but to emphasize some relevant content part) by either: * clicking on line numbers (holding shift to highlight a lines range) * using an url fragment in the form '#Ln' or '#Lm-Ln' The view displays a breadcrumb on top of the rendered content in order to easily navigate up to the origin root directory. The view also enables to easily switch between the origin branches and releases through a dropdown menu. The origin branch (default to HEAD) from which to retrieve the content can also be specified by using the branch query parameter. :param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/) :param string path: path of a content reachable from the origin root directory :query string branch: specify the origin branch name from which to retrieve the content :query string release: specify the origin release name from which to retrieve the content :query string revision: specify the origin revision, identified by the hexadecimal representation of its **sha1_git** value, from which to retrieve the content :query int visit_id: specify a visit id to retrieve the content from instead of using the latest full visit by default :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive, or the provided content path does not exist from the origin root directory **Examples:** .. parsed-literal:: :swh_web_browse:`origin/https://github.com/git/git/content/git.c/` :swh_web_browse:`origin/https://github.com/git/git/content/git.c/` :swh_web_browse:`origin/https://github.com/mozilla/gecko-dev/content/js/src/json.cpp/` :swh_web_browse:`origin/https://github.com/git/git/content/git.c/?branch=refs/heads/next` .. http:get:: /browse/origin/(origin_url)/visit/(timestamp)/content/(path)/ :deprecated: .. warning:: That endpoint is deprecated, use :http:get:`/browse/origin/content/` instead. 
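Because the ``timestamp`` value is documented here as an ISO 8601 datetime string, callers that still hold Unix timestamps may want to convert them before building a URL. The following is a minimal sketch of such a conversion using only the Python standard library; the helper name ``unix_to_iso8601`` is hypothetical, and the choice of UTC with a ``Z`` suffix simply follows the style of the ISO 8601 examples in this document.

```python
from datetime import datetime, timezone


def unix_to_iso8601(unix_timestamp):
    """Convert a Unix timestamp (seconds since the epoch) to an ISO 8601
    datetime string in UTC, e.g. 2016-09-14T10:36:21Z (hypothetical helper).
    """
    moment = datetime.fromtimestamp(unix_timestamp, tz=timezone.utc)
    return moment.strftime("%Y-%m-%dT%H:%M:%SZ")


# One of the Unix timestamps that appear in the removed examples.
print(unix_to_iso8601(1473933564))
```

The resulting string can then be passed as the ``timestamp`` query parameter of the non-deprecated endpoints.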
HTML view that produces a display of a content associated to a visit of a software origin closest to a provided timestamp. If the content to display is textual, it will be highlighted client-side if possible using highlightjs_. The procedure to perform that task is described in :http:get:`/browse/content/[(algo_hash):](hash)/`. It is also possible to highlight specific lines of a textual content (not in terms of syntax highlighting but to emphasize some relevant content part) by either: * clicking on line numbers (holding shift to highlight a lines range) * using an url fragment in the form '#Ln' or '#Lm-Ln' The view displays a breadcrumb on top of the rendered content in order to easily navigate up to the origin root directory. The view also enables to easily switch between the origin branches and releases through a dropdown menu. The origin branch (default to HEAD) from which to retrieve the content can also be specified by using the branch query parameter. :param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/) - :param string timestamp: a date string (any format parsable by `dateutil.parser.parse`_) - or Unix timestamp to parse in order to find the closest visit. + :param string timestamp: an ISO 8601 datetime string to parse in order to find the + closest visit. 
:param string path: path of a content reachable from the origin root directory :query string branch: specify the origin branch name from which to retrieve the content :query string release: specify the origin release name from which to retrieve the content :query string revision: specify the origin revision, identified by the hexadecimal representation of its **sha1_git** value, from which to retrieve the content :query int visit_id: specify a visit id to retrieve the content from instead of using the provided timestamp :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive, requested visit timestamp does not exist or the provided content path does not exist from the origin root directory **Examples:** .. parsed-literal:: - :swh_web_browse:`origin/https://github.com/git/git/visit/1473933564/content/git.c/` :swh_web_browse:`origin/https://github.com/git/git/visit/2016-05-05T00:0:00+00:00Z/content/git.c/` - :swh_web_browse:`origin/https://github.com/mozilla/gecko-dev/visit/1490126182/content/js/src/json.cpp/` :swh_web_browse:`origin/https://github.com/mozilla/gecko-dev/visit/2017-03-21/content/js/src/json.cpp/#L904-L931` :swh_web_browse:`origin/https://github.com/git/git/visit/2017-09-15/content/git.c/?branch=refs/heads/next` Origin history """""""""""""" .. http:get:: /browse/origin/log/ HTML view that produces a display of revisions history heading to the last revision found during the latest visit of a software origin. In other words, it shows the commit log associated to the latest full visit of a software origin. The following data are displayed for each log entry: * link to browse the associated revision in the origin context * author of the revision * date of the revision * message associated the revision * commit date of the revision By default, the revisions are ordered in reverse chronological order of their commit date. N log entries are displayed per page (default is 100). 
In order to navigate in a large history, two buttons are present at the bottom of the view: * **Newer**: fetch and display if available the N more recent log entries than the ones currently displayed * **Older**: fetch and display if available the N older log entries than the ones currently displayed The view also enables to easily switch between the origin branches and releases through a dropdown menu. The origin branch (default to HEAD) from which to retrieve the content can also be specified by using the branch query parameter. :query string origin_url: mandatory parameter providing the url of the origin (e.g. https://github.com/(user)/(repo)) :query int per_page: the number of log entries to display per page :query int offset: the number of revisions to skip before returning those to display :query str revs_ordering: specify the revisions ordering, possible values are ``committer_date``, ``dfs``, ``dfs_post`` and ``bfs`` :query string branch: specify the origin branch name from which to retrieve the commit log :query string release: specify the origin release name from which to retrieve the commit log :query string revision: specify the origin revision, identified by the hexadecimal representation of its **sha1_git** value, from which to retrieve the commit log - :query string timestamp: a date string (any format parsable by `dateutil.parser.parse`_) - or Unix timestamp to parse in order to find the closest visit. + :query string timestamp: an ISO 8601 datetime string to parse in order to find the + closest visit. :query int visit_id: specify a visit id to retrieve the history log from instead of using the latest visit by default :statuscode 200: no error :statuscode 400: no origin url has been provided as parameter :statuscode 404: requested origin can not be found in the archive **Examples:** .. 
parsed-literal:: :swh_web_browse:`origin/log/?origin_url=https://github.com/videolan/vlc` :swh_web_browse:`origin/log/?origin_url=https://github.com/Kitware/CMake` :swh_web_browse:`origin/log/?origin_url=https://github.com/Kitware/CMake&branch=refs/heads/release` :swh_web_browse:`origin/log/?origin_url=https://github.com/videolan/vlc&visit=1459651262` :swh_web_browse:`origin/log/?origin_url=https://github.com/Kitware/CMake&timestamp=2016-04-01` :swh_web_browse:`origin/log/?origin_url=https://github.com/Kitware/CMake&branch=refs/heads/release&timestamp=1438116814` :swh_web_browse:`origin/log/?origin_url=https://github.com/Kitware/CMake&branch=refs/heads/release&timestamp=2017-05-05T03:14:23Z` .. http:get:: /browse/origin/(origin_url)/log/ :deprecated: .. warning:: That endpoint is deprecated, use :http:get:`/browse/origin/log/` instead. HTML view that produces a display of revisions history heading to the last revision found during the latest visit of a software origin. In other words, it shows the commit log associated to the latest full visit of a software origin. The following data are displayed for each log entry: * link to browse the associated revision in the origin context * author of the revision * date of the revision * message associated the revision * commit date of the revision By default, the revisions are ordered in reverse chronological order of their commit date. N log entries are displayed per page (default is 100). In order to navigate in a large history, two buttons are present at the bottom of the view: * **Newer**: fetch and display if available the N more recent log entries than the ones currently displayed * **Older**: fetch and display if available the N older log entries than the ones currently displayed The view also enables to easily switch between the origin branches and releases through a dropdown menu. The origin branch (default to HEAD) from which to retrieve the content can also be specified by using the branch query parameter. 
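The paging behaviour described above can be approximated from a script with the ``per_page`` and ``offset`` query parameters of ``/browse/origin/log/``. The sketch below assumes that ``offset`` counts revisions from the most recent one, so that page ``k`` starts at ``k * per_page``; that reading, the base URL ``BROWSE_BASE`` and the helper name ``origin_log_page_url`` are assumptions for illustration only.

```python
from urllib.parse import urlencode

# Assumption: base URL of the web application, not defined by this document.
BROWSE_BASE = "https://archive.softwareheritage.org/browse/"


def origin_log_page_url(origin_url, page, per_page=100):
    """Build the URL of the zero-based page `page` of an origin log
    (hypothetical helper; 100 is the default page size stated above).
    """
    if page < 0 or per_page <= 0:
        raise ValueError("page must be >= 0 and per_page must be > 0")
    params = {
        "origin_url": origin_url,
        "per_page": per_page,
        # Assumed meaning: number of revisions to skip before this page.
        "offset": page * per_page,
    }
    return BROWSE_BASE + "origin/log/?" + urlencode(params, safe=":/")


# Third page (index 2) of the log, using the default page size.
print(origin_log_page_url("https://github.com/videolan/vlc", page=2))
```

Moving to an older page then amounts to increasing ``page`` by one, and moving to a newer page to decreasing it.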
:query string origin_url: mandatory parameter providing the url of the origin (e.g. https://github.com/(user)/(repo)) :query int per_page: the number of log entries to display per page :query int offset: the number of revisions to skip before returning those to display :query str revs_ordering: specify the revisions ordering, possible values are ``committer_date``, ``dfs``, ``dfs_post`` and ``bfs`` :query string branch: specify the origin branch name from which to retrieve the commit log :query string release: specify the origin release name from which to retrieve the commit log :query string revision: specify the origin revision, identified by the hexadecimal representation of its **sha1_git** value, from which to retrieve the commit log - :query string timestamp: a date string (any format parsable by `dateutil.parser.parse`_) - or Unix timestamp to parse in order to find the closest visit. + :query string timestamp: an ISO 8601 datetime string to parse in order to find the + closest visit. :query int visit_id: specify a visit id to retrieve the history log from instead of using the latest visit by default :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Examples:** .. parsed-literal:: :swh_web_browse:`origin/https://github.com/videolan/vlc/log/` :swh_web_browse:`origin/https://github.com/Kitware/CMake/log/` :swh_web_browse:`origin/https://github.com/Kitware/CMake/log/?branch=refs/heads/release` - :swh_web_browse:`origin/https://github.com/videolan/vlc/log/?visit=1459651262` :swh_web_browse:`origin/https://github.com/Kitware/CMake/log/?timestamp=2016-04-01` - :swh_web_browse:`origin/https://github.com/Kitware/CMake/log/?branch=refs/heads/release&timestamp=1438116814` :swh_web_browse:`origin/https://github.com/Kitware/CMake/log/?branch=refs/heads/release&timestamp=2017-05-05T03:14:23Z` .. http:get:: /browse/origin/(origin_url)/visit/(timestamp)/log/ :deprecated: .. 
warning:: That endpoint is deprecated, use :http:get:`/browse/origin/log/` instead. HTML view that produces a display of revisions history heading to the last revision found during a visit of a software origin closest to the provided timestamp. In other words, it shows the commit log associated to a visit of a software origin closest to a provided timestamp. The following data are displayed for each log entry: * author of the revision * link to the revision metadata * message associated the revision * date of the revision * link to browse the associated source tree in the origin context N log entries are displayed per page (default is 20). In order to navigate in a large history, two buttons are present at the bottom of the view: * **Newer**: fetch and display if available the N more recent log entries than the ones currently displayed * **Older**: fetch and display if available the N older log entries than the ones currently displayed The view also enables to easily switch between the origin branches and releases through a dropdown menu. The origin branch (default to HEAD) from which to retrieve the content can also be specified by using the branch query parameter. :param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/) - :param string timestamp: a date string (any format parsable by `dateutil.parser.parse`_) - or Unix timestamp to parse in order to find the closest visit. + :param string timestamp: an ISO 8601 datetime string to parse in order to find the + closest visit. 
:query int per_page: the number of log entries to display per page (default is 20, max is 50) :query string branch: specify the origin branch name from which to retrieve the commit log :query string release: specify the origin release name from which to retrieve the commit log :query string revision: specify the origin revision, identified by the hexadecimal representation of its **sha1_git** value, from which to retrieve the commit log :query int visit_id: specify a visit id to retrieve the history log from instead of using the provided timestamp :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Examples:** .. parsed-literal:: - :swh_web_browse:`origin/https://github.com/videolan/vlc/visit/1459651262/log/` :swh_web_browse:`origin/https://github.com/Kitware/CMake/visit/2016-04-01/log/` - :swh_web_browse:`origin/https://github.com/Kitware/CMake/visit/1438116814/log/?branch=refs/heads/release` :swh_web_browse:`origin/https://github.com/Kitware/CMake/visit/2017-05-05T03:14:23Z/log/?branch=refs/heads/release` Origin branches """"""""""""""" .. http:get:: /browse/origin/branches/ HTML view that produces a display of the list of branches found during the latest full visit of a software origin. The following data are displayed for each branch: * its name * a link to browse the associated directory * a link to browse the associated revision * last commit message * last commit date That list of branches is paginated, each page displaying a maximum of 100 branches. :query string origin_url: mandatory parameter providing the url of the origin (e.g. https://github.com/(user)/(repo)) - :query string timestamp: a date string (any format parsable by `dateutil.parser.parse`_) - or Unix timestamp to parse in order to find the closest visit. + :query string timestamp: an ISO 8601 datetime string to parse in order to find the + closest visit. 
:statuscode 200: no error :statuscode 400: no origin url has been provided as parameter :statuscode 404: requested origin can not be found in the archive **Examples:** .. parsed-literal:: :swh_web_browse:`origin/branches/?origin_url=deb://Debian/packages/linux` :swh_web_browse:`origin/branches/?origin_url=https://github.com/webpack/webpack` :swh_web_browse:`origin/branches/?origin_url=https://github.com/kripken/emscripten&timestamp=2017-05-05T12:02:03Z` :swh_web_browse:`origin/branches/?origin_url=deb://Debian/packages/apache2-mod-xforward&timestamp=2017-11-15T05:15:09Z` .. http:get:: /browse/origin/(origin_url)/branches/ :deprecated: .. warning:: That endpoint is deprecated, use :http:get:`/browse/origin/branches/` instead. HTML view that produces a display of the list of branches found during the latest full visit of a software origin. The following data are displayed for each branch: * its name * a link to browse the associated directory * a link to browse the associated revision * last commit message * last commit date That list of branches is paginated, each page displaying a maximum of 100 branches. :param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/) - :query string timestamp: a date string (any format parsable by `dateutil.parser.parse`_) - or Unix timestamp to parse in order to find the closest visit. + :query string timestamp: an ISO 8601 datetime string to parse in order to find the + closest visit. :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Examples:** .. parsed-literal:: :swh_web_browse:`origin/deb://Debian/packages/linux/branches/` :swh_web_browse:`origin/https://github.com/webpack/webpack/branches/` :swh_web_browse:`origin/https://github.com/kripken/emscripten/branches/?timestamp=2017-05-05T12:02:03Z` :swh_web_browse:`origin/deb://Debian/packages/apache2-mod-xforward/branches/?timestamp=2017-11-15T05:15:09` .. 
http:get:: /browse/origin/(origin_url)/visit/(timestamp)/branches/ :deprecated: .. warning:: That endpoint is deprecated, use :http:get:`/browse/origin/branches/` instead. HTML view that produces a display of the list of branches found during a visit of a software origin closest to the provided timestamp. The following data are displayed for each branch: * its name * a link to browse the associated directory * a link to browse the associated revision * last commit message * last commit date That list of branches is paginated, each page displaying a maximum of 100 branches. :param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/) - :param string timestamp: a date string (any format parsable by `dateutil.parser.parse`_) - or Unix timestamp to parse in order to find the closest visit. + :param string timestamp: an ISO 8601 datetime string to parse in order to find the + closest visit. :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Examples:** .. parsed-literal:: :swh_web_browse:`origin/https://github.com/kripken/emscripten/visit/2017-05-05T12:02:03Z/branches/` :swh_web_browse:`origin/deb://Debian/packages/apache2-mod-xforward/visit/2017-11-15T05:15:09Z/branches/` Origin releases """"""""""""""" .. http:get:: /browse/origin/releases/ HTML view that produces a display of the list of releases found during the latest full visit of a software origin. The following data are displayed for each release: * its name * a link to browse the release details * its target type (revision, directory, content or release) * its associated message * its date That list of releases is paginated, each page displaying a maximum of 100 releases. :query string origin_url: mandatory parameter providing the url of the origin (e.g. https://github.com/(user)/(repo)) - :query string timestamp: a date string (any format parsable by `dateutil.parser.parse`_) - or Unix timestamp to parse in order to find the closest visit. 
+ :query string timestamp: an ISO 8601 datetime string to parse in order to find the + closest visit. :statuscode 200: no error :statuscode 400: no origin url has been provided as parameter :statuscode 404: requested origin can not be found in the archive **Examples:** .. parsed-literal:: :swh_web_browse:`origin/releases/?origin_url=https://github.com/git/git` :swh_web_browse:`origin/releases/?origin_url=https://github.com/webpack/webpack` :swh_web_browse:`origin/releases/?origin_url=https://github.com/torvalds/linux&timestamp=2017-11-21T19:37:42Z` :swh_web_browse:`origin/releases/?origin_url=https://github.com/Kitware/CMake&timestamp=2016-09-23T14:06:35Z` .. http:get:: /browse/origin/(origin_url)/releases/ :deprecated: .. warning:: That endpoint is deprecated, use :http:get:`/browse/origin/releases/` instead. HTML view that produces a display of the list of releases found during the latest full visit of a software origin. The following data are displayed for each release: * its name * a link to browse the release details * its target type (revision, directory, content or release) * its associated message * its date That list of releases is paginated, each page displaying a maximum of 100 releases. :param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/) - :query string timestamp: a date string (any format parsable by `dateutil.parser.parse`_) - or Unix timestamp to parse in order to find the closest visit. + :query string timestamp: an ISO 8601 datetime string to parse in order to find the + closest visit. :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Examples:** .. 
parsed-literal:: :swh_web_browse:`origin/https://github.com/git/git/releases/` :swh_web_browse:`origin/https://github.com/webpack/webpack/releases/` :swh_web_browse:`origin/https://github.com/torvalds/linux/releases/?timestamp=2017-11-21T19:37:42Z` :swh_web_browse:`origin/https://github.com/Kitware/CMake/releases/?timestamp=2016-09-23T14:06:35Z` .. http:get:: /browse/origin/(origin_url)/visit/(timestamp)/releases/ :deprecated: .. warning:: That endpoint is deprecated, use :http:get:`/browse/origin/releases/` instead. HTML view that produces a display of the list of releases found during a visit of a software origin closest to the provided timestamp. The following data are displayed for each release: * its name * a link to browse the release details * its target type (revision, directory, content or release) * its associated message * its date That list of releases is paginated, each page displaying a maximum of 100 releases. :param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/) - :param string timestamp: a date string (any format parsable by `dateutil.parser.parse`_) - or Unix timestamp to parse in order to find the closest visit. + :param string timestamp: an ISO 8601 datetime string to parse in order to find the + closest visit. :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Examples:** .. parsed-literal:: :swh_web_browse:`origin/https://github.com/torvalds/linux/visit/2017-11-21T19:37:42Z/releases/` :swh_web_browse:`origin/https://github.com/Kitware/CMake/visit/2016-09-23T14:06:35Z/releases/` .. _highlightjs: https://highlightjs.org/ -.. _dateutil.parser.parse: http://dateutil.readthedocs.io/en/stable/parser.html diff --git a/docs/uri-scheme-browse-revision.rst b/docs/uri-scheme-browse-revision.rst index 79a65961..c54a151a 100644 --- a/docs/uri-scheme-browse-revision.rst +++ b/docs/uri-scheme-browse-revision.rst @@ -1,79 +1,75 @@ Revision ^^^^^^^^ .. 
http:get:: /browse/revision/(sha1_git)/ HTML view to browse a revision. It notably shows the revision date and message but also offers links to get more details on: * its author * its parent revisions * the history log reachable from it The view also enables to navigate in the source tree associated to the revision and browse its content. Last but not least, the view displays the list of file changes introduced in the revision but also the diffs of each changed files. :param string sha1_git: hexadecimal representation for the **sha1_git** identifier of a revision :query string origin_url: used internally to associate an origin url (e.g. https://github.com/user/repo) to the revision - :query string timestamp: used internally to associate an origin visit to the - revision, must be a date string (any format parsable by `dateutil.parser.parse`_) - or Unix timestamp to parse in order to find the closest visit. - :query int visit_id: used internally to specify a visit id instead of + :query string timestamp: an ISO 8601 datetime string to parse in order to find the + closest visit. + :query int visit_id: specify a visit id instead of using the provided timestamp - :query string path: used internally when navigating in the source tree - associated to the revision + :query string path: optional relative path from the revision root directory :statuscode 200: no error :statuscode 404: requested revision can not be found in the archive **Examples:** .. parsed-literal:: :swh_web_browse:`revision/f1b94134a4b879bc55c3dacdb496690c8ebdc03f/` :swh_web_browse:`revision/d1aa2b3f607b35dc5dbf613b2334b6d243ec2bda/` - .. _dateutil.parser.parse: http://dateutil.readthedocs.io/en/stable/parser.html - .. http:get:: /browse/revision/(sha1_git)/log/ HTML view that displays the list of revisions heading to a given one. In other words, it shows a commit log. 
The following data are displayed for each log entry: * link to browse the revision * author of the revision * date of the revision * message associated to the revision * commit date of the revision By default, the revisions are ordered in reverse chronological order of their commit date. N log entries are displayed per page (default is 100). In order to navigate in a large history, two buttons are present at the bottom of the view: * **Newer**: fetch and display if available the N more recent log entries than the ones currently displayed * **Older**: fetch and display if available the N older log entries than the ones currently displayed :param string sha1_git: hexadecimal representation for the **sha1_git** identifier of a revision :query int per_page: the number of log entries to display per page :query int offset: the number of revisions to skip before returning those to display :query str revs_ordering: specify the revisions ordering, possible values are ``committer_date``, ``dfs``, ``dfs_post`` and ``bfs`` :statuscode 200: no error :statuscode 404: requested revision can not be found in the archive **Examples:** .. parsed-literal:: :swh_web_browse:`revision/f1b94134a4b879bc55c3dacdb496690c8ebdc03f/log/` :swh_web_browse:`revision/d1aa2b3f607b35dc5dbf613b2334b6d243ec2bda/log/` diff --git a/docs/uri-scheme-browse.rst b/docs/uri-scheme-browse.rst index 87b637f6..ba81d6a2 100644 --- a/docs/uri-scheme-browse.rst +++ b/docs/uri-scheme-browse.rst @@ -1,93 +1,92 @@ URI scheme for swh-web Browse application ========================================= This web application aims to provide HTML views to easily navigate in the archive, thus it needs to be reached from a web browser. If you intend to query the archive programmatically through any HTTP client, please refer to the :ref:`swh-web-api-urls` section instead. 
Context-independent browsing ---------------------------- Context-independent URLs provide information about objects (e.g., revisions, directories, contents, person, ...), independently of the contexts where they have been found (e.g., specific repositories, branches, commits, ...). The following endpoints are the same of the API case (see below), and just render the corresponding information for user consumption. Where hyperlinks are created, they always point to other context-independent user URLs: * :http:get:`/browse/content/[(algo_hash):](hash)/`: Display a content * :http:get:`/browse/content/[(algo_hash):](hash)/raw/`: Get / Download content raw data * :http:get:`/browse/directory/(sha1_git)/`: Browse the content of a directory * :http:get:`/browse/person/(person_id)/`: Information on a person * :http:get:`/browse/revision/(sha1_git)/`: Browse a revision * :http:get:`/browse/revision/(sha1_git)/log/`: Browse history log heading to a revision Context-dependent browsing -------------------------- Context-dependent URLs provide information about objects, limited to specific contexts where the objects have been found. For instance, instead of having to specify a (root) revision by **sha1_git**, users might want to specify a place and a time. In Software Heritage a "place" is an origin, with an optional branch name; a "time" is a timestamp at which some place has been observed by Software Heritage crawlers. Wherever a revision context is expected in a path (i.e., a **/browse/revision/(sha1_git)/** path fragment) we can put in its stead a path fragment of the form **/browse/origin/?origin_url=(origin_url)&timestamp=(timestamp)&branch=(branch)**. 
Such a fragment is resolved, internally by the archive, to a revision **sha1_git** as follows: - if **timestamp** is not given as query parameter: look for the most recent crawl of origin identified by **origin_url** - if **timestamp** is given: look for the closest crawl of origin identified by **origin_url** from timestamp **timestamp** - if **branch** is given as a query parameter: look for the branch **branch** - if **branch** is absent: look for branch "HEAD" or "master" - return the revision **sha1_git** pointed by the chosen branch The already mentioned URLs for revision contexts can therefore be alternatively specified by users as: * :http:get:`/browse/origin/directory/` * :http:get:`/browse/origin/content/` * :http:get:`/browse/origin/log/` Typing: - **origin_url** corresponds to the URL the origin was crawled from, for instance https://github.com/(user)/(repo)/ - **branch** name is given as per the corresponding VCS (e.g., Git) as a query parameter to the requested URL. - **timestamp** is given in a format as liberal as possible, to uphold the principle of least surprise. At the very minimum it is possible to enter timestamps as: - - Unix epoch timestamp (see for instance the output of `date +%s`) - ISO 8601 timestamps (see for instance the output of `date -I`, `date -Is`) - YYYY[MM[DD[HH[MM[SS]]]]] ad-hoc format - YYYY[-MM[-DD[ HH:[MM:[SS:]]]]] ad-hoc format swh-web Browse Urls ------------------- .. include:: uri-scheme-browse-content.rst .. include:: uri-scheme-browse-directory.rst .. include:: uri-scheme-browse-origin.rst .. include:: uri-scheme-browse-person.rst .. include:: uri-scheme-browse-release.rst .. include:: uri-scheme-browse-revision.rst .. 
include:: uri-scheme-browse-snapshot.rst diff --git a/mypy.ini b/mypy.ini index e5d71724..4c94e70f 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,52 +1,55 @@ [mypy] namespace_packages = True warn_unused_ignores = True # support for django magic: https://github.com/typeddjango/django-stubs plugins = mypy_django_plugin.main, mypy_drf_plugin.main [mypy.plugins.django-stubs] django_settings_module = swh.web.settings.development # 3rd party libraries without stubs (yet) [mypy-bs4.*] ignore_missing_imports = True [mypy-corsheaders.*] ignore_missing_imports = True [mypy-django_js_reverse.*] ignore_missing_imports = True [mypy-htmlmin.*] ignore_missing_imports = True +[mypy-iso8601.*] +ignore_missing_imports = True + [mypy-keycloak.*] ignore_missing_imports = True [mypy-magic.*] ignore_missing_imports = True [mypy-pkg_resources.*] ignore_missing_imports = True [mypy-prometheus_client.*] ignore_missing_imports = True [mypy-pygments.*] ignore_missing_imports = True [mypy-pytest.*] ignore_missing_imports = True [mypy-requests_mock.*] ignore_missing_imports = True [mypy-sphinx.*] ignore_missing_imports = True [mypy-sphinxcontrib.*] ignore_missing_imports = True [mypy-swh.docs.*] ignore_missing_imports = True diff --git a/requirements.txt b/requirements.txt index 53032284..17fb4438 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,24 +1,24 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. 
For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html beautifulsoup4 django < 3 django-cors-headers django-js-reverse djangorestframework django-webpack-loader docutils htmlmin +iso8601 lxml prometheus-client pybadges pygments -python-dateutil python-keycloak >= 0.19.0 python-magic >= 0.4.0 python-memcached pyyaml requests sentry-sdk typing-extensions diff --git a/swh/web/browse/snapshot_context.py b/swh/web/browse/snapshot_context.py index a6e262e2..69e4eead 100644 --- a/swh/web/browse/snapshot_context.py +++ b/swh/web/browse/snapshot_context.py @@ -1,1465 +1,1465 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information # Utility module for browsing the archive in a snapshot context. from collections import defaultdict from copy import copy -from typing import Any, Dict, List, Optional, Union, Tuple +from typing import Any, Dict, List, Optional, Tuple from django.core.cache import cache from django.shortcuts import render from django.template.defaultfilters import filesizeformat from django.utils.html import escape import sentry_sdk from swh.model.identifiers import ( swhid, snapshot_identifier, CONTENT, DIRECTORY, REVISION, RELEASE, SNAPSHOT, ) from swh.web.browse.utils import ( get_directory_entries, gen_directory_link, gen_revision_link, gen_revision_url, request_content, gen_content_link, prepare_content_for_display, content_display_max_size, format_log_entries, gen_revision_log_link, gen_release_link, get_readme_to_display, gen_snapshot_link, ) from swh.web.common import service, highlightjs from swh.web.common.exc import handle_view_exception, NotFoundExc, BadInputExc from swh.web.common.identifiers import get_swhids_info from swh.web.common.origin_visits import get_origin_visit from swh.web.common.typing import ( 
OriginInfo, SnapshotBranchInfo, SnapshotReleaseInfo, SnapshotContext, ContentMetadata, DirectoryMetadata, SWHObjectInfo, ) from swh.web.common.utils import ( reverse, gen_path_info, format_utc_iso_date, swh_object_icons, ) from swh.web.config import get_config _empty_snapshot_id = snapshot_identifier({"branches": {}}) def _get_branch(branches, branch_name, snapshot_id): """ Utility function to get a specific branch from a branches list. Its purpose is to get the default HEAD branch as some software origin (e.g those with svn type) does not have it. In that latter case, check if there is a master branch instead and returns it. """ filtered_branches = [b for b in branches if b["name"] == branch_name] if filtered_branches: return filtered_branches[0] elif branch_name == "HEAD": filtered_branches = [b for b in branches if b["name"].endswith("master")] if filtered_branches: return filtered_branches[0] elif branches: return branches[0] else: # case where a large branches list has been truncated snp = service.lookup_snapshot( snapshot_id, branches_from=branch_name, branches_count=1, target_types=["revision", "alias"], ) snp_branch, _ = process_snapshot_branches(snp) if snp_branch and snp_branch[0]["name"] == branch_name: branches.append(snp_branch[0]) return snp_branch[0] def _get_release(releases, release_name, snapshot_id): """ Utility function to get a specific release from a releases list. Returns None if the release can not be found in the list. 
""" filtered_releases = [r for r in releases if r["name"] == release_name] if filtered_releases: return filtered_releases[0] else: # case where a large branches list has been truncated try: # git origins have specific branches for releases snp = service.lookup_snapshot( snapshot_id, branches_from=f"refs/tags/{release_name}", branches_count=1, target_types=["release"], ) except NotFoundExc: snp = service.lookup_snapshot( snapshot_id, branches_from=release_name, branches_count=1, target_types=["release"], ) _, snp_release = process_snapshot_branches(snp) if snp_release and snp_release[0]["name"] == release_name: releases.append(snp_release[0]) return snp_release[0] def _branch_not_found( branch_type, branch, snapshot_id, snapshot_sizes, origin_info, timestamp, visit_id ): """ Utility function to raise an exception when a specified branch/release can not be found. """ if branch_type == "branch": branch_type = "Branch" branch_type_plural = "branches" target_type = "revision" else: branch_type = "Release" branch_type_plural = "releases" target_type = "release" if snapshot_id and snapshot_sizes[target_type] == 0: msg = "Snapshot with id %s has an empty list" " of %s!" % ( snapshot_id, branch_type_plural, ) elif snapshot_id: msg = "%s %s for snapshot with id %s" " not found!" % ( branch_type, branch, snapshot_id, ) elif visit_id and snapshot_sizes[target_type] == 0: msg = ( "Origin with url %s" " for visit with id %s has an empty list" " of %s!" % (origin_info["url"], visit_id, branch_type_plural) ) elif visit_id: msg = ( "%s %s associated to visit with" " id %s for origin with url %s" " not found!" % (branch_type, branch, visit_id, origin_info["url"]) ) elif snapshot_sizes[target_type] == 0: msg = ( "Origin with url %s" " for visit with timestamp %s has an empty list" " of %s!" % (origin_info["url"], timestamp, branch_type_plural) ) else: msg = ( "%s %s associated to visit with" " timestamp %s for origin with " "url %s not found!" 
% (branch_type, branch, timestamp, origin_info["url"]) ) raise NotFoundExc(escape(msg)) def process_snapshot_branches( snapshot: Dict[str, Any] ) -> Tuple[List[SnapshotBranchInfo], List[SnapshotReleaseInfo]]: """ Process a dictionary describing snapshot branches: extract those targeting revisions and releases, put them in two different lists, then sort those lists in lexicographical order of the branches' names. Args: snapshot: A dict describing a snapshot as returned for instance by :func:`swh.web.common.service.lookup_snapshot` Returns: A tuple whose first member is the sorted list of branches targeting revisions and second member the sorted list of branches targeting releases """ snapshot_branches = snapshot["branches"] branches: Dict[str, SnapshotBranchInfo] = {} branch_aliases: Dict[str, str] = {} releases: Dict[str, SnapshotReleaseInfo] = {} revision_to_branch = defaultdict(set) revision_to_release = defaultdict(set) release_to_branch = defaultdict(set) for branch_name, target in snapshot_branches.items(): if not target: # FIXME: display branches with an unknown target anyway continue target_id = target["target"] target_type = target["target_type"] if target_type == "revision": branches[branch_name] = SnapshotBranchInfo( name=branch_name, revision=target_id, date=None, directory=None, message=None, url=None, ) revision_to_branch[target_id].add(branch_name) elif target_type == "release": release_to_branch[target_id].add(branch_name) elif target_type == "alias": branch_aliases[branch_name] = target_id # FIXME: handle pointers to other object types def _add_release_info(branch, release): releases[branch] = SnapshotReleaseInfo( name=release["name"], branch_name=branch, date=format_utc_iso_date(release["date"]), directory=None, id=release["id"], message=release["message"], target_type=release["target_type"], target=release["target"], url=None, ) def _add_branch_info(branch, revision): branches[branch] = SnapshotBranchInfo( name=branch, revision=revision["id"], 
directory=revision["directory"], date=format_utc_iso_date(revision["date"]), message=revision["message"], url=None, ) releases_info = service.lookup_release_multiple(release_to_branch.keys()) for release in releases_info: branches_to_update = release_to_branch[release["id"]] for branch in branches_to_update: _add_release_info(branch, release) if release["target_type"] == "revision": revision_to_release[release["target"]].update(branches_to_update) revisions = service.lookup_revision_multiple( set(revision_to_branch.keys()) | set(revision_to_release.keys()) ) for revision in revisions: if not revision: continue for branch in revision_to_branch[revision["id"]]: _add_branch_info(branch, revision) for release in revision_to_release[revision["id"]]: releases[release]["directory"] = revision["directory"] for branch_alias, branch_target in branch_aliases.items(): if branch_target in branches: branches[branch_alias] = copy(branches[branch_target]) else: snp = service.lookup_snapshot( snapshot["id"], branches_from=branch_target, branches_count=1 ) if snp and branch_target in snp["branches"]: if snp["branches"][branch_target] is None: continue target_type = snp["branches"][branch_target]["target_type"] target = snp["branches"][branch_target]["target"] if target_type == "revision": branches[branch_alias] = snp["branches"][branch_target] revision = service.lookup_revision(target) _add_branch_info(branch_alias, revision) elif target_type == "release": release = service.lookup_release(target) _add_release_info(branch_alias, release) if branch_alias in branches: branches[branch_alias]["name"] = branch_alias ret_branches = list(sorted(branches.values(), key=lambda b: b["name"])) ret_releases = list(sorted(releases.values(), key=lambda b: b["name"])) return ret_branches, ret_releases def get_snapshot_content( snapshot_id: str, ) -> Tuple[List[SnapshotBranchInfo], List[SnapshotReleaseInfo]]: """Returns the lists of branches and releases associated to a swh snapshot. 
That list is put in cache in order to speedup the navigation in the swh-web/browse ui. .. warning:: At most 1000 branches contained in the snapshot will be returned for performance reasons. Args: snapshot_id: hexadecimal representation of the snapshot identifier Returns: A tuple with two members. The first one is a list of dict describing the snapshot branches. The second one is a list of dict describing the snapshot releases. Raises: NotFoundExc if the snapshot does not exist """ cache_entry_id = "swh_snapshot_%s" % snapshot_id cache_entry = cache.get(cache_entry_id) if cache_entry: return cache_entry["branches"], cache_entry["releases"] branches: List[SnapshotBranchInfo] = [] releases: List[SnapshotReleaseInfo] = [] snapshot_content_max_size = get_config()["snapshot_content_max_size"] if snapshot_id: snapshot = service.lookup_snapshot( snapshot_id, branches_count=snapshot_content_max_size ) branches, releases = process_snapshot_branches(snapshot) cache.set(cache_entry_id, {"branches": branches, "releases": releases,}) return branches, releases def get_origin_visit_snapshot( origin_info: OriginInfo, - visit_ts: Optional[Union[int, str]] = None, + visit_ts: Optional[str] = None, visit_id: Optional[int] = None, snapshot_id: Optional[str] = None, ) -> Tuple[List[SnapshotBranchInfo], List[SnapshotReleaseInfo]]: """Returns the lists of branches and releases associated to an origin for a given visit. The visit is expressed by either: * a snapshot identifier * a timestamp, if no visit with that exact timestamp is found, the closest one from the provided timestamp will be used. If no visit parameter is provided, it returns the list of branches found for the latest visit. That list is put in cache in order to speedup the navigation in the swh-web/browse ui. .. warning:: At most 1000 branches contained in the snapshot will be returned for performance reasons. 
Args: origin_info: a dict filled with origin information - visit_ts: an ISO date string or Unix timestamp to parse + visit_ts: an ISO 8601 datetime string to parse visit_id: visit id for disambiguation in case several visits have the same timestamp snapshot_id: if provided, visit associated to the snapshot will be processed Returns: A tuple with two members. The first one is a list of dict describing the origin branches for the given visit. The second one is a list of dict describing the origin releases for the given visit. Raises: NotFoundExc if the origin or its visit are not found """ visit_info = get_origin_visit(origin_info, visit_ts, visit_id, snapshot_id) return get_snapshot_content(visit_info["snapshot"]) def get_snapshot_context( snapshot_id: Optional[str] = None, origin_url: Optional[str] = None, timestamp: Optional[str] = None, visit_id: Optional[int] = None, branch_name: Optional[str] = None, release_name: Optional[str] = None, revision_id: Optional[str] = None, path: Optional[str] = None, browse_context: str = "directory", ) -> SnapshotContext: """ Utility function to compute relevant information when navigating the archive in a snapshot context. The snapshot is either referenced by its id or it will be retrieved from an origin visit. 
Args: snapshot_id: hexadecimal representation of a snapshot identifier origin_url: an origin_url timestamp: a datetime string for retrieving the closest visit of the origin visit_id: optional visit id for disambiguation in case of several visits with the same timestamp branch_name: optional branch name set when browsing the snapshot in that scope (will default to "HEAD" if not provided) release_name: optional release name set when browsing the snapshot in that scope revision_id: optional revision identifier set when browsing the snapshot in that scope path: optional path of the object currently browsed in the snapshot browse_context: indicates which type of object is currently browsed Returns: A dict filled with snapshot context information. Raises: swh.web.common.exc.NotFoundExc: if no snapshot is found for the visit of an origin. """ assert origin_url is not None or snapshot_id is not None origin_info = None visit_info = None url_args = {} query_params: Dict[str, Any] = {} origin_visits_url = None if origin_url: if visit_id is not None: query_params["visit_id"] = visit_id elif snapshot_id is not None: query_params["snapshot"] = snapshot_id origin_info = service.lookup_origin({"url": origin_url}) visit_info = get_origin_visit(origin_info, timestamp, visit_id, snapshot_id) formatted_date = format_utc_iso_date(visit_info["date"]) visit_info["formatted_date"] = formatted_date snapshot_id = visit_info["snapshot"] if not snapshot_id: raise NotFoundExc( "No snapshot associated to the visit of origin " "%s on %s" % (escape(origin_url), formatted_date) ) # provided timestamp is not necessarily equals to the one # of the retrieved visit, so get the exact one in order # to use it in the urls generated below if timestamp: timestamp = visit_info["date"] branches, releases = get_origin_visit_snapshot( origin_info, timestamp, visit_id, snapshot_id ) query_params["origin_url"] = origin_info["url"] origin_visits_url = reverse( "browse-origin-visits", query_params={"origin_url": 
origin_info["url"]} ) if timestamp is not None: query_params["timestamp"] = format_utc_iso_date( timestamp, "%Y-%m-%dT%H:%M:%SZ" ) visit_url = reverse("browse-origin-directory", query_params=query_params) visit_info["url"] = visit_url branches_url = reverse("browse-origin-branches", query_params=query_params) releases_url = reverse("browse-origin-releases", query_params=query_params) else: assert snapshot_id is not None branches, releases = get_snapshot_content(snapshot_id) url_args = {"snapshot_id": snapshot_id} branches_url = reverse("browse-snapshot-branches", url_args=url_args) releases_url = reverse("browse-snapshot-releases", url_args=url_args) releases = list(reversed(releases)) snapshot_sizes = service.lookup_snapshot_sizes(snapshot_id) is_empty = sum(snapshot_sizes.values()) == 0 swh_snp_id = swhid("snapshot", snapshot_id) if visit_info: timestamp = format_utc_iso_date(visit_info["date"]) if origin_info: browse_view_name = f"browse-origin-{browse_context}" else: browse_view_name = f"browse-snapshot-{browse_context}" release_id = None root_directory = None snapshot_total_size = sum(snapshot_sizes.values()) if path is not None: query_params["path"] = path if snapshot_total_size and revision_id is not None: revision = service.lookup_revision(revision_id) root_directory = revision["directory"] branches.append( SnapshotBranchInfo( name=revision_id, revision=revision_id, directory=root_directory, date=revision["date"], message=revision["message"], url=None, ) ) branch_name = revision_id query_params["revision"] = revision_id elif snapshot_total_size and release_name: release = _get_release(releases, release_name, snapshot_id) try: root_directory = release["directory"] revision_id = release["target"] release_id = release["id"] query_params["release"] = release_name except Exception as exc: sentry_sdk.capture_exception(exc) _branch_not_found( "release", release_name, snapshot_id, snapshot_sizes, origin_info, timestamp, visit_id, ) elif snapshot_total_size: if 
branch_name: query_params["branch"] = branch_name branch = _get_branch(branches, branch_name or "HEAD", snapshot_id) try: branch_name = branch["name"] revision_id = branch["revision"] root_directory = branch["directory"] except Exception as exc: sentry_sdk.capture_exception(exc) _branch_not_found( "branch", branch_name, snapshot_id, snapshot_sizes, origin_info, timestamp, visit_id, ) for b in branches: branch_query_params = dict(query_params) branch_query_params.pop("release", None) if b["name"] != b["revision"]: branch_query_params.pop("revision", None) branch_query_params["branch"] = b["name"] b["url"] = reverse( browse_view_name, url_args=url_args, query_params=branch_query_params ) for r in releases: release_query_params = dict(query_params) release_query_params.pop("branch", None) release_query_params.pop("revision", None) release_query_params["release"] = r["name"] r["url"] = reverse( browse_view_name, url_args=url_args, query_params=release_query_params, ) revision_info = None if revision_id: try: revision_info = service.lookup_revision(revision_id) except NotFoundExc: pass else: revision_info["date"] = format_utc_iso_date(revision_info["date"]) revision_info["committer_date"] = format_utc_iso_date( revision_info["committer_date"] ) if revision_info["message"]: message_lines = revision_info["message"].split("\n") revision_info["message_header"] = message_lines[0] else: revision_info["message_header"] = "" snapshot_context = SnapshotContext( branch=branch_name, branches=branches, branches_url=branches_url, is_empty=is_empty, origin_info=origin_info, origin_visits_url=origin_visits_url, release=release_name, release_id=release_id, query_params=query_params, releases=releases, releases_url=releases_url, revision_id=revision_id, revision_info=revision_info, root_directory=root_directory, snapshot_id=snapshot_id, snapshot_sizes=snapshot_sizes, snapshot_swhid=swh_snp_id, url_args=url_args, visit_info=visit_info, ) if revision_info: revision_info["revision_url"] = 
gen_revision_url(revision_id, snapshot_context) return snapshot_context def _build_breadcrumbs(snapshot_context: SnapshotContext, path: str): origin_info = snapshot_context["origin_info"] url_args = snapshot_context["url_args"] query_params = dict(snapshot_context["query_params"]) root_directory = snapshot_context["root_directory"] path_info = gen_path_info(path) if origin_info: browse_view_name = "browse-origin-directory" else: browse_view_name = "browse-snapshot-directory" breadcrumbs = [] if root_directory: query_params.pop("path", None) breadcrumbs.append( { "name": root_directory[:7], "url": reverse( browse_view_name, url_args=url_args, query_params=query_params ), } ) for pi in path_info: query_params["path"] = pi["path"] breadcrumbs.append( { "name": pi["name"], "url": reverse( browse_view_name, url_args=url_args, query_params=query_params ), } ) return breadcrumbs def _check_origin_url(snapshot_id, origin_url): if snapshot_id is None and origin_url is None: raise BadInputExc("An origin URL must be provided as query parameter.") def browse_snapshot_directory( request, snapshot_id=None, origin_url=None, timestamp=None, path=None ): """ Django view implementation for browsing a directory in a snapshot context. 
""" try: _check_origin_url(snapshot_id, origin_url) snapshot_context = get_snapshot_context( snapshot_id=snapshot_id, origin_url=origin_url, timestamp=timestamp, visit_id=request.GET.get("visit_id"), path=path, browse_context="directory", branch_name=request.GET.get("branch"), release_name=request.GET.get("release"), revision_id=request.GET.get("revision"), ) root_directory = snapshot_context["root_directory"] sha1_git = root_directory if root_directory and path: dir_info = service.lookup_directory_with_path(root_directory, path) sha1_git = dir_info["target"] dirs = [] files = [] if sha1_git: dirs, files = get_directory_entries(sha1_git) except Exception as exc: return handle_view_exception(request, exc) origin_info = snapshot_context["origin_info"] visit_info = snapshot_context["visit_info"] url_args = snapshot_context["url_args"] query_params = dict(snapshot_context["query_params"]) revision_id = snapshot_context["revision_id"] snapshot_id = snapshot_context["snapshot_id"] if origin_info: browse_view_name = "browse-origin-directory" else: browse_view_name = "browse-snapshot-directory" breadcrumbs = _build_breadcrumbs(snapshot_context, path) path = "" if path is None else (path + "/") for d in dirs: if d["type"] == "rev": d["url"] = reverse("browse-revision", url_args={"sha1_git": d["target"]}) else: query_params["path"] = path + d["name"] d["url"] = reverse( browse_view_name, url_args=url_args, query_params=query_params ) sum_file_sizes = 0 readmes = {} if origin_info: browse_view_name = "browse-origin-content" else: browse_view_name = "browse-snapshot-content" for f in files: query_params["path"] = path + f["name"] f["url"] = reverse( browse_view_name, url_args=url_args, query_params=query_params ) if f["length"] is not None: sum_file_sizes += f["length"] f["length"] = filesizeformat(f["length"]) if f["name"].lower().startswith("readme"): readmes[f["name"]] = f["checksums"]["sha1"] readme_name, readme_url, readme_html = get_readme_to_display(readmes) if 
origin_info: browse_view_name = "browse-origin-log" else: browse_view_name = "browse-snapshot-log" history_url = None if snapshot_id != _empty_snapshot_id: query_params.pop("path", None) history_url = reverse( browse_view_name, url_args=url_args, query_params=query_params ) nb_files = None nb_dirs = None dir_path = None if root_directory: nb_files = len(files) nb_dirs = len(dirs) sum_file_sizes = filesizeformat(sum_file_sizes) dir_path = "/" + path browse_dir_link = gen_directory_link(sha1_git) browse_rev_link = gen_revision_link(revision_id) browse_snp_link = gen_snapshot_link(snapshot_id) revision_found = True if sha1_git is None and revision_id is not None: try: service.lookup_revision(revision_id) except NotFoundExc: revision_found = False swh_objects = [ SWHObjectInfo(object_type=DIRECTORY, object_id=sha1_git), SWHObjectInfo(object_type=REVISION, object_id=revision_id), SWHObjectInfo(object_type=SNAPSHOT, object_id=snapshot_id), ] visit_date = None visit_type = None if visit_info: visit_date = format_utc_iso_date(visit_info["date"]) visit_type = visit_info["type"] release_id = snapshot_context["release_id"] browse_rel_link = None if release_id: swh_objects.append(SWHObjectInfo(object_type=RELEASE, object_id=release_id)) browse_rel_link = gen_release_link(release_id) dir_metadata = DirectoryMetadata( object_type=DIRECTORY, object_id=sha1_git, directory=sha1_git, directory_url=browse_dir_link, nb_files=nb_files, nb_dirs=nb_dirs, sum_file_sizes=sum_file_sizes, root_directory=root_directory, path=dir_path, revision=revision_id, revision_found=revision_found, revision_url=browse_rev_link, release=release_id, release_url=browse_rel_link, snapshot=snapshot_id, snapshot_url=browse_snp_link, origin_url=origin_url, visit_date=visit_date, visit_type=visit_type, ) vault_cooking = { "directory_context": True, "directory_id": sha1_git, "revision_context": True, "revision_id": revision_id, } swhids_info = get_swhids_info(swh_objects, snapshot_context, dir_metadata) dir_path 
= "/".join([bc["name"] for bc in breadcrumbs]) + "/" context_found = "snapshot: %s" % snapshot_context["snapshot_id"] if origin_info: context_found = "origin: %s" % origin_info["url"] heading = "Directory - %s - %s - %s" % ( dir_path, snapshot_context["branch"], context_found, ) top_right_link = None if not snapshot_context["is_empty"]: top_right_link = { "url": history_url, "icon": swh_object_icons["revisions history"], "text": "History", } return render( request, "browse/directory.html", { "heading": heading, "swh_object_name": "Directory", "swh_object_metadata": dir_metadata, "dirs": dirs, "files": files, "breadcrumbs": breadcrumbs if root_directory else [], "top_right_link": top_right_link, "readme_name": readme_name, "readme_url": readme_url, "readme_html": readme_html, "snapshot_context": snapshot_context, "vault_cooking": vault_cooking, "show_actions": True, "swhids_info": swhids_info, }, ) def browse_snapshot_content( request, snapshot_id=None, origin_url=None, timestamp=None, path=None, selected_language=None, ): """ Django view implementation for browsing a content in a snapshot context. 
""" try: _check_origin_url(snapshot_id, origin_url) if path is None: raise BadInputExc("The path of a content must be given as query parameter.") snapshot_context = get_snapshot_context( snapshot_id=snapshot_id, origin_url=origin_url, timestamp=timestamp, visit_id=request.GET.get("visit_id"), path=path, browse_context="content", branch_name=request.GET.get("branch"), release_name=request.GET.get("release"), revision_id=request.GET.get("revision"), ) root_directory = snapshot_context["root_directory"] sha1_git = None query_string = None content_data = {} directory_id = None split_path = path.split("/") filename = split_path[-1] filepath = path[: -len(filename)] if root_directory: content_info = service.lookup_directory_with_path(root_directory, path) sha1_git = content_info["target"] query_string = "sha1_git:" + sha1_git content_data = request_content(query_string, raise_if_unavailable=False) if filepath: dir_info = service.lookup_directory_with_path(root_directory, filepath) directory_id = dir_info["target"] else: directory_id = root_directory except Exception as exc: return handle_view_exception(request, exc) revision_id = snapshot_context["revision_id"] origin_info = snapshot_context["origin_info"] visit_info = snapshot_context["visit_info"] snapshot_id = snapshot_context["snapshot_id"] if content_data.get("raw_data") is not None: content_display_data = prepare_content_for_display( content_data["raw_data"], content_data["mimetype"], path ) content_data.update(content_display_data) # Override language with user-selected language if selected_language is not None: content_data["language"] = selected_language available_languages = None if content_data.get("mimetype") is not None and "text/" in content_data["mimetype"]: available_languages = highlightjs.get_supported_languages() breadcrumbs = _build_breadcrumbs(snapshot_context, filepath) breadcrumbs.append({"name": filename, "url": None}) browse_content_link = gen_content_link(sha1_git) content_raw_url = None if 
query_string: content_raw_url = reverse( "browse-content-raw", url_args={"query_string": query_string}, query_params={"filename": filename}, ) browse_rev_link = gen_revision_link(revision_id) browse_dir_link = gen_directory_link(directory_id) content_checksums = content_data.get("checksums", {}) swh_objects = [ SWHObjectInfo(object_type=CONTENT, object_id=content_checksums.get("sha1_git")), SWHObjectInfo(object_type=DIRECTORY, object_id=directory_id), SWHObjectInfo(object_type=REVISION, object_id=revision_id), SWHObjectInfo(object_type=SNAPSHOT, object_id=snapshot_id), ] visit_date = None visit_type = None if visit_info: visit_date = format_utc_iso_date(visit_info["date"]) visit_type = visit_info["type"] release_id = snapshot_context["release_id"] browse_rel_link = None if release_id: swh_objects.append(SWHObjectInfo(object_type=RELEASE, object_id=release_id)) browse_rel_link = gen_release_link(release_id) content_metadata = ContentMetadata( object_type=CONTENT, object_id=content_checksums.get("sha1_git"), sha1=content_checksums.get("sha1"), sha1_git=content_checksums.get("sha1_git"), sha256=content_checksums.get("sha256"), blake2s256=content_checksums.get("blake2s256"), content_url=browse_content_link, mimetype=content_data.get("mimetype"), encoding=content_data.get("encoding"), size=filesizeformat(content_data.get("length", 0)), language=content_data.get("language"), licenses=content_data.get("licenses"), root_directory=root_directory, path=f"/{filepath}", filename=filename, directory=directory_id, directory_url=browse_dir_link, revision=revision_id, revision_url=browse_rev_link, release=release_id, release_url=browse_rel_link, snapshot=snapshot_id, snapshot_url=gen_snapshot_link(snapshot_id), origin_url=origin_url, visit_date=visit_date, visit_type=visit_type, ) swhids_info = get_swhids_info(swh_objects, snapshot_context, content_metadata) content_path = "/".join([bc["name"] for bc in breadcrumbs]) context_found = "snapshot: %s" % snapshot_context["snapshot_id"] 
if origin_info: context_found = "origin: %s" % origin_info["url"] heading = "Content - %s - %s - %s" % ( content_path, snapshot_context["branch"], context_found, ) top_right_link = None if not snapshot_context["is_empty"]: top_right_link = { "url": content_raw_url, "icon": swh_object_icons["content"], "text": "Raw File", } return render( request, "browse/content.html", { "heading": heading, "swh_object_name": "Content", "swh_object_metadata": content_metadata, "content": content_data.get("content_data"), "content_size": content_data.get("length"), "max_content_size": content_display_max_size, "filename": filename, "encoding": content_data.get("encoding"), "mimetype": content_data.get("mimetype"), "language": content_data.get("language"), "available_languages": available_languages, "breadcrumbs": breadcrumbs if root_directory else [], "top_right_link": top_right_link, "snapshot_context": snapshot_context, "vault_cooking": None, "show_actions": True, "swhids_info": swhids_info, "error_code": content_data.get("error_code"), "error_message": content_data.get("error_message"), "error_description": content_data.get("error_description"), }, status=content_data.get("error_code", 200), ) PER_PAGE = 100 def browse_snapshot_log(request, snapshot_id=None, origin_url=None, timestamp=None): """ Django view implementation for browsing a revision history in a snapshot context. 
""" try: _check_origin_url(snapshot_id, origin_url) snapshot_context = get_snapshot_context( snapshot_id=snapshot_id, origin_url=origin_url, timestamp=timestamp, visit_id=request.GET.get("visit_id"), browse_context="log", branch_name=request.GET.get("branch"), release_name=request.GET.get("release"), revision_id=request.GET.get("revision"), ) revision_id = snapshot_context["revision_id"] per_page = int(request.GET.get("per_page", PER_PAGE)) offset = int(request.GET.get("offset", 0)) revs_ordering = request.GET.get("revs_ordering", "committer_date") session_key = "rev_%s_log_ordering_%s" % (revision_id, revs_ordering) rev_log_session = request.session.get(session_key, None) rev_log = [] revs_walker_state = None if rev_log_session: rev_log = rev_log_session["rev_log"] revs_walker_state = rev_log_session["revs_walker_state"] if len(rev_log) < offset + per_page: revs_walker = service.get_revisions_walker( revs_ordering, revision_id, max_revs=offset + per_page + 1, state=revs_walker_state, ) rev_log += [rev["id"] for rev in revs_walker] revs_walker_state = revs_walker.export_state() revs = rev_log[offset : offset + per_page] revision_log = service.lookup_revision_multiple(revs) request.session[session_key] = { "rev_log": rev_log, "revs_walker_state": revs_walker_state, } except Exception as exc: return handle_view_exception(request, exc) origin_info = snapshot_context["origin_info"] visit_info = snapshot_context["visit_info"] url_args = snapshot_context["url_args"] query_params = snapshot_context["query_params"] snapshot_id = snapshot_context["snapshot_id"] query_params["per_page"] = per_page revs_ordering = request.GET.get("revs_ordering", "") query_params["revs_ordering"] = revs_ordering if origin_info: browse_view_name = "browse-origin-log" else: browse_view_name = "browse-snapshot-log" prev_log_url = None if len(rev_log) > offset + per_page: query_params["offset"] = offset + per_page prev_log_url = reverse( browse_view_name, url_args=url_args, 
query_params=query_params ) next_log_url = None if offset != 0: query_params["offset"] = offset - per_page next_log_url = reverse( browse_view_name, url_args=url_args, query_params=query_params ) revision_log_data = format_log_entries(revision_log, per_page, snapshot_context) browse_rev_link = gen_revision_link(revision_id) browse_log_link = gen_revision_log_link(revision_id) browse_snp_link = gen_snapshot_link(snapshot_id) revision_metadata = { "context-independent revision": browse_rev_link, "context-independent revision history": browse_log_link, "context-independent snapshot": browse_snp_link, "snapshot": snapshot_id, } if origin_info: revision_metadata["origin url"] = origin_info["url"] revision_metadata["origin visit date"] = format_utc_iso_date(visit_info["date"]) revision_metadata["origin visit type"] = visit_info["type"] swh_objects = [ SWHObjectInfo(object_type=REVISION, object_id=revision_id), SWHObjectInfo(object_type=SNAPSHOT, object_id=snapshot_id), ] release_id = snapshot_context["release_id"] if release_id: swh_objects.append(SWHObjectInfo(object_type=RELEASE, object_id=release_id)) browse_rel_link = gen_release_link(release_id) revision_metadata["release"] = release_id revision_metadata["context-independent release"] = browse_rel_link swhids_info = get_swhids_info(swh_objects, snapshot_context) context_found = "snapshot: %s" % snapshot_context["snapshot_id"] if origin_info: context_found = "origin: %s" % origin_info["url"] heading = "Revision history - %s - %s" % (snapshot_context["branch"], context_found) return render( request, "browse/revision-log.html", { "heading": heading, "swh_object_name": "Revisions history", "swh_object_metadata": revision_metadata, "revision_log": revision_log_data, "revs_ordering": revs_ordering, "next_log_url": next_log_url, "prev_log_url": prev_log_url, "breadcrumbs": None, "top_right_link": None, "snapshot_context": snapshot_context, "vault_cooking": None, "show_actions": True, "swhids_info": swhids_info, }, ) def 
browse_snapshot_branches( request, snapshot_id=None, origin_url=None, timestamp=None ): """ Django view implementation for browsing a list of branches in a snapshot context. """ try: _check_origin_url(snapshot_id, origin_url) snapshot_context = get_snapshot_context( snapshot_id=snapshot_id, origin_url=origin_url, timestamp=timestamp, visit_id=request.GET.get("visit_id"), ) branches_bc = request.GET.get("branches_breadcrumbs", "") branches_bc = branches_bc.split(",") if branches_bc else [] branches_from = branches_bc[-1] if branches_bc else "" origin_info = snapshot_context["origin_info"] url_args = snapshot_context["url_args"] query_params = snapshot_context["query_params"] if origin_info: browse_view_name = "browse-origin-directory" else: browse_view_name = "browse-snapshot-directory" snapshot = service.lookup_snapshot( snapshot_context["snapshot_id"], branches_from, PER_PAGE + 1, target_types=["revision", "alias"], ) displayed_branches, _ = process_snapshot_branches(snapshot) except Exception as exc: return handle_view_exception(request, exc) for branch in displayed_branches: rev_query_params = {} if origin_info: rev_query_params["origin_url"] = origin_info["url"] revision_url = reverse( "browse-revision", url_args={"sha1_git": branch["revision"]}, query_params=query_params, ) query_params["branch"] = branch["name"] directory_url = reverse( browse_view_name, url_args=url_args, query_params=query_params ) del query_params["branch"] branch["revision_url"] = revision_url branch["directory_url"] = directory_url if origin_info: browse_view_name = "browse-origin-branches" else: browse_view_name = "browse-snapshot-branches" prev_branches_url = None next_branches_url = None if branches_bc: query_params_prev = dict(query_params) query_params_prev["branches_breadcrumbs"] = ",".join(branches_bc[:-1]) prev_branches_url = reverse( browse_view_name, url_args=url_args, query_params=query_params_prev ) elif branches_from: prev_branches_url = reverse( browse_view_name, 
url_args=url_args, query_params=query_params ) if snapshot["next_branch"] is not None: query_params_next = dict(query_params) next_branch = displayed_branches[-1]["name"] del displayed_branches[-1] branches_bc.append(next_branch) query_params_next["branches_breadcrumbs"] = ",".join(branches_bc) next_branches_url = reverse( browse_view_name, url_args=url_args, query_params=query_params_next ) heading = "Branches - " if origin_info: heading += "origin: %s" % origin_info["url"] else: heading += "snapshot: %s" % snapshot_id return render( request, "browse/branches.html", { "heading": heading, "swh_object_name": "Branches", "swh_object_metadata": {}, "top_right_link": None, "displayed_branches": displayed_branches, "prev_branches_url": prev_branches_url, "next_branches_url": next_branches_url, "snapshot_context": snapshot_context, }, ) def browse_snapshot_releases( request, snapshot_id=None, origin_url=None, timestamp=None ): """ Django view implementation for browsing a list of releases in a snapshot context. 
""" try: _check_origin_url(snapshot_id, origin_url) snapshot_context = get_snapshot_context( snapshot_id=snapshot_id, origin_url=origin_url, timestamp=timestamp, visit_id=request.GET.get("visit_id"), ) rel_bc = request.GET.get("releases_breadcrumbs", "") rel_bc = rel_bc.split(",") if rel_bc else [] rel_from = rel_bc[-1] if rel_bc else "" origin_info = snapshot_context["origin_info"] url_args = snapshot_context["url_args"] query_params = snapshot_context["query_params"] snapshot = service.lookup_snapshot( snapshot_context["snapshot_id"], rel_from, PER_PAGE + 1, target_types=["release", "alias"], ) _, displayed_releases = process_snapshot_branches(snapshot) except Exception as exc: return handle_view_exception(request, exc) for release in displayed_releases: query_params_tgt = {"snapshot": snapshot_id} if origin_info: query_params_tgt["origin_url"] = origin_info["url"] release_url = reverse( "browse-release", url_args={"sha1_git": release["id"]}, query_params=query_params_tgt, ) target_url = "" if release["target_type"] == "revision": target_url = reverse( "browse-revision", url_args={"sha1_git": release["target"]}, query_params=query_params_tgt, ) elif release["target_type"] == "directory": target_url = reverse( "browse-directory", url_args={"sha1_git": release["target"]}, query_params=query_params_tgt, ) elif release["target_type"] == "content": target_url = reverse( "browse-content", url_args={"query_string": release["target"]}, query_params=query_params_tgt, ) elif release["target_type"] == "release": target_url = reverse( "browse-release", url_args={"sha1_git": release["target"]}, query_params=query_params_tgt, ) release["release_url"] = release_url release["target_url"] = target_url if origin_info: browse_view_name = "browse-origin-releases" else: browse_view_name = "browse-snapshot-releases" prev_releases_url = None next_releases_url = None if rel_bc: query_params_prev = dict(query_params) query_params_prev["releases_breadcrumbs"] = ",".join(rel_bc[:-1]) 
prev_releases_url = reverse( browse_view_name, url_args=url_args, query_params=query_params_prev ) elif rel_from: prev_releases_url = reverse( browse_view_name, url_args=url_args, query_params=query_params ) if snapshot["next_branch"] is not None: query_params_next = dict(query_params) next_rel = displayed_releases[-1]["branch_name"] del displayed_releases[-1] rel_bc.append(next_rel) query_params_next["releases_breadcrumbs"] = ",".join(rel_bc) next_releases_url = reverse( browse_view_name, url_args=url_args, query_params=query_params_next ) heading = "Releases - " if origin_info: heading += "origin: %s" % origin_info["url"] else: heading += "snapshot: %s" % snapshot_id return render( request, "browse/releases.html", { "heading": heading, "top_panel_visible": False, "top_panel_collapsible": False, "swh_object_name": "Releases", "swh_object_metadata": {}, "top_right_link": None, "displayed_releases": displayed_releases, "prev_releases_url": prev_releases_url, "next_releases_url": next_releases_url, "snapshot_context": snapshot_context, "vault_cooking": None, "show_actions": False, }, ) diff --git a/swh/web/browse/views/origin.py b/swh/web/browse/views/origin.py index 4ca0d2c4..5bda0c27 100644 --- a/swh/web/browse/views/origin.py +++ b/swh/web/browse/views/origin.py @@ -1,320 +1,320 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django.shortcuts import render, redirect from swh.web.browse.browseurls import browse_route from swh.web.browse.snapshot_context import ( browse_snapshot_directory, browse_snapshot_content, browse_snapshot_log, browse_snapshot_branches, browse_snapshot_releases, get_snapshot_context, ) from swh.web.common import service from swh.web.common.exc import handle_view_exception, BadInputExc from swh.web.common.origin_visits import 
get_origin_visits -from swh.web.common.utils import reverse, format_utc_iso_date, parse_timestamp +from swh.web.common.utils import reverse, format_utc_iso_date, parse_iso8601_date_to_utc @browse_route( r"origin/directory/", view_name="browse-origin-directory", ) def origin_directory_browse(request): """Django view for browsing the content of a directory associated to an origin for a given visit. The URL that points to it is :http:get:`/browse/origin/directory/` """ return browse_snapshot_directory( request, origin_url=request.GET.get("origin_url"), snapshot_id=request.GET.get("snapshot"), timestamp=request.GET.get("timestamp"), path=request.GET.get("path"), ) @browse_route( r"origin/(?P<origin_url>.+)/visit/(?P<timestamp>.+)/directory/", r"origin/(?P<origin_url>.+)/visit/(?P<timestamp>.+)/directory/(?P<path>.+)/", r"origin/(?P<origin_url>.+)/directory/(?P<path>.+)/", r"origin/(?P<origin_url>.+)/directory/", view_name="browse-origin-directory-legacy", ) def origin_directory_browse_legacy(request, origin_url, timestamp=None, path=None): """Django view for browsing the content of a directory associated to an origin for a given visit. The URLs that point to it are :http:get:`/browse/origin/(origin_url)/directory/[(path)/]` and :http:get:`/browse/origin/(origin_url)/visit/(timestamp)/directory/[(path)/]` """ return browse_snapshot_directory( request, origin_url=origin_url, snapshot_id=request.GET.get("snapshot"), timestamp=timestamp, path=path, ) @browse_route( r"origin/content/", view_name="browse-origin-content", ) def origin_content_browse(request): """Django view that produces an HTML display of a content associated to an origin for a given visit.
The URL that points to it is :http:get:`/browse/origin/content/` """ return browse_snapshot_content( request, origin_url=request.GET.get("origin_url"), snapshot_id=request.GET.get("snapshot"), timestamp=request.GET.get("timestamp"), path=request.GET.get("path"), selected_language=request.GET.get("language"), ) @browse_route( r"origin/(?P<origin_url>.+)/visit/(?P<timestamp>.+)/content/(?P<path>.+)/", r"origin/(?P<origin_url>.+)/content/(?P<path>.+)/", r"origin/(?P<origin_url>.+)/content/", view_name="browse-origin-content-legacy", ) def origin_content_browse_legacy(request, origin_url, path=None, timestamp=None): """Django view that produces an HTML display of a content associated to an origin for a given visit. The URLs that point to it are :http:get:`/browse/origin/(origin_url)/content/(path)/` and :http:get:`/browse/origin/(origin_url)/visit/(timestamp)/content/(path)/` """ return browse_snapshot_content( request, origin_url=origin_url, snapshot_id=request.GET.get("snapshot"), timestamp=timestamp, path=path, selected_language=request.GET.get("language"), ) @browse_route( r"origin/log/", view_name="browse-origin-log", ) def origin_log_browse(request): """Django view that produces an HTML display of revisions history (aka the commit log) associated to a software origin. The URL that points to it is :http:get:`/browse/origin/log/` """ return browse_snapshot_log( request, origin_url=request.GET.get("origin_url"), snapshot_id=request.GET.get("snapshot"), timestamp=request.GET.get("timestamp"), ) @browse_route( r"origin/(?P<origin_url>.+)/visit/(?P<timestamp>.+)/log/", r"origin/(?P<origin_url>.+)/log/", view_name="browse-origin-log-legacy", ) def origin_log_browse_legacy(request, origin_url, timestamp=None): """Django view that produces an HTML display of revisions history (aka the commit log) associated to a software origin.
The URLs that point to it are :http:get:`/browse/origin/(origin_url)/log/` and :http:get:`/browse/origin/(origin_url)/visit/(timestamp)/log/` """ return browse_snapshot_log( request, origin_url=origin_url, snapshot_id=request.GET.get("snapshot"), timestamp=timestamp, ) @browse_route( r"origin/branches/", view_name="browse-origin-branches", ) def origin_branches_browse(request): """Django view that produces an HTML display of the list of branches associated to an origin for a given visit. The URL that points to it is :http:get:`/browse/origin/branches/` """ return browse_snapshot_branches( request, origin_url=request.GET.get("origin_url"), snapshot_id=request.GET.get("snapshot"), timestamp=request.GET.get("timestamp"), ) @browse_route( r"origin/(?P<origin_url>.+)/visit/(?P<timestamp>.+)/branches/", r"origin/(?P<origin_url>.+)/branches/", view_name="browse-origin-branches-legacy", ) def origin_branches_browse_legacy(request, origin_url, timestamp=None): """Django view that produces an HTML display of the list of branches associated to an origin for a given visit. The URLs that point to it are :http:get:`/browse/origin/(origin_url)/branches/` and :http:get:`/browse/origin/(origin_url)/visit/(timestamp)/branches/` """ return browse_snapshot_branches( request, origin_url=origin_url, snapshot_id=request.GET.get("snapshot"), timestamp=timestamp, ) @browse_route( r"origin/releases/", view_name="browse-origin-releases", ) def origin_releases_browse(request): """Django view that produces an HTML display of the list of releases associated to an origin for a given visit.
The URL that points to it is :http:get:`/browse/origin/releases/` """ return browse_snapshot_releases( request, origin_url=request.GET.get("origin_url"), snapshot_id=request.GET.get("snapshot"), timestamp=request.GET.get("timestamp"), ) @browse_route( r"origin/(?P<origin_url>.+)/visit/(?P<timestamp>.+)/releases/", r"origin/(?P<origin_url>.+)/releases/", view_name="browse-origin-releases-legacy", ) def origin_releases_browse_legacy(request, origin_url, timestamp=None): """Django view that produces an HTML display of the list of releases associated to an origin for a given visit. The URLs that point to it are :http:get:`/browse/origin/(origin_url)/releases/` and :http:get:`/browse/origin/(origin_url)/visit/(timestamp)/releases/` """ return browse_snapshot_releases( request, origin_url=origin_url, snapshot_id=request.GET.get("snapshot"), timestamp=timestamp, ) def _origin_visits_browse(request, origin_url): try: if origin_url is None: raise BadInputExc("An origin URL must be provided as query parameter.") origin_info = service.lookup_origin({"url": origin_url}) origin_visits = get_origin_visits(origin_info) snapshot_context = get_snapshot_context(origin_url=origin_url) except Exception as exc: return handle_view_exception(request, exc) for i, visit in enumerate(origin_visits): url_date = format_utc_iso_date(visit["date"], "%Y-%m-%dT%H:%M:%SZ") visit["formatted_date"] = format_utc_iso_date(visit["date"]) query_params = {"origin_url": origin_url, "timestamp": url_date} if i < len(origin_visits) - 1: if visit["date"] == origin_visits[i + 1]["date"]: query_params = {"visit_id": visit["visit"]} if i > 0: if visit["date"] == origin_visits[i - 1]["date"]: query_params = {"visit_id": visit["visit"]} snapshot = visit["snapshot"] if visit["snapshot"] else "" visit["url"] = reverse("browse-origin-directory", query_params=query_params,) if not snapshot: visit["snapshot"] = "" - visit["date"] = parse_timestamp(visit["date"]).timestamp() + visit["date"] = parse_iso8601_date_to_utc(visit["date"]).timestamp() heading
= "Origin visits - %s" % origin_url return render( request, "browse/origin-visits.html", { "heading": heading, "swh_object_name": "Visits", "swh_object_metadata": origin_info, "origin_visits": origin_visits, "origin_info": origin_info, "snapshot_context": snapshot_context, "vault_cooking": None, "show_actions": False, }, ) @browse_route(r"origin/visits/", view_name="browse-origin-visits") def origin_visits_browse(request): """Django view that produces an HTML display of visits reporting for a given origin. The URL that points to it is :http:get:`/browse/origin/visits/`. """ return _origin_visits_browse(request, request.GET.get("origin_url")) @browse_route( r"origin/(?P<origin_url>.+)/visits/", view_name="browse-origin-visits-legacy" ) def origin_visits_browse_legacy(request, origin_url): """Django view that produces an HTML display of visits reporting for a given origin. The URL that points to it is :http:get:`/browse/origin/(origin_url)/visits/`. """ return _origin_visits_browse(request, origin_url) @browse_route(r"origin/", view_name="browse-origin") def origin_browse(request): """Django view that redirects to the display of the latest archived snapshot for a given software origin. """ last_snapshot_url = reverse("browse-origin-directory", query_params=request.GET,) return redirect(last_snapshot_url) @browse_route(r"origin/(?P<origin_url>.+)/", view_name="browse-origin-legacy") def origin_browse_legacy(request, origin_url): """Django view that redirects to the display of the latest archived snapshot for a given software origin.
""" last_snapshot_url = reverse( "browse-origin-directory", query_params={"origin_url": origin_url, **request.GET}, ) return redirect(last_snapshot_url) diff --git a/swh/web/common/origin_save.py b/swh/web/common/origin_save.py index 58d4fdf0..9632cdbb 100644 --- a/swh/web/common/origin_save.py +++ b/swh/web/common/origin_save.py @@ -1,620 +1,620 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from bisect import bisect_right from datetime import datetime, timezone, timedelta from itertools import product import json import logging from typing import Any, Dict from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ValidationError from django.core.validators import URLValidator from django.utils.html import escape from prometheus_client import Gauge import requests import sentry_sdk from swh.web import config from swh.web.common import service from swh.web.common.exc import BadInputExc, ForbiddenExc, NotFoundExc from swh.web.common.models import ( SaveUnauthorizedOrigin, SaveAuthorizedOrigin, SaveOriginRequest, SAVE_REQUEST_ACCEPTED, SAVE_REQUEST_REJECTED, SAVE_REQUEST_PENDING, SAVE_TASK_NOT_YET_SCHEDULED, SAVE_TASK_SCHEDULED, SAVE_TASK_SUCCEED, SAVE_TASK_FAILED, SAVE_TASK_RUNNING, SAVE_TASK_NOT_CREATED, ) from swh.web.common.origin_visits import get_origin_visits -from swh.web.common.utils import parse_timestamp, SWH_WEB_METRICS_REGISTRY +from swh.web.common.utils import parse_iso8601_date_to_utc, SWH_WEB_METRICS_REGISTRY from swh.scheduler.utils import create_oneshot_task_dict scheduler = config.scheduler() logger = logging.getLogger(__name__) def get_origin_save_authorized_urls(): """ Get the list of origin url prefixes authorized to be immediately loaded into the archive (whitelist). 
Returns: list: The list of authorized origin url prefix """ return [origin.url for origin in SaveAuthorizedOrigin.objects.all()] def get_origin_save_unauthorized_urls(): """ Get the list of origin url prefixes forbidden to be loaded into the archive (blacklist). Returns: list: the list of unauthorized origin url prefix """ return [origin.url for origin in SaveUnauthorizedOrigin.objects.all()] def can_save_origin(origin_url): """ Check if a software origin can be saved into the archive. Based on the origin url, the save request will be either: * immediately accepted if the url is whitelisted * rejected if the url is blacklisted * put in pending state for manual review otherwise Args: origin_url (str): the software origin url to check Returns: str: the origin save request status, either **accepted**, **rejected** or **pending** """ # origin url may be blacklisted for url_prefix in get_origin_save_unauthorized_urls(): if origin_url.startswith(url_prefix): return SAVE_REQUEST_REJECTED # if the origin url is in the white list, it can be immediately saved for url_prefix in get_origin_save_authorized_urls(): if origin_url.startswith(url_prefix): return SAVE_REQUEST_ACCEPTED # otherwise, the origin url needs to be manually verified return SAVE_REQUEST_PENDING # map visit type to scheduler task # TODO: do not hardcode the task name here (T1157) _visit_type_task = {"git": "load-git", "hg": "load-hg", "svn": "load-svn"} # map scheduler task status to origin save status _save_task_status = { "next_run_not_scheduled": SAVE_TASK_NOT_YET_SCHEDULED, "next_run_scheduled": SAVE_TASK_SCHEDULED, "completed": SAVE_TASK_SUCCEED, "disabled": SAVE_TASK_FAILED, } def get_savable_visit_types(): return sorted(list(_visit_type_task.keys())) def _check_visit_type_savable(visit_type): """ Get the list of visit types that can be performed through a save request. 
Returns: list: the list of saveable visit types """ allowed_visit_types = ", ".join(get_savable_visit_types()) if visit_type not in _visit_type_task: raise BadInputExc( "Visit of type %s can not be saved! " "Allowed types are the following: %s" % (visit_type, allowed_visit_types) ) _validate_url = URLValidator(schemes=["http", "https", "svn", "git"]) def _check_origin_url_valid(origin_url): try: _validate_url(origin_url) except ValidationError: raise BadInputExc( "The provided origin url (%s) is not valid!" % escape(origin_url) ) def _get_visit_info_for_save_request(save_request): visit_date = None visit_status = None time_now = datetime.now(tz=timezone.utc) time_delta = time_now - save_request.request_date # stop trying to find a visit date one month after save request submission # as those requests to storage are expensive and associated loading task # surely ended up with errors if time_delta.days <= 30: try: origin = {"url": save_request.origin_url} origin_info = service.lookup_origin(origin) origin_visits = get_origin_visits(origin_info) - visit_dates = [parse_timestamp(v["date"]) for v in origin_visits] + visit_dates = [parse_iso8601_date_to_utc(v["date"]) for v in origin_visits] i = bisect_right(visit_dates, save_request.request_date) if i != len(visit_dates): visit_date = visit_dates[i] visit_status = origin_visits[i]["status"] if origin_visits[i]["status"] == "ongoing": visit_date = None except Exception as exc: sentry_sdk.capture_exception(exc) return visit_date, visit_status def _check_visit_update_status(save_request, save_task_status): visit_date, visit_status = _get_visit_info_for_save_request(save_request) save_request.visit_date = visit_date # visit has been performed, mark the saving task as succeed if visit_date and visit_status is not None: save_task_status = SAVE_TASK_SUCCEED elif visit_status == "ongoing": save_task_status = SAVE_TASK_RUNNING else: time_now = datetime.now(tz=timezone.utc) time_delta = time_now - save_request.request_date # 
consider the task as failed if it is still in scheduled state # 30 days after its submission if time_delta.days > 30: save_task_status = SAVE_TASK_FAILED return visit_date, save_task_status def _save_request_dict(save_request, task=None): must_save = False visit_date = save_request.visit_date # save task still in scheduler db if task: save_task_status = _save_task_status[task["status"]] # Consider request from which a visit date has already been found # as succeeded to avoid retrieving it again if save_task_status == SAVE_TASK_SCHEDULED and visit_date: save_task_status = SAVE_TASK_SUCCEED if save_task_status in (SAVE_TASK_FAILED, SAVE_TASK_SUCCEED) and not visit_date: visit_date, _ = _get_visit_info_for_save_request(save_request) save_request.visit_date = visit_date must_save = True # Check tasks still marked as scheduled / not yet scheduled if save_task_status in (SAVE_TASK_SCHEDULED, SAVE_TASK_NOT_YET_SCHEDULED): visit_date, save_task_status = _check_visit_update_status( save_request, save_task_status ) # save task may have been archived else: save_task_status = save_request.loading_task_status if save_task_status in (SAVE_TASK_SCHEDULED, SAVE_TASK_NOT_YET_SCHEDULED): visit_date, save_task_status = _check_visit_update_status( save_request, save_task_status ) else: save_task_status = save_request.loading_task_status if save_request.loading_task_status != save_task_status: save_request.loading_task_status = save_task_status must_save = True if must_save: save_request.save() return { "id": save_request.id, "visit_type": save_request.visit_type, "origin_url": save_request.origin_url, "save_request_date": save_request.request_date.isoformat(), "save_request_status": save_request.status, "save_task_status": save_task_status, "visit_date": visit_date.isoformat() if visit_date else None, } def create_save_origin_request(visit_type, origin_url): """ Create a loading task to save a software origin into the archive. 
This function aims to create a software origin loading task through the use of the swh-scheduler component. First, some checks are performed to see if the visit type and origin url are valid but also if the save request can be accepted. If those checks pass, the loading task is then created. Otherwise, the save request is put in pending or rejected state. All the submitted save requests are logged into the swh-web database to keep track of them. Args: visit_type (str): the type of visit to perform (currently only ``git`` but ``svn`` and ``hg`` will soon be available) origin_url (str): the url of the origin to save Raises: BadInputExc: the visit type or origin url is invalid ForbiddenExc: the provided origin url is blacklisted Returns: dict: A dict describing the save request with the following keys: * **visit_type**: the type of visit to perform * **origin_url**: the url of the origin * **save_request_date**: the date the request was submitted * **save_request_status**: the request status, either **accepted**, **rejected** or **pending** * **save_task_status**: the origin loading task status, either **not created**, **not yet scheduled**, **scheduled**, **succeed** or **failed** """ _check_visit_type_savable(visit_type) _check_origin_url_valid(origin_url) save_request_status = can_save_origin(origin_url) task = None # if the origin save request is accepted, create a scheduler # task to load it into the archive if save_request_status == SAVE_REQUEST_ACCEPTED: # create a task with high priority kwargs = { "priority": "high", "url": origin_url, } sor = None # get list of previously submitted save requests current_sors = list( SaveOriginRequest.objects.filter( visit_type=visit_type, origin_url=origin_url ) ) can_create_task = False # if no save requests previously submitted, create the scheduler task if not current_sors: can_create_task = True else: # get the latest submitted save request sor = current_sors[0] # if it was in pending state, we need to create the
scheduler task # and update the save request info in the database if sor.status == SAVE_REQUEST_PENDING: can_create_task = True # a task has already been created to load the origin elif sor.loading_task_id != -1: # get the scheduler task and its status tasks = scheduler.get_tasks([sor.loading_task_id]) task = tasks[0] if tasks else None task_status = _save_request_dict(sor, task)["save_task_status"] # create a new scheduler task only if the previous one has been # already executed if task_status == SAVE_TASK_FAILED or task_status == SAVE_TASK_SUCCEED: can_create_task = True sor = None else: can_create_task = False if can_create_task: # effectively create the scheduler task task_dict = create_oneshot_task_dict(_visit_type_task[visit_type], **kwargs) task = scheduler.create_tasks([task_dict])[0] # pending save request has been accepted if sor: sor.status = SAVE_REQUEST_ACCEPTED sor.loading_task_id = task["id"] sor.save() else: sor = SaveOriginRequest.objects.create( visit_type=visit_type, origin_url=origin_url, status=save_request_status, loading_task_id=task["id"], ) # save request must be manually reviewed for acceptance elif save_request_status == SAVE_REQUEST_PENDING: # check if such a save request has already been submitted, # no need to add it to the database in that case try: sor = SaveOriginRequest.objects.get( visit_type=visit_type, origin_url=origin_url, status=save_request_status ) # if not, add it to the database except ObjectDoesNotExist: sor = SaveOriginRequest.objects.create( visit_type=visit_type, origin_url=origin_url, status=save_request_status ) # origin can not be saved as its url is blacklisted, # log the request to the database anyway else: sor = SaveOriginRequest.objects.create( visit_type=visit_type, origin_url=origin_url, status=save_request_status ) if save_request_status == SAVE_REQUEST_REJECTED: raise ForbiddenExc( ( 'The "save code now" request has been rejected ' "because the provided origin url is blacklisted."
) ) return _save_request_dict(sor, task) def get_save_origin_requests_from_queryset(requests_queryset): """ Get all save requests from a SaveOriginRequest queryset. Args: requests_queryset (django.db.models.QuerySet): input SaveOriginRequest queryset Returns: list: A list of save origin requests dict as described in :func:`swh.web.common.origin_save.create_save_origin_request` """ task_ids = [] for sor in requests_queryset: task_ids.append(sor.loading_task_id) save_requests = [] if task_ids: tasks = scheduler.get_tasks(task_ids) tasks = {task["id"]: task for task in tasks} for sor in requests_queryset: sr_dict = _save_request_dict(sor, tasks.get(sor.loading_task_id)) save_requests.append(sr_dict) return save_requests def get_save_origin_requests(visit_type, origin_url): """ Get all save requests for a given software origin. Args: visit_type (str): the type of visit origin_url (str): the url of the origin Raises: BadInputExc: the visit type or origin url is invalid swh.web.common.exc.NotFoundExc: no save requests can be found for the given origin Returns: list: A list of save origin requests dict as described in :func:`swh.web.common.origin_save.create_save_origin_request` """ _check_visit_type_savable(visit_type) _check_origin_url_valid(origin_url) sors = SaveOriginRequest.objects.filter( visit_type=visit_type, origin_url=origin_url ) if sors.count() == 0: raise NotFoundExc( ("No save requests found for visit of type " "%s on origin with url %s.") % (visit_type, origin_url) ) return get_save_origin_requests_from_queryset(sors) def get_save_origin_task_info( save_request_id: int, full_info: bool = True ) -> Dict[str, Any]: """ Get detailed information about an accepted save origin request and its associated loading task. If the associated loading task info is archived and removed from the scheduler database, returns an empty dictionary. 
Args: save_request_id: identifier of a save origin request full_info: whether to return detailed info for staff users Returns: A dictionary with the following keys: - **type**: loading task type - **arguments**: loading task arguments - **id**: loading task database identifier - **backend_id**: loading task celery identifier - **scheduled**: loading task scheduling date - **ended**: loading task termination date - **status**: loading task execution status Depending on the availability of the task logs in the elasticsearch cluster of Software Heritage, the returned dictionary may also contain the following keys: - **name**: associated celery task name - **message**: relevant log message from task execution - **duration**: task execution time (only if it succeeded) - **worker**: name of the worker that executed the task """ try: save_request = SaveOriginRequest.objects.get(id=save_request_id) except ObjectDoesNotExist: return {} task = scheduler.get_tasks([save_request.loading_task_id]) task = task[0] if task else None if task is None: return {} task_run = scheduler.get_task_runs([task["id"]]) task_run = task_run[0] if task_run else None if task_run is None: return {} task_run["type"] = task["type"] task_run["arguments"] = task["arguments"] task_run["id"] = task_run["task"] del task_run["task"] del task_run["metadata"] es_workers_index_url = config.get_config()["es_workers_index_url"] if not es_workers_index_url: return task_run es_workers_index_url += "/_search" if save_request.visit_date: min_ts = save_request.visit_date max_ts = min_ts + timedelta(days=7) else: min_ts = save_request.request_date max_ts = min_ts + timedelta(days=30) min_ts_unix = int(min_ts.timestamp()) * 1000 max_ts_unix = int(max_ts.timestamp()) * 1000 save_task_status = _save_task_status[task["status"]] priority = "3" if save_task_status == SAVE_TASK_FAILED else "6" query = { "bool": { "must": [ {"match_phrase": {"priority": {"query": priority}}}, {"match_phrase": {"swh_task_id": {"query": 
task_run["backend_id"]}}}, { "range": { "@timestamp": { "gte": min_ts_unix, "lte": max_ts_unix, "format": "epoch_millis", } } }, ] } } try: response = requests.post( es_workers_index_url, json={"query": query, "sort": ["@timestamp"]}, timeout=30, ) results = json.loads(response.text) if results["hits"]["total"]["value"] >= 1: task_run_info = results["hits"]["hits"][-1]["_source"] if "swh_logging_args_runtime" in task_run_info: duration = task_run_info["swh_logging_args_runtime"] task_run["duration"] = duration if "message" in task_run_info: task_run["message"] = task_run_info["message"] if "swh_logging_args_name" in task_run_info: task_run["name"] = task_run_info["swh_logging_args_name"] elif "swh_task_name" in task_run_info: task_run["name"] = task_run_info["swh_task_name"] if "hostname" in task_run_info: task_run["worker"] = task_run_info["hostname"] elif "host" in task_run_info: task_run["worker"] = task_run_info["host"] except Exception as exc: logger.warning("Request to Elasticsearch failed\n%s", exc) sentry_sdk.capture_exception(exc) if not full_info: for field in ("id", "backend_id", "worker"): # remove some staff only fields task_run.pop(field, None) if "message" in task_run and "Loading failure" in task_run["message"]: # hide traceback for non staff users, only display exception message_lines = task_run["message"].split("\n") message = "" for line in message_lines: if line.startswith("Traceback"): break message += f"{line}\n" message += message_lines[-1] task_run["message"] = message return task_run SUBMITTED_SAVE_REQUESTS_METRIC = "swh_web_submitted_save_requests" _submitted_save_requests_gauge = Gauge( name=SUBMITTED_SAVE_REQUESTS_METRIC, documentation="Number of submitted origin save requests", labelnames=["status", "visit_type"], registry=SWH_WEB_METRICS_REGISTRY, ) ACCEPTED_SAVE_REQUESTS_METRIC = "swh_web_accepted_save_requests" _accepted_save_requests_gauge = Gauge( name=ACCEPTED_SAVE_REQUESTS_METRIC, documentation="Number of accepted origin save 
requests", labelnames=["load_task_status", "visit_type"], registry=SWH_WEB_METRICS_REGISTRY, ) def compute_save_requests_metrics(): """Compute a couple of Prometheus metrics related to origin save requests""" request_statuses = ( SAVE_REQUEST_ACCEPTED, SAVE_REQUEST_REJECTED, SAVE_REQUEST_PENDING, ) load_task_statuses = ( SAVE_TASK_NOT_CREATED, SAVE_TASK_NOT_YET_SCHEDULED, SAVE_TASK_SCHEDULED, SAVE_TASK_SUCCEED, SAVE_TASK_FAILED, SAVE_TASK_RUNNING, ) visit_types = get_savable_visit_types() labels_set = product(request_statuses, visit_types) for labels in labels_set: _submitted_save_requests_gauge.labels(*labels).set(0) labels_set = product(load_task_statuses, visit_types) for labels in labels_set: _accepted_save_requests_gauge.labels(*labels).set(0) for sor in SaveOriginRequest.objects.all(): if sor.status == SAVE_REQUEST_ACCEPTED: _accepted_save_requests_gauge.labels( load_task_status=sor.loading_task_status, visit_type=sor.visit_type ).inc() _submitted_save_requests_gauge.labels( status=sor.status, visit_type=sor.visit_type ).inc() diff --git a/swh/web/common/origin_visits.py b/swh/web/common/origin_visits.py index 5e7b0d2e..8f63d5b1 100644 --- a/swh/web/common/origin_visits.py +++ b/swh/web/common/origin_visits.py @@ -1,193 +1,193 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import math -from typing import List, Optional, Union +from typing import List, Optional from django.core.cache import cache from swh.web.common.exc import NotFoundExc from swh.web.common.typing import OriginInfo, OriginVisitInfo -from swh.web.common.utils import parse_timestamp +from swh.web.common.utils import parse_iso8601_date_to_utc def get_origin_visits(origin_info: OriginInfo) -> List[OriginVisitInfo]: """Function that returns the list of visits for a swh origin. 
That list is put in cache in order to speedup the navigation in the swh web browse ui. The returned visits are sorted according to their date in ascending order. Args: origin_info: dict describing the origin to fetch visits from Returns: A list of dict describing the origin visits Raises: swh.web.common.exc.NotFoundExc: if the origin is not found """ from swh.web.common import service if "url" in origin_info: origin_url = origin_info["url"] else: origin_url = service.lookup_origin(origin_info)["url"] cache_entry_id = "origin_visits_%s" % origin_url cache_entry = cache.get(cache_entry_id) if cache_entry: last_visit = cache_entry[-1]["visit"] new_visits = list( service.lookup_origin_visits(origin_url, last_visit=last_visit) ) if not new_visits: last_snp = service.lookup_latest_origin_snapshot(origin_url) if not last_snp or last_snp["id"] == cache_entry[-1]["snapshot"]: return cache_entry origin_visits = [] per_page = service.MAX_LIMIT last_visit = None while 1: visits = list( service.lookup_origin_visits( origin_url, last_visit=last_visit, per_page=per_page ) ) origin_visits += visits if len(visits) < per_page: break else: if not last_visit: last_visit = per_page else: last_visit += per_page def _visit_sort_key(visit): - ts = parse_timestamp(visit["date"]).timestamp() + ts = parse_iso8601_date_to_utc(visit["date"]).timestamp() return ts + (float(visit["visit"]) / 10e3) origin_visits = sorted(origin_visits, key=lambda v: _visit_sort_key(v)) cache.set(cache_entry_id, origin_visits) return origin_visits def get_origin_visit( origin_info: OriginInfo, - visit_ts: Optional[Union[int, str]] = None, + visit_ts: Optional[str] = None, visit_id: Optional[int] = None, snapshot_id: Optional[str] = None, ) -> OriginVisitInfo: """Function that returns information about a visit for a given origin. If a timestamp is provided, the closest visit from that timestamp is returned. If a snapshot identifier is provided, the first visit with that snapshot is returned. 
If no search hints are provided, return the most recent full visit with a valid snapshot or the most recent partial visit with a valid snapshot otherwise. Args: origin_info: a dict filled with origin information - visit_ts: an ISO date string or Unix timestamp to parse + visit_ts: an ISO 8601 datetime string to parse snapshot_id: a snapshot identifier Returns: A dict containing the visit info. Raises: swh.web.common.exc.NotFoundExc: if no visit can be found """ if not visit_ts and not visit_id and not snapshot_id: from swh.web.common import service # returns the latest full visit with a valid snapshot visit = service.lookup_origin_visit_latest( origin_info["url"], allowed_statuses=["full"], require_snapshot=True ) if not visit: # or the latest partial visit with a valid snapshot otherwise visit = service.lookup_origin_visit_latest( origin_info["url"], allowed_statuses=["partial"], require_snapshot=True ) if visit: return visit else: raise NotFoundExc( f"No valid visit for origin with url {origin_info['url']} found!" ) visits = get_origin_visits(origin_info) if not visits: raise NotFoundExc( f"No visits associated to origin with url {origin_info['url']}!" ) if snapshot_id: visits = [v for v in visits if v["snapshot"] == snapshot_id] if len(visits) == 0: raise NotFoundExc( ( "Visit for snapshot with id %s for origin with" " url %s not found!" % (snapshot_id, origin_info["url"]) ) ) return visits[0] if visit_id: visits = [v for v in visits if v["visit"] == int(visit_id)] if len(visits) == 0: raise NotFoundExc( ( "Visit with id %s for origin with" " url %s not found!" 
% (visit_id, origin_info["url"]) ) ) return visits[0] if visit_ts: - target_visit_ts = math.floor(parse_timestamp(visit_ts).timestamp()) + target_visit_ts = math.floor(parse_iso8601_date_to_utc(visit_ts).timestamp()) # Find the visit with date closest to the target (in absolute value) (abs_time_delta, visit_idx) = min( ( - (math.floor(parse_timestamp(visit["date"]).timestamp()), i) + (math.floor(parse_iso8601_date_to_utc(visit["date"]).timestamp()), i) for (i, visit) in enumerate(visits) ), key=lambda ts_and_i: abs(ts_and_i[0] - target_visit_ts), ) if visit_idx is not None: visit = visits[visit_idx] # If multiple visits have the same date, select the one with # the largest id. while ( visit_idx < len(visits) - 1 and visit["date"] == visits[visit_idx + 1]["date"] ): visit_idx = visit_idx + 1 visit = visits[visit_idx] return visit else: raise NotFoundExc( ( "Visit with timestamp %s for origin with " "url %s not found!" % (visit_ts, origin_info["url"]) ) ) return visits[-1] diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py index 9ff3fb7e..7d64deb1 100644 --- a/swh/web/common/utils.py +++ b/swh/web/common/utils.py @@ -1,365 +1,357 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import re from datetime import datetime, timezone -from dateutil import parser as date_parser -from dateutil import tz - from typing import Optional, Dict, Any import docutils.parsers.rst import docutils.utils from bs4 import BeautifulSoup from docutils.core import publish_parts from docutils.writers.html5_polyglot import Writer, HTMLTranslator from django.urls import reverse as django_reverse from django.http import QueryDict, HttpRequest +from iso8601 import parse_date, ParseError + from prometheus_client.registry import CollectorRegistry from rest_framework.authentication 
import SessionAuthentication from swh.web.common.exc import BadInputExc from swh.web.common.typing import QueryParameters from swh.web.config import get_config SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True) swh_object_icons = { "branch": "mdi mdi-source-branch", "branches": "mdi mdi-source-branch", "content": "mdi mdi-file-document", "directory": "mdi mdi-folder", "origin": "mdi mdi-source-repository", "person": "mdi mdi-account", "revisions history": "mdi mdi-history", "release": "mdi mdi-tag", "releases": "mdi mdi-tag", "revision": "mdi mdi-rotate-90 mdi-source-commit", "snapshot": "mdi mdi-camera", "visits": "mdi mdi-calendar-month", } def reverse( viewname: str, url_args: Optional[Dict[str, Any]] = None, query_params: Optional[QueryParameters] = None, current_app: Optional[str] = None, urlconf: Optional[str] = None, request: Optional[HttpRequest] = None, ) -> str: """An override of django reverse function supporting query parameters. Args: viewname: the name of the django view from which to compute a url url_args: dictionary of url arguments indexed by their names query_params: dictionary of query parameters to append to the reversed url current_app: the name of the django app tied to the view urlconf: url configuration module request: build an absolute URI if provided Returns: str: the url of the requested view with processed arguments and query parameters """ if url_args: url_args = {k: v for k, v in url_args.items() if v is not None} url = django_reverse( viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app ) if query_params: query_params = {k: v for k, v in query_params.items() if v} if query_params and len(query_params) > 0: query_dict = QueryDict("", mutable=True) for k in sorted(query_params.keys()): query_dict[k] = query_params[k] url += "?"
+ query_dict.urlencode(safe="/;:") if request is not None: url = request.build_absolute_uri(url) return url def datetime_to_utc(date): """Returns datetime in UTC without timezone info Args: date (datetime.datetime): input datetime with timezone info Returns: datetime.datetime: datetime in UTC without timezone info """ - if date.tzinfo: - return date.astimezone(tz.gettz("UTC")).replace(tzinfo=timezone.utc) + if date.tzinfo and date.tzinfo != timezone.utc: + return date.astimezone(tz=timezone.utc) else: return date -def parse_timestamp(timestamp): - """Given a time or timestamp (as string), parse the result as UTC datetime. +def parse_iso8601_date_to_utc(iso_date: str) -> datetime: + """Given an ISO 8601 datetime string, parse the result as UTC datetime. Returns: - datetime.datetime: a timezone-aware datetime representing the - parsed value or None if the parsing fails. + a timezone-aware datetime representing the parsed date + + Raises: + swh.web.common.exc.BadInputExc: provided date does not respect ISO 8601 format Samples: - 2016-01-12 - 2016-01-12T09:19:12+0100 - - Today is January 1, 2047 at 8:21:00AM - - 1452591542 + - 2007-01-14T20:34:22Z """ - if not timestamp: - return None - try: - date = date_parser.parse(timestamp, ignoretz=False, fuzzy=True) + date = parse_date(iso_date) return datetime_to_utc(date) - except Exception: - try: - return datetime.utcfromtimestamp(float(timestamp)).replace( - tzinfo=timezone.utc - ) - except (ValueError, OverflowError) as e: - raise BadInputExc(e) + except ParseError as e: + raise BadInputExc(e) def shorten_path(path): """Shorten the given path: for each hash present, only return the first 8 characters followed by an ellipsis""" sha256_re = r"([0-9a-f]{8})[0-9a-z]{56}" sha1_re = r"([0-9a-f]{8})[0-9a-f]{32}" ret = re.sub(sha256_re, r"\1...", path) return re.sub(sha1_re, r"\1...", ret) def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M UTC"): - """Turns a string representation of an ISO 8601 date string + """Turns a string 
representation of an ISO 8601 datetime string to UTC and format it into a more human readable one. For instance, from the following input string: '2017-05-04T13:27:13+02:00' the following one is returned: '04 May 2017, 11:27 UTC'. Custom format string may also be provided as parameter Args: iso_date (str): a string representation of an ISO 8601 date fmt (str): optional date formatting string Returns: str: a formatted string representation of the input iso date """ if not iso_date: return iso_date - date = parse_timestamp(iso_date) + date = parse_iso8601_date_to_utc(iso_date) return date.strftime(fmt) def gen_path_info(path): """Function to generate path data navigation for use with a breadcrumb in the swh web ui. For instance, from a path /folder1/folder2/folder3, it returns the following list:: [{'name': 'folder1', 'path': 'folder1'}, {'name': 'folder2', 'path': 'folder1/folder2'}, {'name': 'folder3', 'path': 'folder1/folder2/folder3'}] Args: path: a filesystem path Returns: list: a list of path data for navigation as illustrated above. """ path_info = [] if path: sub_paths = path.strip("/").split("/") path_from_root = "" for p in sub_paths: path_from_root += "/" + p path_info.append({"name": p, "path": path_from_root.strip("/")}) return path_info def parse_rst(text, report_level=2): """ Parse a reStructuredText string with docutils. 
Args: text (str): string with reStructuredText markups in it report_level (int): level of docutils report messages to print (1 info 2 warning 3 error 4 severe 5 none) Returns: docutils.nodes.document: a parsed docutils document """ parser = docutils.parsers.rst.Parser() components = (docutils.parsers.rst.Parser,) settings = docutils.frontend.OptionParser( components=components ).get_default_values() settings.report_level = report_level document = docutils.utils.new_document("rst-doc", settings=settings) parser.parse(text, document) return document def get_client_ip(request): """ Return the client IP address from an incoming HTTP request. Args: request (django.http.HttpRequest): the incoming HTTP request Returns: str: The client IP address """ x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR") if x_forwarded_for: ip = x_forwarded_for.split(",")[0] else: ip = request.META.get("REMOTE_ADDR") return ip browsers_supported_image_mimes = set( [ "image/gif", "image/png", "image/jpeg", "image/bmp", "image/webp", "image/svg", "image/svg+xml", ] ) def context_processor(request): """ Django context processor used to inject variables in all swh-web templates. """ config = get_config() if ( hasattr(request, "user") and request.user.is_authenticated and not hasattr(request.user, "backend") ): # To avoid django.template.base.VariableDoesNotExist errors # when rendering templates when standard Django user is logged in. request.user.backend = "django.contrib.auth.backends.ModelBackend" return { "swh_object_icons": swh_object_icons, "available_languages": None, "swh_client_config": config["client_config"], "oidc_enabled": bool(config["keycloak"]["server_url"]), "browsers_supported_image_mimes": browsers_supported_image_mimes, } class EnforceCSRFAuthentication(SessionAuthentication): """ Helper class to enforce CSRF validation on a DRF view when a user is not authenticated. 
""" def authenticate(self, request): user = getattr(request._request, "user", None) self.enforce_csrf(request) return (user, None) def resolve_branch_alias( snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]] ) -> Optional[Dict[str, Any]]: """ Resolve branch alias in snapshot content. Args: snapshot: a full snapshot content branch: a branch alias contained in the snapshot Returns: The real snapshot branch that got aliased. """ while branch and branch["target_type"] == "alias": if branch["target"] in snapshot["branches"]: branch = snapshot["branches"][branch["target"]] else: from swh.web.common import service snp = service.lookup_snapshot( snapshot["id"], branches_from=branch["target"], branches_count=1 ) if snp and branch["target"] in snp["branches"]: branch = snp["branches"][branch["target"]] else: branch = None return branch class _NoHeaderHTMLTranslator(HTMLTranslator): """ Docutils translator subclass to customize the generation of HTML from reST-formatted docstrings """ def __init__(self, document): super().__init__(document) self.body_prefix = [] self.body_suffix = [] _HTML_WRITER = Writer() _HTML_WRITER.translator_class = _NoHeaderHTMLTranslator def rst_to_html(rst: str) -> str: """ Convert reStructuredText document into HTML. Args: rst: A string containing a reStructuredText document Returns: Body content of the produced HTML conversion. """ settings = { "initial_header_level": 2, } pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings) return f'
{pp["html_body"]}
' def prettify_html(html: str) -> str: """ Prettify an HTML document. Args: html: Input HTML document Returns: The prettified HTML document """ return BeautifulSoup(html, "lxml").prettify() diff --git a/swh/web/tests/browse/views/test_origin.py b/swh/web/tests/browse/views/test_origin.py index dce5954b..f42e2833 100644 --- a/swh/web/tests/browse/views/test_origin.py +++ b/swh/web/tests/browse/views/test_origin.py @@ -1,1313 +1,1244 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random import re import string from django.utils.html import escape from hypothesis import given from swh.storage.utils import now from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT from swh.model.model import ( Snapshot, SnapshotBranch, TargetType, OriginVisit, OriginVisitStatus, ) from swh.web.browse.snapshot_context import process_snapshot_branches from swh.web.common.exc import NotFoundExc from swh.web.common.identifiers import gen_swhid from swh.web.common.utils import ( reverse, gen_path_info, format_utc_iso_date, - parse_timestamp, + parse_iso8601_date_to_utc, ) from swh.web.tests.data import get_content, random_sha1 from swh.web.tests.django_asserts import assert_contains, assert_template_used from swh.web.tests.strategies import ( origin, origin_with_multiple_visits, new_origin, new_snapshot, visit_dates, revisions, origin_with_releases, release as existing_release, unknown_revision, ) @given(origin_with_multiple_visits()) def test_origin_visits_browse(client, archive_data, origin): url = reverse("browse-origin-visits", query_params={"origin_url": origin["url"]}) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/origin-visits.html") url = 
reverse("browse-origin-visits", query_params={"origin_url": origin["url"]}) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/origin-visits.html") visits = archive_data.origin_visit_get(origin["url"]) for v in visits: vdate = format_utc_iso_date(v["date"], "%Y-%m-%dT%H:%M:%SZ") browse_dir_url = reverse( "browse-origin-directory", query_params={"origin_url": origin["url"], "timestamp": vdate}, ) assert_contains(resp, browse_dir_url) _check_origin_link(resp, origin["url"]) @given(origin_with_multiple_visits()) def test_origin_content_view(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin["url"]) def _get_archive_data(visit_idx): snapshot = archive_data.snapshot_get(origin_visits[visit_idx]["snapshot"]) head_rev_id = archive_data.snapshot_get_head(snapshot) head_rev = archive_data.revision_get(head_rev_id) dir_content = archive_data.directory_ls(head_rev["directory"]) dir_files = [e for e in dir_content if e["type"] == "file"] dir_file = random.choice(dir_files) branches, releases = process_snapshot_branches(snapshot) return { "branches": branches, "releases": releases, "root_dir_sha1": head_rev["directory"], "content": get_content(dir_file["checksums"]["sha1"]), "visit": origin_visits[visit_idx], } tdata = _get_archive_data(-1) _origin_content_view_test_helper( client, archive_data, origin, origin_visits[-1], tdata["branches"], tdata["releases"], tdata["root_dir_sha1"], tdata["content"], ) _origin_content_view_test_helper( client, archive_data, origin, origin_visits[-1], tdata["branches"], tdata["releases"], tdata["root_dir_sha1"], tdata["content"], timestamp=tdata["visit"]["date"], ) - visit_unix_ts = parse_timestamp(tdata["visit"]["date"]).timestamp() - visit_unix_ts = int(visit_unix_ts) - - _origin_content_view_test_helper( - client, - archive_data, - origin, - origin_visits[-1], - tdata["branches"], - tdata["releases"], - tdata["root_dir_sha1"], - tdata["content"], - timestamp=visit_unix_ts, 
- ) - _origin_content_view_test_helper( client, archive_data, origin, origin_visits[-1], tdata["branches"], tdata["releases"], tdata["root_dir_sha1"], tdata["content"], snapshot_id=tdata["visit"]["snapshot"], ) tdata = _get_archive_data(0) _origin_content_view_test_helper( client, archive_data, origin, origin_visits[0], tdata["branches"], tdata["releases"], tdata["root_dir_sha1"], tdata["content"], visit_id=tdata["visit"]["visit"], ) _origin_content_view_test_helper( client, archive_data, origin, origin_visits[0], tdata["branches"], tdata["releases"], tdata["root_dir_sha1"], tdata["content"], snapshot_id=tdata["visit"]["snapshot"], ) @given(origin()) def test_origin_root_directory_view(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin["url"]) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit["snapshot"]) head_rev_id = archive_data.snapshot_get_head(snapshot) head_rev = archive_data.revision_get(head_rev_id) root_dir_sha1 = head_rev["directory"] dir_content = archive_data.directory_ls(root_dir_sha1) branches, releases = process_snapshot_branches(snapshot) - visit_unix_ts = parse_timestamp(visit["date"]).timestamp() - visit_unix_ts = int(visit_unix_ts) _origin_directory_view_test_helper( client, archive_data, origin, visit, branches, releases, root_dir_sha1, dir_content, ) _origin_directory_view_test_helper( client, archive_data, origin, visit, branches, releases, root_dir_sha1, dir_content, visit_id=visit["visit"], ) - _origin_directory_view_test_helper( - client, - archive_data, - origin, - visit, - branches, - releases, - root_dir_sha1, - dir_content, - timestamp=visit_unix_ts, - ) - _origin_directory_view_test_helper( client, archive_data, origin, visit, branches, releases, root_dir_sha1, dir_content, timestamp=visit["date"], ) _origin_directory_view_test_helper( client, archive_data, origin, visit, branches, releases, root_dir_sha1, dir_content, snapshot_id=visit["snapshot"], ) 
_origin_directory_view_test_helper( client, archive_data, origin, visit, branches, releases, root_dir_sha1, dir_content, ) _origin_directory_view_test_helper( client, archive_data, origin, visit, branches, releases, root_dir_sha1, dir_content, visit_id=visit["visit"], ) - _origin_directory_view_test_helper( - client, - archive_data, - origin, - visit, - branches, - releases, - root_dir_sha1, - dir_content, - timestamp=visit_unix_ts, - ) - _origin_directory_view_test_helper( client, archive_data, origin, visit, branches, releases, root_dir_sha1, dir_content, timestamp=visit["date"], ) _origin_directory_view_test_helper( client, archive_data, origin, visit, branches, releases, root_dir_sha1, dir_content, snapshot_id=visit["snapshot"], ) @given(origin()) def test_origin_sub_directory_view(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin["url"]) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit["snapshot"]) head_rev_id = archive_data.snapshot_get_head(snapshot) head_rev = archive_data.revision_get(head_rev_id) root_dir_sha1 = head_rev["directory"] subdirs = [ e for e in archive_data.directory_ls(root_dir_sha1) if e["type"] == "dir" ] branches, releases = process_snapshot_branches(snapshot) - visit_unix_ts = parse_timestamp(visit["date"]).timestamp() - visit_unix_ts = int(visit_unix_ts) if len(subdirs) == 0: return subdir = random.choice(subdirs) subdir_content = archive_data.directory_ls(subdir["target"]) subdir_path = subdir["name"] _origin_directory_view_test_helper( client, archive_data, origin, visit, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, ) _origin_directory_view_test_helper( client, archive_data, origin, visit, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, visit_id=visit["visit"], ) - _origin_directory_view_test_helper( - client, - archive_data, - origin, - visit, - branches, - releases, - root_dir_sha1, - subdir_content, - path=subdir_path, - 
timestamp=visit_unix_ts, - ) - _origin_directory_view_test_helper( client, archive_data, origin, visit, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit["date"], ) _origin_directory_view_test_helper( client, archive_data, origin, visit, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, snapshot_id=visit["snapshot"], ) _origin_directory_view_test_helper( client, archive_data, origin, visit, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, ) _origin_directory_view_test_helper( client, archive_data, origin, visit, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, visit_id=visit["visit"], ) - _origin_directory_view_test_helper( - client, - archive_data, - origin, - visit, - branches, - releases, - root_dir_sha1, - subdir_content, - path=subdir_path, - timestamp=visit_unix_ts, - ) - _origin_directory_view_test_helper( client, archive_data, origin, visit, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit["date"], ) _origin_directory_view_test_helper( client, archive_data, origin, visit, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, snapshot_id=visit["snapshot"], ) @given(origin()) def test_origin_branches(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin["url"]) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit["snapshot"]) snapshot_content = process_snapshot_branches(snapshot) _origin_branches_test_helper(client, origin, snapshot_content) _origin_branches_test_helper( client, origin, snapshot_content, snapshot_id=visit["snapshot"] ) @given(origin()) def test_origin_releases(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin["url"]) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit["snapshot"]) snapshot_content = process_snapshot_branches(snapshot) _origin_releases_test_helper(client, origin, snapshot_content) 
_origin_releases_test_helper( client, origin, snapshot_content, snapshot_id=visit["snapshot"] ) @given( new_origin(), new_snapshot(min_size=4, max_size=4), visit_dates(), revisions(min_size=3, max_size=3), ) def test_origin_snapshot_null_branch( client, archive_data, new_origin, new_snapshot, visit_dates, revisions ): snp_dict = new_snapshot.to_dict() archive_data.origin_add([new_origin]) for i, branch in enumerate(snp_dict["branches"].keys()): if i == 0: snp_dict["branches"][branch] = None else: snp_dict["branches"][branch] = { "target_type": "revision", "target": hash_to_bytes(revisions[i - 1]), } archive_data.snapshot_add([Snapshot.from_dict(snp_dict)]) visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=visit_dates[0], type="git",)] )[0] visit_status = OriginVisitStatus( origin=new_origin.url, visit=visit.visit, date=now(), status="partial", snapshot=snp_dict["id"], ) archive_data.origin_visit_status_add([visit_status]) url = reverse( "browse-origin-directory", query_params={"origin_url": new_origin.url} ) rv = client.get(url) assert rv.status_code == 200 @given( new_origin(), new_snapshot(min_size=4, max_size=4), visit_dates(), revisions(min_size=4, max_size=4), ) def test_origin_snapshot_invalid_branch( client, archive_data, new_origin, new_snapshot, visit_dates, revisions ): snp_dict = new_snapshot.to_dict() archive_data.origin_add([new_origin]) for i, branch in enumerate(snp_dict["branches"].keys()): snp_dict["branches"][branch] = { "target_type": "revision", "target": hash_to_bytes(revisions[i]), } archive_data.snapshot_add([Snapshot.from_dict(snp_dict)]) visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=visit_dates[0], type="git",)] )[0] visit_status = OriginVisitStatus( origin=new_origin.url, visit=visit.visit, date=now(), status="full", snapshot=snp_dict["id"], ) archive_data.origin_visit_status_add([visit_status]) url = reverse( "browse-origin-directory", query_params={"origin_url": 
new_origin.url, "branch": "invalid_branch"}, ) rv = client.get(url) assert rv.status_code == 404 @given(new_origin()) def test_browse_visits_origin_not_found(client, new_origin): url = reverse("browse-origin-visits", query_params={"origin_url": new_origin.url}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, "error.html") assert_contains( resp, f"Origin with url {new_origin.url} not found", status_code=404 ) @given(origin()) def test_browse_origin_directory_no_visit(client, mocker, origin): mock_get_origin_visits = mocker.patch( "swh.web.common.origin_visits.get_origin_visits" ) mock_get_origin_visits.return_value = [] url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, "error.html") assert_contains(resp, "No visit", status_code=404) assert mock_get_origin_visits.called @given(origin()) def test_browse_origin_directory_unknown_visit(client, mocker, origin): mock_get_origin_visits = mocker.patch( "swh.web.common.origin_visits.get_origin_visits" ) mock_get_origin_visits.return_value = [{"visit": 1}] url = reverse( "browse-origin-directory", query_params={"origin_url": origin["url"], "visit_id": 2}, ) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, "error.html") assert re.search("Visit.*not found", resp.content.decode("utf-8")) assert mock_get_origin_visits.called @given(origin()) def test_browse_origin_directory_not_found(client, origin): url = reverse( "browse-origin-directory", query_params={"origin_url": origin["url"], "path": "/invalid/dir/path/"}, ) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, "error.html") assert re.search("Directory.*not found", resp.content.decode("utf-8")) @given(origin()) def test_browse_origin_content_no_visit(client, mocker, origin): mock_get_origin_visits = mocker.patch( "swh.web.common.origin_visits.get_origin_visits" ) 
mock_get_origin_visits.return_value = [] url = reverse( "browse-origin-content", query_params={"origin_url": origin["url"], "path": "foo"}, ) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, "error.html") assert_contains(resp, "No visit", status_code=404) assert mock_get_origin_visits.called @given(origin()) def test_browse_origin_content_unknown_visit(client, mocker, origin): mock_get_origin_visits = mocker.patch( "swh.web.common.origin_visits.get_origin_visits" ) mock_get_origin_visits.return_value = [{"visit": 1}] url = reverse( "browse-origin-content", query_params={"origin_url": origin["url"], "path": "foo", "visit_id": 2}, ) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, "error.html") assert re.search("Visit.*not found", resp.content.decode("utf-8")) assert mock_get_origin_visits.called @given(origin()) def test_browse_origin_content_directory_empty_snapshot(client, mocker, origin): mock_snapshot_service = mocker.patch("swh.web.browse.snapshot_context.service") mock_get_origin_visit_snapshot = mocker.patch( "swh.web.browse.snapshot_context.get_origin_visit_snapshot" ) mock_get_origin_visit_snapshot.return_value = ([], []) mock_snapshot_service.lookup_origin.return_value = origin mock_snapshot_service.lookup_snapshot_sizes.return_value = { "revision": 0, "release": 0, } for browse_context in ("content", "directory"): url = reverse( f"browse-origin-{browse_context}", query_params={"origin_url": origin["url"], "path": "baz"}, ) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, f"browse/{browse_context}.html") assert re.search("snapshot.*is empty", resp.content.decode("utf-8")) assert mock_get_origin_visit_snapshot.called assert mock_snapshot_service.lookup_origin.called assert mock_snapshot_service.lookup_snapshot_sizes.called @given(origin()) def test_browse_origin_content_not_found(client, origin): url = reverse( "browse-origin-content", query_params={"origin_url": 
origin["url"], "path": "/invalid/file/path"}, ) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, "error.html") assert re.search("Directory entry.*not found", resp.content.decode("utf-8")) @given(origin()) def test_browse_directory_snapshot_not_found(client, mocker, origin): mock_get_snapshot_context = mocker.patch( "swh.web.browse.snapshot_context.get_snapshot_context" ) mock_get_snapshot_context.side_effect = NotFoundExc("Snapshot not found") url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, "error.html") assert_contains(resp, "Snapshot not found", status_code=404) assert mock_get_snapshot_context.called @given(origin()) def test_origin_empty_snapshot(client, mocker, origin): mock_service = mocker.patch("swh.web.browse.snapshot_context.service") mock_get_origin_visit_snapshot = mocker.patch( "swh.web.browse.snapshot_context.get_origin_visit_snapshot" ) mock_get_origin_visit_snapshot.return_value = ([], []) mock_service.lookup_snapshot_sizes.return_value = { "revision": 0, "release": 0, } mock_service.lookup_origin.return_value = origin url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]}) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/directory.html") resp_content = resp.content.decode("utf-8") assert re.search("snapshot.*is empty", resp_content) assert not re.search("swh-tr-link", resp_content) assert mock_get_origin_visit_snapshot.called assert mock_service.lookup_snapshot_sizes.called @given(new_origin()) def test_origin_empty_snapshot_null_revision(client, archive_data, new_origin): snapshot = Snapshot( branches={ b"HEAD": SnapshotBranch( target="refs/head/master".encode(), target_type=TargetType.ALIAS, ), b"refs/head/master": None, } ) archive_data.origin_add([new_origin]) archive_data.snapshot_add([snapshot]) visit = 
archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=now(), type="git",)] )[0] visit_status = OriginVisitStatus( origin=new_origin.url, visit=visit.visit, date=now(), status="partial", snapshot=snapshot.id, ) archive_data.origin_visit_status_add([visit_status]) url = reverse( "browse-origin-directory", query_params={"origin_url": new_origin.url}, ) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/directory.html") resp_content = resp.content.decode("utf-8") assert re.search("snapshot.*is empty", resp_content) assert not re.search("swh-tr-link", resp_content) @given(origin_with_releases()) def test_origin_release_browse(client, archive_data, origin): snapshot = archive_data.snapshot_get_latest(origin["url"]) release = [ b for b in snapshot["branches"].values() if b["target_type"] == "release" ][-1] release_data = archive_data.release_get(release["target"]) revision_data = archive_data.revision_get(release_data["target"]) url = reverse( "browse-origin-directory", query_params={"origin_url": origin["url"], "release": release_data["name"]}, ) resp = client.get(url) assert resp.status_code == 200 assert_contains(resp, release_data["name"]) assert_contains(resp, release["target"]) swhid_context = { "origin": origin["url"], "visit": gen_swhid(SNAPSHOT, snapshot["id"]), "anchor": gen_swhid(RELEASE, release_data["id"]), "path": "/", } swh_dir_id = gen_swhid( DIRECTORY, revision_data["directory"], metadata=swhid_context ) swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id}) assert_contains(resp, swh_dir_id) assert_contains(resp, swh_dir_id_url) @given(origin_with_releases()) def test_origin_release_browse_not_found(client, origin): invalid_release_name = "swh-foo-bar" url = reverse( "browse-origin-directory", query_params={"origin_url": origin["url"], "release": invalid_release_name}, ) resp = client.get(url) assert resp.status_code == 404 assert re.search( f"Release {invalid_release_name}.*not 
found", resp.content.decode("utf-8") ) @given(new_origin(), unknown_revision()) def test_origin_browse_directory_branch_with_non_resolvable_revision( client, archive_data, new_origin, unknown_revision ): branch_name = "master" snapshot = Snapshot( branches={ branch_name.encode(): SnapshotBranch( target=hash_to_bytes(unknown_revision), target_type=TargetType.REVISION, ) } ) archive_data.origin_add([new_origin]) archive_data.snapshot_add([snapshot]) visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=now(), type="git",)] )[0] visit_status = OriginVisitStatus( origin=new_origin.url, visit=visit.visit, date=now(), status="partial", snapshot=snapshot.id, ) archive_data.origin_visit_status_add([visit_status]) url = reverse( "browse-origin-directory", query_params={"origin_url": new_origin.url, "branch": branch_name}, ) resp = client.get(url) assert resp.status_code == 200 assert_contains( resp, f"Revision {unknown_revision } could not be found in the archive." ) @given(origin()) def test_origin_content_no_path(client, origin): url = reverse("browse-origin-content", query_params={"origin_url": origin["url"]}) resp = client.get(url) assert resp.status_code == 400 assert_contains( resp, "The path of a content must be given as query parameter.", status_code=400 ) def test_origin_views_no_url_query_parameter(client): for browse_context in ( "content", "directory", "log", "branches", "releases", "visits", ): url = reverse(f"browse-origin-{browse_context}") resp = client.get(url) assert resp.status_code == 400 assert_contains( resp, "An origin URL must be provided as query parameter.", status_code=400 ) def _origin_content_view_test_helper( client, archive_data, origin_info, origin_visit, origin_branches, origin_releases, root_dir_sha1, content, visit_id=None, timestamp=None, snapshot_id=None, ): content_path = "/".join(content["path"].split("/")[1:]) if not visit_id and not snapshot_id: visit_id = origin_visit["visit"] query_params = {"origin_url": 
origin_info["url"], "path": content_path} if timestamp: query_params["timestamp"] = timestamp if visit_id: query_params["visit_id"] = visit_id elif snapshot_id: query_params["snapshot"] = snapshot_id url = reverse("browse-origin-content", query_params=query_params) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/content.html") assert type(content["data"]) == str assert_contains(resp, '' % content["hljs_language"]) assert_contains(resp, escape(content["data"])) split_path = content_path.split("/") filename = split_path[-1] path = content_path.replace(filename, "")[:-1] path_info = gen_path_info(path) del query_params["path"] if timestamp: query_params["timestamp"] = format_utc_iso_date( - parse_timestamp(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ" + parse_iso8601_date_to_utc(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ" ) root_dir_url = reverse("browse-origin-directory", query_params=query_params) assert_contains(resp, '
  • ', count=len(path_info) + 1) assert_contains(resp, '%s' % (root_dir_url, root_dir_sha1[:7])) for p in path_info: query_params["path"] = p["path"] dir_url = reverse("browse-origin-directory", query_params=query_params) assert_contains(resp, '%s' % (dir_url, p["name"])) assert_contains(resp, "
<li>%s</li>" % filename) query_string = "sha1_git:" + content["sha1_git"] url_raw = reverse( "browse-content-raw", url_args={"query_string": query_string}, query_params={"filename": filename}, ) assert_contains(resp, url_raw) if "path" in query_params: del query_params["path"] origin_branches_url = reverse("browse-origin-branches", query_params=query_params) assert_contains(resp, f'href="{escape(origin_branches_url)}"') assert_contains(resp, f"Branches ({len(origin_branches)})") origin_releases_url = reverse("browse-origin-releases", query_params=query_params) assert_contains(resp, f'href="{escape(origin_releases_url)}">') assert_contains(resp, f"Releases ({len(origin_releases)})") assert_contains(resp, '
  • ', count=len(origin_branches)) query_params["path"] = content_path for branch in origin_branches: root_dir_branch_url = reverse( "browse-origin-content", query_params={"branch": branch["name"], **query_params}, ) assert_contains(resp, '' % root_dir_branch_url) assert_contains(resp, '
  • ', count=len(origin_releases)) query_params["branch"] = None for release in origin_releases: root_dir_release_url = reverse( "browse-origin-content", query_params={"release": release["name"], **query_params}, ) assert_contains(resp, '' % root_dir_release_url) url = reverse("browse-origin-content", query_params=query_params) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/content.html") snapshot = archive_data.snapshot_get(origin_visit["snapshot"]) head_rev_id = archive_data.snapshot_get_head(snapshot) swhid_context = { "origin": origin_info["url"], "visit": gen_swhid(SNAPSHOT, snapshot["id"]), "anchor": gen_swhid(REVISION, head_rev_id), "path": f"/{content_path}", } swh_cnt_id = gen_swhid(CONTENT, content["sha1_git"], metadata=swhid_context) swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id}) assert_contains(resp, swh_cnt_id) assert_contains(resp, swh_cnt_id_url) assert_contains(resp, "swh-take-new-snapshot") _check_origin_link(resp, origin_info["url"]) def _origin_directory_view_test_helper( client, archive_data, origin_info, origin_visit, origin_branches, origin_releases, root_directory_sha1, directory_entries, visit_id=None, timestamp=None, snapshot_id=None, path=None, ): dirs = [e for e in directory_entries if e["type"] in ("dir", "rev")] files = [e for e in directory_entries if e["type"] == "file"] if not visit_id and not snapshot_id: visit_id = origin_visit["visit"] query_params = {"origin_url": origin_info["url"]} if timestamp: query_params["timestamp"] = timestamp elif visit_id: query_params["visit_id"] = visit_id else: query_params["snapshot"] = snapshot_id if path: query_params["path"] = path url = reverse("browse-origin-directory", query_params=query_params) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/directory.html") assert resp.status_code == 200 assert_template_used(resp, "browse/directory.html") assert_contains(resp, '', count=len(dirs)) 
assert_contains(resp, '', count=len(files)) if timestamp: query_params["timestamp"] = format_utc_iso_date( - parse_timestamp(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ" + parse_iso8601_date_to_utc(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ" ) for d in dirs: if d["type"] == "rev": dir_url = reverse("browse-revision", url_args={"sha1_git": d["target"]}) else: dir_path = d["name"] if path: dir_path = "%s/%s" % (path, d["name"]) query_params["path"] = dir_path dir_url = reverse("browse-origin-directory", query_params=query_params,) assert_contains(resp, dir_url) for f in files: file_path = f["name"] if path: file_path = "%s/%s" % (path, f["name"]) query_params["path"] = file_path file_url = reverse("browse-origin-content", query_params=query_params) assert_contains(resp, file_url) if "path" in query_params: del query_params["path"] root_dir_branch_url = reverse("browse-origin-directory", query_params=query_params) nb_bc_paths = 1 if path: nb_bc_paths = len(path.split("/")) + 1 assert_contains(resp, '
  • ', count=nb_bc_paths) assert_contains( resp, '%s' % (root_dir_branch_url, root_directory_sha1[:7]) ) origin_branches_url = reverse("browse-origin-branches", query_params=query_params) assert_contains(resp, f'href="{escape(origin_branches_url)}"') assert_contains(resp, f"Branches ({len(origin_branches)})") origin_releases_url = reverse("browse-origin-releases", query_params=query_params) nb_releases = len(origin_releases) if nb_releases > 0: assert_contains(resp, f'href="{escape(origin_releases_url)}"') assert_contains(resp, f"Releases ({nb_releases})") if path: query_params["path"] = path assert_contains(resp, '
  • ', count=len(origin_branches)) for branch in origin_branches: query_params["branch"] = branch["name"] root_dir_branch_url = reverse( "browse-origin-directory", query_params=query_params ) assert_contains(resp, '' % root_dir_branch_url) assert_contains(resp, '
  • ', count=len(origin_releases)) query_params["branch"] = None for release in origin_releases: query_params["release"] = release["name"] root_dir_release_url = reverse( "browse-origin-directory", query_params=query_params ) assert_contains(resp, 'href="%s"' % root_dir_release_url) assert_contains(resp, "vault-cook-directory") assert_contains(resp, "vault-cook-revision") snapshot = archive_data.snapshot_get(origin_visit["snapshot"]) head_rev_id = archive_data.snapshot_get_head(snapshot) swhid_context = { "origin": origin_info["url"], "visit": gen_swhid(SNAPSHOT, snapshot["id"]), "anchor": gen_swhid(REVISION, head_rev_id), "path": f"/{path}" if path else "/", } swh_dir_id = gen_swhid( DIRECTORY, directory_entries[0]["dir_id"], metadata=swhid_context ) swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id}) assert_contains(resp, swh_dir_id) assert_contains(resp, swh_dir_id_url) assert_contains(resp, "swh-take-new-snapshot") _check_origin_link(resp, origin_info["url"]) def _origin_branches_test_helper( client, origin_info, origin_snapshot, snapshot_id=None ): query_params = {"origin_url": origin_info["url"], "snapshot": snapshot_id} url = reverse("browse-origin-branches", query_params=query_params) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/branches.html") origin_branches = origin_snapshot[0] origin_releases = origin_snapshot[1] origin_branches_url = reverse("browse-origin-branches", query_params=query_params) assert_contains(resp, f'href="{escape(origin_branches_url)}"') assert_contains(resp, f"Branches ({len(origin_branches)})") origin_releases_url = reverse("browse-origin-releases", query_params=query_params) nb_releases = len(origin_releases) if nb_releases > 0: assert_contains(resp, f'href="{escape(origin_releases_url)}">') assert_contains(resp, f"Releases ({nb_releases})") assert_contains(resp, '' % escape(browse_branch_url)) browse_revision_url = reverse( "browse-revision", url_args={"sha1_git": 
branch["revision"]}, query_params=query_params, ) assert_contains(resp, '' % escape(browse_revision_url)) _check_origin_link(resp, origin_info["url"]) def _origin_releases_test_helper( client, origin_info, origin_snapshot, snapshot_id=None ): query_params = {"origin_url": origin_info["url"], "snapshot": snapshot_id} url = reverse("browse-origin-releases", query_params=query_params) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/releases.html") origin_branches = origin_snapshot[0] origin_releases = origin_snapshot[1] origin_branches_url = reverse("browse-origin-branches", query_params=query_params) assert_contains(resp, f'href="{escape(origin_branches_url)}"') assert_contains(resp, f"Branches ({len(origin_branches)})") origin_releases_url = reverse("browse-origin-releases", query_params=query_params) nb_releases = len(origin_releases) if nb_releases > 0: assert_contains(resp, f'href="{escape(origin_releases_url)}"') assert_contains(resp, f"Releases ({nb_releases})") assert_contains(resp, '' % escape(browse_release_url)) assert_contains(resp, '' % escape(browse_revision_url)) _check_origin_link(resp, origin_info["url"]) @given( new_origin(), visit_dates(), revisions(min_size=10, max_size=10), existing_release() ) def test_origin_branches_pagination_with_alias( client, archive_data, mocker, new_origin, visit_dates, revisions, existing_release ): """ When a snapshot contains a branch or a release alias, pagination links in the branches / releases view should be displayed. 
""" mocker.patch("swh.web.browse.snapshot_context.PER_PAGE", len(revisions) / 2) snp_dict = {"branches": {}, "id": hash_to_bytes(random_sha1())} for i in range(len(revisions)): branch = "".join(random.choices(string.ascii_lowercase, k=8)) snp_dict["branches"][branch.encode()] = { "target_type": "revision", "target": hash_to_bytes(revisions[i]), } release = "".join(random.choices(string.ascii_lowercase, k=8)) snp_dict["branches"][b"RELEASE_ALIAS"] = { "target_type": "alias", "target": release.encode(), } snp_dict["branches"][release.encode()] = { "target_type": "release", "target": hash_to_bytes(existing_release), } archive_data.origin_add([new_origin]) archive_data.snapshot_add([Snapshot.from_dict(snp_dict)]) visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=visit_dates[0], type="git",)] )[0] visit_status = OriginVisitStatus( origin=new_origin.url, visit=visit.visit, date=now(), status="full", snapshot=snp_dict["id"], ) archive_data.origin_visit_status_add([visit_status]) url = reverse("browse-origin-branches", query_params={"origin_url": new_origin.url}) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/branches.html") assert_contains(resp, '
      Newer') if len(revision_log_sorted) > per_page: assert_contains( resp, 'Older' % escape(next_page_url), ) for log in revision_log_sorted[:per_page]: revision_url = reverse("browse-revision", url_args={"sha1_git": log["id"]}) assert_contains(resp, log["id"][:7]) assert_contains(resp, log["author"]["name"]) assert_contains(resp, format_utc_iso_date(log["date"])) assert_contains(resp, escape(log["message"])) assert_contains(resp, format_utc_iso_date(log["committer_date"])) assert_contains(resp, revision_url) if len(revision_log_sorted) <= per_page: return resp = client.get(next_page_url) prev_page_url = reverse( "browse-revision-log", url_args={"sha1_git": revision}, query_params={"per_page": per_page}, ) next_page_url = reverse( "browse-revision-log", url_args={"sha1_git": revision}, query_params={"offset": 2 * per_page, "per_page": per_page}, ) nb_log_entries = len(revision_log_sorted) - per_page if nb_log_entries > per_page: nb_log_entries = per_page assert resp.status_code == 200 assert_template_used(resp, "browse/revision-log.html") assert_contains(resp, 'Newer' % escape(prev_page_url) ) if len(revision_log_sorted) > 2 * per_page: assert_contains( resp, 'Older' % escape(next_page_url), ) if len(revision_log_sorted) <= 2 * per_page: return resp = client.get(next_page_url) prev_page_url = reverse( "browse-revision-log", url_args={"sha1_git": revision}, query_params={"offset": per_page, "per_page": per_page}, ) next_page_url = reverse( "browse-revision-log", url_args={"sha1_git": revision}, query_params={"offset": 3 * per_page, "per_page": per_page}, ) nb_log_entries = len(revision_log_sorted) - 2 * per_page if nb_log_entries > per_page: nb_log_entries = per_page assert resp.status_code == 200 assert_template_used(resp, "browse/revision-log.html") assert_contains(resp, 'Newer' % escape(prev_page_url) ) if len(revision_log_sorted) > 3 * per_page: assert_contains( resp, 'Older' % escape(next_page_url), ) @given(revision(), unknown_revision(), new_origin()) def 
test_revision_request_errors(client, revision, unknown_revision, new_origin): url = reverse("browse-revision", url_args={"sha1_git": unknown_revision}) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, "error.html") assert_contains( resp, "Revision with sha1_git %s not found" % unknown_revision, status_code=404 ) url = reverse( "browse-revision", url_args={"sha1_git": revision}, query_params={"origin_url": new_origin.url}, ) resp = client.get(url) assert resp.status_code == 404 assert_template_used(resp, "error.html") assert_contains( resp, "the origin mentioned in your request" " appears broken", status_code=404 ) @given(revision()) def test_revision_uppercase(client, revision): url = reverse( "browse-revision-uppercase-checksum", url_args={"sha1_git": revision.upper()} ) resp = client.get(url) assert resp.status_code == 302 redirect_url = reverse("browse-revision", url_args={"sha1_git": revision}) assert resp["location"] == redirect_url def _revision_browse_checks( client, archive_data, revision, origin_url=None, snapshot=None ): query_params = {} if origin_url: query_params["origin_url"] = origin_url if snapshot: query_params["snapshot"] = snapshot["id"] url = reverse( "browse-revision", url_args={"sha1_git": revision}, query_params=query_params ) revision_data = archive_data.revision_get(revision) author_name = revision_data["author"]["name"] committer_name = revision_data["committer"]["name"] dir_id = revision_data["directory"] if origin_url: snapshot = archive_data.snapshot_get_latest(origin_url) history_url = reverse( "browse-origin-log", query_params={"revision": revision, **query_params}, ) elif snapshot: history_url = reverse( "browse-snapshot-log", url_args={"snapshot_id": snapshot["id"]}, query_params={"revision": revision}, ) else: history_url = reverse("browse-revision-log", url_args={"sha1_git": revision}) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/revision.html") 
assert_contains(resp, author_name) assert_contains(resp, committer_name) assert_contains(resp, history_url) for parent in revision_data["parents"]: parent_url = reverse( "browse-revision", url_args={"sha1_git": parent}, query_params=query_params ) assert_contains(resp, '<a href="%s">%s</a>' % (escape(parent_url), parent[:7])) author_date = revision_data["date"] committer_date = revision_data["committer_date"] message_lines = revision_data["message"].split("\n") assert_contains(resp, format_utc_iso_date(author_date)) assert_contains(resp, format_utc_iso_date(committer_date)) assert_contains(resp, escape(message_lines[0])) assert_contains(resp, escape("\n".join(message_lines[1:]))) assert_contains(resp, "vault-cook-directory") assert_contains(resp, "vault-cook-revision") swh_rev_id = gen_swhid("revision", revision) swh_rev_id_url = reverse("browse-swhid", url_args={"swhid": swh_rev_id}) assert_contains(resp, swh_rev_id) assert_contains(resp, swh_rev_id_url) swh_dir_id = gen_swhid("directory", dir_id) swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id}) assert_contains(resp, swh_dir_id) assert_contains(resp, swh_dir_id_url) if origin_url: assert_contains(resp, "swh-take-new-snapshot") swh_rev_id = gen_swhid(REVISION, revision) swh_rev_id_url = reverse("browse-swhid", url_args={"swhid": swh_rev_id}) if origin_url: browse_origin_url = reverse( "browse-origin", query_params={"origin_url": origin_url} ) assert_contains(resp, f'href="{browse_origin_url}"') elif snapshot: swh_snp_id = gen_swhid("snapshot", snapshot["id"]) swh_snp_id_url = reverse("browse-swhid", url_args={"swhid": swh_snp_id}) assert_contains(resp, f'href="{swh_snp_id_url}"') swhid_context = {} if origin_url: swhid_context["origin"] = origin_url if snapshot: swhid_context["visit"] = gen_swhid(SNAPSHOT, snapshot["id"]) swh_rev_id = gen_swhid(REVISION, revision, metadata=swhid_context) swh_rev_id_url = reverse("browse-swhid", url_args={"swhid": swh_rev_id}) assert_contains(resp, swh_rev_id) 
assert_contains(resp, swh_rev_id_url) swhid_context["anchor"] = gen_swhid(REVISION, revision) swhid_context["path"] = "/" swh_dir_id = gen_swhid(DIRECTORY, dir_id, metadata=swhid_context) swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id}) assert_contains(resp, swh_dir_id) assert_contains(resp, swh_dir_id_url) diff --git a/swh/web/tests/common/test_origin_visits.py b/swh/web/tests/common/test_origin_visits.py index ffeb5dda..3b4569f5 100644 --- a/swh/web/tests/common/test_origin_visits.py +++ b/swh/web/tests/common/test_origin_visits.py @@ -1,238 +1,235 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import timedelta from hypothesis import given import pytest from swh.model.hashutil import hash_to_hex from swh.model.model import OriginVisit, OriginVisitStatus from swh.storage.utils import now from swh.web.common.exc import NotFoundExc from swh.web.common.origin_visits import get_origin_visits, get_origin_visit from swh.web.common.typing import OriginInfo from swh.web.tests.strategies import new_origin, new_snapshots @given(new_snapshots(3)) def test_get_origin_visits(mocker, snapshots): mock_service = mocker.patch("swh.web.common.service") mock_service.MAX_LIMIT = 2 def _lookup_origin_visits(*args, **kwargs): if kwargs["last_visit"] is None: return [ { "visit": 1, "date": "2017-05-06T00:59:10+00:00", "status": "full", "snapshot": hash_to_hex(snapshots[0].id), "type": "git", }, { "visit": 2, "date": "2017-08-06T00:59:10+00:00", "status": "full", "snapshot": hash_to_hex(snapshots[1].id), "type": "git", }, ] else: return [ { "visit": 3, "date": "2017-09-06T00:59:10+00:00", "status": "full", "snapshot": hash_to_hex(snapshots[2].id), "type": "git", } ] mock_service.lookup_origin_visits.side_effect = _lookup_origin_visits 
origin_info = { "url": "https://github.com/foo/bar", } origin_visits = get_origin_visits(origin_info) assert len(origin_visits) == 3 @given(new_snapshots(5)) def test_get_origin_visit(mocker, snapshots): mock_origin_visits = mocker.patch("swh.web.common.origin_visits.get_origin_visits") origin_info = { "url": "https://github.com/foo/bar", } visits = [ { "status": "full", "date": "2015-07-09T21:09:24+00:00", "visit": 1, "origin": "https://github.com/foo/bar", "type": "git", "snapshot": hash_to_hex(snapshots[0].id), }, { "status": "full", "date": "2016-02-23T18:05:23.312045+00:00", "visit": 2, "origin": "https://github.com/foo/bar", "type": "git", "snapshot": hash_to_hex(snapshots[1].id), }, { "status": "full", "date": "2016-03-28T01:35:06.554111+00:00", "visit": 3, "origin": "https://github.com/foo/bar", "type": "git", "snapshot": hash_to_hex(snapshots[2].id), }, { "status": "full", "date": "2016-06-18T01:22:24.808485+00:00", "visit": 4, "origin": "https://github.com/foo/bar", "type": "git", "snapshot": hash_to_hex(snapshots[3].id), }, { "status": "full", "date": "2016-08-14T12:10:00.536702+00:00", "visit": 5, "origin": "https://github.com/foo/bar", "type": "git", "snapshot": hash_to_hex(snapshots[4].id), }, ] mock_origin_visits.return_value = visits visit_id = 12 with pytest.raises(NotFoundExc) as e: visit = get_origin_visit(origin_info, visit_id=visit_id) assert e.match("Visit with id %s" % visit_id) assert e.match("url %s" % origin_info["url"]) visit = get_origin_visit(origin_info, visit_id=2) assert visit == visits[1] visit = get_origin_visit(origin_info, visit_ts="2016-02-23T18:05:23.312045+00:00") assert visit == visits[1] visit = get_origin_visit(origin_info, visit_ts="2016-02-20") assert visit == visits[1] visit = get_origin_visit(origin_info, visit_ts="2016-06-18T01:22") assert visit == visits[3] visit = get_origin_visit(origin_info, visit_ts="2016-06-18 01:22") assert visit == visits[3] - visit = get_origin_visit(origin_info, visit_ts=1466208000) - assert 
visit == visits[3] - visit = get_origin_visit(origin_info, visit_ts="2014-01-01") assert visit == visits[0] visit = get_origin_visit(origin_info, visit_ts="2018-01-01") assert visit == visits[-1] @given(new_origin(), new_snapshots(6)) def test_get_origin_visit_return_first_valid_full_visit( archive_data, new_origin, new_snapshots ): visits = [] archive_data.origin_add([new_origin]) # create 6 visits, the first three have full status while the # last three have partial status and set a null snapshot for # the last four visits for i, snp in enumerate(new_snapshots): visit_date = now() + timedelta(days=i * 10) visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] )[0] archive_data.snapshot_add([new_snapshots[i]]) visit_status = OriginVisitStatus( origin=new_origin.url, visit=visit.visit, date=visit_date + timedelta(minutes=5), status="full" if i < 3 else "partial", snapshot=new_snapshots[i].id if i < 2 else None, ) if i < 2: archive_data.origin_visit_status_add([visit_status]) visits.append(visit.visit) # should return the second visit expected_visit = archive_data.origin_visit_get_by(new_origin.url, visits[1]) assert get_origin_visit((OriginInfo(url=new_origin.url))) == expected_visit @given(new_origin(), new_snapshots(6)) def test_get_origin_visit_non_resolvable_snapshots( archive_data, new_origin, new_snapshots ): visits = [] archive_data.origin_add([new_origin]) # create 6 full visits, the first three have resolvable snapshots # while the last three have non resolvable snapshots for i, snp in enumerate(new_snapshots): visit_date = now() + timedelta(days=i * 10) visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] )[0] archive_data.snapshot_add([new_snapshots[i]]) visit_status = OriginVisitStatus( origin=new_origin.url, visit=visit.visit, date=visit_date + timedelta(minutes=5), status="full", snapshot=new_snapshots[i].id, ) if i < 3: 
archive_data.origin_visit_status_add([visit_status]) visits.append(visit.visit) # should return the third visit expected_visit = archive_data.origin_visit_get_by(new_origin.url, visits[2]) assert get_origin_visit((OriginInfo(url=new_origin.url))) == expected_visit @given(new_origin(), new_snapshots(6)) def test_get_origin_visit_return_first_valid_partial_visit( archive_data, new_origin, new_snapshots ): visits = [] archive_data.origin_add([new_origin]) # create 6 visits, the first three have full status but null snapshot # while the last three have partial status with valid snapshot for i, snp in enumerate(new_snapshots): visit_date = now() + timedelta(days=i * 10) visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] )[0] archive_data.snapshot_add([new_snapshots[i]]) visit_status = OriginVisitStatus( origin=new_origin.url, visit=visit.visit, date=visit_date + timedelta(minutes=5), status="full" if i < 3 else "partial", snapshot=new_snapshots[i].id if i > 2 else None, ) if i > 2: archive_data.origin_visit_status_add([visit_status]) visits.append(visit.visit) # should return the last visit expected_visit = archive_data.origin_visit_get_by(new_origin.url, visits[-1]) assert get_origin_visit((OriginInfo(url=new_origin.url))) == expected_visit diff --git a/swh/web/tests/common/test_utils.py b/swh/web/tests/common/test_utils.py index dd4abd85..2db4bf34 100644 --- a/swh/web/tests/common/test_utils.py +++ b/swh/web/tests/common/test_utils.py @@ -1,129 +1,140 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime +import pytest + from swh.web.common import utils +from swh.web.common.exc import BadInputExc def test_shorten_path_noop(): noops = ["/api/", "/browse/", "/content/symbol/foobar/"] for noop in 
noops: assert utils.shorten_path(noop) == noop def test_shorten_path_sha1(): sha1 = "aafb16d69fd30ff58afdd69036a26047f3aebdc6" short_sha1 = sha1[:8] + "..." templates = [ "/api/1/content/sha1:%s/", "/api/1/content/sha1_git:%s/", "/api/1/directory/%s/", "/api/1/content/sha1:%s/ctags/", ] for template in templates: assert utils.shorten_path(template % sha1) == template % short_sha1 def test_shorten_path_sha256(): sha256 = "aafb16d69fd30ff58afdd69036a26047" "213add102934013a014dfca031c41aef" short_sha256 = sha256[:8] + "..." templates = [ "/api/1/content/sha256:%s/", "/api/1/directory/%s/", "/api/1/content/sha256:%s/filetype/", ] for template in templates: assert utils.shorten_path(template % sha256) == template % short_sha256 -def test_parse_timestamp(): - input_timestamps = [ - None, - "2016-01-12", - "2016-01-12T09:19:12+0100", - "Today is January 1, 2047 at 8:21:00AM", - "1452591542", - ] - - output_dates = [ - None, - datetime.datetime(2016, 1, 12, 0, 0), - datetime.datetime(2016, 1, 12, 8, 19, 12, tzinfo=datetime.timezone.utc), - datetime.datetime(2047, 1, 1, 8, 21), - datetime.datetime(2016, 1, 12, 9, 39, 2, tzinfo=datetime.timezone.utc), - ] - - for ts, exp_date in zip(input_timestamps, output_dates): - assert utils.parse_timestamp(ts) == exp_date +@pytest.mark.parametrize( + "input_timestamp, output_date", + [ + ( + "2016-01-12", + datetime.datetime(2016, 1, 12, 0, 0, tzinfo=datetime.timezone.utc), + ), + ( + "2016-01-12T09:19:12+0100", + datetime.datetime(2016, 1, 12, 8, 19, 12, tzinfo=datetime.timezone.utc), + ), + ( + "2007-01-14T20:34:22Z", + datetime.datetime(2007, 1, 14, 20, 34, 22, tzinfo=datetime.timezone.utc), + ), + ], +) +def test_parse_iso8601_date_to_utc_ok(input_timestamp, output_date): + assert utils.parse_iso8601_date_to_utc(input_timestamp) == output_date + + +@pytest.mark.parametrize( + "invalid_iso8601_timestamp", ["Today is January 1, 2047 at 8:21:00AM", "1452591542"] +) +def test_parse_iso8601_date_to_utc_ko(invalid_iso8601_timestamp): + 
    with pytest.raises(BadInputExc):
+        utils.parse_iso8601_date_to_utc(invalid_iso8601_timestamp)


 def test_format_utc_iso_date():
     assert (
         utils.format_utc_iso_date("2017-05-04T13:27:13+02:00")
         == "04 May 2017, 11:27 UTC"
     )


 def test_gen_path_info():
     input_path = "/home/user/swh-environment/swh-web/"
     expected_result = [
         {"name": "home", "path": "home"},
         {"name": "user", "path": "home/user"},
         {"name": "swh-environment", "path": "home/user/swh-environment"},
         {"name": "swh-web", "path": "home/user/swh-environment/swh-web"},
     ]
     path_info = utils.gen_path_info(input_path)
     assert path_info == expected_result

     input_path = "home/user/swh-environment/swh-web"
     path_info = utils.gen_path_info(input_path)
     assert path_info == expected_result


 def test_rst_to_html():
     rst = (
         "Section\n"
         "=======\n\n"
         "**Some strong text**\n\n"
         "Subsection\n"
         "----------\n\n"
         "* This is a bulleted list.\n"
         "* It has two items, the second\n"
         "  item uses two lines.\n"
         "\n"
         "1. This is a numbered list.\n"
         "2. It has two items too.\n"
         "\n"
         "#. This is a numbered list.\n"
         "#. It has two items too.\n"
     )

     expected_html = (
         '<div class="swh-rst"><h1 class="title">Section</h1>\n'
         "<p><strong>Some strong text</strong></p>\n"
         '<div class="section" id="subsection">\n'
         "<h2>Subsection</h2>\n"
         '<ul class="simple">\n'
         "<li><p>This is a bulleted list.</p></li>\n"
         "<li><p>It has two items, the second\n"
         "item uses two lines.</p></li>\n"
         "</ul>\n"
         '<ol class="arabic simple">\n'
         "<li><p>This is a numbered list.</p></li>\n"
         "<li><p>It has two items too.</p></li>\n"
         "<li><p>This is a numbered list.</p></li>\n"
         "<li><p>It has two items too.</p></li>\n"
         "</ol>\n"
         "</div>\n"
         "</div>"
     )

     assert utils.rst_to_html(rst) == expected_html
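
The parametrized tests in `test_utils.py` above pin down the new contract: ISO 8601 strings are normalized to timezone-aware UTC datetimes, while free-form text and bare Unix timestamps are rejected with `BadInputExc`. The sketch below is one way that behaviour could be reproduced with the standard library only. It is not the implementation of `utils.parse_iso8601_date_to_utc`, which is not shown in this diff. The names `parse_iso8601_date_to_utc_sketch` and `_FORMATS`, the local `BadInputExc` stand-in, and the list of accepted layouts are all assumptions made for illustration.

```python
from datetime import datetime, timezone


class BadInputExc(ValueError):
    """Hypothetical stand-in for swh.web.common.exc.BadInputExc."""


# Layouts accepted by this sketch only (an assumption); the real helper
# may accept a wider range of ISO 8601 forms.
_FORMATS = ("%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d")


def parse_iso8601_date_to_utc_sketch(value: str) -> datetime:
    """Parse an ISO 8601 date string and return an aware datetime in UTC."""
    for fmt in _FORMATS:
        try:
            parsed = datetime.strptime(value, fmt)
        except ValueError:
            continue  # try the next layout
        if parsed.tzinfo is None:
            # No offset given: treat the value as already expressed in UTC.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)
    # Nothing matched: free-form text, Unix timestamps, and so on.
    raise BadInputExc(f"Invalid ISO 8601 date: {value!r}")
```

Raising a dedicated exception instead of guessing at a format, as the removed `test_parse_timestamp` cases did, presumably lets the web layer answer malformed `timestamp` query parameters with a client error rather than silently picking an unexpected visit.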