# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import os
import shutil

from tempfile import mkdtemp

import arrow

from swh.loader.core.utils import clean_dangling_folders
from swh.loader.core.loader import BufferedLoader
from swh.model.from_disk import Directory
from swh.model.identifiers import (
    revision_identifier, snapshot_identifier,
    identifier_to_bytes, normalize_timestamp
)
from swh.storage.algos.snapshot import snapshot_get_all_branches

from .client import PyPIClient, PyPIProject


TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.pypi.'
DEBUG_MODE = '** DEBUG MODE **'


class PyPILoader(BufferedLoader):
    CONFIG_BASE_FILENAME = 'loader/pypi'
    ADDITIONAL_CONFIG = {
        'temp_directory': ('str', '/tmp/swh.loader.pypi/'),
        'cache': ('bool', False),
        'cache_dir': ('str', ''),
        'debug': ('bool', False),  # NOT FOR PRODUCTION
    }
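
    # For reference, a configuration file matching the keys above could look
    # like this (values are hypothetical; swh.core.config resolves the path
    # from CONFIG_BASE_FILENAME, e.g. ~/.config/swh/loader/pypi.yml):
    #
    #     temp_directory: /tmp/swh.loader.pypi/
    #     cache: false
    #     cache_dir: ''
    #     debug: false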

    def __init__(self, client=None):
        super().__init__(logging_class='swh.loader.pypi.PyPILoader')
        self.origin_id = None
        if not client:
            temp_directory = self.config['temp_directory']
            os.makedirs(temp_directory, exist_ok=True)
            self.temp_directory = mkdtemp(
                suffix='-%s' % os.getpid(),
                prefix=TEMPORARY_DIR_PREFIX_PATTERN,
                dir=temp_directory)
            self.pypi_client = PyPIClient(
                temp_directory=self.temp_directory,
                cache=self.config['cache'],
                cache_dir=self.config['cache_dir'])
        else:
            self.temp_directory = client.temp_directory
            self.pypi_client = client
        self.debug = self.config['debug']
        self.done = False
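
    # Note: with the defaults above, mkdtemp() creates a per-run directory
    # such as /tmp/swh.loader.pypi/swh.loader.pypi.k2xb31ty-4242 (random
    # part and pid shown are illustrative); pre_cleanup() below uses
    # TEMPORARY_DIR_PREFIX_PATTERN to match such leftovers from crashed runs.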

    def pre_cleanup(self):
        """To prevent disk explosion if some other workers exploded
        in mid-air (OOM killed), we try and clean up dangling files.

        """
        if self.debug:
            self.log.warn('%s Will not pre-clean up temp dir %s' % (
                DEBUG_MODE, self.temp_directory))
            return
        clean_dangling_folders(self.config['temp_directory'],
                               pattern_check=TEMPORARY_DIR_PREFIX_PATTERN,
                               log=self.log)

    def cleanup(self):
        """Clean up temporary disk use

        """
        if self.debug:
            self.log.warn('%s Will not clean up temp dir %s' % (
                DEBUG_MODE, self.temp_directory))
            return
        if os.path.exists(self.temp_directory):
            self.log.debug('Clean up %s' % self.temp_directory)
            shutil.rmtree(self.temp_directory)

    def prepare_origin_visit(self, project_name, project_url,
                             project_metadata_url=None):
        """Prepare the origin visit information

        Args:
            project_name (str): Project's simple name
            project_url (str): Project's main url
            project_metadata_url (str): Project's metadata url

        """
        self.origin = {
            'url': project_url,
            'type': 'pypi',
        }
        self.visit_date = None  # loader core will populate it

    def _known_artifacts(self, last_snapshot):
        """Retrieve the known releases/artifacts for the origin_id.

        Args:
            last_snapshot (dict): Last snapshot for the visit

        Returns:
            dict mapping (filename, sha256) tuples to known revision ids

        """
        if not last_snapshot or 'branches' not in last_snapshot:
            return {}

        # retrieve only revisions (e.g. the alias we do not want here)
        revs = [rev['target']
                for rev in last_snapshot['branches'].values()
                if rev and rev['target_type'] == 'revision']
        known_revisions = self.storage.revision_get(revs)
        ret = {}
        for revision in known_revisions:
            if not revision:  # revision_get can return None
                continue
            if 'original_artifact' in revision['metadata']:
                artifact = revision['metadata']['original_artifact']
                ret[artifact['filename'], artifact['sha256']] = revision['id']
        return ret

    def _last_snapshot(self):
        """Retrieve the last snapshot

        """
        snapshot = self.storage.snapshot_get_latest(self.origin_id)
        if snapshot and snapshot.pop('next_branch', None):
            snapshot = snapshot_get_all_branches(self.storage, snapshot['id'])
        return snapshot

    def prepare(self, project_name, project_url, project_metadata_url=None):
        """Keep reference to the origin url (project) and the
        project metadata url

        Args:
            project_name (str): Project's simple name
            project_url (str): Project's main url
            project_metadata_url (str): Project's metadata url

        """
        self.project_name = project_name
        self.origin_url = project_url
        self.project_metadata_url = project_metadata_url
        self.project = PyPIProject(self.pypi_client, self.project_name,
                                   self.project_metadata_url)
        self._prepare_state()

    def _prepare_state(self):
        """Initialize internal state (snapshot, contents, directories, etc.)

        This is called from the `prepare` method.

        """
        last_snapshot = self._last_snapshot()
        self.known_artifacts = self._known_artifacts(last_snapshot)
        # and the artifacts that will be the source of data to retrieve
        self.new_artifacts = self.project.download_new_releases(
            self.known_artifacts)
        # temporary state
        self._contents = []
        self._directories = []
        self._revisions = []
        self._load_status = 'uneventful'
        self._visit_status = 'full'

    def fetch_data(self):
        """Called once per release artifact version (there can be many for
        one release).

        Each call will:
        - retrieve a release artifact (associated to a release version)
        - uncompress it and compute the necessary information
        - compute the swh objects

        Returns:
            True as long as data to fetch exists

        """
        data = None
        if self.done:
            return False

        try:
            data = next(self.new_artifacts)
            self._load_status = 'eventful'
        except StopIteration:
            self.done = True
            return False

        project_info, author, release, artifact, dir_path = data
        dir_path = dir_path.encode('utf-8')
        directory = Directory.from_disk(path=dir_path, data=True)
        _objects = directory.collect()

        self._contents = _objects['content'].values()
        self._directories = _objects['directory'].values()
        date = normalize_timestamp(
            int(arrow.get(artifact['date']).timestamp))

        name = release['name'].encode('utf-8')
        message = release['message'].encode('utf-8')
        if message:
            message = b'%s: %s' % (name, message)
        else:
            message = name

        _revision = {
            'synthetic': True,
            'metadata': {
                'original_artifact': artifact,
                'project': project_info,
            },
            'author': author,
            'date': date,
            'committer': author,
            'committer_date': date,
            'message': message,
            'directory': directory.hash,
            'parents': [],
            'type': 'tar',
        }
        _revision['id'] = identifier_to_bytes(
            revision_identifier(_revision))
        self._revisions.append(_revision)

        artifact_key = artifact['filename'], artifact['sha256']
        self.known_artifacts[artifact_key] = _revision['id']

        return not self.done
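
    # At this point, self.known_artifacts maps (filename, sha256) pairs,
    # e.g. ('arrow-0.12.1.tar.gz', '<sha256 hex digest>') for a hypothetical
    # artifact, to synthetic revision ids. target_from_artifact() below
    # resolves snapshot branches through this mapping, and _prepare_state()
    # hands it to download_new_releases() so already-loaded artifacts can be
    # recognized and skipped.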

    def target_from_artifact(self, filename, sha256):
        target = self.known_artifacts.get((filename, sha256))
        if target:
            return {
                'target': target,
                'target_type': 'revision',
            }
        return None

    def generate_and_load_snapshot(self):
        branches = {}
        for release, artifacts in self.project.all_release_artifacts().items():
            default_release = self.project.default_release()
            if len(artifacts) == 1:
                # Only one artifact for this release, generate a single branch
                branch_name = 'releases/%s' % release
                filename, sha256 = artifacts[0]
                target = self.target_from_artifact(filename, sha256)
                branches[branch_name.encode('utf-8')] = target
                if release == default_release:
                    branches[b'HEAD'] = {
                        'target_type': 'alias',
                        'target': branch_name.encode('utf-8'),
                    }
                if not target:
                    self._visit_status = 'partial'
            else:
                # Several artifacts for this release, generate a separate
                # pointer for each of them
                for filename, sha256 in artifacts:
                    branch_name = 'releases/%s/%s' % (release, filename)
                    target = self.target_from_artifact(filename, sha256)
                    branches[branch_name.encode('utf-8')] = target
                    if not target:
                        self._visit_status = 'partial'

        snapshot = {
            'branches': branches,
        }
        snapshot['id'] = identifier_to_bytes(
            snapshot_identifier(snapshot))
        self.maybe_load_snapshot(snapshot)
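
    # For illustration, a project with a single (default) release '1.2.0'
    # whose one artifact resolved to a known revision (hypothetical values)
    # yields a snapshot shaped like:
    #
    #     {
    #         'branches': {
    #             b'releases/1.2.0': {'target': <revision id (bytes)>,
    #                                 'target_type': 'revision'},
    #             b'HEAD': {'target': b'releases/1.2.0',
    #                       'target_type': 'alias'},
    #         },
    #         'id': <snapshot id (bytes)>,
    #     }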

    def store_data(self):
        """(override) This sends collected objects to storage.

        """
        self.maybe_load_contents(self._contents)
        self.maybe_load_directories(self._directories)
        self.maybe_load_revisions(self._revisions)

        if self.done:
            self.generate_and_load_snapshot()
            self.flush()

    def load_status(self):
        return {
            'status': self._load_status,
        }

    def visit_status(self):
        return self._visit_status


if __name__ == '__main__':
    import logging
    import sys

    logging.basicConfig(level=logging.DEBUG)

    if len(sys.argv) != 2:
        logging.error('Usage: %s <module-name>' % sys.argv[0])
        sys.exit(1)

    module_name = sys.argv[1]

    loader = PyPILoader()
    loader.load(module_name,
                'https://pypi.org/projects/%s/' % module_name,
                'https://pypi.org/pypi/%s/json' % module_name)
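
# Example invocation (hypothetical; assumes this file is importable as
# swh.loader.pypi.loader and that storage is configured):
#
#     python -m swh.loader.pypi.loader arrow
#
# This loads https://pypi.org/projects/arrow/ using the JSON metadata
# at https://pypi.org/pypi/arrow/json.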