diff --git a/docs/uri-scheme-browse-origin.rst b/docs/uri-scheme-browse-origin.rst
index eecc8ddf7..b299e2234 100644
--- a/docs/uri-scheme-browse-origin.rst
+++ b/docs/uri-scheme-browse-origin.rst
@@ -1,861 +1,848 @@
Origin
^^^^^^
This describes the URI scheme used to browse the Software Heritage
archive in the context of an origin (for instance, a repository crawled from
GitHub or a Debian source package). All the views pointed to by that scheme
offer quick links to browse objects as found during the associated crawls
performed by Software Heritage:
* the root directory of the origin
* the list of branches of the origin
* the list of releases of the origin
Origin visits
"""""""""""""
.. http:get:: /browse/origin/visits/
HTML view that displays a report of the visits of a software origin
identified by its url.
:query string origin_url: mandatory parameter providing the url of the origin
(e.g. https://github.com/(user)/(repo))
:statuscode 200: no error
:statuscode 400: no origin url has been provided as parameter
:statuscode 404: requested origin cannot be found in the archive
**Examples:**
.. parsed-literal::
:swh_web_browse:`origin/visits/?origin_url=https://github.com/torvalds/linux`
:swh_web_browse:`origin/visits/?origin_url=https://github.com/python/cpython`
:swh_web_browse:`origin/visits/?origin_url=deb://Debian-Security/packages/mediawiki`
:swh_web_browse:`origin/visits/?origin_url=https://gitorious.org/qt/qtbase.git`
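Since ``origin_url`` is passed as a query parameter, a client building these links should URL-encode parameter values rather than concatenating strings by hand. The following is a minimal Python sketch, assuming the archive is served under ``https://archive.softwareheritage.org/browse/``; the ``build_browse_url`` helper is hypothetical and not part of any Software Heritage library. The characters ``:`` and ``/`` are left unescaped so the links keep the readable form used in the examples above, while a ``+`` in a timezone offset is escaped as ``%2B`` (a raw ``+`` in a query string is decoded as a space).

```python
from typing import Optional
from urllib.parse import urlencode

# Assumption: public archive instance; adjust for another swh-web deployment.
BROWSE_ROOT = "https://archive.softwareheritage.org/browse/"


def build_browse_url(view: str, origin_url: str, **params: Optional[str]) -> str:
    """Build a /browse/origin/<view>/ URL (hypothetical helper).

    Parameters set to None are dropped; ':' and '/' stay unescaped for
    readability, other reserved characters are percent-encoded.
    """
    query = {"origin_url": origin_url}
    query.update({key: value for key, value in params.items() if value is not None})
    return f"{BROWSE_ROOT}origin/{view}/?{urlencode(query, safe=':/')}"


print(build_browse_url("visits", "https://github.com/torvalds/linux"))
print(build_browse_url("directory", "https://github.com/torvalds/linux",
                       path="net/ethernet", timestamp="2016-09-14T10:36:21Z"))
```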
.. http:get:: /browse/origin/(origin_url)/visits/
:deprecated:
.. warning::
That endpoint is deprecated, use :http:get:`/browse/origin/visits/` instead.
HTML view that displays a report of the visits of a software origin
identified by its url.
:param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/)
:statuscode 200: no error
:statuscode 404: requested origin cannot be found in the archive
**Examples:**
.. parsed-literal::
:swh_web_browse:`origin/https://github.com/torvalds/linux/visits/`
:swh_web_browse:`origin/https://github.com/python/cpython/visits/`
:swh_web_browse:`origin/deb://Debian-Security/packages/mediawiki/visits/`
:swh_web_browse:`origin/https://gitorious.org/qt/qtbase.git/visits/`
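Because the deprecated endpoints embed the origin url in the path, existing links can be rewritten mechanically into the query-parameter form. A minimal Python sketch follows; ``migrate_browse_path`` is a hypothetical helper, and it assumes that the origin url contains no ``?`` and no path segment named ``visit`` or like one of the views (``visits``, ``directory``, ``content``, ``log``, ``branches``), since the split between origin url and view is otherwise ambiguous.

```python
import re
from urllib.parse import parse_qsl, urlencode

# Deprecated form: origin/<origin_url>/[visit/<timestamp>/]<view>/[<path>/]
_DEPRECATED = re.compile(
    r"^origin/(?P<url>.+?)/(?:visit/(?P<ts>[^/]+)/)?"
    r"(?P<view>visits|directory|content|log|branches)/(?P<path>.*)$"
)


def migrate_browse_path(old: str) -> str:
    """Rewrite a deprecated /browse/ path into its query-parameter form (sketch)."""
    path_part, _, query_string = old.partition("?")
    match = _DEPRECATED.match(path_part)
    if match is None:
        raise ValueError(f"not a deprecated origin browse path: {old!r}")
    params = {"origin_url": match["url"]}
    if match["path"].strip("/"):
        params["path"] = match["path"].strip("/")
    if match["ts"]:
        params["timestamp"] = match["ts"]
    # Keep any query parameters the old link already carried (branch, path, ...).
    params.update(parse_qsl(query_string))
    return f"origin/{match['view']}/?{urlencode(params, safe=':/')}"


print(migrate_browse_path("origin/https://github.com/torvalds/linux/visits/"))
```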
Origin directory
""""""""""""""""
.. http:get:: /browse/origin/directory/
HTML view for browsing the content of a directory reachable from the root directory
(including itself) associated with the latest full visit of a software origin.
The directory entries are sorted in lexicographical order, with
sub-directories displayed before regular files.
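As an illustration of that ordering, here is a Python sketch that sorts directory entries with sub-directories first and each group in lexicographical order. The entry layout (dicts with ``type`` and ``name`` keys, ``type`` being ``"dir"`` for sub-directories) is an assumption made for the example. Plain Python string comparison orders by code point, so upper-case names sort before lower-case ones; the archive's actual collation may differ.

```python
def sort_directory_entries(entries: list[dict]) -> list[dict]:
    # False < True, so entries whose type is "dir" come first; ties are
    # broken by name in code-point (lexicographical) order.
    return sorted(entries, key=lambda e: (e["type"] != "dir", e["name"]))


listing = [
    {"type": "file", "name": "README"},
    {"type": "dir", "name": "net"},
    {"type": "file", "name": "Makefile"},
    {"type": "dir", "name": "arch"},
]
print([e["name"] for e in sort_directory_entries(listing)])
```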
The view makes it possible to navigate recursively from the requested
directory to the directories reachable from it, and also up to the
origin root directory.
A breadcrumb located in the top part of the view allows users
to keep track of the paths navigated so far.
The view also makes it easy to switch between the origin branches
and releases through a dropdown menu.
The origin branch (defaults to HEAD) from which to retrieve the directory
content can also be specified by using the branch query parameter.
:query string origin_url: mandatory parameter providing the url of the origin
(e.g. https://github.com/(user)/(repo))
:query string path: optional parameter used to specify the path of a directory
reachable from the origin root one
:query string branch: specify the origin branch name from which
to retrieve the root directory
:query string release: specify the origin release name from which
to retrieve the root directory
:query string revision: specify the origin revision, identified by the hexadecimal
representation of its **sha1_git** value, from which to retrieve the root directory
- :query string timestamp: a date string (any format parsable by `dateutil.parser.parse`_)
- or Unix timestamp to parse in order to find the closest visit.
+ :query string timestamp: an ISO 8601 datetime string to parse in order to find the
+ closest visit.
:query int visit_id: specify a visit id to retrieve the directory from instead
of using the latest full visit by default
:statuscode 200: no error
:statuscode 400: no origin url has been provided as parameter
:statuscode 404: requested origin cannot be found in the archive
or the provided path does not exist from the origin root directory
**Examples:**
.. parsed-literal::
:swh_web_browse:`origin/directory/?origin_url=https://github.com/torvalds/linux`
:swh_web_browse:`origin/directory/?origin_url=https://github.com/torvalds/linux&path=net/ethernet`
:swh_web_browse:`origin/directory/?origin_url=https://github.com/python/cpython`
:swh_web_browse:`origin/directory/?origin_url=https://github.com/python/cpython&path=Python`
:swh_web_browse:`origin/directory/?origin_url=https://github.com/python/cpython&branch=refs/heads/2.7`
- :swh_web_browse:`origin/directory/?origin_url=https://github.com/torvalds/linux&timestamp=1493926809`
:swh_web_browse:`origin/directory/?origin_url=https://github.com/torvalds/linux&path=net/ethernet&timestamp=2016-09-14T10:36:21Z`
- :swh_web_browse:`origin/directory/?origin_url=https://github.com/python/cpython&timestamp=1474620651`
:swh_web_browse:`origin/directory/?origin_url=https://github.com/python/cpython&path=Python&timestamp=2017-05-05`
:swh_web_browse:`origin/directory/?origin_url=https://github.com/python/cpython&branch=refs/heads/2.7&timestamp=2015-08`
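Since the ``timestamp`` parameter now expects an ISO 8601 datetime string, links that used a Unix timestamp need converting. A minimal Python sketch using only the standard library, assuming the Unix timestamp is in seconds and should be interpreted as UTC; ``unix_to_iso8601`` is a hypothetical helper name.

```python
from datetime import datetime, timezone


def unix_to_iso8601(seconds: int) -> str:
    """Convert a Unix timestamp (seconds, UTC) into an ISO 8601 string."""
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    # isoformat() renders the UTC offset as "+00:00"; use the shorter "Z"
    # suffix, which also avoids having to escape "+" in a query string.
    return moment.isoformat().replace("+00:00", "Z")


print(unix_to_iso8601(1493926809))
```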
.. http:get:: /browse/origin/(origin_url)/directory/[(path)/]
:deprecated:
.. warning::
That endpoint is deprecated, use :http:get:`/browse/origin/directory/` instead.
HTML view for browsing the content of a directory reachable from the root directory
(including itself) associated with the latest full visit of a software origin.
The directory entries are sorted in lexicographical order, with
sub-directories displayed before regular files.
The view makes it possible to navigate recursively from the requested
directory to the directories reachable from it, and also up to the
origin root directory.
A breadcrumb located in the top part of the view allows users
to keep track of the paths navigated so far.
The view also makes it easy to switch between the origin branches
and releases through a dropdown menu.
The origin branch (defaults to HEAD) from which to retrieve the directory
content can also be specified by using the branch query parameter.
:param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/)
:param string path: optional parameter used to specify the path of a directory
reachable from the origin root one
:query string branch: specify the origin branch name from which
to retrieve the root directory
:query string release: specify the origin release name from which
to retrieve the root directory
:query string revision: specify the origin revision, identified by the hexadecimal
representation of its **sha1_git** value, from which to retrieve the root directory
:query int visit_id: specify a visit id to retrieve the directory from instead
of using the latest full visit by default
:statuscode 200: no error
:statuscode 404: requested origin cannot be found in the archive
or the provided path does not exist from the origin root directory
**Examples:**
.. parsed-literal::
:swh_web_browse:`origin/https://github.com/torvalds/linux/directory/`
:swh_web_browse:`origin/https://github.com/torvalds/linux/directory/net/ethernet/`
:swh_web_browse:`origin/https://github.com/python/cpython/directory/`
:swh_web_browse:`origin/https://github.com/python/cpython/directory/Python/`
:swh_web_browse:`origin/https://github.com/python/cpython/directory/?branch=refs/heads/2.7`
.. http:get:: /browse/origin/(origin_url)/visit/(timestamp)/directory/[(path)/]
:deprecated:
.. warning::
That endpoint is deprecated, use :http:get:`/browse/origin/directory/` instead.
HTML view for browsing the content of a directory reachable from
the root directory (including itself) associated with a visit of a software
origin closest to a provided timestamp.
The directory entries are sorted in lexicographical order, with
sub-directories displayed before regular files.
The view makes it possible to navigate recursively from the requested
directory to the directories reachable from it, and also up to the
origin root directory.
A breadcrumb located in the top part of the view allows users
to keep track of the paths navigated so far.
The view also makes it easy to switch between the origin branches
and releases through a dropdown menu.
The origin branch (defaults to HEAD) from which to retrieve the directory
content can also be specified by using the branch query parameter.
:param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/)
- :param string timestamp: a date string (any format parsable by `dateutil.parser.parse`_)
- or Unix timestamp to parse in order to find the closest visit.
+ :param string timestamp: an ISO 8601 datetime string to parse in order to find the
+ closest visit.
:param path: optional parameter used to specify the path of a directory
reachable from the origin root one
:type path: string
:query string branch: specify the origin branch name from which
to retrieve the root directory
:query string release: specify the origin release name from which
to retrieve the root directory
:query string revision: specify the origin revision, identified by the hexadecimal
representation of its **sha1_git** value, from which to retrieve the directory
:query int visit_id: specify a visit id to retrieve the directory from instead
of using the provided timestamp
:statuscode 200: no error
:statuscode 404: requested origin cannot be found in the archive,
requested visit timestamp does not exist or the provided path does
not exist from the origin root directory
**Examples:**
.. parsed-literal::
:swh_web_browse:`origin/https://github.com/torvalds/linux/visit/1493926809/directory/`
:swh_web_browse:`origin/https://github.com/torvalds/linux/visit/2016-09-14T10:36:21Z/directory/net/ethernet/`
:swh_web_browse:`origin/https://github.com/python/cpython/visit/1474620651/directory/`
:swh_web_browse:`origin/https://github.com/python/cpython/visit/2017-05-05/directory/Python/`
:swh_web_browse:`origin/https://github.com/python/cpython/visit/2015-08/directory/?branch=refs/heads/2.7`
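Several of these endpoints resolve a timestamp to the visit closest to it. The idea can be sketched in Python as picking the visit with the smallest absolute time difference. The ``(visit_id, date)`` tuples and the ``closest_visit`` helper are hypothetical, and the server-side lookup may apply extra rules (for instance, only considering full visits) that this sketch does not model.

```python
from datetime import datetime, timezone


def closest_visit(visits, target):
    """Return the (visit_id, date) pair whose date is nearest to target.

    Ties are resolved in favour of the earliest entry in the list.
    """
    if not visits:
        raise ValueError("origin has no visits")
    return min(visits, key=lambda visit: abs(visit[1] - target))


visits = [
    (1, datetime(2015, 8, 3, tzinfo=timezone.utc)),
    (2, datetime(2016, 9, 14, 10, 36, 21, tzinfo=timezone.utc)),
    (3, datetime(2017, 5, 5, tzinfo=timezone.utc)),
]
target = datetime(2017, 1, 1, tzinfo=timezone.utc)
print(closest_visit(visits, target))
```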
Origin content
""""""""""""""
.. http:get:: /browse/origin/content/
HTML view that displays a content
associated with the latest full visit of a software origin.
If the content to display is textual, it will be highlighted client-side
if possible using highlightjs_. The procedure to perform that task is described
in :http:get:`/browse/content/[(algo_hash):](hash)/`.
It is also possible to highlight specific lines of a textual
content (not in the sense of syntax highlighting, but to emphasize
a relevant part of the content) by either:
* clicking on line numbers (holding shift to highlight a range of lines)
* using a URL fragment of the form '#Ln' or '#Lm-Ln'
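The fragment syntax is simple enough to parse with a regular expression. The Python sketch below is an assumption about how a client might read such a fragment (``parse_line_fragment`` is hypothetical); it returns an inclusive ``(first, last)`` line range and swaps reversed bounds.

```python
import re
from typing import Optional, Tuple

# Matches "#L12" or "#L904-L931" (the leading "#" is optional).
_LINE_FRAGMENT = re.compile(r"^#?L(\d+)(?:-L(\d+))?$")


def parse_line_fragment(fragment: str) -> Optional[Tuple[int, int]]:
    """Return the inclusive (first, last) line range, or None if malformed."""
    match = _LINE_FRAGMENT.match(fragment)
    if match is None:
        return None
    first = int(match.group(1))
    last = int(match.group(2)) if match.group(2) else first
    return (min(first, last), max(first, last))


print(parse_line_fragment("#L904-L931"))
```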
The view displays a breadcrumb on top of the rendered
content so that users can easily navigate up to the origin root directory.
The view also makes it easy to switch between the origin branches
and releases through a dropdown menu.
The origin branch (defaults to HEAD) from which to retrieve the content
can also be specified by using the branch query parameter.
:query string origin_url: mandatory parameter providing the url of the origin
(e.g. https://github.com/(user)/(repo))
:query string path: path of a content reachable from the origin root directory
:query string branch: specify the origin branch name from which
to retrieve the content
:query string release: specify the origin release name from which
to retrieve the content
:query string revision: specify the origin revision, identified by the hexadecimal
representation of its **sha1_git** value, from which to retrieve the content
- :query string timestamp: a date string (any format parsable by `dateutil.parser.parse`_)
- or Unix timestamp to parse in order to find the closest visit.
+ :query string timestamp: an ISO 8601 datetime string to parse in order to find the
+ closest visit.
:query int visit_id: specify a visit id to retrieve the content from instead
of using the latest full visit by default
:statuscode 200: no error
:statuscode 400: no origin url has been provided as parameter
:statuscode 404: requested origin cannot be found in the archive,
or the provided content path does not exist from the origin root directory
**Examples:**
.. parsed-literal::
:swh_web_browse:`origin/content/?origin_url=https://github.com/git/git&path=git.c`
:swh_web_browse:`origin/content/?origin_url=https://github.com/mozilla/gecko-dev&path=js/src/json.cpp`
:swh_web_browse:`origin/content/?origin_url=https://github.com/git/git&path=git.c&branch=refs/heads/next`
- :swh_web_browse:`origin/content/?origin_url=https://github.com/git/git&path=git.c&timestamp=1473933564`
:swh_web_browse:`origin/content/?origin_url=https://github.com/git/git&path=git.c&timestamp=2016-05-05T00:00:00Z`
- :swh_web_browse:`origin/content/?origin_url=https://github.com/mozilla/gecko-dev&path=js/src/json.cpp&timestamp=1490126182`
:swh_web_browse:`origin/content/?origin_url=https://github.com/mozilla/gecko-dev&path=js/src/json.cpp&timestamp=2017-03-21#L904-L931`
:swh_web_browse:`origin/content/?origin_url=https://github.com/git/git&path=git.c&branch=refs/heads/next&timestamp=2017-09-15`
.. http:get:: /browse/origin/(origin_url)/content/
:deprecated:
.. warning::
That endpoint is deprecated, use :http:get:`/browse/origin/content/` instead.
HTML view that displays a content
associated with the latest full visit of a software origin.
If the content to display is textual, it will be highlighted client-side
if possible using highlightjs_. The procedure to perform that task is described
in :http:get:`/browse/content/[(algo_hash):](hash)/`.
It is also possible to highlight specific lines of a textual
content (not in the sense of syntax highlighting, but to emphasize
a relevant part of the content) by either:
* clicking on line numbers (holding shift to highlight a range of lines)
* using a URL fragment of the form '#Ln' or '#Lm-Ln'
The view displays a breadcrumb on top of the rendered
content so that users can easily navigate up to the origin root directory.
The view also makes it easy to switch between the origin branches
and releases through a dropdown menu.
The origin branch (defaults to HEAD) from which to retrieve the content
can also be specified by using the branch query parameter.
:param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/)
:query string path: path of a content reachable from the origin root directory
:query string branch: specify the origin branch name from which
to retrieve the content
:query string release: specify the origin release name from which
to retrieve the content
:query string revision: specify the origin revision, identified by the hexadecimal
representation of its **sha1_git** value, from which to retrieve the content
- :query string timestamp: a date string (any format parsable by `dateutil.parser.parse`_)
- or Unix timestamp to parse in order to find the closest visit.
+ :query string timestamp: an ISO 8601 datetime string to parse in order to find the
+ closest visit.
:query int visit_id: specify a visit id to retrieve the content from instead
of using the latest full visit by default
:statuscode 200: no error
:statuscode 400: no origin url has been provided as parameter
:statuscode 404: requested origin cannot be found in the archive,
or the provided content path does not exist from the origin root directory
**Examples:**
.. parsed-literal::
:swh_web_browse:`origin/https://github.com/git/git/content/?path=git.c`
:swh_web_browse:`origin/https://github.com/mozilla/gecko-dev/content/?path=js/src/json.cpp`
:swh_web_browse:`origin/https://github.com/git/git/content/?path=git.c&branch=refs/heads/next`
- :swh_web_browse:`origin/https://github.com/git/git/content/?path=git.c&timestamp=1473933564`
:swh_web_browse:`origin/https://github.com/git/git/content/?path=git.c&timestamp=2016-05-05T00:00:00Z`
- :swh_web_browse:`origin/https://github.com/mozilla/gecko-dev/content/?path=js/src/json.cpp&timestamp=1490126182`
:swh_web_browse:`origin/https://github.com/mozilla/gecko-dev/content/?path=js/src/json.cpp&timestamp=2017-03-21#L904-L931`
:swh_web_browse:`origin/https://github.com/git/git/content/?path=git.c&branch=refs/heads/next&timestamp=2017-09-15`
.. http:get:: /browse/origin/(origin_url)/content/(path)/
:deprecated:
.. warning::
That endpoint is deprecated, use :http:get:`/browse/origin/content/` instead.
HTML view that displays a content
associated with the latest full visit of a software origin.
If the content to display is textual, it will be highlighted client-side
if possible using highlightjs_. The procedure to perform that task is described
in :http:get:`/browse/content/[(algo_hash):](hash)/`.
It is also possible to highlight specific lines of a textual
content (not in the sense of syntax highlighting, but to emphasize
a relevant part of the content) by either:
* clicking on line numbers (holding shift to highlight a range of lines)
* using a URL fragment of the form '#Ln' or '#Lm-Ln'
The view displays a breadcrumb on top of the rendered
content so that users can easily navigate up to the origin root directory.
The view also makes it easy to switch between the origin branches
and releases through a dropdown menu.
The origin branch (defaults to HEAD) from which to retrieve the content
can also be specified by using the branch query parameter.
:param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/)
:param string path: path of a content reachable from the origin root directory
:query string branch: specify the origin branch name from which
to retrieve the content
:query string release: specify the origin release name from which
to retrieve the content
:query string revision: specify the origin revision, identified by the hexadecimal
representation of its **sha1_git** value, from which to retrieve the content
:query int visit_id: specify a visit id to retrieve the content from instead
of using the latest full visit by default
:statuscode 200: no error
:statuscode 404: requested origin cannot be found in the archive,
or the provided content path does not exist from the origin root directory
**Examples:**
.. parsed-literal::
:swh_web_browse:`origin/https://github.com/git/git/content/git.c/`
:swh_web_browse:`origin/https://github.com/mozilla/gecko-dev/content/js/src/json.cpp/`
:swh_web_browse:`origin/https://github.com/git/git/content/git.c/?branch=refs/heads/next`
.. http:get:: /browse/origin/(origin_url)/visit/(timestamp)/content/(path)/
:deprecated:
.. warning::
That endpoint is deprecated, use :http:get:`/browse/origin/content/` instead.
HTML view that displays a content associated with a
visit of a software origin closest to a provided timestamp.
If the content to display is textual, it will be highlighted client-side
if possible using highlightjs_. The procedure to perform that task is described
in :http:get:`/browse/content/[(algo_hash):](hash)/`.
It is also possible to highlight specific lines of a textual
content (not in the sense of syntax highlighting, but to emphasize
a relevant part of the content) by either:
* clicking on line numbers (holding shift to highlight a range of lines)
* using a URL fragment of the form '#Ln' or '#Lm-Ln'
The view displays a breadcrumb on top of the rendered
content so that users can easily navigate up to the origin root directory.
The view also makes it easy to switch between the origin branches
and releases through a dropdown menu.
The origin branch (defaults to HEAD) from which to retrieve the content
can also be specified by using the branch query parameter.
:param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/)
- :param string timestamp: a date string (any format parsable by `dateutil.parser.parse`_)
- or Unix timestamp to parse in order to find the closest visit.
+ :param string timestamp: an ISO 8601 datetime string to parse in order to find the
+ closest visit.
:param string path: path of a content reachable from the origin root directory
:query string branch: specify the origin branch name from which
to retrieve the content
:query string release: specify the origin release name from which
to retrieve the content
:query string revision: specify the origin revision, identified by the hexadecimal
representation of its **sha1_git** value, from which to retrieve the content
:query int visit_id: specify a visit id to retrieve the content from instead
of using the provided timestamp
:statuscode 200: no error
:statuscode 404: requested origin cannot be found in the archive,
requested visit timestamp does not exist or the provided content path does
not exist from the origin root directory
**Examples:**
.. parsed-literal::
- :swh_web_browse:`origin/https://github.com/git/git/visit/1473933564/content/git.c/`
:swh_web_browse:`origin/https://github.com/git/git/visit/2016-05-05T00:00:00Z/content/git.c/`
- :swh_web_browse:`origin/https://github.com/mozilla/gecko-dev/visit/1490126182/content/js/src/json.cpp/`
:swh_web_browse:`origin/https://github.com/mozilla/gecko-dev/visit/2017-03-21/content/js/src/json.cpp/#L904-L931`
:swh_web_browse:`origin/https://github.com/git/git/visit/2017-09-15/content/git.c/?branch=refs/heads/next`
Origin history
""""""""""""""
.. http:get:: /browse/origin/log/
HTML view that displays the history of revisions leading
to the last revision found during the latest visit of a software origin.
In other words, it shows the commit log associated with the latest
full visit of a software origin.
The following data are displayed for each log entry:
* link to browse the associated revision in the origin context
* author of the revision
* date of the revision
* message associated with the revision
* commit date of the revision
By default, the revisions are ordered in reverse chronological order of
their commit date.
N log entries are displayed per page (default is 100). In order to navigate
in a large history, two buttons are present at the bottom of the view:
* **Newer**: fetch and display, if available, the N log entries
  more recent than the ones currently displayed
* **Older**: fetch and display, if available, the N log entries
  older than the ones currently displayed
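The two buttons amount to moving an offset by one page in either direction. A minimal Python sketch of that arithmetic, assuming the client knows the total number of log entries (``total``) and that revisions are already ordered newest first; ``page_offsets`` is hypothetical and the view's actual pagination logic may differ.

```python
from typing import Optional, Tuple


def page_offsets(offset: int, per_page: int, total: int) -> Tuple[Optional[int], Optional[int]]:
    """Return the offsets for the Newer and Older pages (None if unavailable)."""
    # Newer entries precede the current page; clamp at the start of the log.
    newer = max(offset - per_page, 0) if offset > 0 else None
    # Older entries follow the current page, if any remain.
    older = offset + per_page if offset + per_page < total else None
    return newer, older


print(page_offsets(offset=100, per_page=100, total=250))
```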
The view also makes it easy to switch between the origin branches
and releases through a dropdown menu.
The origin branch (defaults to HEAD) from which to retrieve the commit log
can also be specified by using the branch query parameter.
:query string origin_url: mandatory parameter providing the url of the origin
(e.g. https://github.com/(user)/(repo))
:query int per_page: the number of log entries to display per page
:query int offset: the number of revisions to skip before returning those to display
:query string revs_ordering: specify the revisions ordering, possible values are ``committer_date``,
``dfs``, ``dfs_post`` and ``bfs``
:query string branch: specify the origin branch name from which
to retrieve the commit log
:query string release: specify the origin release name from which
to retrieve the commit log
:query string revision: specify the origin revision, identified by the hexadecimal
representation of its **sha1_git** value, from which to retrieve the commit log
- :query string timestamp: a date string (any format parsable by `dateutil.parser.parse`_)
- or Unix timestamp to parse in order to find the closest visit.
+ :query string timestamp: an ISO 8601 datetime string to parse in order to find the
+ closest visit.
:query int visit_id: specify a visit id to retrieve the history log from instead
of using the latest visit by default
:statuscode 200: no error
:statuscode 400: no origin url has been provided as parameter
:statuscode 404: requested origin cannot be found in the archive
**Examples:**
.. parsed-literal::
:swh_web_browse:`origin/log/?origin_url=https://github.com/videolan/vlc`
:swh_web_browse:`origin/log/?origin_url=https://github.com/Kitware/CMake`
:swh_web_browse:`origin/log/?origin_url=https://github.com/Kitware/CMake&branch=refs/heads/release`
:swh_web_browse:`origin/log/?origin_url=https://github.com/videolan/vlc&visit=1459651262`
:swh_web_browse:`origin/log/?origin_url=https://github.com/Kitware/CMake&timestamp=2016-04-01`
:swh_web_browse:`origin/log/?origin_url=https://github.com/Kitware/CMake&branch=refs/heads/release&timestamp=1438116814`
:swh_web_browse:`origin/log/?origin_url=https://github.com/Kitware/CMake&branch=refs/heads/release&timestamp=2017-05-05T03:14:23Z`
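To make the ``revs_ordering`` values more concrete, the following Python sketch shows breadth-first, depth-first (pre-order) and depth-first post-order walks over a small revision graph, starting from the head revision and following parent links. It is an illustration under stated assumptions only: the ``PARENTS`` mapping and the three helpers are hypothetical, parents are assumed to be visited in their recorded order, and the archive's exact traversal and tie-breaking rules are not specified here.

```python
from collections import deque
from typing import Dict, List

# Hypothetical history: "d" is a merge of "c" and "b", both children of "a".
PARENTS: Dict[str, List[str]] = {"d": ["c", "b"], "c": ["a"], "b": ["a"], "a": []}


def bfs(parents: Dict[str, List[str]], head: str) -> List[str]:
    """Breadth-first: visit all parents of a revision before going deeper."""
    seen, order, queue = {head}, [], deque([head])
    while queue:
        rev = queue.popleft()
        order.append(rev)
        for parent in parents.get(rev, []):
            if parent not in seen:
                seen.add(parent)
                queue.append(parent)
    return order


def dfs(parents: Dict[str, List[str]], head: str) -> List[str]:
    """Depth-first pre-order: emit a revision, then explore its first parent fully."""
    seen, order, stack = set(), [], [head]
    while stack:
        rev = stack.pop()
        if rev in seen:
            continue
        seen.add(rev)
        order.append(rev)
        # Push parents in reverse so the first parent is popped next.
        stack.extend(reversed(parents.get(rev, [])))
    return order


def dfs_post(parents: Dict[str, List[str]], head: str) -> List[str]:
    """Depth-first post-order: emit a revision after all of its ancestors."""
    seen, order = {head}, []
    stack = [(head, iter(parents.get(head, [])))]
    while stack:
        rev, pending = stack[-1]
        for parent in pending:
            if parent not in seen:
                seen.add(parent)
                stack.append((parent, iter(parents.get(parent, []))))
                break
        else:
            # All parents handled: the revision can now be emitted.
            stack.pop()
            order.append(rev)
    return order


for walk in (bfs, dfs, dfs_post):
    print(walk.__name__, walk(PARENTS, "d"))
```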
.. http:get:: /browse/origin/(origin_url)/log/
:deprecated:
.. warning::
That endpoint is deprecated, use :http:get:`/browse/origin/log/` instead.
HTML view that displays the history of revisions leading
to the last revision found during the latest visit of a software origin.
In other words, it shows the commit log associated with the latest
full visit of a software origin.
The following data are displayed for each log entry:
* link to browse the associated revision in the origin context
* author of the revision
* date of the revision
* message associated with the revision
* commit date of the revision
By default, the revisions are ordered in reverse chronological order of
their commit date.
N log entries are displayed per page (default is 100). In order to navigate
in a large history, two buttons are present at the bottom of the view:
* **Newer**: fetch and display, if available, the N log entries
  more recent than the ones currently displayed
* **Older**: fetch and display, if available, the N log entries
  older than the ones currently displayed
The view also makes it easy to switch between the origin branches
and releases through a dropdown menu.
The origin branch (defaults to HEAD) from which to retrieve the commit log
can also be specified by using the branch query parameter.
:param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/)
:query int per_page: the number of log entries to display per page
:query int offset: the number of revisions to skip before returning those to display
:query string revs_ordering: specify the revisions ordering, possible values are ``committer_date``,
``dfs``, ``dfs_post`` and ``bfs``
:query string branch: specify the origin branch name from which
to retrieve the commit log
:query string release: specify the origin release name from which
to retrieve the commit log
:query string revision: specify the origin revision, identified by the hexadecimal
representation of its **sha1_git** value, from which to retrieve the commit log
- :query string timestamp: a date string (any format parsable by `dateutil.parser.parse`_)
- or Unix timestamp to parse in order to find the closest visit.
+ :query string timestamp: an ISO 8601 datetime string to parse in order to find the
+ closest visit.
:query int visit_id: specify a visit id to retrieve the history log from instead
of using the latest visit by default
:statuscode 200: no error
:statuscode 404: requested origin cannot be found in the archive
**Examples:**
.. parsed-literal::
:swh_web_browse:`origin/https://github.com/videolan/vlc/log/`
:swh_web_browse:`origin/https://github.com/Kitware/CMake/log/`
:swh_web_browse:`origin/https://github.com/Kitware/CMake/log/?branch=refs/heads/release`
- :swh_web_browse:`origin/https://github.com/videolan/vlc/log/?visit=1459651262`
:swh_web_browse:`origin/https://github.com/Kitware/CMake/log/?timestamp=2016-04-01`
- :swh_web_browse:`origin/https://github.com/Kitware/CMake/log/?branch=refs/heads/release&timestamp=1438116814`
:swh_web_browse:`origin/https://github.com/Kitware/CMake/log/?branch=refs/heads/release&timestamp=2017-05-05T03:14:23Z`
.. http:get:: /browse/origin/(origin_url)/visit/(timestamp)/log/
:deprecated:
.. warning::
That endpoint is deprecated, use :http:get:`/browse/origin/log/` instead.
HTML view that displays the history of revisions leading
to the last revision found during a visit of a software origin closest
to the provided timestamp.
In other words, it shows the commit log associated with a visit of
a software origin closest to a provided timestamp.
The following data are displayed for each log entry:
* author of the revision
* link to the revision metadata
* message associated with the revision
* date of the revision
* link to browse the associated source tree in the origin context
N log entries are displayed per page (default is 20). In order to navigate
in a large history, two buttons are present at the bottom of the view:
* **Newer**: fetch and display, if available, the N log entries
  more recent than the ones currently displayed
* **Older**: fetch and display, if available, the N log entries
  older than the ones currently displayed
The view also makes it easy to switch between the origin branches
and releases through a dropdown menu.
The origin branch (defaults to HEAD) from which to retrieve the commit log
can also be specified by using the branch query parameter.
:param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/)
- :param string timestamp: a date string (any format parsable by `dateutil.parser.parse`_)
- or Unix timestamp to parse in order to find the closest visit.
+ :param string timestamp: an ISO 8601 datetime string to parse in order to find the
+ closest visit.
:query int per_page: the number of log entries to display per page
(default is 20, max is 50)
:query string branch: specify the origin branch name from which
to retrieve the commit log
:query string release: specify the origin release name from which
to retrieve the commit log
:query string revision: specify the origin revision, identified by the hexadecimal
representation of its **sha1_git** value, from which to retrieve the commit log
:query int visit_id: specify a visit id to retrieve the history log from instead
of using the provided timestamp
:statuscode 200: no error
:statuscode 404: requested origin cannot be found in the archive
**Examples:**
.. parsed-literal::
- :swh_web_browse:`origin/https://github.com/videolan/vlc/visit/1459651262/log/`
:swh_web_browse:`origin/https://github.com/Kitware/CMake/visit/2016-04-01/log/`
- :swh_web_browse:`origin/https://github.com/Kitware/CMake/visit/1438116814/log/?branch=refs/heads/release`
:swh_web_browse:`origin/https://github.com/Kitware/CMake/visit/2017-05-05T03:14:23Z/log/?branch=refs/heads/release`
Origin branches
"""""""""""""""
.. http:get:: /browse/origin/branches/
HTML view that displays the list of branches
found during the latest full visit of a software origin.
The following data are displayed for each branch:
* its name
* a link to browse the associated directory
* a link to browse the associated revision
* last commit message
* last commit date
The list of branches is paginated, with at most 100 branches per page.
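Splitting a branch list into pages of at most 100 entries is a plain chunking operation; a minimal Python sketch follows. ``paginate`` is a hypothetical helper that assumes the full list is available in memory, which the actual view does not necessarily do.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def paginate(items: Sequence[T], page_size: int = 100) -> List[Sequence[T]]:
    """Split items into consecutive pages of at most page_size entries."""
    if page_size < 1:
        raise ValueError("page_size must be positive")
    return [items[i:i + page_size] for i in range(0, len(items), page_size)]


branches = [f"refs/heads/branch-{n}" for n in range(250)]
pages = paginate(branches)
print(len(pages), [len(page) for page in pages])
```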
:query string origin_url: mandatory parameter providing the url of the origin
(e.g. https://github.com/(user)/(repo))
- :query string timestamp: a date string (any format parsable by `dateutil.parser.parse`_)
- or Unix timestamp to parse in order to find the closest visit.
+ :query string timestamp: an ISO 8601 datetime string to parse in order to find the
+ closest visit.
:statuscode 200: no error
:statuscode 400: no origin url has been provided as parameter
:statuscode 404: requested origin cannot be found in the archive
**Examples:**
.. parsed-literal::
:swh_web_browse:`origin/branches/?origin_url=deb://Debian/packages/linux`
:swh_web_browse:`origin/branches/?origin_url=https://github.com/webpack/webpack`
:swh_web_browse:`origin/branches/?origin_url=https://github.com/kripken/emscripten&timestamp=2017-05-05T12:02:03Z`
:swh_web_browse:`origin/branches/?origin_url=deb://Debian/packages/apache2-mod-xforward&timestamp=2017-11-15T05:15:09Z`
.. http:get:: /browse/origin/(origin_url)/branches/
:deprecated:
.. warning::
That endpoint is deprecated, use :http:get:`/browse/origin/branches/` instead.
HTML view that displays the list of branches
found during the latest full visit of a software origin.
The following data are displayed for each branch:
* its name
* a link to browse the associated directory
* a link to browse the associated revision
* last commit message
* last commit date
That list of branches is paginated, each page displaying a maximum of 100 branches.
:param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/)
- :query string timestamp: a date string (any format parsable by `dateutil.parser.parse`_)
- or Unix timestamp to parse in order to find the closest visit.
+ :query string timestamp: an ISO 8601 datetime string to parse in order to find the
+ closest visit.
:statuscode 200: no error
:statuscode 404: requested origin cannot be found in the archive
**Examples:**
.. parsed-literal::
:swh_web_browse:`origin/deb://Debian/packages/linux/branches/`
:swh_web_browse:`origin/https://github.com/webpack/webpack/branches/`
:swh_web_browse:`origin/https://github.com/kripken/emscripten/branches/?timestamp=2017-05-05T12:02:03Z`
:swh_web_browse:`origin/deb://Debian/packages/apache2-mod-xforward/branches/?timestamp=2017-11-15T05:15:09`
.. http:get:: /browse/origin/(origin_url)/visit/(timestamp)/branches/
:deprecated:
.. warning::
That endpoint is deprecated, use :http:get:`/browse/origin/branches/` instead.
HTML view that displays the list of branches
found during a visit of a software origin closest to the provided timestamp.
The following data are displayed for each branch:
* its name
* a link to browse the associated directory
* a link to browse the associated revision
* last commit message
* last commit date
That list of branches is paginated, each page displaying a maximum of 100 branches.
:param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/)
- :param string timestamp: a date string (any format parsable by `dateutil.parser.parse`_)
- or Unix timestamp to parse in order to find the closest visit.
+ :param string timestamp: an ISO 8601 datetime string to parse in order to find the
+ closest visit.
:statuscode 200: no error
:statuscode 404: requested origin cannot be found in the archive
**Examples:**
.. parsed-literal::
:swh_web_browse:`origin/https://github.com/kripken/emscripten/visit/2017-05-05T12:02:03Z/branches/`
:swh_web_browse:`origin/deb://Debian/packages/apache2-mod-xforward/visit/2017-11-15T05:15:09Z/branches/`
Origin releases
"""""""""""""""
.. http:get:: /browse/origin/releases/
HTML view that displays the list of releases
found during the latest full visit of a software origin.
The following data are displayed for each release:
* its name
* a link to browse the release details
* its target type (revision, directory, content or release)
* its associated message
* its date
That list of releases is paginated, each page displaying a maximum of 100 releases.
:query string origin_url: mandatory parameter providing the url of the origin
(e.g. https://github.com/(user)/(repo))
- :query string timestamp: a date string (any format parsable by `dateutil.parser.parse`_)
- or Unix timestamp to parse in order to find the closest visit.
+ :query string timestamp: an ISO 8601 datetime string to parse in order to find the
+ closest visit.
:statuscode 200: no error
:statuscode 400: no origin url has been provided as parameter
:statuscode 404: requested origin cannot be found in the archive
**Examples:**
.. parsed-literal::
:swh_web_browse:`origin/releases/?origin_url=https://github.com/git/git`
:swh_web_browse:`origin/releases/?origin_url=https://github.com/webpack/webpack`
:swh_web_browse:`origin/releases/?origin_url=https://github.com/torvalds/linux&timestamp=2017-11-21T19:37:42Z`
:swh_web_browse:`origin/releases/?origin_url=https://github.com/Kitware/CMake&timestamp=2016-09-23T14:06:35Z`
.. http:get:: /browse/origin/(origin_url)/releases/
:deprecated:
.. warning::
That endpoint is deprecated, use :http:get:`/browse/origin/releases/` instead.
HTML view that displays the list of releases
found during the latest full visit of a software origin.
The following data are displayed for each release:
* its name
* a link to browse the release details
* its target type (revision, directory, content or release)
* its associated message
* its date
That list of releases is paginated, each page displaying a maximum of 100 releases.
:param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/)
- :query string timestamp: a date string (any format parsable by `dateutil.parser.parse`_)
- or Unix timestamp to parse in order to find the closest visit.
+ :query string timestamp: an ISO 8601 datetime string to parse in order to find the
+ closest visit.
:statuscode 200: no error
:statuscode 404: requested origin cannot be found in the archive
**Examples:**
.. parsed-literal::
:swh_web_browse:`origin/https://github.com/git/git/releases/`
:swh_web_browse:`origin/https://github.com/webpack/webpack/releases/`
:swh_web_browse:`origin/https://github.com/torvalds/linux/releases/?timestamp=2017-11-21T19:37:42Z`
:swh_web_browse:`origin/https://github.com/Kitware/CMake/releases/?timestamp=2016-09-23T14:06:35Z`
.. http:get:: /browse/origin/(origin_url)/visit/(timestamp)/releases/
:deprecated:
.. warning::
That endpoint is deprecated, use :http:get:`/browse/origin/releases/` instead.
HTML view that displays the list of releases
found during a visit of a software origin closest to the provided timestamp.
The following data are displayed for each release:
* its name
* a link to browse the release details
* its target type (revision, directory, content or release)
* its associated message
* its date
That list of releases is paginated, each page displaying a maximum of 100 releases.
:param string origin_url: the url of the origin (e.g. https://github.com/(user)/(repo)/)
- :param string timestamp: a date string (any format parsable by `dateutil.parser.parse`_)
- or Unix timestamp to parse in order to find the closest visit.
+ :param string timestamp: an ISO 8601 datetime string to parse in order to find the
+ closest visit.
:statuscode 200: no error
:statuscode 404: requested origin cannot be found in the archive
**Examples:**
.. parsed-literal::
:swh_web_browse:`origin/https://github.com/torvalds/linux/visit/2017-11-21T19:37:42Z/releases/`
:swh_web_browse:`origin/https://github.com/Kitware/CMake/visit/2016-09-23T14:06:35Z/releases/`
.. _highlightjs: https://highlightjs.org/
-.. _dateutil.parser.parse: http://dateutil.readthedocs.io/en/stable/parser.html
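The query-parameter examples above separate ``origin_url`` and ``timestamp`` with ``&``. The following minimal Python sketch builds such URLs using only the standard library. The ``https://archive.softwareheritage.org/browse/`` base URL and the ``browse_origin_url`` helper are assumptions for illustration; they are not part of swh-web. The sketch renders the visit date as an ISO 8601 UTC datetime ending in ``Z``, which matches the format the updated docs describe.

```python
from datetime import datetime, timezone
from typing import Dict, Optional
from urllib.parse import quote, urlencode

# Assumed base URL of a swh-web instance; adjust to the deployment you use.
BROWSE_BASE_URL = "https://archive.softwareheritage.org/browse/"


def browse_origin_url(
    view: str, origin_url: str, visit_date: Optional[datetime] = None
) -> str:
    """Build a /browse/origin/(view)/ URL, e.g. view="branches" or "releases"."""
    params: Dict[str, str] = {"origin_url": origin_url}
    if visit_date is not None:
        # Convert to UTC (a naive datetime is treated as local time by
        # astimezone) and render an ISO 8601 datetime ending with "Z".
        utc_date = visit_date.astimezone(timezone.utc)
        params["timestamp"] = utc_date.strftime("%Y-%m-%dT%H:%M:%SZ")
    # Parameters are joined with "&"; ":" and "/" are left unescaped
    # so the origin URL stays readable.
    query = urlencode(params, safe=":/", quote_via=quote)
    return f"{BROWSE_BASE_URL}origin/{view}/?{query}"


print(
    browse_origin_url(
        "branches",
        "https://github.com/kripken/emscripten",
        datetime(2017, 5, 5, 12, 2, 3, tzinfo=timezone.utc),
    )
)
```

Any standard query-string parser should also accept a fully percent-encoded origin URL, which is the ``urlencode`` default. Keeping ``:`` and ``/`` literal only mirrors the examples above.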
diff --git a/docs/uri-scheme-browse-revision.rst b/docs/uri-scheme-browse-revision.rst
index 79a65961e..c54a151a7 100644
--- a/docs/uri-scheme-browse-revision.rst
+++ b/docs/uri-scheme-browse-revision.rst
@@ -1,79 +1,75 @@
Revision
^^^^^^^^
.. http:get:: /browse/revision/(sha1_git)/
HTML view to browse a revision. It notably shows the revision date
and message but also offers links to get more details on:
* its author
* its parent revisions
* the history log reachable from it
The view also enables navigating the source tree associated with the
revision and browsing its content.
Last but not least, the view displays the list of file changes introduced
in the revision, along with the diff of each changed file.
:param string sha1_git: hexadecimal representation for the **sha1_git**
identifier of a revision
:query string origin_url: used internally to associate an origin url
(e.g. https://github.com/user/repo) to the revision
- :query string timestamp: used internally to associate an origin visit to the
- revision, must be a date string (any format parsable by `dateutil.parser.parse`_)
- or Unix timestamp to parse in order to find the closest visit.
- :query int visit_id: used internally to specify a visit id instead of
+ :query string timestamp: an ISO 8601 datetime string to parse in order to find the
+ closest visit.
+ :query int visit_id: specify a visit id instead of
using the provided timestamp
- :query string path: used internally when navigating in the source tree
- associated to the revision
+ :query string path: optional relative path from the revision root directory
:statuscode 200: no error
:statuscode 404: requested revision cannot be found in the archive
**Examples:**
.. parsed-literal::
:swh_web_browse:`revision/f1b94134a4b879bc55c3dacdb496690c8ebdc03f/`
:swh_web_browse:`revision/d1aa2b3f607b35dc5dbf613b2334b6d243ec2bda/`
- .. _dateutil.parser.parse: http://dateutil.readthedocs.io/en/stable/parser.html
-
.. http:get:: /browse/revision/(sha1_git)/log/
HTML view that displays the list of revisions leading to
a given one. In other words, it shows a commit log.
The following data are displayed for each log entry:
* link to browse the revision
* author of the revision
* date of the revision
* message associated to the revision
* commit date of the revision
By default, the revisions are ordered in reverse chronological order of
their commit date.
N log entries are displayed per page (default is 100). In order to navigate
in a large history, two buttons are present at the bottom of the view:
* **Newer**: fetch and display, if available, the N log entries more recent
than the ones currently displayed
* **Older**: fetch and display, if available, the N log entries older
than the ones currently displayed
:param string sha1_git: hexadecimal representation for the **sha1_git**
identifier of a revision
:query int per_page: the number of log entries to display per page
:query int offset: the number of revisions to skip before returning those to display
:query str revs_ordering: specify the revisions ordering, possible values are ``committer_date``,
``dfs``, ``dfs_post`` and ``bfs``
:statuscode 200: no error
:statuscode 404: requested revision cannot be found in the archive
**Examples:**
.. parsed-literal::
:swh_web_browse:`revision/f1b94134a4b879bc55c3dacdb496690c8ebdc03f/log/`
:swh_web_browse:`revision/d1aa2b3f607b35dc5dbf613b2334b6d243ec2bda/log/`
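The following sketch shows one way to combine the ``per_page``, ``offset`` and ``revs_ordering`` query parameters described above. The base URL and the helper names are hypothetical. The behaviour of the **Newer** and **Older** offsets is also an assumption: **Older** advances ``offset`` by ``per_page`` and **Newer** moves it back. That behaviour is inferred from ``offset`` being the number of revisions to skip, not taken from the swh-web code.

```python
from typing import Dict, Optional
from urllib.parse import urlencode

# Assumed base URL of a swh-web instance; adjust to the deployment you use.
BROWSE_BASE_URL = "https://archive.softwareheritage.org/browse/"

# Values documented for the ``revs_ordering`` query parameter.
REVS_ORDERINGS = {"committer_date", "dfs", "dfs_post", "bfs"}


def revision_log_url(
    sha1_git: str,
    per_page: int = 100,
    offset: int = 0,
    revs_ordering: Optional[str] = None,
) -> str:
    """Build a /browse/revision/(sha1_git)/log/ URL for one page of history."""
    if per_page <= 0 or offset < 0:
        raise ValueError("per_page must be positive and offset non-negative")
    params: Dict[str, object] = {"per_page": per_page, "offset": offset}
    if revs_ordering is not None:
        if revs_ordering not in REVS_ORDERINGS:
            raise ValueError(f"unsupported revs_ordering: {revs_ordering!r}")
        params["revs_ordering"] = revs_ordering
    return f"{BROWSE_BASE_URL}revision/{sha1_git}/log/?{urlencode(params)}"


def older_page_offset(offset: int, per_page: int) -> int:
    # Hypothetical: the next (older) page skips the entries already shown.
    return offset + per_page


def newer_page_offset(offset: int, per_page: int) -> int:
    # Hypothetical: step back one page, never before the first entry.
    return max(0, offset - per_page)
```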
diff --git a/docs/uri-scheme-browse.rst b/docs/uri-scheme-browse.rst
index 87b637f6e..ba81d6a2e 100644
--- a/docs/uri-scheme-browse.rst
+++ b/docs/uri-scheme-browse.rst
@@ -1,93 +1,92 @@
URI scheme for swh-web Browse application
=========================================
This web application aims to provide HTML views to easily navigate in the archive,
thus it needs to be reached from a web browser.
If you intend to query the archive programmatically through any HTTP client,
please refer to the :ref:`swh-web-api-urls` section instead.
Context-independent browsing
----------------------------
Context-independent URLs provide information about objects (e.g.,
revisions, directories, contents, person, ...), independently of the
contexts where they have been found (e.g., specific repositories,
branches, commits, ...).
The following endpoints are the same as in the API case (see below), and
just render the corresponding information for user consumption. Where
hyperlinks are created, they always point to other context-independent
user URLs:
* :http:get:`/browse/content/[(algo_hash):](hash)/`: Display a content
* :http:get:`/browse/content/[(algo_hash):](hash)/raw/`: Get / Download content raw data
* :http:get:`/browse/directory/(sha1_git)/`: Browse the content of a directory
* :http:get:`/browse/person/(person_id)/`: Information on a person
* :http:get:`/browse/revision/(sha1_git)/`: Browse a revision
* :http:get:`/browse/revision/(sha1_git)/log/`: Browse the history log leading to a revision
Context-dependent browsing
--------------------------
Context-dependent URLs provide information about objects, limited to
specific contexts where the objects have been found.
For instance, instead of having to specify a (root) revision by **sha1_git**, users might want to
specify a place and a time. In Software Heritage a "place" is an origin, with an optional
branch name; a "time" is a timestamp at which some place has been observed by
Software Heritage crawlers.
Wherever a revision context is expected in a path (i.e., a
**/browse/revision/(sha1_git)/** path fragment) we can put in its stead a path fragment
of the form **/browse/origin/?origin_url=(origin_url)&timestamp=(timestamp)&branch=(branch)**.
Such a fragment is resolved, internally by the archive, to a revision **sha1_git** as follows:
- if **timestamp** is not given as a query parameter: look for the most recent crawl of the origin
identified by **origin_url**
- if **timestamp** is given: look for the crawl of the origin identified
by **origin_url** that is closest to **timestamp**
- if **branch** is given as a query parameter: look for the branch **branch**
- if **branch** is absent: look for branch "HEAD" or "master"
- return the revision **sha1_git** pointed to by the chosen branch
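The resolution steps above can be sketched in Python over hypothetical in-memory data. Each visit is a dict with an aware ``date`` and a ``snapshot`` that maps branch names to ``{"target_type", "target"}`` dicts. This sketch only illustrates the described algorithm; it is not the archive's implementation. Two behaviours are assumptions modelled on ``_get_branch`` in ``swh/web/browse/snapshot_context.py``: following ``alias`` branches (such as ``HEAD`` pointing to ``refs/heads/master``), and matching any branch name ending in ``master``.

```python
from datetime import datetime, timezone
from typing import Dict, List, Optional


def pick_visit(visits: List[dict], timestamp: Optional[datetime] = None) -> dict:
    """Most recent visit if no timestamp is given, else the closest one."""
    if timestamp is None:
        return max(visits, key=lambda visit: visit["date"])
    return min(visits, key=lambda visit: abs(visit["date"] - timestamp))


def resolve_revision(snapshot: Dict[str, dict], branch: Optional[str] = None) -> str:
    """Return the sha1_git of the revision targeted by the chosen branch."""
    if branch is None:
        if "HEAD" in snapshot:
            branch = "HEAD"
        else:
            # Fallback modelled on _get_branch: a branch name ending in "master".
            candidates = [name for name in sorted(snapshot) if name.endswith("master")]
            if not candidates:
                raise KeyError("neither HEAD nor a master branch found")
            branch = candidates[0]
    target = snapshot[branch]
    seen = {branch}
    # Follow alias branches (e.g. HEAD -> refs/heads/master) to a revision.
    while target["target_type"] == "alias":
        branch = target["target"]
        if branch in seen:
            raise ValueError(f"alias cycle through {branch!r}")
        seen.add(branch)
        target = snapshot[branch]
    if target["target_type"] != "revision":
        raise ValueError(f"branch {branch!r} does not target a revision")
    return target["target"]


utc = timezone.utc
snapshot = {
    "HEAD": {"target_type": "alias", "target": "refs/heads/master"},
    "refs/heads/master": {"target_type": "revision", "target": "a" * 40},
}
visits = [
    {"date": datetime(2017, 5, 1, tzinfo=utc), "snapshot": snapshot},
    {"date": datetime(2017, 6, 1, tzinfo=utc), "snapshot": {}},
]
visit = pick_visit(visits, datetime(2017, 5, 10, tzinfo=utc))
print(resolve_revision(visit["snapshot"]))
```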
The already mentioned URLs for revision contexts can therefore be alternatively
specified by users as:
* :http:get:`/browse/origin/directory/`
* :http:get:`/browse/origin/content/`
* :http:get:`/browse/origin/log/`
Typing:
- **origin_url** corresponds to the URL the origin was crawled from,
for instance https://github.com/(user)/(repo)/
- **branch** name is given as per the corresponding VCS (e.g., Git) as
a query parameter to the requested URL.
- **timestamp** is given in a format as liberal as possible, to uphold the
principle of least surprise. At the very minimum it is possible to
enter timestamps as:
- - Unix epoch timestamp (see for instance the output of `date +%s`)
- ISO 8601 timestamps (see for instance the output of ``date -I``, ``date -Is``)
- YYYY[MM[DD[HH[MM[SS]]]]] ad-hoc format
- YYYY[-MM[-DD[ HH[:MM[:SS]]]]] ad-hoc format
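For illustration, the ISO 8601 forms above can be parsed with the Python standard library alone. This is a sketch, not swh-web's parser; the requirements change in this diff switches swh-web to the third-party ``iso8601`` package. The sketch makes three simplifications:

* ``datetime.fromisoformat`` is a stand-in for the real parser.
* A trailing ``Z`` is rewritten, because ``fromisoformat`` only accepts it from Python 3.11 on.
* Values without an offset are treated as UTC, which is an assumption.

The compact ``YYYY[MM[DD...]]`` ad-hoc format is not covered.

```python
from datetime import datetime, timezone


def parse_visit_timestamp(value: str) -> datetime:
    """Parse an ISO 8601 date or datetime string into an aware UTC datetime."""
    text = value.strip()
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on,
    # so rewrite it as an explicit UTC offset.
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    parsed = datetime.fromisoformat(text)  # raises ValueError on bad input
    if parsed.tzinfo is None:
        # Assumption: values without an offset are interpreted as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


print(parse_visit_timestamp("2017-11-15T06:15:09+01:00"))  # 2017-11-15 05:15:09+00:00
```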
swh-web Browse Urls
-------------------
.. include:: uri-scheme-browse-content.rst
.. include:: uri-scheme-browse-directory.rst
.. include:: uri-scheme-browse-origin.rst
.. include:: uri-scheme-browse-person.rst
.. include:: uri-scheme-browse-release.rst
.. include:: uri-scheme-browse-revision.rst
.. include:: uri-scheme-browse-snapshot.rst
diff --git a/mypy.ini b/mypy.ini
index e5d71724f..4c94e70fe 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -1,52 +1,55 @@
[mypy]
namespace_packages = True
warn_unused_ignores = True
# support for django magic: https://github.com/typeddjango/django-stubs
plugins = mypy_django_plugin.main, mypy_drf_plugin.main
[mypy.plugins.django-stubs]
django_settings_module = swh.web.settings.development
# 3rd party libraries without stubs (yet)
[mypy-bs4.*]
ignore_missing_imports = True
[mypy-corsheaders.*]
ignore_missing_imports = True
[mypy-django_js_reverse.*]
ignore_missing_imports = True
[mypy-htmlmin.*]
ignore_missing_imports = True
+[mypy-iso8601.*]
+ignore_missing_imports = True
+
[mypy-keycloak.*]
ignore_missing_imports = True
[mypy-magic.*]
ignore_missing_imports = True
[mypy-pkg_resources.*]
ignore_missing_imports = True
[mypy-prometheus_client.*]
ignore_missing_imports = True
[mypy-pygments.*]
ignore_missing_imports = True
[mypy-pytest.*]
ignore_missing_imports = True
[mypy-requests_mock.*]
ignore_missing_imports = True
[mypy-sphinx.*]
ignore_missing_imports = True
[mypy-sphinxcontrib.*]
ignore_missing_imports = True
[mypy-swh.docs.*]
ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
index 53032284e..17fb4438c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,24 +1,24 @@
# Add here external Python modules dependencies, one per line. Module names
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
beautifulsoup4
django < 3
django-cors-headers
django-js-reverse
djangorestframework
django-webpack-loader
docutils
htmlmin
+iso8601
lxml
prometheus-client
pybadges
pygments
-python-dateutil
python-keycloak >= 0.19.0
python-magic >= 0.4.0
python-memcached
pyyaml
requests
sentry-sdk
typing-extensions
diff --git a/swh/web/browse/snapshot_context.py b/swh/web/browse/snapshot_context.py
index a6e262e29..69e4eead3 100644
--- a/swh/web/browse/snapshot_context.py
+++ b/swh/web/browse/snapshot_context.py
@@ -1,1465 +1,1465 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
# Utility module for browsing the archive in a snapshot context.
from collections import defaultdict
from copy import copy
-from typing import Any, Dict, List, Optional, Union, Tuple
+from typing import Any, Dict, List, Optional, Tuple
from django.core.cache import cache
from django.shortcuts import render
from django.template.defaultfilters import filesizeformat
from django.utils.html import escape
import sentry_sdk
from swh.model.identifiers import (
swhid,
snapshot_identifier,
CONTENT,
DIRECTORY,
REVISION,
RELEASE,
SNAPSHOT,
)
from swh.web.browse.utils import (
get_directory_entries,
gen_directory_link,
gen_revision_link,
gen_revision_url,
request_content,
gen_content_link,
prepare_content_for_display,
content_display_max_size,
format_log_entries,
gen_revision_log_link,
gen_release_link,
get_readme_to_display,
gen_snapshot_link,
)
from swh.web.common import service, highlightjs
from swh.web.common.exc import handle_view_exception, NotFoundExc, BadInputExc
from swh.web.common.identifiers import get_swhids_info
from swh.web.common.origin_visits import get_origin_visit
from swh.web.common.typing import (
OriginInfo,
SnapshotBranchInfo,
SnapshotReleaseInfo,
SnapshotContext,
ContentMetadata,
DirectoryMetadata,
SWHObjectInfo,
)
from swh.web.common.utils import (
reverse,
gen_path_info,
format_utc_iso_date,
swh_object_icons,
)
from swh.web.config import get_config
_empty_snapshot_id = snapshot_identifier({"branches": {}})
def _get_branch(branches, branch_name, snapshot_id):
"""
Utility function to get a specific branch from a branches list.
Its purpose is to get the default HEAD branch as some software origins
(e.g. those with svn type) do not have it. In that latter case, check
if there is a master branch instead and return it.
"""
filtered_branches = [b for b in branches if b["name"] == branch_name]
if filtered_branches:
return filtered_branches[0]
elif branch_name == "HEAD":
filtered_branches = [b for b in branches if b["name"].endswith("master")]
if filtered_branches:
return filtered_branches[0]
elif branches:
return branches[0]
else:
# case where a large branches list has been truncated
snp = service.lookup_snapshot(
snapshot_id,
branches_from=branch_name,
branches_count=1,
target_types=["revision", "alias"],
)
snp_branch, _ = process_snapshot_branches(snp)
if snp_branch and snp_branch[0]["name"] == branch_name:
branches.append(snp_branch[0])
return snp_branch[0]
def _get_release(releases, release_name, snapshot_id):
"""
Utility function to get a specific release from a releases list.
Returns None if the release cannot be found in the list.
"""
filtered_releases = [r for r in releases if r["name"] == release_name]
if filtered_releases:
return filtered_releases[0]
else:
# case where a large branches list has been truncated
try:
# git origins have specific branches for releases
snp = service.lookup_snapshot(
snapshot_id,
branches_from=f"refs/tags/{release_name}",
branches_count=1,
target_types=["release"],
)
except NotFoundExc:
snp = service.lookup_snapshot(
snapshot_id,
branches_from=release_name,
branches_count=1,
target_types=["release"],
)
_, snp_release = process_snapshot_branches(snp)
if snp_release and snp_release[0]["name"] == release_name:
releases.append(snp_release[0])
return snp_release[0]
def _branch_not_found(
branch_type, branch, snapshot_id, snapshot_sizes, origin_info, timestamp, visit_id
):
"""
Utility function to raise an exception when a specified branch/release
cannot be found.
"""
if branch_type == "branch":
branch_type = "Branch"
branch_type_plural = "branches"
target_type = "revision"
else:
branch_type = "Release"
branch_type_plural = "releases"
target_type = "release"
if snapshot_id and snapshot_sizes[target_type] == 0:
msg = "Snapshot with id %s has an empty list" " of %s!" % (
snapshot_id,
branch_type_plural,
)
elif snapshot_id:
msg = "%s %s for snapshot with id %s" " not found!" % (
branch_type,
branch,
snapshot_id,
)
elif visit_id and snapshot_sizes[target_type] == 0:
msg = (
"Origin with url %s"
" for visit with id %s has an empty list"
" of %s!" % (origin_info["url"], visit_id, branch_type_plural)
)
elif visit_id:
msg = (
"%s %s associated to visit with"
" id %s for origin with url %s"
" not found!" % (branch_type, branch, visit_id, origin_info["url"])
)
elif snapshot_sizes[target_type] == 0:
msg = (
"Origin with url %s"
" for visit with timestamp %s has an empty list"
" of %s!" % (origin_info["url"], timestamp, branch_type_plural)
)
else:
msg = (
"%s %s associated to visit with"
" timestamp %s for origin with "
"url %s not found!" % (branch_type, branch, timestamp, origin_info["url"])
)
raise NotFoundExc(escape(msg))
def process_snapshot_branches(
snapshot: Dict[str, Any]
) -> Tuple[List[SnapshotBranchInfo], List[SnapshotReleaseInfo]]:
"""
Process a dictionary describing snapshot branches: extract those
targeting revisions and releases, put them in two different lists,
then sort those lists in lexicographical order of the branches' names.
Args:
snapshot: A dict describing a snapshot as returned for instance by
:func:`swh.web.common.service.lookup_snapshot`
Returns:
A tuple whose first member is the sorted list of branches
targeting revisions and second member the sorted list of branches
targeting releases
"""
snapshot_branches = snapshot["branches"]
branches: Dict[str, SnapshotBranchInfo] = {}
branch_aliases: Dict[str, str] = {}
releases: Dict[str, SnapshotReleaseInfo] = {}
revision_to_branch = defaultdict(set)
revision_to_release = defaultdict(set)
release_to_branch = defaultdict(set)
for branch_name, target in snapshot_branches.items():
if not target:
# FIXME: display branches with an unknown target anyway
continue
target_id = target["target"]
target_type = target["target_type"]
if target_type == "revision":
branches[branch_name] = SnapshotBranchInfo(
name=branch_name,
revision=target_id,
date=None,
directory=None,
message=None,
url=None,
)
revision_to_branch[target_id].add(branch_name)
elif target_type == "release":
release_to_branch[target_id].add(branch_name)
elif target_type == "alias":
branch_aliases[branch_name] = target_id
# FIXME: handle pointers to other object types
def _add_release_info(branch, release):
releases[branch] = SnapshotReleaseInfo(
name=release["name"],
branch_name=branch,
date=format_utc_iso_date(release["date"]),
directory=None,
id=release["id"],
message=release["message"],
target_type=release["target_type"],
target=release["target"],
url=None,
)
def _add_branch_info(branch, revision):
branches[branch] = SnapshotBranchInfo(
name=branch,
revision=revision["id"],
directory=revision["directory"],
date=format_utc_iso_date(revision["date"]),
message=revision["message"],
url=None,
)
releases_info = service.lookup_release_multiple(release_to_branch.keys())
for release in releases_info:
branches_to_update = release_to_branch[release["id"]]
for branch in branches_to_update:
_add_release_info(branch, release)
if release["target_type"] == "revision":
revision_to_release[release["target"]].update(branches_to_update)
revisions = service.lookup_revision_multiple(
set(revision_to_branch.keys()) | set(revision_to_release.keys())
)
for revision in revisions:
if not revision:
continue
for branch in revision_to_branch[revision["id"]]:
_add_branch_info(branch, revision)
for release in revision_to_release[revision["id"]]:
releases[release]["directory"] = revision["directory"]
for branch_alias, branch_target in branch_aliases.items():
if branch_target in branches:
branches[branch_alias] = copy(branches[branch_target])
else:
snp = service.lookup_snapshot(
snapshot["id"], branches_from=branch_target, branches_count=1
)
if snp and branch_target in snp["branches"]:
if snp["branches"][branch_target] is None:
continue
target_type = snp["branches"][branch_target]["target_type"]
target = snp["branches"][branch_target]["target"]
if target_type == "revision":
branches[branch_alias] = snp["branches"][branch_target]
revision = service.lookup_revision(target)
_add_branch_info(branch_alias, revision)
elif target_type == "release":
release = service.lookup_release(target)
_add_release_info(branch_alias, release)
if branch_alias in branches:
branches[branch_alias]["name"] = branch_alias
ret_branches = list(sorted(branches.values(), key=lambda b: b["name"]))
ret_releases = list(sorted(releases.values(), key=lambda b: b["name"]))
return ret_branches, ret_releases
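The branch/release split performed by ``process_snapshot_branches`` can be illustrated with a simplified, self-contained sketch. The helper name ``split_snapshot_branches`` is hypothetical. Unlike the real function, it does not query the archive for revision or release metadata, or for alias targets missing from a truncated snapshot. It only classifies branch names and resolves aliases within the given dict.

```python
from typing import Dict, List, Optional, Tuple


def split_snapshot_branches(
    snapshot_branches: Dict[str, Optional[dict]],
) -> Tuple[List[str], List[str]]:
    """Return sorted names of branches targeting revisions and releases.

    Aliases are resolved within the given dict only; branches with an
    unknown or unresolvable target are skipped.
    """
    revision_branches: List[str] = []
    release_branches: List[str] = []
    for name, target in snapshot_branches.items():
        # Follow alias chains, bounded by the number of branches to stop cycles.
        hops = 0
        while target and target["target_type"] == "alias":
            if hops >= len(snapshot_branches):
                target = None  # cyclic alias chain
                break
            target = snapshot_branches.get(target["target"])
            hops += 1
        if not target:
            continue
        if target["target_type"] == "revision":
            revision_branches.append(name)
        elif target["target_type"] == "release":
            release_branches.append(name)
    # Lexicographical order of branch names, as in process_snapshot_branches.
    return sorted(revision_branches), sorted(release_branches)
```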
def get_snapshot_content(
snapshot_id: str,
) -> Tuple[List[SnapshotBranchInfo], List[SnapshotReleaseInfo]]:
"""Returns the lists of branches and releases
associated to a swh snapshot.
That list is put in cache in order to speed up the navigation
in the swh-web/browse ui.
.. warning:: At most 1000 branches contained in the snapshot
will be returned for performance reasons.
Args:
snapshot_id: hexadecimal representation of the snapshot identifier
Returns:
A tuple with two members. The first one is a list of dict describing
the snapshot branches. The second one is a list of dict describing the
snapshot releases.
Raises:
NotFoundExc if the snapshot does not exist
"""
cache_entry_id = "swh_snapshot_%s" % snapshot_id
cache_entry = cache.get(cache_entry_id)
if cache_entry:
return cache_entry["branches"], cache_entry["releases"]
branches: List[SnapshotBranchInfo] = []
releases: List[SnapshotReleaseInfo] = []
snapshot_content_max_size = get_config()["snapshot_content_max_size"]
if snapshot_id:
snapshot = service.lookup_snapshot(
snapshot_id, branches_count=snapshot_content_max_size
)
branches, releases = process_snapshot_branches(snapshot)
cache.set(cache_entry_id, {"branches": branches, "releases": releases,})
return branches, releases
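``get_snapshot_content`` follows a cache-aside pattern. It looks up a per-snapshot key and, only on a miss, computes the branches and releases and stores them. Below is a minimal sketch of the same pattern. A plain ``dict`` stands in for Django's cache, which unlike a dict may expire entries. A hypothetical ``load`` callable stands in for the snapshot lookup and processing.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical loader: snapshot id -> (branches, releases).
Loader = Callable[[str], Tuple[List[dict], List[dict]]]


def get_snapshot_content_cached(
    snapshot_id: str, cache: Dict[str, dict], load: Loader
) -> Tuple[List[dict], List[dict]]:
    """Return (branches, releases), computing them only on a cache miss."""
    key = "swh_snapshot_%s" % snapshot_id
    entry = cache.get(key)
    if entry is not None:
        # Cache hit: reuse the previously computed lists.
        return entry["branches"], entry["releases"]
    # Cache miss: compute once, then store for subsequent requests.
    branches, releases = load(snapshot_id)
    cache[key] = {"branches": branches, "releases": releases}
    return branches, releases
```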
def get_origin_visit_snapshot(
origin_info: OriginInfo,
- visit_ts: Optional[Union[int, str]] = None,
+ visit_ts: Optional[str] = None,
visit_id: Optional[int] = None,
snapshot_id: Optional[str] = None,
) -> Tuple[List[SnapshotBranchInfo], List[SnapshotReleaseInfo]]:
"""Returns the lists of branches and releases associated to an origin for
a given visit.
The visit is expressed by either:
* a snapshot identifier
* a timestamp, if no visit with that exact timestamp is found,
the closest one from the provided timestamp will be used.
If no visit parameter is provided, it returns the list of branches
found for the latest visit.
That list is put in cache in order to speed up the navigation
in the swh-web/browse ui.
.. warning:: At most 1000 branches contained in the snapshot
will be returned for performance reasons.
Args:
origin_info: a dict filled with origin information
- visit_ts: an ISO date string or Unix timestamp to parse
+ visit_ts: an ISO 8601 datetime string to parse
visit_id: visit id for disambiguation in case several visits have
the same timestamp
snapshot_id: if provided, visit associated to the snapshot will be processed
Returns:
A tuple with two members. The first one is a list of dict describing
the origin branches for the given visit.
The second one is a list of dict describing the origin releases
for the given visit.
Raises:
NotFoundExc if the origin or its visit are not found
"""
visit_info = get_origin_visit(origin_info, visit_ts, visit_id, snapshot_id)
return get_snapshot_content(visit_info["snapshot"])
def get_snapshot_context(
snapshot_id: Optional[str] = None,
origin_url: Optional[str] = None,
timestamp: Optional[str] = None,
visit_id: Optional[int] = None,
branch_name: Optional[str] = None,
release_name: Optional[str] = None,
revision_id: Optional[str] = None,
path: Optional[str] = None,
browse_context: str = "directory",
) -> SnapshotContext:
"""
Utility function to compute relevant information when navigating
the archive in a snapshot context. The snapshot is either
referenced by its id or it will be retrieved from an origin visit.
Args:
snapshot_id: hexadecimal representation of a snapshot identifier
origin_url: an origin_url
timestamp: a datetime string for retrieving the closest
visit of the origin
visit_id: optional visit id for disambiguation in case
of several visits with the same timestamp
branch_name: optional branch name set when browsing the snapshot in
that scope (will default to "HEAD" if not provided)
release_name: optional release name set when browsing the snapshot in
that scope
revision_id: optional revision identifier set when browsing the snapshot in
that scope
path: optional path of the object currently browsed in the snapshot
browse_context: indicates which type of object is currently browsed
Returns:
A dict filled with snapshot context information.
Raises:
swh.web.common.exc.NotFoundExc: if no snapshot is found for the visit
of an origin.
"""
assert origin_url is not None or snapshot_id is not None
origin_info = None
visit_info = None
url_args = {}
query_params: Dict[str, Any] = {}
origin_visits_url = None
if origin_url:
if visit_id is not None:
query_params["visit_id"] = visit_id
elif snapshot_id is not None:
query_params["snapshot"] = snapshot_id
origin_info = service.lookup_origin({"url": origin_url})
visit_info = get_origin_visit(origin_info, timestamp, visit_id, snapshot_id)
formatted_date = format_utc_iso_date(visit_info["date"])
visit_info["formatted_date"] = formatted_date
snapshot_id = visit_info["snapshot"]
if not snapshot_id:
raise NotFoundExc(
"No snapshot associated to the visit of origin "
"%s on %s" % (escape(origin_url), formatted_date)
)
# provided timestamp is not necessarily equal to the one
# of the retrieved visit, so get the exact one in order
# to use it in the urls generated below
if timestamp:
timestamp = visit_info["date"]
branches, releases = get_origin_visit_snapshot(
origin_info, timestamp, visit_id, snapshot_id
)
query_params["origin_url"] = origin_info["url"]
origin_visits_url = reverse(
"browse-origin-visits", query_params={"origin_url": origin_info["url"]}
)
if timestamp is not None:
query_params["timestamp"] = format_utc_iso_date(
timestamp, "%Y-%m-%dT%H:%M:%SZ"
)
visit_url = reverse("browse-origin-directory", query_params=query_params)
visit_info["url"] = visit_url
branches_url = reverse("browse-origin-branches", query_params=query_params)
releases_url = reverse("browse-origin-releases", query_params=query_params)
else:
assert snapshot_id is not None
branches, releases = get_snapshot_content(snapshot_id)
url_args = {"snapshot_id": snapshot_id}
branches_url = reverse("browse-snapshot-branches", url_args=url_args)
releases_url = reverse("browse-snapshot-releases", url_args=url_args)
releases = list(reversed(releases))
snapshot_sizes = service.lookup_snapshot_sizes(snapshot_id)
is_empty = sum(snapshot_sizes.values()) == 0
swh_snp_id = swhid("snapshot", snapshot_id)
if visit_info:
timestamp = format_utc_iso_date(visit_info["date"])
if origin_info:
browse_view_name = f"browse-origin-{browse_context}"
else:
browse_view_name = f"browse-snapshot-{browse_context}"
release_id = None
root_directory = None
snapshot_total_size = sum(snapshot_sizes.values())
if path is not None:
query_params["path"] = path
if snapshot_total_size and revision_id is not None:
revision = service.lookup_revision(revision_id)
root_directory = revision["directory"]
branches.append(
SnapshotBranchInfo(
name=revision_id,
revision=revision_id,
directory=root_directory,
date=revision["date"],
message=revision["message"],
url=None,
)
)
branch_name = revision_id
query_params["revision"] = revision_id
elif snapshot_total_size and release_name:
release = _get_release(releases, release_name, snapshot_id)
try:
root_directory = release["directory"]
revision_id = release["target"]
release_id = release["id"]
query_params["release"] = release_name
except Exception as exc:
sentry_sdk.capture_exception(exc)
_branch_not_found(
"release",
release_name,
snapshot_id,
snapshot_sizes,
origin_info,
timestamp,
visit_id,
)
elif snapshot_total_size:
if branch_name:
query_params["branch"] = branch_name
branch = _get_branch(branches, branch_name or "HEAD", snapshot_id)
try:
branch_name = branch["name"]
revision_id = branch["revision"]
root_directory = branch["directory"]
except Exception as exc:
sentry_sdk.capture_exception(exc)
_branch_not_found(
"branch",
branch_name,
snapshot_id,
snapshot_sizes,
origin_info,
timestamp,
visit_id,
)
for b in branches:
branch_query_params = dict(query_params)
branch_query_params.pop("release", None)
if b["name"] != b["revision"]:
branch_query_params.pop("revision", None)
branch_query_params["branch"] = b["name"]
b["url"] = reverse(
browse_view_name, url_args=url_args, query_params=branch_query_params
)
for r in releases:
release_query_params = dict(query_params)
release_query_params.pop("branch", None)
release_query_params.pop("revision", None)
release_query_params["release"] = r["name"]
r["url"] = reverse(
browse_view_name, url_args=url_args, query_params=release_query_params,
)
revision_info = None
if revision_id:
try:
revision_info = service.lookup_revision(revision_id)
except NotFoundExc:
pass
else:
revision_info["date"] = format_utc_iso_date(revision_info["date"])
revision_info["committer_date"] = format_utc_iso_date(
revision_info["committer_date"]
)
if revision_info["message"]:
message_lines = revision_info["message"].split("\n")
revision_info["message_header"] = message_lines[0]
else:
revision_info["message_header"] = ""
snapshot_context = SnapshotContext(
branch=branch_name,
branches=branches,
branches_url=branches_url,
is_empty=is_empty,
origin_info=origin_info,
origin_visits_url=origin_visits_url,
release=release_name,
release_id=release_id,
query_params=query_params,
releases=releases,
releases_url=releases_url,
revision_id=revision_id,
revision_info=revision_info,
root_directory=root_directory,
snapshot_id=snapshot_id,
snapshot_sizes=snapshot_sizes,
snapshot_swhid=swh_snp_id,
url_args=url_args,
visit_info=visit_info,
)
if revision_info:
revision_info["revision_url"] = gen_revision_url(revision_id, snapshot_context)
return snapshot_context
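The per-branch links built near the end of `get_snapshot_context` copy the shared query parameters before editing them, so every link starts from the same state. The sketch below reproduces that step in plain Python. It assumes the `branch` assignment belongs to the `if` block, because indentation is lost in this listing. `encode_query` is a hypothetical helper that drops `None` values and encodes keys in sorted order. That is an approximation of how `reverse` is used here, not the swh-web implementation.

```python
from urllib.parse import urlencode


def encode_query(query_params):
    """Hypothetical helper: drop None values, encode keys in sorted order."""
    kept = {k: v for k, v in query_params.items() if v is not None}
    return urlencode(sorted(kept.items()), safe="/;:")


def branch_query_params(query_params, branch):
    """Derive the query parameters of one branch link from the shared ones."""
    params = dict(query_params)  # copy so the shared dict is never mutated
    params.pop("release", None)  # a branch link never pins a release
    if branch["name"] != branch["revision"]:
        # A named branch replaces any explicitly pinned revision.
        params.pop("revision", None)
        params["branch"] = branch["name"]
    return params
```

A branch whose name equals its revision id (the synthetic entry added when a `revision` query parameter is given) keeps the `revision` parameter instead of gaining a `branch` one.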
def _build_breadcrumbs(snapshot_context: SnapshotContext, path: str):
origin_info = snapshot_context["origin_info"]
url_args = snapshot_context["url_args"]
query_params = dict(snapshot_context["query_params"])
root_directory = snapshot_context["root_directory"]
path_info = gen_path_info(path)
if origin_info:
browse_view_name = "browse-origin-directory"
else:
browse_view_name = "browse-snapshot-directory"
breadcrumbs = []
if root_directory:
query_params.pop("path", None)
breadcrumbs.append(
{
"name": root_directory[:7],
"url": reverse(
browse_view_name, url_args=url_args, query_params=query_params
),
}
)
for pi in path_info:
query_params["path"] = pi["path"]
breadcrumbs.append(
{
"name": pi["name"],
"url": reverse(
browse_view_name, url_args=url_args, query_params=query_params
),
}
)
return breadcrumbs
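`_build_breadcrumbs` relies on `gen_path_info` to turn a relative path into cumulative entries, then prefixes an abbreviated root directory id. A minimal sketch, assuming each entry carries the segment name and the path up to that segment; `gen_path_info` here is a hypothetical re-implementation, and the `path` key of the root crumb is a placeholder.

```python
def gen_path_info(path):
    """Hypothetical re-implementation: 'a/b/c' -> cumulative crumbs."""
    if not path:
        return []
    parts = path.strip("/").split("/")
    return [
        {"name": name, "path": "/".join(parts[: i + 1])}
        for i, name in enumerate(parts)
    ]


def build_breadcrumbs(root_directory, path):
    crumbs = []
    if root_directory:
        # First crumb: the root directory id shortened to 7 hex characters.
        crumbs.append({"name": root_directory[:7], "path": ""})
    crumbs.extend(gen_path_info(path))
    return crumbs
```

Joining the crumb names with `/` gives the directory path shown in the page heading.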
def _check_origin_url(snapshot_id, origin_url):
if snapshot_id is None and origin_url is None:
raise BadInputExc("An origin URL must be provided as query parameter.")
def browse_snapshot_directory(
request, snapshot_id=None, origin_url=None, timestamp=None, path=None
):
"""
Django view implementation for browsing a directory in a snapshot context.
"""
try:
_check_origin_url(snapshot_id, origin_url)
snapshot_context = get_snapshot_context(
snapshot_id=snapshot_id,
origin_url=origin_url,
timestamp=timestamp,
visit_id=request.GET.get("visit_id"),
path=path,
browse_context="directory",
branch_name=request.GET.get("branch"),
release_name=request.GET.get("release"),
revision_id=request.GET.get("revision"),
)
root_directory = snapshot_context["root_directory"]
sha1_git = root_directory
if root_directory and path:
dir_info = service.lookup_directory_with_path(root_directory, path)
sha1_git = dir_info["target"]
dirs = []
files = []
if sha1_git:
dirs, files = get_directory_entries(sha1_git)
except Exception as exc:
return handle_view_exception(request, exc)
origin_info = snapshot_context["origin_info"]
visit_info = snapshot_context["visit_info"]
url_args = snapshot_context["url_args"]
query_params = dict(snapshot_context["query_params"])
revision_id = snapshot_context["revision_id"]
snapshot_id = snapshot_context["snapshot_id"]
if origin_info:
browse_view_name = "browse-origin-directory"
else:
browse_view_name = "browse-snapshot-directory"
breadcrumbs = _build_breadcrumbs(snapshot_context, path)
path = "" if path is None else (path + "/")
for d in dirs:
if d["type"] == "rev":
d["url"] = reverse("browse-revision", url_args={"sha1_git": d["target"]})
else:
query_params["path"] = path + d["name"]
d["url"] = reverse(
browse_view_name, url_args=url_args, query_params=query_params
)
sum_file_sizes = 0
readmes = {}
if origin_info:
browse_view_name = "browse-origin-content"
else:
browse_view_name = "browse-snapshot-content"
for f in files:
query_params["path"] = path + f["name"]
f["url"] = reverse(
browse_view_name, url_args=url_args, query_params=query_params
)
if f["length"] is not None:
sum_file_sizes += f["length"]
f["length"] = filesizeformat(f["length"])
if f["name"].lower().startswith("readme"):
readmes[f["name"]] = f["checksums"]["sha1"]
readme_name, readme_url, readme_html = get_readme_to_display(readmes)
if origin_info:
browse_view_name = "browse-origin-log"
else:
browse_view_name = "browse-snapshot-log"
history_url = None
if snapshot_id != _empty_snapshot_id:
query_params.pop("path", None)
history_url = reverse(
browse_view_name, url_args=url_args, query_params=query_params
)
nb_files = None
nb_dirs = None
dir_path = None
if root_directory:
nb_files = len(files)
nb_dirs = len(dirs)
sum_file_sizes = filesizeformat(sum_file_sizes)
dir_path = "/" + path
browse_dir_link = gen_directory_link(sha1_git)
browse_rev_link = gen_revision_link(revision_id)
browse_snp_link = gen_snapshot_link(snapshot_id)
revision_found = True
if sha1_git is None and revision_id is not None:
try:
service.lookup_revision(revision_id)
except NotFoundExc:
revision_found = False
swh_objects = [
SWHObjectInfo(object_type=DIRECTORY, object_id=sha1_git),
SWHObjectInfo(object_type=REVISION, object_id=revision_id),
SWHObjectInfo(object_type=SNAPSHOT, object_id=snapshot_id),
]
visit_date = None
visit_type = None
if visit_info:
visit_date = format_utc_iso_date(visit_info["date"])
visit_type = visit_info["type"]
release_id = snapshot_context["release_id"]
browse_rel_link = None
if release_id:
swh_objects.append(SWHObjectInfo(object_type=RELEASE, object_id=release_id))
browse_rel_link = gen_release_link(release_id)
dir_metadata = DirectoryMetadata(
object_type=DIRECTORY,
object_id=sha1_git,
directory=sha1_git,
directory_url=browse_dir_link,
nb_files=nb_files,
nb_dirs=nb_dirs,
sum_file_sizes=sum_file_sizes,
root_directory=root_directory,
path=dir_path,
revision=revision_id,
revision_found=revision_found,
revision_url=browse_rev_link,
release=release_id,
release_url=browse_rel_link,
snapshot=snapshot_id,
snapshot_url=browse_snp_link,
origin_url=origin_url,
visit_date=visit_date,
visit_type=visit_type,
)
vault_cooking = {
"directory_context": True,
"directory_id": sha1_git,
"revision_context": True,
"revision_id": revision_id,
}
swhids_info = get_swhids_info(swh_objects, snapshot_context, dir_metadata)
dir_path = "/".join([bc["name"] for bc in breadcrumbs]) + "/"
context_found = "snapshot: %s" % snapshot_context["snapshot_id"]
if origin_info:
context_found = "origin: %s" % origin_info["url"]
heading = "Directory - %s - %s - %s" % (
dir_path,
snapshot_context["branch"],
context_found,
)
top_right_link = None
if not snapshot_context["is_empty"]:
top_right_link = {
"url": history_url,
"icon": swh_object_icons["revisions history"],
"text": "History",
}
return render(
request,
"browse/directory.html",
{
"heading": heading,
"swh_object_name": "Directory",
"swh_object_metadata": dir_metadata,
"dirs": dirs,
"files": files,
"breadcrumbs": breadcrumbs if root_directory else [],
"top_right_link": top_right_link,
"readme_name": readme_name,
"readme_url": readme_url,
"readme_html": readme_html,
"snapshot_context": snapshot_context,
"vault_cooking": vault_cooking,
"show_actions": True,
"swhids_info": swhids_info,
},
)
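The file loop in `browse_snapshot_directory` does two things at once: it sums the known file sizes and collects README candidates by case-insensitive prefix. A standalone sketch of that bookkeeping, using plain dicts shaped like the entries the view iterates over (the sample data is illustrative):

```python
def summarize_files(files):
    """Sum known sizes and map README-like names to their sha1 checksum."""
    total = 0
    readmes = {}
    for f in files:
        # Sizes can be unknown (None); they are skipped, not counted as 0.
        if f["length"] is not None:
            total += f["length"]
        if f["name"].lower().startswith("readme"):
            readmes[f["name"]] = f["checksums"]["sha1"]
    return total, readmes
```

The view replaces `f["length"]` with a formatted string right after adding it, so the sum must happen before formatting, as it does here.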
def browse_snapshot_content(
request,
snapshot_id=None,
origin_url=None,
timestamp=None,
path=None,
selected_language=None,
):
"""
Django view implementation for browsing a content in a snapshot context.
"""
try:
_check_origin_url(snapshot_id, origin_url)
if path is None:
raise BadInputExc("The path of a content must be given as query parameter.")
snapshot_context = get_snapshot_context(
snapshot_id=snapshot_id,
origin_url=origin_url,
timestamp=timestamp,
visit_id=request.GET.get("visit_id"),
path=path,
browse_context="content",
branch_name=request.GET.get("branch"),
release_name=request.GET.get("release"),
revision_id=request.GET.get("revision"),
)
root_directory = snapshot_context["root_directory"]
sha1_git = None
query_string = None
content_data = {}
directory_id = None
split_path = path.split("/")
filename = split_path[-1]
filepath = path[: -len(filename)]
if root_directory:
content_info = service.lookup_directory_with_path(root_directory, path)
sha1_git = content_info["target"]
query_string = "sha1_git:" + sha1_git
content_data = request_content(query_string, raise_if_unavailable=False)
if filepath:
dir_info = service.lookup_directory_with_path(root_directory, filepath)
directory_id = dir_info["target"]
else:
directory_id = root_directory
except Exception as exc:
return handle_view_exception(request, exc)
revision_id = snapshot_context["revision_id"]
origin_info = snapshot_context["origin_info"]
visit_info = snapshot_context["visit_info"]
snapshot_id = snapshot_context["snapshot_id"]
if content_data.get("raw_data") is not None:
content_display_data = prepare_content_for_display(
content_data["raw_data"], content_data["mimetype"], path
)
content_data.update(content_display_data)
# Override language with user-selected language
if selected_language is not None:
content_data["language"] = selected_language
available_languages = None
if content_data.get("mimetype") is not None and "text/" in content_data["mimetype"]:
available_languages = highlightjs.get_supported_languages()
breadcrumbs = _build_breadcrumbs(snapshot_context, filepath)
breadcrumbs.append({"name": filename, "url": None})
browse_content_link = gen_content_link(sha1_git)
content_raw_url = None
if query_string:
content_raw_url = reverse(
"browse-content-raw",
url_args={"query_string": query_string},
query_params={"filename": filename},
)
browse_rev_link = gen_revision_link(revision_id)
browse_dir_link = gen_directory_link(directory_id)
content_checksums = content_data.get("checksums", {})
swh_objects = [
SWHObjectInfo(object_type=CONTENT, object_id=content_checksums.get("sha1_git")),
SWHObjectInfo(object_type=DIRECTORY, object_id=directory_id),
SWHObjectInfo(object_type=REVISION, object_id=revision_id),
SWHObjectInfo(object_type=SNAPSHOT, object_id=snapshot_id),
]
visit_date = None
visit_type = None
if visit_info:
visit_date = format_utc_iso_date(visit_info["date"])
visit_type = visit_info["type"]
release_id = snapshot_context["release_id"]
browse_rel_link = None
if release_id:
swh_objects.append(SWHObjectInfo(object_type=RELEASE, object_id=release_id))
browse_rel_link = gen_release_link(release_id)
content_metadata = ContentMetadata(
object_type=CONTENT,
object_id=content_checksums.get("sha1_git"),
sha1=content_checksums.get("sha1"),
sha1_git=content_checksums.get("sha1_git"),
sha256=content_checksums.get("sha256"),
blake2s256=content_checksums.get("blake2s256"),
content_url=browse_content_link,
mimetype=content_data.get("mimetype"),
encoding=content_data.get("encoding"),
size=filesizeformat(content_data.get("length", 0)),
language=content_data.get("language"),
licenses=content_data.get("licenses"),
root_directory=root_directory,
path=f"/{filepath}",
filename=filename,
directory=directory_id,
directory_url=browse_dir_link,
revision=revision_id,
revision_url=browse_rev_link,
release=release_id,
release_url=browse_rel_link,
snapshot=snapshot_id,
snapshot_url=gen_snapshot_link(snapshot_id),
origin_url=origin_url,
visit_date=visit_date,
visit_type=visit_type,
)
swhids_info = get_swhids_info(swh_objects, snapshot_context, content_metadata)
content_path = "/".join([bc["name"] for bc in breadcrumbs])
context_found = "snapshot: %s" % snapshot_context["snapshot_id"]
if origin_info:
context_found = "origin: %s" % origin_info["url"]
heading = "Content - %s - %s - %s" % (
content_path,
snapshot_context["branch"],
context_found,
)
top_right_link = None
if not snapshot_context["is_empty"]:
top_right_link = {
"url": content_raw_url,
"icon": swh_object_icons["content"],
"text": "Raw File",
}
return render(
request,
"browse/content.html",
{
"heading": heading,
"swh_object_name": "Content",
"swh_object_metadata": content_metadata,
"content": content_data.get("content_data"),
"content_size": content_data.get("length"),
"max_content_size": content_display_max_size,
"filename": filename,
"encoding": content_data.get("encoding"),
"mimetype": content_data.get("mimetype"),
"language": content_data.get("language"),
"available_languages": available_languages,
"breadcrumbs": breadcrumbs if root_directory else [],
"top_right_link": top_right_link,
"snapshot_context": snapshot_context,
"vault_cooking": None,
"show_actions": True,
"swhids_info": swhids_info,
"error_code": content_data.get("error_code"),
"error_message": content_data.get("error_message"),
"error_description": content_data.get("error_description"),
},
status=content_data.get("error_code", 200),
)
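`browse_snapshot_content` splits the requested path into a directory part and a file name by slicing off `len(filename)` characters. A minimal sketch of that split; the guard for an empty file name is an addition of this sketch, since `path[:-0]` evaluates to `path[:0]`, which is `""`.

```python
def split_content_path(path):
    """Split 'dir/sub/file' into ('dir/sub/', 'file')."""
    filename = path.split("/")[-1]
    # Guard: with an empty file name, path[:-0] would be "" (sketch-only fix).
    filepath = path[: -len(filename)] if filename else path
    return filepath, filename
```

The directory part keeps its trailing slash, which is why the metadata path is built as `f"/{filepath}"`.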
PER_PAGE = 100
def browse_snapshot_log(request, snapshot_id=None, origin_url=None, timestamp=None):
"""
Django view implementation for browsing a revision history in a
snapshot context.
"""
try:
_check_origin_url(snapshot_id, origin_url)
snapshot_context = get_snapshot_context(
snapshot_id=snapshot_id,
origin_url=origin_url,
timestamp=timestamp,
visit_id=request.GET.get("visit_id"),
browse_context="log",
branch_name=request.GET.get("branch"),
release_name=request.GET.get("release"),
revision_id=request.GET.get("revision"),
)
revision_id = snapshot_context["revision_id"]
per_page = int(request.GET.get("per_page", PER_PAGE))
offset = int(request.GET.get("offset", 0))
revs_ordering = request.GET.get("revs_ordering", "committer_date")
session_key = "rev_%s_log_ordering_%s" % (revision_id, revs_ordering)
rev_log_session = request.session.get(session_key, None)
rev_log = []
revs_walker_state = None
if rev_log_session:
rev_log = rev_log_session["rev_log"]
revs_walker_state = rev_log_session["revs_walker_state"]
if len(rev_log) < offset + per_page:
revs_walker = service.get_revisions_walker(
revs_ordering,
revision_id,
max_revs=offset + per_page + 1,
state=revs_walker_state,
)
rev_log += [rev["id"] for rev in revs_walker]
revs_walker_state = revs_walker.export_state()
revs = rev_log[offset : offset + per_page]
revision_log = service.lookup_revision_multiple(revs)
request.session[session_key] = {
"rev_log": rev_log,
"revs_walker_state": revs_walker_state,
}
except Exception as exc:
return handle_view_exception(request, exc)
origin_info = snapshot_context["origin_info"]
visit_info = snapshot_context["visit_info"]
url_args = snapshot_context["url_args"]
query_params = snapshot_context["query_params"]
snapshot_id = snapshot_context["snapshot_id"]
query_params["per_page"] = per_page
revs_ordering = request.GET.get("revs_ordering", "")
query_params["revs_ordering"] = revs_ordering
if origin_info:
browse_view_name = "browse-origin-log"
else:
browse_view_name = "browse-snapshot-log"
prev_log_url = None
if len(rev_log) > offset + per_page:
query_params["offset"] = offset + per_page
prev_log_url = reverse(
browse_view_name, url_args=url_args, query_params=query_params
)
next_log_url = None
if offset != 0:
query_params["offset"] = offset - per_page
next_log_url = reverse(
browse_view_name, url_args=url_args, query_params=query_params
)
revision_log_data = format_log_entries(revision_log, per_page, snapshot_context)
browse_rev_link = gen_revision_link(revision_id)
browse_log_link = gen_revision_log_link(revision_id)
browse_snp_link = gen_snapshot_link(snapshot_id)
revision_metadata = {
"context-independent revision": browse_rev_link,
"context-independent revision history": browse_log_link,
"context-independent snapshot": browse_snp_link,
"snapshot": snapshot_id,
}
if origin_info:
revision_metadata["origin url"] = origin_info["url"]
revision_metadata["origin visit date"] = format_utc_iso_date(visit_info["date"])
revision_metadata["origin visit type"] = visit_info["type"]
swh_objects = [
SWHObjectInfo(object_type=REVISION, object_id=revision_id),
SWHObjectInfo(object_type=SNAPSHOT, object_id=snapshot_id),
]
release_id = snapshot_context["release_id"]
if release_id:
swh_objects.append(SWHObjectInfo(object_type=RELEASE, object_id=release_id))
browse_rel_link = gen_release_link(release_id)
revision_metadata["release"] = release_id
revision_metadata["context-independent release"] = browse_rel_link
swhids_info = get_swhids_info(swh_objects, snapshot_context)
context_found = "snapshot: %s" % snapshot_context["snapshot_id"]
if origin_info:
context_found = "origin: %s" % origin_info["url"]
heading = "Revision history - %s - %s" % (snapshot_context["branch"], context_found)
return render(
request,
"browse/revision-log.html",
{
"heading": heading,
"swh_object_name": "Revisions history",
"swh_object_metadata": revision_metadata,
"revision_log": revision_log_data,
"revs_ordering": revs_ordering,
"next_log_url": next_log_url,
"prev_log_url": prev_log_url,
"breadcrumbs": None,
"top_right_link": None,
"snapshot_context": snapshot_context,
"vault_cooking": None,
"show_actions": True,
"swhids_info": swhids_info,
},
)
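`browse_snapshot_log` pages through a cached list of revision ids, asking the walker for `offset + per_page + 1` ids so it can tell whether older entries exist. A sketch of the offset arithmetic over an in-memory list. Clamping the newer offset at 0 is a choice of this sketch, since the view computes `offset - per_page` directly and an unaligned offset would go negative.

```python
def paginate(rev_log, offset, per_page):
    """Return (page, older_offset, newer_offset); None means no such link."""
    page = rev_log[offset : offset + per_page]
    # Any id beyond the current page means older revisions exist.
    older = offset + per_page if len(rev_log) > offset + per_page else None
    # Clamp at 0 so an unaligned offset never yields a negative slice start.
    newer = max(0, offset - per_page) if offset != 0 else None
    return page, older, newer
```

In the view, the link to older entries is `prev_log_url` and the link to newer entries is `next_log_url`.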
def browse_snapshot_branches(
request, snapshot_id=None, origin_url=None, timestamp=None
):
"""
Django view implementation for browsing a list of branches in a snapshot
context.
"""
try:
_check_origin_url(snapshot_id, origin_url)
snapshot_context = get_snapshot_context(
snapshot_id=snapshot_id,
origin_url=origin_url,
timestamp=timestamp,
visit_id=request.GET.get("visit_id"),
)
branches_bc = request.GET.get("branches_breadcrumbs", "")
branches_bc = branches_bc.split(",") if branches_bc else []
branches_from = branches_bc[-1] if branches_bc else ""
origin_info = snapshot_context["origin_info"]
url_args = snapshot_context["url_args"]
query_params = snapshot_context["query_params"]
if origin_info:
browse_view_name = "browse-origin-directory"
else:
browse_view_name = "browse-snapshot-directory"
snapshot = service.lookup_snapshot(
snapshot_context["snapshot_id"],
branches_from,
PER_PAGE + 1,
target_types=["revision", "alias"],
)
displayed_branches, _ = process_snapshot_branches(snapshot)
except Exception as exc:
return handle_view_exception(request, exc)
for branch in displayed_branches:
rev_query_params = {}
if origin_info:
rev_query_params["origin_url"] = origin_info["url"]
revision_url = reverse(
"browse-revision",
url_args={"sha1_git": branch["revision"]},
query_params=query_params,
)
query_params["branch"] = branch["name"]
directory_url = reverse(
browse_view_name, url_args=url_args, query_params=query_params
)
del query_params["branch"]
branch["revision_url"] = revision_url
branch["directory_url"] = directory_url
if origin_info:
browse_view_name = "browse-origin-branches"
else:
browse_view_name = "browse-snapshot-branches"
prev_branches_url = None
next_branches_url = None
if branches_bc:
query_params_prev = dict(query_params)
query_params_prev["branches_breadcrumbs"] = ",".join(branches_bc[:-1])
prev_branches_url = reverse(
browse_view_name, url_args=url_args, query_params=query_params_prev
)
elif branches_from:
prev_branches_url = reverse(
browse_view_name, url_args=url_args, query_params=query_params
)
if snapshot["next_branch"] is not None:
query_params_next = dict(query_params)
next_branch = displayed_branches[-1]["name"]
del displayed_branches[-1]
branches_bc.append(next_branch)
query_params_next["branches_breadcrumbs"] = ",".join(branches_bc)
next_branches_url = reverse(
browse_view_name, url_args=url_args, query_params=query_params_next
)
heading = "Branches - "
if origin_info:
heading += "origin: %s" % origin_info["url"]
else:
heading += "snapshot: %s" % snapshot_id
return render(
request,
"browse/branches.html",
{
"heading": heading,
"swh_object_name": "Branches",
"swh_object_metadata": {},
"top_right_link": None,
"displayed_branches": displayed_branches,
"prev_branches_url": prev_branches_url,
"next_branches_url": next_branches_url,
"snapshot_context": snapshot_context,
},
)
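`browse_snapshot_branches` paginates by name rather than by offset. It fetches `PER_PAGE + 1` branches starting at the last name in the comma-separated `branches_breadcrumbs` parameter. When more remain, it drops the extra entry and pushes its name as the start of the next page. The sketch below simulates this over a sorted list. `lookup_page` is a hypothetical stand-in for the paginated snapshot lookup, not the swh-web service call.

```python
def lookup_page(names, branches_from, limit):
    """Hypothetical stand-in: up to `limit` names >= branches_from, plus next."""
    remaining = [n for n in sorted(names) if n >= branches_from]
    page = remaining[:limit]
    next_branch = remaining[limit] if len(remaining) > limit else None
    return page, next_branch


def browse_page(names, breadcrumbs, per_page):
    """Return (displayed names, previous breadcrumbs, next breadcrumbs)."""
    branches_from = breadcrumbs[-1] if breadcrumbs else ""
    page, next_branch = lookup_page(names, branches_from, per_page + 1)
    next_bc = None
    if next_branch is not None:
        # The extra entry is not displayed; it starts the next page.
        next_bc = breadcrumbs + [page[-1]]
        page = page[:-1]
    prev_bc = breadcrumbs[:-1] if breadcrumbs else None
    return page, prev_bc, next_bc
```

Because the breadcrumbs are joined with commas, a branch name that contains a comma would not round-trip through this encoding.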
def browse_snapshot_releases(
request, snapshot_id=None, origin_url=None, timestamp=None
):
"""
Django view implementation for browsing a list of releases in a snapshot
context.
"""
try:
_check_origin_url(snapshot_id, origin_url)
snapshot_context = get_snapshot_context(
snapshot_id=snapshot_id,
origin_url=origin_url,
timestamp=timestamp,
visit_id=request.GET.get("visit_id"),
)
rel_bc = request.GET.get("releases_breadcrumbs", "")
rel_bc = rel_bc.split(",") if rel_bc else []
rel_from = rel_bc[-1] if rel_bc else ""
origin_info = snapshot_context["origin_info"]
url_args = snapshot_context["url_args"]
query_params = snapshot_context["query_params"]
snapshot = service.lookup_snapshot(
snapshot_context["snapshot_id"],
rel_from,
PER_PAGE + 1,
target_types=["release", "alias"],
)
_, displayed_releases = process_snapshot_branches(snapshot)
except Exception as exc:
return handle_view_exception(request, exc)
for release in displayed_releases:
query_params_tgt = {"snapshot": snapshot_id}
if origin_info:
query_params_tgt["origin_url"] = origin_info["url"]
release_url = reverse(
"browse-release",
url_args={"sha1_git": release["id"]},
query_params=query_params_tgt,
)
target_url = ""
if release["target_type"] == "revision":
target_url = reverse(
"browse-revision",
url_args={"sha1_git": release["target"]},
query_params=query_params_tgt,
)
elif release["target_type"] == "directory":
target_url = reverse(
"browse-directory",
url_args={"sha1_git": release["target"]},
query_params=query_params_tgt,
)
elif release["target_type"] == "content":
target_url = reverse(
"browse-content",
url_args={"query_string": "sha1_git:" + release["target"]},
query_params=query_params_tgt,
)
elif release["target_type"] == "release":
target_url = reverse(
"browse-release",
url_args={"sha1_git": release["target"]},
query_params=query_params_tgt,
)
release["release_url"] = release_url
release["target_url"] = target_url
if origin_info:
browse_view_name = "browse-origin-releases"
else:
browse_view_name = "browse-snapshot-releases"
prev_releases_url = None
next_releases_url = None
if rel_bc:
query_params_prev = dict(query_params)
query_params_prev["releases_breadcrumbs"] = ",".join(rel_bc[:-1])
prev_releases_url = reverse(
browse_view_name, url_args=url_args, query_params=query_params_prev
)
elif rel_from:
prev_releases_url = reverse(
browse_view_name, url_args=url_args, query_params=query_params
)
if snapshot["next_branch"] is not None:
query_params_next = dict(query_params)
next_rel = displayed_releases[-1]["branch_name"]
del displayed_releases[-1]
rel_bc.append(next_rel)
query_params_next["releases_breadcrumbs"] = ",".join(rel_bc)
next_releases_url = reverse(
browse_view_name, url_args=url_args, query_params=query_params_next
)
heading = "Releases - "
if origin_info:
heading += "origin: %s" % origin_info["url"]
else:
heading += "snapshot: %s" % snapshot_id
return render(
request,
"browse/releases.html",
{
"heading": heading,
"top_panel_visible": False,
"top_panel_collapsible": False,
"swh_object_name": "Releases",
"swh_object_metadata": {},
"top_right_link": None,
"displayed_releases": displayed_releases,
"prev_releases_url": prev_releases_url,
"next_releases_url": next_releases_url,
"snapshot_context": snapshot_context,
"vault_cooking": None,
"show_actions": False,
},
)
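The `if`/`elif` chain in `browse_snapshot_releases` maps a release target type to a view name and URL argument. The same dispatch can be written as a table. The mapping below is a hypothetical restatement of that chain. The `sha1_git:` prefix for contents follows the content query string built earlier in this module (`"sha1_git:" + sha1_git`).

```python
# Hypothetical table: release target type -> (view name, URL argument name).
TARGET_VIEWS = {
    "revision": ("browse-revision", "sha1_git"),
    "directory": ("browse-directory", "sha1_git"),
    "content": ("browse-content", "query_string"),
    "release": ("browse-release", "sha1_git"),
}


def target_route(release):
    """Return (view_name, url_args) for a release target, or None."""
    entry = TARGET_VIEWS.get(release["target_type"])
    if entry is None:
        return None  # unsupported type: the view leaves target_url empty
    view_name, arg_name = entry
    value = release["target"]
    if release["target_type"] == "content":
        # Content query strings name the hash algorithm explicitly.
        value = "sha1_git:" + value
    return view_name, {arg_name: value}
```

With the table, supporting another target type is a one-line change.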
diff --git a/swh/web/browse/views/origin.py b/swh/web/browse/views/origin.py
index 4ca0d2c43..5bda0c278 100644
--- a/swh/web/browse/views/origin.py
+++ b/swh/web/browse/views/origin.py
@@ -1,320 +1,320 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.shortcuts import render, redirect
from swh.web.browse.browseurls import browse_route
from swh.web.browse.snapshot_context import (
browse_snapshot_directory,
browse_snapshot_content,
browse_snapshot_log,
browse_snapshot_branches,
browse_snapshot_releases,
get_snapshot_context,
)
from swh.web.common import service
from swh.web.common.exc import handle_view_exception, BadInputExc
from swh.web.common.origin_visits import get_origin_visits
-from swh.web.common.utils import reverse, format_utc_iso_date, parse_timestamp
+from swh.web.common.utils import reverse, format_utc_iso_date, parse_iso8601_date_to_utc
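This hunk swaps `parse_timestamp` for `parse_iso8601_date_to_utc`, which is then used to turn each visit date into a POSIX timestamp. A minimal stand-in using only the standard library. It assumes ISO 8601 input, and treating naive dates as UTC is an assumption of this sketch, not documented swh-web behavior.

```python
from datetime import datetime, timezone


def parse_iso8601_to_utc(iso_date):
    """Hypothetical stand-in: parse an ISO 8601 string into an aware UTC datetime."""
    # datetime.fromisoformat before Python 3.11 rejects a trailing "Z".
    if iso_date.endswith("Z"):
        iso_date = iso_date[:-1] + "+00:00"
    date = datetime.fromisoformat(iso_date)
    if date.tzinfo is None:
        date = date.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    return date.astimezone(timezone.utc)
```

Formatting the result with `"%Y-%m-%dT%H:%M:%SZ"` produces the `timestamp` form used in the visit URLs.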
@browse_route(
r"origin/directory/", view_name="browse-origin-directory",
)
def origin_directory_browse(request):
"""Django view for browsing the content of a directory associated
to an origin for a given visit.
The URL that points to it is :http:get:`/browse/origin/directory/`
"""
return browse_snapshot_directory(
request,
origin_url=request.GET.get("origin_url"),
snapshot_id=request.GET.get("snapshot"),
timestamp=request.GET.get("timestamp"),
path=request.GET.get("path"),
)
@browse_route(
r"origin/(?P<origin_url>.+)/visit/(?P<timestamp>.+)/directory/",
r"origin/(?P<origin_url>.+)/visit/(?P<timestamp>.+)/directory/(?P<path>.+)/",
r"origin/(?P<origin_url>.+)/directory/(?P<path>.+)/",
r"origin/(?P<origin_url>.+)/directory/",
view_name="browse-origin-directory-legacy",
)
def origin_directory_browse_legacy(request, origin_url, timestamp=None, path=None):
"""Django view for browsing the content of a directory associated
to an origin for a given visit.
The URLs that point to it are
:http:get:`/browse/origin/(origin_url)/directory/[(path)/]` and
:http:get:`/browse/origin/(origin_url)/visit/(timestamp)/directory/[(path)/]`
"""
return browse_snapshot_directory(
request,
origin_url=origin_url,
snapshot_id=request.GET.get("snapshot"),
timestamp=timestamp,
path=path,
)
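The legacy routes embed the origin URL in the path and rely on the literal `/visit/` and `/directory/` segments to find where it ends. The sketch below tries the four directory patterns in order with `re.fullmatch`. Whole-path anchoring is an assumption about how `browse_route` registers them.

```python
import re

# Patterns copied from the legacy directory route above.
LEGACY_DIRECTORY_ROUTES = [
    r"origin/(?P<origin_url>.+)/visit/(?P<timestamp>.+)/directory/",
    r"origin/(?P<origin_url>.+)/visit/(?P<timestamp>.+)/directory/(?P<path>.+)/",
    r"origin/(?P<origin_url>.+)/directory/(?P<path>.+)/",
    r"origin/(?P<origin_url>.+)/directory/",
]


def resolve(url_path):
    """Return the captured groups of the first matching pattern, or None."""
    for pattern in LEGACY_DIRECTORY_ROUTES:
        match = re.fullmatch(pattern, url_path)
        if match:
            return match.groupdict()
    return None
```

An origin URL that itself contains `/visit/` or `/directory/` can be split ambiguously by these greedy groups. Passing `origin_url` as a query parameter avoids that.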
@browse_route(
r"origin/content/", view_name="browse-origin-content",
)
def origin_content_browse(request):
"""Django view that produces an HTML display of a content
associated to an origin for a given visit.
The URL that points to it is :http:get:`/browse/origin/content/`
"""
return browse_snapshot_content(
request,
origin_url=request.GET.get("origin_url"),
snapshot_id=request.GET.get("snapshot"),
timestamp=request.GET.get("timestamp"),
path=request.GET.get("path"),
selected_language=request.GET.get("language"),
)
@browse_route(
r"origin/(?P<origin_url>.+)/visit/(?P<timestamp>.+)/content/(?P<path>.+)/",
r"origin/(?P<origin_url>.+)/content/(?P<path>.+)/",
r"origin/(?P<origin_url>.+)/content/",
view_name="browse-origin-content-legacy",
)
def origin_content_browse_legacy(request, origin_url, path=None, timestamp=None):
"""Django view that produces an HTML display of a content
associated to an origin for a given visit.
The URLs that point to it are
:http:get:`/browse/origin/(origin_url)/content/(path)/` and
:http:get:`/browse/origin/(origin_url)/visit/(timestamp)/content/(path)/`
"""
return browse_snapshot_content(
request,
origin_url=origin_url,
snapshot_id=request.GET.get("snapshot"),
timestamp=timestamp,
path=path,
selected_language=request.GET.get("language"),
)
@browse_route(
r"origin/log/", view_name="browse-origin-log",
)
def origin_log_browse(request):
"""Django view that produces an HTML display of revisions history (aka
the commit log) associated to a software origin.
The URL that points to it is :http:get:`/browse/origin/log/`
"""
return browse_snapshot_log(
request,
origin_url=request.GET.get("origin_url"),
snapshot_id=request.GET.get("snapshot"),
timestamp=request.GET.get("timestamp"),
)
@browse_route(
r"origin/(?P<origin_url>.+)/visit/(?P<timestamp>.+)/log/",
r"origin/(?P<origin_url>.+)/log/",
view_name="browse-origin-log-legacy",
)
def origin_log_browse_legacy(request, origin_url, timestamp=None):
"""Django view that produces an HTML display of revisions history (aka
the commit log) associated to a software origin.
The URLs that point to it are
:http:get:`/browse/origin/(origin_url)/log/` and
:http:get:`/browse/origin/(origin_url)/visit/(timestamp)/log/`
"""
return browse_snapshot_log(
request,
origin_url=origin_url,
snapshot_id=request.GET.get("snapshot"),
timestamp=timestamp,
)
@browse_route(
r"origin/branches/", view_name="browse-origin-branches",
)
def origin_branches_browse(request):
"""Django view that produces an HTML display of the list of branches
associated to an origin for a given visit.
The URL that points to it is :http:get:`/browse/origin/branches/`
"""
return browse_snapshot_branches(
request,
origin_url=request.GET.get("origin_url"),
snapshot_id=request.GET.get("snapshot"),
timestamp=request.GET.get("timestamp"),
)
@browse_route(
r"origin/(?P<origin_url>.+)/visit/(?P<timestamp>.+)/branches/",
r"origin/(?P<origin_url>.+)/branches/",
view_name="browse-origin-branches-legacy",
)
def origin_branches_browse_legacy(request, origin_url, timestamp=None):
"""Django view that produces an HTML display of the list of branches
associated to an origin for a given visit.
The URLs that point to it are
:http:get:`/browse/origin/(origin_url)/branches/` and
:http:get:`/browse/origin/(origin_url)/visit/(timestamp)/branches/`
"""
return browse_snapshot_branches(
request,
origin_url=origin_url,
snapshot_id=request.GET.get("snapshot"),
timestamp=timestamp,
)
@browse_route(
r"origin/releases/", view_name="browse-origin-releases",
)
def origin_releases_browse(request):
"""Django view that produces an HTML display of the list of releases
associated to an origin for a given visit.
The URL that points to it is :http:get:`/browse/origin/releases/`
"""
return browse_snapshot_releases(
request,
origin_url=request.GET.get("origin_url"),
snapshot_id=request.GET.get("snapshot"),
timestamp=request.GET.get("timestamp"),
)
@browse_route(
r"origin/(?P<origin_url>.+)/visit/(?P<timestamp>.+)/releases/",
r"origin/(?P<origin_url>.+)/releases/",
view_name="browse-origin-releases-legacy",
)
def origin_releases_browse_legacy(request, origin_url, timestamp=None):
"""Django view that produces an HTML display of the list of releases
associated to an origin for a given visit.
The URLs that point to it are
:http:get:`/browse/origin/(origin_url)/releases/` and
:http:get:`/browse/origin/(origin_url)/visit/(timestamp)/releases/`
"""
return browse_snapshot_releases(
request,
origin_url=origin_url,
snapshot_id=request.GET.get("snapshot"),
timestamp=timestamp,
)
def _origin_visits_browse(request, origin_url):
try:
if origin_url is None:
raise BadInputExc("An origin URL must be provided as query parameter.")
origin_info = service.lookup_origin({"url": origin_url})
origin_visits = get_origin_visits(origin_info)
snapshot_context = get_snapshot_context(origin_url=origin_url)
except Exception as exc:
return handle_view_exception(request, exc)
for i, visit in enumerate(origin_visits):
url_date = format_utc_iso_date(visit["date"], "%Y-%m-%dT%H:%M:%SZ")
visit["formatted_date"] = format_utc_iso_date(visit["date"])
query_params = {"origin_url": origin_url, "timestamp": url_date}
if i < len(origin_visits) - 1:
if visit["date"] == origin_visits[i + 1]["date"]:
query_params = {"origin_url": origin_url, "visit_id": visit["visit"]}
if i > 0:
if visit["date"] == origin_visits[i - 1]["date"]:
query_params = {"origin_url": origin_url, "visit_id": visit["visit"]}
snapshot = visit["snapshot"] if visit["snapshot"] else ""
visit["url"] = reverse("browse-origin-directory", query_params=query_params,)
if not snapshot:
visit["snapshot"] = ""
- visit["date"] = parse_timestamp(visit["date"]).timestamp()
+ visit["date"] = parse_iso8601_date_to_utc(visit["date"]).timestamp()
heading = "Origin visits - %s" % origin_url
return render(
request,
"browse/origin-visits.html",
{
"heading": heading,
"swh_object_name": "Visits",
"swh_object_metadata": origin_info,
"origin_visits": origin_visits,
"origin_info": origin_info,
"snapshot_context": snapshot_context,
"vault_cooking": None,
"show_actions": False,
},
)
@browse_route(r"origin/visits/", view_name="browse-origin-visits")
def origin_visits_browse(request):
"""Django view that produces an HTML report of the visits
of a given origin.
The URL that points to it is
:http:get:`/browse/origin/visits/`.
"""
return _origin_visits_browse(request, request.GET.get("origin_url"))
@browse_route(
r"origin/(?P<origin_url>.+)/visits/", view_name="browse-origin-visits-legacy"
)
def origin_visits_browse_legacy(request, origin_url):
"""Django view that produces an HTML report of the visits
of a given origin.
The URL that points to it is
:http:get:`/browse/origin/(origin_url)/visits/`.
"""
return _origin_visits_browse(request, origin_url)
@browse_route(r"origin/", view_name="browse-origin")
def origin_browse(request):
"""Django view that redirects to the display of the latest archived
snapshot for a given software origin.
"""
last_snapshot_url = reverse("browse-origin-directory", query_params=request.GET,)
return redirect(last_snapshot_url)
@browse_route(r"origin/(?P<origin_url>.+)/", view_name="browse-origin-legacy")
def origin_browse_legacy(request, origin_url):
"""Django view that redirects to the display of the latest archived
snapshot for a given software origin.
"""
last_snapshot_url = reverse(
"browse-origin-directory",
query_params={"origin_url": origin_url, **request.GET},
)
return redirect(last_snapshot_url)
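The link-building rule in `_origin_visits_browse` can be illustrated with a small standalone sketch. It assumes visit dates are already timezone-aware `datetime` objects (in the view they are ISO 8601 strings), and `visit_query_params` is a hypothetical helper name. A visit is normally identified by its UTC timestamp; when an adjacent visit shares the same date, the timestamp alone is ambiguous, so the visit id is used instead. This sketch keeps `origin_url` in both cases so the resulting query still names the origin.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List


def visit_query_params(
    origin_url: str, visits: List[Dict[str, Any]]
) -> List[Dict[str, str]]:
    """Choose the query parameters identifying each visit in a browse URL."""
    params = []
    for i, visit in enumerate(visits):
        # a date shared with a neighbouring visit cannot identify a single visit
        same_as_next = i < len(visits) - 1 and visit["date"] == visits[i + 1]["date"]
        same_as_prev = i > 0 and visit["date"] == visits[i - 1]["date"]
        if same_as_next or same_as_prev:
            params.append({"origin_url": origin_url, "visit_id": str(visit["visit"])})
        else:
            utc_date = visit["date"].astimezone(timezone.utc)
            url_date = utc_date.strftime("%Y-%m-%dT%H:%M:%SZ")
            params.append({"origin_url": origin_url, "timestamp": url_date})
    return params


visits = [
    {"visit": 1, "date": datetime(2020, 1, 1, tzinfo=timezone.utc)},
    {"visit": 2, "date": datetime(2020, 1, 2, tzinfo=timezone.utc)},
    {"visit": 3, "date": datetime(2020, 1, 2, tzinfo=timezone.utc)},
]
for p in visit_query_params("https://example.org/repo", visits):
    print(p)
```

Since visits 2 and 3 share a date, both get a `visit_id` parameter, while visit 1 keeps its timestamp.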
diff --git a/swh/web/common/origin_save.py b/swh/web/common/origin_save.py
index 58d4fdf0f..9632cdbb7 100644
--- a/swh/web/common/origin_save.py
+++ b/swh/web/common/origin_save.py
@@ -1,620 +1,620 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from bisect import bisect_right
from datetime import datetime, timezone, timedelta
from itertools import product
import json
import logging
from typing import Any, Dict
from django.core.exceptions import ObjectDoesNotExist
from django.core.exceptions import ValidationError
from django.core.validators import URLValidator
from django.utils.html import escape
from prometheus_client import Gauge
import requests
import sentry_sdk
from swh.web import config
from swh.web.common import service
from swh.web.common.exc import BadInputExc, ForbiddenExc, NotFoundExc
from swh.web.common.models import (
SaveUnauthorizedOrigin,
SaveAuthorizedOrigin,
SaveOriginRequest,
SAVE_REQUEST_ACCEPTED,
SAVE_REQUEST_REJECTED,
SAVE_REQUEST_PENDING,
SAVE_TASK_NOT_YET_SCHEDULED,
SAVE_TASK_SCHEDULED,
SAVE_TASK_SUCCEED,
SAVE_TASK_FAILED,
SAVE_TASK_RUNNING,
SAVE_TASK_NOT_CREATED,
)
from swh.web.common.origin_visits import get_origin_visits
-from swh.web.common.utils import parse_timestamp, SWH_WEB_METRICS_REGISTRY
+from swh.web.common.utils import parse_iso8601_date_to_utc, SWH_WEB_METRICS_REGISTRY
from swh.scheduler.utils import create_oneshot_task_dict
scheduler = config.scheduler()
logger = logging.getLogger(__name__)
def get_origin_save_authorized_urls():
"""
Get the list of origin url prefixes authorized to be
immediately loaded into the archive (whitelist).
Returns:
list: the list of authorized origin url prefixes
"""
return [origin.url for origin in SaveAuthorizedOrigin.objects.all()]
def get_origin_save_unauthorized_urls():
"""
Get the list of origin url prefixes forbidden to be
loaded into the archive (blacklist).
Returns:
list: the list of unauthorized origin url prefixes
"""
return [origin.url for origin in SaveUnauthorizedOrigin.objects.all()]
def can_save_origin(origin_url):
"""
Check if a software origin can be saved into the archive.
Based on the origin url, the save request will be either:
* immediately accepted if the url is whitelisted
* rejected if the url is blacklisted
* put in pending state for manual review otherwise
Args:
origin_url (str): the software origin url to check
Returns:
str: the origin save request status, either **accepted**,
**rejected** or **pending**
"""
# origin url may be blacklisted
for url_prefix in get_origin_save_unauthorized_urls():
if origin_url.startswith(url_prefix):
return SAVE_REQUEST_REJECTED
# if the origin url is in the white list, it can be immediately saved
for url_prefix in get_origin_save_authorized_urls():
if origin_url.startswith(url_prefix):
return SAVE_REQUEST_ACCEPTED
# otherwise, the origin url needs to be manually verified
return SAVE_REQUEST_PENDING
# map visit type to scheduler task
# TODO: do not hardcode the task name here (T1157)
_visit_type_task = {"git": "load-git", "hg": "load-hg", "svn": "load-svn"}
# map scheduler task status to origin save status
_save_task_status = {
"next_run_not_scheduled": SAVE_TASK_NOT_YET_SCHEDULED,
"next_run_scheduled": SAVE_TASK_SCHEDULED,
"completed": SAVE_TASK_SUCCEED,
"disabled": SAVE_TASK_FAILED,
}
def get_savable_visit_types():
return sorted(list(_visit_type_task.keys()))
def _check_visit_type_savable(visit_type):
"""
Check that a visit of the given type can be performed
through a save request.
Raises:
BadInputExc: the visit type can not be saved
"""
allowed_visit_types = ", ".join(get_savable_visit_types())
if visit_type not in _visit_type_task:
raise BadInputExc(
"Visit of type %s can not be saved! "
"Allowed types are the following: %s" % (visit_type, allowed_visit_types)
)
_validate_url = URLValidator(schemes=["http", "https", "svn", "git"])
def _check_origin_url_valid(origin_url):
try:
_validate_url(origin_url)
except ValidationError:
raise BadInputExc(
"The provided origin url (%s) is not valid!" % escape(origin_url)
)
def _get_visit_info_for_save_request(save_request):
visit_date = None
visit_status = None
time_now = datetime.now(tz=timezone.utc)
time_delta = time_now - save_request.request_date
# stop trying to find a visit date one month after save request submission
# as those requests to storage are expensive and associated loading task
# most likely ended up with errors
if time_delta.days <= 30:
try:
origin = {"url": save_request.origin_url}
origin_info = service.lookup_origin(origin)
origin_visits = get_origin_visits(origin_info)
- visit_dates = [parse_timestamp(v["date"]) for v in origin_visits]
+ visit_dates = [parse_iso8601_date_to_utc(v["date"]) for v in origin_visits]
i = bisect_right(visit_dates, save_request.request_date)
if i != len(visit_dates):
visit_date = visit_dates[i]
visit_status = origin_visits[i]["status"]
if origin_visits[i]["status"] == "ongoing":
visit_date = None
except Exception as exc:
sentry_sdk.capture_exception(exc)
return visit_date, visit_status
def _check_visit_update_status(save_request, save_task_status):
visit_date, visit_status = _get_visit_info_for_save_request(save_request)
save_request.visit_date = visit_date
# visit has been performed, mark the saving task as succeeded
if visit_date and visit_status is not None:
save_task_status = SAVE_TASK_SUCCEED
elif visit_status == "ongoing":
save_task_status = SAVE_TASK_RUNNING
else:
time_now = datetime.now(tz=timezone.utc)
time_delta = time_now - save_request.request_date
# consider the task as failed if it is still in scheduled state
# 30 days after its submission
if time_delta.days > 30:
save_task_status = SAVE_TASK_FAILED
return visit_date, save_task_status
def _save_request_dict(save_request, task=None):
must_save = False
visit_date = save_request.visit_date
# save task still in scheduler db
if task:
save_task_status = _save_task_status[task["status"]]
# Consider requests for which a visit date has already been found
# as succeeded to avoid retrieving it again
if save_task_status == SAVE_TASK_SCHEDULED and visit_date:
save_task_status = SAVE_TASK_SUCCEED
if save_task_status in (SAVE_TASK_FAILED, SAVE_TASK_SUCCEED) and not visit_date:
visit_date, _ = _get_visit_info_for_save_request(save_request)
save_request.visit_date = visit_date
must_save = True
# Check tasks still marked as scheduled / not yet scheduled
if save_task_status in (SAVE_TASK_SCHEDULED, SAVE_TASK_NOT_YET_SCHEDULED):
visit_date, save_task_status = _check_visit_update_status(
save_request, save_task_status
)
# save task may have been archived
else:
save_task_status = save_request.loading_task_status
if save_task_status in (SAVE_TASK_SCHEDULED, SAVE_TASK_NOT_YET_SCHEDULED):
visit_date, save_task_status = _check_visit_update_status(
save_request, save_task_status
)
else:
save_task_status = save_request.loading_task_status
if save_request.loading_task_status != save_task_status:
save_request.loading_task_status = save_task_status
must_save = True
if must_save:
save_request.save()
return {
"id": save_request.id,
"visit_type": save_request.visit_type,
"origin_url": save_request.origin_url,
"save_request_date": save_request.request_date.isoformat(),
"save_request_status": save_request.status,
"save_task_status": save_task_status,
"visit_date": visit_date.isoformat() if visit_date else None,
}
def create_save_origin_request(visit_type, origin_url):
"""
Create a loading task to save a software origin into the archive.
This function aims to create a software origin loading task
through the use of the swh-scheduler component.
First, some checks are performed to see if the visit type and origin
url are valid but also if the save request can be accepted.
If those checks pass, the loading task is then created.
Otherwise, the save request is put in pending or rejected state.
All the submitted save requests are logged into the swh-web
database to keep track of them.
Args:
visit_type (str): the type of visit to perform (either ``git``,
``hg`` or ``svn``)
origin_url (str): the url of the origin to save
Raises:
BadInputExc: the visit type or origin url is invalid
ForbiddenExc: the provided origin url is blacklisted
Returns:
dict: A dict describing the save request with the following keys:
* **id**: the save request identifier
* **visit_type**: the type of visit to perform
* **origin_url**: the url of the origin
* **save_request_date**: the date the request was submitted
* **save_request_status**: the request status, either **accepted**,
**rejected** or **pending**
* **save_task_status**: the origin loading task status, either
**not created**, **not yet scheduled**, **scheduled**,
**succeed** or **failed**
* **visit_date**: the date of the visit performed for the request,
if any
"""
_check_visit_type_savable(visit_type)
_check_origin_url_valid(origin_url)
save_request_status = can_save_origin(origin_url)
task = None
# if the origin save request is accepted, create a scheduler
# task to load it into the archive
if save_request_status == SAVE_REQUEST_ACCEPTED:
# create a task with high priority
kwargs = {
"priority": "high",
"url": origin_url,
}
sor = None
# get list of previously submitted save requests
current_sors = list(
SaveOriginRequest.objects.filter(
visit_type=visit_type, origin_url=origin_url
)
)
can_create_task = False
# if no save requests previously submitted, create the scheduler task
if not current_sors:
can_create_task = True
else:
# get the latest submitted save request
sor = current_sors[0]
# if it was in pending state, we need to create the scheduler task
# and update the save request info in the database
if sor.status == SAVE_REQUEST_PENDING:
can_create_task = True
# a task has already been created to load the origin
elif sor.loading_task_id != -1:
# get the scheduler task and its status
tasks = scheduler.get_tasks([sor.loading_task_id])
task = tasks[0] if tasks else None
task_status = _save_request_dict(sor, task)["save_task_status"]
# create a new scheduler task only if the previous one has been
# already executed
if task_status == SAVE_TASK_FAILED or task_status == SAVE_TASK_SUCCEED:
can_create_task = True
sor = None
else:
can_create_task = False
if can_create_task:
# effectively create the scheduler task
task_dict = create_oneshot_task_dict(_visit_type_task[visit_type], **kwargs)
task = scheduler.create_tasks([task_dict])[0]
# pending save request has been accepted
if sor:
sor.status = SAVE_REQUEST_ACCEPTED
sor.loading_task_id = task["id"]
sor.save()
else:
sor = SaveOriginRequest.objects.create(
visit_type=visit_type,
origin_url=origin_url,
status=save_request_status,
loading_task_id=task["id"],
)
# save request must be manually reviewed for acceptance
elif save_request_status == SAVE_REQUEST_PENDING:
# check if such a save request has already been submitted,
# no need to add it to the database in that case
try:
sor = SaveOriginRequest.objects.get(
visit_type=visit_type, origin_url=origin_url, status=save_request_status
)
# if not, add it to the database
except ObjectDoesNotExist:
sor = SaveOriginRequest.objects.create(
visit_type=visit_type, origin_url=origin_url, status=save_request_status
)
# origin can not be saved as its url is blacklisted,
# log the request to the database anyway
else:
sor = SaveOriginRequest.objects.create(
visit_type=visit_type, origin_url=origin_url, status=save_request_status
)
if save_request_status == SAVE_REQUEST_REJECTED:
raise ForbiddenExc(
(
'The "save code now" request has been rejected '
"because the provided origin url is blacklisted."
)
)
return _save_request_dict(sor, task)
def get_save_origin_requests_from_queryset(requests_queryset):
"""
Get all save requests from a SaveOriginRequest queryset.
Args:
requests_queryset (django.db.models.QuerySet): input
SaveOriginRequest queryset
Returns:
list: A list of save origin requests dict as described in
:func:`swh.web.common.origin_save.create_save_origin_request`
"""
task_ids = []
for sor in requests_queryset:
task_ids.append(sor.loading_task_id)
save_requests = []
if task_ids:
tasks = scheduler.get_tasks(task_ids)
tasks = {task["id"]: task for task in tasks}
for sor in requests_queryset:
sr_dict = _save_request_dict(sor, tasks.get(sor.loading_task_id))
save_requests.append(sr_dict)
return save_requests
def get_save_origin_requests(visit_type, origin_url):
"""
Get all save requests for a given software origin.
Args:
visit_type (str): the type of visit
origin_url (str): the url of the origin
Raises:
BadInputExc: the visit type or origin url is invalid
swh.web.common.exc.NotFoundExc: no save requests can be found for the
given origin
Returns:
list: A list of save origin requests dict as described in
:func:`swh.web.common.origin_save.create_save_origin_request`
"""
_check_visit_type_savable(visit_type)
_check_origin_url_valid(origin_url)
sors = SaveOriginRequest.objects.filter(
visit_type=visit_type, origin_url=origin_url
)
if sors.count() == 0:
raise NotFoundExc(
("No save requests found for visit of type " "%s on origin with url %s.")
% (visit_type, origin_url)
)
return get_save_origin_requests_from_queryset(sors)
def get_save_origin_task_info(
save_request_id: int, full_info: bool = True
) -> Dict[str, Any]:
"""
Get detailed information about an accepted save origin request
and its associated loading task.
If the associated loading task info is archived and removed
from the scheduler database, returns an empty dictionary.
Args:
save_request_id: identifier of a save origin request
full_info: whether to return detailed info for staff users
Returns:
A dictionary with the following keys:
- **type**: loading task type
- **arguments**: loading task arguments
- **id**: loading task database identifier
- **backend_id**: loading task celery identifier
- **scheduled**: loading task scheduling date
- **ended**: loading task termination date
- **status**: loading task execution status
Depending on the availability of the task logs in the elasticsearch
cluster of Software Heritage, the returned dictionary may also
contain the following keys:
- **name**: associated celery task name
- **message**: relevant log message from task execution
- **duration**: task execution time (only if it succeeded)
- **worker**: name of the worker that executed the task
"""
try:
save_request = SaveOriginRequest.objects.get(id=save_request_id)
except ObjectDoesNotExist:
return {}
task = scheduler.get_tasks([save_request.loading_task_id])
task = task[0] if task else None
if task is None:
return {}
task_run = scheduler.get_task_runs([task["id"]])
task_run = task_run[0] if task_run else None
if task_run is None:
return {}
task_run["type"] = task["type"]
task_run["arguments"] = task["arguments"]
task_run["id"] = task_run["task"]
del task_run["task"]
del task_run["metadata"]
es_workers_index_url = config.get_config()["es_workers_index_url"]
if not es_workers_index_url:
return task_run
es_workers_index_url += "/_search"
if save_request.visit_date:
min_ts = save_request.visit_date
max_ts = min_ts + timedelta(days=7)
else:
min_ts = save_request.request_date
max_ts = min_ts + timedelta(days=30)
min_ts_unix = int(min_ts.timestamp()) * 1000
max_ts_unix = int(max_ts.timestamp()) * 1000
save_task_status = _save_task_status[task["status"]]
priority = "3" if save_task_status == SAVE_TASK_FAILED else "6"
query = {
"bool": {
"must": [
{"match_phrase": {"priority": {"query": priority}}},
{"match_phrase": {"swh_task_id": {"query": task_run["backend_id"]}}},
{
"range": {
"@timestamp": {
"gte": min_ts_unix,
"lte": max_ts_unix,
"format": "epoch_millis",
}
}
},
]
}
}
try:
response = requests.post(
es_workers_index_url,
json={"query": query, "sort": ["@timestamp"]},
timeout=30,
)
results = json.loads(response.text)
if results["hits"]["total"]["value"] >= 1:
task_run_info = results["hits"]["hits"][-1]["_source"]
if "swh_logging_args_runtime" in task_run_info:
duration = task_run_info["swh_logging_args_runtime"]
task_run["duration"] = duration
if "message" in task_run_info:
task_run["message"] = task_run_info["message"]
if "swh_logging_args_name" in task_run_info:
task_run["name"] = task_run_info["swh_logging_args_name"]
elif "swh_task_name" in task_run_info:
task_run["name"] = task_run_info["swh_task_name"]
if "hostname" in task_run_info:
task_run["worker"] = task_run_info["hostname"]
elif "host" in task_run_info:
task_run["worker"] = task_run_info["host"]
except Exception as exc:
logger.warning("Request to Elasticsearch failed\n%s", exc)
sentry_sdk.capture_exception(exc)
if not full_info:
for field in ("id", "backend_id", "worker"):
# remove some staff-only fields
task_run.pop(field, None)
if "message" in task_run and "Loading failure" in task_run["message"]:
# hide traceback for non-staff users, only display exception
message_lines = task_run["message"].split("\n")
message = ""
for line in message_lines:
if line.startswith("Traceback"):
break
message += f"{line}\n"
message += message_lines[-1]
task_run["message"] = message
return task_run
SUBMITTED_SAVE_REQUESTS_METRIC = "swh_web_submitted_save_requests"
_submitted_save_requests_gauge = Gauge(
name=SUBMITTED_SAVE_REQUESTS_METRIC,
documentation="Number of submitted origin save requests",
labelnames=["status", "visit_type"],
registry=SWH_WEB_METRICS_REGISTRY,
)
ACCEPTED_SAVE_REQUESTS_METRIC = "swh_web_accepted_save_requests"
_accepted_save_requests_gauge = Gauge(
name=ACCEPTED_SAVE_REQUESTS_METRIC,
documentation="Number of accepted origin save requests",
labelnames=["load_task_status", "visit_type"],
registry=SWH_WEB_METRICS_REGISTRY,
)
def compute_save_requests_metrics():
"""Compute a couple of Prometheus metrics related to
origin save requests"""
request_statuses = (
SAVE_REQUEST_ACCEPTED,
SAVE_REQUEST_REJECTED,
SAVE_REQUEST_PENDING,
)
load_task_statuses = (
SAVE_TASK_NOT_CREATED,
SAVE_TASK_NOT_YET_SCHEDULED,
SAVE_TASK_SCHEDULED,
SAVE_TASK_SUCCEED,
SAVE_TASK_FAILED,
SAVE_TASK_RUNNING,
)
visit_types = get_savable_visit_types()
labels_set = product(request_statuses, visit_types)
for labels in labels_set:
_submitted_save_requests_gauge.labels(*labels).set(0)
labels_set = product(load_task_statuses, visit_types)
for labels in labels_set:
_accepted_save_requests_gauge.labels(*labels).set(0)
for sor in SaveOriginRequest.objects.all():
if sor.status == SAVE_REQUEST_ACCEPTED:
_accepted_save_requests_gauge.labels(
load_task_status=sor.loading_task_status, visit_type=sor.visit_type
).inc()
_submitted_save_requests_gauge.labels(
status=sor.status, visit_type=sor.visit_type
).inc()
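The visit lookup in `_get_visit_info_for_save_request` relies on visit dates being sorted in ascending order, so that `bisect_right` can locate the first visit performed strictly after the save request date. The sketch below isolates that step under stated assumptions: `first_visit_after` is a hypothetical name, dates and statuses are passed as two aligned lists, and `"ongoing"` is the status string checked in the module.

```python
from bisect import bisect_right
from datetime import datetime, timezone
from typing import List, Optional, Tuple


def first_visit_after(
    visit_dates: List[datetime], statuses: List[str], request_date: datetime
) -> Tuple[Optional[datetime], Optional[str]]:
    """Return (date, status) of the first visit strictly after request_date.

    visit_dates must be sorted in ascending order and aligned with statuses.
    """
    # bisect_right places request_date after any equal dates, so a visit
    # dated exactly at request_date is skipped
    i = bisect_right(visit_dates, request_date)
    if i == len(visit_dates):
        return None, None  # no visit since the request was submitted
    status = statuses[i]
    # an ongoing visit has not finished yet, so it provides no visit date
    return (None if status == "ongoing" else visit_dates[i]), status


dates = [datetime(2020, 5, day, tzinfo=timezone.utc) for day in (1, 10, 20)]
statuses = ["full", "full", "ongoing"]
request = datetime(2020, 5, 5, tzinfo=timezone.utc)
print(first_visit_after(dates, statuses, request))
```

Ordering comparisons between timezone-aware and naive datetimes raise `TypeError`, so this approach assumes both the parsed visit dates and the stored request date are timezone-aware; `parse_iso8601_date_to_utc` takes care of the visit side.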
diff --git a/swh/web/common/origin_visits.py b/swh/web/common/origin_visits.py
index 5e7b0d2e3..8f63d5b13 100644
--- a/swh/web/common/origin_visits.py
+++ b/swh/web/common/origin_visits.py
@@ -1,193 +1,193 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import math
-from typing import List, Optional, Union
+from typing import List, Optional
from django.core.cache import cache
from swh.web.common.exc import NotFoundExc
from swh.web.common.typing import OriginInfo, OriginVisitInfo
-from swh.web.common.utils import parse_timestamp
+from swh.web.common.utils import parse_iso8601_date_to_utc
def get_origin_visits(origin_info: OriginInfo) -> List[OriginVisitInfo]:
"""Function that returns the list of visits for a swh origin.
That list is put in cache in order to speed up the navigation
in the swh web browse ui.
The returned visits are sorted according to their date in
ascending order.
Args:
origin_info: dict describing the origin to fetch visits from
Returns:
A list of dict describing the origin visits
Raises:
swh.web.common.exc.NotFoundExc: if the origin is not found
"""
from swh.web.common import service
if "url" in origin_info:
origin_url = origin_info["url"]
else:
origin_url = service.lookup_origin(origin_info)["url"]
cache_entry_id = "origin_visits_%s" % origin_url
cache_entry = cache.get(cache_entry_id)
if cache_entry:
last_visit = cache_entry[-1]["visit"]
new_visits = list(
service.lookup_origin_visits(origin_url, last_visit=last_visit)
)
if not new_visits:
last_snp = service.lookup_latest_origin_snapshot(origin_url)
if not last_snp or last_snp["id"] == cache_entry[-1]["snapshot"]:
return cache_entry
origin_visits = []
per_page = service.MAX_LIMIT
last_visit = None
while 1:
visits = list(
service.lookup_origin_visits(
origin_url, last_visit=last_visit, per_page=per_page
)
)
origin_visits += visits
if len(visits) < per_page:
break
else:
if not last_visit:
last_visit = per_page
else:
last_visit += per_page
def _visit_sort_key(visit):
- ts = parse_timestamp(visit["date"]).timestamp()
+ ts = parse_iso8601_date_to_utc(visit["date"]).timestamp()
return ts + (float(visit["visit"]) / 10e3)
origin_visits = sorted(origin_visits, key=lambda v: _visit_sort_key(v))
cache.set(cache_entry_id, origin_visits)
return origin_visits
def get_origin_visit(
origin_info: OriginInfo,
- visit_ts: Optional[Union[int, str]] = None,
+ visit_ts: Optional[str] = None,
visit_id: Optional[int] = None,
snapshot_id: Optional[str] = None,
) -> OriginVisitInfo:
"""Function that returns information about a visit for a given origin.
If a timestamp is provided, the visit closest to that
timestamp is returned.
If a snapshot identifier is provided, the first visit with that snapshot
is returned.
If no search hints are provided, return the most recent full visit with
a valid snapshot or the most recent partial visit with a valid snapshot
otherwise.
Args:
origin_info: a dict filled with origin information
- visit_ts: an ISO date string or Unix timestamp to parse
+ visit_ts: an ISO 8601 datetime string to parse
visit_id: a visit identifier
snapshot_id: a snapshot identifier
Returns:
A dict containing the visit info.
Raises:
swh.web.common.exc.NotFoundExc: if no visit can be found
"""
if not visit_ts and not visit_id and not snapshot_id:
from swh.web.common import service
# returns the latest full visit with a valid snapshot
visit = service.lookup_origin_visit_latest(
origin_info["url"], allowed_statuses=["full"], require_snapshot=True
)
if not visit:
# or the latest partial visit with a valid snapshot otherwise
visit = service.lookup_origin_visit_latest(
origin_info["url"], allowed_statuses=["partial"], require_snapshot=True
)
if visit:
return visit
else:
raise NotFoundExc(
f"No valid visit for origin with url {origin_info['url']} found!"
)
visits = get_origin_visits(origin_info)
if not visits:
raise NotFoundExc(
f"No visits associated to origin with url {origin_info['url']}!"
)
if snapshot_id:
visits = [v for v in visits if v["snapshot"] == snapshot_id]
if len(visits) == 0:
raise NotFoundExc(
(
"Visit for snapshot with id %s for origin with"
" url %s not found!" % (snapshot_id, origin_info["url"])
)
)
return visits[0]
if visit_id:
visits = [v for v in visits if v["visit"] == int(visit_id)]
if len(visits) == 0:
raise NotFoundExc(
(
"Visit with id %s for origin with"
" url %s not found!" % (visit_id, origin_info["url"])
)
)
return visits[0]
if visit_ts:
- target_visit_ts = math.floor(parse_timestamp(visit_ts).timestamp())
+ target_visit_ts = math.floor(parse_iso8601_date_to_utc(visit_ts).timestamp())
# Find the visit with date closest to the target (in absolute value)
(abs_time_delta, visit_idx) = min(
(
- (math.floor(parse_timestamp(visit["date"]).timestamp()), i)
+ (math.floor(parse_iso8601_date_to_utc(visit["date"]).timestamp()), i)
for (i, visit) in enumerate(visits)
),
key=lambda ts_and_i: abs(ts_and_i[0] - target_visit_ts),
)
if visit_idx is not None:
visit = visits[visit_idx]
# If multiple visits have the same date, select the one with
# the largest id.
while (
visit_idx < len(visits) - 1
and visit["date"] == visits[visit_idx + 1]["date"]
):
visit_idx = visit_idx + 1
visit = visits[visit_idx]
return visit
else:
raise NotFoundExc(
(
"Visit with timestamp %s for origin with "
"url %s not found!" % (visit_ts, origin_info["url"])
)
)
return visits[-1]
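The ordering produced by `_visit_sort_key` and the closest-visit search in `get_origin_visit` can be reproduced in a standalone sketch. It assumes visit dates are already timezone-aware `datetime` objects rather than ISO 8601 strings, and the names `visit_sort_key`, `closest_visit` and `day` are hypothetical. Visits are ordered by timestamp with a small id-based offset, the visit at the smallest absolute distance from the target wins, and among visits sharing that date the one with the largest id is returned.

```python
import math
from datetime import datetime, timezone
from typing import Any, Dict, List


def visit_sort_key(visit: Dict[str, Any]) -> float:
    # seconds since the epoch plus a small id-based offset, so that visits
    # sharing the same date end up ordered by visit id
    return visit["date"].timestamp() + float(visit["visit"]) / 10e3


def closest_visit(visits: List[Dict[str, Any]], target: datetime) -> Dict[str, Any]:
    """Return the visit whose date is closest to target.

    visits must be sorted with visit_sort_key; among visits sharing the
    selected date, the one with the largest id is returned.
    """
    if not visits:
        raise ValueError("no visits to search")
    target_ts = math.floor(target.timestamp())
    # min() returns the first of several equally close candidates
    _, idx = min(
        ((math.floor(v["date"].timestamp()), i) for i, v in enumerate(visits)),
        key=lambda ts_and_i: abs(ts_and_i[0] - target_ts),
    )
    # step forward over visits with the same date to reach the largest id
    while idx < len(visits) - 1 and visits[idx]["date"] == visits[idx + 1]["date"]:
        idx += 1
    return visits[idx]


def day(d: int) -> datetime:
    return datetime(2020, 1, d, tzinfo=timezone.utc)


visits = sorted(
    [
        {"visit": 4, "date": day(20)},
        {"visit": 3, "date": day(5)},
        {"visit": 1, "date": day(1)},
        {"visit": 2, "date": day(5)},
    ],
    key=visit_sort_key,
)
print(closest_visit(visits, day(4))["visit"])  # prints 3
```

Because `10e3` equals 10,000, the id-based offset stays below one second only while visit ids are below 10,000; for larger ids, a visit could in principle sort after another visit made shortly afterwards.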
diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py
index 9ff3fb7ec..7d64deb12 100644
--- a/swh/web/common/utils.py
+++ b/swh/web/common/utils.py
@@ -1,365 +1,357 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import re
from datetime import datetime, timezone
-from dateutil import parser as date_parser
-from dateutil import tz
-
from typing import Optional, Dict, Any
import docutils.parsers.rst
import docutils.utils
from bs4 import BeautifulSoup
from docutils.core import publish_parts
from docutils.writers.html5_polyglot import Writer, HTMLTranslator
from django.urls import reverse as django_reverse
from django.http import QueryDict, HttpRequest
+from iso8601 import parse_date, ParseError
+
from prometheus_client.registry import CollectorRegistry
from rest_framework.authentication import SessionAuthentication
from swh.web.common.exc import BadInputExc
from swh.web.common.typing import QueryParameters
from swh.web.config import get_config
SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True)
swh_object_icons = {
"branch": "mdi mdi-source-branch",
"branches": "mdi mdi-source-branch",
"content": "mdi mdi-file-document",
"directory": "mdi mdi-folder",
"origin": "mdi mdi-source-repository",
"person": "mdi mdi-account",
"revisions history": "mdi mdi-history",
"release": "mdi mdi-tag",
"releases": "mdi mdi-tag",
"revision": "mdi mdi-rotate-90 mdi-source-commit",
"snapshot": "mdi mdi-camera",
"visits": "mdi mdi-calendar-month",
}
def reverse(
viewname: str,
url_args: Optional[Dict[str, Any]] = None,
query_params: Optional[QueryParameters] = None,
current_app: Optional[str] = None,
urlconf: Optional[str] = None,
request: Optional[HttpRequest] = None,
) -> str:
"""An override of django reverse function supporting query parameters.
Args:
viewname: the name of the django view from which to compute a url
url_args: dictionary of url arguments indexed by their names
query_params: dictionary of query parameters to append to the
reversed url
current_app: the name of the django app tied to the view
urlconf: url configuration module
request: build an absolute URI if provided
Returns:
str: the url of the requested view with processed arguments and
query parameters
"""
if url_args:
url_args = {k: v for k, v in url_args.items() if v is not None}
url = django_reverse(
viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app
)
if query_params:
query_params = {k: v for k, v in query_params.items() if v}
if query_params and len(query_params) > 0:
query_dict = QueryDict("", mutable=True)
for k in sorted(query_params.keys()):
query_dict[k] = query_params[k]
url += "?" + query_dict.urlencode(safe="/;:")
if request is not None:
url = request.build_absolute_uri(url)
return url
def datetime_to_utc(date):
"""Converts a timezone-aware datetime to UTC.
Args:
date (datetime.datetime): input datetime
Returns:
datetime.datetime: the input expressed in UTC (with UTC timezone info)
if it is timezone-aware, or the input unchanged if it is naive
"""
- if date.tzinfo:
- return date.astimezone(tz.gettz("UTC")).replace(tzinfo=timezone.utc)
+ if date.tzinfo and date.tzinfo != timezone.utc:
+ return date.astimezone(tz=timezone.utc)
else:
return date
-def parse_timestamp(timestamp):
- """Given a time or timestamp (as string), parse the result as UTC datetime.
+def parse_iso8601_date_to_utc(iso_date: str) -> datetime:
+ """Given an ISO 8601 datetime string, parse the result as UTC datetime.
Returns:
- datetime.datetime: a timezone-aware datetime representing the
- parsed value or None if the parsing fails.
+ a timezone-aware datetime representing the parsed date
+
+ Raises:
+ swh.web.common.exc.BadInputExc: provided date does not respect ISO 8601 format
Samples:
- 2016-01-12
- 2016-01-12T09:19:12+0100
- - Today is January 1, 2047 at 8:21:00AM
- - 1452591542
+ - 2007-01-14T20:34:22Z
"""
- if not timestamp:
- return None
-
try:
- date = date_parser.parse(timestamp, ignoretz=False, fuzzy=True)
+ date = parse_date(iso_date)
return datetime_to_utc(date)
- except Exception:
- try:
- return datetime.utcfromtimestamp(float(timestamp)).replace(
- tzinfo=timezone.utc
- )
- except (ValueError, OverflowError) as e:
- raise BadInputExc(e)
+ except ParseError as e:
+ raise BadInputExc(e)
def shorten_path(path):
"""Shorten the given path: for each hash present, only return the first
8 characters followed by an ellipsis"""
sha256_re = r"([0-9a-f]{8})[0-9a-f]{56}"
sha1_re = r"([0-9a-f]{8})[0-9a-f]{32}"
ret = re.sub(sha256_re, r"\1...", path)
return re.sub(sha1_re, r"\1...", ret)
def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M UTC"):
- """Turns a string representation of an ISO 8601 date string
+ """Turns a string representation of an ISO 8601 datetime string
to UTC and formats it into a more human-readable one.
For instance, from the following input
string: '2017-05-04T13:27:13+02:00' the following one
is returned: '04 May 2017, 11:27 UTC'.
A custom format string may also be provided
as a parameter.
Args:
iso_date (str): a string representation of an ISO 8601 date
fmt (str): optional date formatting string
Returns:
str: a formatted string representation of the input iso date
"""
if not iso_date:
return iso_date
- date = parse_timestamp(iso_date)
+ date = parse_iso8601_date_to_utc(iso_date)
return date.strftime(fmt)
def gen_path_info(path):
"""Function to generate path data navigation for use
with a breadcrumb in the swh web ui.
For instance, from a path /folder1/folder2/folder3,
it returns the following list::
[{'name': 'folder1', 'path': 'folder1'},
{'name': 'folder2', 'path': 'folder1/folder2'},
{'name': 'folder3', 'path': 'folder1/folder2/folder3'}]
Args:
path: a filesystem path
Returns:
list: a list of path data for navigation as illustrated above.
"""
path_info = []
if path:
sub_paths = path.strip("/").split("/")
path_from_root = ""
for p in sub_paths:
path_from_root += "/" + p
path_info.append({"name": p, "path": path_from_root.strip("/")})
return path_info
def parse_rst(text, report_level=2):
"""
Parse a reStructuredText string with docutils.
Args:
text (str): string with reStructuredText markups in it
report_level (int): level of docutils report messages to print
(1 info 2 warning 3 error 4 severe 5 none)
Returns:
docutils.nodes.document: a parsed docutils document
"""
parser = docutils.parsers.rst.Parser()
components = (docutils.parsers.rst.Parser,)
settings = docutils.frontend.OptionParser(
components=components
).get_default_values()
settings.report_level = report_level
document = docutils.utils.new_document("rst-doc", settings=settings)
parser.parse(text, document)
return document
def get_client_ip(request):
"""
Return the client IP address from an incoming HTTP request.
Args:
request (django.http.HttpRequest): the incoming HTTP request
Returns:
str: The client IP address
"""
x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
if x_forwarded_for:
ip = x_forwarded_for.split(",")[0]
else:
ip = request.META.get("REMOTE_ADDR")
return ip
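The same lookup can be exercised without Django by passing a plain mapping in place of `request.META`. `client_ip_from_meta` is a hypothetical helper that additionally strips whitespace around the first `X-Forwarded-For` entry. The header is supplied by clients and intermediate proxies and can be forged, so it is only meaningful when the application runs behind a reverse proxy that sets it.

```python
from typing import Mapping, Optional


def client_ip_from_meta(meta: Mapping[str, str]) -> Optional[str]:
    """Return the client IP from a request.META-like mapping."""
    # X-Forwarded-For looks like "client, proxy1, proxy2": the left-most
    # entry is the originating client as reported by the proxy chain.
    forwarded = meta.get("HTTP_X_FORWARDED_FOR")
    if forwarded:
        return forwarded.split(",")[0].strip()
    # No proxy header: fall back to the peer address of the connection.
    return meta.get("REMOTE_ADDR")


print(client_ip_from_meta({"HTTP_X_FORWARDED_FOR": "203.0.113.7, 10.0.0.1"}))
# 203.0.113.7
```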
browsers_supported_image_mimes = {
"image/gif",
"image/png",
"image/jpeg",
"image/bmp",
"image/webp",
"image/svg",
"image/svg+xml",
}
def context_processor(request):
"""
Django context processor used to inject variables
in all swh-web templates.
"""
config = get_config()
if (
hasattr(request, "user")
and request.user.is_authenticated
and not hasattr(request.user, "backend")
):
# To avoid django.template.base.VariableDoesNotExist errors
# when rendering templates when standard Django user is logged in.
request.user.backend = "django.contrib.auth.backends.ModelBackend"
return {
"swh_object_icons": swh_object_icons,
"available_languages": None,
"swh_client_config": config["client_config"],
"oidc_enabled": bool(config["keycloak"]["server_url"]),
"browsers_supported_image_mimes": browsers_supported_image_mimes,
}
class EnforceCSRFAuthentication(SessionAuthentication):
"""
Helper class to enforce CSRF validation on a DRF view
when a user is not authenticated.
"""
def authenticate(self, request):
user = getattr(request._request, "user", None)
self.enforce_csrf(request)
return (user, None)
def resolve_branch_alias(
snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
"""
Resolve branch alias in snapshot content.
Args:
snapshot: a full snapshot content
branch: a branch alias contained in the snapshot
Returns:
The real snapshot branch that got aliased.
"""
while branch and branch["target_type"] == "alias":
if branch["target"] in snapshot["branches"]:
branch = snapshot["branches"][branch["target"]]
else:
from swh.web.common import service
snp = service.lookup_snapshot(
snapshot["id"], branches_from=branch["target"], branches_count=1
)
if snp and branch["target"] in snp["branches"]:
branch = snp["branches"][branch["target"]]
else:
branch = None
return branch
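A simplified, in-memory sketch of the alias resolution loop, assuming all branches are already loaded in a plain dict (the function above falls back to `service.lookup_snapshot` when the aliased branch is not part of the loaded page of branches). `resolve_alias` is a hypothetical name. The cycle guard is an addition of this sketch: without it, an alias pointing back to itself would make the loop spin forever.

```python
from typing import Any, Dict, Optional

Branch = Dict[str, Any]


def resolve_alias(
    branches: Dict[str, Optional[Branch]], branch: Optional[Branch]
) -> Optional[Branch]:
    """Follow alias branches until a non-alias branch is reached.

    Returns None for a null branch, a dangling alias or an alias cycle."""
    seen = set()
    while branch is not None and branch["target_type"] == "alias":
        target = branch["target"]
        if target in seen:  # alias cycle: give up
            return None
        seen.add(target)
        branch = branches.get(target)  # None if missing or null
    return branch


branches = {
    "HEAD": {"target_type": "alias", "target": "refs/heads/main"},
    "refs/heads/main": {"target_type": "revision", "target": "c0ffee"},
    "refs/heads/gone": None,
    "LOOP": {"target_type": "alias", "target": "LOOP"},
}
print(resolve_alias(branches, branches["HEAD"]))
# {'target_type': 'revision', 'target': 'c0ffee'}
```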
class _NoHeaderHTMLTranslator(HTMLTranslator):
"""
Docutils translator subclass to customize the generation of HTML
from reST-formatted docstrings
"""
def __init__(self, document):
super().__init__(document)
self.body_prefix = []
self.body_suffix = []
_HTML_WRITER = Writer()
_HTML_WRITER.translator_class = _NoHeaderHTMLTranslator
def rst_to_html(rst: str) -> str:
"""
Convert a reStructuredText document into HTML.
Args:
rst: A string containing a reStructuredText document
Returns:
Body content of the produced HTML conversion.
"""
settings = {
"initial_header_level": 2,
}
pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings)
return f'<div class="swh-rst">{pp["html_body"]}</div>'
def prettify_html(html: str) -> str:
"""
Prettify an HTML document.
Args:
html: Input HTML document
Returns:
The prettified HTML document
"""
return BeautifulSoup(html, "lxml").prettify()
diff --git a/swh/web/tests/browse/views/test_origin.py b/swh/web/tests/browse/views/test_origin.py
index dce5954b3..f42e28338 100644
--- a/swh/web/tests/browse/views/test_origin.py
+++ b/swh/web/tests/browse/views/test_origin.py
@@ -1,1313 +1,1244 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import random
import re
import string
from django.utils.html import escape
from hypothesis import given
from swh.storage.utils import now
from swh.model.hashutil import hash_to_bytes
from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
from swh.model.model import (
Snapshot,
SnapshotBranch,
TargetType,
OriginVisit,
OriginVisitStatus,
)
from swh.web.browse.snapshot_context import process_snapshot_branches
from swh.web.common.exc import NotFoundExc
from swh.web.common.identifiers import gen_swhid
from swh.web.common.utils import (
reverse,
gen_path_info,
format_utc_iso_date,
- parse_timestamp,
+ parse_iso8601_date_to_utc,
)
from swh.web.tests.data import get_content, random_sha1
from swh.web.tests.django_asserts import assert_contains, assert_template_used
from swh.web.tests.strategies import (
origin,
origin_with_multiple_visits,
new_origin,
new_snapshot,
visit_dates,
revisions,
origin_with_releases,
release as existing_release,
unknown_revision,
)
@given(origin_with_multiple_visits())
def test_origin_visits_browse(client, archive_data, origin):
url = reverse("browse-origin-visits", query_params={"origin_url": origin["url"]})
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/origin-visits.html")
url = reverse("browse-origin-visits", query_params={"origin_url": origin["url"]})
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/origin-visits.html")
visits = archive_data.origin_visit_get(origin["url"])
for v in visits:
vdate = format_utc_iso_date(v["date"], "%Y-%m-%dT%H:%M:%SZ")
browse_dir_url = reverse(
"browse-origin-directory",
query_params={"origin_url": origin["url"], "timestamp": vdate},
)
assert_contains(resp, browse_dir_url)
_check_origin_link(resp, origin["url"])
@given(origin_with_multiple_visits())
def test_origin_content_view(client, archive_data, origin):
origin_visits = archive_data.origin_visit_get(origin["url"])
def _get_archive_data(visit_idx):
snapshot = archive_data.snapshot_get(origin_visits[visit_idx]["snapshot"])
head_rev_id = archive_data.snapshot_get_head(snapshot)
head_rev = archive_data.revision_get(head_rev_id)
dir_content = archive_data.directory_ls(head_rev["directory"])
dir_files = [e for e in dir_content if e["type"] == "file"]
dir_file = random.choice(dir_files)
branches, releases = process_snapshot_branches(snapshot)
return {
"branches": branches,
"releases": releases,
"root_dir_sha1": head_rev["directory"],
"content": get_content(dir_file["checksums"]["sha1"]),
"visit": origin_visits[visit_idx],
}
tdata = _get_archive_data(-1)
_origin_content_view_test_helper(
client,
archive_data,
origin,
origin_visits[-1],
tdata["branches"],
tdata["releases"],
tdata["root_dir_sha1"],
tdata["content"],
)
_origin_content_view_test_helper(
client,
archive_data,
origin,
origin_visits[-1],
tdata["branches"],
tdata["releases"],
tdata["root_dir_sha1"],
tdata["content"],
timestamp=tdata["visit"]["date"],
)
- visit_unix_ts = parse_timestamp(tdata["visit"]["date"]).timestamp()
- visit_unix_ts = int(visit_unix_ts)
-
- _origin_content_view_test_helper(
- client,
- archive_data,
- origin,
- origin_visits[-1],
- tdata["branches"],
- tdata["releases"],
- tdata["root_dir_sha1"],
- tdata["content"],
- timestamp=visit_unix_ts,
- )
-
_origin_content_view_test_helper(
client,
archive_data,
origin,
origin_visits[-1],
tdata["branches"],
tdata["releases"],
tdata["root_dir_sha1"],
tdata["content"],
snapshot_id=tdata["visit"]["snapshot"],
)
tdata = _get_archive_data(0)
_origin_content_view_test_helper(
client,
archive_data,
origin,
origin_visits[0],
tdata["branches"],
tdata["releases"],
tdata["root_dir_sha1"],
tdata["content"],
visit_id=tdata["visit"]["visit"],
)
_origin_content_view_test_helper(
client,
archive_data,
origin,
origin_visits[0],
tdata["branches"],
tdata["releases"],
tdata["root_dir_sha1"],
tdata["content"],
snapshot_id=tdata["visit"]["snapshot"],
)
@given(origin())
def test_origin_root_directory_view(client, archive_data, origin):
origin_visits = archive_data.origin_visit_get(origin["url"])
visit = origin_visits[-1]
snapshot = archive_data.snapshot_get(visit["snapshot"])
head_rev_id = archive_data.snapshot_get_head(snapshot)
head_rev = archive_data.revision_get(head_rev_id)
root_dir_sha1 = head_rev["directory"]
dir_content = archive_data.directory_ls(root_dir_sha1)
branches, releases = process_snapshot_branches(snapshot)
- visit_unix_ts = parse_timestamp(visit["date"]).timestamp()
- visit_unix_ts = int(visit_unix_ts)
_origin_directory_view_test_helper(
client,
archive_data,
origin,
visit,
branches,
releases,
root_dir_sha1,
dir_content,
)
_origin_directory_view_test_helper(
client,
archive_data,
origin,
visit,
branches,
releases,
root_dir_sha1,
dir_content,
visit_id=visit["visit"],
)
- _origin_directory_view_test_helper(
- client,
- archive_data,
- origin,
- visit,
- branches,
- releases,
- root_dir_sha1,
- dir_content,
- timestamp=visit_unix_ts,
- )
-
_origin_directory_view_test_helper(
client,
archive_data,
origin,
visit,
branches,
releases,
root_dir_sha1,
dir_content,
timestamp=visit["date"],
)
_origin_directory_view_test_helper(
client,
archive_data,
origin,
visit,
branches,
releases,
root_dir_sha1,
dir_content,
snapshot_id=visit["snapshot"],
)
_origin_directory_view_test_helper(
client,
archive_data,
origin,
visit,
branches,
releases,
root_dir_sha1,
dir_content,
)
_origin_directory_view_test_helper(
client,
archive_data,
origin,
visit,
branches,
releases,
root_dir_sha1,
dir_content,
visit_id=visit["visit"],
)
- _origin_directory_view_test_helper(
- client,
- archive_data,
- origin,
- visit,
- branches,
- releases,
- root_dir_sha1,
- dir_content,
- timestamp=visit_unix_ts,
- )
-
_origin_directory_view_test_helper(
client,
archive_data,
origin,
visit,
branches,
releases,
root_dir_sha1,
dir_content,
timestamp=visit["date"],
)
_origin_directory_view_test_helper(
client,
archive_data,
origin,
visit,
branches,
releases,
root_dir_sha1,
dir_content,
snapshot_id=visit["snapshot"],
)
@given(origin())
def test_origin_sub_directory_view(client, archive_data, origin):
origin_visits = archive_data.origin_visit_get(origin["url"])
visit = origin_visits[-1]
snapshot = archive_data.snapshot_get(visit["snapshot"])
head_rev_id = archive_data.snapshot_get_head(snapshot)
head_rev = archive_data.revision_get(head_rev_id)
root_dir_sha1 = head_rev["directory"]
subdirs = [
e for e in archive_data.directory_ls(root_dir_sha1) if e["type"] == "dir"
]
branches, releases = process_snapshot_branches(snapshot)
- visit_unix_ts = parse_timestamp(visit["date"]).timestamp()
- visit_unix_ts = int(visit_unix_ts)
if len(subdirs) == 0:
return
subdir = random.choice(subdirs)
subdir_content = archive_data.directory_ls(subdir["target"])
subdir_path = subdir["name"]
_origin_directory_view_test_helper(
client,
archive_data,
origin,
visit,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
)
_origin_directory_view_test_helper(
client,
archive_data,
origin,
visit,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
visit_id=visit["visit"],
)
- _origin_directory_view_test_helper(
- client,
- archive_data,
- origin,
- visit,
- branches,
- releases,
- root_dir_sha1,
- subdir_content,
- path=subdir_path,
- timestamp=visit_unix_ts,
- )
-
_origin_directory_view_test_helper(
client,
archive_data,
origin,
visit,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
timestamp=visit["date"],
)
_origin_directory_view_test_helper(
client,
archive_data,
origin,
visit,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
snapshot_id=visit["snapshot"],
)
_origin_directory_view_test_helper(
client,
archive_data,
origin,
visit,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
)
_origin_directory_view_test_helper(
client,
archive_data,
origin,
visit,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
visit_id=visit["visit"],
)
- _origin_directory_view_test_helper(
- client,
- archive_data,
- origin,
- visit,
- branches,
- releases,
- root_dir_sha1,
- subdir_content,
- path=subdir_path,
- timestamp=visit_unix_ts,
- )
-
_origin_directory_view_test_helper(
client,
archive_data,
origin,
visit,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
timestamp=visit["date"],
)
_origin_directory_view_test_helper(
client,
archive_data,
origin,
visit,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
snapshot_id=visit["snapshot"],
)
@given(origin())
def test_origin_branches(client, archive_data, origin):
origin_visits = archive_data.origin_visit_get(origin["url"])
visit = origin_visits[-1]
snapshot = archive_data.snapshot_get(visit["snapshot"])
snapshot_content = process_snapshot_branches(snapshot)
_origin_branches_test_helper(client, origin, snapshot_content)
_origin_branches_test_helper(
client, origin, snapshot_content, snapshot_id=visit["snapshot"]
)
@given(origin())
def test_origin_releases(client, archive_data, origin):
origin_visits = archive_data.origin_visit_get(origin["url"])
visit = origin_visits[-1]
snapshot = archive_data.snapshot_get(visit["snapshot"])
snapshot_content = process_snapshot_branches(snapshot)
_origin_releases_test_helper(client, origin, snapshot_content)
_origin_releases_test_helper(
client, origin, snapshot_content, snapshot_id=visit["snapshot"]
)
@given(
new_origin(),
new_snapshot(min_size=4, max_size=4),
visit_dates(),
revisions(min_size=3, max_size=3),
)
def test_origin_snapshot_null_branch(
client, archive_data, new_origin, new_snapshot, visit_dates, revisions
):
snp_dict = new_snapshot.to_dict()
archive_data.origin_add([new_origin])
for i, branch in enumerate(snp_dict["branches"].keys()):
if i == 0:
snp_dict["branches"][branch] = None
else:
snp_dict["branches"][branch] = {
"target_type": "revision",
"target": hash_to_bytes(revisions[i - 1]),
}
archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
visit = archive_data.origin_visit_add(
[OriginVisit(origin=new_origin.url, date=visit_dates[0], type="git",)]
)[0]
visit_status = OriginVisitStatus(
origin=new_origin.url,
visit=visit.visit,
date=now(),
status="partial",
snapshot=snp_dict["id"],
)
archive_data.origin_visit_status_add([visit_status])
url = reverse(
"browse-origin-directory", query_params={"origin_url": new_origin.url}
)
rv = client.get(url)
assert rv.status_code == 200
@given(
new_origin(),
new_snapshot(min_size=4, max_size=4),
visit_dates(),
revisions(min_size=4, max_size=4),
)
def test_origin_snapshot_invalid_branch(
client, archive_data, new_origin, new_snapshot, visit_dates, revisions
):
snp_dict = new_snapshot.to_dict()
archive_data.origin_add([new_origin])
for i, branch in enumerate(snp_dict["branches"].keys()):
snp_dict["branches"][branch] = {
"target_type": "revision",
"target": hash_to_bytes(revisions[i]),
}
archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
visit = archive_data.origin_visit_add(
[OriginVisit(origin=new_origin.url, date=visit_dates[0], type="git",)]
)[0]
visit_status = OriginVisitStatus(
origin=new_origin.url,
visit=visit.visit,
date=now(),
status="full",
snapshot=snp_dict["id"],
)
archive_data.origin_visit_status_add([visit_status])
url = reverse(
"browse-origin-directory",
query_params={"origin_url": new_origin.url, "branch": "invalid_branch"},
)
rv = client.get(url)
assert rv.status_code == 404
@given(new_origin())
def test_browse_visits_origin_not_found(client, new_origin):
url = reverse("browse-origin-visits", query_params={"origin_url": new_origin.url})
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert_contains(
resp, f"Origin with url {new_origin.url} not found", status_code=404
)
@given(origin())
def test_browse_origin_directory_no_visit(client, mocker, origin):
mock_get_origin_visits = mocker.patch(
"swh.web.common.origin_visits.get_origin_visits"
)
mock_get_origin_visits.return_value = []
url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]})
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert_contains(resp, "No visit", status_code=404)
assert mock_get_origin_visits.called
@given(origin())
def test_browse_origin_directory_unknown_visit(client, mocker, origin):
mock_get_origin_visits = mocker.patch(
"swh.web.common.origin_visits.get_origin_visits"
)
mock_get_origin_visits.return_value = [{"visit": 1}]
url = reverse(
"browse-origin-directory",
query_params={"origin_url": origin["url"], "visit_id": 2},
)
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert re.search("Visit.*not found", resp.content.decode("utf-8"))
assert mock_get_origin_visits.called
@given(origin())
def test_browse_origin_directory_not_found(client, origin):
url = reverse(
"browse-origin-directory",
query_params={"origin_url": origin["url"], "path": "/invalid/dir/path/"},
)
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert re.search("Directory.*not found", resp.content.decode("utf-8"))
@given(origin())
def test_browse_origin_content_no_visit(client, mocker, origin):
mock_get_origin_visits = mocker.patch(
"swh.web.common.origin_visits.get_origin_visits"
)
mock_get_origin_visits.return_value = []
url = reverse(
"browse-origin-content",
query_params={"origin_url": origin["url"], "path": "foo"},
)
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert_contains(resp, "No visit", status_code=404)
assert mock_get_origin_visits.called
@given(origin())
def test_browse_origin_content_unknown_visit(client, mocker, origin):
mock_get_origin_visits = mocker.patch(
"swh.web.common.origin_visits.get_origin_visits"
)
mock_get_origin_visits.return_value = [{"visit": 1}]
url = reverse(
"browse-origin-content",
query_params={"origin_url": origin["url"], "path": "foo", "visit_id": 2},
)
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert re.search("Visit.*not found", resp.content.decode("utf-8"))
assert mock_get_origin_visits.called
@given(origin())
def test_browse_origin_content_directory_empty_snapshot(client, mocker, origin):
mock_snapshot_service = mocker.patch("swh.web.browse.snapshot_context.service")
mock_get_origin_visit_snapshot = mocker.patch(
"swh.web.browse.snapshot_context.get_origin_visit_snapshot"
)
mock_get_origin_visit_snapshot.return_value = ([], [])
mock_snapshot_service.lookup_origin.return_value = origin
mock_snapshot_service.lookup_snapshot_sizes.return_value = {
"revision": 0,
"release": 0,
}
for browse_context in ("content", "directory"):
url = reverse(
f"browse-origin-{browse_context}",
query_params={"origin_url": origin["url"], "path": "baz"},
)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, f"browse/{browse_context}.html")
assert re.search("snapshot.*is empty", resp.content.decode("utf-8"))
assert mock_get_origin_visit_snapshot.called
assert mock_snapshot_service.lookup_origin.called
assert mock_snapshot_service.lookup_snapshot_sizes.called
@given(origin())
def test_browse_origin_content_not_found(client, origin):
url = reverse(
"browse-origin-content",
query_params={"origin_url": origin["url"], "path": "/invalid/file/path"},
)
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert re.search("Directory entry.*not found", resp.content.decode("utf-8"))
@given(origin())
def test_browse_directory_snapshot_not_found(client, mocker, origin):
mock_get_snapshot_context = mocker.patch(
"swh.web.browse.snapshot_context.get_snapshot_context"
)
mock_get_snapshot_context.side_effect = NotFoundExc("Snapshot not found")
url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]})
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert_contains(resp, "Snapshot not found", status_code=404)
assert mock_get_snapshot_context.called
@given(origin())
def test_origin_empty_snapshot(client, mocker, origin):
mock_service = mocker.patch("swh.web.browse.snapshot_context.service")
mock_get_origin_visit_snapshot = mocker.patch(
"swh.web.browse.snapshot_context.get_origin_visit_snapshot"
)
mock_get_origin_visit_snapshot.return_value = ([], [])
mock_service.lookup_snapshot_sizes.return_value = {
"revision": 0,
"release": 0,
}
mock_service.lookup_origin.return_value = origin
url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]})
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/directory.html")
resp_content = resp.content.decode("utf-8")
assert re.search("snapshot.*is empty", resp_content)
assert not re.search("swh-tr-link", resp_content)
assert mock_get_origin_visit_snapshot.called
assert mock_service.lookup_snapshot_sizes.called
@given(new_origin())
def test_origin_empty_snapshot_null_revision(client, archive_data, new_origin):
snapshot = Snapshot(
branches={
b"HEAD": SnapshotBranch(
target="refs/head/master".encode(), target_type=TargetType.ALIAS,
),
b"refs/head/master": None,
}
)
archive_data.origin_add([new_origin])
archive_data.snapshot_add([snapshot])
visit = archive_data.origin_visit_add(
[OriginVisit(origin=new_origin.url, date=now(), type="git",)]
)[0]
visit_status = OriginVisitStatus(
origin=new_origin.url,
visit=visit.visit,
date=now(),
status="partial",
snapshot=snapshot.id,
)
archive_data.origin_visit_status_add([visit_status])
url = reverse(
"browse-origin-directory", query_params={"origin_url": new_origin.url},
)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/directory.html")
resp_content = resp.content.decode("utf-8")
assert re.search("snapshot.*is empty", resp_content)
assert not re.search("swh-tr-link", resp_content)
@given(origin_with_releases())
def test_origin_release_browse(client, archive_data, origin):
snapshot = archive_data.snapshot_get_latest(origin["url"])
release = [
b for b in snapshot["branches"].values() if b["target_type"] == "release"
][-1]
release_data = archive_data.release_get(release["target"])
revision_data = archive_data.revision_get(release_data["target"])
url = reverse(
"browse-origin-directory",
query_params={"origin_url": origin["url"], "release": release_data["name"]},
)
resp = client.get(url)
assert resp.status_code == 200
assert_contains(resp, release_data["name"])
assert_contains(resp, release["target"])
swhid_context = {
"origin": origin["url"],
"visit": gen_swhid(SNAPSHOT, snapshot["id"]),
"anchor": gen_swhid(RELEASE, release_data["id"]),
"path": "/",
}
swh_dir_id = gen_swhid(
DIRECTORY, revision_data["directory"], metadata=swhid_context
)
swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id})
assert_contains(resp, swh_dir_id)
assert_contains(resp, swh_dir_id_url)
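The tests compare the page against SWHIDs carrying context qualifiers, produced by `gen_swhid`. As a rough illustration of the resulting string shape, here is a hypothetical stdlib-only builder (`qualified_swhid` is not part of swh-web). It emits qualifiers in the order origin, visit, anchor, path, lines, matching the order of the `swhid_context` dicts in these tests, and it does not escape special characters in qualifier values, which a real implementation has to handle.

```python
from typing import Dict, Optional

# Short type tags used in core SWHIDs (swh:1:<tag>:<40 hex digits>).
SWHID_TYPES = {
    "content": "cnt",
    "directory": "dir",
    "revision": "rev",
    "release": "rel",
    "snapshot": "snp",
}
QUALIFIER_ORDER = ("origin", "visit", "anchor", "path", "lines")


def qualified_swhid(
    obj_type: str, obj_id: str, qualifiers: Optional[Dict[str, str]] = None
) -> str:
    """Build 'swh:1:<tag>:<id>' followed by ';key=value' qualifiers."""
    swhid = f"swh:1:{SWHID_TYPES[obj_type]}:{obj_id}"
    qualifiers = qualifiers or {}
    for key in QUALIFIER_ORDER:
        value = qualifiers.get(key)
        if value:
            # Values are appended verbatim: no escaping of ';' or '%'.
            swhid += f";{key}={value}"
    return swhid


snp_id, rel_id, dir_id = "1" * 40, "2" * 40, "3" * 40
context = {
    "origin": "https://github.com/python/cpython",
    "visit": qualified_swhid("snapshot", snp_id),
    "anchor": qualified_swhid("release", rel_id),
    "path": "/",
}
print(qualified_swhid("directory", dir_id, context))
```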
@given(origin_with_releases())
def test_origin_release_browse_not_found(client, origin):
invalid_release_name = "swh-foo-bar"
url = reverse(
"browse-origin-directory",
query_params={"origin_url": origin["url"], "release": invalid_release_name},
)
resp = client.get(url)
assert resp.status_code == 404
assert re.search(
f"Release {invalid_release_name}.*not found", resp.content.decode("utf-8")
)
@given(new_origin(), unknown_revision())
def test_origin_browse_directory_branch_with_non_resolvable_revision(
client, archive_data, new_origin, unknown_revision
):
branch_name = "master"
snapshot = Snapshot(
branches={
branch_name.encode(): SnapshotBranch(
target=hash_to_bytes(unknown_revision), target_type=TargetType.REVISION,
)
}
)
archive_data.origin_add([new_origin])
archive_data.snapshot_add([snapshot])
visit = archive_data.origin_visit_add(
[OriginVisit(origin=new_origin.url, date=now(), type="git",)]
)[0]
visit_status = OriginVisitStatus(
origin=new_origin.url,
visit=visit.visit,
date=now(),
status="partial",
snapshot=snapshot.id,
)
archive_data.origin_visit_status_add([visit_status])
url = reverse(
"browse-origin-directory",
query_params={"origin_url": new_origin.url, "branch": branch_name},
)
resp = client.get(url)
assert resp.status_code == 200
assert_contains(
resp, f"Revision {unknown_revision} could not be found in the archive."
)
@given(origin())
def test_origin_content_no_path(client, origin):
url = reverse("browse-origin-content", query_params={"origin_url": origin["url"]})
resp = client.get(url)
assert resp.status_code == 400
assert_contains(
resp, "The path of a content must be given as query parameter.", status_code=400
)
def test_origin_views_no_url_query_parameter(client):
for browse_context in (
"content",
"directory",
"log",
"branches",
"releases",
"visits",
):
url = reverse(f"browse-origin-{browse_context}")
resp = client.get(url)
assert resp.status_code == 400
assert_contains(
resp, "An origin URL must be provided as query parameter.", status_code=400
)
def _origin_content_view_test_helper(
client,
archive_data,
origin_info,
origin_visit,
origin_branches,
origin_releases,
root_dir_sha1,
content,
visit_id=None,
timestamp=None,
snapshot_id=None,
):
content_path = "/".join(content["path"].split("/")[1:])
if not visit_id and not snapshot_id:
visit_id = origin_visit["visit"]
query_params = {"origin_url": origin_info["url"], "path": content_path}
if timestamp:
query_params["timestamp"] = timestamp
if visit_id:
query_params["visit_id"] = visit_id
elif snapshot_id:
query_params["snapshot"] = snapshot_id
url = reverse("browse-origin-content", query_params=query_params)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/content.html")
assert isinstance(content["data"], str)
assert_contains(resp, '<code class="%s">' % content["hljs_language"])
assert_contains(resp, escape(content["data"]))
split_path = content_path.split("/")
filename = split_path[-1]
path = "/".join(split_path[:-1])
path_info = gen_path_info(path)
del query_params["path"]
if timestamp:
query_params["timestamp"] = format_utc_iso_date(
- parse_timestamp(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ"
+ parse_iso8601_date_to_utc(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ"
)
root_dir_url = reverse("browse-origin-directory", query_params=query_params)
assert_contains(resp, '<li class="swh-path">', count=len(path_info) + 1)
assert_contains(resp, '<a href="%s">%s</a>' % (root_dir_url, root_dir_sha1[:7]))
for p in path_info:
query_params["path"] = p["path"]
dir_url = reverse("browse-origin-directory", query_params=query_params)
assert_contains(resp, '<a href="%s">%s</a>' % (dir_url, p["name"]))
assert_contains(resp, "<li>%s</li>" % filename)
query_string = "sha1_git:" + content["sha1_git"]
url_raw = reverse(
"browse-content-raw",
url_args={"query_string": query_string},
query_params={"filename": filename},
)
assert_contains(resp, url_raw)
if "path" in query_params:
del query_params["path"]
origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
assert_contains(resp, f'href="{escape(origin_branches_url)}"')
assert_contains(resp, f"Branches ({len(origin_branches)})")
origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
assert_contains(resp, f'href="{escape(origin_releases_url)}">')
assert_contains(resp, f"Releases ({len(origin_releases)})")
assert_contains(resp, '<li class="swh-branch">', count=len(origin_branches))
query_params["path"] = content_path
for branch in origin_branches:
root_dir_branch_url = reverse(
"browse-origin-content",
query_params={"branch": branch["name"], **query_params},
)
assert_contains(resp, '<a href="%s">' % root_dir_branch_url)
assert_contains(resp, '<li class="swh-release">', count=len(origin_releases))
query_params["branch"] = None
for release in origin_releases:
root_dir_release_url = reverse(
"browse-origin-content",
query_params={"release": release["name"], **query_params},
)
assert_contains(resp, '<a href="%s">' % root_dir_release_url)
url = reverse("browse-origin-content", query_params=query_params)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/content.html")
snapshot = archive_data.snapshot_get(origin_visit["snapshot"])
head_rev_id = archive_data.snapshot_get_head(snapshot)
swhid_context = {
"origin": origin_info["url"],
"visit": gen_swhid(SNAPSHOT, snapshot["id"]),
"anchor": gen_swhid(REVISION, head_rev_id),
"path": f"/{content_path}",
}
swh_cnt_id = gen_swhid(CONTENT, content["sha1_git"], metadata=swhid_context)
swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id})
assert_contains(resp, swh_cnt_id)
assert_contains(resp, swh_cnt_id_url)
assert_contains(resp, "swh-take-new-snapshot")
_check_origin_link(resp, origin_info["url"])
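`_origin_content_view_test_helper` derives three values from `content["path"]`: the content path (the first component dropped), the parent directory path used for the breadcrumb, and the file name. A hypothetical standalone version of that decomposition, assuming `/`-separated paths, is sketched below.

```python
from typing import Tuple


def split_content_path(full_path: str) -> Tuple[str, str, str]:
    """Split '<root>/<dirs...>/<file>' into (content_path, dir_path, filename).

    The first component (the root directory) is dropped."""
    content_path = "/".join(full_path.split("/")[1:])
    parts = content_path.split("/")
    # Joining the leading components is safer than
    # content_path.replace(filename, ""), which would also remove any
    # other occurrence of the file name in the path.
    return content_path, "/".join(parts[:-1]), parts[-1]


print(split_content_path("abc123/src/lib/utils.py"))
# ('src/lib/utils.py', 'src/lib', 'utils.py')
```

With this decomposition the breadcrumb holds one `swh-path` item for the root directory plus one per component of the parent directory path, which is what the `len(path_info) + 1` count in the helper checks.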
def _origin_directory_view_test_helper(
client,
archive_data,
origin_info,
origin_visit,
origin_branches,
origin_releases,
root_directory_sha1,
directory_entries,
visit_id=None,
timestamp=None,
snapshot_id=None,
path=None,
):
dirs = [e for e in directory_entries if e["type"] in ("dir", "rev")]
files = [e for e in directory_entries if e["type"] == "file"]
if not visit_id and not snapshot_id:
visit_id = origin_visit["visit"]
query_params = {"origin_url": origin_info["url"]}
if timestamp:
query_params["timestamp"] = timestamp
elif visit_id:
query_params["visit_id"] = visit_id
else:
query_params["snapshot"] = snapshot_id
if path:
query_params["path"] = path
url = reverse("browse-origin-directory", query_params=query_params)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/directory.html")
assert resp.status_code == 200
assert_template_used(resp, "browse/directory.html")
assert_contains(resp, '<td class="swh-directory">', count=len(dirs))
assert_contains(resp, '<td class="swh-content">', count=len(files))
if timestamp:
query_params["timestamp"] = format_utc_iso_date(
- parse_timestamp(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ"
+ parse_iso8601_date_to_utc(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ"
)
for d in dirs:
if d["type"] == "rev":
dir_url = reverse("browse-revision", url_args={"sha1_git": d["target"]})
else:
dir_path = d["name"]
if path:
dir_path = "%s/%s" % (path, d["name"])
query_params["path"] = dir_path
dir_url = reverse("browse-origin-directory", query_params=query_params,)
assert_contains(resp, dir_url)
for f in files:
file_path = f["name"]
if path:
file_path = "%s/%s" % (path, f["name"])
query_params["path"] = file_path
file_url = reverse("browse-origin-content", query_params=query_params)
assert_contains(resp, file_url)
if "path" in query_params:
del query_params["path"]
root_dir_branch_url = reverse("browse-origin-directory", query_params=query_params)
nb_bc_paths = 1
if path:
nb_bc_paths = len(path.split("/")) + 1
assert_contains(resp, '<li class="swh-path">', count=nb_bc_paths)
assert_contains(
resp, '<a href="%s">%s</a>' % (root_dir_branch_url, root_directory_sha1[:7])
)
origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
assert_contains(resp, f'href="{escape(origin_branches_url)}"')
assert_contains(resp, f"Branches ({len(origin_branches)})")
origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
nb_releases = len(origin_releases)
if nb_releases > 0:
assert_contains(resp, f'href="{escape(origin_releases_url)}"')
assert_contains(resp, f"Releases ({nb_releases})")
if path:
query_params["path"] = path
assert_contains(resp, '<li class="swh-branch">', count=len(origin_branches))
for branch in origin_branches:
query_params["branch"] = branch["name"]
root_dir_branch_url = reverse(
"browse-origin-directory", query_params=query_params
)
assert_contains(resp, '<a href="%s">' % root_dir_branch_url)
assert_contains(resp, '<li class="swh-release">', count=len(origin_releases))
query_params["branch"] = None
for release in origin_releases:
query_params["release"] = release["name"]
root_dir_release_url = reverse(
"browse-origin-directory", query_params=query_params
)
assert_contains(resp, 'href="%s"' % root_dir_release_url)
assert_contains(resp, "vault-cook-directory")
assert_contains(resp, "vault-cook-revision")
snapshot = archive_data.snapshot_get(origin_visit["snapshot"])
head_rev_id = archive_data.snapshot_get_head(snapshot)
swhid_context = {
"origin": origin_info["url"],
"visit": gen_swhid(SNAPSHOT, snapshot["id"]),
"anchor": gen_swhid(REVISION, head_rev_id),
"path": f"/{path}" if path else "/",
}
swh_dir_id = gen_swhid(
DIRECTORY, directory_entries[0]["dir_id"], metadata=swhid_context
)
swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id})
assert_contains(resp, swh_dir_id)
assert_contains(resp, swh_dir_id_url)
assert_contains(resp, "swh-take-new-snapshot")
_check_origin_link(resp, origin_info["url"])
def _origin_branches_test_helper(
client, origin_info, origin_snapshot, snapshot_id=None
):
query_params = {"origin_url": origin_info["url"], "snapshot": snapshot_id}
url = reverse("browse-origin-branches", query_params=query_params)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/branches.html")
origin_branches = origin_snapshot[0]
origin_releases = origin_snapshot[1]
origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
assert_contains(resp, f'href="{escape(origin_branches_url)}"')
assert_contains(resp, f"Branches ({len(origin_branches)})")
origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
nb_releases = len(origin_releases)
if nb_releases > 0:
assert_contains(resp, f'href="{escape(origin_releases_url)}">')
assert_contains(resp, f"Releases ({nb_releases})")
assert_contains(resp, '<tr class="swh-branch-entry', count=len(origin_branches))
for branch in origin_branches:
browse_branch_url = reverse(
"browse-origin-directory",
query_params={"branch": branch["name"], **query_params},
)
assert_contains(resp, '<a href="%s">' % escape(browse_branch_url))
browse_revision_url = reverse(
"browse-revision",
url_args={"sha1_git": branch["revision"]},
query_params=query_params,
)
assert_contains(resp, '<a href="%s">' % escape(browse_revision_url))
_check_origin_link(resp, origin_info["url"])
def _origin_releases_test_helper(
client, origin_info, origin_snapshot, snapshot_id=None
):
query_params = {"origin_url": origin_info["url"], "snapshot": snapshot_id}
url = reverse("browse-origin-releases", query_params=query_params)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/releases.html")
origin_branches = origin_snapshot[0]
origin_releases = origin_snapshot[1]
origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
assert_contains(resp, f'href="{escape(origin_branches_url)}"')
assert_contains(resp, f"Branches ({len(origin_branches)})")
origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
nb_releases = len(origin_releases)
if nb_releases > 0:
assert_contains(resp, f'href="{escape(origin_releases_url)}"')
assert_contains(resp, f"Releases ({nb_releases})")
assert_contains(resp, '<tr class="swh-release-entry', count=nb_releases)
for release in origin_releases:
browse_release_url = reverse(
"browse-release",
url_args={"sha1_git": release["id"]},
query_params=query_params,
)
browse_revision_url = reverse(
"browse-revision",
url_args={"sha1_git": release["target"]},
query_params=query_params,
)
assert_contains(resp, '<a href="%s">' % escape(browse_release_url))
assert_contains(resp, '<a href="%s">' % escape(browse_revision_url))
_check_origin_link(resp, origin_info["url"])
@given(
new_origin(), visit_dates(), revisions(min_size=10, max_size=10), existing_release()
)
def test_origin_branches_pagination_with_alias(
client, archive_data, mocker, new_origin, visit_dates, revisions, existing_release
):
"""
When a snapshot contains a branch or a release alias, pagination links
in the branches / releases view should be displayed.
"""
mocker.patch("swh.web.browse.snapshot_context.PER_PAGE", len(revisions) // 2)
snp_dict = {"branches": {}, "id": hash_to_bytes(random_sha1())}
for i in range(len(revisions)):
branch = "".join(random.choices(string.ascii_lowercase, k=8))
snp_dict["branches"][branch.encode()] = {
"target_type": "revision",
"target": hash_to_bytes(revisions[i]),
}
release = "".join(random.choices(string.ascii_lowercase, k=8))
snp_dict["branches"][b"RELEASE_ALIAS"] = {
"target_type": "alias",
"target": release.encode(),
}
snp_dict["branches"][release.encode()] = {
"target_type": "release",
"target": hash_to_bytes(existing_release),
}
archive_data.origin_add([new_origin])
archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
visit = archive_data.origin_visit_add(
[OriginVisit(origin=new_origin.url, date=visit_dates[0], type="git",)]
)[0]
visit_status = OriginVisitStatus(
origin=new_origin.url,
visit=visit.visit,
date=now(),
status="full",
snapshot=snp_dict["id"],
)
archive_data.origin_visit_status_add([visit_status])
url = reverse("browse-origin-branches", query_params={"origin_url": new_origin.url})
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/branches.html")
assert_contains(resp, '<ul class="pagination')
def _check_origin_link(resp, origin_url):
browse_origin_url = reverse(
"browse-origin", query_params={"origin_url": origin_url}
)
assert_contains(resp, f'href="{browse_origin_url}"')
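The directory assertions in the first helper above build a contextual SWHID with `gen_swhid(DIRECTORY, ..., metadata=swhid_context)`, where the context carries `origin`, `visit`, `anchor` and `path` qualifiers. The sketch below shows the textual shape such an identifier takes. `format_swhid` is a hypothetical helper, not the `swh.web.common.identifiers` API. It assumes the `swh:1:<type>:<sha1_git>` core syntax with `;key=value` qualifiers emitted in a fixed order (origin, visit, anchor, path, lines), and it escapes only `%` and `;` in qualifier values; check the SWHID specification for the exact escaping rules.

```python
import re
from typing import Dict, Optional

# Object type names (as used by the swh.model.identifiers constants) mapped
# to the short codes that appear in the SWHID core.
_TYPE_CODES = {
    "content": "cnt",
    "directory": "dir",
    "revision": "rev",
    "release": "rel",
    "snapshot": "snp",
}
# Assumed qualifier order: context qualifiers first, then the fragment one.
_QUALIFIER_ORDER = ("origin", "visit", "anchor", "path", "lines")
_SHA1_GIT_RE = re.compile(r"^[0-9a-f]{40}$")


def format_swhid(
    object_type: str, object_id: str, qualifiers: Optional[Dict[str, str]] = None
) -> str:
    """Hypothetical helper: build a SWHID string with optional qualifiers."""
    if object_type not in _TYPE_CODES:
        raise ValueError(f"unknown object type: {object_type!r}")
    if not _SHA1_GIT_RE.match(object_id):
        raise ValueError(f"not a 40-char lowercase hex sha1_git: {object_id!r}")
    qualifiers = qualifiers or {}
    unknown = set(qualifiers) - set(_QUALIFIER_ORDER)
    if unknown:
        raise ValueError(f"unknown qualifiers: {sorted(unknown)}")
    swhid = f"swh:1:{_TYPE_CODES[object_type]}:{object_id}"
    for key in _QUALIFIER_ORDER:
        value = qualifiers.get(key)
        if value:
            # Escape '%' first so the '%' introduced by escaping ';' is not
            # escaped a second time; ';' must not look like a separator.
            escaped = value.replace("%", "%25").replace(";", "%3B")
            swhid += f";{key}={escaped}"
    return swhid
```

Passing a context shaped like the one in the test (origin URL, snapshot SWHID as `visit`, revision SWHID as `anchor`, `path="/"`) yields a string of the form `swh:1:dir:<id>;origin=...;visit=swh:1:snp:<id>;anchor=swh:1:rev:<id>;path=/`.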
diff --git a/swh/web/tests/browse/views/test_revision.py b/swh/web/tests/browse/views/test_revision.py
index 0dabf7898..7d19561ff 100644
--- a/swh/web/tests/browse/views/test_revision.py
+++ b/swh/web/tests/browse/views/test_revision.py
@@ -1,294 +1,294 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import random
from django.utils.html import escape
from hypothesis import given
from swh.model.identifiers import DIRECTORY, REVISION, SNAPSHOT
from swh.web.common.identifiers import gen_swhid
-from swh.web.common.utils import reverse, format_utc_iso_date, parse_timestamp
+from swh.web.common.utils import reverse, format_utc_iso_date, parse_iso8601_date_to_utc
from swh.web.tests.django_asserts import assert_contains, assert_template_used
from swh.web.tests.strategies import origin, revision, unknown_revision, new_origin
@given(revision())
def test_revision_browse(client, archive_data, revision):
_revision_browse_checks(client, archive_data, revision)
@given(origin())
def test_revision_origin_snapshot_browse(client, archive_data, origin):
snapshot = archive_data.snapshot_get_latest(origin["url"])
revision = archive_data.snapshot_get_head(snapshot)
_revision_browse_checks(client, archive_data, revision, origin_url=origin["url"])
_revision_browse_checks(client, archive_data, revision, snapshot=snapshot)
_revision_browse_checks(
client, archive_data, revision, origin_url=origin["url"], snapshot=snapshot,
)
revision = random.choice(archive_data.revision_log(revision))["id"]
_revision_browse_checks(client, archive_data, revision, origin_url=origin["url"])
@given(revision())
def test_revision_log_browse(client, archive_data, revision):
per_page = 10
revision_log = archive_data.revision_log(revision)
revision_log_sorted = sorted(
revision_log,
- key=lambda rev: -parse_timestamp(rev["committer_date"]).timestamp(),
+ key=lambda rev: -parse_iso8601_date_to_utc(rev["committer_date"]).timestamp(),
)
url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"per_page": per_page},
)
resp = client.get(url)
next_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"offset": per_page, "per_page": per_page},
)
nb_log_entries = per_page
if len(revision_log_sorted) < per_page:
nb_log_entries = len(revision_log_sorted)
assert resp.status_code == 200
assert_template_used(resp, "browse/revision-log.html")
assert_contains(resp, '<tr class="swh-revision-log-entry', count=nb_log_entries)
assert_contains(resp, '<a class="page-link">Newer</a>')
if len(revision_log_sorted) > per_page:
assert_contains(
resp, '<a class="page-link" href="%s">Older</a>' % escape(next_page_url),
)
for log in revision_log_sorted[:per_page]:
revision_url = reverse("browse-revision", url_args={"sha1_git": log["id"]})
assert_contains(resp, log["id"][:7])
assert_contains(resp, log["author"]["name"])
assert_contains(resp, format_utc_iso_date(log["date"]))
assert_contains(resp, escape(log["message"]))
assert_contains(resp, format_utc_iso_date(log["committer_date"]))
assert_contains(resp, revision_url)
if len(revision_log_sorted) <= per_page:
return
resp = client.get(next_page_url)
prev_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"per_page": per_page},
)
next_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"offset": 2 * per_page, "per_page": per_page},
)
nb_log_entries = len(revision_log_sorted) - per_page
if nb_log_entries > per_page:
nb_log_entries = per_page
assert resp.status_code == 200
assert_template_used(resp, "browse/revision-log.html")
assert_contains(resp, '<tr class="swh-revision-log-entry', count=nb_log_entries)
assert_contains(
resp, '<a class="page-link" href="%s">Newer</a>' % escape(prev_page_url)
)
if len(revision_log_sorted) > 2 * per_page:
assert_contains(
resp, '<a class="page-link" href="%s">Older</a>' % escape(next_page_url),
)
if len(revision_log_sorted) <= 2 * per_page:
return
resp = client.get(next_page_url)
prev_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"offset": per_page, "per_page": per_page},
)
next_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"offset": 3 * per_page, "per_page": per_page},
)
nb_log_entries = len(revision_log_sorted) - 2 * per_page
if nb_log_entries > per_page:
nb_log_entries = per_page
assert resp.status_code == 200
assert_template_used(resp, "browse/revision-log.html")
assert_contains(resp, '<tr class="swh-revision-log-entry', count=nb_log_entries)
assert_contains(
resp, '<a class="page-link" href="%s">Newer</a>' % escape(prev_page_url)
)
if len(revision_log_sorted) > 3 * per_page:
assert_contains(
resp, '<a class="page-link" href="%s">Older</a>' % escape(next_page_url),
)
@given(revision(), unknown_revision(), new_origin())
def test_revision_request_errors(client, revision, unknown_revision, new_origin):
url = reverse("browse-revision", url_args={"sha1_git": unknown_revision})
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert_contains(
resp, "Revision with sha1_git %s not found" % unknown_revision, status_code=404
)
url = reverse(
"browse-revision",
url_args={"sha1_git": revision},
query_params={"origin_url": new_origin.url},
)
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert_contains(
resp, "the origin mentioned in your request" " appears broken", status_code=404
)
@given(revision())
def test_revision_uppercase(client, revision):
url = reverse(
"browse-revision-uppercase-checksum", url_args={"sha1_git": revision.upper()}
)
resp = client.get(url)
assert resp.status_code == 302
redirect_url = reverse("browse-revision", url_args={"sha1_git": revision})
assert resp["location"] == redirect_url
def _revision_browse_checks(
client, archive_data, revision, origin_url=None, snapshot=None
):
query_params = {}
if origin_url:
query_params["origin_url"] = origin_url
if snapshot:
query_params["snapshot"] = snapshot["id"]
url = reverse(
"browse-revision", url_args={"sha1_git": revision}, query_params=query_params
)
revision_data = archive_data.revision_get(revision)
author_name = revision_data["author"]["name"]
committer_name = revision_data["committer"]["name"]
dir_id = revision_data["directory"]
if origin_url:
snapshot = archive_data.snapshot_get_latest(origin_url)
history_url = reverse(
"browse-origin-log", query_params={"revision": revision, **query_params},
)
elif snapshot:
history_url = reverse(
"browse-snapshot-log",
url_args={"snapshot_id": snapshot["id"]},
query_params={"revision": revision},
)
else:
history_url = reverse("browse-revision-log", url_args={"sha1_git": revision})
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/revision.html")
assert_contains(resp, author_name)
assert_contains(resp, committer_name)
assert_contains(resp, history_url)
for parent in revision_data["parents"]:
parent_url = reverse(
"browse-revision", url_args={"sha1_git": parent}, query_params=query_params
)
assert_contains(resp, '<a href="%s">%s</a>' % (escape(parent_url), parent[:7]))
author_date = revision_data["date"]
committer_date = revision_data["committer_date"]
message_lines = revision_data["message"].split("\n")
assert_contains(resp, format_utc_iso_date(author_date))
assert_contains(resp, format_utc_iso_date(committer_date))
assert_contains(resp, escape(message_lines[0]))
assert_contains(resp, escape("\n".join(message_lines[1:])))
assert_contains(resp, "vault-cook-directory")
assert_contains(resp, "vault-cook-revision")
swh_rev_id = gen_swhid("revision", revision)
swh_rev_id_url = reverse("browse-swhid", url_args={"swhid": swh_rev_id})
assert_contains(resp, swh_rev_id)
assert_contains(resp, swh_rev_id_url)
swh_dir_id = gen_swhid("directory", dir_id)
swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id})
assert_contains(resp, swh_dir_id)
assert_contains(resp, swh_dir_id_url)
if origin_url:
assert_contains(resp, "swh-take-new-snapshot")
swh_rev_id = gen_swhid(REVISION, revision)
swh_rev_id_url = reverse("browse-swhid", url_args={"swhid": swh_rev_id})
if origin_url:
browse_origin_url = reverse(
"browse-origin", query_params={"origin_url": origin_url}
)
assert_contains(resp, f'href="{browse_origin_url}"')
elif snapshot:
swh_snp_id = gen_swhid("snapshot", snapshot["id"])
swh_snp_id_url = reverse("browse-swhid", url_args={"swhid": swh_snp_id})
assert_contains(resp, f'href="{swh_snp_id_url}"')
swhid_context = {}
if origin_url:
swhid_context["origin"] = origin_url
if snapshot:
swhid_context["visit"] = gen_swhid(SNAPSHOT, snapshot["id"])
swh_rev_id = gen_swhid(REVISION, revision, metadata=swhid_context)
swh_rev_id_url = reverse("browse-swhid", url_args={"swhid": swh_rev_id})
assert_contains(resp, swh_rev_id)
assert_contains(resp, swh_rev_id_url)
swhid_context["anchor"] = gen_swhid(REVISION, revision)
swhid_context["path"] = "/"
swh_dir_id = gen_swhid(DIRECTORY, dir_id, metadata=swhid_context)
swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id})
assert_contains(resp, swh_dir_id)
assert_contains(resp, swh_dir_id_url)
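`test_revision_log_browse` above expects the log sorted newest first by parsed `committer_date`, paginated with `offset` and `per_page`, with a clickable "Newer" link only past the first page and an "Older" link only while entries remain. The paging rule can be sketched as follows. `revision_log_page` is a hypothetical helper, not the swh-web view code; it assumes dates are strings in the form produced by Python's `datetime.isoformat()` (before Python 3.11, `datetime.fromisoformat` does not accept a trailing `Z`) and treats dates without an offset as UTC.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple

Revision = Dict[str, Any]


def _to_utc(value: str) -> datetime:
    """Parse an isoformat() date string; naive values are assumed to be UTC."""
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def revision_log_page(
    revisions: List[Revision], offset: int, per_page: int
) -> Tuple[List[Revision], bool, bool]:
    """Hypothetical helper: one page of a newest-first revision log.

    Returns the page entries plus two flags telling whether a "Newer"
    and an "Older" navigation link should be rendered.
    """
    if offset < 0 or per_page <= 0:
        raise ValueError("offset must be >= 0 and per_page must be > 0")
    # Newest first; reverse=True keeps the sort stable for equal dates.
    ordered = sorted(
        revisions, key=lambda rev: _to_utc(rev["committer_date"]), reverse=True
    )
    entries = ordered[offset : offset + per_page]
    has_newer = offset > 0
    has_older = offset + per_page < len(ordered)
    return entries, has_newer, has_older
```

Sorting on aware datetimes rather than raw strings keeps the order correct when committer dates carry different UTC offsets.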
diff --git a/swh/web/tests/common/test_origin_visits.py b/swh/web/tests/common/test_origin_visits.py
index ffeb5ddaf..3b4569f58 100644
--- a/swh/web/tests/common/test_origin_visits.py
+++ b/swh/web/tests/common/test_origin_visits.py
@@ -1,238 +1,235 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import timedelta
from hypothesis import given
import pytest
from swh.model.hashutil import hash_to_hex
from swh.model.model import OriginVisit, OriginVisitStatus
from swh.storage.utils import now
from swh.web.common.exc import NotFoundExc
from swh.web.common.origin_visits import get_origin_visits, get_origin_visit
from swh.web.common.typing import OriginInfo
from swh.web.tests.strategies import new_origin, new_snapshots
@given(new_snapshots(3))
def test_get_origin_visits(mocker, snapshots):
mock_service = mocker.patch("swh.web.common.service")
mock_service.MAX_LIMIT = 2
def _lookup_origin_visits(*args, **kwargs):
if kwargs["last_visit"] is None:
return [
{
"visit": 1,
"date": "2017-05-06T00:59:10+00:00",
"status": "full",
"snapshot": hash_to_hex(snapshots[0].id),
"type": "git",
},
{
"visit": 2,
"date": "2017-08-06T00:59:10+00:00",
"status": "full",
"snapshot": hash_to_hex(snapshots[1].id),
"type": "git",
},
]
else:
return [
{
"visit": 3,
"date": "2017-09-06T00:59:10+00:00",
"status": "full",
"snapshot": hash_to_hex(snapshots[2].id),
"type": "git",
}
]
mock_service.lookup_origin_visits.side_effect = _lookup_origin_visits
origin_info = {
"url": "https://github.com/foo/bar",
}
origin_visits = get_origin_visits(origin_info)
assert len(origin_visits) == 3
@given(new_snapshots(5))
def test_get_origin_visit(mocker, snapshots):
mock_origin_visits = mocker.patch("swh.web.common.origin_visits.get_origin_visits")
origin_info = {
"url": "https://github.com/foo/bar",
}
visits = [
{
"status": "full",
"date": "2015-07-09T21:09:24+00:00",
"visit": 1,
"origin": "https://github.com/foo/bar",
"type": "git",
"snapshot": hash_to_hex(snapshots[0].id),
},
{
"status": "full",
"date": "2016-02-23T18:05:23.312045+00:00",
"visit": 2,
"origin": "https://github.com/foo/bar",
"type": "git",
"snapshot": hash_to_hex(snapshots[1].id),
},
{
"status": "full",
"date": "2016-03-28T01:35:06.554111+00:00",
"visit": 3,
"origin": "https://github.com/foo/bar",
"type": "git",
"snapshot": hash_to_hex(snapshots[2].id),
},
{
"status": "full",
"date": "2016-06-18T01:22:24.808485+00:00",
"visit": 4,
"origin": "https://github.com/foo/bar",
"type": "git",
"snapshot": hash_to_hex(snapshots[3].id),
},
{
"status": "full",
"date": "2016-08-14T12:10:00.536702+00:00",
"visit": 5,
"origin": "https://github.com/foo/bar",
"type": "git",
"snapshot": hash_to_hex(snapshots[4].id),
},
]
mock_origin_visits.return_value = visits
visit_id = 12
with pytest.raises(NotFoundExc) as e:
visit = get_origin_visit(origin_info, visit_id=visit_id)
assert e.match("Visit with id %s" % visit_id)
assert e.match("url %s" % origin_info["url"])
visit = get_origin_visit(origin_info, visit_id=2)
assert visit == visits[1]
visit = get_origin_visit(origin_info, visit_ts="2016-02-23T18:05:23.312045+00:00")
assert visit == visits[1]
visit = get_origin_visit(origin_info, visit_ts="2016-02-20")
assert visit == visits[1]
visit = get_origin_visit(origin_info, visit_ts="2016-06-18T01:22")
assert visit == visits[3]
visit = get_origin_visit(origin_info, visit_ts="2016-06-18 01:22")
assert visit == visits[3]
- visit = get_origin_visit(origin_info, visit_ts=1466208000)
- assert visit == visits[3]
-
visit = get_origin_visit(origin_info, visit_ts="2014-01-01")
assert visit == visits[0]
visit = get_origin_visit(origin_info, visit_ts="2018-01-01")
assert visit == visits[-1]
@given(new_origin(), new_snapshots(6))
def test_get_origin_visit_return_first_valid_full_visit(
archive_data, new_origin, new_snapshots
):
visits = []
archive_data.origin_add([new_origin])
# create 6 visits: the first three have full status while the
# last three have partial status, and the last four visits have a
# null snapshot (only the first two visit statuses are stored)
for i, snp in enumerate(new_snapshots):
visit_date = now() + timedelta(days=i * 10)
visit = archive_data.origin_visit_add(
[OriginVisit(origin=new_origin.url, date=visit_date, type="git",)]
)[0]
archive_data.snapshot_add([new_snapshots[i]])
visit_status = OriginVisitStatus(
origin=new_origin.url,
visit=visit.visit,
date=visit_date + timedelta(minutes=5),
status="full" if i < 3 else "partial",
snapshot=new_snapshots[i].id if i < 2 else None,
)
if i < 2:
archive_data.origin_visit_status_add([visit_status])
visits.append(visit.visit)
# should return the second visit
expected_visit = archive_data.origin_visit_get_by(new_origin.url, visits[1])
assert get_origin_visit((OriginInfo(url=new_origin.url))) == expected_visit
@given(new_origin(), new_snapshots(6))
def test_get_origin_visit_non_resolvable_snapshots(
archive_data, new_origin, new_snapshots
):
visits = []
archive_data.origin_add([new_origin])
# create 6 full visits: the first three have resolvable snapshots
# while the last three have non-resolvable snapshots
for i, snp in enumerate(new_snapshots):
visit_date = now() + timedelta(days=i * 10)
visit = archive_data.origin_visit_add(
[OriginVisit(origin=new_origin.url, date=visit_date, type="git",)]
)[0]
archive_data.snapshot_add([new_snapshots[i]])
visit_status = OriginVisitStatus(
origin=new_origin.url,
visit=visit.visit,
date=visit_date + timedelta(minutes=5),
status="full",
snapshot=new_snapshots[i].id,
)
if i < 3:
archive_data.origin_visit_status_add([visit_status])
visits.append(visit.visit)
# should return the third visit
expected_visit = archive_data.origin_visit_get_by(new_origin.url, visits[2])
assert get_origin_visit((OriginInfo(url=new_origin.url))) == expected_visit
@given(new_origin(), new_snapshots(6))
def test_get_origin_visit_return_first_valid_partial_visit(
archive_data, new_origin, new_snapshots
):
visits = []
archive_data.origin_add([new_origin])
# create 6 visits: the first three have full status but a null snapshot
# while the last three have partial status with a valid snapshot
for i, snp in enumerate(new_snapshots):
visit_date = now() + timedelta(days=i * 10)
visit = archive_data.origin_visit_add(
[OriginVisit(origin=new_origin.url, date=visit_date, type="git",)]
)[0]
archive_data.snapshot_add([new_snapshots[i]])
visit_status = OriginVisitStatus(
origin=new_origin.url,
visit=visit.visit,
date=visit_date + timedelta(minutes=5),
status="full" if i < 3 else "partial",
snapshot=new_snapshots[i].id if i > 2 else None,
)
if i > 2:
archive_data.origin_visit_status_add([visit_status])
visits.append(visit.visit)
# should return the last visit
expected_visit = archive_data.origin_visit_get_by(new_origin.url, visits[-1])
assert get_origin_visit((OriginInfo(url=new_origin.url))) == expected_visit
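In `test_get_origin_visit`, a `visit_ts` that matches no visit exactly (`"2016-02-20"`, `"2014-01-01"`, `"2018-01-01"`) still resolves to a visit, and the assertions are consistent with picking the visit whose date is nearest to the requested timestamp. The sketch below illustrates that selection rule. `closest_visit` is hypothetical: it assumes nearest-by-absolute-difference (the real `get_origin_visit` may differ, for example in tie-breaking), it uses `datetime.fromisoformat` and therefore only accepts `isoformat()`-style strings, and it treats dates without an offset as UTC.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List

Visit = Dict[str, Any]


def _to_utc(value: str) -> datetime:
    """Parse an isoformat() date string; naive values are assumed to be UTC."""
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def closest_visit(visits: List[Visit], visit_ts: str) -> Visit:
    """Hypothetical helper: return the visit whose date is nearest to visit_ts."""
    if not visits:
        raise LookupError("no visits to choose from")
    target = _to_utc(visit_ts)
    # min() keeps the first visit in list order when two are equally close.
    return min(visits, key=lambda visit: abs(_to_utc(visit["date"]) - target))
```

Applied to the five visit dates of that test, this rule returns visit 2 for `"2016-02-20"`, visit 4 for `"2016-06-18 01:22"`, and the first and last visits for timestamps before and after the whole range.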
diff --git a/swh/web/tests/common/test_utils.py b/swh/web/tests/common/test_utils.py
index dd4abd859..2db4bf34d 100644
--- a/swh/web/tests/common/test_utils.py
+++ b/swh/web/tests/common/test_utils.py
@@ -1,129 +1,140 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
+import pytest
+
from swh.web.common import utils
+from swh.web.common.exc import BadInputExc
def test_shorten_path_noop():
noops = ["/api/", "/browse/", "/content/symbol/foobar/"]
for noop in noops:
assert utils.shorten_path(noop) == noop
def test_shorten_path_sha1():
sha1 = "aafb16d69fd30ff58afdd69036a26047f3aebdc6"
short_sha1 = sha1[:8] + "..."
templates = [
"/api/1/content/sha1:%s/",
"/api/1/content/sha1_git:%s/",
"/api/1/directory/%s/",
"/api/1/content/sha1:%s/ctags/",
]
for template in templates:
assert utils.shorten_path(template % sha1) == template % short_sha1
def test_shorten_path_sha256():
sha256 = "aafb16d69fd30ff58afdd69036a26047" "213add102934013a014dfca031c41aef"
short_sha256 = sha256[:8] + "..."
templates = [
"/api/1/content/sha256:%s/",
"/api/1/directory/%s/",
"/api/1/content/sha256:%s/filetype/",
]
for template in templates:
assert utils.shorten_path(template % sha256) == template % short_sha256
-def test_parse_timestamp():
- input_timestamps = [
- None,
- "2016-01-12",
- "2016-01-12T09:19:12+0100",
- "Today is January 1, 2047 at 8:21:00AM",
- "1452591542",
- ]
-
- output_dates = [
- None,
- datetime.datetime(2016, 1, 12, 0, 0),
- datetime.datetime(2016, 1, 12, 8, 19, 12, tzinfo=datetime.timezone.utc),
- datetime.datetime(2047, 1, 1, 8, 21),
- datetime.datetime(2016, 1, 12, 9, 39, 2, tzinfo=datetime.timezone.utc),
- ]
-
- for ts, exp_date in zip(input_timestamps, output_dates):
- assert utils.parse_timestamp(ts) == exp_date
+@pytest.mark.parametrize(
+ "input_timestamp, output_date",
+ [
+ (
+ "2016-01-12",
+ datetime.datetime(2016, 1, 12, 0, 0, tzinfo=datetime.timezone.utc),
+ ),
+ (
+ "2016-01-12T09:19:12+0100",
+ datetime.datetime(2016, 1, 12, 8, 19, 12, tzinfo=datetime.timezone.utc),
+ ),
+ (
+ "2007-01-14T20:34:22Z",
+ datetime.datetime(2007, 1, 14, 20, 34, 22, tzinfo=datetime.timezone.utc),
+ ),
+ ],
+)
+def test_parse_iso8601_date_to_utc_ok(input_timestamp, output_date):
+ assert utils.parse_iso8601_date_to_utc(input_timestamp) == output_date
+
+
+@pytest.mark.parametrize(
+ "invalid_iso8601_timestamp", ["Today is January 1, 2047 at 8:21:00AM", "1452591542"]
+)
+def test_parse_iso8601_date_to_utc_ko(invalid_iso8601_timestamp):
+ with pytest.raises(BadInputExc):
+ utils.parse_iso8601_date_to_utc(invalid_iso8601_timestamp)
def test_format_utc_iso_date():
assert (
utils.format_utc_iso_date("2017-05-04T13:27:13+02:00")
== "04 May 2017, 11:27 UTC"
)
def test_gen_path_info():
input_path = "/home/user/swh-environment/swh-web/"
expected_result = [
{"name": "home", "path": "home"},
{"name": "user", "path": "home/user"},
{"name": "swh-environment", "path": "home/user/swh-environment"},
{"name": "swh-web", "path": "home/user/swh-environment/swh-web"},
]
path_info = utils.gen_path_info(input_path)
assert path_info == expected_result
input_path = "home/user/swh-environment/swh-web"
path_info = utils.gen_path_info(input_path)
assert path_info == expected_result
def test_rst_to_html():
rst = (
"Section\n"
"=======\n\n"
"**Some strong text**\n\n"
"Subsection\n"
"----------\n\n"
"* This is a bulleted list.\n"
"* It has two items, the second\n"
" item uses two lines.\n"
"\n"
"1. This is a numbered list.\n"
"2. It has two items too.\n"
"\n"
"#. This is a numbered list.\n"
"#. It has two items too.\n"
)
expected_html = (
'<div class="swh-rst"><h1 class="title">Section</h1>\n'
"<p><strong>Some strong text</strong></p>\n"
'<div class="section" id="subsection">\n'
"<h2>Subsection</h2>\n"
'<ul class="simple">\n'
"<li><p>This is a bulleted list.</p></li>\n"
"<li><p>It has two items, the second\n"
"item uses two lines.</p></li>\n"
"</ul>\n"
'<ol class="arabic simple">\n'
"<li><p>This is a numbered list.</p></li>\n"
"<li><p>It has two items too.</p></li>\n"
"<li><p>This is a numbered list.</p></li>\n"
"<li><p>It has two items too.</p></li>\n"
"</ol>\n"
"</div>\n"
"</div>"
)
assert utils.rst_to_html(rst) == expected_html
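The new `test_parse_iso8601_date_to_utc_ok` and `test_parse_iso8601_date_to_utc_ko` tests pin down the expected contract: ISO 8601 strings (date only, with a `+0100` offset, or with `Z`) are normalised to timezone-aware UTC datetimes, while free-form text and bare Unix timestamps are rejected. Below is a stdlib-only sketch of a parser meeting that contract. It is not the swh-web implementation: the name `parse_iso8601_to_utc` is hypothetical, it raises `ValueError` instead of `BadInputExc`, and it only supports the extended calendar-date form `YYYY-MM-DD`, optionally followed by a time and a UTC offset.

```python
import re
from datetime import datetime, timedelta, timezone

# Extended-format calendar date, optional time (with optional seconds and
# fraction) and optional UTC offset ("Z", "+HH:MM" or "+HHMM").
_ISO8601_RE = re.compile(
    r"^(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})"
    r"(?:[T ](?P<hour>\d{2}):(?P<minute>\d{2})"
    r"(?::(?P<second>\d{2})(?:\.(?P<fraction>\d{1,6}))?)?"
    r"(?P<tz>Z|[+-]\d{2}:?\d{2})?)?$"
)


def parse_iso8601_to_utc(value: str) -> datetime:
    """Hypothetical parser: ISO 8601 string -> timezone-aware UTC datetime."""
    match = _ISO8601_RE.match(value.strip())
    if match is None:
        raise ValueError(f"invalid ISO 8601 date: {value!r}")
    parts = match.groupdict()
    tz = parts["tz"]
    if tz is None or tz == "Z":
        # Dates without an explicit offset are assumed to be in UTC.
        tzinfo = timezone.utc
    else:
        sign = 1 if tz[0] == "+" else -1
        digits = tz[1:].replace(":", "")
        offset = timedelta(hours=int(digits[:2]), minutes=int(digits[2:]))
        tzinfo = timezone(sign * offset)
    # Right-pad the fraction so ".5" means 500000 microseconds.
    microsecond = int((parts["fraction"] or "0").ljust(6, "0"))
    # The datetime constructor rejects out-of-range fields (e.g. month 13).
    parsed = datetime(
        int(parts["year"]),
        int(parts["month"]),
        int(parts["day"]),
        int(parts["hour"] or 0),
        int(parts["minute"] or 0),
        int(parts["second"] or 0),
        microsecond,
        tzinfo=tzinfo,
    )
    return parsed.astimezone(timezone.utc)
```

Rejecting `"1452591542"` outright, rather than guessing that it is a Unix timestamp, matches the removal of the integer `visit_ts` case from `test_get_origin_visit` in this diff.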