Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9344586
identifiers.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
13 KB
Subscribers
None
identifiers.py
View Options
# Copyright (C) 2020-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from
typing
import
Any
,
Dict
,
Iterable
,
List
,
Mapping
,
Optional
from
urllib.parse
import
quote
,
unquote
from
typing_extensions
import
TypedDict
from
swh.model.exceptions
import
ValidationError
from
swh.model.hashutil
import
hash_to_bytes
,
hash_to_hex
from
swh.model.swhids
import
ObjectType
,
QualifiedSWHID
from
swh.web.common
import
archive
from
swh.web.common.exc
import
BadInputExc
from
swh.web.common.typing
import
(
SnapshotContext
,
SWHIDContext
,
SWHIDInfo
,
SWHObjectInfo
,
)
from
swh.web.common.utils
import
reverse
def
parse_object_type
(
object_type
:
str
)
->
ObjectType
:
try
:
return
ObjectType
[
object_type
.
upper
()]
except
KeyError
:
valid_types
=
", "
.
join
(
variant
.
name
.
lower
()
for
variant
in
ObjectType
)
raise
BadInputExc
(
f
"Invalid swh object type! Valid types are {valid_types}; not {object_type}"
)
def
gen_swhid
(
object_type
:
ObjectType
,
object_id
:
str
,
scheme_version
:
int
=
1
,
metadata
:
SWHIDContext
=
{},
)
->
str
:
"""
Returns the SoftWare Heritage persistent IDentifier for a swh object based on:
* the object type
* the object id
* the SWHID scheme version
Args:
object_type: the swh object type
(content/directory/release/revision/snapshot)
object_id: the swh object id (hexadecimal representation
of its hash value)
scheme_version: the scheme version of the SWHIDs
Returns:
the SWHID of the object
Raises:
BadInputExc: if the provided parameters do not enable to
generate a valid identifier
"""
try
:
decoded_object_id
=
hash_to_bytes
(
object_id
)
obj_swhid
=
str
(
QualifiedSWHID
(
object_type
=
object_type
,
object_id
=
decoded_object_id
,
scheme_version
=
scheme_version
,
**
metadata
,
)
)
except
(
ValidationError
,
KeyError
,
ValueError
)
as
e
:
raise
BadInputExc
(
"Invalid object (
%s
) for SWHID.
%s
"
%
(
object_id
,
e
))
else
:
return
obj_swhid
class
ResolvedSWHID
(
TypedDict
):
"""parsed SWHID with context"""
swhid_parsed
:
QualifiedSWHID
"""URL to browse object according to SWHID context"""
browse_url
:
Optional
[
str
]
def
resolve_swhid
(
swhid
:
str
,
query_params
:
Optional
[
Mapping
[
str
,
str
]]
=
None
)
->
ResolvedSWHID
:
"""
Try to resolve a SoftWare Heritage persistent IDentifier into an url for
browsing the targeted object.
Args:
swhid: a SoftWare Heritage persistent IDentifier
query_params: optional dict filled with
query parameters to append to the browse url
Returns:
a dict with the following keys:
* **swhid_parsed**: the parsed identifier
* **browse_url**: the url for browsing the targeted object
"""
swhid_parsed
=
get_swhid
(
swhid
)
object_type
=
swhid_parsed
.
object_type
object_id
=
swhid_parsed
.
object_id
browse_url
=
None
url_args
=
{}
fragment
=
""
process_lines
=
object_type
==
ObjectType
.
CONTENT
query_dict
:
Dict
[
str
,
str
]
=
dict
(
query_params
or
{})
if
swhid_parsed
.
origin
:
origin_url
=
unquote
(
swhid_parsed
.
origin
)
origin_url
=
archive
.
lookup_origin
({
"url"
:
origin_url
})[
"url"
]
query_dict
[
"origin_url"
]
=
origin_url
if
swhid_parsed
.
path
and
swhid_parsed
.
path
!=
b
"/"
:
query_dict
[
"path"
]
=
swhid_parsed
.
path
.
decode
(
"utf8"
,
errors
=
"replace"
)
if
swhid_parsed
.
anchor
:
directory
=
b
""
if
swhid_parsed
.
anchor
.
object_type
==
ObjectType
.
DIRECTORY
:
directory
=
swhid_parsed
.
anchor
.
object_id
elif
swhid_parsed
.
anchor
.
object_type
==
ObjectType
.
REVISION
:
revision
=
archive
.
lookup_revision
(
hash_to_hex
(
swhid_parsed
.
anchor
.
object_id
)
)
directory
=
revision
[
"directory"
]
elif
swhid_parsed
.
anchor
.
object_type
==
ObjectType
.
RELEASE
:
release
=
archive
.
lookup_release
(
hash_to_hex
(
swhid_parsed
.
anchor
.
object_id
)
)
if
release
[
"target_type"
]
==
ObjectType
.
REVISION
.
name
.
lower
():
revision
=
archive
.
lookup_revision
(
release
[
"target"
])
directory
=
revision
[
"directory"
]
if
object_type
==
ObjectType
.
CONTENT
:
if
(
not
swhid_parsed
.
origin
and
swhid_parsed
.
anchor
.
object_type
!=
ObjectType
.
REVISION
):
# when no origin or revision context, content objects need to have
# their path prefixed by root directory id for breadcrumbs display
query_dict
[
"path"
]
=
hash_to_hex
(
directory
)
+
query_dict
[
"path"
]
elif
query_dict
[
"path"
]
is
not
None
:
# remove leading slash from SWHID content path
query_dict
[
"path"
]
=
query_dict
[
"path"
]
.
lstrip
(
"/"
)
elif
object_type
==
ObjectType
.
DIRECTORY
and
query_dict
[
"path"
]
is
not
None
:
object_id
=
directory
# remove leading and trailing slashes from SWHID directory path
query_dict
[
"path"
]
=
query_dict
[
"path"
]
.
strip
(
"/"
)
# snapshot context
if
swhid_parsed
.
visit
:
if
swhid_parsed
.
visit
.
object_type
!=
ObjectType
.
SNAPSHOT
:
raise
BadInputExc
(
"Visit must be a snapshot SWHID."
)
query_dict
[
"snapshot"
]
=
hash_to_hex
(
swhid_parsed
.
visit
.
object_id
)
if
swhid_parsed
.
anchor
:
if
(
swhid_parsed
.
anchor
.
object_type
==
ObjectType
.
REVISION
and
object_type
!=
ObjectType
.
REVISION
):
query_dict
[
"revision"
]
=
hash_to_hex
(
swhid_parsed
.
anchor
.
object_id
)
elif
swhid_parsed
.
anchor
.
object_type
==
ObjectType
.
RELEASE
:
release
=
archive
.
lookup_release
(
hash_to_hex
(
swhid_parsed
.
anchor
.
object_id
)
)
if
release
:
query_dict
[
"release"
]
=
release
[
"name"
]
# browsing content or directory without snapshot context
elif
(
object_type
in
(
ObjectType
.
CONTENT
,
ObjectType
.
DIRECTORY
)
and
swhid_parsed
.
anchor
):
if
swhid_parsed
.
anchor
.
object_type
==
ObjectType
.
REVISION
:
# anchor revision, objects are browsed from its view
object_type
=
ObjectType
.
REVISION
object_id
=
swhid_parsed
.
anchor
.
object_id
elif
(
object_type
==
ObjectType
.
DIRECTORY
and
swhid_parsed
.
anchor
.
object_type
==
ObjectType
.
DIRECTORY
):
# a directory is browsed from its root
object_id
=
swhid_parsed
.
anchor
.
object_id
if
object_type
==
ObjectType
.
CONTENT
:
url_args
[
"query_string"
]
=
f
"sha1_git:{hash_to_hex(object_id)}"
elif
object_type
in
(
ObjectType
.
DIRECTORY
,
ObjectType
.
RELEASE
,
ObjectType
.
REVISION
):
url_args
[
"sha1_git"
]
=
hash_to_hex
(
object_id
)
elif
object_type
==
ObjectType
.
SNAPSHOT
:
url_args
[
"snapshot_id"
]
=
hash_to_hex
(
object_id
)
if
swhid_parsed
.
lines
and
process_lines
:
lines
=
swhid_parsed
.
lines
fragment
+=
"#L"
+
str
(
lines
[
0
])
if
lines
[
1
]:
fragment
+=
"-L"
+
str
(
lines
[
1
])
if
url_args
:
browse_url
=
(
reverse
(
f
"browse-{object_type.name.lower()}"
,
url_args
=
url_args
,
query_params
=
query_dict
,
)
+
fragment
)
return
ResolvedSWHID
(
swhid_parsed
=
swhid_parsed
,
browse_url
=
browse_url
)
def
get_swhid
(
swhid
:
str
)
->
QualifiedSWHID
:
"""Check if a SWHID is valid and return it parsed.
Args:
swhid: a SoftWare Heritage persistent IDentifier.
Raises:
BadInputExc: if the provided SWHID can not be parsed.
Return:
A parsed SWHID.
"""
try
:
# ensure core part of SWHID is in lower case to avoid parsing error
(
core
,
sep
,
qualifiers
)
=
swhid
.
partition
(
";"
)
core
=
core
.
lower
()
return
QualifiedSWHID
.
from_string
(
core
+
sep
+
qualifiers
)
except
ValidationError
as
ve
:
raise
BadInputExc
(
"Error when parsing identifier:
%s
"
%
" "
.
join
(
ve
.
messages
))
def
group_swhids
(
swhids
:
Iterable
[
QualifiedSWHID
],
)
->
Dict
[
ObjectType
,
List
[
bytes
]]:
"""
Groups many SoftWare Heritage persistent IDentifiers into a
dictionary depending on their type.
Args:
swhids: an iterable of SoftWare Heritage persistent
IDentifier objects
Returns:
A dictionary with:
keys: object types
values: object hashes
"""
swhids_by_type
:
Dict
[
ObjectType
,
List
[
bytes
]]
=
{
ObjectType
.
CONTENT
:
[],
ObjectType
.
DIRECTORY
:
[],
ObjectType
.
REVISION
:
[],
ObjectType
.
RELEASE
:
[],
ObjectType
.
SNAPSHOT
:
[],
}
for
obj_swhid
in
swhids
:
obj_id
=
obj_swhid
.
object_id
obj_type
=
obj_swhid
.
object_type
swhids_by_type
[
obj_type
]
.
append
(
hash_to_bytes
(
obj_id
))
return
swhids_by_type
def
get_swhids_info
(
swh_objects
:
Iterable
[
SWHObjectInfo
],
snapshot_context
:
Optional
[
SnapshotContext
]
=
None
,
extra_context
:
Optional
[
Mapping
[
str
,
Any
]]
=
None
,
)
->
List
[
SWHIDInfo
]:
"""
Returns a list of dict containing info related to SWHIDs of objects.
Args:
swh_objects: an iterable of dict describing archived objects
snapshot_context: optional dict parameter describing the snapshot in
which the objects have been found
extra_context: optional dict filled with extra contextual info about
the objects
Returns:
a list of dict containing SWHIDs info
"""
swhids_info
=
[]
for
swh_object
in
swh_objects
:
if
not
swh_object
[
"object_id"
]:
swhids_info
.
append
(
SWHIDInfo
(
object_type
=
swh_object
[
"object_type"
],
object_id
=
""
,
swhid
=
""
,
swhid_url
=
""
,
context
=
{},
swhid_with_context
=
None
,
swhid_with_context_url
=
None
,
)
)
continue
object_type
=
swh_object
[
"object_type"
]
object_id
=
swh_object
[
"object_id"
]
swhid_context
:
SWHIDContext
=
{}
if
snapshot_context
:
if
snapshot_context
[
"origin_info"
]
is
not
None
:
swhid_context
[
"origin"
]
=
quote
(
snapshot_context
[
"origin_info"
][
"url"
],
safe
=
"/?:@&"
)
if
object_type
!=
ObjectType
.
SNAPSHOT
:
swhid_context
[
"visit"
]
=
gen_swhid
(
ObjectType
.
SNAPSHOT
,
snapshot_context
[
"snapshot_id"
]
)
if
object_type
in
(
ObjectType
.
CONTENT
,
ObjectType
.
DIRECTORY
):
if
snapshot_context
[
"release_id"
]
is
not
None
:
swhid_context
[
"anchor"
]
=
gen_swhid
(
ObjectType
.
RELEASE
,
snapshot_context
[
"release_id"
]
)
elif
snapshot_context
[
"revision_id"
]
is
not
None
:
swhid_context
[
"anchor"
]
=
gen_swhid
(
ObjectType
.
REVISION
,
snapshot_context
[
"revision_id"
]
)
if
object_type
in
(
ObjectType
.
CONTENT
,
ObjectType
.
DIRECTORY
):
if
(
extra_context
and
"revision"
in
extra_context
and
extra_context
[
"revision"
]
and
"anchor"
not
in
swhid_context
):
swhid_context
[
"anchor"
]
=
gen_swhid
(
ObjectType
.
REVISION
,
extra_context
[
"revision"
]
)
elif
(
extra_context
and
"root_directory"
in
extra_context
and
extra_context
[
"root_directory"
]
and
"anchor"
not
in
swhid_context
and
(
object_type
!=
ObjectType
.
DIRECTORY
or
extra_context
[
"root_directory"
]
!=
object_id
)
):
swhid_context
[
"anchor"
]
=
gen_swhid
(
ObjectType
.
DIRECTORY
,
extra_context
[
"root_directory"
]
)
path
=
None
if
extra_context
and
"path"
in
extra_context
:
path
=
extra_context
[
"path"
]
or
"/"
if
"filename"
in
extra_context
and
object_type
==
ObjectType
.
CONTENT
:
path
+=
extra_context
[
"filename"
]
if
object_type
==
ObjectType
.
DIRECTORY
and
path
==
"/"
:
path
=
None
if
path
:
swhid_context
[
"path"
]
=
quote
(
path
,
safe
=
"/?:@&"
)
swhid
=
gen_swhid
(
object_type
,
object_id
)
swhid_url
=
reverse
(
"browse-swhid"
,
url_args
=
{
"swhid"
:
swhid
})
swhid_with_context
=
None
swhid_with_context_url
=
None
if
swhid_context
:
swhid_with_context
=
gen_swhid
(
object_type
,
object_id
,
metadata
=
swhid_context
)
swhid_with_context_url
=
reverse
(
"browse-swhid"
,
url_args
=
{
"swhid"
:
swhid_with_context
}
)
swhids_info
.
append
(
SWHIDInfo
(
object_type
=
object_type
,
object_id
=
object_id
,
swhid
=
swhid
,
swhid_url
=
swhid_url
,
context
=
swhid_context
,
swhid_with_context
=
swhid_with_context
,
swhid_with_context_url
=
swhid_with_context_url
,
)
)
return
swhids_info
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Fri, Jul 4, 2:35 PM (3 d, 18 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3296740
Attached To
R65 Staging repository
Event Timeline
Log In to Comment