diff --git a/swh/web/api/views/identifiers.py b/swh/web/api/views/identifiers.py
--- a/swh/web/api/views/identifiers.py
+++ b/swh/web/api/views/identifiers.py
@@ -54,9 +54,9 @@
# object is present in the archive, NotFoundExc
# will be raised otherwise
swhid_parsed = swhid_resolved["swhid_parsed"]
- object_type = swhid_parsed.object_type.name.lower()
+ object_type = swhid_parsed.object_type
object_id = hash_to_hex(swhid_parsed.object_id)
- archive.lookup_object(object_type, object_id)
+ archive.lookup_object(swhid_parsed.object_type, object_id)
# id is well-formed and the pointed object exists
return {
"namespace": swhid_parsed.namespace,
diff --git a/swh/web/browse/snapshot_context.py b/swh/web/browse/snapshot_context.py
--- a/swh/web/browse/snapshot_context.py
+++ b/swh/web/browse/snapshot_context.py
@@ -14,15 +14,7 @@
from django.utils.html import escape
from swh.model.hashutil import hash_to_bytes
-from swh.model.identifiers import (
- CONTENT,
- DIRECTORY,
- RELEASE,
- REVISION,
- SNAPSHOT,
- CoreSWHID,
- ObjectType,
-)
+from swh.model.identifiers import CoreSWHID, ObjectType
from swh.model.model import Snapshot
from swh.web.browse.utils import (
content_display_max_size,
@@ -805,9 +797,9 @@
revision_found = False
swh_objects = [
- SWHObjectInfo(object_type=DIRECTORY, object_id=sha1_git),
- SWHObjectInfo(object_type=REVISION, object_id=revision_id),
- SWHObjectInfo(object_type=SNAPSHOT, object_id=snapshot_id),
+ SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=sha1_git),
+ SWHObjectInfo(object_type=ObjectType.REVISION, object_id=revision_id),
+ SWHObjectInfo(object_type=ObjectType.SNAPSHOT, object_id=snapshot_id),
]
visit_date = None
@@ -818,10 +810,12 @@
release_id = snapshot_context["release_id"]
if release_id:
- swh_objects.append(SWHObjectInfo(object_type=RELEASE, object_id=release_id))
+ swh_objects.append(
+ SWHObjectInfo(object_type=ObjectType.RELEASE, object_id=release_id)
+ )
dir_metadata = DirectoryMetadata(
- object_type=DIRECTORY,
+ object_type=ObjectType.DIRECTORY,
object_id=sha1_git,
directory=sha1_git,
nb_files=nb_files,
@@ -984,10 +978,12 @@
content_checksums = content_data.get("checksums", {})
swh_objects = [
- SWHObjectInfo(object_type=CONTENT, object_id=content_checksums.get("sha1_git")),
- SWHObjectInfo(object_type=DIRECTORY, object_id=directory_id),
- SWHObjectInfo(object_type=REVISION, object_id=revision_id),
- SWHObjectInfo(object_type=SNAPSHOT, object_id=snapshot_id),
+ SWHObjectInfo(
+ object_type=ObjectType.CONTENT, object_id=content_checksums.get("sha1_git")
+ ),
+ SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=directory_id),
+ SWHObjectInfo(object_type=ObjectType.REVISION, object_id=revision_id),
+ SWHObjectInfo(object_type=ObjectType.SNAPSHOT, object_id=snapshot_id),
]
visit_date = None
@@ -998,10 +994,12 @@
release_id = snapshot_context["release_id"]
if release_id:
- swh_objects.append(SWHObjectInfo(object_type=RELEASE, object_id=release_id))
+ swh_objects.append(
+ SWHObjectInfo(object_type=ObjectType.RELEASE, object_id=release_id)
+ )
content_metadata = ContentMetadata(
- object_type=CONTENT,
+ object_type=ObjectType.CONTENT,
object_id=content_checksums.get("sha1_git"),
sha1=content_checksums.get("sha1"),
sha1_git=content_checksums.get("sha1_git"),
@@ -1175,13 +1173,15 @@
revision_metadata["origin visit type"] = visit_info["type"]
swh_objects = [
- SWHObjectInfo(object_type=REVISION, object_id=revision_id),
- SWHObjectInfo(object_type=SNAPSHOT, object_id=snapshot_id),
+ SWHObjectInfo(object_type=ObjectType.REVISION, object_id=revision_id),
+ SWHObjectInfo(object_type=ObjectType.SNAPSHOT, object_id=snapshot_id),
]
release_id = snapshot_context["release_id"]
if release_id:
- swh_objects.append(SWHObjectInfo(object_type=RELEASE, object_id=release_id))
+ swh_objects.append(
+ SWHObjectInfo(object_type=ObjectType.RELEASE, object_id=release_id)
+ )
browse_rel_link = gen_release_link(release_id)
revision_metadata["release"] = release_id
revision_metadata["context-independent release"] = browse_rel_link
diff --git a/swh/web/browse/views/content.py b/swh/web/browse/views/content.py
--- a/swh/web/browse/views/content.py
+++ b/swh/web/browse/views/content.py
@@ -13,7 +13,7 @@
from django.template.defaultfilters import filesizeformat
from swh.model.hashutil import hash_to_hex
-from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
+from swh.model.identifiers import ObjectType
from swh.web.browse.browseurls import browse_route
from swh.web.browse.snapshot_context import get_snapshot_context
from swh.web.browse.utils import (
@@ -209,7 +209,7 @@
release_name=request.GET.get("release"),
revision_id=request.GET.get("revision"),
path=path,
- browse_context=CONTENT,
+ browse_context=ObjectType.CONTENT,
)
except NotFoundExc as e:
if str(e).startswith("Origin"):
@@ -306,7 +306,7 @@
)
content_metadata = ContentMetadata(
- object_type=CONTENT,
+ object_type=ObjectType.CONTENT,
object_id=content_checksums.get("sha1_git"),
sha1=content_checksums.get("sha1"),
sha1_git=content_checksums.get("sha1_git"),
@@ -328,27 +328,34 @@
)
swh_objects = [
- SWHObjectInfo(object_type=CONTENT, object_id=content_checksums.get("sha1_git"))
+ SWHObjectInfo(
+ object_type=ObjectType.CONTENT, object_id=content_checksums.get("sha1_git")
+ )
]
if directory_id:
- swh_objects.append(SWHObjectInfo(object_type=DIRECTORY, object_id=directory_id))
+ swh_objects.append(
+ SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=directory_id)
+ )
if snapshot_context:
swh_objects.append(
SWHObjectInfo(
- object_type=REVISION, object_id=snapshot_context["revision_id"]
+ object_type=ObjectType.REVISION,
+ object_id=snapshot_context["revision_id"],
)
)
swh_objects.append(
SWHObjectInfo(
- object_type=SNAPSHOT, object_id=snapshot_context["snapshot_id"]
+ object_type=ObjectType.SNAPSHOT,
+ object_id=snapshot_context["snapshot_id"],
)
)
if snapshot_context["release_id"]:
swh_objects.append(
SWHObjectInfo(
- object_type=RELEASE, object_id=snapshot_context["release_id"]
+ object_type=ObjectType.RELEASE,
+ object_id=snapshot_context["release_id"],
)
)
diff --git a/swh/web/browse/views/directory.py b/swh/web/browse/views/directory.py
--- a/swh/web/browse/views/directory.py
+++ b/swh/web/browse/views/directory.py
@@ -11,7 +11,7 @@
from django.shortcuts import redirect, render
from django.template.defaultfilters import filesizeformat
-from swh.model.identifiers import DIRECTORY, RELEASE, REVISION, SNAPSHOT
+from swh.model.identifiers import ObjectType
from swh.web.browse.browseurls import browse_route
from swh.web.browse.snapshot_context import get_snapshot_context
from swh.web.browse.utils import gen_link, get_directory_entries, get_readme_to_display
@@ -139,7 +139,7 @@
sum_file_sizes = filesizeformat(sum_file_sizes)
dir_metadata = DirectoryMetadata(
- object_type=DIRECTORY,
+ object_type=ObjectType.DIRECTORY,
object_id=sha1_git,
directory=root_sha1_git,
nb_files=len(files),
@@ -160,23 +160,26 @@
"revision_id": None,
}
- swh_objects = [SWHObjectInfo(object_type=DIRECTORY, object_id=sha1_git)]
+ swh_objects = [SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=sha1_git)]
if snapshot_context:
swh_objects.append(
SWHObjectInfo(
- object_type=REVISION, object_id=snapshot_context["revision_id"]
+ object_type=ObjectType.REVISION,
+ object_id=snapshot_context["revision_id"],
)
)
swh_objects.append(
SWHObjectInfo(
- object_type=SNAPSHOT, object_id=snapshot_context["snapshot_id"]
+ object_type=ObjectType.SNAPSHOT,
+ object_id=snapshot_context["snapshot_id"],
)
)
if snapshot_context["release_id"]:
swh_objects.append(
SWHObjectInfo(
- object_type=RELEASE, object_id=snapshot_context["release_id"]
+ object_type=ObjectType.RELEASE,
+ object_id=snapshot_context["release_id"],
)
)
diff --git a/swh/web/browse/views/release.py b/swh/web/browse/views/release.py
--- a/swh/web/browse/views/release.py
+++ b/swh/web/browse/views/release.py
@@ -7,7 +7,7 @@
from django.shortcuts import render
-from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
+from swh.model.identifiers import ObjectType
from swh.web.browse.browseurls import browse_route
from swh.web.browse.snapshot_context import get_snapshot_context
from swh.web.browse.utils import (
@@ -81,7 +81,7 @@
snapshot_id = snapshot_context.get("snapshot_id", None)
release_metadata = ReleaseMetadata(
- object_type=RELEASE,
+ object_type=ObjectType.RELEASE,
object_id=sha1_git,
release=sha1_git,
author=release["author"]["fullname"] if release["author"] else "None",
@@ -101,13 +101,13 @@
if release["message"]:
release_note_lines = release["message"].split("\n")
- swh_objects = [SWHObjectInfo(object_type=RELEASE, object_id=sha1_git)]
+ swh_objects = [SWHObjectInfo(object_type=ObjectType.RELEASE, object_id=sha1_git)]
vault_cooking = None
rev_directory = None
target_link = None
- if release["target_type"] == REVISION:
+ if release["target_type"] == ObjectType.REVISION:
target_link = gen_revision_link(
release["target"],
snapshot_context=snapshot_context,
@@ -124,14 +124,16 @@
"revision_id": release["target"],
}
swh_objects.append(
- SWHObjectInfo(object_type=REVISION, object_id=release["target"])
+ SWHObjectInfo(
+ object_type=ObjectType.REVISION, object_id=release["target"]
+ )
)
swh_objects.append(
- SWHObjectInfo(object_type=DIRECTORY, object_id=rev_directory)
+ SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=rev_directory)
)
except Exception as exc:
sentry_sdk.capture_exception(exc)
- elif release["target_type"] == DIRECTORY:
+ elif release["target_type"] == ObjectType.DIRECTORY:
target_link = gen_directory_link(
release["target"],
snapshot_context=snapshot_context,
@@ -148,11 +150,13 @@
"revision_id": None,
}
swh_objects.append(
- SWHObjectInfo(object_type=DIRECTORY, object_id=release["target"])
+ SWHObjectInfo(
+ object_type=ObjectType.DIRECTORY, object_id=release["target"]
+ )
)
except Exception as exc:
sentry_sdk.capture_exception(exc)
- elif release["target_type"] == CONTENT:
+ elif release["target_type"] == ObjectType.CONTENT:
target_link = gen_content_link(
release["target"],
snapshot_context=snapshot_context,
@@ -160,9 +164,9 @@
link_attrs=None,
)
swh_objects.append(
- SWHObjectInfo(object_type=CONTENT, object_id=release["target"])
+ SWHObjectInfo(object_type=ObjectType.CONTENT, object_id=release["target"])
)
- elif release["target_type"] == RELEASE:
+ elif release["target_type"] == ObjectType.RELEASE:
target_link = gen_release_link(
release["target"],
snapshot_context=snapshot_context,
@@ -202,7 +206,9 @@
snapshot_id = snapshot_context["snapshot_id"]
if snapshot_id:
- swh_objects.append(SWHObjectInfo(object_type=SNAPSHOT, object_id=snapshot_id))
+ swh_objects.append(
+ SWHObjectInfo(object_type=ObjectType.SNAPSHOT, object_id=snapshot_id)
+ )
swhids_info = get_swhids_info(swh_objects, snapshot_context)
diff --git a/swh/web/browse/views/revision.py b/swh/web/browse/views/revision.py
--- a/swh/web/browse/views/revision.py
+++ b/swh/web/browse/views/revision.py
@@ -13,14 +13,7 @@
from django.utils.safestring import mark_safe
from swh.model.hashutil import hash_to_bytes
-from swh.model.identifiers import (
- CONTENT,
- DIRECTORY,
- REVISION,
- SNAPSHOT,
- CoreSWHID,
- ObjectType,
-)
+from swh.model.identifiers import CoreSWHID, ObjectType
from swh.web.browse.browseurls import browse_route
from swh.web.browse.snapshot_context import get_snapshot_context
from swh.web.browse.utils import (
@@ -378,7 +371,7 @@
dirs, files = get_directory_entries(dir_id)
revision_metadata = RevisionMetadata(
- object_type=REVISION,
+ object_type=ObjectType.REVISION,
object_id=sha1_git,
revision=sha1_git,
author=revision["author"]["fullname"] if revision["author"] else "None",
@@ -449,7 +442,7 @@
"revision_id": sha1_git,
}
- swh_objects = [SWHObjectInfo(object_type=REVISION, object_id=sha1_git)]
+ swh_objects = [SWHObjectInfo(object_type=ObjectType.REVISION, object_id=sha1_git)]
content = None
content_size = None
@@ -493,7 +486,7 @@
}
swh_objects.append(
- SWHObjectInfo(object_type=CONTENT, object_id=file_info["target"])
+ SWHObjectInfo(object_type=ObjectType.CONTENT, object_id=file_info["target"])
)
else:
for d in dirs:
@@ -531,7 +524,9 @@
vault_cooking["directory_context"] = True
vault_cooking["directory_id"] = dir_id
- swh_objects.append(SWHObjectInfo(object_type=DIRECTORY, object_id=dir_id))
+ swh_objects.append(
+ SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=dir_id)
+ )
query_params.pop("path", None)
@@ -540,7 +535,9 @@
)
if snapshot_id:
- swh_objects.append(SWHObjectInfo(object_type=SNAPSHOT, object_id=snapshot_id))
+ swh_objects.append(
+ SWHObjectInfo(object_type=ObjectType.SNAPSHOT, object_id=snapshot_id)
+ )
swhids_info = get_swhids_info(swh_objects, snapshot_context, extra_context)
diff --git a/swh/web/common/archive.py b/swh/web/common/archive.py
--- a/swh/web/common/archive.py
+++ b/swh/web/common/archive.py
@@ -11,7 +11,7 @@
from urllib.parse import urlparse
from swh.model import hashutil
-from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
+from swh.model.identifiers import ObjectType
from swh.model.model import OriginVisit, Revision
from swh.storage.algos import diff, revisions_walker
from swh.storage.algos.origin import origin_get_latest_visit_status
@@ -19,7 +19,7 @@
from swh.vault.exc import NotFoundExc as VaultNotFoundExc
from swh.web import config
from swh.web.common import converters, query
-from swh.web.common.exc import BadInputExc, NotFoundExc
+from swh.web.common.exc import NotFoundExc
from swh.web.common.typing import (
OriginInfo,
OriginMetadataInfo,
@@ -1115,7 +1115,7 @@
while not branches_from or len(snapshot["branches"]) == per_page + 1:
snapshot = lookup_snapshot(
snapshot_id,
- target_types=[REVISION],
+ target_types=[ObjectType.REVISION],
branches_from=branches_from,
branches_count=per_page + 1,
)
@@ -1307,7 +1307,7 @@
return _RevisionsWalkerProxy(rev_walker_type, rev_start, *args, **kwargs)
-def lookup_object(object_type: str, object_id: str) -> Dict[str, Any]:
+def lookup_object(object_type: ObjectType, object_id: str) -> Dict[str, Any]:
"""
Utility function for looking up an object in the archive by its type
and id.
@@ -1327,24 +1327,18 @@
the archive
BadInputExc: if the object identifier is invalid
"""
- if object_type == CONTENT:
+ if object_type == ObjectType.CONTENT:
return lookup_content(f"sha1_git:{object_id}")
- elif object_type == DIRECTORY:
+ elif object_type == ObjectType.DIRECTORY:
return {"id": object_id, "content": list(lookup_directory(object_id))}
- elif object_type == RELEASE:
+ elif object_type == ObjectType.RELEASE:
return lookup_release(object_id)
- elif object_type == REVISION:
+ elif object_type == ObjectType.REVISION:
return lookup_revision(object_id)
- elif object_type == SNAPSHOT:
+ elif object_type == ObjectType.SNAPSHOT:
return lookup_snapshot(object_id)
-
- raise BadInputExc(
- (
- "Invalid swh object type! Valid types are "
- f"{CONTENT}, {DIRECTORY}, {RELEASE} "
- f"{REVISION} or {SNAPSHOT}."
- )
- )
+ else:
+ raise ValueError(f"Unexpected object type variant: {object_type}")
def lookup_missing_hashes(grouped_swhids: Dict[str, List[bytes]]) -> Set[str]:
@@ -1361,15 +1355,15 @@
missing_hashes = []
for obj_type, obj_ids in grouped_swhids.items():
- if obj_type == CONTENT:
+ if obj_type == ObjectType.CONTENT:
missing_hashes.append(storage.content_missing_per_sha1_git(obj_ids))
- elif obj_type == DIRECTORY:
+ elif obj_type == ObjectType.DIRECTORY:
missing_hashes.append(storage.directory_missing(obj_ids))
- elif obj_type == REVISION:
+ elif obj_type == ObjectType.REVISION:
missing_hashes.append(storage.revision_missing(obj_ids))
- elif obj_type == RELEASE:
+ elif obj_type == ObjectType.RELEASE:
missing_hashes.append(storage.release_missing(obj_ids))
- elif obj_type == SNAPSHOT:
+ elif obj_type == ObjectType.SNAPSHOT:
missing_hashes.append(storage.snapshot_missing(obj_ids))
missing = set(
diff --git a/swh/web/common/converters.py b/swh/web/common/converters.py
--- a/swh/web/common/converters.py
+++ b/swh/web/common/converters.py
@@ -7,8 +7,11 @@
import json
from typing import Any, Dict, Union
+from django.core.serializers.json import DjangoJSONEncoder
+
from swh.core.utils import decode_with_escape
from swh.model import hashutil
+from swh.model.identifiers import ObjectType
from swh.model.model import Release, Revision
from swh.storage.interface import PartialBranches
from swh.web.common.typing import OriginInfo, OriginVisitInfo
@@ -229,6 +232,17 @@
)
+class SWHDjangoJSONEncoder(DjangoJSONEncoder):
+    """Wrapper around DjangoJSONEncoder to serialize SWH-specific types
+    found in :class:`swh.web.common.typing.SWHObjectInfo`."""
+
+    def default(self, o):
+        if isinstance(o, ObjectType):
+            return o.name.lower()
+        # Must return: otherwise every other type silently serializes as null.
+        return super().default(o)
+
+
class SWHMetadataEncoder(json.JSONEncoder):
"""Special json encoder for metadata field which can contain bytes
encoded value.
diff --git a/swh/web/common/identifiers.py b/swh/web/common/identifiers.py
--- a/swh/web/common/identifiers.py
+++ b/swh/web/common/identifiers.py
@@ -12,15 +12,7 @@
from swh.model.exceptions import ValidationError
from swh.model.hashutil import hash_to_bytes, hash_to_hex
-from swh.model.identifiers import (
- CONTENT,
- DIRECTORY,
- RELEASE,
- REVISION,
- SNAPSHOT,
- ObjectType,
- QualifiedSWHID,
-)
+from swh.model.identifiers import ObjectType, QualifiedSWHID
from swh.web.common import archive
from swh.web.common.exc import BadInputExc
from swh.web.common.typing import (
@@ -33,8 +25,18 @@
from swh.web.common.utils import reverse
+def parse_object_type(object_type: str) -> ObjectType:
+    """Convert a case-insensitive type name to ObjectType or raise BadInputExc."""
+    try:
+        return ObjectType[object_type.upper()]
+    except KeyError:
+        valid = ", ".join(variant.name.lower() for variant in ObjectType)
+        msg = f"Invalid swh object type! Valid types are {valid}; not {object_type}"
+        raise BadInputExc(msg) from None
+
+
def gen_swhid(
- object_type: str,
+ object_type: ObjectType,
object_id: str,
scheme_version: int = 1,
metadata: SWHIDContext = {},
@@ -61,11 +63,10 @@
generate a valid identifier
"""
try:
- decoded_object_type = ObjectType[object_type.upper()]
decoded_object_id = hash_to_bytes(object_id)
obj_swhid = str(
QualifiedSWHID(
- object_type=decoded_object_type,
+ object_type=object_type,
object_id=decoded_object_id,
scheme_version=scheme_version,
**metadata,
@@ -136,7 +137,7 @@
release = archive.lookup_release(
hash_to_hex(swhid_parsed.anchor.object_id)
)
- if release["target_type"] == REVISION:
+ if release["target_type"] == ObjectType.REVISION:
revision = archive.lookup_revision(release["target"])
directory = revision["directory"]
if object_type == ObjectType.CONTENT:
@@ -249,7 +250,7 @@
return swhid_parsed
-def group_swhids(swhids: Iterable[QualifiedSWHID],) -> Dict[str, List[bytes]]:
+def group_swhids(swhids: Iterable[QualifiedSWHID],) -> Dict[ObjectType, List[bytes]]:
"""
Groups many SoftWare Heritage persistent IDentifiers into a
dictionary depending on their type.
@@ -263,18 +264,18 @@
keys: object types
values: object hashes
"""
- swhids_by_type: Dict[str, List[bytes]] = {
- CONTENT: [],
- DIRECTORY: [],
- REVISION: [],
- RELEASE: [],
- SNAPSHOT: [],
+ swhids_by_type: Dict[ObjectType, List[bytes]] = {
+ ObjectType.CONTENT: [],
+ ObjectType.DIRECTORY: [],
+ ObjectType.REVISION: [],
+ ObjectType.RELEASE: [],
+ ObjectType.SNAPSHOT: [],
}
for obj_swhid in swhids:
obj_id = obj_swhid.object_id
obj_type = obj_swhid.object_type
- swhids_by_type[obj_type.name.lower()].append(hash_to_bytes(obj_id))
+ swhids_by_type[obj_type].append(hash_to_bytes(obj_id))
return swhids_by_type
@@ -321,47 +322,49 @@
swhid_context["origin"] = quote(
snapshot_context["origin_info"]["url"], safe="/?:@&"
)
- if object_type != SNAPSHOT:
+ if object_type != ObjectType.SNAPSHOT:
swhid_context["visit"] = gen_swhid(
- SNAPSHOT, snapshot_context["snapshot_id"]
+ ObjectType.SNAPSHOT, snapshot_context["snapshot_id"]
)
- if object_type in (CONTENT, DIRECTORY):
+ if object_type in (ObjectType.CONTENT, ObjectType.DIRECTORY):
if snapshot_context["release_id"] is not None:
swhid_context["anchor"] = gen_swhid(
- RELEASE, snapshot_context["release_id"]
+ ObjectType.RELEASE, snapshot_context["release_id"]
)
elif snapshot_context["revision_id"] is not None:
swhid_context["anchor"] = gen_swhid(
- REVISION, snapshot_context["revision_id"]
+ ObjectType.REVISION, snapshot_context["revision_id"]
)
- if object_type in (CONTENT, DIRECTORY):
+ if object_type in (ObjectType.CONTENT, ObjectType.DIRECTORY):
if (
extra_context
and "revision" in extra_context
and extra_context["revision"]
and "anchor" not in swhid_context
):
- swhid_context["anchor"] = gen_swhid(REVISION, extra_context["revision"])
+ swhid_context["anchor"] = gen_swhid(
+ ObjectType.REVISION, extra_context["revision"]
+ )
elif (
extra_context
and "root_directory" in extra_context
and extra_context["root_directory"]
and "anchor" not in swhid_context
and (
- object_type != DIRECTORY
+ object_type != ObjectType.DIRECTORY
or extra_context["root_directory"] != object_id
)
):
swhid_context["anchor"] = gen_swhid(
- DIRECTORY, extra_context["root_directory"]
+ ObjectType.DIRECTORY, extra_context["root_directory"]
)
path = None
if extra_context and "path" in extra_context:
path = extra_context["path"] or "/"
- if "filename" in extra_context and object_type == CONTENT:
+ if "filename" in extra_context and object_type == ObjectType.CONTENT:
path += extra_context["filename"]
- if object_type == DIRECTORY and path == "/":
+ if object_type == ObjectType.DIRECTORY and path == "/":
path = None
if path:
swhid_context["path"] = quote(path, safe="/?:@&")
diff --git a/swh/web/common/swh_templatetags.py b/swh/web/common/swh_templatetags.py
--- a/swh/web/common/swh_templatetags.py
+++ b/swh/web/common/swh_templatetags.py
@@ -7,9 +7,9 @@
import re
from django import template
-from django.core.serializers.json import DjangoJSONEncoder
from django.utils.safestring import mark_safe
+from swh.web.common.converters import SWHDjangoJSONEncoder
from swh.web.common.origin_save import get_savable_visit_types
from swh.web.common.utils import rst_to_html
@@ -79,7 +79,7 @@
JSON representation of the variable.
"""
- return mark_safe(json.dumps(obj, cls=DjangoJSONEncoder))
+ return mark_safe(json.dumps(obj, cls=SWHDjangoJSONEncoder))
@register.filter
diff --git a/swh/web/common/typing.py b/swh/web/common/typing.py
--- a/swh/web/common/typing.py
+++ b/swh/web/common/typing.py
@@ -10,6 +10,7 @@
from django.http import QueryDict
from swh.core.api.classes import PagedResult as CorePagedResult
+from swh.model.identifiers import ObjectType
QueryParameters = Union[Dict[str, Any], QueryDict]
@@ -133,7 +134,7 @@
class SWHObjectInfo(TypedDict):
- object_type: str
+ object_type: ObjectType
object_id: str
diff --git a/swh/web/misc/badges.py b/swh/web/misc/badges.py
--- a/swh/web/misc/badges.py
+++ b/swh/web/misc/badges.py
@@ -14,10 +14,10 @@
from swh.model.exceptions import ValidationError
from swh.model.hashutil import hash_to_bytes, hash_to_hex
-from swh.model.identifiers import RELEASE, CoreSWHID, ObjectType, QualifiedSWHID
+from swh.model.identifiers import CoreSWHID, ObjectType, QualifiedSWHID
from swh.web.common import archive
from swh.web.common.exc import BadInputExc, NotFoundExc
-from swh.web.common.identifiers import resolve_swhid
+from swh.web.common.identifiers import parse_object_type, resolve_swhid
from swh.web.common.utils import reverse
_orange = "#f36a24"
@@ -92,9 +92,9 @@
# from it
if object_swhid:
parsed_swhid = QualifiedSWHID.from_string(object_swhid)
- object_type = parsed_swhid.object_type.name.lower()
+ parsed_object_type = parsed_swhid.object_type
object_id = hash_to_hex(parsed_swhid.object_id)
- swh_object = archive.lookup_object(object_type, object_id)
+ swh_object = archive.lookup_object(parsed_swhid.object_type, object_id)
# remove SWHID qualified if any for badge text
right_text = str(
CoreSWHID(
@@ -102,18 +102,20 @@
object_id=parsed_swhid.object_id,
)
)
+ object_type = parsed_swhid.object_type.name.lower()
else:
+ parsed_object_type = parse_object_type(object_type)
right_text = str(
CoreSWHID(
- object_type=ObjectType[object_type.upper()],
+ object_type=parsed_object_type,
object_id=hash_to_bytes(object_id),
)
)
- swh_object = archive.lookup_object(object_type, object_id)
+ swh_object = archive.lookup_object(parsed_object_type, object_id)
whole_link = resolve_swhid(str(right_text))["browse_url"]
# use release name for badge text
- if object_type == RELEASE:
+ if parsed_object_type == ObjectType.RELEASE:
right_text = "release %s" % swh_object["name"]
left_text = "archived"
except (BadInputExc, ValidationError):
diff --git a/swh/web/tests/api/test_apiresponse.py b/swh/web/tests/api/test_apiresponse.py
--- a/swh/web/tests/api/test_apiresponse.py
+++ b/swh/web/tests/api/test_apiresponse.py
@@ -12,7 +12,7 @@
)
from hypothesis import given
-from swh.model.identifiers import CONTENT, DIRECTORY, REVISION
+from swh.model.identifiers import ObjectType
from swh.web.api.apiresponse import (
compute_link_header,
filter_by_fields,
@@ -163,9 +163,9 @@
assert ACCESS_CONTROL_ALLOW_ORIGIN in resp
swhids = [
- gen_swhid(CONTENT, content["sha1_git"]),
- gen_swhid(DIRECTORY, directory),
- gen_swhid(REVISION, revision),
+ gen_swhid(ObjectType.CONTENT, content["sha1_git"]),
+ gen_swhid(ObjectType.DIRECTORY, directory),
+ gen_swhid(ObjectType.REVISION, revision),
]
url = reverse("api-1-known")
ac_request_method = "POST"
diff --git a/swh/web/tests/api/views/test_identifiers.py b/swh/web/tests/api/views/test_identifiers.py
--- a/swh/web/tests/api/views/test_identifiers.py
+++ b/swh/web/tests/api/views/test_identifiers.py
@@ -5,7 +5,7 @@
from hypothesis import given
-from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
+from swh.model.identifiers import ObjectType
from swh.web.common.identifiers import gen_swhid
from swh.web.common.utils import reverse
from swh.web.tests.data import random_sha1
@@ -31,11 +31,11 @@
):
for obj_type, obj_id in (
- (CONTENT, content["sha1_git"]),
- (DIRECTORY, directory),
- (RELEASE, release),
- (REVISION, revision),
- (SNAPSHOT, snapshot),
+ (ObjectType.CONTENT, content["sha1_git"]),
+ (ObjectType.DIRECTORY, directory),
+ (ObjectType.RELEASE, release),
+ (ObjectType.REVISION, revision),
+ (ObjectType.SNAPSHOT, snapshot),
):
swhid = gen_swhid(obj_type, obj_id, metadata={"origin": origin["url"]})
@@ -43,9 +43,9 @@
resp = check_api_get_responses(api_client, url, status_code=200)
- if obj_type == CONTENT:
+ if obj_type == ObjectType.CONTENT:
url_args = {"query_string": "sha1_git:%s" % obj_id}
- elif obj_type == SNAPSHOT:
+ elif obj_type == ObjectType.SNAPSHOT:
url_args = {"snapshot_id": obj_id}
else:
url_args = {"sha1_git": obj_id}
@@ -93,11 +93,11 @@
):
for obj_type, obj_id in (
- (CONTENT, unknown_content["sha1_git"]),
- (DIRECTORY, unknown_directory),
- (RELEASE, unknown_release),
- (REVISION, unknown_revision),
- (SNAPSHOT, unknown_snapshot),
+ (ObjectType.CONTENT, unknown_content["sha1_git"]),
+ (ObjectType.DIRECTORY, unknown_directory),
+ (ObjectType.RELEASE, unknown_release),
+ (ObjectType.REVISION, unknown_revision),
+ (ObjectType.SNAPSHOT, unknown_snapshot),
):
swhid = gen_swhid(obj_type, obj_id)
@@ -118,11 +118,11 @@
api_client, content, directory, release, revision, snapshot
):
input_swhids = [
- gen_swhid(CONTENT, content["sha1_git"]),
- gen_swhid(DIRECTORY, directory),
- gen_swhid(REVISION, revision),
- gen_swhid(RELEASE, release),
- gen_swhid(SNAPSHOT, snapshot),
+ gen_swhid(ObjectType.CONTENT, content["sha1_git"]),
+ gen_swhid(ObjectType.DIRECTORY, directory),
+ gen_swhid(ObjectType.REVISION, revision),
+ gen_swhid(ObjectType.RELEASE, release),
+ gen_swhid(ObjectType.SNAPSHOT, snapshot),
]
url = reverse("api-1-known")
@@ -134,11 +134,11 @@
@given(content(), directory())
def test_api_known_swhid_some_present(api_client, content, directory):
- content_ = gen_swhid(CONTENT, content["sha1_git"])
- directory_ = gen_swhid(DIRECTORY, directory)
- unknown_revision_ = gen_swhid(REVISION, random_sha1())
- unknown_release_ = gen_swhid(RELEASE, random_sha1())
- unknown_snapshot_ = gen_swhid(SNAPSHOT, random_sha1())
+ content_ = gen_swhid(ObjectType.CONTENT, content["sha1_git"])
+ directory_ = gen_swhid(ObjectType.DIRECTORY, directory)
+ unknown_revision_ = gen_swhid(ObjectType.REVISION, random_sha1())
+ unknown_release_ = gen_swhid(ObjectType.RELEASE, random_sha1())
+ unknown_snapshot_ = gen_swhid(ObjectType.SNAPSHOT, random_sha1())
input_swhids = [
content_,
diff --git a/swh/web/tests/browse/views/test_content.py b/swh/web/tests/browse/views/test_content.py
--- a/swh/web/tests/browse/views/test_content.py
+++ b/swh/web/tests/browse/views/test_content.py
@@ -9,7 +9,7 @@
from django.utils.html import escape
-from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
+from swh.model.identifiers import ObjectType
from swh.web.browse.snapshot_context import process_snapshot_branches
from swh.web.browse.utils import (
_re_encode_content,
@@ -59,7 +59,7 @@
assert_contains(resp, escape(content_display["content_data"]))
assert_contains(resp, url_raw)
- swh_cnt_id = gen_swhid(CONTENT, sha1_git)
+ swh_cnt_id = gen_swhid(ObjectType.CONTENT, sha1_git)
swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id})
assert_contains(resp, swh_cnt_id)
assert_contains(resp, swh_cnt_id_url)
@@ -84,7 +84,7 @@
assert_contains(resp, escape(content_display["content_data"]))
assert_contains(resp, url_raw)
- swh_cnt_id = gen_swhid(CONTENT, sha1_git)
+ swh_cnt_id = gen_swhid(ObjectType.CONTENT, sha1_git)
swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id})
assert_contains(resp, swh_cnt_id)
@@ -103,7 +103,7 @@
content_display = _process_content_for_display(archive_data, content)
- swh_cnt_id = gen_swhid(CONTENT, sha1_git)
+ swh_cnt_id = gen_swhid(ObjectType.CONTENT, sha1_git)
swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id})
assert_contains(resp, swh_cnt_id_url)
assert_contains(resp, escape(content_display["content_data"]))
@@ -176,11 +176,13 @@
path = path.replace(root_dir_sha1 + "/", "").replace(filename, "")
swhid_context = {
- "anchor": gen_swhid(DIRECTORY, root_dir_sha1),
+ "anchor": gen_swhid(ObjectType.DIRECTORY, root_dir_sha1),
"path": f"/{path}{filename}",
}
- swh_cnt_id = gen_swhid(CONTENT, content["sha1_git"], metadata=swhid_context)
+ swh_cnt_id = gen_swhid(
+ ObjectType.CONTENT, content["sha1_git"], metadata=swhid_context
+ )
swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id})
assert_contains(resp, swh_cnt_id)
assert_contains(resp, swh_cnt_id_url)
@@ -444,40 +446,40 @@
assert_contains(resp, f"Branch: {branch_info['name']}")
cnt_swhid = gen_swhid(
- CONTENT,
+ ObjectType.CONTENT,
directory_file["checksums"]["sha1_git"],
metadata={
"origin": origin["url"],
- "visit": gen_swhid(SNAPSHOT, snapshot["id"]),
- "anchor": gen_swhid(REVISION, branch_info["revision"]),
+ "visit": gen_swhid(ObjectType.SNAPSHOT, snapshot["id"]),
+ "anchor": gen_swhid(ObjectType.REVISION, branch_info["revision"]),
"path": f"/{directory_file['name']}",
},
)
assert_contains(resp, cnt_swhid)
dir_swhid = gen_swhid(
- DIRECTORY,
+ ObjectType.DIRECTORY,
directory,
metadata={
"origin": origin["url"],
- "visit": gen_swhid(SNAPSHOT, snapshot["id"]),
- "anchor": gen_swhid(REVISION, branch_info["revision"]),
+ "visit": gen_swhid(ObjectType.SNAPSHOT, snapshot["id"]),
+ "anchor": gen_swhid(ObjectType.REVISION, branch_info["revision"]),
},
)
assert_contains(resp, dir_swhid)
rev_swhid = gen_swhid(
- REVISION,
+ ObjectType.REVISION,
branch_info["revision"],
metadata={
"origin": origin["url"],
- "visit": gen_swhid(SNAPSHOT, snapshot["id"]),
+ "visit": gen_swhid(ObjectType.SNAPSHOT, snapshot["id"]),
},
)
assert_contains(resp, rev_swhid)
snp_swhid = gen_swhid(
- SNAPSHOT, snapshot["id"], metadata={"origin": origin["url"],},
+ ObjectType.SNAPSHOT, snapshot["id"], metadata={"origin": origin["url"],},
)
assert_contains(resp, snp_swhid)
@@ -518,50 +520,50 @@
assert_contains(resp, f"Release: {release_info['name']}")
cnt_swhid = gen_swhid(
- CONTENT,
+ ObjectType.CONTENT,
directory_file["checksums"]["sha1_git"],
metadata={
"origin": origin["url"],
- "visit": gen_swhid(SNAPSHOT, snapshot["id"]),
- "anchor": gen_swhid(RELEASE, release_info["id"]),
+ "visit": gen_swhid(ObjectType.SNAPSHOT, snapshot["id"]),
+ "anchor": gen_swhid(ObjectType.RELEASE, release_info["id"]),
"path": f"/{directory_file['name']}",
},
)
assert_contains(resp, cnt_swhid)
dir_swhid = gen_swhid(
- DIRECTORY,
+ ObjectType.DIRECTORY,
release_info["directory"],
metadata={
"origin": origin["url"],
- "visit": gen_swhid(SNAPSHOT, snapshot["id"]),
- "anchor": gen_swhid(RELEASE, release_info["id"]),
+ "visit": gen_swhid(ObjectType.SNAPSHOT, snapshot["id"]),
+ "anchor": gen_swhid(ObjectType.RELEASE, release_info["id"]),
},
)
assert_contains(resp, dir_swhid)
rev_swhid = gen_swhid(
- REVISION,
+ ObjectType.REVISION,
release_info["target"],
metadata={
"origin": origin["url"],
- "visit": gen_swhid(SNAPSHOT, snapshot["id"]),
+ "visit": gen_swhid(ObjectType.SNAPSHOT, snapshot["id"]),
},
)
assert_contains(resp, rev_swhid)
rel_swhid = gen_swhid(
- RELEASE,
+ ObjectType.RELEASE,
release_info["id"],
metadata={
"origin": origin["url"],
- "visit": gen_swhid(SNAPSHOT, snapshot["id"]),
+ "visit": gen_swhid(ObjectType.SNAPSHOT, snapshot["id"]),
},
)
assert_contains(resp, rel_swhid)
snp_swhid = gen_swhid(
- SNAPSHOT, snapshot["id"], metadata={"origin": origin["url"],},
+ ObjectType.SNAPSHOT, snapshot["id"], metadata={"origin": origin["url"],},
)
assert_contains(resp, snp_swhid)
diff --git a/swh/web/tests/browse/views/test_directory.py b/swh/web/tests/browse/views/test_directory.py
--- a/swh/web/tests/browse/views/test_directory.py
+++ b/swh/web/tests/browse/views/test_directory.py
@@ -11,7 +11,7 @@
from swh.model.from_disk import DentryPerms
from swh.model.hashutil import hash_to_bytes, hash_to_hex
-from swh.model.identifiers import DIRECTORY, RELEASE, REVISION, SNAPSHOT
+from swh.model.identifiers import ObjectType
from swh.model.model import (
Directory,
DirectoryEntry,
@@ -236,29 +236,29 @@
assert_contains(resp, f"Branch: {branch_info['name']}")
dir_swhid = gen_swhid(
- DIRECTORY,
+ ObjectType.DIRECTORY,
directory_subdir["target"],
metadata={
"origin": origin["url"],
- "visit": gen_swhid(SNAPSHOT, snapshot["id"]),
- "anchor": gen_swhid(REVISION, branch_info["revision"]),
+ "visit": gen_swhid(ObjectType.SNAPSHOT, snapshot["id"]),
+ "anchor": gen_swhid(ObjectType.REVISION, branch_info["revision"]),
"path": "/",
},
)
assert_contains(resp, dir_swhid)
rev_swhid = gen_swhid(
- REVISION,
+ ObjectType.REVISION,
branch_info["revision"],
metadata={
"origin": origin["url"],
- "visit": gen_swhid(SNAPSHOT, snapshot["id"]),
+ "visit": gen_swhid(ObjectType.SNAPSHOT, snapshot["id"]),
},
)
assert_contains(resp, rev_swhid)
snp_swhid = gen_swhid(
- SNAPSHOT, snapshot["id"], metadata={"origin": origin["url"],},
+ ObjectType.SNAPSHOT, snapshot["id"], metadata={"origin": origin["url"],},
)
assert_contains(resp, snp_swhid)
@@ -300,39 +300,39 @@
assert_contains(resp, f"Release: {release_info['name']}")
dir_swhid = gen_swhid(
- DIRECTORY,
+ ObjectType.DIRECTORY,
directory_subdir["target"],
metadata={
"origin": origin["url"],
- "visit": gen_swhid(SNAPSHOT, snapshot["id"]),
- "anchor": gen_swhid(RELEASE, release_info["id"]),
+ "visit": gen_swhid(ObjectType.SNAPSHOT, snapshot["id"]),
+ "anchor": gen_swhid(ObjectType.RELEASE, release_info["id"]),
"path": "/",
},
)
assert_contains(resp, dir_swhid)
rev_swhid = gen_swhid(
- REVISION,
+ ObjectType.REVISION,
release_info["target"],
metadata={
"origin": origin["url"],
- "visit": gen_swhid(SNAPSHOT, snapshot["id"]),
+ "visit": gen_swhid(ObjectType.SNAPSHOT, snapshot["id"]),
},
)
assert_contains(resp, rev_swhid)
rel_swhid = gen_swhid(
- RELEASE,
+ ObjectType.RELEASE,
release_info["id"],
metadata={
"origin": origin["url"],
- "visit": gen_swhid(SNAPSHOT, snapshot["id"]),
+ "visit": gen_swhid(ObjectType.SNAPSHOT, snapshot["id"]),
},
)
assert_contains(resp, rel_swhid)
snp_swhid = gen_swhid(
- SNAPSHOT, snapshot["id"], metadata={"origin": origin["url"],},
+ ObjectType.SNAPSHOT, snapshot["id"], metadata={"origin": origin["url"],},
)
assert_contains(resp, snp_swhid)
@@ -444,24 +444,24 @@
assert_contains(resp, "vault-cook-directory")
- swh_dir_id = gen_swhid(DIRECTORY, directory_entries[0]["dir_id"])
+ swh_dir_id = gen_swhid(ObjectType.DIRECTORY, directory_entries[0]["dir_id"])
swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id})
swhid_context = {}
if origin_url:
swhid_context["origin"] = origin_url
if snapshot_id:
- swhid_context["visit"] = gen_swhid(SNAPSHOT, snapshot_id)
+ swhid_context["visit"] = gen_swhid(ObjectType.SNAPSHOT, snapshot_id)
if root_directory_sha1 != directory_entries[0]["dir_id"]:
- swhid_context["anchor"] = gen_swhid(DIRECTORY, root_directory_sha1)
+ swhid_context["anchor"] = gen_swhid(ObjectType.DIRECTORY, root_directory_sha1)
if root_directory_sha1 != directory_entries[0]["dir_id"]:
- swhid_context["anchor"] = gen_swhid(DIRECTORY, root_directory_sha1)
+ swhid_context["anchor"] = gen_swhid(ObjectType.DIRECTORY, root_directory_sha1)
if revision_id:
- swhid_context["anchor"] = gen_swhid(REVISION, revision_id)
+ swhid_context["anchor"] = gen_swhid(ObjectType.REVISION, revision_id)
swhid_context["path"] = f"/{path}/" if path else None
swh_dir_id = gen_swhid(
- DIRECTORY, directory_entries[0]["dir_id"], metadata=swhid_context
+ ObjectType.DIRECTORY, directory_entries[0]["dir_id"], metadata=swhid_context
)
swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id})
assert_contains(resp, swh_dir_id)
diff --git a/swh/web/tests/browse/views/test_identifiers.py b/swh/web/tests/browse/views/test_identifiers.py
--- a/swh/web/tests/browse/views/test_identifiers.py
+++ b/swh/web/tests/browse/views/test_identifiers.py
@@ -8,7 +8,7 @@
from hypothesis import given
-from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
+from swh.model.identifiers import ObjectType
from swh.model.model import Origin
from swh.web.common.identifiers import gen_swhid
from swh.web.common.utils import reverse
@@ -27,7 +27,7 @@
@given(content())
def test_content_id_browse(client, content):
cnt_sha1_git = content["sha1_git"]
- swhid = gen_swhid(CONTENT, cnt_sha1_git)
+ swhid = gen_swhid(ObjectType.CONTENT, cnt_sha1_git)
url = reverse("browse-swhid", url_args={"swhid": swhid})
query_string = "sha1_git:" + cnt_sha1_git
@@ -41,7 +41,7 @@
@given(directory())
def test_directory_id_browse(client, directory):
- swhid = gen_swhid(DIRECTORY, directory)
+ swhid = gen_swhid(ObjectType.DIRECTORY, directory)
url = reverse("browse-swhid", url_args={"swhid": swhid})
directory_browse_url = reverse("browse-directory", url_args={"sha1_git": directory})
@@ -52,7 +52,7 @@
@given(revision())
def test_revision_id_browse(client, revision):
- swhid = gen_swhid(REVISION, revision)
+ swhid = gen_swhid(ObjectType.REVISION, revision)
url = reverse("browse-swhid", url_args={"swhid": swhid})
revision_browse_url = reverse("browse-revision", url_args={"sha1_git": revision})
@@ -73,7 +73,7 @@
@given(release())
def test_release_id_browse(client, release):
- swhid = gen_swhid(RELEASE, release)
+ swhid = gen_swhid(ObjectType.RELEASE, release)
url = reverse("browse-swhid", url_args={"swhid": swhid})
release_browse_url = reverse("browse-release", url_args={"sha1_git": release})
@@ -95,7 +95,7 @@
@given(snapshot())
def test_snapshot_id_browse(client, snapshot):
- swhid = gen_swhid(SNAPSHOT, snapshot)
+ swhid = gen_swhid(ObjectType.SNAPSHOT, snapshot)
url = reverse("browse-swhid", url_args={"swhid": swhid})
snapshot_browse_url = reverse("browse-snapshot", url_args={"snapshot_id": snapshot})
@@ -131,7 +131,9 @@
archive_data.origin_add([Origin(url=origin_url)])
swhid = gen_swhid(
- CONTENT, cnt_sha1_git, metadata={"lines": "4-20", "origin": origin_url},
+ ObjectType.CONTENT,
+ cnt_sha1_git,
+ metadata={"lines": "4-20", "origin": origin_url},
)
url = reverse("browse-swhid", url_args={"swhid": swhid})
@@ -165,7 +167,7 @@
[e for e in directory_content if e["type"] == "file"]
)
legacy_swhid = gen_swhid(
- CONTENT,
+ ObjectType.CONTENT,
directory_file["checksums"]["sha1_git"],
metadata={"origin": origin["url"]},
)
@@ -178,12 +180,12 @@
)
swhid = gen_swhid(
- CONTENT,
+ ObjectType.CONTENT,
directory_file["checksums"]["sha1_git"],
metadata={
"origin": origin["url"],
- "visit": gen_swhid(SNAPSHOT, snapshot["id"]),
- "anchor": gen_swhid(REVISION, revision),
+ "visit": gen_swhid(ObjectType.SNAPSHOT, snapshot["id"]),
+ "anchor": gen_swhid(ObjectType.REVISION, revision),
},
)
@@ -196,7 +198,9 @@
archive_data.origin_add([Origin(url=origin)])
origin_swhid_escaped = quote(origin, safe="/?:@&")
origin_swhid_url_escaped = quote(origin, safe="/:@;")
- swhid = gen_swhid(DIRECTORY, directory, metadata={"origin": origin_swhid_escaped})
+ swhid = gen_swhid(
+ ObjectType.DIRECTORY, directory, metadata={"origin": origin_swhid_escaped}
+ )
url = reverse("browse-swhid", url_args={"swhid": swhid})
resp = check_html_get_response(client, url, status_code=302)
diff --git a/swh/web/tests/browse/views/test_origin.py b/swh/web/tests/browse/views/test_origin.py
--- a/swh/web/tests/browse/views/test_origin.py
+++ b/swh/web/tests/browse/views/test_origin.py
@@ -12,7 +12,7 @@
from django.utils.html import escape
from swh.model.hashutil import hash_to_bytes
-from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
+from swh.model.identifiers import ObjectType
from swh.model.model import (
OriginVisit,
OriginVisitStatus,
@@ -755,12 +755,12 @@
swhid_context = {
"origin": origin["url"],
- "visit": gen_swhid(SNAPSHOT, snapshot["id"]),
- "anchor": gen_swhid(RELEASE, release_data["id"]),
+ "visit": gen_swhid(ObjectType.SNAPSHOT, snapshot["id"]),
+ "anchor": gen_swhid(ObjectType.RELEASE, release_data["id"]),
}
swh_dir_id = gen_swhid(
- DIRECTORY, revision_data["directory"], metadata=swhid_context
+ ObjectType.DIRECTORY, revision_data["directory"], metadata=swhid_context
)
swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id})
assert_contains(resp, swh_dir_id)
@@ -977,12 +977,14 @@
swhid_context = {
"origin": origin_info["url"],
- "visit": gen_swhid(SNAPSHOT, snapshot["id"]),
- "anchor": gen_swhid(REVISION, head_rev_id),
+ "visit": gen_swhid(ObjectType.SNAPSHOT, snapshot["id"]),
+ "anchor": gen_swhid(ObjectType.REVISION, head_rev_id),
"path": f"/{content_path}",
}
- swh_cnt_id = gen_swhid(CONTENT, content["sha1_git"], metadata=swhid_context)
+ swh_cnt_id = gen_swhid(
+ ObjectType.CONTENT, content["sha1_git"], metadata=swhid_context
+ )
swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id})
assert_contains(resp, swh_cnt_id)
assert_contains(resp, swh_cnt_id_url)
@@ -1117,13 +1119,13 @@
swhid_context = {
"origin": origin_info["url"],
- "visit": gen_swhid(SNAPSHOT, snapshot["id"]),
- "anchor": gen_swhid(REVISION, head_rev_id),
+ "visit": gen_swhid(ObjectType.SNAPSHOT, snapshot["id"]),
+ "anchor": gen_swhid(ObjectType.REVISION, head_rev_id),
"path": f"/{path}" if path else None,
}
swh_dir_id = gen_swhid(
- DIRECTORY, directory_entries[0]["dir_id"], metadata=swhid_context
+ ObjectType.DIRECTORY, directory_entries[0]["dir_id"], metadata=swhid_context
)
swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id})
assert_contains(resp, swh_dir_id)
diff --git a/swh/web/tests/browse/views/test_release.py b/swh/web/tests/browse/views/test_release.py
--- a/swh/web/tests/browse/views/test_release.py
+++ b/swh/web/tests/browse/views/test_release.py
@@ -9,7 +9,7 @@
from django.utils.html import escape
-from swh.model.identifiers import DIRECTORY, RELEASE, REVISION, SNAPSHOT
+from swh.model.identifiers import ObjectType
from swh.web.common.identifiers import gen_swhid
from swh.web.common.utils import format_utc_iso_date, reverse
from swh.web.tests.django_asserts import assert_contains
@@ -107,7 +107,7 @@
assert_contains(resp, target_type)
assert_contains(resp, '%s' % (escape(target_url), target))
- swh_rel_id = gen_swhid(RELEASE, release_id)
+ swh_rel_id = gen_swhid(ObjectType.RELEASE, release_id)
swh_rel_id_url = reverse("browse-swhid", url_args={"swhid": swh_rel_id})
assert_contains(resp, swh_rel_id)
assert_contains(resp, swh_rel_id_url)
@@ -118,7 +118,7 @@
)
assert_contains(resp, f'href="{browse_origin_url}"')
elif snapshot_id:
- swh_snp_id = gen_swhid(SNAPSHOT, snapshot_id)
+ swh_snp_id = gen_swhid(ObjectType.SNAPSHOT, snapshot_id)
swh_snp_id_url = reverse("browse-swhid", url_args={"swhid": swh_snp_id})
assert_contains(resp, f'href="{swh_snp_id_url}"')
@@ -139,9 +139,9 @@
rev_metadata["origin"] = dir_metadata["origin"] = origin_url
snapshot = archive_data.snapshot_get_latest(origin_url)
rev_metadata["visit"] = dir_metadata["visit"] = gen_swhid(
- SNAPSHOT, snapshot["id"]
+ ObjectType.SNAPSHOT, snapshot["id"]
)
- dir_metadata["anchor"] = gen_swhid(RELEASE, release_id)
+ dir_metadata["anchor"] = gen_swhid(ObjectType.RELEASE, release_id)
elif snapshot_id:
directory_url = reverse(
@@ -150,17 +150,17 @@
query_params={"release": release_data["name"],},
)
rev_metadata["visit"] = dir_metadata["visit"] = gen_swhid(
- SNAPSHOT, snapshot_id
+ ObjectType.SNAPSHOT, snapshot_id
)
- dir_metadata["anchor"] = gen_swhid(RELEASE, release_id)
+ dir_metadata["anchor"] = gen_swhid(ObjectType.RELEASE, release_id)
else:
directory_url = reverse("browse-directory", url_args={"sha1_git": rev_dir})
assert_contains(resp, escape(directory_url))
- swh_rev_id = gen_swhid(REVISION, rev["id"], metadata=rev_metadata)
+ swh_rev_id = gen_swhid(ObjectType.REVISION, rev["id"], metadata=rev_metadata)
swh_rev_id_url = reverse("browse-swhid", url_args={"swhid": swh_rev_id})
assert_contains(resp, swh_rev_id_url)
- swh_dir_id = gen_swhid(DIRECTORY, rev_dir, metadata=dir_metadata)
+ swh_dir_id = gen_swhid(ObjectType.DIRECTORY, rev_dir, metadata=dir_metadata)
swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id})
assert_contains(resp, swh_dir_id_url)
diff --git a/swh/web/tests/browse/views/test_revision.py b/swh/web/tests/browse/views/test_revision.py
--- a/swh/web/tests/browse/views/test_revision.py
+++ b/swh/web/tests/browse/views/test_revision.py
@@ -11,7 +11,7 @@
from django.utils.html import escape
from swh.model.hashutil import hash_to_bytes, hash_to_hex
-from swh.model.identifiers import DIRECTORY, REVISION, SNAPSHOT
+from swh.model.identifiers import ObjectType
from swh.model.model import Revision, RevisionType, TimestampWithTimezone
from swh.web.common.identifiers import gen_swhid
from swh.web.common.utils import format_utc_iso_date, parse_iso8601_date_to_utc, reverse
@@ -272,7 +272,7 @@
if origin_url:
assert_contains(resp, "swh-take-new-snapshot")
- swh_rev_id = gen_swhid(REVISION, revision)
+ swh_rev_id = gen_swhid(ObjectType.REVISION, revision)
swh_rev_id_url = reverse("browse-swhid", url_args={"swhid": swh_rev_id})
if origin_url:
@@ -289,16 +289,16 @@
if origin_url:
swhid_context["origin"] = origin_url
if snapshot:
- swhid_context["visit"] = gen_swhid(SNAPSHOT, snapshot["id"])
+ swhid_context["visit"] = gen_swhid(ObjectType.SNAPSHOT, snapshot["id"])
- swh_rev_id = gen_swhid(REVISION, revision, metadata=swhid_context)
+ swh_rev_id = gen_swhid(ObjectType.REVISION, revision, metadata=swhid_context)
swh_rev_id_url = reverse("browse-swhid", url_args={"swhid": swh_rev_id})
assert_contains(resp, swh_rev_id)
assert_contains(resp, swh_rev_id_url)
- swhid_context["anchor"] = gen_swhid(REVISION, revision)
+ swhid_context["anchor"] = gen_swhid(ObjectType.REVISION, revision)
- swh_dir_id = gen_swhid(DIRECTORY, dir_id, metadata=swhid_context)
+ swh_dir_id = gen_swhid(ObjectType.DIRECTORY, dir_id, metadata=swhid_context)
swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id})
assert_contains(resp, swh_dir_id)
assert_contains(resp, swh_dir_id_url)
diff --git a/swh/web/tests/common/test_archive.py b/swh/web/tests/common/test_archive.py
--- a/swh/web/tests/common/test_archive.py
+++ b/swh/web/tests/common/test_archive.py
@@ -13,7 +13,7 @@
from swh.model.from_disk import DentryPerms
from swh.model.hashutil import hash_to_bytes, hash_to_hex
-from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
+from swh.model.identifiers import ObjectType
from swh.model.model import Directory, DirectoryEntry, Origin, OriginVisit, Revision
from swh.web.common import archive
from swh.web.common.exc import BadInputExc, NotFoundExc
@@ -839,19 +839,19 @@
archive_data, content, directory, release, revision, snapshot
):
expected = archive_data.content_find(content)
- assert archive.lookup_object(CONTENT, content["sha1_git"]) == expected
+ assert archive.lookup_object(ObjectType.CONTENT, content["sha1_git"]) == expected
expected = archive_data.directory_get(directory)
- assert archive.lookup_object(DIRECTORY, directory) == expected
+ assert archive.lookup_object(ObjectType.DIRECTORY, directory) == expected
expected = archive_data.release_get(release)
- assert archive.lookup_object(RELEASE, release) == expected
+ assert archive.lookup_object(ObjectType.RELEASE, release) == expected
expected = archive_data.revision_get(revision)
- assert archive.lookup_object(REVISION, revision) == expected
+ assert archive.lookup_object(ObjectType.REVISION, revision) == expected
expected = {**archive_data.snapshot_get(snapshot), "next_branch": None}
- assert archive.lookup_object(SNAPSHOT, snapshot) == expected
+ assert archive.lookup_object(ObjectType.SNAPSHOT, snapshot) == expected
@given(
@@ -869,23 +869,23 @@
unknown_snapshot,
):
with pytest.raises(NotFoundExc) as e:
- archive.lookup_object(CONTENT, unknown_content["sha1_git"])
+ archive.lookup_object(ObjectType.CONTENT, unknown_content["sha1_git"])
assert e.match(r"Content.*not found")
with pytest.raises(NotFoundExc) as e:
- archive.lookup_object(DIRECTORY, unknown_directory)
+ archive.lookup_object(ObjectType.DIRECTORY, unknown_directory)
assert e.match(r"Directory.*not found")
with pytest.raises(NotFoundExc) as e:
- archive.lookup_object(RELEASE, unknown_release)
+ archive.lookup_object(ObjectType.RELEASE, unknown_release)
assert e.match(r"Release.*not found")
with pytest.raises(NotFoundExc) as e:
- archive.lookup_object(REVISION, unknown_revision)
+ archive.lookup_object(ObjectType.REVISION, unknown_revision)
assert e.match(r"Revision.*not found")
with pytest.raises(NotFoundExc) as e:
- archive.lookup_object(SNAPSHOT, unknown_snapshot)
+ archive.lookup_object(ObjectType.SNAPSHOT, unknown_snapshot)
assert e.match(r"Snapshot.*not found")
@@ -893,27 +893,23 @@
def test_lookup_invalid_objects(invalid_sha1):
with pytest.raises(BadInputExc) as e:
- archive.lookup_object("foo", invalid_sha1)
- assert e.match("Invalid swh object type")
-
- with pytest.raises(BadInputExc) as e:
- archive.lookup_object(CONTENT, invalid_sha1)
+ archive.lookup_object(ObjectType.CONTENT, invalid_sha1)
assert e.match("Invalid hash")
with pytest.raises(BadInputExc) as e:
- archive.lookup_object(DIRECTORY, invalid_sha1)
+ archive.lookup_object(ObjectType.DIRECTORY, invalid_sha1)
assert e.match("Invalid checksum")
with pytest.raises(BadInputExc) as e:
- archive.lookup_object(RELEASE, invalid_sha1)
+ archive.lookup_object(ObjectType.RELEASE, invalid_sha1)
assert e.match("Invalid checksum")
with pytest.raises(BadInputExc) as e:
- archive.lookup_object(REVISION, invalid_sha1)
+ archive.lookup_object(ObjectType.REVISION, invalid_sha1)
assert e.match("Invalid checksum")
with pytest.raises(BadInputExc) as e:
- archive.lookup_object(SNAPSHOT, invalid_sha1)
+ archive.lookup_object(ObjectType.SNAPSHOT, invalid_sha1)
assert e.match("Invalid checksum")
@@ -925,11 +921,11 @@
missing_snp = random_sha1()
grouped_swhids = {
- CONTENT: [hash_to_bytes(missing_cnt)],
- DIRECTORY: [hash_to_bytes(missing_dir)],
- REVISION: [hash_to_bytes(missing_rev)],
- RELEASE: [hash_to_bytes(missing_rel)],
- SNAPSHOT: [hash_to_bytes(missing_snp)],
+ ObjectType.CONTENT: [hash_to_bytes(missing_cnt)],
+ ObjectType.DIRECTORY: [hash_to_bytes(missing_dir)],
+ ObjectType.REVISION: [hash_to_bytes(missing_rev)],
+ ObjectType.RELEASE: [hash_to_bytes(missing_rel)],
+ ObjectType.SNAPSHOT: [hash_to_bytes(missing_snp)],
}
actual_result = archive.lookup_missing_hashes(grouped_swhids)
@@ -950,11 +946,11 @@
missing_snp = random_sha1()
grouped_swhids = {
- CONTENT: [hash_to_bytes(content["sha1_git"])],
- DIRECTORY: [hash_to_bytes(directory)],
- REVISION: [hash_to_bytes(missing_rev)],
- RELEASE: [hash_to_bytes(missing_rel)],
- SNAPSHOT: [hash_to_bytes(missing_snp)],
+ ObjectType.CONTENT: [hash_to_bytes(content["sha1_git"])],
+ ObjectType.DIRECTORY: [hash_to_bytes(directory)],
+ ObjectType.REVISION: [hash_to_bytes(missing_rev)],
+ ObjectType.RELEASE: [hash_to_bytes(missing_rel)],
+ ObjectType.SNAPSHOT: [hash_to_bytes(missing_snp)],
}
actual_result = archive.lookup_missing_hashes(grouped_swhids)
diff --git a/swh/web/tests/common/test_identifiers.py b/swh/web/tests/common/test_identifiers.py
--- a/swh/web/tests/common/test_identifiers.py
+++ b/swh/web/tests/common/test_identifiers.py
@@ -10,14 +10,7 @@
import pytest
from swh.model.hashutil import hash_to_bytes
-from swh.model.identifiers import (
- CONTENT,
- DIRECTORY,
- RELEASE,
- REVISION,
- SNAPSHOT,
- QualifiedSWHID,
-)
+from swh.model.identifiers import ObjectType, QualifiedSWHID
from swh.model.model import Origin
from swh.web.browse.snapshot_context import get_snapshot_context
from swh.web.common.exc import BadInputExc
@@ -26,6 +19,7 @@
get_swhid,
get_swhids_info,
group_swhids,
+ parse_object_type,
resolve_swhid,
)
from swh.web.common.typing import SWHObjectInfo
@@ -45,7 +39,7 @@
@given(content())
def test_gen_swhid(content):
- swh_object_type = CONTENT
+ swh_object_type = ObjectType.CONTENT
sha1_git = content["sha1_git"]
expected_swhid = "swh:1:cnt:" + sha1_git
@@ -63,36 +57,46 @@
)
with pytest.raises(BadInputExc) as e:
- gen_swhid("foo", sha1_git)
+ gen_swhid(swh_object_type, "not a valid id")
assert e.match("Invalid object")
+
+def test_parse_object_type():
+ assert parse_object_type("content") == ObjectType.CONTENT
+ assert parse_object_type("directory") == ObjectType.DIRECTORY
+ assert parse_object_type("revision") == ObjectType.REVISION
+ assert parse_object_type("release") == ObjectType.RELEASE
+ assert parse_object_type("snapshot") == ObjectType.SNAPSHOT
+
with pytest.raises(BadInputExc) as e:
- gen_swhid(swh_object_type, "not a valid id")
+ parse_object_type("foo")
assert e.match("Invalid object")
@given(content(), directory(), release(), revision(), snapshot())
def test_resolve_swhid_legacy(content, directory, release, revision, snapshot):
for obj_type, obj_id in (
- (CONTENT, content["sha1_git"]),
- (DIRECTORY, directory),
- (RELEASE, release),
- (REVISION, revision),
- (SNAPSHOT, snapshot),
+ (ObjectType.CONTENT, content["sha1_git"]),
+ (ObjectType.DIRECTORY, directory),
+ (ObjectType.RELEASE, release),
+ (ObjectType.REVISION, revision),
+ (ObjectType.SNAPSHOT, snapshot),
):
swhid = gen_swhid(obj_type, obj_id)
url_args = {}
- if obj_type == CONTENT:
+ if obj_type == ObjectType.CONTENT:
url_args["query_string"] = f"sha1_git:{obj_id}"
- elif obj_type == SNAPSHOT:
+ elif obj_type == ObjectType.SNAPSHOT:
url_args["snapshot_id"] = obj_id
else:
url_args["sha1_git"] = obj_id
query_params = {"origin_url": "some-origin"}
browse_url = reverse(
- f"browse-{obj_type}", url_args=url_args, query_params=query_params
+ f"browse-{obj_type.name.lower()}",
+ url_args=url_args,
+ query_params=query_params,
)
resolved_swhid = resolve_swhid(swhid, query_params)
@@ -108,11 +112,11 @@
@given(content(), directory(), release(), revision(), snapshot())
def test_get_swhid(content, directory, release, revision, snapshot):
for obj_type, obj_id in (
- (CONTENT, content["sha1_git"]),
- (DIRECTORY, directory),
- (RELEASE, release),
- (REVISION, revision),
- (SNAPSHOT, snapshot),
+ (ObjectType.CONTENT, content["sha1_git"]),
+ (ObjectType.DIRECTORY, directory),
+ (ObjectType.RELEASE, release),
+ (ObjectType.REVISION, revision),
+ (ObjectType.SNAPSHOT, snapshot),
):
swhid = gen_swhid(obj_type, obj_id)
swh_parsed_swhid = get_swhid(swhid)
@@ -129,11 +133,11 @@
swhids = []
expected = {}
for obj_type, obj_id in (
- (CONTENT, content["sha1_git"]),
- (DIRECTORY, directory),
- (RELEASE, release),
- (REVISION, revision),
- (SNAPSHOT, snapshot),
+ (ObjectType.CONTENT, content["sha1_git"]),
+ (ObjectType.DIRECTORY, directory),
+ (ObjectType.RELEASE, release),
+ (ObjectType.REVISION, revision),
+ (ObjectType.SNAPSHOT, snapshot),
):
swhid = gen_swhid(obj_type, obj_id)
swhid = get_swhid(swhid)
@@ -148,14 +152,14 @@
@given(directory_with_subdirs())
def test_get_swhids_info_directory_context(archive_data, directory):
swhid = get_swhids_info(
- [SWHObjectInfo(object_type=DIRECTORY, object_id=directory)],
+ [SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=directory)],
snapshot_context=None,
)[0]
assert swhid["swhid_with_context"] is None
# path qualifier should be discarded for a root directory
swhid = get_swhids_info(
- [SWHObjectInfo(object_type=DIRECTORY, object_id=directory)],
+ [SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=directory)],
snapshot_context=None,
extra_context={"path": "/"},
)[0]
@@ -170,7 +174,7 @@
dir_subdir_files = [e for e in dir_subdir_content if e["type"] == "file"]
swh_objects_info = [
- SWHObjectInfo(object_type=DIRECTORY, object_id=dir_subdir["target"])
+ SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=dir_subdir["target"])
]
extra_context = {
@@ -183,7 +187,8 @@
extra_context["filename"] = dir_subdir_file["name"]
swh_objects_info.append(
SWHObjectInfo(
- object_type=CONTENT, object_id=dir_subdir_file["checksums"]["sha1_git"]
+ object_type=ObjectType.CONTENT,
+ object_id=dir_subdir_file["checksums"]["sha1_git"],
)
)
@@ -193,7 +198,7 @@
swhid_dir_parsed = get_swhid(swhids[0]["swhid_with_context"])
- anchor = gen_swhid(DIRECTORY, directory)
+ anchor = gen_swhid(ObjectType.DIRECTORY, directory)
assert swhid_dir_parsed.qualifiers() == {
"anchor": anchor,
@@ -217,15 +222,16 @@
dir_entry = random.choice(dir_content)
swh_objects = [
- SWHObjectInfo(object_type=REVISION, object_id=revision),
- SWHObjectInfo(object_type=DIRECTORY, object_id=directory),
+ SWHObjectInfo(object_type=ObjectType.REVISION, object_id=revision),
+ SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=directory),
]
extra_context = {"revision": revision, "path": "/"}
if dir_entry["type"] == "file":
swh_objects.append(
SWHObjectInfo(
- object_type=CONTENT, object_id=dir_entry["checksums"]["sha1_git"]
+ object_type=ObjectType.CONTENT,
+ object_id=dir_entry["checksums"]["sha1_git"],
)
)
extra_context["filename"] = dir_entry["name"]
@@ -237,7 +243,7 @@
assert swhids[0]["context"] == {}
swhid_dir_parsed = get_swhid(swhids[1]["swhid_with_context"])
- anchor = gen_swhid(REVISION, revision)
+ anchor = gen_swhid(ObjectType.REVISION, revision)
assert swhid_dir_parsed.qualifiers() == {
"anchor": anchor,
@@ -290,23 +296,26 @@
for snp_ctx_params, anchor_info in (
(
{"snapshot_id": snapshot_id},
- {"anchor_type": REVISION, "anchor_id": head_rev_id},
+ {"anchor_type": ObjectType.REVISION, "anchor_id": head_rev_id},
),
(
{"snapshot_id": snapshot_id, "branch_name": branch_name},
- {"anchor_type": REVISION, "anchor_id": branches[branch_name]},
+ {
+ "anchor_type": ObjectType.REVISION,
+ "anchor_id": branches[branch_name],
+ },
),
(
{"snapshot_id": snapshot_id, "release_name": release_name},
- {"anchor_type": RELEASE, "anchor_id": releases[release]},
+ {"anchor_type": ObjectType.RELEASE, "anchor_id": releases[release]},
),
(
{"snapshot_id": snapshot_id, "revision_id": revision_id},
- {"anchor_type": REVISION, "anchor_id": revision_id},
+ {"anchor_type": ObjectType.REVISION, "anchor_id": revision_id},
),
(
{"origin_url": origin["url"], "snapshot_id": snapshot_id},
- {"anchor_type": REVISION, "anchor_id": head_rev_id},
+ {"anchor_type": ObjectType.REVISION, "anchor_id": head_rev_id},
),
(
{
@@ -314,7 +323,10 @@
"snapshot_id": snapshot_id,
"branch_name": branch_name,
},
- {"anchor_type": REVISION, "anchor_id": branches[branch_name]},
+ {
+ "anchor_type": ObjectType.REVISION,
+ "anchor_id": branches[branch_name],
+ },
),
(
{
@@ -322,7 +334,7 @@
"snapshot_id": snapshot_id,
"release_name": release_name,
},
- {"anchor_type": RELEASE, "anchor_id": releases[release]},
+ {"anchor_type": ObjectType.RELEASE, "anchor_id": releases[release]},
),
(
{
@@ -330,7 +342,7 @@
"snapshot_id": snapshot_id,
"revision_id": revision_id,
},
- {"anchor_type": REVISION, "anchor_id": revision_id},
+ {"anchor_type": ObjectType.REVISION, "anchor_id": revision_id},
),
):
@@ -346,16 +358,19 @@
swh_objects = [
SWHObjectInfo(
- object_type=CONTENT, object_id=dir_file["checksums"]["sha1_git"]
+ object_type=ObjectType.CONTENT,
+ object_id=dir_file["checksums"]["sha1_git"],
),
- SWHObjectInfo(object_type=DIRECTORY, object_id=root_dir),
- SWHObjectInfo(object_type=REVISION, object_id=rev_id),
- SWHObjectInfo(object_type=SNAPSHOT, object_id=snapshot_id),
+ SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=root_dir),
+ SWHObjectInfo(object_type=ObjectType.REVISION, object_id=rev_id),
+ SWHObjectInfo(object_type=ObjectType.SNAPSHOT, object_id=snapshot_id),
]
if "release_name" in snp_ctx_params:
swh_objects.append(
- SWHObjectInfo(object_type=RELEASE, object_id=release_data["id"])
+ SWHObjectInfo(
+ object_type=ObjectType.RELEASE, object_id=release_data["id"]
+ )
)
swhids = get_swhids_info(
@@ -381,7 +396,9 @@
object_id=anchor_info["anchor_id"],
)
- snapshot_swhid = gen_swhid(object_type=SNAPSHOT, object_id=snapshot_id)
+ snapshot_swhid = gen_swhid(
+ object_type=ObjectType.SNAPSHOT, object_id=snapshot_id
+ )
expected_cnt_context = {
"visit": snapshot_swhid,
@@ -420,7 +437,7 @@
path = "/foo;/bar%"
swhid_info = get_swhids_info(
- [SWHObjectInfo(object_type=DIRECTORY, object_id=directory)],
+ [SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=directory)],
snapshot_context=snapshot_context,
extra_context={"path": path},
)[0]
@@ -485,7 +502,9 @@
snapshot["id"], origin["url"], **snp_ctx_params
)
- _check_resolved_swhid_browse_url(SNAPSHOT, snapshot["id"], snapshot_context)
+ _check_resolved_swhid_browse_url(
+ ObjectType.SNAPSHOT, snapshot["id"], snapshot_context
+ )
rev = head_rev_id
if "branch_name" in snp_ctx_params:
@@ -493,15 +512,15 @@
if "revision_id" in snp_ctx_params:
rev = random_rev_id
- _check_resolved_swhid_browse_url(REVISION, rev, snapshot_context)
+ _check_resolved_swhid_browse_url(ObjectType.REVISION, rev, snapshot_context)
_check_resolved_swhid_browse_url(
- DIRECTORY, directory, snapshot_context, path="/"
+ ObjectType.DIRECTORY, directory, snapshot_context, path="/"
)
if directory_subdir:
_check_resolved_swhid_browse_url(
- DIRECTORY,
+ ObjectType.DIRECTORY,
directory_subdir["target"],
snapshot_context,
path=f"/{directory_subdir['name']}/",
@@ -509,14 +528,14 @@
if directory_file:
_check_resolved_swhid_browse_url(
- CONTENT,
+ ObjectType.CONTENT,
directory_file["target"],
snapshot_context,
path=f"/{directory_file['name']}",
)
_check_resolved_swhid_browse_url(
- CONTENT,
+ ObjectType.CONTENT,
directory_file["target"],
snapshot_context,
path=f"/{directory_file['name']}",
@@ -524,7 +543,7 @@
)
_check_resolved_swhid_browse_url(
- CONTENT,
+ ObjectType.CONTENT,
directory_file["target"],
snapshot_context,
path=f"/{directory_file['name']}",
@@ -547,15 +566,19 @@
obj_context["origin"] = origin_url
query_params["origin_url"] = origin_url
- obj_context["visit"] = gen_swhid(SNAPSHOT, snapshot_id)
+ obj_context["visit"] = gen_swhid(ObjectType.SNAPSHOT, snapshot_id)
query_params["snapshot"] = snapshot_id
- if object_type in (CONTENT, DIRECTORY, REVISION):
+ if object_type in (ObjectType.CONTENT, ObjectType.DIRECTORY, ObjectType.REVISION):
if snapshot_context["release"]:
- obj_context["anchor"] = gen_swhid(RELEASE, snapshot_context["release_id"])
+ obj_context["anchor"] = gen_swhid(
+ ObjectType.RELEASE, snapshot_context["release_id"]
+ )
query_params["release"] = snapshot_context["release"]
else:
- obj_context["anchor"] = gen_swhid(REVISION, snapshot_context["revision_id"])
+ obj_context["anchor"] = gen_swhid(
+ ObjectType.REVISION, snapshot_context["revision_id"]
+ )
if (
snapshot_context["branch"]
and snapshot_context["branch"] != snapshot_context["revision_id"]
@@ -571,18 +594,18 @@
break
query_params["branch"] = branch
- elif object_type != REVISION:
+ elif object_type != ObjectType.REVISION:
query_params["revision"] = snapshot_context["revision_id"]
if path:
obj_context["path"] = path
if path != "/":
- if object_type == CONTENT:
+ if object_type == ObjectType.CONTENT:
query_params["path"] = path[1:]
else:
query_params["path"] = path[1:-1]
- if object_type == DIRECTORY:
+ if object_type == ObjectType.DIRECTORY:
object_id = snapshot_context["root_directory"]
if lines:
@@ -593,13 +616,15 @@
obj_swhid_resolved = resolve_swhid(obj_swhid)
url_args = {"sha1_git": object_id}
- if object_type == CONTENT:
+ if object_type == ObjectType.CONTENT:
url_args = {"query_string": f"sha1_git:{object_id}"}
- elif object_type == SNAPSHOT:
+ elif object_type == ObjectType.SNAPSHOT:
url_args = {"snapshot_id": object_id}
expected_url = reverse(
- f"browse-{object_type}", url_args=url_args, query_params=query_params,
+ f"browse-{object_type.name.lower()}",
+ url_args=url_args,
+ query_params=query_params,
)
if lines:
lines_number = lines.split("-")
@@ -615,7 +640,9 @@
origin = "http://example.org/?project=abc;"
origin_swhid_escaped = quote(origin, safe="/?:@&")
origin_swhid_url_escaped = quote(origin, safe="/:@;")
- swhid = gen_swhid(DIRECTORY, directory, metadata={"origin": origin_swhid_escaped})
+ swhid = gen_swhid(
+ ObjectType.DIRECTORY, directory, metadata={"origin": origin_swhid_escaped}
+ )
resolved_swhid = resolve_swhid(swhid)
assert resolved_swhid["swhid_parsed"].origin == origin_swhid_escaped
assert origin_swhid_url_escaped in resolved_swhid["browse_url"]
@@ -627,9 +654,9 @@
dir_subdirs = [e for e in dir_content if e["type"] == "dir"]
dir_subdir = random.choice(dir_subdirs)
dir_subdir_path = dir_subdir["name"]
- anchor = gen_swhid(DIRECTORY, directory)
+ anchor = gen_swhid(ObjectType.DIRECTORY, directory)
swhid = gen_swhid(
- DIRECTORY,
+ ObjectType.DIRECTORY,
dir_subdir["target"],
metadata={"anchor": anchor, "path": "/" + dir_subdir_path},
)
@@ -647,6 +674,8 @@
origin_url = "http://example.org/project/abc"
malformed_origin_url = "http:/example.org/project/abc"
archive_data.origin_add([Origin(url=origin_url)])
- swhid = gen_swhid(DIRECTORY, directory, metadata={"origin": malformed_origin_url})
+ swhid = gen_swhid(
+ ObjectType.DIRECTORY, directory, metadata={"origin": malformed_origin_url}
+ )
resolved_swhid = resolve_swhid(swhid)
assert origin_url in resolved_swhid["browse_url"]