Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9348509
identifiers.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
13 KB
Subscribers
None
identifiers.py
View Options
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from
urllib.parse
import
quote
from
typing
import
cast
,
Any
,
Dict
,
Iterable
,
List
,
Optional
from
typing_extensions
import
TypedDict
from
django.http
import
QueryDict
from
swh.model.exceptions
import
ValidationError
from
swh.model.hashutil
import
hash_to_bytes
from
swh.model.identifiers
import
(
swhid
,
parse_swhid
,
CONTENT
,
DIRECTORY
,
ORIGIN
,
RELEASE
,
REVISION
,
SNAPSHOT
,
SWHID
,
)
from
swh.web.common
import
service
from
swh.web.common.exc
import
BadInputExc
from
swh.web.common.typing
import
(
QueryParameters
,
SnapshotContext
,
SWHObjectInfo
,
SWHIDInfo
,
SWHIDContext
,
)
from
swh.web.common.utils
import
reverse
def gen_swhid(
    object_type: str,
    object_id: str,
    scheme_version: int = 1,
    metadata: Optional[SWHIDContext] = None,
) -> str:
    """
    Returns the SoftWare Heritage persistent IDentifier for a swh object based on:

        * the object type
        * the object id
        * the SWHID scheme version

    Args:
        object_type: the swh object type
            (content/directory/release/revision/snapshot)
        object_id: the swh object id (hexadecimal representation
            of its hash value)
        scheme_version: the scheme version of the SWHIDs
        metadata: optional contextual qualifiers forwarded to the SWHID
            generator; an empty dict is used when omitted

    Returns:
        the SWHID of the object

    Raises:
        BadInputExc: if the provided parameters do not enable to
            generate a valid identifier
    """
    # Build a fresh dict on every call instead of relying on a mutable
    # default argument that would be shared between invocations.
    context = cast(Dict[str, Any], metadata if metadata is not None else {})
    try:
        return swhid(object_type, object_id, scheme_version, context)
    except ValidationError as e:
        # Keep the original validation error attached as the explicit cause.
        raise BadInputExc(
            "Invalid object (%s) for SWHID. %s" % (object_id, e)
        ) from e
class ResolvedSWHID(TypedDict):
    """Dict pairing a parsed SWHID with the URL used to browse its target."""

    # parsed SWHID with context
    swhid_parsed: SWHID
    # URL to browse object according to SWHID context
    browse_url: Optional[str]
def _anchor_root_directory(anchor_swhid_parsed: SWHID) -> str:
    """Return the root directory id reachable from an anchor SWHID.

    A directory anchor is its own root, a revision anchor yields the
    directory of that revision and a release anchor yields the directory of
    the revision it targets. An empty string is returned in any other case.
    """
    anchor_type = anchor_swhid_parsed.object_type
    anchor_id = anchor_swhid_parsed.object_id
    if anchor_type == DIRECTORY:
        return anchor_id
    if anchor_type == REVISION:
        return service.lookup_revision(anchor_id)["directory"]
    if anchor_type == RELEASE:
        release = service.lookup_release(anchor_id)
        if release["target_type"] == REVISION:
            return service.lookup_revision(release["target"])["directory"]
    return ""


def resolve_swhid(
    swhid: str, query_params: Optional[QueryParameters] = None
) -> ResolvedSWHID:
    """
    Try to resolve a SoftWare Heritage persistent IDentifier into an url for
    browsing the targeted object.

    The ``origin``, ``anchor``, ``path``, ``visit`` and ``lines`` qualifiers
    of the SWHID are turned into query parameters, URL arguments or a URL
    fragment of the browse url.

    Args:
        swhid: a SoftWare Heritage persistent IDentifier
        query_params: optional dict filled with
            query parameters to append to the browse url

    Returns:
        a dict with the following keys:

            * **swhid_parsed**: the parsed identifier
            * **browse_url**: the url for browsing the targeted object,
              or None when no URL argument could be derived from the
              object type

    Raises:
        BadInputExc: if the SWHID (or one of the SWHIDs found in its
            qualifiers) can not be parsed, if the ``visit`` qualifier is
            not a snapshot SWHID, or if the SWHID targets an origin
    """
    swhid_parsed = get_swhid(swhid)
    object_type = swhid_parsed.object_type
    object_id = swhid_parsed.object_id
    metadata = swhid_parsed.metadata
    browse_url = None
    url_args: Dict[str, Any] = {}
    query_dict = QueryDict("", mutable=True)
    fragment = ""
    anchor_swhid_parsed = None
    # Decided before object_type may be rewritten below: only a SWHID that
    # targets a content gets a lines fragment. Compared by value, not by
    # identity, since object types are plain strings.
    process_lines = object_type == CONTENT

    if query_params:
        # copy caller-supplied parameters in a deterministic (sorted) order
        for key in sorted(query_params):
            query_dict[key] = query_params[key]

    if "origin" in metadata:
        query_dict["origin_url"] = metadata["origin"]

    if "anchor" in metadata:
        anchor_swhid_parsed = get_swhid(metadata["anchor"])

    if "path" in metadata and metadata["path"] != "/":
        query_dict["path"] = metadata["path"]
        if anchor_swhid_parsed:
            directory = _anchor_root_directory(anchor_swhid_parsed)
            if object_type == CONTENT:
                if "origin" not in metadata:
                    # when no origin context, content objects need to have their
                    # path prefixed by root directory id for proper breadcrumbs
                    # display
                    query_dict["path"] = directory + query_dict["path"]
                else:
                    # remove leading slash from SWHID content path
                    query_dict["path"] = query_dict["path"][1:]
            elif object_type == DIRECTORY:
                # the directory is browsed from the root resolved from the anchor
                object_id = directory
                # remove leading and trailing slashes from SWHID directory path
                query_dict["path"] = query_dict["path"][1:-1]

    # snapshot context
    if "visit" in metadata:
        snp_swhid_parsed = get_swhid(metadata["visit"])
        if snp_swhid_parsed.object_type != SNAPSHOT:
            raise BadInputExc("Visit must be a snapshot SWHID.")
        query_dict["snapshot"] = snp_swhid_parsed.object_id

        if anchor_swhid_parsed:
            if anchor_swhid_parsed.object_type == REVISION:
                # check if the anchor revision is the tip of a branch
                branch_name = service.lookup_snapshot_branch_name_from_tip_revision(
                    snp_swhid_parsed.object_id, anchor_swhid_parsed.object_id
                )
                if branch_name:
                    query_dict["branch"] = branch_name
                elif object_type != REVISION:
                    # not a branch tip: point at the anchor revision explicitly
                    query_dict["revision"] = anchor_swhid_parsed.object_id
            elif anchor_swhid_parsed.object_type == RELEASE:
                release = service.lookup_release(anchor_swhid_parsed.object_id)
                if release:
                    query_dict["release"] = release["name"]

        if object_type == REVISION and "release" not in query_dict:
            # check if the targeted revision itself is the tip of a branch
            branch_name = service.lookup_snapshot_branch_name_from_tip_revision(
                snp_swhid_parsed.object_id, object_id
            )
            if branch_name:
                query_dict["branch"] = branch_name

    # browsing content or directory without snapshot context
    elif object_type in (CONTENT, DIRECTORY) and anchor_swhid_parsed:
        if anchor_swhid_parsed.object_type == REVISION:
            # anchor revision, objects are browsed from its view
            object_type = REVISION
            object_id = anchor_swhid_parsed.object_id
        elif object_type == DIRECTORY and anchor_swhid_parsed.object_type == DIRECTORY:
            # a directory is browsed from its root
            object_id = anchor_swhid_parsed.object_id

    if object_type == CONTENT:
        url_args["query_string"] = f"sha1_git:{object_id}"
    elif object_type in (DIRECTORY, RELEASE, REVISION):
        url_args["sha1_git"] = object_id
    elif object_type == SNAPSHOT:
        url_args["snapshot_id"] = object_id
    elif object_type == ORIGIN:
        raise BadInputExc(
            (
                "Origin SWHIDs are not publicly resolvable because they are for "
                "internal usage only"
            )
        )

    if "lines" in metadata and process_lines:
        # "<start>" or "<start>-<end>" becomes "#L<start>" or "#L<start>-L<end>"
        lines = metadata["lines"].split("-")
        fragment += "#L" + lines[0]
        if len(lines) > 1:
            fragment += "-L" + lines[1]

    if url_args:
        browse_url = (
            reverse(
                f"browse-{object_type}",
                url_args=url_args,
                query_params=query_dict,
            )
            + fragment
        )

    return ResolvedSWHID(swhid_parsed=swhid_parsed, browse_url=browse_url)
def get_swhid(swhid: str) -> SWHID:
    """Parse a SWHID string, rejecting it when it is not valid.

    Args:
        swhid: a SoftWare Heritage persistent IDentifier.

    Raises:
        BadInputExc: if the provided SWHID can not be parsed.

    Return:
        A parsed SWHID.
    """
    try:
        return parse_swhid(swhid)
    except ValidationError as ve:
        # Collapse all validation messages into a single error string.
        details = " ".join(ve.messages)
        raise BadInputExc("Error when parsing identifier: %s" % details)
def group_swhids(swhids: Iterable[SWHID],) -> Dict[str, List[bytes]]:
    """
    Bucket SoftWare Heritage persistent IDentifiers by object type.

    Args:
        swhids: an iterable of SoftWare Heritage persistent
            IDentifier objects

    Returns:
        A dictionary with:
            keys: object types
            values: object hashes
    """
    # Every supported type gets a bucket, even when no SWHID of that type
    # is present in the input.
    grouped: Dict[str, List[bytes]] = {
        kind: [] for kind in (CONTENT, DIRECTORY, REVISION, RELEASE, SNAPSHOT)
    }

    for parsed in swhids:
        raw_hash = hash_to_bytes(parsed.object_id)
        grouped[parsed.object_type].append(raw_hash)

    return grouped
def get_swhids_info(
    swh_objects: Iterable[SWHObjectInfo],
    snapshot_context: Optional[SnapshotContext] = None,
    extra_context: Optional[Dict[str, Any]] = None,
) -> List[SWHIDInfo]:
    """
    Build the SWHID related info of a set of archived objects.

    Args:
        swh_objects: an iterable of dict describing archived objects
        snapshot_context: optional dict parameter describing the snapshot in
            which the objects have been found
        extra_context: optional dict filled with extra contextual info about
            the objects

    Returns:
        a list of dict containing SWHIDs info
    """
    results = []

    for obj in swh_objects:
        obj_id = obj["object_id"]
        obj_type = obj["object_type"]

        if not obj_id:
            # no identifier available: emit a placeholder entry
            results.append(
                SWHIDInfo(
                    object_type=obj_type,
                    object_id="",
                    swhid="",
                    swhid_url="",
                    context={},
                    swhid_with_context=None,
                    swhid_with_context_url=None,
                )
            )
            continue

        qualifiers: SWHIDContext = {}
        in_tree = obj_type in (CONTENT, DIRECTORY)

        if snapshot_context:
            origin_info = snapshot_context["origin_info"]
            if origin_info is not None:
                qualifiers["origin"] = quote(origin_info["url"], safe="/?:@&")
            if obj_type != SNAPSHOT:
                qualifiers["visit"] = gen_swhid(
                    SNAPSHOT, snapshot_context["snapshot_id"]
                )
            if in_tree:
                # a release takes precedence over a revision as anchor
                for anchor_type, id_key in (
                    (RELEASE, "release_id"),
                    (REVISION, "revision_id"),
                ):
                    anchor_id = snapshot_context[id_key]
                    if anchor_id is not None:
                        qualifiers["anchor"] = gen_swhid(anchor_type, anchor_id)
                        break

        if in_tree:
            # extra context only provides an anchor when the snapshot context
            # did not already set one
            if extra_context and "anchor" not in qualifiers:
                revision = extra_context.get("revision")
                root_directory = extra_context.get("root_directory")
                if revision:
                    qualifiers["anchor"] = gen_swhid(REVISION, revision)
                elif root_directory and (
                    obj_type != DIRECTORY or root_directory != obj_id
                ):
                    # a directory is not anchored to itself
                    qualifiers["anchor"] = gen_swhid(DIRECTORY, root_directory)

            if extra_context and "path" in extra_context:
                # an empty path stands for the root
                path = extra_context["path"] or "/"
                if obj_type == CONTENT and "filename" in extra_context:
                    path += extra_context["filename"]
                qualifiers["path"] = quote(path, safe="/?:@&")

        core_swhid = gen_swhid(obj_type, obj_id)
        core_url = reverse("browse-swhid", url_args={"swhid": core_swhid})

        qualified_swhid = None
        qualified_url = None
        if qualifiers:
            qualified_swhid = gen_swhid(obj_type, obj_id, metadata=qualifiers)
            qualified_url = reverse(
                "browse-swhid", url_args={"swhid": qualified_swhid}
            )

        results.append(
            SWHIDInfo(
                object_type=obj_type,
                object_id=obj_id,
                swhid=core_swhid,
                swhid_url=core_url,
                context=qualifiers,
                swhid_with_context=qualified_swhid,
                swhid_with_context_url=qualified_url,
            )
        )

    return results
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Jul 4 2025, 6:34 PM (5 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3393258
Attached To
R65 Staging repository
Event Timeline
Log In to Comment