' % root_dir_branch_url)
assert_contains(resp, '', count=len(origin_releases))
query_params["branch"] = None
for release in origin_releases:
root_dir_release_url = reverse(
"browse-origin-content",
query_params={"release": release["name"], **query_params},
)
assert_contains(resp, '' % root_dir_release_url)
url = reverse("browse-origin-content", query_params=query_params)
resp = check_html_get_response(
client, url, status_code=200, template_used="browse/content.html"
)
snapshot = archive_data.snapshot_get(origin_visit["snapshot"])
head_rev_id = archive_data.snapshot_get_head(snapshot)
swhid_context = {
"origin": origin_info["url"],
"visit": gen_swhid(SNAPSHOT, snapshot["id"]),
"anchor": gen_swhid(REVISION, head_rev_id),
"path": f"/{content_path}",
}
swh_cnt_id = gen_swhid(CONTENT, content["sha1_git"], metadata=swhid_context)
swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id})
assert_contains(resp, swh_cnt_id)
assert_contains(resp, swh_cnt_id_url)
assert_contains(resp, "swh-take-new-snapshot")
_check_origin_link(resp, origin_info["url"])
def _origin_directory_view_test_helper(
    client,
    archive_data,
    origin_info,
    origin_visit,
    snapshot_sizes,
    origin_branches,
    origin_releases,
    root_directory_sha1,
    directory_entries,
    visit_id=None,
    timestamp=None,
    snapshot_id=None,
    path=None,
):
    """Request the "browse-origin-directory" view and check its rendered content.

    The page is requested for ``origin_info["url"]``, selecting the visit by
    ``timestamp``, else ``visit_id``, else ``snapshot_id`` (``visit_id``
    defaults to ``origin_visit["visit"]`` when neither it nor ``snapshot_id``
    is given), optionally below ``path``. The response must be a 200 rendered
    with ``browse/directory.html``; the helper then asserts it contains the
    links to every entry of ``directory_entries``, the branches / releases
    links and counters, the vault cooking markers and the contextual SWHIDs.

    NOTE(review): several string patterns below are empty (``''``) or are
    ``%``-formatted without a matching placeholder; they look truncated in
    this copy of the file and should be confirmed against the upstream source.
    """
    # "rev" entries are listed together with sub-directories.
    dirs = [e for e in directory_entries if e["type"] in ("dir", "rev")]
    files = [e for e in directory_entries if e["type"] == "file"]
    if not visit_id and not snapshot_id:
        visit_id = origin_visit["visit"]
    query_params = {"origin_url": origin_info["url"]}
    # Exactly one visit selector is put in the query, by decreasing priority.
    if timestamp:
        query_params["timestamp"] = timestamp
    elif visit_id:
        query_params["visit_id"] = visit_id
    else:
        query_params["snapshot"] = snapshot_id
    if path:
        query_params["path"] = path
    url = reverse("browse-origin-directory", query_params=query_params)
    resp = check_html_get_response(
        client, url, status_code=200, template_used="browse/directory.html"
    )
    assert_contains(resp, '', count=len(dirs))
    assert_contains(resp, ' | ', count=len(files))
    if timestamp:
        # From here on the expected links use the timestamp re-formatted as
        # "%Y-%m-%dT%H:%M:%SZ" (presumably how the view emits it — TODO confirm).
        query_params["timestamp"] = format_utc_iso_date(
            parse_iso8601_date_to_utc(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ"
        )
    for d in dirs:
        if d["type"] == "rev":
            # "rev" entries link to the revision view, not to a sub-directory.
            dir_url = reverse("browse-revision", url_args={"sha1_git": d["target"]})
        else:
            dir_path = d["name"]
            if path:
                dir_path = "%s/%s" % (path, d["name"])
            query_params["path"] = dir_path
            dir_url = reverse("browse-origin-directory", query_params=query_params,)
        assert_contains(resp, dir_url)
    for f in files:
        file_path = f["name"]
        if path:
            file_path = "%s/%s" % (path, f["name"])
        query_params["path"] = file_path
        file_url = reverse("browse-origin-content", query_params=query_params)
        assert_contains(resp, file_url)
    # The loops above leave the last entry path in query_params; drop it so
    # the following URLs are built without a path.
    if "path" in query_params:
        del query_params["path"]
    root_dir_branch_url = reverse("browse-origin-directory", query_params=query_params)
    # One expected occurrence, plus one per "/"-separated component of path.
    nb_bc_paths = 1
    if path:
        nb_bc_paths = len(path.split("/")) + 1
    assert_contains(resp, '', count=nb_bc_paths)
    assert_contains(
        resp, '%s' % (root_dir_branch_url, root_directory_sha1[:7])
    )
    origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
    assert_contains(resp, f'href="{escape(origin_branches_url)}"')
    assert_contains(resp, f"Branches ({snapshot_sizes['revision']})")
    origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
    nb_releases = len(origin_releases)
    # The releases link and counter are only checked when releases exist.
    if nb_releases > 0:
        assert_contains(resp, f'href="{escape(origin_releases_url)}"')
        assert_contains(resp, f"Releases ({snapshot_sizes['release']})")
    # Restore the requested path before building per-branch / per-release URLs.
    if path:
        query_params["path"] = path
    assert_contains(resp, '', count=len(origin_branches))
    for branch in origin_branches:
        query_params["branch"] = branch["name"]
        root_dir_branch_url = reverse(
            "browse-origin-directory", query_params=query_params
        )
        assert_contains(resp, '' % root_dir_branch_url)
    assert_contains(resp, '', count=len(origin_releases))
    # Reset the branch selector so that release URLs only carry "release".
    query_params["branch"] = None
    for release in origin_releases:
        query_params["release"] = release["name"]
        root_dir_release_url = reverse(
            "browse-origin-directory", query_params=query_params
        )
        assert_contains(resp, 'href="%s"' % root_dir_release_url)
    assert_contains(resp, "vault-cook-directory")
    assert_contains(resp, "vault-cook-revision")
    # Expected SWHID of the directory, qualified with origin / visit / anchor
    # (head revision of the visit snapshot) and the browsed path, if any.
    snapshot = archive_data.snapshot_get(origin_visit["snapshot"])
    head_rev_id = archive_data.snapshot_get_head(snapshot)
    swhid_context = {
        "origin": origin_info["url"],
        "visit": gen_swhid(SNAPSHOT, snapshot["id"]),
        "anchor": gen_swhid(REVISION, head_rev_id),
        "path": f"/{path}" if path else None,
    }
    # The directory id is read from the first entry, so directory_entries
    # must not be empty.
    swh_dir_id = gen_swhid(
        DIRECTORY, directory_entries[0]["dir_id"], metadata=swhid_context
    )
    swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id})
    assert_contains(resp, swh_dir_id)
    assert_contains(resp, swh_dir_id_url)
    assert_contains(resp, "swh-take-new-snapshot")
    _check_origin_link(resp, origin_info["url"])
def _origin_branches_test_helper(
    client, origin_info, origin_snapshot, snapshot_sizes, snapshot_id=None
):
    """Request the "browse-origin-branches" view and check its rendered content.

    ``origin_snapshot`` is indexed as ``(branches, releases)``. The response
    must be a 200 rendered with ``browse/branches.html`` and contain the
    branches / releases links and counters taken from ``snapshot_sizes``.

    NOTE(review): the trailing statements use ``browse_branch_url`` and
    ``branch``, which are not bound anywhere in the visible body, and format
    empty patterns; the code that defines them (presumably a loop over
    ``origin_branches``) seems to be missing from this copy — confirm against
    the upstream source.
    """
    query_params = {"origin_url": origin_info["url"], "snapshot": snapshot_id}
    url = reverse("browse-origin-branches", query_params=query_params)
    resp = check_html_get_response(
        client, url, status_code=200, template_used="browse/branches.html"
    )
    origin_branches = origin_snapshot[0]
    origin_releases = origin_snapshot[1]
    origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
    assert_contains(resp, f'href="{escape(origin_branches_url)}"')
    assert_contains(resp, f"Branches ({snapshot_sizes['revision']})")
    origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
    nb_releases = len(origin_releases)
    # The releases link and counter are only checked when releases exist.
    if nb_releases > 0:
        assert_contains(resp, f'href="{escape(origin_releases_url)}">')
        assert_contains(resp, f"Releases ({snapshot_sizes['release']})")
    assert_contains(resp, '' % escape(browse_branch_url))
    browse_revision_url = reverse(
        "browse-revision",
        url_args={"sha1_git": branch["revision"]},
        query_params=query_params,
    )
    assert_contains(resp, '' % escape(browse_revision_url))
    _check_origin_link(resp, origin_info["url"])
def _origin_releases_test_helper(
    client, origin_info, origin_snapshot, snapshot_sizes, snapshot_id=None
):
    """Request the "browse-origin-releases" view and check its rendered content.

    ``origin_snapshot[1]`` holds the releases. The response must be a 200
    rendered with ``browse/releases.html`` and contain the branches /
    releases links and counters taken from ``snapshot_sizes``.

    NOTE(review): the trailing statements use ``browse_release_url`` and
    ``browse_revision_url``, which are not bound anywhere in the visible
    body, and format empty patterns; the code that defines them seems to be
    missing from this copy — confirm against the upstream source.
    """
    query_params = {"origin_url": origin_info["url"], "snapshot": snapshot_id}
    url = reverse("browse-origin-releases", query_params=query_params)
    resp = check_html_get_response(
        client, url, status_code=200, template_used="browse/releases.html"
    )
    origin_releases = origin_snapshot[1]
    origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
    assert_contains(resp, f'href="{escape(origin_branches_url)}"')
    assert_contains(resp, f"Branches ({snapshot_sizes['revision']})")
    origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
    nb_releases = len(origin_releases)
    # The releases link and counter are only checked when releases exist.
    if nb_releases > 0:
        assert_contains(resp, f'href="{escape(origin_releases_url)}"')
        # Closing parenthesis included, as in the sibling directory and
        # branches helpers, so that e.g. "Releases (12)" cannot satisfy an
        # expected count of 1.
        assert_contains(resp, f"Releases ({snapshot_sizes['release']})")
    assert_contains(resp, '' % escape(browse_release_url))
    assert_contains(resp, '' % escape(browse_revision_url))
    _check_origin_link(resp, origin_info["url"])
@given(
    new_origin(), visit_dates(), revisions(min_size=10, max_size=10), existing_release()
)
def test_origin_branches_pagination_with_alias(
    client, archive_data, mocker, new_origin, visit_dates, revisions, existing_release
):
    """
    When a snapshot contains a branch or a release alias, pagination links
    in the branches / releases view should be displayed.
    """
    # Page size is a count: use floor division so that PER_PAGE is patched
    # with an int (true division would inject a float such as 5.0).
    mocker.patch("swh.web.browse.snapshot_context.PER_PAGE", len(revisions) // 2)
    snp_dict = {"branches": {}, "id": hash_to_bytes(random_sha1())}
    # One randomly named branch per revision.
    for revision_id in revisions:
        branch = "".join(random.choices(string.ascii_lowercase, k=8))
        snp_dict["branches"][branch.encode()] = {
            "target_type": "revision",
            "target": hash_to_bytes(revision_id),
        }
    # A release branch plus an alias pointing at it.
    release = "".join(random.choices(string.ascii_lowercase, k=8))
    snp_dict["branches"][b"RELEASE_ALIAS"] = {
        "target_type": "alias",
        "target": release.encode(),
    }
    snp_dict["branches"][release.encode()] = {
        "target_type": "release",
        "target": hash_to_bytes(existing_release),
    }
    archive_data.origin_add([new_origin])
    archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
    visit = archive_data.origin_visit_add(
        [OriginVisit(origin=new_origin.url, date=visit_dates[0], type="git",)]
    )[0]
    # Attach the snapshot to the visit through a "full" visit status.
    visit_status = OriginVisitStatus(
        origin=new_origin.url,
        visit=visit.visit,
        date=now(),
        status="full",
        snapshot=snp_dict["id"],
    )
    archive_data.origin_visit_status_add([visit_status])
    url = reverse("browse-origin-branches", query_params={"origin_url": new_origin.url})
    resp = check_html_get_response(
        client, url, status_code=200, template_used="browse/branches.html"
    )
    # NOTE(review): the expected pattern is empty in this copy of the file,
    # so this assertion does not check for pagination markup — confirm the
    # intended pattern against the upstream source.
    assert_contains(resp, '')
if len(revision_log_sorted) > per_page:
assert_contains(
resp, 'Older' % escape(next_page_url),
)
for log in revision_log_sorted[:per_page]:
revision_url = reverse("browse-revision", url_args={"sha1_git": log["id"]})
assert_contains(resp, log["id"][:7])
assert_contains(resp, log["author"]["name"])
assert_contains(resp, format_utc_iso_date(log["date"]))
assert_contains(resp, escape(log["message"]))
assert_contains(resp, format_utc_iso_date(log["committer_date"]))
assert_contains(resp, revision_url)
if len(revision_log_sorted) <= per_page:
return
resp = check_html_get_response(
client, next_page_url, status_code=200, template_used="browse/revision-log.html"
)
prev_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"offset": 0, "per_page": per_page},
)
next_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"offset": 2 * per_page, "per_page": per_page},
)
nb_log_entries = len(revision_log_sorted) - per_page
if nb_log_entries > per_page:
nb_log_entries = per_page
assert_contains(resp, ' Newer' % escape(prev_page_url)
)
if len(revision_log_sorted) > 2 * per_page:
assert_contains(
resp, 'Older' % escape(next_page_url),
)
if len(revision_log_sorted) <= 2 * per_page:
return
resp = check_html_get_response(
client, next_page_url, status_code=200, template_used="browse/revision-log.html"
)
prev_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"offset": per_page, "per_page": per_page},
)
next_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"offset": 3 * per_page, "per_page": per_page},
)
nb_log_entries = len(revision_log_sorted) - 2 * per_page
if nb_log_entries > per_page:
nb_log_entries = per_page
assert_contains(resp, ' Newer' % escape(prev_page_url)
)
if len(revision_log_sorted) > 3 * per_page:
assert_contains(
resp, 'Older' % escape(next_page_url),
)
@given(revision(), unknown_revision(), new_origin())
def test_revision_request_errors(client, revision, unknown_revision, new_origin):
    """The revision view must answer 404 (rendered with ``error.html``) both
    for an unknown revision and for a known revision requested with the URL
    of ``new_origin`` as origin context."""
    # Unknown revision identifier.
    missing_url = reverse("browse-revision", url_args={"sha1_git": unknown_revision})
    missing_resp = check_html_get_response(
        client, missing_url, status_code=404, template_used="error.html"
    )
    not_found_msg = "Revision with sha1_git %s not found" % unknown_revision
    assert_contains(missing_resp, not_found_msg, status_code=404)

    # Known revision, but browsed in the context of the generated origin.
    origin_ctx_url = reverse(
        "browse-revision",
        url_args={"sha1_git": revision},
        query_params={"origin_url": new_origin.url},
    )
    origin_ctx_resp = check_html_get_response(
        client, origin_ctx_url, status_code=404, template_used="error.html"
    )
    assert_contains(
        origin_ctx_resp,
        "the origin mentioned in your request appears broken",
        status_code=404,
    )
@given(revision())
def test_revision_uppercase(client, revision):
    """Requesting a revision with an upper-cased checksum must redirect (302)
    to the URL of the "browse-revision" view built from the original id."""
    uppercase_url = reverse(
        "browse-revision-uppercase-checksum", url_args={"sha1_git": revision.upper()}
    )
    response = check_html_get_response(client, uppercase_url, status_code=302)
    expected_location = reverse("browse-revision", url_args={"sha1_git": revision})
    assert response["location"] == expected_location
def _revision_browse_checks(
    client, archive_data, revision, origin_url=None, snapshot=None
):
    """Request the "browse-revision" view and check its rendered content.

    Args:
        client: test client forwarded to ``check_html_get_response``
        archive_data: test archive accessor used to fetch the expected data
        revision: identifier of the revision to browse
        origin_url: optional origin URL used as browsing context
        snapshot: optional snapshot dict (its ``"id"`` is read) used as
            browsing context

    The response must be a 200 rendered with ``browse/revision.html`` and
    contain the author / committer names and dates, the history link, the
    links to parent revisions, the commit message, the vault cooking markers
    and the SWHIDs of the revision and of its root directory, both bare and
    qualified with the browsing context.
    """
    query_params = {}
    if origin_url:
        query_params["origin_url"] = origin_url
    if snapshot:
        query_params["snapshot"] = snapshot["id"]
    url = reverse(
        "browse-revision", url_args={"sha1_git": revision}, query_params=query_params
    )
    revision_data = archive_data.revision_get(revision)
    author_name = revision_data["author"]["name"]
    committer_name = revision_data["committer"]["name"]
    dir_id = revision_data["directory"]
    # The expected history link depends on the browsing context.
    if origin_url:
        # With an origin context, the latest snapshot of that origin replaces
        # whatever snapshot was passed in for the remaining checks.
        snapshot = archive_data.snapshot_get_latest(origin_url)
        history_url = reverse(
            "browse-origin-log", query_params={"revision": revision, **query_params},
        )
    elif snapshot:
        history_url = reverse(
            "browse-snapshot-log",
            url_args={"snapshot_id": snapshot["id"]},
            query_params={"revision": revision},
        )
    else:
        history_url = reverse("browse-revision-log", url_args={"sha1_git": revision})
    resp = check_html_get_response(
        client, url, status_code=200, template_used="browse/revision.html"
    )
    assert_contains(resp, author_name)
    assert_contains(resp, committer_name)
    assert_contains(resp, history_url)
    for parent in revision_data["parents"]:
        parent_url = reverse(
            "browse-revision", url_args={"sha1_git": parent}, query_params=query_params
        )
        assert_contains(resp, '%s' % (escape(parent_url), parent[:7]))
    author_date = revision_data["date"]
    committer_date = revision_data["committer_date"]
    message_lines = revision_data["message"].split("\n")
    assert_contains(resp, format_utc_iso_date(author_date))
    assert_contains(resp, format_utc_iso_date(committer_date))
    # First line of the message and the remainder are checked separately.
    assert_contains(resp, escape(message_lines[0]))
    assert_contains(resp, escape("\n".join(message_lines[1:])))
    assert_contains(resp, "vault-cook-directory")
    assert_contains(resp, "vault-cook-revision")
    # Bare (context-free) SWHIDs of the revision and its root directory.
    swh_rev_id = gen_swhid("revision", revision)
    swh_rev_id_url = reverse("browse-swhid", url_args={"swhid": swh_rev_id})
    assert_contains(resp, swh_rev_id)
    assert_contains(resp, swh_rev_id_url)
    swh_dir_id = gen_swhid("directory", dir_id)
    swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id})
    assert_contains(resp, swh_dir_id)
    assert_contains(resp, swh_dir_id_url)
    if origin_url:
        assert_contains(resp, "swh-take-new-snapshot")
    # Link back to the browsing context (origin first, else snapshot).
    if origin_url:
        browse_origin_url = reverse(
            "browse-origin", query_params={"origin_url": origin_url}
        )
        assert_contains(resp, f'href="{browse_origin_url}"')
    elif snapshot:
        swh_snp_id = gen_swhid("snapshot", snapshot["id"])
        swh_snp_id_url = reverse("browse-swhid", url_args={"swhid": swh_snp_id})
        assert_contains(resp, f'href="{swh_snp_id_url}"')
    # SWHIDs qualified with the browsing context.
    swhid_context = {}
    if origin_url:
        swhid_context["origin"] = origin_url
    if snapshot:
        swhid_context["visit"] = gen_swhid(SNAPSHOT, snapshot["id"])
    swh_rev_id = gen_swhid(REVISION, revision, metadata=swhid_context)
    swh_rev_id_url = reverse("browse-swhid", url_args={"swhid": swh_rev_id})
    assert_contains(resp, swh_rev_id)
    assert_contains(resp, swh_rev_id_url)
    # The directory SWHID is additionally anchored to the browsed revision.
    swhid_context["anchor"] = gen_swhid(REVISION, revision)
    swh_dir_id = gen_swhid(DIRECTORY, dir_id, metadata=swhid_context)
    swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id})
    assert_contains(resp, swh_dir_id)
    assert_contains(resp, swh_dir_id_url)
+
+
@given(revision())
def test_revision_invalid_path(client, archive_data, revision):
    """Browsing a revision with a path missing from its root directory must
    answer 404, rendered with ``browse/revision.html``, and report the
    missing directory entry."""
    path = "foo/bar"
    invalid_path_url = reverse(
        "browse-revision", url_args={"sha1_git": revision}, query_params={"path": path}
    )

    response = check_html_get_response(
        client, invalid_path_url, status_code=404, template_used="browse/revision.html"
    )

    directory = archive_data.revision_get(revision)["directory"]
    assert_contains(
        response,
        f"Directory entry with path {path} from root directory {directory} not found",
        status_code=404,
    )
diff --git a/swh/web/tests/common/test_archive.py b/swh/web/tests/common/test_archive.py
index 1755cb8a..f9ff241e 100644
--- a/swh/web/tests/common/test_archive.py
+++ b/swh/web/tests/common/test_archive.py
@@ -1,1040 +1,1042 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
import hashlib
import itertools
import random
from hypothesis import given
import pytest
from swh.model.from_disk import DentryPerms
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
from swh.model.model import Directory, DirectoryEntry, Origin, OriginVisit, Revision
from swh.web.common import archive
from swh.web.common.exc import BadInputExc, NotFoundExc
from swh.web.common.typing import OriginInfo
from swh.web.tests.conftest import ctags_json_missing, fossology_missing
from swh.web.tests.data import random_content, random_sha1
from swh.web.tests.strategies import (
ancestor_revisions,
content,
contents,
contents_with_ctags,
directory,
empty_content,
empty_directory,
invalid_sha1,
new_origin,
new_revision,
non_ancestor_revisions,
origin,
release,
releases,
revision,
revision_with_submodules,
revisions,
sha256,
snapshot,
unknown_content,
unknown_contents,
unknown_directory,
unknown_release,
unknown_revision,
unknown_snapshot,
visit_dates,
)
@given(contents())
def test_lookup_multiple_hashes_all_present(contents):
    """Every sha1 taken from ``contents`` must be reported as found, in
    input order."""
    sha1s = [cnt["sha1"] for cnt in contents]
    queries = [{"sha1": sha1} for sha1 in sha1s]
    expected = [{"sha1": sha1, "found": True} for sha1 in sha1s]
    assert archive.lookup_multiple_hashes(queries) == expected
@given(contents(), unknown_contents())
def test_lookup_multiple_hashes_some_missing(contents, unknown_contents):
    """With ``contents`` and ``unknown_contents`` shuffled together, only the
    entries that belong to ``contents`` must be reported as found."""
    mixed = [*contents, *unknown_contents]
    random.shuffle(mixed)
    queries = []
    expected = []
    for cnt in mixed:
        sha1 = cnt["sha1"]
        queries.append({"sha1": sha1})
        expected.append({"sha1": sha1, "found": cnt in contents})
    assert archive.lookup_multiple_hashes(queries) == expected
def test_lookup_hash_does_not_exist():
    """Looking up the sha1_git of a freshly generated random content must
    yield ``found: None`` together with the queried algorithm."""
    missing = random_content()
    query = "sha1_git:%s" % missing["sha1_git"]
    assert archive.lookup_hash(query) == {"found": None, "algo": "sha1_git"}
@given(content())
def test_lookup_hash_exist(archive_data, content):
    """Looking up a content by sha1 must return the metadata provided by
    ``archive_data.content_get`` together with the queried algorithm."""
    result = archive.lookup_hash("sha1:%s" % content["sha1"])
    metadata = archive_data.content_get(content["sha1"])
    expected = {"found": metadata, "algo": "sha1"}
    assert expected == result
def test_search_hash_does_not_exist():
    """Searching the sha1_git of a freshly generated random content must
    report ``found: False``."""
    missing = random_content()
    result = archive.search_hash("sha1_git:%s" % missing["sha1_git"])
    assert {"found": False} == result
@given(content())
def test_search_hash_exist(content):
    """Searching the sha1 of ``content`` must report ``found: True``."""
    result = archive.search_hash("sha1:%s" % content["sha1"])
    assert {"found": True} == result
@pytest.mark.skipif(
    ctags_json_missing, reason="requires ctags with json output support"
)
@given(contents_with_ctags())
def test_lookup_content_ctags(indexer_data, contents_with_ctags):
    """Ctags looked up for a randomly chosen content must equal the ones
    returned by ``indexer_data``, once their ``id`` is set to the content
    sha1."""
    sha1 = random.choice(contents_with_ctags["sha1s"])
    indexer_data.content_add_ctags(sha1)
    looked_up = list(archive.lookup_content_ctags("sha1:%s" % sha1))
    expected = list(indexer_data.content_get_ctags(sha1))
    for entry in expected:
        entry.update(id=sha1)
    assert looked_up == expected
def test_lookup_content_ctags_no_hash():
    """Looking up ctags for a freshly generated random content must yield
    nothing."""
    missing = random_content()
    query = "sha1:%s" % missing["sha1"]
    looked_up = list(archive.lookup_content_ctags(query))
    assert looked_up == []
@given(content())
def test_lookup_content_filetype(indexer_data, content):
    """The filetype looked up for a content must equal the mimetype data
    returned by ``indexer_data`` after indexing that content."""
    sha1 = content["sha1"]
    indexer_data.content_add_mimetype(sha1)
    looked_up = archive.lookup_content_filetype(sha1)
    expected = indexer_data.content_get_mimetype(sha1)
    assert looked_up == expected
# Same guard as test_lookup_content_ctags: this test also indexes contents
# through indexer_data.content_add_ctags.
@pytest.mark.skipif(
    ctags_json_missing, reason="requires ctags with json output support"
)
@given(contents_with_ctags())
def test_lookup_expression(indexer_data, contents_with_ctags):
    """Looking up a symbol name must return the matching ctags, limited to
    one page of results.

    The expected list is built by indexing the contents one by one and
    collecting the ctags whose name is ``contents_with_ctags["symbol_name"]``
    (``id`` dropped, ``sha1`` added) until ``per_page`` entries are gathered.
    """
    per_page = 10
    expected_ctags = []
    for content_sha1 in contents_with_ctags["sha1s"]:
        # Stop indexing further contents once a full page is collected.
        if len(expected_ctags) == per_page:
            break
        indexer_data.content_add_ctags(content_sha1)
        for ctag in indexer_data.content_get_ctags(content_sha1):
            if len(expected_ctags) == per_page:
                break
            if ctag["name"] == contents_with_ctags["symbol_name"]:
                del ctag["id"]
                ctag["sha1"] = content_sha1
                expected_ctags.append(ctag)
    # Request the very page size the expectation was built with.
    actual_ctags = list(
        archive.lookup_expression(
            contents_with_ctags["symbol_name"], last_sha1=None, per_page=per_page
        )
    )
    assert actual_ctags == expected_ctags
def test_lookup_expression_no_result():
    """Looking up the expression "barfoo" must yield no ctags."""
    looked_up = list(archive.lookup_expression("barfoo", last_sha1=None, per_page=10))
    assert looked_up == []
@pytest.mark.skipif(fossology_missing, reason="requires fossology-nomossa installed")
@given(content())
def test_lookup_content_license(indexer_data, content):
    """The license looked up for a content must equal the license data
    returned by ``indexer_data`` after indexing that content."""
    sha1 = content["sha1"]
    indexer_data.content_add_license(sha1)
    looked_up = archive.lookup_content_license(sha1)
    expected = indexer_data.content_get_license(sha1)
    assert looked_up == expected
def test_stat_counters(archive_data):
    """``archive.stat_counters`` must agree with the counters reported by
    ``archive_data``."""
    assert archive.stat_counters() == archive_data.stat_counters()
@given(new_origin(), visit_dates())
def test_lookup_origin_visits(archive_data, new_origin, visit_dates):
    """Visits looked up for an origin must equal the ones stored in
    ``archive_data``, each carrying the origin URL."""
    origin_url = new_origin.url
    archive_data.origin_add([new_origin])
    new_visits = [
        OriginVisit(origin=origin_url, date=ts, type="git",) for ts in visit_dates
    ]
    archive_data.origin_visit_add(new_visits)

    looked_up = list(archive.lookup_origin_visits(origin_url, per_page=100))

    expected_visits = archive_data.origin_visit_get(origin_url)
    for stored_visit in expected_visits:
        stored_visit["origin"] = origin_url
    assert looked_up == expected_visits
@given(new_origin(), visit_dates())
def test_lookup_origin_visit(archive_data, new_origin, visit_dates):
    """Looking up one randomly chosen visit of an origin must return the
    data ``archive_data`` holds for that visit."""
    origin_url = new_origin.url
    archive_data.origin_add([new_origin])
    added_visits = archive_data.origin_visit_add(
        [OriginVisit(origin=origin_url, date=ts, type="git",) for ts in visit_dates]
    )
    visit_id = random.choice(added_visits).visit

    looked_up = archive.lookup_origin_visit(origin_url, visit_id)
    expected = dict(archive_data.origin_visit_get_by(origin_url, visit_id))
    assert looked_up == expected
@given(new_origin())
def test_lookup_origin(archive_data, new_origin):
    """Looking up an origin by URL must return the entry stored in
    ``archive_data`` for that URL."""
    archive_data.origin_add([new_origin])
    looked_up = archive.lookup_origin({"url": new_origin.url})
    stored = archive_data.origin_get([new_origin.url])
    assert looked_up == stored[0]
@given(invalid_sha1())
def test_lookup_release_ko_id_checksum_not_a_sha1(invalid_sha1):
    """An invalid checksum must be rejected with ``BadInputExc``."""
    with pytest.raises(BadInputExc) as exc_info:
        archive.lookup_release(invalid_sha1)
    # Checked after the block: the call above is expected to raise.
    assert exc_info.match("Invalid checksum")
@given(sha256())
def test_lookup_release_ko_id_checksum_too_long(sha256):
    """A sha256 checksum must be rejected with ``BadInputExc``: only
    sha1_git is supported for releases."""
    with pytest.raises(BadInputExc) as exc_info:
        archive.lookup_release(sha256)
    # Checked after the block: the call above is expected to raise.
    assert exc_info.match("Only sha1_git is supported.")
@given(releases())
def test_lookup_release_multiple(archive_data, releases):
    """Looking up several releases at once must return, in order, the data
    ``archive_data`` holds for each of them."""
    looked_up = list(archive.lookup_release_multiple(releases))
    expected = [archive_data.release_get(release_id) for release_id in releases]
    assert looked_up == expected
def test_lookup_release_multiple_none_found():
    """Looking up three random release ids must yield one ``None`` per id."""
    random_ids = [random_sha1() for _ in range(3)]
    looked_up = list(archive.lookup_release_multiple(random_ids))
    assert looked_up == [None] * len(random_ids)
@given(directory())
def test_lookup_directory_with_path_not_found(directory):
    """Looking up a path that does not exist below ``directory`` must raise
    ``NotFoundExc`` with a message naming both the path and the root
    directory."""
    path = "some/invalid/path/here"
    with pytest.raises(NotFoundExc) as e:
        archive.lookup_directory_with_path(directory, path)
    # Checked after the block: the call above is expected to raise.
    assert e.match(
        f"Directory entry with path {path} from root directory {directory} not found"
    )
@given(directory())
def test_lookup_directory_with_path_found(archive_data, directory):
    """Looking up the name of a randomly chosen entry of ``directory`` must
    return that entry."""
    entries = archive_data.directory_ls(directory)
    chosen_entry = random.choice(entries)
    looked_up = archive.lookup_directory_with_path(directory, chosen_entry["name"])
    assert looked_up == chosen_entry
@given(release())
def test_lookup_release(archive_data, release):
    """A looked up release must equal the data ``archive_data`` holds for it."""
    looked_up = archive.lookup_release(release)
    expected = archive_data.release_get(release)
    assert looked_up == expected
@given(revision(), invalid_sha1(), sha256())
def test_lookup_revision_with_context_ko_not_a_sha1(revision, invalid_sha1, sha256):
    """A revision id that is not a valid sha1_git must be rejected with
    ``BadInputExc``, whether it is malformed or a sha256."""
    sha1_git_root = revision
    rejected_inputs = (
        (invalid_sha1, "Invalid checksum query string"),
        (sha256, "Only sha1_git is supported"),
    )
    for sha1_git, expected_message in rejected_inputs:
        with pytest.raises(BadInputExc) as exc_info:
            archive.lookup_revision_with_context(sha1_git_root, sha1_git)
        # Checked after the block: the call above is expected to raise.
        assert exc_info.match(expected_message)
@given(revision(), unknown_revision())
def test_lookup_revision_with_context_ko_sha1_git_does_not_exist(
    revision, unknown_revision
):
    """Looking up an unknown revision in the context of a known root
    revision must raise ``NotFoundExc`` naming the unknown revision."""
    with pytest.raises(NotFoundExc) as exc_info:
        archive.lookup_revision_with_context(revision, unknown_revision)
    # Checked after the block: the call above is expected to raise.
    assert exc_info.match("Revision %s not found" % unknown_revision)
@given(revision(), unknown_revision())
def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist(
    revision, unknown_revision
):
    """Looking up a known revision in the context of an unknown root
    revision must raise ``NotFoundExc`` naming the unknown root."""
    with pytest.raises(NotFoundExc) as exc_info:
        archive.lookup_revision_with_context(unknown_revision, revision)
    # Checked after the block: the call above is expected to raise.
    assert exc_info.match("Revision root %s not found" % unknown_revision)
@given(ancestor_revisions())
def test_lookup_revision_with_context(archive_data, ancestor_revisions):
    """A revision looked up in the context of a root revision must carry, as
    ``children``, the revisions of the root's log that list it as a parent.

    The root is passed both as a hex id and as a dict holding the id as
    bytes.
    """
    sha1_git = ancestor_revisions["sha1_git"]
    root_sha1_git = ancestor_revisions["sha1_git_root"]
    root_variants = (root_sha1_git, {"id": hash_to_bytes(root_sha1_git)})
    for sha1_git_root in root_variants:
        looked_up = archive.lookup_revision_with_context(sha1_git_root, sha1_git)

        # One entry per (revision, parent) pair whose parent is sha1_git.
        children = [
            rev["id"]
            for rev in archive_data.revision_log(root_sha1_git)
            for parent in rev["parents"]
            if hash_to_hex(parent) == sha1_git
        ]

        expected = archive_data.revision_get(sha1_git)
        expected["children"] = children
        assert looked_up == expected
@given(non_ancestor_revisions())
def test_lookup_revision_with_context_ko(non_ancestor_revisions):
    """Looking up a revision in the context of a root it is not an ancestor
    of must raise ``NotFoundExc``."""
    sha1_git = non_ancestor_revisions["sha1_git"]
    root_sha1_git = non_ancestor_revisions["sha1_git_root"]
    with pytest.raises(NotFoundExc) as exc_info:
        archive.lookup_revision_with_context(root_sha1_git, sha1_git)
    # Checked after the block: the call above is expected to raise.
    expected_message = "Revision %s is not an ancestor of %s" % (sha1_git, root_sha1_git)
    assert exc_info.match(expected_message)
def test_lookup_directory_with_revision_not_found():
    """Looking up the directory of a random revision id must raise
    ``NotFoundExc`` naming that revision."""
    missing_revision = random_sha1()
    with pytest.raises(NotFoundExc) as exc_info:
        archive.lookup_directory_with_revision(missing_revision)
    # Checked after the block: the call above is expected to raise.
    assert exc_info.match("Revision %s not found" % missing_revision)
@given(new_revision())
def test_lookup_directory_with_revision_unknown_content(archive_data, new_revision):
    """Looking up a file path whose directory entry targets a content that
    was never added must raise ``NotFoundExc``."""
    missing_content = random_content()
    dir_path = "README.md"

    # Directory with a single file entry targeting the random content.
    readme_entry = DirectoryEntry(
        name=dir_path.encode("utf-8"),
        type="file",
        target=hash_to_bytes(missing_content["sha1_git"]),
        perms=DentryPerms.content,
    )
    root_dir = Directory(entries=(readme_entry,))

    # Rebuild the revision so that it points to that directory; the id is
    # dropped before rebuilding the model object.
    rev_dict = new_revision.to_dict()
    rev_dict["directory"] = root_dir.id
    del rev_dict["id"]
    rebuilt_revision = Revision.from_dict(rev_dict)

    # Register the directory and the revision with the test archive.
    archive_data.directory_add([root_dir])
    archive_data.revision_add([rebuilt_revision])

    rebuilt_revision_id = hash_to_hex(rebuilt_revision.id)
    with pytest.raises(NotFoundExc) as exc_info:
        archive.lookup_directory_with_revision(rebuilt_revision_id, dir_path)
    # Checked after the block: the call above is expected to raise.
    assert exc_info.match("Content not found for revision %s" % rebuilt_revision_id)
@given(revision())
def test_lookup_directory_with_revision_ko_path_to_nowhere(revision):
    """Looking up an unknown path from a revision must raise ``NotFoundExc``
    with a message mentioning the path and the revision."""
    invalid_path = "path/to/something/unknown"
    with pytest.raises(NotFoundExc) as exc_info:
        archive.lookup_directory_with_revision(revision, invalid_path)
    # Checked after the block: the call above is expected to raise.
    expected_fragments = (
        "Directory or File",
        invalid_path,
        "revision %s" % revision,
        "not found",
    )
    for fragment in expected_fragments:
        assert exc_info.match(fragment)
@given(revision_with_submodules())
def test_lookup_directory_with_revision_submodules(
    archive_data, revision_with_submodules
):
    """Looking up a path that designates a "rev" entry must return the
    targeted revision as content, with type ``"rev"``."""
    rev_sha1_git = revision_with_submodules["rev_sha1_git"]
    rev_dir_path = revision_with_submodules["rev_dir_rev_path"]

    looked_up = archive.lookup_directory_with_revision(rev_sha1_git, rev_dir_path)

    # Locate the entry named rev_dir_path in the root directory of the revision.
    rev_data = archive_data.revision_get(revision_with_submodules["rev_sha1_git"])
    root_entries = archive_data.directory_ls(rev_data["directory"])
    rev_entry = next(entry for entry in root_entries if entry["name"] == rev_dir_path)

    expected = {
        "content": archive_data.revision_get(rev_entry["target"]),
        "path": rev_dir_path,
        "revision": rev_sha1_git,
        "type": "rev",
    }
    assert looked_up == expected
@given(revision())
def test_lookup_directory_with_revision_without_path(archive_data, revision):
    """Without a path, the lookup must return the listing of the revision's
    root directory, with type ``"dir"``."""
    looked_up = archive.lookup_directory_with_revision(revision)
    root_dir_id = archive_data.revision_get(revision)["directory"]
    expected_entries = archive_data.directory_ls(root_dir_id)
    assert looked_up["type"] == "dir"
    assert looked_up["content"] == expected_entries
@given(revision())
def test_lookup_directory_with_revision_with_path(archive_data, revision):
    """Looking up the name of a randomly chosen file or directory entry of
    the revision's root directory must return data matching that entry."""
    root_dir_id = archive_data.revision_get(revision)["directory"]
    candidates = [
        entry
        for entry in archive_data.directory_ls(root_dir_id)
        if entry["type"] in ("file", "dir")
    ]
    chosen = random.choice(candidates)

    looked_up = archive.lookup_directory_with_revision(revision, chosen["name"])

    assert looked_up["type"] == chosen["type"]
    assert looked_up["revision"] == revision
    assert looked_up["path"] == chosen["name"]

    if looked_up["type"] != "file":
        # Directory: the content is the listing of the targeted directory.
        sub_entries = archive_data.directory_ls(chosen["target"])
        assert looked_up["content"] == sub_entries
    else:
        # File: drop blake2s256 from the looked up checksums before
        # comparing with the directory entry.
        del looked_up["content"]["checksums"]["blake2s256"]
        for key in ("checksums", "status", "length"):
            assert looked_up["content"][key] == chosen[key]
@given(revision())
def test_lookup_directory_with_revision_with_path_to_file_and_data(
    archive_data, revision
):
    """With ``with_data=True``, looking up a randomly chosen file entry of
    the revision's root directory must also return the content's raw data."""
    root_dir_id = archive_data.revision_get(revision)["directory"]
    file_entries = [
        entry
        for entry in archive_data.directory_ls(root_dir_id)
        if entry["type"] == "file"
    ]
    chosen = random.choice(file_entries)
    expected_data = archive_data.content_get_data(chosen["checksums"]["sha1"])

    looked_up = archive.lookup_directory_with_revision(
        revision, chosen["name"], with_data=True
    )

    assert looked_up["type"] == chosen["type"]
    assert looked_up["revision"] == revision
    assert looked_up["path"] == chosen["name"]
    # Drop blake2s256 from the looked up checksums before comparing with the
    # directory entry.
    del looked_up["content"]["checksums"]["blake2s256"]
    for key in ("checksums", "status", "length"):
        assert looked_up["content"][key] == chosen[key]
    assert looked_up["content"]["data"] == expected_data["data"]
@given(revision())
def test_lookup_revision(archive_data, revision):
    """A looked up revision must equal the data ``archive_data`` holds for it."""
    looked_up = archive.lookup_revision(revision)
    expected = archive_data.revision_get(revision)
    assert looked_up == expected
@given(new_revision())
def test_lookup_revision_invalid_msg(archive_data, new_revision):
    """A revision whose message holds the byte ``\\xff`` must be returned
    with that byte escaped and "message" listed in ``decoding_failures``."""
    rev_dict = new_revision.to_dict()
    rev_dict["message"] = b"elegant fix for bug \xff"
    archive_data.revision_add([Revision.from_dict(rev_dict)])

    looked_up = archive.lookup_revision(hash_to_hex(rev_dict["id"]))
    assert looked_up["message"] == "elegant fix for bug \\xff"
    assert looked_up["decoding_failures"] == ["message"]
@given(new_revision())
def test_lookup_revision_msg_ok(archive_data, new_revision):
    """The message looked up for an added revision must be the revision's
    own message, wrapped in a dict under the "message" key."""
    archive_data.revision_add([new_revision])
    revision_id = hash_to_hex(new_revision.id)
    looked_up = archive.lookup_revision_message(revision_id)
    assert looked_up == {"message": new_revision.message}
def test_lookup_revision_msg_no_rev():
    """Looking up the message of a random revision id must raise
    ``NotFoundExc`` naming that revision."""
    missing_revision = random_sha1()
    with pytest.raises(NotFoundExc) as exc_info:
        archive.lookup_revision_message(missing_revision)
    # Checked after the block: the call above is expected to raise.
    assert exc_info.match("Revision with sha1_git %s not found." % missing_revision)
@given(revisions())
def test_lookup_revision_multiple(archive_data, revisions):
    """``lookup_revision_multiple`` yields one result per requested id, in order."""
    looked_up = list(archive.lookup_revision_multiple(revisions))
    expected = [archive_data.revision_get(rev_id) for rev_id in revisions]
    assert looked_up == expected
def test_lookup_revision_multiple_none_found():
    """Random revision ids each resolve to ``None``."""
    missing_revisions = [random_sha1() for _ in range(3)]
    looked_up = list(archive.lookup_revision_multiple(missing_revisions))
    assert looked_up == [None for _ in missing_revisions]
@given(revision())
def test_lookup_revision_log(archive_data, revision):
    """``lookup_revision_log`` matches ``archive_data.revision_log`` for the
    same revision and limit."""
    max_entries = 25
    looked_up_log = list(archive.lookup_revision_log(revision, limit=max_entries))
    expected_log = archive_data.revision_log(revision, limit=max_entries)
    assert looked_up_log == expected_log
def _get_origin_branches(archive_data, origin):
origin_visit = archive_data.origin_visit_get(origin["url"])[-1]
snapshot = archive_data.snapshot_get(origin_visit["snapshot"])
branches = {
k: v
for (k, v) in snapshot["branches"].items()
if v["target_type"] == "revision"
}
return branches
@given(origin())
def test_lookup_revision_log_by(archive_data, origin):
    """The log looked up by origin URL and branch name equals the log of the
    revision that branch targets."""
    revision_branches = _get_origin_branches(archive_data, origin)
    chosen_branch = random.choice(list(revision_branches))

    looked_up_log = list(
        archive.lookup_revision_log_by(origin["url"], chosen_branch, None, limit=25)
    )

    branch_tip = revision_branches[chosen_branch]["target"]
    expected_log = archive_data.revision_log(branch_tip, limit=25)
    assert looked_up_log == expected_log
@given(origin())
def test_lookup_revision_log_by_notfound(origin):
    """An unknown branch name makes ``lookup_revision_log_by`` raise ``NotFoundExc``."""
    origin_url = origin["url"]
    with pytest.raises(NotFoundExc):
        archive.lookup_revision_log_by(
            origin_url, "unknown_branch_name", None, limit=100
        )
def test_lookup_content_raw_not_found():
    """``lookup_content_raw`` raises ``NotFoundExc`` for a random content's sha1."""
    missing_sha1 = random_content()["sha1"]
    expected_error = "Content with %s checksum equals to %s not found!" % (
        "sha1",
        missing_sha1,
    )

    with pytest.raises(NotFoundExc) as exc_info:
        archive.lookup_content_raw("sha1:" + missing_sha1)

    assert exc_info.match(expected_error)
@given(content())
def test_lookup_content_raw(archive_data, content):
    """Raw content looked up by sha256 equals the data fetched by sha1."""
    query = "sha256:%s" % content["sha256"]
    looked_up = archive.lookup_content_raw(query)
    expected = archive_data.content_get_data(content["sha1"])
    assert looked_up == expected
@given(empty_content())
def test_lookup_empty_content_raw(archive_data, empty_content):
    """Raw lookup of the empty content by sha1_git yields empty bytes."""
    query = f"sha1_git:{empty_content['sha1_git']}"
    looked_up = archive.lookup_content_raw(query)
    assert looked_up["data"] == b""
def test_lookup_content_not_found():
    """``lookup_content`` raises ``NotFoundExc`` for a random content's sha1."""
    missing_sha1 = random_content()["sha1"]
    expected_error = "Content with %s checksum equals to %s not found!" % (
        "sha1",
        missing_sha1,
    )

    with pytest.raises(NotFoundExc) as exc_info:
        archive.lookup_content("sha1:%s" % missing_sha1)

    assert exc_info.match(expected_error)
@given(content())
def test_lookup_content_with_sha1(archive_data, content):
    """Content metadata looked up by sha1 equals ``archive_data.content_get``."""
    sha1 = content["sha1"]
    looked_up = archive.lookup_content(f"sha1:{sha1}")
    expected = archive_data.content_get(sha1)
    assert looked_up == expected
@given(content())
def test_lookup_content_with_sha256(archive_data, content):
    """Content metadata looked up by sha256 equals the metadata fetched by sha1."""
    sha256 = content["sha256"]
    looked_up = archive.lookup_content(f"sha256:{sha256}")
    expected = archive_data.content_get(content["sha1"])
    assert looked_up == expected
def test_lookup_directory_bad_checksum():
    """A string that is not a checksum makes ``lookup_directory`` raise
    ``BadInputExc``."""
    not_a_checksum = "directory_id"
    with pytest.raises(BadInputExc):
        archive.lookup_directory(not_a_checksum)
def test_lookup_directory_not_found():
    """``lookup_directory`` raises ``NotFoundExc`` for a random directory id."""
    missing_directory = random_sha1()
    expected_error = "Directory with sha1_git %s not found" % missing_directory

    with pytest.raises(NotFoundExc) as exc_info:
        archive.lookup_directory(missing_directory)

    assert exc_info.match(expected_error)
@given(directory())
def test_lookup_directory(archive_data, directory):
    """``lookup_directory`` lists the same entries as ``archive_data.directory_ls``."""
    looked_up_entries = list(archive.lookup_directory(directory))
    expected_entries = archive_data.directory_ls(directory)
    assert looked_up_entries == expected_entries
@given(empty_directory())
def test_lookup_directory_empty(empty_directory):
    """Listing the empty directory yields no entries."""
    looked_up_entries = list(archive.lookup_directory(empty_directory))
    assert looked_up_entries == []
@given(origin())
def test_lookup_revision_by_nothing_found(origin):
    """An invalid branch name makes ``lookup_revision_by`` raise ``NotFoundExc``."""
    origin_url = origin["url"]
    with pytest.raises(NotFoundExc):
        archive.lookup_revision_by(origin_url, "invalid-branch-name")
@given(origin())
def test_lookup_revision_by(archive_data, origin):
    """Looking up by origin URL and branch name returns the revision that the
    branch targets."""
    revision_branches = _get_origin_branches(archive_data, origin)
    chosen_branch = random.choice(list(revision_branches))

    looked_up = archive.lookup_revision_by(origin["url"], chosen_branch)

    branch_tip = revision_branches[chosen_branch]["target"]
    expected = archive_data.revision_get(branch_tip)
    assert looked_up == expected
@given(origin(), revision())
def test_lookup_revision_with_context_by_ko(origin, revision):
    """An invalid branch name makes ``lookup_revision_with_context_by`` raise
    ``NotFoundExc``."""
    origin_url = origin["url"]
    with pytest.raises(NotFoundExc):
        archive.lookup_revision_with_context_by(
            origin_url, "invalid-branch-name", None, revision
        )
@given(origin())
def test_lookup_revision_with_context_by(archive_data, origin):
    """``lookup_revision_with_context_by`` returns the branch tip revision and
    the requested revision enriched with its ``children``.

    The requested revision is the last entry of the tip's revision log; its
    expected children are computed from the ``parents`` of every log entry.
    """
    revision_branches = _get_origin_branches(archive_data, origin)
    chosen_branch = random.choice(list(revision_branches))
    tip_id = revision_branches[chosen_branch]["target"]
    tip_log = archive_data.revision_log(tip_id)

    # parent id -> ids of the log entries listing it as a parent
    children_by_parent = {}
    for entry in tip_log:
        for parent_id in entry["parents"]:
            children_by_parent.setdefault(parent_id, []).append(entry["id"])

    last_logged_id = tip_log[-1]["id"]

    looked_up_root, looked_up_rev = archive.lookup_revision_with_context_by(
        origin["url"], chosen_branch, None, last_logged_id
    )

    expected_root = archive_data.revision_get(tip_id)
    expected_rev = archive_data.revision_get(last_logged_id)
    expected_rev["children"] = children_by_parent.get(last_logged_id, [])

    assert looked_up_root == expected_root
    assert looked_up_rev == expected_rev
def test_lookup_revision_through_ko_not_implemented():
    """Unrecognized criteria make ``lookup_revision_through`` raise
    ``NotImplementedError``."""
    unsupported_criteria = {"something-unknown": 10}
    with pytest.raises(NotImplementedError):
        archive.lookup_revision_through(unsupported_criteria)
@given(origin())
def test_lookup_revision_through_with_context_by(archive_data, origin):
    """With origin URL, branch name, timestamp and sha1_git criteria,
    ``lookup_revision_through`` gives the same result as
    ``lookup_revision_with_context_by``."""
    revision_branches = _get_origin_branches(archive_data, origin)
    chosen_branch = random.choice(list(revision_branches))
    tip_id = revision_branches[chosen_branch]["target"]
    last_logged_id = archive_data.revision_log(tip_id)[-1]["id"]

    criteria = {
        "origin_url": origin["url"],
        "branch_name": chosen_branch,
        "ts": None,
        "sha1_git": last_logged_id,
    }
    via_through = archive.lookup_revision_through(criteria)
    direct = archive.lookup_revision_with_context_by(
        origin["url"], chosen_branch, None, last_logged_id
    )
    assert via_through == direct
@given(origin())
def test_lookup_revision_through_with_revision_by(archive_data, origin):
    """With origin URL, branch name and timestamp criteria,
    ``lookup_revision_through`` gives the same result as ``lookup_revision_by``."""
    revision_branches = _get_origin_branches(archive_data, origin)
    chosen_branch = random.choice(list(revision_branches))

    criteria = {
        "origin_url": origin["url"],
        "branch_name": chosen_branch,
        "ts": None,
    }
    via_through = archive.lookup_revision_through(criteria)
    direct = archive.lookup_revision_by(origin["url"], chosen_branch, None)
    assert via_through == direct
@given(ancestor_revisions())
def test_lookup_revision_through_with_context(ancestor_revisions):
    """With sha1_git_root and sha1_git criteria, ``lookup_revision_through``
    gives the same result as ``lookup_revision_with_context``."""
    root_id = ancestor_revisions["sha1_git_root"]
    rev_id = ancestor_revisions["sha1_git"]

    criteria = {"sha1_git_root": root_id, "sha1_git": rev_id}
    via_through = archive.lookup_revision_through(criteria)
    direct = archive.lookup_revision_with_context(root_id, rev_id)
    assert via_through == direct
@given(revision())
def test_lookup_revision_through_with_revision(revision):
    """With only a sha1_git criterion, ``lookup_revision_through`` gives the
    same result as ``lookup_revision``."""
    via_through = archive.lookup_revision_through({"sha1_git": revision})
    direct = archive.lookup_revision(revision)
    assert via_through == direct
@given(revision())
def test_lookup_directory_through_revision_ko_not_found(revision):
    """An invalid path makes ``lookup_directory_through_revision`` raise
    ``NotFoundExc``."""
    criteria = {"sha1_git": revision}
    with pytest.raises(NotFoundExc):
        archive.lookup_directory_through_revision(criteria, "some/invalid/path")
@given(revision())
def test_lookup_directory_through_revision_ok(archive_data, revision):
    """``lookup_directory_through_revision`` returns the revision id paired
    with the result of ``lookup_directory_with_revision`` for the same path."""
    root_directory = archive_data.revision_get(revision)["directory"]
    file_names = [
        entry["name"]
        for entry in archive_data.directory_ls(root_directory)
        if entry["type"] == "file"
    ]
    file_name = random.choice(file_names)

    via_through = archive.lookup_directory_through_revision(
        {"sha1_git": revision}, file_name
    )
    direct = archive.lookup_directory_with_revision(revision, file_name)
    assert via_through == (revision, direct)
@given(revision())
def test_lookup_directory_through_revision_ok_with_data(archive_data, revision):
    """Same as the plain lookup-through-revision check, with ``with_data=True``
    passed to both lookups."""
    root_directory = archive_data.revision_get(revision)["directory"]
    file_names = [
        entry["name"]
        for entry in archive_data.directory_ls(root_directory)
        if entry["type"] == "file"
    ]
    file_name = random.choice(file_names)

    via_through = archive.lookup_directory_through_revision(
        {"sha1_git": revision}, file_name, with_data=True
    )
    direct = archive.lookup_directory_with_revision(
        revision, file_name, with_data=True
    )
    assert via_through == (revision, direct)
@given(content(), directory(), release(), revision(), snapshot())
def test_lookup_known_objects(
    archive_data, content, directory, release, revision, snapshot
):
    """``lookup_object`` returns the expected data for each known object type."""
    # (object type, object id, callable computing the expected result)
    checks = (
        (CONTENT, content["sha1_git"], lambda: archive_data.content_find(content)),
        (DIRECTORY, directory, lambda: archive_data.directory_get(directory)),
        (RELEASE, release, lambda: archive_data.release_get(release)),
        (REVISION, revision, lambda: archive_data.revision_get(revision)),
        (
            SNAPSHOT,
            snapshot,
            lambda: {**archive_data.snapshot_get(snapshot), "next_branch": None},
        ),
    )
    for object_type, object_id, compute_expected in checks:
        expected = compute_expected()
        assert archive.lookup_object(object_type, object_id) == expected
@given(
    unknown_content(),
    unknown_directory(),
    unknown_release(),
    unknown_revision(),
    unknown_snapshot(),
)
def test_lookup_unknown_objects(
    unknown_content,
    unknown_directory,
    unknown_release,
    unknown_revision,
    unknown_snapshot,
):
    """``lookup_object`` raises ``NotFoundExc`` with a type-specific message
    for each unknown object."""
    # (object type, object id, regex the error message must match)
    cases = (
        (CONTENT, unknown_content["sha1_git"], r"Content.*not found"),
        (DIRECTORY, unknown_directory, r"Directory.*not found"),
        (RELEASE, unknown_release, r"Release.*not found"),
        (REVISION, unknown_revision, r"Revision.*not found"),
        (SNAPSHOT, unknown_snapshot, r"Snapshot.*not found"),
    )
    for object_type, object_id, message_pattern in cases:
        with pytest.raises(NotFoundExc) as exc_info:
            archive.lookup_object(object_type, object_id)
        assert exc_info.match(message_pattern)
@given(invalid_sha1())
def test_lookup_invalid_objects(invalid_sha1):
    """``lookup_object`` raises ``BadInputExc`` for an unknown object type and
    for an invalid hash with every known object type."""
    # (object type, regex the error message must match)
    cases = (
        ("foo", "Invalid swh object type"),
        (CONTENT, "Invalid hash"),
        (DIRECTORY, "Invalid checksum"),
        (RELEASE, "Invalid checksum"),
        (REVISION, "Invalid checksum"),
        (SNAPSHOT, "Invalid checksum"),
    )
    for object_type, message_pattern in cases:
        with pytest.raises(BadInputExc) as exc_info:
            archive.lookup_object(object_type, invalid_sha1)
        assert exc_info.match(message_pattern)
def test_lookup_missing_hashes_non_present():
    """When no queried hash is archived, every one is reported as missing."""
    object_types = (CONTENT, DIRECTORY, REVISION, RELEASE, SNAPSHOT)
    # one random hex id per object type
    missing_by_type = {object_type: random_sha1() for object_type in object_types}

    grouped_swhids = {
        object_type: [hash_to_bytes(hex_id)]
        for object_type, hex_id in missing_by_type.items()
    }
    reported_missing = archive.lookup_missing_hashes(grouped_swhids)

    assert reported_missing == set(missing_by_type.values())
@given(content(), directory())
def test_lookup_missing_hashes_some_present(archive_data, content, directory):
    """Only the random revision, release and snapshot ids are reported as
    missing; the given content and directory are not."""
    missing_by_type = {
        object_type: random_sha1() for object_type in (REVISION, RELEASE, SNAPSHOT)
    }

    grouped_swhids = {
        CONTENT: [hash_to_bytes(content["sha1_git"])],
        DIRECTORY: [hash_to_bytes(directory)],
    }
    for object_type, hex_id in missing_by_type.items():
        grouped_swhids[object_type] = [hash_to_bytes(hex_id)]

    reported_missing = archive.lookup_missing_hashes(grouped_swhids)

    assert reported_missing == set(missing_by_type.values())
@given(origin())
def test_lookup_origin_extra_trailing_slash(origin):
    """Looking up an origin URL with a ``/`` appended returns the origin under
    its original URL."""
    url_with_slash = f"{origin['url']}/"
    looked_up = archive.lookup_origin({"url": url_with_slash})
    assert looked_up["url"] == origin["url"]
def test_lookup_origin_missing_trailing_slash(archive_data):
    """An origin added with a trailing slash is found when queried with the
    URL minus its last character."""
    deb_origin = Origin(url="http://snapshot.debian.org/package/r-base/")
    archive_data.origin_add([deb_origin])

    url_without_slash = deb_origin.url[:-1]
    looked_up = archive.lookup_origin({"url": url_without_slash})

    assert looked_up["url"] == deb_origin.url
@given(snapshot())
def test_lookup_snapshot_branch_name_from_tip_revision(archive_data, snapshot_id):
    """The branch name found for a tip revision is one of the snapshot's
    branches targeting that revision."""
    snapshot_branches = archive_data.snapshot_get(snapshot_id)["branches"]
    # (branch name, targeted revision) for every revision branch
    revision_branches = [
        (name, info["target"])
        for name, info in snapshot_branches.items()
        if info["target_type"] == "revision"
    ]
    _, tip_revision = random.choice(revision_branches)

    # several branches may target the same revision: any of them is acceptable
    acceptable_names = [
        name for name, target in revision_branches if target == tip_revision
    ]

    found_name = archive.lookup_snapshot_branch_name_from_tip_revision(
        snapshot_id, tip_revision
    )
    assert found_name in acceptable_names
@given(origin(), new_origin())
def test_lookup_origins_get_by_sha1s(origin, unknown_origin):
    """``lookup_origins_by_sha1s`` returns one entry per queried sha1, in
    order, with ``None`` for the sha1 of the unknown origin's URL."""
    known_sha1 = hashlib.sha1(origin["url"].encode("ascii")).hexdigest()
    known_info = OriginInfo(url=origin["url"])
    unknown_sha1 = hashlib.sha1(unknown_origin.url.encode("ascii")).hexdigest()

    # (queried sha1s, expected results)
    scenarios = (
        ([known_sha1], [known_info]),
        ([known_sha1, known_sha1], [known_info, known_info]),
        ([known_sha1, unknown_sha1], [known_info, None]),
    )
    for queried_sha1s, expected_origins in scenarios:
        origins = list(archive.lookup_origins_by_sha1s(queried_sha1s))
        assert origins == expected_origins
@given(snapshot())
def test_lookup_snapshot_sizes(archive_data, snapshot):
    """``lookup_snapshot_sizes`` returns the number of branches per target type."""
    snapshot_branches = archive_data.snapshot_get(snapshot)["branches"]

    expected_sizes = dict.fromkeys(("alias", "release", "revision"), 0)
    for branch_info in snapshot_branches.values():
        # branches whose info is None are not counted
        if branch_info is None:
            continue
        expected_sizes[branch_info["target_type"]] += 1

    assert archive.lookup_snapshot_sizes(snapshot) == expected_sizes
@given(snapshot())
def test_lookup_snapshot_alias(snapshot):
    """Resolving the ``HEAD`` alias of a snapshot yields a revision target."""
    head_target = archive.lookup_snapshot_alias(snapshot, "HEAD")
    assert head_target is not None
    assert head_target["target_type"] == "revision"
    assert head_target["target"] is not None
|