diff --git a/swh/web/assets/src/bundles/origin/visits-calendar.js b/swh/web/assets/src/bundles/origin/visits-calendar.js
--- a/swh/web/assets/src/bundles/origin/visits-calendar.js
+++ b/swh/web/assets/src/bundles/origin/visits-calendar.js
@@ -110,9 +110,9 @@
let content = '
' + e.date.toDateString() + '
';
content += '';
for (let i = 0; i < visits.length; ++i) {
- let visitTime = visits[i].fmt_date.substr(visits[i].fmt_date.indexOf(',') + 2);
+ let visitTime = visits[i].formatted_date.substr(visits[i].formatted_date.indexOf(',') + 2);
content += '- ' + visitTime + '
';
+ ' visit" href="' + visits[i].url + '">' + visitTime + '';
}
content += '
';
diff --git a/swh/web/assets/src/bundles/origin/visits-reporting.js b/swh/web/assets/src/bundles/origin/visits-reporting.js
--- a/swh/web/assets/src/bundles/origin/visits-reporting.js
+++ b/swh/web/assets/src/bundles/origin/visits-reporting.js
@@ -53,7 +53,7 @@
}
visitsListHtml += '';
visitsListHtml += '
' + visitsByYear[i].fmt_date + '';
+ ' visit" href="' + visitsByYear[i].url + '">' + visitsByYear[i].formatted_date + '';
visitsListHtml += '
';
++visitsCpt;
}
@@ -114,7 +114,7 @@
allVisits.forEach((v, i) => {
// Turn Unix epoch into Javascript Date object
v.date = new Date(Math.floor(v.date * 1000));
- let visitLink = '' + v.fmt_date + '';
+ let visitLink = '' + v.formatted_date + '';
if (v.status === 'full') {
if (!firstFullVisit) {
firstFullVisit = v;
diff --git a/swh/web/browse/snapshot_context.py b/swh/web/browse/snapshot_context.py
--- a/swh/web/browse/snapshot_context.py
+++ b/swh/web/browse/snapshot_context.py
@@ -6,6 +6,8 @@
# Utility module for browsing the archive in a snapshot context.
from collections import defaultdict
+from copy import copy
+from typing import Any, Dict, List, Optional, Union, Tuple
from django.core.cache import cache
@@ -35,6 +37,13 @@
from swh.web.common import service, highlightjs
from swh.web.common.exc import handle_view_exception, NotFoundExc
from swh.web.common.origin_visits import get_origin_visit
+from swh.web.common.typing import (
+ OriginInfo,
+ QueryParameters,
+ SnapshotBranchInfo,
+ SnapshotReleaseInfo,
+ SnapshotContext,
+)
from swh.web.common.utils import (
reverse,
gen_path_info,
@@ -154,26 +163,27 @@
raise NotFoundExc(escape(msg))
-def process_snapshot_branches(snapshot):
+def process_snapshot_branches(
+ snapshot: Dict[str, Any]
+) -> Tuple[List[SnapshotBranchInfo], List[SnapshotReleaseInfo]]:
"""
Process a dictionary describing snapshot branches: extract those
targeting revisions and releases, put them in two different lists,
then sort those lists in lexicographical order of the branches' names.
Args:
- snapshot_branches (dict): A dict describing the branches of a snapshot
- as returned for instance by
+ snapshot: A dict describing a snapshot as returned for instance by
:func:`swh.web.common.service.lookup_snapshot`
Returns:
- tuple: A tuple whose first member is the sorted list of branches
- targeting revisions and second member the sorted list of branches
- targeting releases
+ A tuple whose first member is the sorted list of branches
+ targeting revisions and second member the sorted list of branches
+ targeting releases
"""
snapshot_branches = snapshot["branches"]
- branches = {}
- branch_aliases = {}
- releases = {}
+ branches: Dict[str, SnapshotBranchInfo] = {}
+ branch_aliases: Dict[str, str] = {}
+ releases: Dict[str, SnapshotReleaseInfo] = {}
revision_to_branch = defaultdict(set)
revision_to_release = defaultdict(set)
release_to_branch = defaultdict(set)
@@ -184,10 +194,6 @@
target_id = target["target"]
target_type = target["target_type"]
if target_type == "revision":
- branches[branch_name] = {
- "name": branch_name,
- "revision": target_id,
- }
revision_to_branch[target_id].add(branch_name)
elif target_type == "release":
release_to_branch[target_id].add(branch_name)
@@ -195,32 +201,34 @@
branch_aliases[branch_name] = target_id
# FIXME: handle pointers to other object types
- def _enrich_release_branch(branch, release):
- releases[branch] = {
- "name": release["name"],
- "branch_name": branch,
- "date": format_utc_iso_date(release["date"]),
- "id": release["id"],
- "message": release["message"],
- "target_type": release["target_type"],
- "target": release["target"],
- }
+ def _add_release_info(branch, release):
+ releases[branch] = SnapshotReleaseInfo(
+ name=release["name"],
+ branch_name=branch,
+ date=format_utc_iso_date(release["date"]),
+ directory=None,
+ id=release["id"],
+ message=release["message"],
+ target_type=release["target_type"],
+ target=release["target"],
+ url=None,
+ )
- def _enrich_revision_branch(branch, revision):
- branches[branch].update(
- {
- "revision": revision["id"],
- "directory": revision["directory"],
- "date": format_utc_iso_date(revision["date"]),
- "message": revision["message"],
- }
+ def _add_branch_info(branch, revision):
+ branches[branch] = SnapshotBranchInfo(
+ name=branch,
+ revision=revision["id"],
+ directory=revision["directory"],
+ date=format_utc_iso_date(revision["date"]),
+ message=revision["message"],
+ url=None,
)
releases_info = service.lookup_release_multiple(release_to_branch.keys())
for release in releases_info:
branches_to_update = release_to_branch[release["id"]]
for branch in branches_to_update:
- _enrich_release_branch(branch, release)
+ _add_release_info(branch, release)
if release["target_type"] == "revision":
revision_to_release[release["target"]].update(branches_to_update)
@@ -232,13 +240,13 @@
if not revision:
continue
for branch in revision_to_branch[revision["id"]]:
- _enrich_revision_branch(branch, revision)
+ _add_branch_info(branch, revision)
for release in revision_to_release[revision["id"]]:
releases[release]["directory"] = revision["directory"]
for branch_alias, branch_target in branch_aliases.items():
if branch_target in branches:
- branches[branch_alias] = dict(branches[branch_target])
+ branches[branch_alias] = copy(branches[branch_target])
else:
snp = service.lookup_snapshot(
snapshot["id"], branches_from=branch_target, branches_count=1
@@ -253,10 +261,10 @@
if target_type == "revision":
branches[branch_alias] = snp["branches"][branch_target]
revision = service.lookup_revision(target)
- _enrich_revision_branch(branch_alias, revision)
+ _add_branch_info(branch_alias, revision)
elif target_type == "release":
release = service.lookup_release(target)
- _enrich_release_branch(branch_alias, release)
+ _add_release_info(branch_alias, release)
if branch_alias in branches:
branches[branch_alias]["name"] = branch_alias
@@ -267,7 +275,9 @@
return ret_branches, ret_releases
-def get_snapshot_content(snapshot_id):
+def get_snapshot_content(
+ snapshot_id: str,
+) -> Tuple[List[SnapshotBranchInfo], List[SnapshotReleaseInfo]]:
"""Returns the lists of branches and releases
associated to a swh snapshot.
That list is put in cache in order to speedup the navigation
@@ -277,8 +287,7 @@
will be returned for performance reasons.
Args:
- snapshot_id (str): hexadecimal representation of the snapshot
- identifier
+ snapshot_id: hexadecimal representation of the snapshot identifier
Returns:
A tuple with two members. The first one is a list of dict describing
@@ -294,8 +303,8 @@
if cache_entry:
return cache_entry["branches"], cache_entry["releases"]
- branches = []
- releases = []
+ branches: List[SnapshotBranchInfo] = []
+ releases: List[SnapshotReleaseInfo] = []
snapshot_content_max_size = get_config()["snapshot_content_max_size"]
@@ -311,14 +320,23 @@
def get_origin_visit_snapshot(
- origin_info, visit_ts=None, visit_id=None, snapshot_id=None
-):
- """Returns the lists of branches and releases
- associated to a swh origin for a given visit.
- The visit is expressed by a timestamp. In the latter case,
- the closest visit from the provided timestamp will be used.
+ origin_info: OriginInfo,
+ visit_ts: Optional[Union[int, str]] = None,
+ visit_id: Optional[int] = None,
+ snapshot_id: Optional[str] = None,
+) -> Tuple[List[SnapshotBranchInfo], List[SnapshotReleaseInfo]]:
+ """Returns the lists of branches and releases associated to an origin for
+ a given visit.
+
+ The visit is expressed by either:
+
+ * a snapshot identifier
+ * a timestamp, if no visit with that exact timestamp is found,
+ the closest one from the provided timestamp will be used.
+
If no visit parameter is provided, it returns the list of branches
found for the latest visit.
+
That list is put in cache in order to speedup the navigation
in the swh-web/browse ui.
@@ -326,11 +344,11 @@
will be returned for performance reasons.
Args:
- origin_info (dict): a dict filled with origin information
- (id, url, type)
- visit_ts (int or str): an ISO date string or Unix timestamp to parse
- visit_id (int): optional visit id for disambiguation in case
- several visits have the same timestamp
+ origin_info: a dict filled with origin information
+ visit_ts: an ISO date string or Unix timestamp to parse
+ visit_id: visit id for disambiguation in case several visits have
+ the same timestamp
+ snapshot_id: if provided, visit associated to the snapshot will be processed
Returns:
A tuple with two members. The first one is a list of dict describing
@@ -348,72 +366,67 @@
def get_snapshot_context(
- snapshot_id=None, origin_url=None, timestamp=None, visit_id=None
-):
+ snapshot_id: Optional[str] = None,
+ origin_url: Optional[str] = None,
+ timestamp: Optional[str] = None,
+ visit_id: Optional[int] = None,
+ branch_name: Optional[str] = None,
+ release_name: Optional[str] = None,
+ revision_id: Optional[str] = None,
+ path: Optional[str] = None,
+ browse_context: str = "directory",
+) -> SnapshotContext:
"""
Utility function to compute relevant information when navigating
the archive in a snapshot context. The snapshot is either
referenced by its id or it will be retrieved from an origin visit.
Args:
- snapshot_id (str): hexadecimal representation of a snapshot identifier,
- all other parameters will be ignored if it is provided
- origin_url (str): the origin_url
- (e.g. https://github.com/(user)/(repo)/)
- timestamp (str): a datetime string for retrieving the closest
+ snapshot_id: hexadecimal representation of a snapshot identifier
+ origin_url: an origin_url
+ timestamp: a datetime string for retrieving the closest
visit of the origin
- visit_id (int): optional visit id for disambiguation in case
+ visit_id: optional visit id for disambiguation in case
of several visits with the same timestamp
+ branch_name: optional branch name set when browsing the snapshot in
+ that scope (will default to "HEAD" if not provided)
+ release_name: optional release name set when browsing the snapshot in
+ that scope
+ revision_id: optional revision identifier set when browsing the snapshot in
+ that scope
+ path: optional path of the object currently browsed in the snapshot
+ browse_context: indicates which type of object is currently browsed
Returns:
- A dict with the following entries:
- * origin_info: dict containing origin information
- * visit_info: dict containing visit information
- * branches: the list of branches for the origin found
- during the visit
- * releases: the list of releases for the origin found
- during the visit
- * origin_browse_url: the url to browse the origin
- * origin_branches_url: the url to browse the origin branches
- * origin_releases_url': the url to browse the origin releases
- * origin_visit_url: the url to browse the snapshot of the origin
- found during the visit
- * url_args: dict containing url arguments to use when browsing in
- the context of the origin and its visit
+ A dict filled with snapshot context information.
Raises:
swh.web.common.exc.NotFoundExc: if no snapshot is found for the visit
of an origin.
"""
+ assert origin_url is not None or snapshot_id is not None
origin_info = None
visit_info = None
- url_args = None
- query_params = {}
- branches = []
- releases = []
- browse_url = None
- visit_url = None
- branches_url = None
- releases_url = None
- swh_type = "snapshot"
+ url_args = {}
+ query_params: QueryParameters = {}
+ origin_visits_url = None
if origin_url:
- swh_type = "origin"
origin_info = service.lookup_origin({"url": origin_url})
visit_info = get_origin_visit(origin_info, timestamp, visit_id, snapshot_id)
- fmt_date = format_utc_iso_date(visit_info["date"])
- visit_info["fmt_date"] = fmt_date
+ formatted_date = format_utc_iso_date(visit_info["date"])
+ visit_info["formatted_date"] = formatted_date
snapshot_id = visit_info["snapshot"]
if not snapshot_id:
raise NotFoundExc(
"No snapshot associated to the visit of origin "
- "%s on %s" % (escape(origin_url), fmt_date)
+ "%s on %s" % (escape(origin_url), formatted_date)
)
# provided timestamp is not necessarily equals to the one
# of the retrieved visit, so get the exact one in order
- # use it in the urls generated below
+ # to use it in the urls generated below
if timestamp:
timestamp = visit_info["date"]
@@ -423,12 +436,14 @@
url_args = {"origin_url": origin_info["url"]}
- query_params = {"visit_id": visit_id}
+ query_params = {}
+ if visit_id is not None:
+ query_params["visit_id"] = visit_id
- browse_url = reverse("browse-origin-visits", url_args=url_args)
+ origin_visits_url = reverse("browse-origin-visits", url_args=url_args)
if timestamp:
- url_args["timestamp"] = format_utc_iso_date(timestamp, "%Y-%m-%dT%H:%M:%S")
+ url_args["timestamp"] = format_utc_iso_date(timestamp, "%Y-%m-%dT%H:%M:%SZ")
visit_url = reverse(
"browse-origin-directory", url_args=url_args, query_params=query_params
)
@@ -441,10 +456,10 @@
releases_url = reverse(
"browse-origin-releases", url_args=url_args, query_params=query_params
)
- elif snapshot_id:
+ else:
+ assert snapshot_id is not None
branches, releases = get_snapshot_content(snapshot_id)
url_args = {"snapshot_id": snapshot_id}
- browse_url = reverse("browse-snapshot", url_args=url_args)
branches_url = reverse("browse-snapshot-branches", url_args=url_args)
releases_url = reverse("browse-snapshot-releases", url_args=url_args)
@@ -457,88 +472,38 @@
swh_snp_id = persistent_identifier("snapshot", snapshot_id)
- return {
- "swh_type": swh_type,
- "swh_object_id": swh_snp_id,
- "snapshot_id": snapshot_id,
- "snapshot_sizes": snapshot_sizes,
- "is_empty": is_empty,
- "origin_info": origin_info,
- "visit_info": visit_info,
- "branches": branches,
- "releases": releases,
- "branch": None,
- "release": None,
- "browse_url": browse_url,
- "branches_url": branches_url,
- "releases_url": releases_url,
- "url_args": url_args,
- "query_params": query_params,
- }
-
-
-def _process_snapshot_request(
- request,
- snapshot_id=None,
- origin_url=None,
- timestamp=None,
- path=None,
- browse_context="directory",
-):
- """
- Utility function to perform common input request processing
- for snapshot context views.
- """
-
- visit_id = request.GET.get("visit_id", None)
-
- snapshot_context = get_snapshot_context(
- snapshot_id, origin_url, timestamp, visit_id
- )
-
- swh_type = snapshot_context["swh_type"]
- origin_info = snapshot_context["origin_info"]
- branches = snapshot_context["branches"]
- releases = snapshot_context["releases"]
- url_args = snapshot_context["url_args"]
- query_params = snapshot_context["query_params"]
-
- if snapshot_context["visit_info"]:
- timestamp = format_utc_iso_date(
- snapshot_context["visit_info"]["date"], "%Y-%m-%dT%H:%M:%SZ"
- )
- snapshot_context["timestamp"] = format_utc_iso_date(
- snapshot_context["visit_info"]["date"]
- )
+ if visit_info:
+ timestamp = format_utc_iso_date(visit_info["date"])
- browse_view_name = "browse-" + swh_type + "-" + browse_context
+ if origin_info:
+ browse_view_name = f"browse-origin-{browse_context}"
+ else:
+ browse_view_name = f"browse-snapshot-{browse_context}"
- root_sha1_git = None
- revision_id = request.GET.get("revision", None)
- release_name = request.GET.get("release", None)
release_id = None
- branch_name = None
+ root_directory = None
- snapshot_sizes = snapshot_context["snapshot_sizes"]
snapshot_total_size = sum(snapshot_sizes.values())
- if snapshot_total_size and revision_id:
+ if snapshot_total_size and revision_id is not None:
revision = service.lookup_revision(revision_id)
- root_sha1_git = revision["directory"]
+ root_directory = revision["directory"]
branches.append(
- {
- "name": revision_id,
- "revision": revision_id,
- "directory": root_sha1_git,
- "url": None,
- }
+ SnapshotBranchInfo(
+ name=revision_id,
+ revision=revision_id,
+ directory=root_directory,
+ date=revision["date"],
+ message=revision["message"],
+ url=None,
+ )
)
branch_name = revision_id
query_params["revision"] = revision_id
elif snapshot_total_size and release_name:
- release = _get_release(releases, release_name, snapshot_context["snapshot_id"])
+ release = _get_release(releases, release_name, snapshot_id)
try:
- root_sha1_git = release["directory"]
+ root_directory = release["directory"]
revision_id = release["target"]
release_id = release["id"]
query_params["release"] = release_name
@@ -554,16 +519,13 @@
visit_id,
)
elif snapshot_total_size:
- branch_name = request.GET.get("branch", None)
if branch_name:
query_params["branch"] = branch_name
- branch = _get_branch(
- branches, branch_name or "HEAD", snapshot_context["snapshot_id"]
- )
+ branch = _get_branch(branches, branch_name or "HEAD", snapshot_id)
try:
branch_name = branch["name"]
revision_id = branch["revision"]
- root_sha1_git = branch["directory"]
+ root_directory = branch["directory"]
except Exception as exc:
sentry_sdk.capture_exception(exc)
_branch_not_found(
@@ -579,11 +541,11 @@
for b in branches:
branch_url_args = dict(url_args)
branch_query_params = dict(query_params)
- if "release" in branch_query_params:
- del branch_query_params["release"]
- branch_query_params["branch"] = b["name"]
+ branch_query_params.pop("release", None)
+ if b["name"] != b["revision"]:
+ branch_query_params.pop("revision", None)
+ branch_query_params["branch"] = b["name"]
if path:
- b["path"] = path
branch_url_args["path"] = path
b["url"] = reverse(
browse_view_name, url_args=branch_url_args, query_params=branch_query_params
@@ -592,11 +554,10 @@
for r in releases:
release_url_args = dict(url_args)
release_query_params = dict(query_params)
- if "branch" in release_query_params:
- del release_query_params["branch"]
+ release_query_params.pop("branch", None)
+ release_query_params.pop("revision", None)
release_query_params["release"] = r["name"]
if path:
- r["path"] = path
release_url_args["path"] = path
r["url"] = reverse(
browse_view_name,
@@ -604,14 +565,63 @@
query_params=release_query_params,
)
- snapshot_context["query_params"] = query_params
- snapshot_context["root_sha1_git"] = root_sha1_git
- snapshot_context["revision_id"] = revision_id
- snapshot_context["branch"] = branch_name
- snapshot_context["release"] = release_name
- snapshot_context["release_id"] = release_id
+ return SnapshotContext(
+ branch=branch_name,
+ branches=branches,
+ branches_url=branches_url,
+ is_empty=is_empty,
+ origin_info=origin_info,
+ origin_visits_url=origin_visits_url,
+ release=release_name,
+ release_id=release_id,
+ query_params=query_params,
+ releases=releases,
+ releases_url=releases_url,
+ revision_id=revision_id,
+ root_directory=root_directory,
+ snapshot_id=snapshot_id,
+ snapshot_sizes=snapshot_sizes,
+ snapshot_swhid=swh_snp_id,
+ url_args=url_args,
+ visit_info=visit_info,
+ )
+
- return snapshot_context
+def _build_breadcrumbs(snapshot_context: SnapshotContext, path: str):
+ origin_info = snapshot_context["origin_info"]
+ url_args = snapshot_context["url_args"]
+ query_params = snapshot_context["query_params"]
+ root_directory = snapshot_context["root_directory"]
+
+ path_info = gen_path_info(path)
+
+ if origin_info:
+ browse_view_name = "browse-origin-directory"
+ else:
+ browse_view_name = "browse-snapshot-directory"
+
+ breadcrumbs = []
+ if root_directory:
+ breadcrumbs.append(
+ {
+ "name": root_directory[:7],
+ "url": reverse(
+ browse_view_name, url_args=url_args, query_params=query_params
+ ),
+ }
+ )
+ for pi in path_info:
+ bc_url_args = dict(url_args)
+ bc_url_args["path"] = pi["path"]
+ breadcrumbs.append(
+ {
+ "name": pi["name"],
+ "url": reverse(
+ browse_view_name, url_args=bc_url_args, query_params=query_params
+ ),
+ }
+ )
+ return breadcrumbs
def browse_snapshot_directory(
@@ -622,19 +632,22 @@
"""
try:
- snapshot_context = _process_snapshot_request(
- request,
- snapshot_id,
- origin_url,
- timestamp,
- path,
+ snapshot_context = get_snapshot_context(
+ snapshot_id=snapshot_id,
+ origin_url=origin_url,
+ timestamp=timestamp,
+ visit_id=request.GET.get("visit_id"),
+ path=path,
browse_context="directory",
+ branch_name=request.GET.get("branch"),
+ release_name=request.GET.get("release"),
+ revision_id=request.GET.get("revision"),
)
- root_sha1_git = snapshot_context["root_sha1_git"]
- sha1_git = root_sha1_git
- if root_sha1_git and path:
- dir_info = service.lookup_directory_with_path(root_sha1_git, path)
+ root_directory = snapshot_context["root_directory"]
+ sha1_git = root_directory
+ if root_directory and path:
+ dir_info = service.lookup_directory_with_path(root_directory, path)
sha1_git = dir_info["target"]
dirs = []
@@ -645,7 +658,6 @@
except Exception as exc:
return handle_view_exception(request, exc)
- swh_type = snapshot_context["swh_type"]
origin_info = snapshot_context["origin_info"]
visit_info = snapshot_context["visit_info"]
url_args = snapshot_context["url_args"]
@@ -653,31 +665,12 @@
revision_id = snapshot_context["revision_id"]
snapshot_id = snapshot_context["snapshot_id"]
- path_info = gen_path_info(path)
+ if origin_info:
+ browse_view_name = "browse-origin-directory"
+ else:
+ browse_view_name = "browse-snapshot-directory"
- browse_view_name = "browse-" + swh_type + "-directory"
-
- breadcrumbs = []
- if root_sha1_git:
- breadcrumbs.append(
- {
- "name": root_sha1_git[:7],
- "url": reverse(
- browse_view_name, url_args=url_args, query_params=query_params
- ),
- }
- )
- for pi in path_info:
- bc_url_args = dict(url_args)
- bc_url_args["path"] = pi["path"]
- breadcrumbs.append(
- {
- "name": pi["name"],
- "url": reverse(
- browse_view_name, url_args=bc_url_args, query_params=query_params
- ),
- }
- )
+ breadcrumbs = _build_breadcrumbs(snapshot_context, path)
path = "" if path is None else (path + "/")
@@ -695,7 +688,10 @@
readmes = {}
- browse_view_name = "browse-" + swh_type + "-content"
+ if origin_info:
+ browse_view_name = "browse-origin-content"
+ else:
+ browse_view_name = "browse-snapshot-content"
for f in files:
bc_url_args = dict(url_args)
@@ -711,7 +707,10 @@
readme_name, readme_url, readme_html = get_readme_to_display(readmes)
- browse_view_name = "browse-" + swh_type + "-log"
+ if origin_info:
+ browse_view_name = "browse-origin-log"
+ else:
+ browse_view_name = "browse-snapshot-log"
history_url = None
if snapshot_id != _empty_snapshot_id:
@@ -722,7 +721,7 @@
nb_files = None
nb_dirs = None
dir_path = None
- if root_sha1_git:
+ if root_directory:
nb_files = len(files)
nb_dirs = len(dirs)
sum_file_sizes = filesizeformat(sum_file_sizes)
@@ -800,7 +799,7 @@
"swh_object_metadata": dir_metadata,
"dirs": dirs,
"files": files,
- "breadcrumbs": breadcrumbs if root_sha1_git else [],
+ "breadcrumbs": breadcrumbs if root_directory else [],
"top_right_link": top_right_link,
"readme_name": readme_name,
"readme_url": readme_url,
@@ -826,11 +825,19 @@
"""
try:
- snapshot_context = _process_snapshot_request(
- request, snapshot_id, origin_url, timestamp, path, browse_context="content"
+ snapshot_context = get_snapshot_context(
+ snapshot_id=snapshot_id,
+ origin_url=origin_url,
+ timestamp=timestamp,
+ visit_id=request.GET.get("visit_id"),
+ path=path,
+ browse_context="content",
+ branch_name=request.GET.get("branch"),
+ release_name=request.GET.get("release"),
+ revision_id=request.GET.get("revision"),
)
- root_sha1_git = snapshot_context["root_sha1_git"]
+ root_directory = snapshot_context["root_directory"]
sha1_git = None
query_string = None
content_data = None
@@ -838,24 +845,21 @@
split_path = path.split("/")
filename = split_path[-1]
filepath = path[: -len(filename)]
- if root_sha1_git:
- content_info = service.lookup_directory_with_path(root_sha1_git, path)
+ if root_directory:
+ content_info = service.lookup_directory_with_path(root_directory, path)
sha1_git = content_info["target"]
query_string = "sha1_git:" + sha1_git
content_data = request_content(query_string, raise_if_unavailable=False)
if filepath:
- dir_info = service.lookup_directory_with_path(root_sha1_git, filepath)
+ dir_info = service.lookup_directory_with_path(root_directory, filepath)
directory_id = dir_info["target"]
else:
- directory_id = root_sha1_git
+ directory_id = root_directory
except Exception as exc:
return handle_view_exception(request, exc)
- swh_type = snapshot_context["swh_type"]
- url_args = snapshot_context["url_args"]
- query_params = snapshot_context["query_params"]
revision_id = snapshot_context["revision_id"]
origin_info = snapshot_context["origin_info"]
visit_info = snapshot_context["visit_info"]
@@ -881,31 +885,7 @@
if mimetype and "text/" in mimetype:
available_languages = highlightjs.get_supported_languages()
- browse_view_name = "browse-" + swh_type + "-directory"
-
- breadcrumbs = []
-
- path_info = gen_path_info(filepath)
- if root_sha1_git:
- breadcrumbs.append(
- {
- "name": root_sha1_git[:7],
- "url": reverse(
- browse_view_name, url_args=url_args, query_params=query_params
- ),
- }
- )
- for pi in path_info:
- bc_url_args = dict(url_args)
- bc_url_args["path"] = pi["path"]
- breadcrumbs.append(
- {
- "name": pi["name"],
- "url": reverse(
- browse_view_name, url_args=bc_url_args, query_params=query_params
- ),
- }
- )
+ breadcrumbs = _build_breadcrumbs(snapshot_context, filepath)
breadcrumbs.append({"name": filename, "url": None})
@@ -1010,7 +990,7 @@
"mimetype": mimetype,
"language": language,
"available_languages": available_languages,
- "breadcrumbs": breadcrumbs if root_sha1_git else [],
+ "breadcrumbs": breadcrumbs if root_directory else [],
"top_right_link": top_right_link,
"snapshot_context": snapshot_context,
"vault_cooking": None,
@@ -1034,8 +1014,15 @@
"""
try:
- snapshot_context = _process_snapshot_request(
- request, snapshot_id, origin_url, timestamp, browse_context="log"
+ snapshot_context = get_snapshot_context(
+ snapshot_id=snapshot_id,
+ origin_url=origin_url,
+ timestamp=timestamp,
+ visit_id=request.GET.get("visit_id"),
+ browse_context="log",
+ branch_name=request.GET.get("branch"),
+ release_name=request.GET.get("release"),
+ revision_id=request.GET.get("revision"),
)
revision_id = snapshot_context["revision_id"]
@@ -1072,7 +1059,6 @@
except Exception as exc:
return handle_view_exception(request, exc)
- swh_type = snapshot_context["swh_type"]
origin_info = snapshot_context["origin_info"]
visit_info = snapshot_context["visit_info"]
url_args = snapshot_context["url_args"]
@@ -1083,7 +1069,10 @@
revs_ordering = request.GET.get("revs_ordering", "")
query_params["revs_ordering"] = revs_ordering
- browse_view_name = "browse-" + swh_type + "-log"
+ if origin_info:
+ browse_view_name = "browse-origin-log"
+ else:
+ browse_view_name = "browse-snapshot-log"
prev_log_url = None
if len(rev_log) > offset + per_page:
@@ -1167,20 +1156,26 @@
context.
"""
try:
- snapshot_context = _process_snapshot_request(
- request, snapshot_id, origin_url, timestamp
+
+ snapshot_context = get_snapshot_context(
+ snapshot_id=snapshot_id,
+ origin_url=origin_url,
+ timestamp=timestamp,
+ visit_id=request.GET.get("visit_id"),
)
branches_bc = request.GET.get("branches_breadcrumbs", "")
branches_bc = branches_bc.split(",") if branches_bc else []
branches_from = branches_bc[-1] if branches_bc else ""
- swh_type = snapshot_context["swh_type"]
origin_info = snapshot_context["origin_info"]
url_args = snapshot_context["url_args"]
query_params = snapshot_context["query_params"]
- browse_view_name = "browse-" + swh_type + "-directory"
+ if origin_info:
+ browse_view_name = "browse-origin-directory"
+ else:
+ browse_view_name = "browse-snapshot-directory"
snapshot = service.lookup_snapshot(
snapshot_context["snapshot_id"],
@@ -1215,7 +1210,10 @@
branch["revision_url"] = revision_url
branch["directory_url"] = directory_url
- browse_view_name = "browse-" + swh_type + "-branches"
+ if origin_info:
+ browse_view_name = "browse-origin-branches"
+ else:
+ browse_view_name = "browse-snapshot-branches"
prev_branches_url = None
next_branches_url = None
@@ -1272,15 +1270,17 @@
context.
"""
try:
- snapshot_context = _process_snapshot_request(
- request, snapshot_id, origin_url, timestamp
+ snapshot_context = get_snapshot_context(
+ snapshot_id=snapshot_id,
+ origin_url=origin_url,
+ timestamp=timestamp,
+ visit_id=request.GET.get("visit_id"),
)
rel_bc = request.GET.get("releases_breadcrumbs", "")
rel_bc = rel_bc.split(",") if rel_bc else []
rel_from = rel_bc[-1] if rel_bc else ""
- swh_type = snapshot_context["swh_type"]
origin_info = snapshot_context["origin_info"]
url_args = snapshot_context["url_args"]
query_params = snapshot_context["query_params"]
@@ -1337,7 +1337,10 @@
release["release_url"] = release_url
release["target_url"] = target_url
- browse_view_name = "browse-" + swh_type + "-releases"
+ if origin_info:
+ browse_view_name = "browse-origin-releases"
+ else:
+ browse_view_name = "browse-snapshot-releases"
prev_releases_url = None
next_releases_url = None
diff --git a/swh/web/browse/views/origin.py b/swh/web/browse/views/origin.py
--- a/swh/web/browse/views/origin.py
+++ b/swh/web/browse/views/origin.py
@@ -140,7 +140,7 @@
for i, visit in enumerate(origin_visits):
url_date = format_utc_iso_date(visit["date"], "%Y-%m-%dT%H:%M:%SZ")
- visit["fmt_date"] = format_utc_iso_date(visit["date"])
+ visit["formatted_date"] = format_utc_iso_date(visit["date"])
query_params = {}
if i < len(origin_visits) - 1:
if visit["date"] == origin_visits[i + 1]["date"]:
@@ -151,7 +151,7 @@
snapshot = visit["snapshot"] if visit["snapshot"] else ""
- visit["browse_url"] = reverse(
+ visit["url"] = reverse(
"browse-origin-directory",
url_args={"origin_url": origin_url, "timestamp": url_date},
query_params=query_params,
diff --git a/swh/web/common/converters.py b/swh/web/common/converters.py
--- a/swh/web/common/converters.py
+++ b/swh/web/common/converters.py
@@ -5,9 +5,11 @@
import datetime
import json
+from typing import Dict, Any
-from swh.model import hashutil
from swh.core.utils import decode_with_escape
+from swh.model import hashutil
+from swh.web.common.typing import OriginInfo, OriginVisitInfo
def _group_checksums(data):
@@ -191,7 +193,7 @@
return new_dict
-def from_origin(origin):
+def from_origin(origin: Dict[str, Any]) -> OriginInfo:
"""Convert from a swh origin to an origin dictionary.
"""
@@ -332,7 +334,7 @@
return from_swh(person, bytess={"name", "fullname", "email"})
-def from_origin_visit(visit):
+def from_origin_visit(visit: Dict[str, Any]) -> OriginVisitInfo:
"""Convert swh origin_visit to serializable origin_visit dictionary.
"""
diff --git a/swh/web/common/origin_visits.py b/swh/web/common/origin_visits.py
--- a/swh/web/common/origin_visits.py
+++ b/swh/web/common/origin_visits.py
@@ -4,31 +4,25 @@
# See top-level LICENSE file for more information
import math
+from typing import List, Optional, Union
from django.core.cache import cache
from swh.web.common.exc import NotFoundExc
+from swh.web.common.typing import OriginInfo, OriginVisitInfo
from swh.web.common.utils import parse_timestamp
-def get_origin_visits(origin_info):
+def get_origin_visits(origin_info: OriginInfo) -> List[OriginVisitInfo]:
"""Function that returns the list of visits for a swh origin.
That list is put in cache in order to speedup the navigation
in the swh web browse ui.
Args:
- origin_info (dict): dict describing the origin to fetch visits from
+ origin_info: dict describing the origin to fetch visits from
Returns:
- list: A list of dict describing the origin visits with the
- following keys:
-
- * **date**: UTC visit date in ISO format,
- * **origin**: the origin url
- * **status**: the visit status, either **full**, **partial**
- or **ongoing**
- * **visit**: the visit id
- * **type**: the visit type
+ A list of dict describing the origin visits
Raises:
swh.web.common.exc.NotFoundExc: if the origin is not found
@@ -77,10 +71,6 @@
ts = parse_timestamp(visit["date"]).timestamp()
return ts + (float(visit["visit"]) / 10e3)
- for v in origin_visits:
- if "metadata" in v:
- del v["metadata"]
- origin_visits = [dict(t) for t in set([tuple(d.items()) for d in origin_visits])]
origin_visits = sorted(origin_visits, key=lambda v: _visit_sort_key(v))
cache.set(cache_entry_id, origin_visits)
@@ -88,25 +78,22 @@
return origin_visits
-def get_origin_visit(origin_info, visit_ts=None, visit_id=None, snapshot_id=None):
- """Function that returns information about a visit for
- a given origin.
+def get_origin_visit(
+ origin_info: OriginInfo,
+ visit_ts: Optional[Union[int, str]] = None,
+ visit_id: Optional[int] = None,
+ snapshot_id: Optional[str] = None,
+) -> OriginVisitInfo:
+ """Function that returns information about a visit for a given origin.
The visit is retrieved from a provided timestamp.
The closest visit from that timestamp is selected.
Args:
- origin_info (dict): a dict filled with origin information
- visit_ts (int or str): an ISO date string or Unix timestamp to parse
+ origin_info: a dict filled with origin information
+ visit_ts: an ISO date string or Unix timestamp to parse
Returns:
- A dict containing the visit info as described below::
-
- {'origin': 'https://forge.softwareheritage.org/source/swh-web/',
- 'date': '2017-10-08T11:54:25.582463+00:00',
- 'metadata': {},
- 'visit': 25,
- 'status': 'full'}
-
+ A dict containing the visit info.
"""
visits = get_origin_visits(origin_info)
@@ -116,26 +103,26 @@
)
if snapshot_id:
- visit = [v for v in visits if v["snapshot"] == snapshot_id]
- if len(visit) == 0:
+ visits = [v for v in visits if v["snapshot"] == snapshot_id]
+ if len(visits) == 0:
raise NotFoundExc(
(
"Visit for snapshot with id %s for origin with"
" url %s not found!" % (snapshot_id, origin_info["url"])
)
)
- return visit[0]
+ return visits[0]
if visit_id:
- visit = [v for v in visits if v["visit"] == int(visit_id)]
- if len(visit) == 0:
+ visits = [v for v in visits if v["visit"] == int(visit_id)]
+ if len(visits) == 0:
raise NotFoundExc(
(
"Visit with id %s for origin with"
" url %s not found!" % (visit_id, origin_info["url"])
)
)
- return visit[0]
+ return visits[0]
if not visit_ts:
# returns the latest visit with a valid snapshot when no timestamp is provided
diff --git a/swh/web/common/service.py b/swh/web/common/service.py
--- a/swh/web/common/service.py
+++ b/swh/web/common/service.py
@@ -8,18 +8,20 @@
import re
from collections import defaultdict
-from typing import Any, Dict, List, Set
+from typing import Any, Dict, List, Set, Iterator, Optional, Tuple
from swh.model import hashutil
from swh.storage.algos import diff, revisions_walker
from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
+from swh.web import config
from swh.web.common import converters
from swh.web.common import query
from swh.web.common.exc import BadInputExc, NotFoundExc
from swh.web.common.origin_visits import get_origin_visit
-from swh.web import config
+from swh.web.common.typing import OriginInfo, OriginVisitInfo
+
search = config.search()
storage = config.storage()
@@ -211,7 +213,7 @@
return converters.from_swh({"id": sha1, "facts": lic[sha1]}, hashess={"id"})
-def lookup_origin(origin: Dict[str, str]) -> Dict[str, str]:
+def lookup_origin(origin: OriginInfo) -> OriginInfo:
"""Return information about the origin matching dict origin.
Args:
@@ -242,7 +244,9 @@
return converters.from_origin(origin_info)
-def lookup_origins(origin_from=1, origin_count=100):
+def lookup_origins(
+ origin_from: int = 1, origin_count: int = 100
+) -> Iterator[OriginInfo]:
"""Get list of archived software origins in a paginated way.
Origins are sorted by id before returning them
@@ -258,7 +262,9 @@
return map(converters.from_origin, origins)
-def search_origin(url_pattern, limit=50, with_visit=False, page_token=None):
+def search_origin(
+ url_pattern: str, limit: int = 50, with_visit: bool = False, page_token: Any = None
+) -> Tuple[List[OriginInfo], Any]:
"""Search for origins whose urls contain a provided string pattern
or match a provided regular expression.
@@ -293,8 +299,10 @@
pattern_parts.append(".*".join(permut))
url_pattern = "|".join(pattern_parts)
- origins = storage.origin_search(url_pattern, offset, limit, regexp, with_visit)
- origins = list(map(converters.from_origin, origins))
+ origins_raw = storage.origin_search(
+ url_pattern, offset, limit, regexp, with_visit
+ )
+ origins = list(map(converters.from_origin, origins_raw))
if len(origins) >= limit:
page_token = str(offset + len(origins))
else:
@@ -490,7 +498,7 @@
sha1_git_list: A list of revision sha1_git identifiers
Returns:
- Generator of revisions information as dict.
+ Iterator of revisions information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
@@ -854,7 +862,9 @@
return storage.stat_counters()
-def _lookup_origin_visits(origin_url, last_visit=None, limit=10):
+def _lookup_origin_visits(
+ origin_url: str, last_visit: Optional[int] = None, limit: int = 10
+) -> Iterator[Dict[str, Any]]:
"""Yields the origin origins' visits.
Args:
@@ -874,7 +884,9 @@
yield visit
-def lookup_origin_visits(origin, last_visit=None, per_page=10):
+def lookup_origin_visits(
+ origin: str, last_visit: Optional[int] = None, per_page: int = 10
+) -> Iterator[OriginVisitInfo]:
"""Yields the origin origins' visits.
Args:
@@ -889,7 +901,9 @@
yield converters.from_origin_visit(visit)
-def lookup_origin_visit_latest(origin_url, require_snapshot):
+def lookup_origin_visit_latest(
+ origin_url: str, require_snapshot: bool
+) -> OriginVisitInfo:
"""Return the origin's latest visit
Args:
@@ -906,7 +920,7 @@
return converters.from_origin_visit(visit)
-def lookup_origin_visit(origin_url, visit_id):
+def lookup_origin_visit(origin_url: str, visit_id: int) -> OriginVisitInfo:
"""Return information about visit visit_id with origin origin.
Args:
diff --git a/swh/web/common/typing.py b/swh/web/common/typing.py
--- a/swh/web/common/typing.py
+++ b/swh/web/common/typing.py
@@ -3,8 +3,110 @@
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from typing import Any, Dict, Union
+from typing import Any, Dict, List, Optional, Union
+from typing_extensions import TypedDict
from django.http import QueryDict
QueryParameters = Union[Dict[str, Any], QueryDict]
+
+
+class OriginInfo(TypedDict):
+ url: str
+ """URL of the origin"""
+
+
+class OriginVisitInfo(TypedDict):
+ date: str
+ """date of the visit in iso format"""
+ formatted_date: str
+ """formatted date of the visit"""
+ metadata: Dict[str, Any]
+ """metadata associated to the visit"""
+ origin: str
+ """visited origin URL"""
+ snapshot: str
+ """snapshot identifier computed during the visit"""
+ status: str
+ """status of the visit ("ongoing", "full" or "partial") """
+ type: str
+ """visit type (git, hg, debian, ...)"""
+ url: str
+ """URL to browse the snapshot"""
+ visit: int
+ """visit identifier"""
+
+
+class SnapshotBranchInfo(TypedDict):
+ date: str
+ """"author date of branch heading revision"""
+ directory: str
+ """directory associated to branch heading revision"""
+ message: str
+ """message of branch heading revision"""
+ name: str
+ """branch name"""
+ revision: str
+ """branch heading revision"""
+ url: Optional[str]
+ """optional browse URL (content, directory, ...) scoped to branch"""
+
+
+class SnapshotReleaseInfo(TypedDict):
+ branch_name: str
+ """branch name associated to release in snapshot"""
+ date: str
+ """release date"""
+ directory: Optional[str]
+ """optional directory associatd to the release"""
+ id: str
+ """release identifier"""
+ message: str
+ """release message"""
+ name: str
+ """release name"""
+ target: str
+ """release target"""
+ target_type: str
+ """release target_type"""
+ url: Optional[str]
+ """optional browse URL (content, directory, ...) scoped to release"""
+
+
+class SnapshotContext(TypedDict):
+ branch: Optional[str]
+ """optional branch name set when browsing snapshot int that scope"""
+ branches: List[SnapshotBranchInfo]
+ """list of snapshot branches (possibly truncated)"""
+ branches_url: str
+ """snapshot branches list browse URL"""
+ is_empty: bool
+ """indicates if the snapshot is empty"""
+ origin_info: Optional[OriginInfo]
+ """optional origin info associated to the snapshot"""
+ origin_visits_url: Optional[str]
+ """optional origin visits URL"""
+ query_params: QueryParameters
+ """common query parameters when browsing snapshot content"""
+ release: Optional[str]
+ """optional release name set when browsing snapshot int that scope"""
+ release_id: Optional[str]
+ """optional release identifier set when browsing snapshot int that scope"""
+ releases: List[SnapshotReleaseInfo]
+ """list of snapshot releases (possibly truncated)"""
+ releases_url: str
+ """snapshot releases list browse URL"""
+ revision_id: Optional[str]
+ """optional revision identifier set when browsing snapshot int that scope"""
+ root_directory: Optional[str]
+ """optional root directory identifier set when browsing snapshot content"""
+ snapshot_id: str
+ """snapshot identifier"""
+ snapshot_sizes: Dict[str, int]
+ """snapshot sizes grouped by branch target type"""
+ snapshot_swhid: str
+ """snapshot SWHID"""
+ url_args: Dict[str, Any]
+ """common URL arguments when browsing snapshot content"""
+ visit_info: Optional[OriginVisitInfo]
+ """optional origin visit info associated to the snapshot"""
diff --git a/swh/web/templates/browse/browse.html b/swh/web/templates/browse/browse.html
--- a/swh/web/templates/browse/browse.html
+++ b/swh/web/templates/browse/browse.html
@@ -28,8 +28,8 @@
{% endif %}
{% else %}
for snapshot
-
- {{ snapshot_context.swh_object_id }}
+
+ {{ snapshot_context.snapshot_swhid }}
{% endif %}
diff --git a/swh/web/templates/includes/snapshot-context.html b/swh/web/templates/includes/snapshot-context.html
--- a/swh/web/templates/includes/snapshot-context.html
+++ b/swh/web/templates/includes/snapshot-context.html
@@ -11,10 +11,10 @@
{% if snapshot_context.origin_info %}
- - Visits
+ - Visits
{% endif %}
{% if snapshot_context.visit_info %}
- - Snapshot date: {{ snapshot_context.visit_info.fmt_date }}
+ - Snapshot date: {{ snapshot_context.visit_info.formatted_date }}
{% endif %}
{% if not snapshot_context.snapshot_sizes.revision %}
- Branches (0)
diff --git a/swh/web/tests/browse/test_snapshot_context.py b/swh/web/tests/browse/test_snapshot_context.py
--- a/swh/web/tests/browse/test_snapshot_context.py
+++ b/swh/web/tests/browse/test_snapshot_context.py
@@ -3,11 +3,24 @@
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import random
+
from hypothesis import given
-from swh.web.browse.snapshot_context import get_origin_visit_snapshot
-from swh.web.common.utils import format_utc_iso_date
-from swh.web.tests.strategies import origin_with_multiple_visits
+from swh.web.browse.snapshot_context import (
+ get_origin_visit_snapshot,
+ get_snapshot_content,
+ get_snapshot_context,
+)
+from swh.web.common.identifiers import get_swh_persistent_id
+from swh.web.common.origin_visits import get_origin_visit, get_origin_visits
+from swh.web.common.typing import (
+ SnapshotBranchInfo,
+ SnapshotReleaseInfo,
+ SnapshotContext,
+)
+from swh.web.common.utils import format_utc_iso_date, reverse
+from swh.web.tests.strategies import origin_with_multiple_visits, snapshot
@given(origin_with_multiple_visits())
@@ -24,28 +37,30 @@
if branch_data["target_type"] == "revision":
rev_data = archive_data.revision_get(branch_data["target"])
branches.append(
- {
- "name": branch,
- "revision": branch_data["target"],
- "directory": rev_data["directory"],
- "date": format_utc_iso_date(rev_data["date"]),
- "message": rev_data["message"],
- }
+ SnapshotBranchInfo(
+ name=branch,
+ revision=branch_data["target"],
+ directory=rev_data["directory"],
+ date=format_utc_iso_date(rev_data["date"]),
+ message=rev_data["message"],
+ url=None,
+ )
)
elif branch_data["target_type"] == "release":
rel_data = archive_data.release_get(branch_data["target"])
rev_data = archive_data.revision_get(rel_data["target"])
releases.append(
- {
- "name": rel_data["name"],
- "branch_name": branch,
- "date": format_utc_iso_date(rel_data["date"]),
- "id": rel_data["id"],
- "message": rel_data["message"],
- "target_type": rel_data["target_type"],
- "target": rel_data["target"],
- "directory": rev_data["directory"],
- }
+ SnapshotReleaseInfo(
+ name=rel_data["name"],
+ branch_name=branch,
+ date=format_utc_iso_date(rel_data["date"]),
+ id=rel_data["id"],
+ message=rel_data["message"],
+ target_type=rel_data["target_type"],
+ target=rel_data["target"],
+ directory=rev_data["directory"],
+ url=None,
+ )
)
for branch in sorted(snapshot["branches"].keys()):
@@ -63,3 +78,252 @@
)
assert origin_visit_branches == (branches, releases)
+
+
+@given(snapshot())
+def test_get_snapshot_context_no_origin(archive_data, snapshot):
+
+ for browse_context, kwargs in (
+ ("content", {"snapshot_id": snapshot, "path": "/some/path"}),
+ ("directory", {"snapshot_id": snapshot}),
+ ("log", {"snapshot_id": snapshot}),
+ ):
+
+ url_args = {"snapshot_id": snapshot}
+
+ snapshot_context = get_snapshot_context(**kwargs, browse_context=browse_context)
+
+ branches, releases = get_snapshot_content(snapshot)
+ releases = list(reversed(releases))
+ revision_id = None
+ root_directory = None
+ for branch in branches:
+ if branch["name"] == "HEAD":
+ revision_id = branch["revision"]
+ root_directory = branch["directory"]
+ branch["url"] = reverse(
+ f"browse-snapshot-{browse_context}",
+ url_args=kwargs,
+ query_params={"branch": branch["name"]},
+ )
+ for release in releases:
+ release["url"] = reverse(
+ f"browse-snapshot-{browse_context}",
+ url_args=kwargs,
+ query_params={"release": release["name"]},
+ )
+
+ branches_url = reverse("browse-snapshot-branches", url_args=url_args)
+ releases_url = reverse("browse-snapshot-releases", url_args=url_args)
+ is_empty = not branches and not releases
+ snapshot_swhid = get_swh_persistent_id("snapshot", snapshot)
+ snapshot_sizes = {"revision": len(branches), "release": len(releases)}
+
+ expected = SnapshotContext(
+ branch="HEAD",
+ branches=branches,
+ branches_url=branches_url,
+ is_empty=is_empty,
+ origin_info=None,
+ origin_visits_url=None,
+ release=None,
+ release_id=None,
+ query_params={},
+ releases=releases,
+ releases_url=releases_url,
+ revision_id=revision_id,
+ root_directory=root_directory,
+ snapshot_id=snapshot,
+ snapshot_sizes=snapshot_sizes,
+ snapshot_swhid=snapshot_swhid,
+ url_args=url_args,
+ visit_info=None,
+ )
+
+ assert snapshot_context == expected
+
+ _check_branch_release_revision_parameters(
+ archive_data, expected, browse_context, kwargs, branches, releases
+ )
+
+
+@given(origin_with_multiple_visits())
+def test_get_snapshot_context_with_origin(archive_data, origin):
+
+ origin_visits = get_origin_visits(origin)
+
+ timestamp = format_utc_iso_date(origin_visits[0]["date"], "%Y-%m-%dT%H:%M:%SZ")
+ visit_id = origin_visits[1]["visit"]
+
+ for browse_context, kwargs in (
+ ("content", {"origin_url": origin["url"], "path": "/some/path"}),
+ ("directory", {"origin_url": origin["url"]}),
+ ("log", {"origin_url": origin["url"]}),
+ ("directory", {"origin_url": origin["url"], "timestamp": timestamp,},),
+ ("directory", {"origin_url": origin["url"], "visit_id": visit_id,},),
+ ):
+
+ visit_id = kwargs["visit_id"] if "visit_id" in kwargs else None
+ visit_ts = kwargs["timestamp"] if "timestamp" in kwargs else None
+ visit_info = get_origin_visit(
+ {"url": kwargs["origin_url"]}, visit_ts=visit_ts, visit_id=visit_id
+ )
+
+ snapshot = visit_info["snapshot"]
+
+ snapshot_context = get_snapshot_context(**kwargs, browse_context=browse_context)
+
+ url_args = dict(kwargs)
+ url_args.pop("visit_id", None)
+
+ query_params = {}
+ if "visit_id" in kwargs:
+ query_params["visit_id"] = kwargs["visit_id"]
+
+ branches, releases = get_snapshot_content(snapshot)
+ releases = list(reversed(releases))
+ revision_id = None
+ root_directory = None
+ for branch in branches:
+ if branch["name"] == "HEAD":
+ revision_id = branch["revision"]
+ root_directory = branch["directory"]
+ branch["url"] = reverse(
+ f"browse-origin-{browse_context}",
+ url_args=url_args,
+ query_params={"branch": branch["name"], **query_params},
+ )
+ for release in releases:
+ release["url"] = reverse(
+ f"browse-origin-{browse_context}",
+ url_args=url_args,
+ query_params={"release": release["name"], **query_params},
+ )
+
+ url_args.pop("path", None)
+
+ branches_url = reverse(
+ "browse-origin-branches", url_args=url_args, query_params=query_params
+ )
+ releases_url = reverse(
+ "browse-origin-releases", url_args=url_args, query_params=query_params
+ )
+ origin_visits_url = reverse(
+ "browse-origin-visits", url_args={"origin_url": kwargs["origin_url"]}
+ )
+ is_empty = not branches and not releases
+ snapshot_swhid = get_swh_persistent_id("snapshot", snapshot)
+ snapshot_sizes = {"revision": len(branches), "release": len(releases)}
+
+ visit_info["url"] = reverse(
+ "browse-origin-directory", url_args=url_args, query_params=query_params
+ )
+ visit_info["formatted_date"] = format_utc_iso_date(visit_info["date"])
+
+ expected = SnapshotContext(
+ branch="HEAD",
+ branches=branches,
+ branches_url=branches_url,
+ is_empty=is_empty,
+ origin_info={"url": origin["url"]},
+ origin_visits_url=origin_visits_url,
+ release=None,
+ release_id=None,
+ query_params=query_params,
+ releases=releases,
+ releases_url=releases_url,
+ revision_id=revision_id,
+ root_directory=root_directory,
+ snapshot_id=snapshot,
+ snapshot_sizes=snapshot_sizes,
+ snapshot_swhid=snapshot_swhid,
+ url_args=url_args,
+ visit_info=visit_info,
+ )
+
+ assert snapshot_context == expected
+
+ _check_branch_release_revision_parameters(
+ archive_data, expected, browse_context, kwargs, branches, releases
+ )
+
+
+def _check_branch_release_revision_parameters(
+ archive_data, base_expected_context, browse_context, kwargs, branches, releases,
+):
+ branch = random.choice(branches)
+
+ snapshot_context = get_snapshot_context(
+ **kwargs, browse_context=browse_context, branch_name=branch["name"]
+ )
+
+ query_params = {}
+ if "visit_id" in kwargs:
+ query_params["visit_id"] = kwargs["visit_id"]
+
+ expected_branch = dict(base_expected_context)
+ expected_branch["branch"] = branch["name"]
+ expected_branch["revision_id"] = branch["revision"]
+ expected_branch["root_directory"] = branch["directory"]
+ expected_branch["query_params"] = {"branch": branch["name"], **query_params}
+
+ assert snapshot_context == expected_branch
+
+ if releases:
+
+ release = random.choice(releases)
+
+ snapshot_context = get_snapshot_context(
+ **kwargs, browse_context=browse_context, release_name=release["name"]
+ )
+
+ expected_release = dict(base_expected_context)
+ expected_release["branch"] = None
+ expected_release["release"] = release["name"]
+ expected_release["release_id"] = release["id"]
+ if release["target_type"] == "revision":
+ expected_release["revision_id"] = release["target"]
+ expected_release["root_directory"] = release["directory"]
+ expected_release["query_params"] = {"release": release["name"], **query_params}
+
+ assert snapshot_context == expected_release
+
+ revision_log = archive_data.revision_log(branch["revision"])
+ revision = revision_log[-1]
+
+ snapshot_context = get_snapshot_context(
+ **kwargs, browse_context=browse_context, revision_id=revision["id"]
+ )
+
+ if "origin_url" in kwargs:
+ view_name = f"browse-origin-{browse_context}"
+ else:
+ view_name = f"browse-snapshot-{browse_context}"
+
+ kwargs.pop("visit_id", None)
+
+ revision_browse_url = reverse(
+ view_name,
+ url_args=kwargs,
+ query_params={"revision": revision["id"], **query_params},
+ )
+
+ branches.append(
+ SnapshotBranchInfo(
+ name=revision["id"],
+ revision=revision["id"],
+ directory=revision["directory"],
+ date=revision["date"],
+ message=revision["message"],
+ url=revision_browse_url,
+ )
+ )
+
+ expected_revision = dict(base_expected_context)
+ expected_revision["branch"] = revision["id"]
+ expected_revision["branches"] = branches
+ expected_revision["revision_id"] = revision["id"]
+ expected_revision["root_directory"] = revision["directory"]
+ expected_revision["query_params"] = {"revision": revision["id"], **query_params}
+
+ assert snapshot_context == expected_revision
diff --git a/swh/web/tests/browse/views/test_origin.py b/swh/web/tests/browse/views/test_origin.py
--- a/swh/web/tests/browse/views/test_origin.py
+++ b/swh/web/tests/browse/views/test_origin.py
@@ -594,7 +594,7 @@
"release": 0,
}
mock_service.lookup_origin.return_value = origin
- url = reverse("browse-origin-directory", url_args={"origin_url": "bar"})
+ url = reverse("browse-origin-directory", url_args={"origin_url": origin["url"]})
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/directory.html")
@@ -697,7 +697,7 @@
if timestamp:
url_args["timestamp"] = format_utc_iso_date(
- parse_timestamp(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%S"
+ parse_timestamp(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ"
)
root_dir_url = reverse(
@@ -831,7 +831,7 @@
if timestamp:
url_args["timestamp"] = format_utc_iso_date(
- parse_timestamp(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%S"
+ parse_timestamp(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ"
)
for d in dirs: