# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import smtplib
import psycopg2.extras
import psycopg2.pool

from email.mime.text import MIMEText

from swh.core.db import BaseDb
from swh.core.db.common import db_transaction
from swh.model import hashutil
from swh.scheduler.utils import create_oneshot_task_dict
from swh.vault.cookers import get_cooker_cls

cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask'

NOTIF_EMAIL_FROM = ('"Software Heritage Vault" '
                    '<bot@softwareheritage.org>')
NOTIF_EMAIL_SUBJECT_SUCCESS = ("Bundle ready: {obj_type} {short_id}")
NOTIF_EMAIL_SUBJECT_FAILURE = ("Bundle failed: {obj_type} {short_id}")

NOTIF_EMAIL_BODY_SUCCESS = """
You have requested the following bundle from the Software Heritage
Vault:

Object Type: {obj_type}
Object ID: {hex_id}

This bundle is now available for download at the following address:

{url}

Please keep in mind that this link might expire at some point, in which
case you will need to request the bundle again.

--\x20
The Software Heritage Developers
"""

NOTIF_EMAIL_BODY_FAILURE = """
You have requested the following bundle from the Software Heritage
Vault:

Object Type: {obj_type}
Object ID: {hex_id}

This bundle could not be cooked for the following reason:

{progress_msg}

We apologize for the inconvenience.

--\x20
The Software Heritage Developers
"""


class NotFoundExc(Exception):
    """Bundle was not found."""
    pass


def batch_to_bytes(batch):
    return [(obj_type, hashutil.hash_to_bytes(obj_id))
            for obj_type, obj_id in batch]
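
# Illustration only (hypothetical identifier): batch_to_bytes converts the
# hex object ids of a batch into raw bytes for the database layer, e.g.
#   batch_to_bytes([('directory', '17d2cd8f...')])
# returns [('directory', b'\x17\xd2\xcd\x8f...')].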


class VaultBackend:
    """
    Backend for the Software Heritage vault.
    """
    def __init__(self, db, cache, scheduler, storage=None, **config):
        self.config = config
        self.cache = cache
        self.scheduler = scheduler
        self.storage = storage
        self.smtp_server = smtplib.SMTP()

        self._pool = psycopg2.pool.ThreadedConnectionPool(
            config.get('min_pool_conns', 1),
            config.get('max_pool_conns', 10),
            db,
            cursor_factory=psycopg2.extras.RealDictCursor,
        )
        self._db = None
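
    # Construction sketch (connection string and pool sizes illustrative):
    # `db` is handed to psycopg2's ThreadedConnectionPool as-is, so a libpq
    # DSN works; `cache` and `scheduler` are assumed to be already-configured
    # swh cache and scheduler objects:
    #
    #     backend = VaultBackend('dbname=softwareheritage-vault',
    #                            cache, scheduler,
    #                            min_pool_conns=1, max_pool_conns=10)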

    def get_db(self):
        if self._db:
            return self._db
        return BaseDb.from_pool(self._pool)

    def put_db(self, db):
        if db is not self._db:
            db.put_conn()

    @db_transaction()
    def task_info(self, obj_type, obj_id, db=None, cur=None):
        """Fetch information from a bundle"""
        obj_id = hashutil.hash_to_bytes(obj_id)
        cur.execute('''
            SELECT id, type, object_id, task_id, task_status, sticky,
                   ts_created, ts_done, ts_last_access, progress_msg
            FROM vault_bundle
            WHERE type = %s AND object_id = %s''',
                    (obj_type, obj_id))
        res = cur.fetchone()

        if res:
            res['object_id'] = bytes(res['object_id'])

        return res
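
    # The object_id column is a bytea; psycopg2 typically returns it as a
    # memoryview (or buffer), hence the bytes() conversion above so callers
    # always get a plain bytes object.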

    def _send_task(self, *args):
        """Send a cooking task to the celery scheduler"""
        task = create_oneshot_task_dict('cook-vault-bundle', *args)
        added_tasks = self.scheduler.create_tasks([task])
        return added_tasks[0]['id']

    @db_transaction()
    def create_task(self, obj_type, obj_id, sticky=False, db=None, cur=None):
        """Create and send a cooking task"""
        obj_id = hashutil.hash_to_bytes(obj_id)
        hex_id = hashutil.hash_to_hex(obj_id)

        cooker_class = get_cooker_cls(obj_type)
        cooker = cooker_class(obj_type, hex_id,
                              backend=self, storage=self.storage)
        if not cooker.check_exists():
            raise NotFoundExc("Object {} was not found.".format(hex_id))

        cur.execute('''
            INSERT INTO vault_bundle (type, object_id, sticky)
            VALUES (%s, %s, %s)''',
                    (obj_type, obj_id, sticky))
        db.conn.commit()

        task_id = self._send_task(obj_type, hex_id)

        cur.execute('''
            UPDATE vault_bundle
            SET task_id = %s
            WHERE type = %s AND object_id = %s''',
                    (task_id, obj_type, obj_id))

    @db_transaction()
    def add_notif_email(self, obj_type, obj_id, email, db=None, cur=None):
        """Add an e-mail address to notify when a given bundle is ready"""
        obj_id = hashutil.hash_to_bytes(obj_id)
        cur.execute('''
            INSERT INTO vault_notif_email (email, bundle_id)
            VALUES (%s, (SELECT id FROM vault_bundle
                         WHERE type = %s AND object_id = %s))''',
                    (email, obj_type, obj_id))

    @db_transaction()
    def cook_request(self, obj_type, obj_id, *, sticky=False, email=None,
                     db=None, cur=None):
        """Main entry point for cooking requests. This starts a cooking task
        if needed, and adds the given e-mail to the notify list"""
        obj_id = hashutil.hash_to_bytes(obj_id)
        info = self.task_info(obj_type, obj_id)

        # If there's a failed bundle entry, delete it first.
        if info is not None and info['task_status'] == 'failed':
            cur.execute('''DELETE FROM vault_bundle
                           WHERE type = %s AND object_id = %s''',
                        (obj_type, obj_id))
            db.conn.commit()
            info = None

        # If there's no bundle entry, create the task.
        if info is None:
            self.create_task(obj_type, obj_id, sticky)

        if email is not None:
            # If the task is already done, send the email directly
            if info is not None and info['task_status'] == 'done':
                self.send_notification(None, email, obj_type, obj_id,
                                       info['task_status'])
            # Else, add it to the notification queue
            else:
                self.add_notif_email(obj_type, obj_id, email)

        info = self.task_info(obj_type, obj_id)
        return info
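
    # Usage sketch (object id and address illustrative): a caller requests a
    # directory bundle and asks to be notified by e-mail once it is cooked:
    #
    #     info = backend.cook_request('directory', obj_id,
    #                                 email='user@example.com')
    #     info['task_status']   # the code above checks for 'done'/'failed'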

    @db_transaction()
    def batch_cook(self, batch, db=None, cur=None):
        """Cook a batch of bundles and return the cooking id."""
        # Import execute_values at runtime only, because it requires
        # psycopg2 >= 2.7 (only available on postgresql servers)
        from psycopg2.extras import execute_values

        cur.execute('''
            INSERT INTO vault_batch (id)
            VALUES (DEFAULT)
            RETURNING id''')
        batch_id = cur.fetchone()['id']

        batch = batch_to_bytes(batch)

        # Delete all failed bundles from the batch
        cur.execute('''
            DELETE FROM vault_bundle
            WHERE task_status = 'failed'
              AND (type, object_id) IN %s''',
                    (tuple(batch),))

        # Insert all the bundles, return the new ones
        execute_values(cur, '''
            INSERT INTO vault_bundle (type, object_id)
            VALUES %s ON CONFLICT DO NOTHING''',
            batch)

        # Get the bundle ids and task status
        cur.execute('''
            SELECT id, type, object_id, task_id FROM vault_bundle
            WHERE (type, object_id) IN %s''',
                    (tuple(batch),))
        bundles = cur.fetchall()

        # Insert the batch-bundle entries
        batch_id_bundle_ids = [(batch_id, row['id']) for row in bundles]
        execute_values(cur, '''
            INSERT INTO vault_batch_bundle (batch_id, bundle_id)
            VALUES %s ON CONFLICT DO NOTHING''',
            batch_id_bundle_ids)
        db.conn.commit()

        # Get the tasks to fetch
        batch_new = [(row['type'], bytes(row['object_id']))
                     for row in bundles if row['task_id'] is None]

        # Send the tasks
        args_batch = [(obj_type, hashutil.hash_to_hex(obj_id))
                      for obj_type, obj_id in batch_new]
        # TODO: change once the scheduler handles priority tasks
        tasks = [create_oneshot_task_dict('swh-vault-batch-cooking', *args)
                 for args in args_batch]

        added_tasks = self.scheduler.create_tasks(tasks)
        tasks_ids_bundle_ids = zip([task['id'] for task in added_tasks],
                                   batch_new)
        tasks_ids_bundle_ids = [(task_id, obj_type, obj_id)
                                for task_id, (obj_type, obj_id)
                                in tasks_ids_bundle_ids]

        # Update the task ids
        execute_values(cur, '''
            UPDATE vault_bundle
            SET task_id = s_task_id
            FROM (VALUES %s) AS sub (s_task_id, s_type, s_object_id)
            WHERE type = s_type::cook_type AND object_id = s_object_id''',
            tasks_ids_bundle_ids)

        return batch_id
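
    # Usage sketch (hypothetical hex identifiers): several bundles can be
    # requested at once; the returned batch id can then be passed to
    # batch_info() to poll their status:
    #
    #     batch_id = backend.batch_cook([('directory', id1),
    #                                    ('directory', id2)])
    #     backend.batch_info(batch_id)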

    @db_transaction()
    def batch_info(self, batch_id, db=None, cur=None):
        """Fetch information from a batch of bundles"""
        cur.execute('''
            SELECT vault_bundle.id as id,
                   type, object_id, task_id, task_status, sticky,
                   ts_created, ts_done, ts_last_access, progress_msg
            FROM vault_batch_bundle
            LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id
            WHERE batch_id = %s''',
                    (batch_id,))
        res = cur.fetchall()

        if res:
            for d in res:
                d['object_id'] = bytes(d['object_id'])

        return res

    @db_transaction()
    def is_available(self, obj_type, obj_id, db=None, cur=None):
        """Check whether a bundle is available for retrieval"""
        info = self.task_info(obj_type, obj_id, cur=cur)
        return (info is not None
                and info['task_status'] == 'done'
                and self.cache.is_cached(obj_type, obj_id))

    @db_transaction()
    def fetch(self, obj_type, obj_id, db=None, cur=None):
        """Retrieve a bundle from the cache"""
        if not self.is_available(obj_type, obj_id, cur=cur):
            return None
        self.update_access_ts(obj_type, obj_id, cur=cur)
        return self.cache.get(obj_type, obj_id)
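
    # Read-path sketch (obj_type value illustrative): fetch() returns None
    # when the bundle is not cooked yet or is no longer cached, so callers
    # only need a single call:
    #
    #     bundle = backend.fetch('directory', obj_id)
    #     if bundle is None:
    #         ...  # not ready; issue a new cook_request()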

    @db_transaction()
    def update_access_ts(self, obj_type, obj_id, db=None, cur=None):
        """Update the last access timestamp of a bundle"""
        obj_id = hashutil.hash_to_bytes(obj_id)
        cur.execute('''
            UPDATE vault_bundle
            SET ts_last_access = NOW()
            WHERE type = %s AND object_id = %s''',
                    (obj_type, obj_id))

    @db_transaction()
    def set_status(self, obj_type, obj_id, status, db=None, cur=None):
        """Set the cooking status of a bundle"""
        obj_id = hashutil.hash_to_bytes(obj_id)
        req = ('''
               UPDATE vault_bundle
               SET task_status = %s '''
               + (''', ts_done = NOW() ''' if status == 'done' else '')
               + '''WHERE type = %s AND object_id = %s''')
        cur.execute(req, (status, obj_type, obj_id))
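
    # For illustration: when status == 'done', the assembled query reads
    #   UPDATE vault_bundle SET task_status = %s , ts_done = NOW()
    #   WHERE type = %s AND object_id = %s
    # and otherwise the ts_done clause is simply omitted.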

    @db_transaction()
    def set_progress(self, obj_type, obj_id, progress, db=None, cur=None):
        """Set the cooking progress of a bundle"""
        obj_id = hashutil.hash_to_bytes(obj_id)
        cur.execute('''
            UPDATE vault_bundle
            SET progress_msg = %s
            WHERE type = %s AND object_id = %s''',
                    (progress, obj_type, obj_id))

    @db_transaction()
    def send_all_notifications(self, obj_type, obj_id, db=None, cur=None):
        """Send all the e-mails in the notification list of a bundle"""
        obj_id = hashutil.hash_to_bytes(obj_id)
        cur.execute('''
            SELECT vault_notif_email.id AS id, email, task_status, progress_msg
            FROM vault_notif_email
            INNER JOIN vault_bundle ON bundle_id = vault_bundle.id
            WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''',
                    (obj_type, obj_id))

        for d in cur:
            self.send_notification(d['id'], d['email'], obj_type, obj_id,
                                   status=d['task_status'],
                                   progress_msg=d['progress_msg'])

    @db_transaction()
    def send_notification(self, n_id, email, obj_type, obj_id, status,
                          progress_msg=None, db=None, cur=None):
        """Send the notification of a bundle to a specific e-mail"""
        hex_id = hashutil.hash_to_hex(obj_id)
        short_id = hex_id[:7]

        # TODO: instead of hardcoding this, we should probably:
        # * add a "fetch_url" field in the vault_notif_email table
        # * generate the url with flask.url_for() on the web-ui side
        # * send this url as part of the cook request and store it in
        #   the table
        # * use this url for the notification e-mail
        url = ('https://archive.softwareheritage.org/api/1/vault/{}/{}/'
               'raw'.format(obj_type, hex_id))

        if status == 'done':
            text = NOTIF_EMAIL_BODY_SUCCESS.strip()
            text = text.format(obj_type=obj_type, hex_id=hex_id, url=url)
            msg = MIMEText(text)
            msg['Subject'] = (NOTIF_EMAIL_SUBJECT_SUCCESS
                              .format(obj_type=obj_type, short_id=short_id))
        elif status == 'failed':
            text = NOTIF_EMAIL_BODY_FAILURE.strip()
            text = text.format(obj_type=obj_type, hex_id=hex_id,
                               progress_msg=progress_msg)
            msg = MIMEText(text)
            msg['Subject'] = (NOTIF_EMAIL_SUBJECT_FAILURE
                              .format(obj_type=obj_type, short_id=short_id))
        else:
            raise RuntimeError(
                "send_notification called on a '{}' bundle".format(status))

        msg['From'] = NOTIF_EMAIL_FROM
        msg['To'] = email

        self._smtp_send(msg)

        if n_id is not None:
            cur.execute('''
                DELETE FROM vault_notif_email
                WHERE id = %s''',
                        (n_id,))

    def _smtp_send(self, msg):
        # Reconnect if needed
        try:
            status = self.smtp_server.noop()[0]
        except smtplib.SMTPException:
            status = -1
        if status != 250:
            self.smtp_server.connect('localhost', 25)

        # Send the message
        self.smtp_server.send_message(msg)

    @db_transaction()
    def _cache_expire(self, cond, *args, db=None, cur=None):
        """Low-level expiration method, used by cache_expire_* methods"""
        # Embedded SELECT query to be able to use ORDER BY and LIMIT
        cur.execute('''
            DELETE FROM vault_bundle
            WHERE ctid IN (
                SELECT ctid
                FROM vault_bundle
                WHERE sticky = false
                {}
            )
            RETURNING type, object_id
            '''.format(cond), args)

        for d in cur:
            self.cache.delete(d['type'], bytes(d['object_id']))

    @db_transaction()
    def cache_expire_oldest(self, n=1, by='last_access', db=None, cur=None):
        """Expire the `n` oldest bundles"""
        assert by in ('created', 'done', 'last_access')
        filter = '''ORDER BY ts_{} LIMIT {}'''.format(by, n)
        return self._cache_expire(filter)

    @db_transaction()
    def cache_expire_until(self, date, by='last_access', db=None, cur=None):
        """Expire all the bundles until a certain date"""
        assert by in ('created', 'done', 'last_access')
        filter = '''AND ts_{} <= %s'''.format(by)
        return self._cache_expire(filter, date)
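
    # Illustration of the conditions handed to _cache_expire (values made up):
    #   cache_expire_oldest(n=10, by='last_access')
    #       -> cond = "ORDER BY ts_last_access LIMIT 10"
    #   cache_expire_until(some_date, by='created')
    #       -> cond = "AND ts_created <= %s", with some_date as query argument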