Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_objstorage_api.py
- This file was added.
# Copyright (C) 2015 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
import multiprocessing | |||||
import socket | |||||
import time | |||||
import tempfile | |||||
import unittest | |||||
from urllib.request import urlopen | |||||
from nose.tools import istest | |||||
from nose.plugins.attrib import attr | |||||
from swh.core import hashutil | |||||
from swh.storage.exc import ObjNotFoundError, Error | |||||
from swh.storage.objstorage.objstorage import _obj_path | |||||
from swh.storage.objstorage.api.client import RemoteArchive | |||||
from swh.storage.objstorage.api.server import app | |||||
@attr('!db') | |||||
class TestRemoteArchive(unittest.TestCase): | |||||
zack: this should be TestRemoteObjStorage | |||||
""" Test the remote archive API. | |||||
""" | |||||
def setUp(self): | |||||
self.start_server() | |||||
self.objstorage = RemoteArchive(self.url()) | |||||
def tearDown(self): | |||||
self.stop_server() | |||||
def url(self): | |||||
return 'http://127.0.0.1:%d/' % self.port | |||||
def start_server(self): | |||||
""" Spawn the API server using multiprocessing. | |||||
""" | |||||
self.process = None | |||||
# WSGI app configuration | |||||
self.app = app | |||||
self.app.config['storage_base'] = tempfile.mkdtemp() | |||||
self.app.config['storage_depth'] = 3 | |||||
# Get an available port number | |||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |||||
sock.bind(('127.0.0.1', 0)) | |||||
self.port = sock.getsockname()[1] | |||||
sock.close() | |||||
# Worker function for multiprocessing | |||||
def worker(app, port): | |||||
return app.run(port=port, use_reloader=False) | |||||
self.process = multiprocessing.Process( | |||||
target=worker, args=(self.app, self.port) | |||||
) | |||||
self.process.start() | |||||
# Wait max 5 seconds for server to spawn | |||||
i = 0 | |||||
while i < 20: | |||||
try: | |||||
urlopen(self.url()) | |||||
except Exception: | |||||
i += 1 | |||||
time.sleep(0.25) | |||||
else: | |||||
return | |||||
def stop_server(self): | |||||
""" Terminate the API server's process. | |||||
""" | |||||
if self.process: | |||||
self.process.terminate() | |||||
zackUnsubmitted Done Inline Actionsthis is copy/paste from test_api_client. We shouldn't do that. zack: this is copy/paste from test_api_client. We shouldn't do that.
Instead, we should write a… | |||||
@istest | |||||
def content_add(self): | |||||
content = bytes('Test content', 'utf8') | |||||
id = self.objstorage.content_add(content) | |||||
self.assertEquals(self.objstorage.content_get(id), content) | |||||
@istest | |||||
def content_get_present(self): | |||||
content = bytes('content_get_present', 'utf8') | |||||
content_hash = hashutil.hashdata(content) | |||||
id = self.objstorage.content_add(content) | |||||
self.assertEquals(content_hash['sha1'], id) | |||||
@istest | |||||
def content_get_missing(self): | |||||
content = bytes('content_get_missing', 'utf8') | |||||
content_hash = hashutil.hashdata(content) | |||||
with self.assertRaises(ObjNotFoundError): | |||||
self.objstorage.content_get(content_hash['sha1']) | |||||
@istest | |||||
def content_check_invalid(self): | |||||
content = bytes('content_check_invalid', 'utf8') | |||||
id = self.objstorage.content_add(content) | |||||
path = _obj_path(hashutil.hash_to_hex(id), | |||||
self.app.config['storage_base'], | |||||
self.app.config['storage_depth']) | |||||
content = list(content) | |||||
with open(path, 'bw') as f: | |||||
content[0] = (content[0] + 1) % 128 | |||||
f.write(bytes(content)) | |||||
with self.assertRaises(Error): | |||||
self.objstorage.content_check(id) | |||||
@istest | |||||
def content_check_valid(self): | |||||
content = bytes('content_check_valid', 'utf8') | |||||
id = self.objstorage.content_add(content) | |||||
try: | |||||
self.objstorage.content_check(id) | |||||
except: | |||||
self.fail('Integrity check failed') | |||||
@istest | |||||
def content_check_missing(self): | |||||
content = bytes('content_check_valid', 'utf8') | |||||
content_hash = hashutil.hashdata(content) | |||||
with self.assertRaises(ObjNotFoundError): | |||||
self.objstorage.content_check(content_hash['sha1']) |
this should be TestRemoteObjStorage