loader.py
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import logging
import os
import tempfile

from typing import (
    Any, Dict, Generator, List, Mapping, Optional, Sequence, Tuple
)

from swh.core.tarball import uncompress
from swh.core.config import SWHConfig
from swh.model.from_disk import Directory
from swh.model.hashutil import hash_to_hex
from swh.model.identifiers import (
    revision_identifier, snapshot_identifier, identifier_to_bytes
)
from swh.model.model import Sha1Git
from swh.storage import get_storage
from swh.storage.algos.snapshot import snapshot_get_all_branches
from swh.loader.core.converters import prepare_contents
from swh.loader.package.utils import download


logger = logging.getLogger(__name__)
# Not implemented yet:
# - clean up disk routines from previous killed workers (when OOMkilled)
# -> separation of concern would like this to be abstracted from the code
# -> experience tells us it's complicated to do as such (T903, T964, T982,
# etc...)
#
# - model: swh.model.merkle.from_disk should output swh.model.model.* objects
# to avoid this layer's conversion routine call
# -> Take this up within swh.model's current implementation
class PackageLoader:
    # Origin visit type (str) set by the loader
    visit_type = ''

    def __init__(self, url):
        """Loader's constructor. This raises an exception if the minimal
           required configuration is missing (cf. the
           :func:`_check_configuration` method).

        Args:
            url (str): Origin url to load data from

        """
        # This expects to use the environment variable SWH_CONFIG_FILENAME
        self.config = SWHConfig.parse_config_file()
        self._check_configuration()
        self.storage = get_storage(**self.config['storage'])
        self.url = url
        self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
        self.max_content_size = self.config['max_content_size']
    def _check_configuration(self):
        """Checks that the minimal configuration required by the loader is
        set.

        If some required configuration is missing, an exception detailing
        the issue is raised.

        """
        if 'storage' not in self.config:
            raise ValueError(
                'Misconfiguration, at least the storage key should be set')
    def get_versions(self) -> Sequence[str]:
        """Return the list of all published package versions.

        Returns:
            Sequence of published versions

        """
        return []
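    # A minimal sketch of how a subclass might implement this hook, assuming
    # a hypothetical `self.index` mapping (version -> artifacts) fetched
    # from the origin beforehand; nothing named `index` exists in this base
    # class:
    #
    #     def get_versions(self) -> Sequence[str]:
    #         return list(self.index)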
    def get_package_info(self, version: str) -> Generator[
            Tuple[str, Mapping[str, Any]], None, None]:
        """Given a release version of a package, retrieve the associated
           package information for such version.

        Args:
            version: Package version

        Yields:
            (branch name, package metadata) tuples

        """
        yield from {}
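    # A hedged sketch of a subclass override, again assuming the
    # hypothetical `self.index`, where each artifact dict carries `url` and
    # `filename` keys. The `raw` entry is what load() later passes to
    # resolve_revision_from() and build_revision():
    #
    #     def get_package_info(self, version):
    #         for artifact in self.index[version]:
    #             yield 'releases/%s' % version, {
    #                 'url': artifact['url'],
    #                 'filename': artifact.get('filename'),
    #                 'raw': artifact,
    #             }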
    def build_revision(
            self, a_metadata: Dict, uncompressed_path: str) -> Dict:
        """Build the revision dict from the archive metadata (extrinsic
        artifact metadata) and the intrinsic metadata.

        Args:
            a_metadata: Artifact metadata
            uncompressed_path: Artifact uncompressed path on disk

        Returns:
            SWH data dict

        """
        return {}
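    # The dict a subclass returns here must at least carry a 'metadata' dict
    # (_load_revision updates it in place) plus the usual revision fields; a
    # sketch, where `author` and `date` are placeholders a real loader would
    # parse from a_metadata or from files under uncompressed_path:
    #
    #     def build_revision(self, a_metadata, uncompressed_path):
    #         return {
    #             'type': 'tar',
    #             'message': b'Synthetic release message',
    #             'author': author,       # parsed from a_metadata
    #             'date': date,           # idem
    #             'committer': author,
    #             'committer_date': date,
    #             'parents': [],
    #             'metadata': {'extrinsic': a_metadata},
    #         }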
    def get_default_version(self) -> str:
        """Retrieve the latest release version if any.

        Returns:
            Latest version

        """
        return ''
    def last_snapshot(self) -> Optional[Dict]:
        """Retrieve the last snapshot

        """
        snapshot = None
        visit = self.storage.origin_visit_get_latest(
            self.url, require_snapshot=True)
        if visit:
            snapshot = snapshot_get_all_branches(
                self.storage, visit['snapshot'])
        return snapshot
    def known_artifacts(self, snapshot: Optional[Dict]) -> Dict:
        """Retrieve the known releases/artifacts for the origin.

        Args:
            snapshot: snapshot for the visit

        Returns:
            Dict whose keys are revision ids (bytes) and whose values are
            metadata Dicts.

        """
        if not snapshot or 'branches' not in snapshot:
            return {}

        # retrieve only revisions (e.g. aliases are not wanted here)
        revs = [rev['target']
                for rev in snapshot['branches'].values()
                if rev and rev['target_type'] == 'revision']
        known_revisions = self.storage.revision_get(revs)

        ret = {}
        for revision in known_revisions:
            if not revision:  # revision_get can return None
                continue
            ret[revision['id']] = revision['metadata']
        return ret
    def resolve_revision_from(
            self, known_artifacts: Dict, artifact_metadata: Dict) \
            -> Optional[bytes]:
        """Resolve the revision from the known artifacts and an artifact
        metadata dict.

        If the artifact has already been downloaded, this will return the
        existing revision targeting that uncompressed artifact directory.
        Otherwise, this returns None.

        Args:
            known_artifacts: Known artifacts for the origin, as returned by
                :func:`known_artifacts`
            artifact_metadata: Information dict on the artifact to load

        Returns:
            None or revision identifier

        """
        return None
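    # A sketch of a possible override, assuming (hypothetically) that both
    # the stored 'original_artifact' metadata and the incoming
    # artifact_metadata carry comparable sha256 checksums; the exact field
    # names vary per loader:
    #
    #     def resolve_revision_from(self, known_artifacts, artifact_metadata):
    #         sha256 = artifact_metadata.get('sha256')
    #         if not sha256:
    #             return None
    #         for rev_id, metadata in known_artifacts.items():
    #             for original in metadata.get('original_artifact', []):
    #                 if original.get('checksums', {}).get('sha256') == sha256:
    #                     return rev_id
    #         return None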
    def download_package(self, p_info: Mapping[str, Any],
                         tmpdir: str) -> List[Tuple[str, Mapping]]:
        """Download artifacts for a specific package. All downloads happen
        in the tmpdir folder.

        The default implementation expects the package info to describe one
        artifact per package.

        Note that most implementations have 1 artifact per package. But some
        implementations have multiple artifacts per package (debian), and
        some have none, the package being the artifact itself (gnu).

        Args:
            p_info: Information on the package artifact to download
                (url, filename, etc.)
            tmpdir: Location to retrieve such artifacts

        Returns:
            List of (path, computed hashes)

        """
        a_uri = p_info['url']
        filename = p_info.get('filename')
        return [download(a_uri, dest=tmpdir, filename=filename)]
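    # Loaders dealing with several artifacts per package version (debian)
    # would override this method; a minimal sketch, assuming a hypothetical
    # 'files' list inside p_info:
    #
    #     def download_package(self, p_info, tmpdir):
    #         return [download(f['url'], dest=tmpdir,
    #                          filename=f.get('filename'))
    #                 for f in p_info['files']]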
    def uncompress(self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]],
                   dest: str) -> str:
        """Uncompress the artifact(s) in the destination folder dest.

        Optionally, this could need to use the p_info dict for some more
        information (debian).

        """
        uncompressed_path = os.path.join(dest, 'src')
        for a_path, _ in dl_artifacts:
            uncompress(a_path, dest=uncompressed_path)
        return uncompressed_path
    def load(self) -> Dict:
        """Load for a specific origin the associated contents.

        for each package version of the origin

        1. Fetch the files for one package version. By default, this can be
           implemented as a simple HTTP request. Loaders with more specific
           requirements can override this, e.g.: the PyPI loader checks the
           integrity of the downloaded files; the Debian loader has to
           download and check several files for one package version.

        2. Extract the downloaded files. By default, this would be a
           universal archive/tarball extraction.

           Loaders for specific formats can override this method (for
           instance, the Debian loader uses dpkg-source -x).

        3. Convert the extracted directory to a set of Software Heritage
           objects, using swh.model.from_disk.

        4. Extract the metadata from the unpacked directories. This would
           only be applicable for "smart" loaders like npm (parsing the
           package.json), PyPI (parsing the PKG-INFO file) or Debian
           (parsing debian/changelog and debian/control).

           On "minimal-metadata" sources such as the GNU archive, the lister
           should provide the minimal set of metadata needed to populate the
           revision/release objects (authors, dates) as an argument to the
           task.

        5. Generate the revision/release objects for the given version, from
           the data generated at steps 3 and 4.

        end for each

        6. Generate and load the snapshot for the visit

           Using the revisions/releases collected at step 5., and the branch
           information from step 0., generate a snapshot and load it into
           the Software Heritage archive.

        """
        status_load = 'uneventful'  # either: eventful, uneventful, failed
        status_visit = 'full'       # either: partial, full
        tmp_revisions = {}  # type: Dict[str, List]
        snapshot = None

        # Prepare origin and origin_visit
        origin = {'url': self.url}
        try:
            self.storage.origin_add_one(origin)
            visit_id = self.storage.origin_visit_add(
                origin=self.url,
                date=self.visit_date,
                type=self.visit_type)['visit']
        except Exception as e:
            logger.error(
                'Failed to create origin/origin_visit. Reason: %s', e)
            return {'status': 'failed'}

        try:
            last_snapshot = self.last_snapshot()
            logger.debug('last snapshot: %s', last_snapshot)
            known_artifacts = self.known_artifacts(last_snapshot)
            logger.debug('known artifacts: %s', known_artifacts)

            # Retrieve the default release version (the "latest" one)
            default_version = self.get_default_version()
            logger.debug('default version: %s', default_version)

            for version in self.get_versions():  # for each
                logger.debug('version: %s', version)
                tmp_revisions[version] = []
                # `p_` stands for `package_`
                for branch_name, p_info in self.get_package_info(version):
                    logger.debug('package_info: %s', p_info)
                    revision_id = self.resolve_revision_from(
                        known_artifacts, p_info['raw'])
                    if revision_id is None:
                        (revision_id, loaded) = \
                            self._load_revision(p_info, origin)
                        if loaded:
                            status_load = 'eventful'
                        else:
                            status_visit = 'partial'

                    if revision_id is None:
                        continue

                    tmp_revisions[version].append((branch_name, revision_id))

            logger.debug('tmp_revisions: %s', tmp_revisions)
            # Build and load the snapshot
            branches = {}  # type: Dict[bytes, Mapping[str, Any]]
            for version, branch_name_revisions in tmp_revisions.items():
                if version == default_version and \
                        len(branch_name_revisions) == 1:
                    # only 1 branch (no ambiguity), we can create an alias
                    # branch 'HEAD'
                    branch_name, _ = branch_name_revisions[0]
                    # except for some corner case (deposit)
                    if branch_name != 'HEAD':
                        branches[b'HEAD'] = {
                            'target_type': 'alias',
                            'target': branch_name.encode('utf-8'),
                        }

                for branch_name, target in branch_name_revisions:
                    branches[branch_name.encode('utf-8')] = {
                        'target_type': 'revision',
                        'target': target,
                    }

            snapshot = {'branches': branches}
            logger.debug('snapshot: %s', snapshot)

            snapshot['id'] = identifier_to_bytes(
                snapshot_identifier(snapshot))
            logger.debug('snapshot: %s', snapshot)
            self.storage.snapshot_add([snapshot])
            if hasattr(self.storage, 'flush'):
                self.storage.flush()
        except Exception:
            logger.exception('Failed to load %s', self.url)
            status_visit = 'partial'
            status_load = 'failed'
        finally:
            self.storage.origin_visit_update(
                origin=self.url, visit_id=visit_id, status=status_visit,
                snapshot=snapshot and snapshot['id'])
        result = {
            'status': status_load,
        }  # type: Dict[str, Any]
        if snapshot:
            result['snapshot_id'] = hash_to_hex(snapshot['id'])
        return result
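    # Typical use from a scheduler task or a one-off script (illustrative;
    # `SomePackageLoader` is a hypothetical concrete subclass):
    #
    #     loader = SomePackageLoader('https://example.org/some/package')
    #     result = loader.load()
    #     # e.g. {'status': 'eventful', 'snapshot_id': '...'}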
    def _load_revision(self, p_info,
                       origin) -> Tuple[Optional[Sha1Git], bool]:
        """Does all the loading of a revision itself:

        * downloads a package and uncompresses it
        * loads it from disk
        * adds contents, directories, and revision to self.storage
        * returns (revision_id, loaded)

        """
        with tempfile.TemporaryDirectory() as tmpdir:
            try:
                dl_artifacts = self.download_package(p_info, tmpdir)
            except Exception:
                logger.exception('Unable to retrieve %s', p_info)
                return (None, False)

            uncompressed_path = self.uncompress(dl_artifacts, dest=tmpdir)
            logger.debug('uncompressed_path: %s', uncompressed_path)

            directory = Directory.from_disk(
                path=uncompressed_path.encode('utf-8'), data=True)  # noqa
            # FIXME: Try not to load the full raw content in memory
            objects = directory.collect()

            contents, skipped_contents = prepare_contents(
                objects.get('content', {}).values(),
                max_content_size=self.max_content_size,
                origin_url=origin['url'])
            self.storage.skipped_content_add(skipped_contents)
            logger.debug('Number of skipped contents: %s',
                         len(skipped_contents))
            self.storage.content_add(contents)
            logger.debug('Number of contents: %s', len(contents))

            directories = list(objects.get('directory', {}).values())
            logger.debug('Number of directories: %s', len(directories))
            self.storage.directory_add(directories)

            # FIXME: This should be release. cf. D409
            revision = self.build_revision(
                p_info['raw'], uncompressed_path)
            if not revision:
                # Some artifacts are missing intrinsic metadata
                # skipping those
                return (None, True)

            revision.update({
                'synthetic': True,
                'directory': directory.hash,
            })
            revision['metadata'].update({
                'original_artifact': [
                    hashes for _, hashes in dl_artifacts],
            })
            revision['id'] = identifier_to_bytes(
                revision_identifier(revision))
            logger.debug('Revision: %s', revision)
            self.storage.revision_add([revision])
            return (revision['id'], True)
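# What follows is an illustrative sketch, not part of the production loader:
# a minimal subclass showing how the hooks above fit together for a
# hypothetical origin serving one tarball per version. The `artifacts`
# argument stands in for whatever a lister would schedule, and the
# placeholder author/date values would come from real metadata in an actual
# loader.
class ExamplePackageLoader(PackageLoader):
    visit_type = 'example'

    def __init__(self, url: str, artifacts: Mapping[str, Dict]):
        super().__init__(url)
        # hypothetical: version -> {'url': ..., 'filename': ...}
        self.artifacts = artifacts

    def get_versions(self) -> Sequence[str]:
        return list(self.artifacts)

    def get_default_version(self) -> str:
        # naive: assumes versions sort lexicographically
        return max(self.artifacts, default='')

    def get_package_info(self, version: str) -> Generator[
            Tuple[str, Mapping[str, Any]], None, None]:
        a_metadata = self.artifacts[version]
        yield 'releases/%s' % version, {
            'url': a_metadata['url'],
            'filename': a_metadata.get('filename'),
            'raw': a_metadata,
        }

    def build_revision(self, a_metadata: Dict,
                       uncompressed_path: str) -> Dict:
        # placeholders; a real loader parses these out of the intrinsic
        # or extrinsic metadata
        person = {'fullname': b'', 'name': b'', 'email': b''}
        date = {
            'timestamp': {'seconds': 0, 'microseconds': 0},
            'offset': 0,
            'negative_utc': False,
        }
        return {
            'type': 'tar',
            'message': b'Example synthetic release',
            'author': person,
            'date': date,
            'committer': person,
            'committer_date': date,
            'parents': [],
            'metadata': {'extrinsic': {'raw': a_metadata}},
        }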