Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/storage/archiver/db.py b/swh/storage/archiver/db.py
index 917e06f9..30b4e3f7 100644
--- a/swh/storage/archiver/db.py
+++ b/swh/storage/archiver/db.py
@@ -1,287 +1,269 @@
# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import time
from swh.core import hashutil
from swh.storage.db import BaseDb, cursor_to_bytes, stored_procedure
class ArchiverDb(BaseDb):
"""Proxy to the SWH's archiver DB
"""
def archive_ls(self, cur=None):
    """List every row of the ``archive`` table.

    Args:
        cur: optional cursor to reuse; it is resolved through
            ``self._cursor``.

    Yields:
        each row returned by the query, passed through
        ``cursor_to_bytes`` (documented upstream as a
        (server_id, server_url) tuple per archive server).

    """
    cursor = self._cursor(cur)
    cursor.execute("SELECT * FROM archive")
    for row in cursor_to_bytes(cursor):
        yield row
def content_archive_get(self, content_id, cur=None):
    """Fetch the archival status of a single content.

    Args:
        content_id: the sha1 of the content.
        cur: optional cursor to reuse; it is resolved through
            ``self._cursor``.

    Returns:
        None when no row matches ``content_id``, otherwise a tuple
        (content_id, present_copies, ongoing_copies), where
        ongoing_copies is a dict mapping copy to mtime.

    """
    query = """SELECT content_id,
array(
SELECT key
FROM jsonb_each(copies)
WHERE value->>'status' = 'present'
ORDER BY key
) AS present,
array(
SELECT key
FROM jsonb_each(copies)
WHERE value->>'status' = 'ongoing'
ORDER BY key
) AS ongoing,
array(
SELECT value->'mtime'
FROM jsonb_each(copies)
WHERE value->>'status' = 'ongoing'
ORDER BY key
) AS ongoing_mtime
FROM content_archive
WHERE content_id = %s
ORDER BY content_id
"""
    cursor = self._cursor(cur)
    cursor.execute(query, (content_id,))
    found = cursor.fetchone()
    if found:
        sha1, present, ongoing, mtimes = found
        # `ongoing` and `mtimes` are both ordered by key in the query, so
        # pairing them positionally maps each ongoing copy to its mtime.
        return (sha1, present, dict(zip(ongoing, mtimes)))
    return None
def content_archive_get_copies(self, last_content=None, limit=1000,
                               cur=None):
    """Page through the copies of contents located after `last_content`.

    Args:
        last_content: sha1 of the last content retrieved. May be None
            to start at the beginning (an empty bytestring is then used
            as the lower bound).
        limit: number of contents to retrieve. Can be None to retrieve all
            objects (will be slow).
        cur: optional cursor to reuse; it is resolved through
            ``self._cursor``.

    Yields:
        A tuple (content_id, present_copies, ongoing_copies), where
        ongoing_copies is a dict mapping copy to mtime.

    """
    query = """SELECT content_id,
array(
SELECT key
FROM jsonb_each(copies)
WHERE value->>'status' = 'present'
ORDER BY key
) AS present,
array(
SELECT key
FROM jsonb_each(copies)
WHERE value->>'status' = 'ongoing'
ORDER BY key
) AS ongoing,
array(
SELECT value->'mtime'
FROM jsonb_each(copies)
WHERE value->>'status' = 'ongoing'
ORDER BY key
) AS ongoing_mtime
FROM content_archive
WHERE content_id > %s
ORDER BY content_id
LIMIT %s
"""
    lower_bound = b'' if last_content is None else last_content

    cursor = self._cursor(cur)
    cursor.execute(query, (lower_bound, limit))
    for row in cursor_to_bytes(cursor):
        sha1, present, ongoing, mtimes = row
        yield (sha1, present, dict(zip(ongoing, mtimes)))
def content_archive_get_unarchived_copies(
        self, retention_policy, last_content=None,
        limit=1000, cur=None):
    """Page through the copies of under-archived contents located after
    `last_content`.

    Only rows whose ``num_present`` is smaller than `retention_policy`
    are returned.

    Args:
        retention_policy: number of required present copies
        last_content: sha1 of the last content retrieved. May be None
            to start at the beginning (an empty bytestring is then used
            as the lower bound).
        limit: number of contents to retrieve. Can be None to retrieve all
            objects (will be slow).
        cur: optional cursor to reuse; it is resolved through
            ``self._cursor``.

    Yields:
        A tuple (content_id, present_copies, ongoing_copies), where
        ongoing_copies is a dict mapping copy to mtime.

    """
    query = """SELECT content_id,
array(
SELECT key
FROM jsonb_each(copies)
WHERE value->>'status' = 'present'
ORDER BY key
) AS present,
array(
SELECT key
FROM jsonb_each(copies)
WHERE value->>'status' = 'ongoing'
ORDER BY key
) AS ongoing,
array(
SELECT value->'mtime'
FROM jsonb_each(copies)
WHERE value->>'status' = 'ongoing'
ORDER BY key
) AS ongoing_mtime
FROM content_archive
WHERE content_id > %s AND num_present < %s
ORDER BY content_id
LIMIT %s
"""
    lower_bound = b'' if last_content is None else last_content

    cursor = self._cursor(cur)
    cursor.execute(query, (lower_bound, retention_policy, limit))
    for row in cursor_to_bytes(cursor):
        sha1, present, ongoing, mtimes = row
        yield (sha1, present, dict(zip(ongoing, mtimes)))
@stored_procedure('swh_mktemp_content_archive')
def mktemp_content_archive(self, cur=None):
    """Trigger the creation of the temporary table tmp_content_archive
    during the lifetime of the transaction.

    The body is intentionally empty: presumably the ``stored_procedure``
    decorator issues the call to ``swh_mktemp_content_archive`` (confirm
    against swh.storage.db).

    Use from archiver.storage module:
        self.db.mktemp_content_archive()
        # copy data over to the temp table
        self.db.copy_to([{'content_id': id0}, {'content_id': id1}],
                        'tmp_content_archive',
                        ['content_id'], cur)

    """
    pass
def content_archive_get_missing(self, backend_name, cur=None):
    """Yield the rows returned by ``swh_content_archive_missing`` for
    `backend_name`.

    Args:
        backend_name: name of the backend passed to the SQL function.
        cur: optional cursor to reuse; it is resolved through
            ``self._cursor``.

    Yields:
        each result row, passed through ``cursor_to_bytes``.

    """
    cursor = self._cursor(cur)
    params = (backend_name,)
    cursor.execute("select * from swh_content_archive_missing(%s)", params)
    for row in cursor_to_bytes(cursor):
        yield row
def content_archive_get_unknown(self, cur=None):
    """Yield the rows returned by ``swh_content_archive_unknown()``,
    i.e. the sha1s unknown to the archiver db.

    Args:
        cur: optional cursor to reuse; it is resolved through
            ``self._cursor``.

    Yields:
        each result row, passed through ``cursor_to_bytes``.

    """
    cursor = self._cursor(cur)
    cursor.execute('select * from swh_content_archive_unknown()')
    for row in cursor_to_bytes(cursor):
        yield row
- def content_archive_insert(self, content_id, source, status, cur=None):
- """Insert a new entry in the db for the content_id.
-
- Args:
- content_id: content concerned
- source: name of the source
- status: the status of the content for that source
-
- """
- if isinstance(content_id, bytes):
- content_id = '\\x%s' % hashutil.hash_to_hex(content_id)
-
- query = """INSERT INTO content_archive(content_id, copies, num_present)
- VALUES('%s', '{"%s": {"status": "%s", "mtime": %d}}', 1)
- """ % (content_id, source, status, int(time.time()))
- cur = self._cursor(cur)
- cur.execute(query)
-
def content_archive_update(self, content_id, archive_id,
                           new_status=None, cur=None):
    """Update the entry of one archive in a content's ``copies`` and set
    its mtime to the current time.

    Args:
        content_id (bytes|str): content sha1. Bytes are converted to
            their hexadecimal bytea textual form before being sent.
        archive_id (str): name of the archive
        new_status (str): one of 'missing', 'present' or 'ongoing'.
            this status will replace the previous one. If not given,
            the function only change the mtime of the content for the
            given archive.
        cur: optional cursor to reuse; it is resolved through
            ``self._cursor``.

    """
    if isinstance(content_id, bytes):
        content_id = '\\x%s' % hashutil.hash_to_hex(content_id)

    now = int(time.time())
    if new_status is not None:
        # Replace the whole entry of this archive.
        path = '{%s}' % archive_id
        value = json.dumps({'status': new_status, 'mtime': now})
    else:
        # Only refresh the mtime of the existing entry.
        path = '{%s,mtime}' % archive_id
        value = str(now)

    # Values are bound as query parameters instead of being interpolated
    # with `%`: the driver does the quoting, and both branches always
    # provide the content id (the mtime-only branch used to omit it and
    # failed with a TypeError while formatting the query).
    query = """UPDATE content_archive
               SET copies=jsonb_set(copies, %s, %s)
               WHERE content_id=%s
            """
    cur = self._cursor(cur)
    cur.execute(query, (path, value, content_id))
- def content_archive_content_add(
def content_archive_add(
        self, content_id, sources_present, sources_missing, cur=None):
    """Insert a new content_archive row describing where `content_id`
    is present and where it is not.

    Args:
        content_id (bytes|str): content identifier. Bytes are converted
            to their hexadecimal bytea textual form before being sent.
        sources_present ([str]): names of the sources holding the
            content; each gets a 'present' status and the current time
            as mtime.
        sources_missing ([str]): names of the sources not holding the
            content; each gets an 'absent' status. A name listed in both
            arguments ends up 'absent' but is still counted as present.
        cur: optional cursor to reuse; it is resolved through
            ``self._cursor``.

    """
    if isinstance(content_id, bytes):
        content_id = '\\x%s' % hashutil.hash_to_hex(content_id)

    # One timestamp for the whole row rather than one clock read per source.
    now = int(time.time())
    copies = {}
    num_present = 0
    for source in sources_present:
        copies[source] = {
            "status": "present",
            "mtime": now,
        }
        num_present += 1

    for source in sources_missing:
        copies[source] = {
            "status": "absent",
        }

    # Bound parameters let the driver quote the JSON document; `%`
    # interpolation broke as soon as a value contained a single quote.
    query = """INSERT INTO content_archive(content_id, copies, num_present)
               VALUES(%s, %s, %s)
            """
    cur = self._cursor(cur)
    cur.execute(query, (content_id, json.dumps(copies), num_present))
diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py
index 9e461540..80b36589 100644
--- a/swh/storage/archiver/director.py
+++ b/swh/storage/archiver/director.py
@@ -1,301 +1,303 @@
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import click
import sys
from swh.core import config, utils, hashutil
from swh.objstorage import get_objstorage
from swh.scheduler.celery_backend.config import app
from . import tasks # noqa
from .storage import ArchiverStorage
class ArchiverDirectorBase(config.SWHConfig, metaclass=abc.ABCMeta):
"""Abstract Director class
An archiver director is in charge of dispatching batch of
contents to archiver workers (for them to archive).
Inherit from this class and provide:
- ADDITIONAL_CONFIG: Some added configuration needed for the
director to work
- CONFIG_BASE_FILENAME: relative path to lookup for the
configuration file
- def get_contents_to_archive(self): Implementation method to read
contents to archive
"""
DEFAULT_CONFIG = {
'batch_max_size': ('int', 1500),
'asynchronous': ('bool', True),
'dbconn': ('str', 'dbname=softwareheritage-archiver-dev user=guest')
}
# Destined to be overridden by subclass
ADDITIONAL_CONFIG = {}
# We use the same configuration file as the worker
CONFIG_BASE_FILENAME = 'archiver/worker'
# The worker's task queue name to use
TASK_NAME = None
def __init__(self):
    """Constructor of the archiver director.

    Takes no argument: the configuration is read through
    ``parse_config_file``, extended with the class' ADDITIONAL_CONFIG,
    and stored in ``self.config``. The archiver storage is then opened
    from the ``dbconn`` configuration entry.

    """
    super().__init__()
    self.config = self.parse_config_file(
        additional_configs=[self.ADDITIONAL_CONFIG])
    self.archiver_storage = ArchiverStorage(self.config['dbconn'])
def run(self):
    """Run the archiver director.

    Every batch produced by ``read_batch_contents`` is handed to a
    worker: through ``run_async_worker`` when the ``asynchronous``
    configuration entry is set, through ``run_sync_worker`` otherwise.

    """
    dispatch = (self.run_async_worker if self.config['asynchronous']
                else self.run_sync_worker)

    for contents in self.read_batch_contents():
        dispatch(contents)
def run_async_worker(self, batch):
    """Queue `batch` for the task registered under ``TASK_NAME`` by
    calling its ``delay`` method.

    """
    app.tasks[self.TASK_NAME].delay(batch=batch)
def run_sync_worker(self, batch):
    """Call the task registered under ``TASK_NAME`` directly on `batch`,
    in the current process.

    """
    worker = app.tasks[self.TASK_NAME]
    worker(batch=batch)
def read_batch_contents(self):
    """Group the contents that need to be archived into batches.

    Yields:
        lists of at most ``batch_max_size`` (configuration entry)
        elements taken in order from ``get_contents_to_archive``; only
        the last list may be shorter. Nothing is yielded when there is
        no content.

    """
    batch_max_size = self.config['batch_max_size']
    contents = []
    for content in self.get_contents_to_archive():
        contents.append(content)
        # `>=` and not `>`: flush as soon as the batch is full, otherwise
        # every batch would hold batch_max_size + 1 elements.
        if len(contents) >= batch_max_size:
            yield contents
            contents = []

    if contents:
        yield contents
@abc.abstractmethod
def get_contents_to_archive(self):
    """Retrieve generator of sha1 to archive.

    Abstract: subclasses must implement it. Whatever it yields is
    grouped into batches by ``read_batch_contents``.

    Yields:
        sha1 to archive

    """
    pass
class ArchiverWithRetentionPolicyDirector(ArchiverDirectorBase):
    """Director selecting the contents which lack backup copies.

    It walks the contents known to the archiver storage, keeps the ones
    with fewer present copies than the ``retention_policy``
    configuration entry, and delegates their archival to archiver
    workers.

    """
    ADDITIONAL_CONFIG = {
        'retention_policy': ('int', 2),
    }

    TASK_NAME = 'swh.storage.archiver.tasks.SWHArchiverWithRetentionPolicyTask'

    def get_contents_to_archive(self):
        """Walk the under-archived contents, one storage page at a time.

        Each page is requested after the last content id already seen;
        the walk stops at the first empty page.

        Yields:
            the content_id of each content with too few present copies
            (the present/ongoing copies returned by the storage are
            dropped).

        """
        resume_after = None
        while True:
            page = list(
                self.archiver_storage.content_archive_get_unarchived_copies(
                    last_content=resume_after,
                    retention_policy=self.config['retention_policy']))
            if not page:
                break

            for sha1, _present, _ongoing in page:
                resume_after = sha1
                yield sha1
def read_sha1_from_stdin():
    """Read hexadecimal sha1s from stdin, one per line.

    Yields:
        a dict with the single key 'content_id' for each line, holding
        the line (right-stripped) converted by ``hashutil.hex_to_hash``.

    """
    for line in sys.stdin:
        hex_sha1 = line.rstrip()
        yield {'content_id': hashutil.hex_to_hash(hex_sha1)}
class ArchiverStdinToBackendDirector(ArchiverDirectorBase):
"""A cloud archiver director in charge of reading contents and send
them in batch in the cloud.
The archiver director, in order:
- Reads sha1 to send to a specific backend.
- Checks if those sha1 are known in the archiver. If they are not,
add them
- if the sha1 are missing, they are sent for the worker to archive
If the flag force_copy is set, this will force the copy to be sent
for archive even though it has already been done.
"""
ADDITIONAL_CONFIG = {
'destination': ('str', 'azure'),
'force_copy': ('bool', False),
'source': ('str', 'uffizi'),
'storages': ('list[dict]',
[
{'host': 'uffizi',
'cls': 'pathslicing',
'args': {'root': '/tmp/softwareheritage/objects',
'slicing': '0:2/2:4/4:6'}},
{'host': 'banco',
'cls': 'remote',
'args': {'base_url': 'http://banco:5003/'}}
])
}
CONFIG_BASE_FILENAME = 'archiver/worker-to-backend'
TASK_NAME = 'swh.storage.archiver.tasks.SWHArchiverToBackendTask'
def __init__(self):
    """Read the configuration and instantiate the configured
    objstorages.

    Sets:
        destination: value of the ``destination`` configuration entry
        force_copy: value of the ``force_copy`` configuration entry
        objstorages: dict mapping each configured storage host to the
            objstorage built by ``get_objstorage``
        source: value of the ``source`` configuration entry
        sources_missing: every configured storage host except `source`

    """
    super().__init__()
    self.destination = self.config['destination']
    self.force_copy = self.config['force_copy']
    self.objstorages = {
        storage['host']: get_objstorage(storage['cls'], storage['args'])
        for storage in self.config.get('storages', [])
    }
    # Fallback objstorage
    self.source = self.config['source']
    # Where the content is missing: every host but the source. The source
    # name must be compared as a whole; `set(self.source)` would split it
    # into characters and never exclude it.
    self.sources_missing = [
        host for host in self.objstorages if host != self.source]
def _add_unknown_content_ids(self, content_ids, source_objstorage):
    """Check whether some content_id are unknown.
    If they are, add them to the archiver db.

    Unknown ids which are not in `source_objstorage` are skipped. The
    others are registered as present in ``self.source`` and missing from
    ``self.sources_missing``.

    Args:
        content_ids: List of dict with one key content_id
        source_objstorage (ObjStorage): objstorage to check if
            content_id is there

    """
    unknowns = self.archiver_storage.content_archive_get_unknown(
        content_ids)
    # Membership is tested against the objstorage itself: `self.source`
    # is only its *name*, and looking a sha1 up in that string is
    # meaningless (and a TypeError for bytes).
    self.archiver_storage.content_archive_add(
        [u_id for u_id in unknowns if u_id in source_objstorage],
        sources_present=[self.source],
        sources_missing=self.sources_missing)
def get_contents_to_archive(self):
    """Read sha1s from stdin and yield the ones to send to the
    destination.

    The sha1s are processed in groups of ``batch_max_size``
    (configuration entry). For each group, the sha1s unknown to the
    archiver db are registered first. Then:

    - with ``force_copy``, the status of every content of the group is
      forced to 'missing' for the destination and its content_id is
      yielded;
    - otherwise only what ``content_archive_get_missing`` reports for
      the destination is yielded.

    """
    batches = utils.grouper(read_sha1_from_stdin(),
                            self.config['batch_max_size'])
    source_objstorage = self.objstorages[self.source]

    if self.force_copy:
        for group in batches:
            batch = list(group)
            if not batch:
                continue

            # Add missing entries in archiver table
            self._add_unknown_content_ids(batch, source_objstorage)

            print('Send %s contents to archive' % len(batch))

            for entry in batch:
                sha1 = entry['content_id']
                # force its status to missing
                self.archiver_storage.content_archive_update(
                    sha1, self.destination, 'missing')
                yield sha1
    else:
        for group in batches:
            batch = list(group)

            # Add missing entries in archiver table
            self._add_unknown_content_ids(batch, source_objstorage)

            # Filter already copied data
            to_copy = list(
                self.archiver_storage.content_archive_get_missing(
                    content_ids=batch,
                    backend_name=self.destination))

            if not to_copy:
                continue

            print('Send %s contents to archive' % len(to_copy))

            for sha1 in to_copy:
                yield sha1
def run_async_worker(self, batch):
    """Queue `batch` for the task registered under ``TASK_NAME`` by
    calling its ``delay`` method, along with the destination.

    """
    app.tasks[self.TASK_NAME].delay(
        destination=self.destination, batch=batch)
def run_sync_worker(self, batch):
    """Call the task registered under ``TASK_NAME`` directly on `batch`,
    along with the destination, in the current process.

    """
    worker = app.tasks[self.TASK_NAME]
    worker(destination=self.destination, batch=batch)
@click.command()
@click.option('--direct', is_flag=True,
              help="""The archiver sends content for backup to
one storage.""")
def launch(direct):
    """Instantiate and run an archiver director.

    With --direct, the director reading sha1s from stdin and sending
    them to one backend is used; otherwise the retention-policy director
    is used.

    """
    director_cls = (ArchiverStdinToBackendDirector if direct
                    else ArchiverWithRetentionPolicyDirector)
    director_cls().run()
if __name__ == '__main__':
launch()
diff --git a/swh/storage/archiver/storage.py b/swh/storage/archiver/storage.py
index fce3c94a..f60c77ca 100644
--- a/swh/storage/archiver/storage.py
+++ b/swh/storage/archiver/storage.py
@@ -1,183 +1,173 @@
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import psycopg2
from .db import ArchiverDb
from swh.storage.common import db_transaction_generator, db_transaction
from swh.storage.exc import StorageDBError
class ArchiverStorage():
"""SWH Archiver storage proxy, encompassing DB
"""
def __init__(self, db_conn):
    """Open the archiver db.

    Args:
        db_conn: either a libpq connection string, or a psycopg2 connection

    Raises:
        StorageDBError: when psycopg2 raises an OperationalError.

    """
    is_connection = isinstance(db_conn, psycopg2.extensions.connection)
    try:
        self.db = (ArchiverDb(db_conn) if is_connection
                   else ArchiverDb.connect(db_conn))
    except psycopg2.OperationalError as error:
        raise StorageDBError(error)
@db_transaction_generator
def archive_ls(self, cur=None):
    """List the archives registered on the server.

    Delegates to ``ArchiverDb.archive_ls`` with the cursor `cur`.

    Yields:
        a tuple (server_id, server_url) for each archive server.

    """
    for archive in self.db.archive_ls(cur):
        yield archive
@db_transaction
def content_archive_get(self, content_id, cur=None):
    """Get the archival status of a content.

    Delegates to ``ArchiverDb.content_archive_get`` with the cursor
    `cur`.

    Args:
        content_id: the sha1 of the content
        cur: cursor forwarded to the db layer

    Returns:
        None when the content is not found, otherwise a tuple
        (content_id, present_copies, ongoing_copies), where
        ongoing_copies is a dict mapping copy to mtime.

    """
    return self.db.content_archive_get(content_id, cur)
@db_transaction_generator
def content_archive_get_copies(self, last_content=None, limit=1000,
                               cur=None):
    """Page through the copies of contents located after `last_content`.

    Delegates to ``ArchiverDb.content_archive_get_copies``.

    Args:
        last_content: sha1 of the last content retrieved. May be None
            to start at the beginning.
        limit: number of contents to retrieve. Can be None to retrieve all
            objects (will be slow).
        cur: cursor forwarded to the db layer

    Yields:
        A tuple (content_id, present_copies, ongoing_copies), where
        ongoing_copies is a dict mapping copy to mtime.

    """
    copies = self.db.content_archive_get_copies(last_content, limit, cur)
    for copy_info in copies:
        yield copy_info
@db_transaction_generator
def content_archive_get_unarchived_copies(
        self, retention_policy, last_content=None,
        limit=1000, cur=None):
    """Page through the copies of contents located after `last_content`
    whose number of present copies is smaller than `retention_policy`.

    Delegates to ``ArchiverDb.content_archive_get_unarchived_copies``.

    Args:
        retention_policy: number of required present copies
        last_content: sha1 of the last content retrieved. May be None
            to start at the beginning.
        limit: number of contents to retrieve. Can be None to retrieve all
            objects (will be slow).
        cur: cursor forwarded to the db layer

    Yields:
        A tuple (content_id, present_copies, ongoing_copies), where
        ongoing_copies is a dict mapping copy to mtime.

    """
    copies = self.db.content_archive_get_unarchived_copies(
        retention_policy, last_content, limit, cur)
    for copy_info in copies:
        yield copy_info
@db_transaction_generator
def content_archive_get_missing(self, content_ids, backend_name, cur=None):
    """Retrieve the sha1s missing from backend_name.

    The sha1s to test are first copied into the temporary table
    tmp_content_archive, which the db layer query then works from.

    Args:
        content_ids ([sha1s]): list of sha1s to test
        backend_name (str): Name of the backend to check for content
        cur: cursor forwarded to the db layer

    Yields:
        missing sha1s from backend_name (first column of each row)

    """
    archiver_db = self.db

    archiver_db.mktemp_content_archive()
    archiver_db.copy_to(
        content_ids, 'tmp_content_archive', ['content_id'], cur)

    for row in archiver_db.content_archive_get_missing(backend_name, cur):
        yield row[0]
@db_transaction_generator
def content_archive_get_unknown(self, content_ids, cur=None):
    """Retrieve the sha1s unknown to content_archive.

    The sha1s to test are first copied into the temporary table
    tmp_content_archive, which the db layer query then works from.

    Args:
        content_ids ([sha1s]): list of sha1s to test
        cur: cursor forwarded to the db layer

    Yields:
        Unknown sha1s from content_archive (first column of each row)

    """
    archiver_db = self.db

    archiver_db.mktemp_content_archive()
    archiver_db.copy_to(
        content_ids, 'tmp_content_archive', ['content_id'], cur)

    for row in archiver_db.content_archive_get_unknown(cur):
        yield row[0]
@db_transaction
def content_archive_update(self, content_id, archive_id,
                           new_status=None, cur=None):
    """Update the status of an archive content and set its mtime to now.

    Delegates to ``ArchiverDb.content_archive_update`` with the cursor
    `cur`.

    Args:
        content_id (str): content sha1
        archive_id (str): name of the archive
        new_status (str): one of 'missing', 'present' or 'ongoing'.
            This status will replace the previous one. If not given,
            the function only changes the mtime of the content for the
            given archive.
        cur: cursor forwarded to the db layer

    """
    self.db.content_archive_update(content_id, archive_id, new_status, cur)
@db_transaction
- def content_archive_insert(self, content_id, source, status, cur=None):
- """Insert a new entry in db about content_id.
-
- Args:
- content_id: content concerned
- source: name of the source
- status: the status of the content for that source
-
- """
- self.db.content_archive_insert(content_id, source, status, cur)
-
- @db_transaction
- def content_archive_content_add(
def content_archive_add(
        self, content_ids, sources_present, sources_missing, cur=None):
    """Insert a new entry in db for each of the content_ids.

    Args:
        content_ids ([bytes|str]): content identifiers
        sources_present ([str]): List of source names where
            contents are present
        sources_missing ([str]): List of sources names where
            contents are missing
        cur: cursor forwarded to the db layer

    """
    for content_id in content_ids:
        # Forward the cursor, as the other methods of this class do, so
        # the inserts go through the cursor this call was given.
        self.db.content_archive_add(
            content_id, sources_present, sources_missing, cur)

File Metadata

Mime Type
text/x-diff
Expires
Fri, Jul 4, 1:28 PM (6 d, 8 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3263083

Event Timeline