Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9340306
deposit_read.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
7 KB
Subscribers
None
deposit_read.py
View Options
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from
contextlib
import
contextmanager
import
os
import
shutil
import
tempfile
from
typing
import
Any
,
Dict
,
Optional
,
Tuple
from
xml.etree
import
ElementTree
from
rest_framework
import
status
from
swh.core
import
tarball
from
swh.deposit.utils
import
NAMESPACES
,
normalize_date
from
swh.model.hashutil
import
hash_to_hex
from
swh.model.model
import
MetadataAuthorityType
from
swh.model.swhids
import
CoreSWHID
from
.
import
APIPrivateView
,
DepositReadMixin
from
...config
import
ARCHIVE_TYPE
,
SWH_PERSON
from
...models
import
Deposit
from
..common
import
APIGet
@contextmanager
def aggregate_tarballs(extraction_dir, archive_paths):
    """Aggregate multiple tarballs into one and yield this new archive's path.

    Args:
        extraction_dir (str): Path to use for the tarballs computation
        archive_paths ([str]): Deposit's archive paths

    Yields:
        Path to the aggregated tarball. Every temporary artifact (the
        working directory and the aggregated tarball itself) is removed
        when the context exits.
    """
    # rebuild one zip archive from (possibly) multiple ones
    os.makedirs(extraction_dir, 0o755, exist_ok=True)
    dir_path = tempfile.mkdtemp(prefix="swh.deposit-", dir=extraction_dir)
    # Guard the whole computation so dir_path is cleaned up even when
    # uncompression/aggregation fails (previously it leaked on error).
    try:
        # root folder to build an aggregated tarball
        aggregated_tarball_rootdir = os.path.join(dir_path, "aggregate")
        os.makedirs(aggregated_tarball_rootdir, 0o755, exist_ok=True)

        # uncompress in a temporary location all archives
        for archive_path in archive_paths:
            tarball.uncompress(archive_path, aggregated_tarball_rootdir)

        # Aggregate into one big tarball the multiple smaller ones
        temp_tarpath = shutil.make_archive(
            aggregated_tarball_rootdir, "tar", aggregated_tarball_rootdir
        )
        # can already clean up the uncompressed content
        shutil.rmtree(aggregated_tarball_rootdir)

        yield temp_tarpath
    finally:
        shutil.rmtree(dir_path)
class APIReadArchives(APIPrivateView, APIGet, DepositReadMixin):
    """Read access to a deposit's raw archives content.

    Only GET is supported.

    """

    def __init__(self):
        super().__init__()
        # Working directory used by aggregate_tarballs to build the
        # single tarball streamed back to the client.
        self.extraction_dir = self.config["extraction_dir"]
        if not os.path.exists(self.extraction_dir):
            os.makedirs(self.extraction_dir)

    def process_get(
        self, request, collection_name: str, deposit: Deposit
    ) -> Tuple[int, Any, str]:
        """Build a unique tarball out of the received archives and stream
        that content to the client.

        Args:
            request (Request): incoming HTTP request
            collection_name: Collection owning the deposit
            deposit: Deposit concerned by the reading

        Returns:
            Tuple of (status, stream of content, content-type)

        """
        deposit_requests = self._deposit_requests(deposit, request_type=ARCHIVE_TYPE)
        archive_paths = [
            deposit_request.archive.path for deposit_request in deposit_requests
        ]
        tarball_stream = aggregate_tarballs(self.extraction_dir, archive_paths)
        return (status.HTTP_200_OK, tarball_stream, "swh/generator")
class APIReadMetadata(APIPrivateView, APIGet, DepositReadMixin):
    """Class in charge of aggregating metadata on a deposit."""

    def _parse_dates(
        self, deposit: Deposit, metadata: ElementTree.Element
    ) -> Tuple[dict, dict]:
        """Extract the author date and committer date from the incoming
        metadata and normalize them.

        Returns:
            Tuple of author date, committer date. Those dates are
            swh normalized.
        """
        published_elt = metadata.find(
            "codemeta:datePublished", namespaces=NAMESPACES
        )
        created_elt = metadata.find("codemeta:dateCreated", namespaces=NAMESPACES)

        author_date: Any
        commit_date: Any
        if created_elt is not None and published_elt is not None:
            author_date = created_elt.text
            commit_date = published_elt.text
        elif created_elt is not None:
            # only dateCreated present: it serves as both dates
            author_date = commit_date = created_elt.text
        elif published_elt is not None:
            # only datePublished present: it serves as both dates
            author_date = commit_date = published_elt.text
        else:
            # no date in the metadata: fall back to the deposit completion date
            author_date = commit_date = deposit.complete_date

        return normalize_date(author_date), normalize_date(commit_date)

    def metadata_read(self, deposit: Deposit) -> Dict[str, Any]:
        """Read and aggregate multiple deposit information into one unified dictionary.

        Args:
            deposit: Deposit to retrieve information from

        Returns:
            Dictionary of deposit information read by the deposit loader, with the
            following keys:

                **origin** (Dict): Information about the origin

                **raw_metadata** (str): List of raw metadata received for the
                  deposit

                **provider** (Dict): the metadata provider information about the
                  deposit client

                **tool** (Dict): the deposit information

                **deposit** (Dict): deposit information relevant to build the revision
                  (author_date, committer_date, etc...)

        """
        raw_metadata = self._metadata_get(deposit)

        author_date: Optional[dict]
        commit_date: Optional[dict]
        if not raw_metadata:
            author_date = commit_date = None
            release_notes_elements = []
        else:
            metadata_tree = ElementTree.fromstring(raw_metadata)
            author_date, commit_date = self._parse_dates(deposit, metadata_tree)
            release_notes_elements = metadata_tree.findall(
                "codemeta:releaseNotes", namespaces=NAMESPACES
            )

        # the parent deposit's SWHID (when set) becomes the revision parent
        if deposit.parent and deposit.parent.swhid:
            parent_swhid = deposit.parent.swhid
            assert parent_swhid is not None
            parent_core_swhid = CoreSWHID.from_string(parent_swhid)
            parents = [hash_to_hex(parent_core_swhid.object_id)]
        else:
            parents = []

        release_notes: Optional[str]
        if not release_notes_elements:
            release_notes = None
        else:
            release_notes = "\n\n".join(
                element.text for element in release_notes_elements if element.text
            )

        return {
            "origin": {"type": "deposit", "url": deposit.origin_url},
            "provider": {
                "provider_name": deposit.client.last_name,
                "provider_url": deposit.client.provider_url,
                "provider_type": MetadataAuthorityType.DEPOSIT_CLIENT.value,
                "metadata": {},
            },
            "tool": self.tool,
            "raw_metadata": raw_metadata,
            "deposit": {
                "id": deposit.id,
                "client": deposit.client.username,
                "collection": deposit.collection.name,
                "author": SWH_PERSON,
                "author_date": author_date,
                "committer": SWH_PERSON,
                "committer_date": commit_date,
                "revision_parents": parents,
                "release_notes": release_notes,
            },
        }

    def process_get(
        self, request, collection_name: str, deposit: Deposit
    ) -> Tuple[int, Dict, str]:
        """Serve the aggregated deposit metadata as JSON."""
        data = self.metadata_read(deposit)
        return status.HTTP_200_OK, data or {}, "application/json"
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Jul 4 2025, 10:31 AM (5 w, 21 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3237752
Attached To
rDDEP Push deposit
Event Timeline
Log In to Comment