
diff --git a/PKG-INFO b/PKG-INFO
index 5ab929225..9b08d190e 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,10 +1,10 @@
Metadata-Version: 1.0
Name: swh.storage
-Version: 0.0.40
+Version: 0.0.41
Summary: Software Heritage storage manager
Home-page: https://forge.softwareheritage.org/diffusion/DSTO/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Description: UNKNOWN
Platform: UNKNOWN
diff --git a/swh.storage.egg-info/PKG-INFO b/swh.storage.egg-info/PKG-INFO
index 5ab929225..9b08d190e 100644
--- a/swh.storage.egg-info/PKG-INFO
+++ b/swh.storage.egg-info/PKG-INFO
@@ -1,10 +1,10 @@
Metadata-Version: 1.0
Name: swh.storage
-Version: 0.0.40
+Version: 0.0.41
Summary: Software Heritage storage manager
Home-page: https://forge.softwareheritage.org/diffusion/DSTO/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Description: UNKNOWN
Platform: UNKNOWN
diff --git a/swh/storage/checker/checker.py b/swh/storage/checker/checker.py
index 110d72ea5..69afe4f62 100644
--- a/swh/storage/checker/checker.py
+++ b/swh/storage/checker/checker.py
@@ -1,171 +1,171 @@
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import click
import logging
from swh.core import config, hashutil
from .. import get_storage
from ..objstorage import PathSlicingObjStorage
from ..exc import ObjNotFoundError, Error
DEFAULT_CONFIG = {
'storage_path': ('str', '/srv/softwareheritage/objects'),
'storage_depth': ('int', 3),
'backup_url': ('str', 'http://uffizi:5002/'),
'batch_size': ('int', 1000),
}
class ContentChecker():
""" Content integrity checker that will check local objstorage content.
The checker will check the data of an object storage in order to verify
that no file has been corrupted.
Attributes:
config: dictionary that contains this
checker configuration
objstorage (ObjStorage): Local object storage that will be checked.
master_storage (RemoteStorage): A distant storage that will be used to
restore corrupted content.
"""
- def __init__(self, config, root, depth, backup_urls):
+ def __init__(self, config, root, slicing, backup_urls):
""" Create a checker that ensure the objstorage have no corrupted file.
Args:
config (dict): Dictionary that contains the following keys :
batch_size: Number of contents that should be tested each
time the content checker runs.
root (string): Path to the objstorage directory
depth (int): Depth of the object storage.
backup_urls: List of urls that can be contacted in order to
get a content.
"""
self.config = config
- self.objstorage = PathSlicingObjStorage(root, depth, slicing=2)
+ self.objstorage = PathSlicingObjStorage(root, slicing)
self.backup_storages = [get_storage('remote_storage', [backup_url])
for backup_url in backup_urls]
def run(self):
""" Start the check routine
"""
corrupted_contents = []
batch_size = self.config['batch_size']
for content_id in self.get_content_to_check(batch_size):
if not self.check_content(content_id):
corrupted_contents.append(content_id)
logging.error('The content %s has been corrupted', content_id)
self.repair_contents(corrupted_contents)
def run_as_daemon(self):
""" Start the check routine and perform it forever.
Use this method to run the checker as a daemon that
will iterate over the content forever in the background.
"""
while True:
try:
self.run()
except Exception as e:
logging.error('An error occurred while verifying the content: %s'
% e)
def get_content_to_check(self, batch_size):
""" Get the content that should be verified.
Returns:
An iterable of the content's id that need to be checked.
"""
contents = self.objstorage.get_random(batch_size)
yield from contents
def check_content(self, content_id):
""" Check the validity of the given content.
Returns:
True if the content was valid, false if it was corrupted.
"""
try:
self.objstorage.check(content_id)
except (ObjNotFoundError, Error) as e:
logging.warning(e)
return False
else:
return True
def repair_contents(self, content_ids):
""" Try to restore the given contents.
Ask the backup storages for the contents that are corrupted on
the local object storage.
If the first storage does not contain the missing contents, send
a request to the second one with only the content that couldn't be
retrieved, and so on until there are no remaining contents or servers.
If a content couldn't be retrieved from any of the servers, log it as
an error.
"""
contents_to_get = set(content_ids)
# Iterates over the backup storages.
for backup_storage in self.backup_storages:
# Try to get all the contents that still need to be retrieved.
contents = backup_storage.content_get(list(contents_to_get))
for content in contents:
if content:
hash = content['sha1']
data = content['data']
# When a content is retrieved, remove it from the set
# of needed contents.
contents_to_get.discard(hash)
self.objstorage.restore(data)
# Contents still in contents_to_get couldn't be retrieved.
if contents_to_get:
logging.error(
"Some corrupted contents could not be retrieved : %s"
% [hashutil.hash_to_hex(id) for id in contents_to_get]
)
@click.command()
@click.argument('config-path', required=1)
@click.option('--storage-path', default=DEFAULT_CONFIG['storage_path'][1],
help='Path to the storage to verify')
@click.option('--depth', default=DEFAULT_CONFIG['storage_depth'][1],
type=click.INT, help='Depth of the object storage')
@click.option('--backup-url', default=DEFAULT_CONFIG['backup_url'][1],
help='Url of a remote storage to retrieve corrupted content')
@click.option('--daemon/--nodaemon', 'is_daemon', default=True,
help='Indicates if the checker should run forever '
'or on a single batch of content')
def launch(config_path, storage_path, depth, backup_url, is_daemon):
# The configuration has the following priority:
# command line > file config > default config
cl_config = {
'storage_path': storage_path,
'storage_depth': depth,
'backup_url': backup_url
}
conf = config.read(config_path, DEFAULT_CONFIG)
conf.update(cl_config)
# Create the checker and run
checker = ContentChecker(
{'batch_size': conf['batch_size']},
conf['storage_path'],
conf['storage_depth'],
map(lambda x: x.strip(), conf['backup_url'].split(','))
)
if is_daemon:
checker.run_as_daemon()
else:
checker.run()
if __name__ == '__main__':
launch()
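
For orientation, here is a minimal, hypothetical sketch of driving this checker directly from Python rather than through the CLI above. The paths and URL are placeholders; the constructor arguments follow the new (config, root, slicing, backup_urls) signature introduced by this diff.

    from swh.storage.checker.checker import ContentChecker

    # Placeholders: an existing objstorage root and a reachable backup storage.
    checker = ContentChecker(
        {'batch_size': 10},                # checker configuration
        '/srv/softwareheritage/objects',   # objstorage root directory
        '0:2/2:4/4:6',                     # slicing string (replaces the old depth)
        ['http://uffizi:5002/'],           # backup storage urls
    )
    checker.run()                          # check one batch and repair what it can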
diff --git a/swh/storage/objstorage/api/server.py b/swh/storage/objstorage/api/server.py
index 968ef4620..6cb8885f1 100644
--- a/swh/storage/objstorage/api/server.py
+++ b/swh/storage/objstorage/api/server.py
@@ -1,97 +1,96 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import click
import logging
from flask import Flask, g, request
from swh.core import config
from swh.storage.objstorage import PathSlicingObjStorage
from swh.storage.api.common import (BytesRequest, decode_request,
error_handler,
encode_data_server as encode_data)
DEFAULT_CONFIG = {
'storage_base': ('str', '/tmp/swh-storage/objects/'),
- 'storage_depth': ('int', 3)
+ 'storage_slicing': ('str', '0:2/2:4/4:6')
}
app = Flask(__name__)
app.request_class = BytesRequest
@app.errorhandler(Exception)
def my_error_handler(exception):
return error_handler(exception, encode_data)
@app.before_request
def before_request():
g.objstorage = PathSlicingObjStorage(app.config['storage_base'],
- app.config['storage_depth'],
- slicing=2)
+ app.config['storage_slicing'])
@app.route('/')
def index():
return "SWH Objstorage API server"
@app.route('/content')
def content():
return str(list(g.objstorage))
@app.route('/content/add', methods=['POST'])
def add_bytes():
return encode_data(g.objstorage.add(**decode_request(request)))
@app.route('/content/get', methods=['POST'])
def get_bytes():
return encode_data(g.objstorage.get(**decode_request(request)))
@app.route('/content/get/random', methods=['POST'])
def get_random_contents():
return encode_data(
g.objstorage.get_random(**decode_request(request))
)
@app.route('/content/check', methods=['POST'])
def check():
return encode_data(g.objstorage.check(**decode_request(request)))
def run_from_webserver(environ, start_response):
"""Run the WSGI app from the webserver, loading the configuration.
"""
config_path = '/etc/softwareheritage/storage/objstorage.ini'
app.config.update(config.read(config_path, DEFAULT_CONFIG))
handler = logging.StreamHandler()
app.logger.addHandler(handler)
return app(environ, start_response)
@click.command()
@click.argument('config-path', required=1)
@click.option('--host', default='0.0.0.0', help="Host to run the server")
@click.option('--port', default=5000, type=click.INT,
help="Binding port of the server")
@click.option('--debug/--nodebug', default=True,
help="Indicates if the server should run in debug mode")
def launch(config_path, host, port, debug):
app.config.update(config.read(config_path, DEFAULT_CONFIG))
app.run(host, port=int(port), debug=bool(debug))
if __name__ == '__main__':
launch()
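
As a rough illustration of how these routes are consumed, the tests in this diff (test_archiver.py below) talk to this server through RemoteObjStorage; only content_get is exercised there, so treat this as a hedged sketch with placeholder values rather than a reference for the client API.

    from swh.core import hashutil
    from swh.storage.objstorage.api.client import RemoteObjStorage

    client = RemoteObjStorage('http://localhost:5000/')   # url served by launch()
    obj_id = hashutil.hashdata(b'some content')['sha1']
    # Assumes the content was previously added on the server side;
    # otherwise an ObjNotFoundError is raised (see test_archiver.py).
    data = client.content_get(obj_id)                     # goes through /content/get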
diff --git a/swh/storage/objstorage/objstorage.py b/swh/storage/objstorage/objstorage.py
index c651b37c1..9e4291766 100644
--- a/swh/storage/objstorage/objstorage.py
+++ b/swh/storage/objstorage/objstorage.py
@@ -1,115 +1,119 @@
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+ID_HASH_ALGO = 'sha1'
+ID_HASH_LENGTH = 40 # Length, in hex characters, of the hash's hexadecimal representation.
+
+
class ObjStorage():
""" High-level API to manipulate the Software Heritage object storage.
Conceptually, the object storage offers 5 methods:
- __contains__() check if an object is present, by object id
- add() add a new object, returning an object id
- restore() same as add() but overwrites an already existing content
- get() retrieve the content of an object, by object id
- check() check the integrity of an object, by object id
And some management methods:
- get_random() get random object id of existing contents (used for the
content integrity checker).
Each implementation of this interface can have a different behavior and
its own way to store the contents.
"""
def __contains__(self, *args, **kwargs):
raise NotImplementedError(
"Implementations of ObjStorage must have a '__contains__' method"
)
def add(self, content, obj_id=None, check_presence=True, *args, **kwargs):
""" Add a new object to the object storage.
Args:
content: content of the object to be added to the storage.
obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When
given, obj_id will be trusted to match the bytes. If missing,
obj_id will be computed on the fly.
check_presence: indicate if the presence of the content should be
verified before adding the file.
Returns:
the id of the object into the storage.
"""
raise NotImplementedError(
"Implementations of ObjStorage must have a 'add' method"
)
- def restore(self, content, obj_id, *args, **kwargs):
+ def restore(self, content, obj_id=None, *args, **kwargs):
""" Restore a content that have been corrupted.
This function is identical to add_bytes but does not check if
the object id is already in the file system.
Args:
content: content of the object to be added to the storage
obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When
given, obj_id will be trusted to match bytes. If missing,
obj_id will be computed on the fly.
"""
raise NotImplementedError(
"Implementations of ObjStorage must have a 'restore' method"
)
def get(self, obj_id, *args, **kwargs):
""" Retrieve the content of a given object.
Args:
obj_id: object id.
Returns:
the content of the requested object as bytes.
Raises:
ObjNotFoundError: if the requested object is missing.
"""
raise NotImplementedError(
"Implementations of ObjStorage must have a 'get' method"
)
def check(self, obj_id, *args, **kwargs):
""" Perform an integrity check for a given object.
Verify that the file object is in place and that the gzipped content
matches the object id.
Args:
obj_id: object id.
Raises:
ObjNotFoundError: if the requested object is missing.
Error: if the requested object is corrupted.
"""
raise NotImplementedError(
"Implementations of ObjStorage must have a 'check' method"
)
def get_random(self, batch_size, *args, **kwargs):
""" Get random ids of existing contents
This method is used in order to get random ids to perform
content integrity verifications on random contents.
Attributes:
batch_size (int): Number of ids that will be given
Yields:
An iterable of ids of contents that are in the current object
storage.
"""
raise NotImplementedError(
"The current implementation of ObjStorage does not support "
"'get_random' operation"
)
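
To make the contract above concrete, here is a small, hypothetical in-memory implementation of this interface. It is only an illustration, not part of the Software Heritage code.

    import hashlib

    from swh.storage.exc import ObjNotFoundError, Error
    from swh.storage.objstorage.objstorage import ObjStorage, ID_HASH_ALGO


    class InMemoryObjStorage(ObjStorage):
        """Toy implementation keeping contents in a dict, keyed by obj_id."""

        def __init__(self):
            self.objects = {}

        def __contains__(self, obj_id):
            return obj_id in self.objects

        def add(self, content, obj_id=None, check_presence=True):
            if obj_id is None:
                obj_id = hashlib.new(ID_HASH_ALGO, content).digest()
            if check_presence and obj_id in self:
                return obj_id
            self.objects[obj_id] = content
            return obj_id

        def restore(self, content, obj_id=None):
            return self.add(content, obj_id, check_presence=False)

        def get(self, obj_id):
            if obj_id not in self:
                raise ObjNotFoundError(obj_id)
            return self.objects[obj_id]

        def check(self, obj_id):
            actual = hashlib.new(ID_HASH_ALGO, self.get(obj_id)).digest()
            if actual != obj_id:
                raise Error('Corrupt object %s' % obj_id)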
diff --git a/swh/storage/objstorage/objstorage_pathslicing.py b/swh/storage/objstorage/objstorage_pathslicing.py
index 875fd753a..7da58b450 100644
--- a/swh/storage/objstorage/objstorage_pathslicing.py
+++ b/swh/storage/objstorage/objstorage_pathslicing.py
@@ -1,350 +1,347 @@
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import gzip
import tempfile
import random
from contextlib import contextmanager
from swh.core import hashutil
-from .objstorage import ObjStorage
+from .objstorage import ObjStorage, ID_HASH_ALGO, ID_HASH_LENGTH
from ..exc import ObjNotFoundError, Error
-ID_HASH_ALGO = 'sha1'
-
GZIP_BUFSIZ = 1048576
DIR_MODE = 0o755
FILE_MODE = 0o644
@contextmanager
def _write_obj_file(hex_obj_id, objstorage):
""" Context manager for writing object files to the object storage.
During writing, data are written to a temporary file, which is atomically
renamed to the right file name after closing. This context manager also
takes care of (gzip) compressing the data on the fly.
Usage sample:
with _write_obj_file(hex_obj_id, objstorage):
f.write(obj_data)
Yields:
a file-like object open for writing bytes.
"""
# Get the final paths and create the directory if absent.
dir = objstorage._obj_dir(hex_obj_id)
if not os.path.isdir(dir):
os.makedirs(dir, DIR_MODE, exist_ok=True)
path = os.path.join(dir, hex_obj_id)
# Create a temporary file.
(tmp, tmp_path) = tempfile.mkstemp(suffix='.tmp', prefix='hex_obj_id.',
dir=dir)
# Open the file and yield it for writing.
tmp_f = os.fdopen(tmp, 'wb')
with gzip.GzipFile(filename=tmp_path, fileobj=tmp_f) as f:
yield f
# Then close the temporary file and move it to the right directory.
tmp_f.close()
os.chmod(tmp_path, FILE_MODE)
os.rename(tmp_path, path)
@contextmanager
def _read_obj_file(hex_obj_id, objstorage):
""" Context manager for reading object file in the object storage.
Usage sample:
with _read_obj_file(hex_obj_id, objstorage) as f:
b = f.read()
Yields:
a file-like object open for reading bytes.
"""
path = objstorage._obj_path(hex_obj_id)
with gzip.GzipFile(path, 'rb') as f:
yield f
class PathSlicingObjStorage(ObjStorage):
""" Implementation of the ObjStorage API based on the hash of the content.
On disk, an object storage is a directory tree containing files named after
their object IDs. An object ID is a checksum of its content, depending on
the value of the ID_HASH_ALGO constant (see hashutil for its meaning).
To avoid directories that contain too many files, the object storage has a
- given depth. Each depth level consumes a given amount of characters of
- the object id.
+ given slicing. Each element of the slicing corresponds to a directory
+ level named after the matching slice of the object id.
So for instance a file with SHA1 34973274ccef6ab4dfaaf86599792fa9c3fe4689
will be stored at the following locations, depending on the slicing:
- - depth=3, slicing=2 : 34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689
- - depth=1, slicing=5 : 34973/34973274ccef6ab4dfaaf86599792fa9c3fe4689
+ - 0:2/2:4/4:6 : 34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689
+ - 0:1/0:5/ : 3/34973/34973274ccef6ab4dfaaf86599792fa9c3fe4689
The files in the storage are stored in gzipped compressed format.
Attributes:
root (string): path to the root directory of the storage on the disk.
- depth (int): number of subdirectories created to store a file.
- slicing (int): number of hash character consumed for each
- subdirectories.
+ bounds: list of slice objects indicating which part of the object id
+ names each subdirectory level.
"""
- def __init__(self, root, depth, slicing):
+ def __init__(self, root, slicing):
""" Create an object to access a hash-slicing based object storage.
Args:
root (string): path to the root directory of the storage on
the disk.
- depth (int): number of subdirectories created to store a file.
- slicing (int): number of hash character consumed for each
- subdirectories.
+ slicing (string): string describing how to slice the hexadecimal
+ object id to compute its storage path, e.g. '0:2/2:4/4:6'.
"""
if not os.path.isdir(root):
raise ValueError(
'PathSlicingObjStorage root "%s" is not a directory' % root
)
self.root = root
- self.depth = depth
- self.slicing = slicing
+ # Build one slice object per level described by the slicing string.
+ self.bounds = [
+ slice(*map(int, sbounds.split(':')))
+ for sbounds in slicing.split('/')
+ if sbounds
+ ]
+
+ max_endchar = max(map(lambda bound: bound.stop, self.bounds))
+ if ID_HASH_LENGTH < max_endchar:
+ raise ValueError(
+ 'Algorithm %s has too short hash for slicing to char %d'
+ % (ID_HASH_ALGO, max_endchar)
+ )
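
For intuition, the following standalone sketch (not part of this diff) reproduces the slicing logic just set up in __init__, and applied by _obj_dir below, on the SHA1 used as an example in the class docstring; the root path is a placeholder.

    import os

    def sliced_path(root, slicing, hex_obj_id):
        bounds = [slice(*map(int, s.split(':'))) for s in slicing.split('/') if s]
        obj_dir = os.path.join(root, *(hex_obj_id[b] for b in bounds))
        return os.path.join(obj_dir, hex_obj_id)

    hex_id = '34973274ccef6ab4dfaaf86599792fa9c3fe4689'
    print(sliced_path('/srv/objects', '0:2/2:4/4:6', hex_id))
    # /srv/objects/34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689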
def __contains__(self, obj_id):
""" Check whether the given object is present in the storage or not.
Returns:
True iff the object is present in the storage.
"""
hex_obj_id = hashutil.hash_to_hex(obj_id)
return os.path.exists(self._obj_path(hex_obj_id))
def __iter__(self):
"""iterate over the object identifiers currently available in the storage
Warning: with the current implementation of the object storage, this
method will walk the filesystem to list objects, meaning that listing
all objects will be very slow for large storages. You almost certainly
don't want to use this method in production.
Return:
iterator over object IDs
"""
def obj_iterator():
# XXX hackish: it does not verify that the depth of found files
# matches the slicing depth of the storage
for root, _dirs, files in os.walk(self.root):
for f in files:
yield bytes.fromhex(f)
return obj_iterator()
def __len__(self):
"""compute the number of objects available in the storage
Warning: this currently uses `__iter__`, so its warning about bad
performance applies
Return:
number of objects contained in the storage
"""
return sum(1 for i in self)
def _obj_dir(self, hex_obj_id):
""" Compute the storage directory of an object.
See also: PathSlicingObjStorage::_obj_path
Args:
hex_obj_id: object id as hexlified string.
Returns:
Path to the directory that contains the required object.
"""
- if len(hex_obj_id) < self.depth * self.slicing:
- raise ValueError(
- 'Object id "%s" is to short for %d-slicing at depth %d'
- % (hex_obj_id, self.slicing, self.depth)
- )
-
- # Compute [depth] substrings of [hex_obj_id], each of length [slicing],
- # starting from the beginning.
- id_steps = [hex_obj_id[i * self.slicing:
- i * self.slicing + self.slicing]
- for i in range(self.depth)]
- steps = [self.root] + id_steps
-
- return os.path.join(*steps)
+ slices = [hex_obj_id[bound] for bound in self.bounds]
+ return os.path.join(self.root, *slices)
def _obj_path(self, hex_obj_id):
""" Compute the full path to an object into the current storage.
See also: PathSlicingObjStorage::_obj_dir
Args:
hex_obj_id: object id as hexlified string.
Returns:
Path to the actual object corresponding to the given id.
"""
return os.path.join(self._obj_dir(hex_obj_id), hex_obj_id)
def add(self, bytes, obj_id=None, check_presence=True):
""" Add a new object to the object storage.
Args:
bytes: content of the object to be added to the storage.
obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When
given, obj_id will be trusted to match the bytes. If missing,
obj_id will be computed on the fly.
check_presence: indicate if the presence of the content should be
verified before adding the file.
Returns:
the id of the object into the storage.
"""
if obj_id is None:
# Checksum is missing, compute it on the fly.
h = hashutil._new_hash(ID_HASH_ALGO, len(bytes))
h.update(bytes)
obj_id = h.digest()
if check_presence and obj_id in self:
# If the object is already present, return immediately.
return obj_id
hex_obj_id = hashutil.hash_to_hex(obj_id)
with _write_obj_file(hex_obj_id, self) as f:
f.write(bytes)
return obj_id
def restore(self, bytes, obj_id=None):
""" Restore a content that have been corrupted.
This function is identical to add_bytes but does not check if
the object id is already in the file system.
Args:
bytes: content of the object to be added to the storage
obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When
given, obj_id will be trusted to match bytes. If missing,
obj_id will be computed on the fly.
"""
return self.add(bytes, obj_id, check_presence=False)
def get(self, obj_id):
""" Retrieve the content of a given object.
Args:
obj_id: object id.
Returns:
the content of the requested object as bytes.
Raises:
ObjNotFoundError: if the requested object is missing.
"""
if obj_id not in self:
raise ObjNotFoundError(obj_id)
# Open the file and return its content as bytes
hex_obj_id = hashutil.hash_to_hex(obj_id)
with _read_obj_file(hex_obj_id, self) as f:
return f.read()
def check(self, obj_id):
""" Perform an integrity check for a given object.
Verify that the file object is in place and that the gzipped content
matches the object id.
Args:
obj_id: object id.
Raises:
ObjNotFoundError: if the requested object is missing.
Error: if the requested object is corrupted.
"""
if obj_id not in self:
raise ObjNotFoundError(obj_id)
hex_obj_id = hashutil.hash_to_hex(obj_id)
try:
with gzip.open(self._obj_path(hex_obj_id)) as f:
length = None
if ID_HASH_ALGO.endswith('_git'):
# if the hashing algorithm is git-like, we need to know the
# content size to hash on the fly. Do a first pass here to
# compute the size
length = 0
while True:
chunk = f.read(GZIP_BUFSIZ)
length += len(chunk)
if not chunk:
break
f.rewind()
checksums = hashutil._hash_file_obj(f, length,
algorithms=[ID_HASH_ALGO])
actual_obj_id = checksums[ID_HASH_ALGO]
if obj_id != actual_obj_id:
raise Error(
'Corrupt object %s should have id %s'
% (hashutil.hash_to_hex(obj_id),
hashutil.hash_to_hex(actual_obj_id))
)
except (OSError, IOError):
# IOError is for compatibility with older python versions
raise Error('Corrupt object %s is not a gzip file' % obj_id)
def get_random(self, batch_size):
""" Get random ids of existing contents
This method is used in order to get random ids to perform
content integrity verifications on random contents.
Attributes:
batch_size (int): Number of ids that will be given
Yields:
An iterable of ids of contents that are in the current object
storage.
"""
def get_random_content(self, batch_size):
""" Get a batch of content inside a single directory.
Returns:
a tuple (batch size, batch).
"""
dirs = []
- for level in range(self.depth):
+ for level in range(len(self.bounds)):
path = os.path.join(self.root, *dirs)
dir_list = next(os.walk(path))[1]
if 'tmp' in dir_list:
dir_list.remove('tmp')
dirs.append(random.choice(dir_list))
path = os.path.join(self.root, *dirs)
content_list = next(os.walk(path))[2]
length = min(batch_size, len(content_list))
return length, map(hashutil.hex_to_hash,
random.sample(content_list, length))
while batch_size:
length, it = get_random_content(self, batch_size)
batch_size = batch_size - length
yield from it
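
Putting the new signature together, a minimal usage sketch follows; the path is a placeholder and the root directory must already exist, as enforced in __init__ above.

    from swh.storage.objstorage import PathSlicingObjStorage

    objstorage = PathSlicingObjStorage('/srv/softwareheritage/objects',
                                       '0:2/2:4/4:6')
    obj_id = objstorage.add(b'some content')        # sha1 computed on the fly
    for random_id in objstorage.get_random(100):    # a batch of random object ids
        objstorage.check(random_id)                 # raises Error if corrupted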
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
index 3fb8cc6a7..c345539d1 100644
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -1,1078 +1,1080 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
import datetime
import functools
import itertools
import dateutil.parser
import psycopg2
from . import converters
from .db import Db
from .objstorage import PathSlicingObjStorage
from .exc import ObjNotFoundError, StorageDBError
from swh.core.hashutil import ALGORITHMS
# Max block size of contents to return
BULK_BLOCK_CONTENT_LEN_MAX = 10000
def db_transaction(meth):
"""decorator to execute Storage methods within DB transactions
The decorated method must accept a `cur` keyword argument
"""
@functools.wraps(meth)
def _meth(self, *args, **kwargs):
with self.db.transaction() as cur:
return meth(self, *args, cur=cur, **kwargs)
return _meth
def db_transaction_generator(meth):
"""decorator to execute Storage methods within DB transactions, while
returning a generator
The decorated method must accept a `cur` keyword argument
"""
@functools.wraps(meth)
def _meth(self, *args, **kwargs):
with self.db.transaction() as cur:
yield from meth(self, *args, cur=cur, **kwargs)
return _meth
class Storage():
"""SWH storage proxy, encompassing DB and object storage
"""
def __init__(self, db_conn, obj_root):
"""
Args:
db_conn: either a libpq connection string, or a psycopg2 connection
obj_root: path to the root of the object storage
"""
try:
if isinstance(db_conn, psycopg2.extensions.connection):
self.db = Db(db_conn)
else:
self.db = Db.connect(db_conn)
except psycopg2.OperationalError as e:
raise StorageDBError(e)
- self.objstorage = PathSlicingObjStorage(obj_root, depth=3, slicing=2)
+ # TODO this needs to be configured
+ self.objstorage = PathSlicingObjStorage(obj_root,
+ slicing='0:2/2:4/4:6')
def content_add(self, content):
"""Add content blobs to the storage
Note: in case of DB errors, objects might have already been added to
the object storage and will not be removed. Since addition to the
object storage is idempotent, that should not be a problem.
Args:
content: iterable of dictionaries representing individual pieces of
content to add. Each dictionary has the following keys:
- data (bytes): the actual content
- length (int): content length (default: -1)
- one key for each checksum algorithm in
swh.core.hashutil.ALGORITHMS, mapped to the corresponding
checksum
- status (str): one of visible, hidden, absent
- reason (str): if status = absent, the reason why
- origin (int): if status = absent, the origin we saw the
content in
"""
db = self.db
content_by_status = defaultdict(list)
for d in content:
if 'status' not in d:
d['status'] = 'visible'
if 'length' not in d:
d['length'] = -1
content_by_status[d['status']].append(d)
content_with_data = content_by_status['visible']
content_without_data = content_by_status['absent']
missing_content = set(self.content_missing(content_with_data))
missing_skipped = set(
sha1_git for sha1, sha1_git, sha256
in self.skipped_content_missing(content_without_data))
with db.transaction() as cur:
if missing_content:
# create temporary table for metadata injection
db.mktemp('content', cur)
def add_to_objstorage(cont):
self.objstorage.add(cont['data'],
obj_id=cont['sha1'])
content_filtered = (cont for cont in content_with_data
if cont['sha1'] in missing_content)
db.copy_to(content_filtered, 'tmp_content',
['sha1', 'sha1_git', 'sha256', 'length', 'status'],
cur, item_cb=add_to_objstorage)
# move metadata in place
db.content_add_from_temp(cur)
if missing_skipped:
missing_filtered = (cont for cont in content_without_data
if cont['sha1_git'] in missing_skipped)
db.mktemp('skipped_content', cur)
db.copy_to(missing_filtered, 'tmp_skipped_content',
['sha1', 'sha1_git', 'sha256', 'length',
'reason', 'status', 'origin'], cur)
# move metadata in place
db.skipped_content_add_from_temp(cur)
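
A hedged sketch of feeding this method; the connection string and object root are placeholders, and hashutil.hashdata is the helper the tests in this diff use to compute all the required checksums.

    from swh.core import hashutil
    from swh.storage import Storage

    storage = Storage('dbname=softwareheritage-dev',       # placeholder DSN
                      '/srv/softwareheritage/objects')     # object storage root
    data = b'hello world'
    content = hashutil.hashdata(data)                      # sha1, sha1_git, sha256
    content.update({'data': data, 'length': len(data), 'status': 'visible'})
    storage.content_add([content])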
def content_get(self, content):
"""Retrieve in bulk contents and their data.
Args:
content: iterables of sha1
Returns:
Generates streams of contents as dict with their raw data:
- sha1: sha1's content
- data: bytes data of the content
Raises:
ValueError if too many contents are required.
cf. BULK_BLOCK_CONTENT_LEN_MAX
"""
# FIXME: Improve on server module to slice the result
if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
raise ValueError(
"Send at maximum %s contents." % BULK_BLOCK_CONTENT_LEN_MAX)
for obj_id in content:
try:
data = self.objstorage.get(obj_id)
except ObjNotFoundError:
yield None
continue
yield {'sha1': obj_id, 'data': data}
@db_transaction_generator
def content_missing(self, content, key_hash='sha1', cur=None):
"""List content missing from storage
Args:
content: iterable of dictionaries containing one key for each
checksum algorithm in swh.core.hashutil.ALGORITHMS, mapped to
the corresponding checksum, and a length key mapped to the
content length.
key_hash: the name of the hash used as key (default: 'sha1')
Returns:
an iterable of `key_hash`es missing from the storage
Raises:
TODO: an exception when we get a hash collision.
"""
db = self.db
keys = ['sha1', 'sha1_git', 'sha256']
if key_hash not in keys:
raise ValueError("key_hash should be one of %s" % keys)
key_hash_idx = keys.index(key_hash)
# Create temporary table for metadata injection
db.mktemp('content', cur)
db.copy_to(content, 'tmp_content', keys + ['length'], cur)
for obj in db.content_missing_from_temp(cur):
yield obj[key_hash_idx]
@db_transaction_generator
def content_missing_per_sha1(self, contents, cur=None):
"""List content missing from storage based only on sha1.
Args:
contents: Iterable of sha1 to check for absence.
Returns:
an iterable of `sha1`s missing from the storage.
Raises:
TODO: an exception when we get a hash collision.
"""
db = self.db
db.store_tmp_bytea(contents, cur)
for obj in db.content_missing_per_sha1_from_temp(cur):
yield obj[0]
@db_transaction_generator
def skipped_content_missing(self, content, cur=None):
"""List skipped_content missing from storage
Args:
content: iterable of dictionaries containing the data for each
checksum algorithm.
Returns:
an iterable of signatures missing from the storage
"""
keys = ['sha1', 'sha1_git', 'sha256']
db = self.db
db.mktemp('skipped_content', cur)
db.copy_to(content, 'tmp_skipped_content',
keys + ['length', 'reason'], cur)
yield from db.skipped_content_missing_from_temp(cur)
@db_transaction
def content_find(self, content, cur=None):
"""Find a content hash in db.
Args:
content: a dictionary representing one content hash, mapping
checksum algorithm names (see swh.core.hashutil.ALGORITHMS) to
checksum values
Returns:
a dict with keys sha1, sha1_git, sha256, length, ctime and status if
the content exists, or None otherwise.
Raises:
ValueError in case the key of the dictionary is not sha1, sha1_git
nor sha256.
"""
db = self.db
if not set(content).intersection(ALGORITHMS):
raise ValueError('content keys must contain at least one of: '
'sha1, sha1_git, sha256')
c = db.content_find(sha1=content.get('sha1'),
sha1_git=content.get('sha1_git'),
sha256=content.get('sha256'),
cur=cur)
if c:
keys = ['sha1', 'sha1_git', 'sha256', 'length', 'ctime', 'status']
return dict(zip(keys, c))
return None
@db_transaction
def content_find_occurrence(self, content, cur=None):
"""Find the content's occurrence.
Args:
content: a dictionary entry representing one content hash.
The dictionary key is one of swh.core.hashutil.ALGORITHMS.
The value is the corresponding checksum.
Returns:
The occurrence of the content.
Raises:
ValueError in case the key of the dictionary is not sha1, sha1_git
nor sha256.
"""
db = self.db
c = self.content_find(content)
if not c:
return None
sha1 = c['sha1']
found_occ = db.content_find_occurrence(sha1, cur=cur)
if found_occ:
keys = ['origin_type', 'origin_url', 'branch', 'target',
'target_type', 'path']
return dict(zip(keys, found_occ))
return None
def directory_add(self, directories):
"""Add directories to the storage
Args:
directories: iterable of dictionaries representing the individual
directories to add. Each dict has the following keys:
- id (sha1_git): the id of the directory to add
- entries (list): list of dicts for each entry in the
directory. Each dict has the following keys:
- name (bytes)
- type (one of 'file', 'dir', 'rev'):
type of the directory entry (file, directory, revision)
- target (sha1_git): id of the object pointed at by the
directory entry
- perms (int): entry permissions
"""
dirs = set()
dir_entries = {
'file': defaultdict(list),
'dir': defaultdict(list),
'rev': defaultdict(list),
}
for cur_dir in directories:
dir_id = cur_dir['id']
dirs.add(dir_id)
for src_entry in cur_dir['entries']:
entry = src_entry.copy()
entry['dir_id'] = dir_id
dir_entries[entry['type']][dir_id].append(entry)
dirs_missing = set(self.directory_missing(dirs))
if not dirs_missing:
return
db = self.db
with db.transaction() as cur:
# Copy directory ids
dirs_missing_dict = ({'id': dir} for dir in dirs_missing)
db.mktemp('directory', cur)
db.copy_to(dirs_missing_dict, 'tmp_directory', ['id'], cur)
# Copy entries
for entry_type, entry_list in dir_entries.items():
entries = itertools.chain.from_iterable(
entries_for_dir
for dir_id, entries_for_dir
in entry_list.items()
if dir_id in dirs_missing)
db.mktemp_dir_entry(entry_type)
db.copy_to(
entries,
'tmp_directory_entry_%s' % entry_type,
['target', 'name', 'perms', 'dir_id'],
cur,
)
# Do the final copy
db.directory_add_from_temp(cur)
@db_transaction_generator
def directory_missing(self, directories, cur):
"""List directories missing from storage
Args: an iterable of directory ids
Returns: a list of missing directory ids
"""
db = self.db
# Create temporary table for metadata injection
db.mktemp('directory', cur)
directories_dicts = ({'id': dir} for dir in directories)
db.copy_to(directories_dicts, 'tmp_directory', ['id'], cur)
for obj in db.directory_missing_from_temp(cur):
yield obj[0]
@db_transaction_generator
def directory_get(self,
directories,
cur=None):
"""Get information on directories.
Args:
- directories: an iterable of directory ids
Returns:
List of directories as dict with keys and associated values.
"""
db = self.db
keys = ('id', 'dir_entries', 'file_entries', 'rev_entries')
db.mktemp('directory', cur)
db.copy_to(({'id': dir_id} for dir_id in directories),
'tmp_directory', ['id'], cur)
dirs = db.directory_get_from_temp(cur)
for line in dirs:
yield dict(zip(keys, line))
@db_transaction_generator
def directory_ls(self, directory, recursive=False, cur=None):
"""Get entries for one directory.
Args:
- directory: the directory to list entries from.
- recursive: if set, list entries recursively from this directory.
Returns:
List of entries for such directory.
"""
db = self.db
keys = ['dir_id', 'type', 'target', 'name', 'perms', 'status',
'sha1', 'sha1_git', 'sha256']
if recursive:
res_gen = db.directory_walk(directory)
else:
res_gen = db.directory_walk_one(directory)
for line in res_gen:
yield dict(zip(keys, line))
@db_transaction
def directory_entry_get_by_path(self, directory, paths, cur=None):
"""Get the directory entry (either file or dir) from directory with
path.
Args:
- directory: sha1 of the top level directory
- paths: path to lookup from the top level directory. From left
(top) to right (bottom).
Returns:
The corresponding directory entry if found, None otherwise.
"""
db = self.db
keys = ('dir_id', 'type', 'target', 'name', 'perms', 'status',
'sha1', 'sha1_git', 'sha256')
res = db.directory_entry_get_by_path(directory, paths, cur)
if res:
return dict(zip(keys, res))
def revision_add(self, revisions):
"""Add revisions to the storage
Args:
revisions: iterable of dictionaries representing the individual
revisions to add. Each dict has the following keys:
- id (sha1_git): id of the revision to add
- date (datetime.DateTime): date the revision was written
- date_offset (int): offset from UTC in minutes the revision
was written
- date_neg_utc_offset (boolean): whether a null date_offset
represents a negative UTC offset
- committer_date (datetime.DateTime): date the revision got
added to the origin
- committer_date_offset (int): offset from UTC in minutes the
revision was added to the origin
- committer_date_neg_utc_offset (boolean): whether a null
committer_date_offset represents a negative UTC offset
- type (one of 'git', 'tar'): type of the revision added
- directory (sha1_git): the directory the revision points at
- message (bytes): the message associated with the revision
- author_name (bytes): the name of the revision author
- author_email (bytes): the email of the revision author
- committer_name (bytes): the name of the revision committer
- committer_email (bytes): the email of the revision committer
- metadata (jsonb): extra information as dictionary
- synthetic (bool): revision's nature (tarball, directory
creates synthetic revision)
- parents (list of sha1_git): the parents of this revision
"""
db = self.db
revisions_missing = set(self.revision_missing(
set(revision['id'] for revision in revisions)))
if not revisions_missing:
return
with db.transaction() as cur:
db.mktemp_revision(cur)
revisions_filtered = (
converters.revision_to_db(revision) for revision in revisions
if revision['id'] in revisions_missing)
parents_filtered = []
db.copy_to(
revisions_filtered, 'tmp_revision', db.revision_add_cols,
cur,
lambda rev: parents_filtered.extend(rev['parents']))
db.revision_add_from_temp(cur)
db.copy_to(parents_filtered, 'revision_history',
['id', 'parent_id', 'parent_rank'], cur)
@db_transaction_generator
def revision_missing(self, revisions, cur=None):
"""List revisions missing from storage
Args: an iterable of revision ids
Returns: a list of missing revision ids
"""
db = self.db
db.store_tmp_bytea(revisions, cur)
for obj in db.revision_missing_from_temp(cur):
yield obj[0]
@db_transaction_generator
def revision_get(self, revisions, cur):
"""Get all revisions from storage
Args: an iterable of revision ids
Returns: an iterable of revisions as dictionaries
(or None if the revision doesn't exist)
"""
db = self.db
db.store_tmp_bytea(revisions, cur)
for line in self.db.revision_get_from_temp(cur):
data = converters.db_to_revision(
dict(zip(db.revision_get_cols, line))
)
if not data['type']:
yield None
continue
yield data
@db_transaction_generator
def revision_log(self, revisions, limit=None, cur=None):
"""Fetch revision entry from the given root revisions.
Args:
- revisions: array of root revision to lookup
- limit: limitation on the output result. Defaults to None.
Yields:
List of revision log from such revisions root.
"""
db = self.db
for line in db.revision_log(revisions, limit, cur):
data = converters.db_to_revision(
dict(zip(db.revision_get_cols, line))
)
if not data['type']:
yield None
continue
yield data
@db_transaction_generator
def revision_shortlog(self, revisions, limit=None, cur=None):
"""Fetch the shortlog for the given revisions
Args:
revisions: list of root revisions to lookup
limit: depth limitation for the output
Yields:
a list of (id, parents) tuples.
"""
db = self.db
yield from db.revision_shortlog(revisions, limit, cur)
@db_transaction_generator
def revision_log_by(self, origin_id, limit=None, cur=None):
"""Fetch revision entry from the actual origin_id's latest revision.
"""
db = self.db
for line in db.revision_log_by(origin_id, limit, cur):
data = converters.db_to_revision(
dict(zip(db.revision_get_cols, line))
)
if not data['type']:
yield None
continue
yield data
def release_add(self, releases):
"""Add releases to the storage
Args:
releases: iterable of dictionaries representing the individual
releases to add. Each dict has the following keys:
- id (sha1_git): id of the release to add
- revision (sha1_git): id of the revision the release points
to
- date (datetime.DateTime): the date the release was made
- date_offset (int): offset from UTC in minutes the release was
made
- date_neg_utc_offset (boolean): whether a null date_offset
represents a negative UTC offset
- name (bytes): the name of the release
- comment (bytes): the comment associated with the release
- author_name (bytes): the name of the release author
- author_email (bytes): the email of the release author
"""
db = self.db
release_ids = set(release['id'] for release in releases)
releases_missing = set(self.release_missing(release_ids))
if not releases_missing:
return
with db.transaction() as cur:
db.mktemp_release(cur)
releases_filtered = (
converters.release_to_db(release) for release in releases
if release['id'] in releases_missing
)
db.copy_to(releases_filtered, 'tmp_release', db.release_add_cols,
cur)
db.release_add_from_temp(cur)
@db_transaction_generator
def release_missing(self, releases, cur=None):
"""List releases missing from storage
Args: an iterable of release ids
Returns: a list of missing release ids
"""
db = self.db
# Create temporary table for metadata injection
db.store_tmp_bytea(releases, cur)
for obj in db.release_missing_from_temp(cur):
yield obj[0]
@db_transaction_generator
def release_get(self, releases, cur=None):
"""Given a list of sha1, return the releases's information
Args:
releases: list of sha1s
Returns:
Generates the list of releases dict with the following keys:
- id: origin's id
- revision: origin's type
- url: origin's url
- lister: lister's uuid
- project: project's uuid (FIXME, retrieve this information)
Raises:
ValueError if the keys does not match (url and type) nor id.
"""
db = self.db
# Create temporary table for metadata injection
db.store_tmp_bytea(releases, cur)
for release in db.release_get_from_temp(cur):
yield converters.db_to_release(
dict(zip(db.release_get_cols, release))
)
@db_transaction
def occurrence_add(self, occurrences, cur=None):
"""Add occurrences to the storage
Args:
occurrences: iterable of dictionaries representing the individual
occurrences to add. Each dict has the following keys:
- origin (int): id of the origin corresponding to the
occurrence
- branch (str): the reference name of the occurrence
- target (sha1_git): the id of the object pointed to by
the occurrence
- target_type (str): the type of object pointed to by the
occurrence
- date (datetime.DateTime): the validity date for the given
occurrence
"""
db = self.db
processed = []
for occurrence in occurrences:
if isinstance(occurrence['date'], str):
occurrence['date'] = dateutil.parser.parse(occurrence['date'])
processed.append(occurrence)
db.mktemp_occurrence_history(cur)
db.copy_to(processed, 'tmp_occurrence_history',
['origin', 'branch', 'target', 'target_type', 'date'], cur)
db.occurrence_history_add_from_temp(cur)
@db_transaction_generator
def occurrence_get(self, origin_id, cur=None):
"""Retrieve occurrence information per origin_id.
Args:
origin_id: The occurrence's origin.
Yields:
List of occurrences matching criterion.
"""
db = self.db
for line in db.occurrence_get(origin_id, cur):
yield {
'origin': line[0],
'branch': line[1],
'target': line[2],
'target_type': line[3],
}
@db_transaction_generator
def revision_get_by(self,
origin_id,
branch_name=None,
timestamp=None,
limit=None,
cur=None):
"""Given an origin_id, retrieve occurrences' list per given criterions.
Args:
origin_id: The origin to filter on.
branch_name: optional branch name.
timestamp:
limit:
Yields:
List of occurrences matching the criteria or None if nothing is
found.
"""
for line in self.db.revision_get_by(origin_id,
branch_name,
timestamp,
limit=limit,
cur=cur):
data = converters.db_to_revision(
dict(zip(self.db.revision_get_cols, line))
)
if not data['type']:
yield None
continue
yield data
def release_get_by(self, origin_id, limit=None):
"""Given an origin id, return all the tag objects pointing to heads of
origin_id.
Args:
origin_id: the origin to filter on.
limit: None by default
Yields:
List of releases matching the criteria or None if nothing is
found.
"""
for line in self.db.release_get_by(origin_id, limit=limit):
data = converters.db_to_release(
dict(zip(self.db.release_get_cols, line))
)
yield data
@db_transaction
def object_find_by_sha1_git(self, ids, cur=None):
"""Return the objects found with the given ids.
Args:
ids: a generator of sha1_gits
Returns:
a dict mapping the id to the list of objects found. Each object
found is itself a dict with keys:
sha1_git: the input id
type: the type of object found
id: the id of the object found
object_id: the numeric id of the object found.
"""
db = self.db
ret = {id: [] for id in ids}
for retval in db.object_find_by_sha1_git(ids):
if retval[1]:
ret[retval[0]].append(dict(zip(db.object_find_by_sha1_git_cols,
retval)))
return ret
@db_transaction
def origin_get(self, origin, cur=None):
"""Return the origin either identified by its id or its tuple
(type, url).
Args:
origin: dictionary representing the individual
origin to find.
This dict has either the keys type and url:
- type (FIXME: enum TBD): the origin type ('git', 'wget', ...)
- url (bytes): the url the origin points to
either the id:
- id: the origin id
Returns:
the origin dict with the keys:
- id: origin's id
- type: origin's type
- url: origin's url
- lister: lister's uuid
- project: project's uuid (FIXME, retrieve this information)
Raises:
ValueError if the keys match neither (type and url) nor id.
"""
db = self.db
keys = ['id', 'type', 'url', 'lister', 'project']
origin_id = origin.get('id')
if origin_id: # check lookup per id first
ori = db.origin_get(origin_id, cur)
elif 'type' in origin and 'url' in origin: # or lookup per type, url
ori = db.origin_get_with(origin['type'], origin['url'], cur)
else: # unsupported lookup
raise ValueError('Origin must have either id or (type and url).')
if ori:
return dict(zip(keys, ori))
return None
@db_transaction
def _person_add(self, person, cur=None):
"""Add a person in storage.
BEWARE: Internal function for now.
Do not do anything fancy in case a person already exists.
Please adapt code if more checks are needed.
Args:
person dictionary with keys name and email.
Returns:
Id of the new person.
"""
db = self.db
return db.person_add(person)
@db_transaction_generator
def person_get(self, person, cur=None):
"""Return the persons identified by their ids.
Args:
person: array of ids.
Returns:
The array of persons corresponding of the ids.
"""
db = self.db
for person in db.person_get(person):
yield dict(zip(db.person_get_cols, person))
@db_transaction
def origin_add_one(self, origin, cur=None):
"""Add origin to the storage
Args:
origin: dictionary representing the individual
origin to add. This dict has the following keys:
- type (FIXME: enum TBD): the origin type ('git', 'wget', ...)
- url (bytes): the url the origin points to
Returns:
the id of the added origin, or of the identical one that already
exists.
"""
db = self.db
data = db.origin_get_with(origin['type'], origin['url'], cur)
if data:
return data[0]
return db.origin_add(origin['type'], origin['url'], cur)
@db_transaction
def fetch_history_start(self, origin_id, cur=None):
"""Add an entry for origin origin_id in fetch_history. Returns the id
of the added fetch_history entry
"""
fetch_history = {
'origin': origin_id,
'date': datetime.datetime.now(tz=datetime.timezone.utc),
}
return self.db.create_fetch_history(fetch_history, cur)
@db_transaction
def fetch_history_end(self, fetch_history_id, data, cur=None):
"""Close the fetch_history entry with id `fetch_history_id`, replacing
its data with `data`.
"""
now = datetime.datetime.now(tz=datetime.timezone.utc)
fetch_history = self.db.get_fetch_history(fetch_history_id, cur)
if not fetch_history:
raise ValueError('No fetch_history with id %d' % fetch_history_id)
fetch_history['duration'] = now - fetch_history['date']
fetch_history.update(data)
self.db.update_fetch_history(fetch_history, cur)
@db_transaction
def fetch_history_get(self, fetch_history_id, cur=None):
"""Get the fetch_history entry with id `fetch_history_id`.
"""
return self.db.get_fetch_history(fetch_history_id, cur)
@db_transaction
def entity_add(self, entities, cur=None):
"""Add the given entitites to the database (in entity_history).
Args:
- entities: iterable of dictionaries containing the following keys:
- uuid (uuid): id of the entity
- parent (uuid): id of the parent entity
- name (str): name of the entity
- type (str): type of entity (one of 'organization',
'group_of_entities', 'hosting', 'group_of_persons',
'person', 'project')
- description (str, optional): description of the entity
- homepage (str): url of the entity's homepage
- active (bool): whether the entity is active
- generated (bool): whether the entity was generated
- lister_metadata (dict): lister-specific entity metadata
- metadata (dict): other metadata for the entity
- validity (datetime.DateTime array): timestamps at which we
listed the entity.
"""
db = self.db
cols = list(db.entity_history_cols)
cols.remove('id')
db.mktemp_entity_history()
db.copy_to(entities, 'tmp_entity_history', cols, cur)
db.entity_history_add_from_temp()
@db_transaction_generator
def entity_get_from_lister_metadata(self, entities, cur=None):
"""Fetch entities from the database, matching with the lister and
associated metadata.
Args:
entities: iterable of dictionaries containing the lister metadata
to look for. Useful keys are 'lister', 'type', 'id', ...
Returns:
A generator of fetched entities with all their attributes. If no
match was found, the returned entity is None.
"""
db = self.db
db.mktemp_entity_lister(cur)
mapped_entities = []
for i, entity in enumerate(entities):
mapped_entity = {
'id': i,
'lister_metadata': entity,
}
mapped_entities.append(mapped_entity)
db.copy_to(mapped_entities, 'tmp_entity_lister',
['id', 'lister_metadata'], cur)
cur.execute('''select id, %s
from swh_entity_from_tmp_entity_lister()
order by id''' %
','.join(db.entity_cols))
for id, *entity_vals in cur:
fetched_entity = dict(zip(db.entity_cols, entity_vals))
if fetched_entity['uuid']:
yield fetched_entity
else:
yield {
'uuid': None,
'lister_metadata': entities[i],
}
@db_transaction_generator
def entity_get(self, uuid, cur=None):
"""Returns the list of entity per its uuid identifier and also its
parent hierarchy.
Args:
uuid: entity's identifier
Returns:
List of entities starting with entity with uuid and the parent
hierarchy from such entity.
"""
db = self.db
for entity in db.entity_get(uuid, cur):
yield dict(zip(db.entity_cols, entity))
@db_transaction
def entity_get_one(self, uuid, cur=None):
"""Returns one entity using its uuid identifier.
Args:
uuid: entity's identifier
Returns:
the object corresponding to the given entity
"""
db = self.db
entity = db.entity_get_one(uuid, cur)
if entity:
return dict(zip(db.entity_cols, entity))
else:
return None
@db_transaction
def stat_counters(self, cur=None):
"""compute statistics about the number of tuples in various tables
Returns:
a dictionary mapping textual labels (e.g., content) to integer
values (e.g., the number of tuples in table content)
"""
return {k: v for (k, v) in self.db.stat_counters()}
diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py
index 0db83a2ee..df5f45e91 100644
--- a/swh/storage/tests/test_archiver.py
+++ b/swh/storage/tests/test_archiver.py
@@ -1,245 +1,245 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import tempfile
import unittest
import os
from nose.tools import istest
from nose.plugins.attrib import attr
from datetime import datetime, timedelta
from swh.core import hashutil
from swh.core.tests.db_testing import DbTestFixture
from server_testing import ServerTestFixture
from swh.storage import Storage
from swh.storage.exc import ObjNotFoundError
from swh.storage.archiver import ArchiverDirector, ArchiverWorker
from swh.storage.objstorage.api.client import RemoteObjStorage
from swh.storage.objstorage.api.server import app
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata')
@attr('db')
class TestArchiver(DbTestFixture, ServerTestFixture,
unittest.TestCase):
""" Test the objstorage archiver.
"""
TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh.dump')
def setUp(self):
# Launch the backup server
self.backup_objroot = tempfile.mkdtemp(prefix='remote')
self.config = {'storage_base': self.backup_objroot,
- 'storage_depth': 3}
+ 'storage_slicing': '0:2/2:4/4:6'}
self.app = app
super().setUp()
# Launch a client to check objects presence
self.remote_objstorage = RemoteObjStorage(self.url())
# Create the local storage.
self.objroot = tempfile.mkdtemp(prefix='local')
self.storage = Storage(self.conn, self.objroot)
# Initializes and fill the tables.
self.initialize_tables()
# Create the archiver
self.archiver = self.__create_director()
self.storage_data = ('Local', 'http://localhost:%s/' % self.port)
def tearDown(self):
self.empty_tables()
super().tearDown()
def initialize_tables(self):
""" Initializes the database with a sample of items.
"""
# Add an archive
self.cursor.execute("""INSERT INTO archives(id, url)
VALUES('Local', 'http://localhost:{}/')
""".format(self.port))
self.conn.commit()
def empty_tables(self):
# Remove all content
self.cursor.execute('DELETE FROM content_archive')
self.cursor.execute('DELETE FROM archives')
self.conn.commit()
def __add_content(self, content_data, status='missing', date='now()'):
# Add the content
content = hashutil.hashdata(content_data)
content.update({'data': content_data})
self.storage.content_add([content])
# Then update database
content_id = r'\x' + hashutil.hash_to_hex(content['sha1'])
self.cursor.execute("""INSERT INTO content_archive
VALUES('%s'::sha1, 'Local', '%s', %s)
""" % (content_id, status, date))
return content['sha1']
def __get_missing(self):
self.cursor.execute("""SELECT content_id
FROM content_archive
WHERE status='missing'""")
return self.cursor.fetchall()
def __create_director(self, batch_size=5000, archival_max_age=3600,
retention_policy=1, asynchronous=False):
config = {
'objstorage_path': self.objroot,
'batch_max_size': batch_size,
'archival_max_age': archival_max_age,
'retention_policy': retention_policy,
'asynchronous': asynchronous # Avoid depending on queue for tests.
}
director = ArchiverDirector(self.conn, config)
return director
def __create_worker(self, batch={}, config={}):
mstorage_args = [self.archiver.master_storage.db.conn,
self.objroot]
slaves = [self.storage_data]
if not config:
config = self.archiver.config
return ArchiverWorker(batch, mstorage_args, slaves, config)
# Integration test
@istest
def archive_missing_content(self):
""" Run archiver on a missing content should archive it.
"""
content_data = b'archive_missing_content'
id = self.__add_content(content_data)
# After the run, the content should be in the archive.
self.archiver.run()
remote_data = self.remote_objstorage.content_get(id)
self.assertEquals(content_data, remote_data)
@istest
def archive_present_content(self):
""" A content that is not 'missing' shouldn't be archived.
"""
id = self.__add_content(b'archive_present_content', status='present')
# After the run, the content should NOT be in the archive.*
self.archiver.run()
with self.assertRaises(ObjNotFoundError):
self.remote_objstorage.content_get(id)
@istest
def archive_already_enough(self):
""" A content missing with enough copies shouldn't be archived.
"""
id = self.__add_content(b'archive_alread_enough')
director = self.__create_director(retention_policy=0)
director.run()
with self.assertRaises(ObjNotFoundError):
self.remote_objstorage.content_get(id)
# Unit test for ArchiverDirector
def vstatus(self, status, mtime):
return self.archiver.get_virtual_status(status, mtime)
@istest
def vstatus_present(self):
self.assertEquals(
self.vstatus('present', None),
'present'
)
@istest
def vstatus_missing(self):
self.assertEquals(
self.vstatus('missing', None),
'missing'
)
@istest
def vstatus_ongoing_remaining(self):
current_time = datetime.now()
self.assertEquals(
self.vstatus('ongoing', current_time),
'present'
)
@istest
def vstatus_ongoing_elapsed(self):
past_time = datetime.now() - timedelta(
seconds=self.archiver.config['archival_max_age'] + 1
)
self.assertEquals(
self.vstatus('ongoing', past_time),
'missing'
)
# Unit tests for archive worker
@istest
def need_archival_missing(self):
""" A content should still need archival when it is missing.
"""
id = self.__add_content(b'need_archival_missing', status='missing')
id = r'\x' + hashutil.hash_to_hex(id)
worker = self.__create_worker()
self.assertEqual(worker.need_archival(id, self.storage_data), True)
@istest
def need_archival_present(self):
""" A content should still need archival when it is missing
"""
id = self.__add_content(b'need_archival_missing', status='present')
id = r'\x' + hashutil.hash_to_hex(id)
worker = self.__create_worker()
self.assertEqual(worker.need_archival(id, self.storage_data), False)
@istest
def need_archival_ongoing_remaining(self):
""" An ongoing archival with remaining time shouldnt need archival.
"""
id = self.__add_content(b'need_archival_ongoing_remaining',
status='ongoing', date="'%s'" % datetime.now())
id = r'\x' + hashutil.hash_to_hex(id)
worker = self.__create_worker()
self.assertEqual(worker.need_archival(id, self.storage_data), False)
@istest
def need_archival_ongoing_elapsed(self):
""" An ongoing archival with elapsed time should be scheduled again.
"""
id = self.__add_content(
b'archive_ongoing_elapsed',
status='ongoing',
date="'%s'" % (datetime.now() - timedelta(
seconds=self.archiver.config['archival_max_age'] + 1
))
)
id = r'\x' + hashutil.hash_to_hex(id)
worker = self.__create_worker()
self.assertEqual(worker.need_archival(id, self.storage_data), True)
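# Taken together, the need_archival_* tests encode one rule: a content needs
# (re)archival on a given server iff its copy there is effectively missing,
# i.e. its status is 'missing', or 'ongoing' but older than archival_max_age.
# Illustrative sketch only; this is not ArchiverWorker.need_archival, whose
# real signature takes a content id and a storage descriptor.
from datetime import datetime, timedelta

def need_archival_sketch(status, mtime, archival_max_age=3600):
    if status == 'missing':
        return True
    if status == 'ongoing':
        return datetime.now() - mtime > timedelta(seconds=archival_max_age)
    return False  # a 'present' copy needs nothing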
@istest
def content_sorting_by_archiver(self):
""" Check that the content is correctly sorted.
"""
batch = {
'id1': {
'present': [('slave1', 'slave1_url')],
'missing': []
},
'id2': {
'present': [],
'missing': [('slave1', 'slave1_url')]
}
}
worker = self.__create_worker(batch=batch)
mapping = worker.sort_content_by_archive()
self.assertNotIn('id1', mapping[('slave1', 'slave1_url')])
self.assertIn('id2', mapping[('slave1', 'slave1_url')])
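content_sorting_by_archiver above only asserts membership, but the underlying idea is a grouping of content ids by the archives that still miss them. A minimal sketch of such a grouping, assuming the batch keeps the {'present': [...], 'missing': [...]} shape used in the test (this is not the actual ArchiverWorker.sort_content_by_archive):

from collections import defaultdict

def sort_content_by_archive_sketch(batch):
    # Map each (server, url) pair to the set of content ids it is missing.
    mapping = defaultdict(set)
    for content_id, copies in batch.items():
        for archive in copies['missing']:
            mapping[archive].add(content_id)
    return mapping

# With the batch from the test, 'id2' ends up under ('slave1', 'slave1_url')
# and 'id1' does not, which is exactly what the two assertions check.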
diff --git a/swh/storage/tests/test_checker.py b/swh/storage/tests/test_checker.py
index 95e96a1f9..3069abe5c 100644
--- a/swh/storage/tests/test_checker.py
+++ b/swh/storage/tests/test_checker.py
@@ -1,128 +1,128 @@
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import gzip
import tempfile
import unittest
from nose.tools import istest
from nose.plugins.attrib import attr
from swh.core import hashutil
from swh.storage.checker.checker import ContentChecker
class MockBackupStorage():
def __init__(self):
self.values = {}
def content_add(self, id, value):
self.values[id] = value
def content_get(self, ids):
for id in ids:
try:
data = self.values[id]
except KeyError:
yield None
continue
yield {'sha1': id, 'data': data}
@attr('fs')
class TestChecker(unittest.TestCase):
""" Test the content integrity checker
"""
def setUp(self):
super().setUp()
# Connect to an objstorage
config = {'batch_size': 10}
path = tempfile.mkdtemp()
- depth = 3
- self.checker = ContentChecker(config, path, depth, 'http://None')
+ slicing = '0:2/2:4/4:6'
+ self.checker = ContentChecker(config, path, slicing, 'http://None')
self.checker.backup_storages = [MockBackupStorage(),
MockBackupStorage()]
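# The constructor now takes a single slicing string instead of a depth.
# The sketch below shows one plausible reading of '0:2/2:4/4:6': each
# '/'-separated start:end pair is applied to the hex object id as a Python
# slice to build nested directories. This is an assumption for illustration,
# not the actual PathSlicingObjStorage._obj_path implementation.
import os

def sliced_path_sketch(root, hex_obj_id, slicing='0:2/2:4/4:6'):
    parts = []
    for bounds in slicing.split('/'):
        start, end = (int(b) if b else None for b in bounds.split(':'))
        parts.append(hex_obj_id[start:end])
    return os.path.join(root, *(parts + [hex_obj_id]))

# sliced_path_sketch('/tmp/objs', 'da39a3ee5e6b4b0d3255bfef95601890afd80709')
# -> '/tmp/objs/da/39/a3/da39a3ee5e6b4b0d3255bfef95601890afd80709'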
def corrupt_content(self, id):
""" Make the given content invalid.
"""
hex_id = hashutil.hash_to_hex(id)
file_path = self.checker.objstorage._obj_path(hex_id)
with gzip.open(file_path, 'wb') as f:
f.write(b'Unexpected content')
@istest
def check_valid_content(self):
# Check that a valid content is valid.
content = b'check_valid_content'
id = self.checker.objstorage.add(content)
self.assertTrue(self.checker.check_content(id))
@istest
def check_invalid_content(self):
# Check that an invalid content is noticed.
content = b'check_invalid_content'
id = self.checker.objstorage.add(content)
self.corrupt_content(id)
self.assertFalse(self.checker.check_content(id))
@istest
def repair_content_present_first(self):
# Try to repair a content that is in the backup storage.
content = b'repair_content_present_first'
id = self.checker.objstorage.add(content)
# Add a content to the mock
self.checker.backup_storages[0].content_add(id, content)
# Corrupt and repair it.
self.corrupt_content(id)
self.assertFalse(self.checker.check_content(id))
self.checker.repair_contents([id])
self.assertTrue(self.checker.check_content(id))
@istest
def repair_content_present_second(self):
# Try to repair a content that is not in the first backup storage.
content = b'repair_content_present_second'
id = self.checker.objstorage.add(content)
# Add a content to the mock
self.checker.backup_storages[1].content_add(id, content)
# Corrupt and repair it.
self.corrupt_content(id)
self.assertFalse(self.checker.check_content(id))
self.checker.repair_contents([id])
self.assertTrue(self.checker.check_content(id))
@istest
def repair_content_present_distributed(self):
# Try to repair two contents that are in separate backup storages.
content1 = b'repair_content_present_distributed_1'
content2 = b'repair_content_present_distributed_2'
id1 = self.checker.objstorage.add(content1)
id2 = self.checker.objstorage.add(content2)
# Add each content to a different mock backup storage.
self.checker.backup_storages[0].content_add(id1, content1)
self.checker.backup_storages[1].content_add(id2, content2)
# Corrupt and repair them.
self.corrupt_content(id1)
self.corrupt_content(id2)
self.assertFalse(self.checker.check_content(id1))
self.assertFalse(self.checker.check_content(id2))
self.checker.repair_contents([id1, id2])
self.assertTrue(self.checker.check_content(id1))
self.assertTrue(self.checker.check_content(id2))
@istest
def repair_content_missing(self):
# Try to repair a content that is NOT in the backup storage.
content = b'repair_content_missing'
id = self.checker.objstorage.add(content)
# Corrupt and repair it.
self.corrupt_content(id)
self.assertFalse(self.checker.check_content(id))
self.checker.repair_contents([id])
self.assertFalse(self.checker.check_content(id))
diff --git a/swh/storage/tests/test_objstorage_api.py b/swh/storage/tests/test_objstorage_api.py
index 6676cd7c5..4e43f18fd 100644
--- a/swh/storage/tests/test_objstorage_api.py
+++ b/swh/storage/tests/test_objstorage_api.py
@@ -1,97 +1,88 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import os
import tempfile
import unittest
from nose.tools import istest
from nose.plugins.attrib import attr
from swh.core import hashutil
from swh.storage.exc import ObjNotFoundError, Error
from swh.storage.tests.server_testing import ServerTestFixture
from swh.storage.objstorage.api.client import RemoteObjStorage
from swh.storage.objstorage.api.server import app
@attr('db')
class TestRemoteObjStorage(ServerTestFixture, unittest.TestCase):
""" Test the remote archive API.
"""
def setUp(self):
self.config = {'storage_base': tempfile.mkdtemp(),
- 'storage_depth': 3}
+ 'storage_slicing': '0:1/0:5'}
self.app = app
super().setUp()
self.objstorage = RemoteObjStorage(self.url())
def tearDown(self):
super().tearDown()
@istest
def content_add(self):
content = bytes('Test content', 'utf8')
id = self.objstorage.content_add(content)
self.assertEquals(self.objstorage.content_get(id), content)
@istest
def content_get_present(self):
content = bytes('content_get_present', 'utf8')
content_hash = hashutil.hashdata(content)
id = self.objstorage.content_add(content)
self.assertEquals(content_hash['sha1'], id)
@istest
def content_get_missing(self):
content = bytes('content_get_missing', 'utf8')
content_hash = hashutil.hashdata(content)
with self.assertRaises(ObjNotFoundError):
self.objstorage.content_get(content_hash['sha1'])
@istest
def content_get_random(self):
ids = []
for i in range(100):
content = bytes('content_get_random %d' % i, 'utf8')
id = self.objstorage.content_add(content)
ids.append(id)
for id in self.objstorage.content_get_random(50):
self.assertIn(id, ids)
@istest
def content_check_invalid(self):
content = bytes('content_check_invalid', 'utf8')
- id = self.objstorage.content_add(content)
- hex_obj_id = hashutil.hash_to_hex(id)
- dir_path = os.path.join(
- self.config['storage_base'],
- *[hex_obj_id[i*2:i*2+2]
- for i in range(int(self.config['storage_depth']))]
- )
- path = os.path.join(dir_path, hex_obj_id)
- content = list(content)
- with open(path, 'bw') as f:
- content[0] = (content[0] + 1) % 128
- f.write(bytes(content))
+ invalid_id = hashutil.hashdata(b'invalid content')['sha1']
+ # Add the content with an invalid id.
+ self.objstorage.content_add(content, invalid_id)
+ # Then check it and expect an error.
with self.assertRaises(Error):
- self.objstorage.content_check(id)
+ self.objstorage.content_check(invalid_id)
@istest
def content_check_valid(self):
content = bytes('content_check_valid', 'utf8')
id = self.objstorage.content_add(content)
try:
self.objstorage.content_check(id)
except Exception:
self.fail('Integrity check failed')
@istest
def content_check_missing(self):
content = bytes('content_check_missing', 'utf8')
content_hash = hashutil.hashdata(content)
with self.assertRaises(ObjNotFoundError):
self.objstorage.content_check(content_hash['sha1'])
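For reference, the client calls exercised above compose into the following usage sketch. The URL is a placeholder for a running objstorage API server, and the sketch only uses calls and exceptions that appear in these tests:

from swh.core import hashutil
from swh.storage.exc import ObjNotFoundError
from swh.storage.objstorage.api.client import RemoteObjStorage

objstorage = RemoteObjStorage('http://localhost:5002/')  # placeholder URL
content = b'example content'
obj_id = objstorage.content_add(content)
assert obj_id == hashutil.hashdata(content)['sha1']
assert objstorage.content_get(obj_id) == content
objstorage.content_check(obj_id)  # raises swh.storage.exc.Error on corruption
missing_id = hashutil.hashdata(b'never stored')['sha1']
try:
    objstorage.content_get(missing_id)
except ObjNotFoundError:
    pass  # expected, as in content_get_missing above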
diff --git a/swh/storage/tests/test_objstorage_pathslicing.py b/swh/storage/tests/test_objstorage_pathslicing.py
index cc17a52ee..aedd63ec7 100644
--- a/swh/storage/tests/test_objstorage_pathslicing.py
+++ b/swh/storage/tests/test_objstorage_pathslicing.py
@@ -1,78 +1,76 @@
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import tempfile
import unittest
from nose.tools import istest
from swh.core import hashutil
from swh.storage import exc
from swh.storage.objstorage import PathSlicingObjStorage
from objstorage_testing import ObjStorageTestFixture
class TestPathSlicingObjStorage(ObjStorageTestFixture, unittest.TestCase):
def setUp(self):
super().setUp()
- self.depth = 3
- self.slicing = 2
+ self.slicing = '0:2/2:4/4:6'
self.tmpdir = tempfile.mkdtemp()
- self.storage = PathSlicingObjStorage(self.tmpdir, self.depth,
- self.slicing)
+ self.storage = PathSlicingObjStorage(self.tmpdir, self.slicing)
def content_path(self, obj_id):
hex_obj_id = hashutil.hash_to_hex(obj_id)
return self.storage._obj_path(hex_obj_id)
@istest
def contains(self):
content_p, obj_id_p = self.hash_content(b'contains_present')
content_m, obj_id_m = self.hash_content(b'contains_missing')
self.storage.add(content_p, obj_id=obj_id_p)
self.assertIn(obj_id_p, self.storage)
self.assertNotIn(obj_id_m, self.storage)
@istest
def iter(self):
content, obj_id = self.hash_content(b'iter')
self.assertEqual(list(iter(self.storage)), [])
self.storage.add(content, obj_id=obj_id)
self.assertEqual(list(iter(self.storage)), [obj_id])
@istest
def len(self):
content, obj_id = self.hash_content(b'len')
self.assertEqual(len(self.storage), 0)
self.storage.add(content, obj_id=obj_id)
self.assertEqual(len(self.storage), 1)
@istest
def check_not_gzip(self):
content, obj_id = self.hash_content(b'check_not_gzip')
self.storage.add(content, obj_id=obj_id)
with open(self.content_path(obj_id), 'ab') as f: # Add garbage.
f.write(b'garbage')
with self.assertRaises(exc.Error):
self.storage.check(obj_id)
@istest
def check_id_mismatch(self):
content, obj_id = self.hash_content(b'check_id_mismatch')
self.storage.add(content, obj_id=obj_id)
with open(self.content_path(obj_id), 'wb') as f:
f.write(b'unexpected content')
with self.assertRaises(exc.Error):
self.storage.check(obj_id)
@istest
def get_random_contents(self):
content, obj_id = self.hash_content(b'get_random_content')
self.storage.add(content, obj_id=obj_id)
random_contents = list(self.storage.get_random(1))
self.assertEqual(1, len(random_contents))
self.assertIn(obj_id, random_contents)
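The check_not_gzip and check_id_mismatch tests above characterise PathSlicingObjStorage.check: it must reject both files that no longer gunzip and files whose recomputed hash differs from their object id. A standalone sketch of an equivalent verification, assuming objects are stored gzip-compressed under the path returned by _obj_path (as the tests themselves rely on):

import gzip
import zlib

from swh.core import hashutil

def verify_object_sketch(path, hex_obj_id):
    # Illustrative only: gunzip the file and compare its sha1 to the object id.
    try:
        with gzip.open(path, 'rb') as f:
            data = f.read()
    except (OSError, EOFError, zlib.error):
        return False  # undecompressable, as in check_not_gzip
    return hashutil.hash_to_hex(hashutil.hashdata(data)['sha1']) == hex_obj_id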
diff --git a/version.txt b/version.txt
index c4346227d..5bf531f1f 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.40-0-g8c6e8d7
\ No newline at end of file
+v0.0.41-0-g68abde3
\ No newline at end of file
