Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9337055
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
82 KB
Subscribers
None
View Options
diff --git a/swh/loader/git/local_store.py b/swh/loader/git/local_store.py
index 2e0446c..9b715c4 100644
--- a/swh/loader/git/local_store.py
+++ b/swh/loader/git/local_store.py
@@ -1,109 +1,109 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from swh.core.conf import reader
+from swh.core.git.conf import reader
from swh.loader.git.storage import storage, db, service
from swh.storage.objstorage import ObjStorage
# FIXME: duplicated from bin/swh-backend...
# Default configuration file
# Read by prepare_and_load_to_back when it is given no setup file.
DEFAULT_CONF_FILE = '~/.config/swh/back.ini'
# default configuration
# Each key maps to a (type name, default value) pair; the whole dict is
# passed to reader.read as its second argument.
DEFAULT_CONF = {
'content_storage_dir': ('string', '/tmp/swh-loader-git/content-storage'),
'log_dir': ('string', '/tmp/swh-loader-git/log'),
'db_url': ('string', 'dbname=softwareheritage-dev'),
'folder_depth': ('int', 4),
'debug': ('bool', None),
'host': ('string', '127.0.0.1'),
'port': ('int', 5000)
}
def store_only_new(db_conn, conf, obj_type, obj):
    """Add obj to the storage unless a lookup already finds it.

    The lookup is done with storage.find on obj['id'] and obj_type; when it
    yields nothing truthy, obj is handed to storage.add.

    """
    already_known = storage.find(db_conn, obj['id'], obj_type)
    if already_known:
        return

    storage.add(db_conn, conf, obj)
# Per-type persistence overrides: revisions are persisted with
# service.add_revisions; store_unknown_objects falls back to
# service.add_objects for any type not listed here.
_obj_to_persist_fn = {storage.Type.revision: service.add_revisions}
def store_unknown_objects(db_conn, conf, obj_type, swhmap):
    """Persist the objects of swhmap that the backend reports as unknown.

    swhmap maps sha1s to full objects. Its keys are filtered through
    service.filter_unknowns_type; the objects for the remaining sha1s are
    passed to the persistence function registered for obj_type
    (service.add_objects by default).

    Returns True when there is nothing unknown to store, otherwise whatever
    the persistence function returns.

    """
    # have: ask the backend which sha1s it does not know yet
    missing_sha1s = service.filter_unknowns_type(db_conn, obj_type,
                                                 swhmap.keys())

    if missing_sha1s:
        # seen: now store the corresponding full objects in the backend
        persist = _obj_to_persist_fn.get(obj_type, service.add_objects)
        return persist(db_conn, conf, obj_type,
                       map(swhmap.get, missing_sha1s))

    return True
def load_to_back(conf, swh_repo):
"""Load to the backend the repository swh_repo.
"""
# NOTE(review): indentation is lost in this view, so the exact nesting of
# the `if res:` checks below (and whether the final release call sits inside
# the last one) cannot be confirmed from here -- verify against the real file.
# A single db connection, opened from conf['db_url'], is used for every step.
with db.connect(conf['db_url']) as db_conn:
# First, store/retrieve the origin identifier
# FIXME: should be done by the cloner worker (which is not yet plugged
# on the right swh db ftm)
service.add_origin(db_conn, swh_repo.get_origin())
# First reference all unknown persons
service.add_persons(db_conn, conf, storage.Type.person,
swh_repo.get_persons())
# Store unknown contents; the result gates the directory step.
res = store_unknown_objects(db_conn, conf,
storage.Type.content,
swh_repo.get_contents())
if res:
# Store unknown directories; the result gates the revision step.
res = store_unknown_objects(db_conn, conf,
storage.Type.directory,
swh_repo.get_directories())
if res:
# Store unknown revisions; the result gates the occurrence step.
res = store_unknown_objects(db_conn, conf,
storage.Type.revision,
swh_repo.get_revisions())
if res:
# brutally send all remaining occurrences
service.add_objects(db_conn, conf,
storage.Type.occurrence,
swh_repo.get_occurrences())
# and releases (the idea here is that compared to existing
# objects, the quantity is less)
service.add_objects(db_conn, conf,
storage.Type.release,
swh_repo.get_releases())
def prepare_and_load_to_back(backend_setup_file, swh_repo):
    """Read the backend configuration, then load swh_repo to the backend.

    backend_setup_file is the backend's setup file, loaded to access the db
    and the file storage. When it is falsy, DEFAULT_CONF_FILE is read
    instead. DEFAULT_CONF is passed to the reader alongside the file.

    """
    setup_file = backend_setup_file or DEFAULT_CONF_FILE

    # Read the configuration file (no check yet)
    conf = reader.read(setup_file, DEFAULT_CONF)
    reader.prepare_folders(conf, 'content_storage_dir')

    # Expose the object storage to the loading code through the conf itself.
    objstorage = ObjStorage(conf['content_storage_dir'],
                            conf['folder_depth'])
    conf.update({'objstorage': objstorage})

    load_to_back(conf, swh_repo)
diff --git a/swh/loader/git/tests/test_api_content.py b/swh/loader/git/tests/test_api_content.py
index 873c719..21aef7b 100644
--- a/swh/loader/git/tests/test_api_content.py
+++ b/swh/loader/git/tests/test_api_content.py
@@ -1,114 +1,114 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from nose.tools import istest
from nose.plugins.attrib import attr
from swh.loader.git.storage import db, models
from swh.loader.git.protocols import serial
from test_utils import app_client, app_client_teardown
from swh.core import hashutil
@attr('slow')
class ContentTestCase(unittest.TestCase):
@classmethod
def setUpClass(self):
self.app, db_url, self.content_storage_dir = app_client()
with db.connect(db_url) as db_conn:
self.content_sha1_id = '37bfdafafe3b16d970125df29e6ec9a3d2521fd2'
self.content_sha1_id_bin = hashutil.hex_to_hash(self.content_sha1_id)
content_sha1_id = '47bfdafafe3b16d970125df29e6ec9a3d2521fd2'
content_sha1_id_bin = hashutil.hex_to_hash(content_sha1_id)
self.content_sha256_bin = hashutil.hashdata(b'something-to-hash', ['sha256'])['sha256']
models.add_content(db_conn,
self.content_sha1_id_bin,
content_sha1_id_bin,
self.content_sha256_bin,
10)
@classmethod
def tearDownClass(self):
app_client_teardown(self.content_storage_dir)
@istest
def get_content_ok(self):
# when
rv = self.app.get('/vcs/contents/%s' % self.content_sha1_id)
# then
- assert rv.status_code == 200
+ self.assertEquals(rv.status_code, 200)
data = serial.loads(rv.data)
- assert data['id'] == self.content_sha1_id
+ self.assertEquals(data['id'], self.content_sha1_id)
@istest
def get_content_not_found(self):
# when
rv = self.app.get('/vcs/contents/222222f9dd5dc46ee476a8be155ab049994f7170')
# then
- assert rv.status_code == 404
- assert rv.data == b'Not found!'
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.data, b'Not found!')
@istest
def get_content_not_found_with_bad_format(self):
# when
rv = self.app.get('/vcs/contents/1')
# then
- assert rv.status_code == 404
- assert rv.data == b'Not found!'
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.data, b'Not found!')
@istest
def put_content_create_and_update(self):
content_sha1 = '57bfdafafe3b16d970125df29e6ec9a3d2521fd2'
content_git_sha1 = hashutil.hex_to_hash('57bfdafafe3b16d970125df29e6ec9a3d2521fd2')
content_sha1_bin = hashutil.hex_to_hash(content_sha1)
content_sha256_bin = hashutil.hashdata(b'another-thing-to-hash', ['sha256'])['sha256']
# does not exist
rv = self.app.get('/vcs/contents/%s' % content_sha1)
# then
- assert rv.status_code == 404
- assert rv.data == b'Not found!'
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.data, b'Not found!')
# we create it
body = {'id': content_sha1_bin,
'git-sha1': content_git_sha1,
'content-sha256': content_sha256_bin,
'content': b'bar',
'size': '3'}
rv = self.app.put('/vcs/contents/%s' % content_sha1,
data=serial.dumps(body),
headers={'Content-Type': serial.MIMETYPE})
- assert rv.status_code == 204
- assert rv.data == b''
+ self.assertEquals(rv.status_code, 204)
+ self.assertEquals(rv.data, b'')
# now it exists
rv = self.app.get('/vcs/contents/%s' % content_sha1)
# then
- assert rv.status_code == 200
- assert serial.loads(rv.data)['id'] == content_sha1
+ self.assertEquals(rv.status_code, 200)
+ self.assertEquals(serial.loads(rv.data)['id'], content_sha1)
# we update it
rv = self.app.put('/vcs/contents/%s' % content_sha1,
data=serial.dumps(body),
headers={'Content-Type': serial.MIMETYPE})
- assert rv.status_code == 204
- assert rv.data == b''
+ self.assertEquals(rv.status_code, 204)
+ self.assertEquals(rv.data, b'')
# still the same
rv = self.app.get('/vcs/contents/%s' % content_sha1)
# then
- assert rv.status_code == 200
- assert serial.loads(rv.data)['id'] == content_sha1
+ self.assertEquals(rv.status_code, 200)
+ self.assertEquals(serial.loads(rv.data)['id'], content_sha1)
diff --git a/swh/loader/git/tests/test_api_directory.py b/swh/loader/git/tests/test_api_directory.py
index 358e1fe..fc6d600 100644
--- a/swh/loader/git/tests/test_api_directory.py
+++ b/swh/loader/git/tests/test_api_directory.py
@@ -1,132 +1,132 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from nose.tools import istest
from nose.plugins.attrib import attr
from swh.loader.git.storage import db, models, storage
from swh.loader.git.protocols import serial
from test_utils import app_client, app_client_teardown
from swh.core import hashutil
@attr('slow')
class DirectoryTestCase(unittest.TestCase):
@classmethod
def setUpClass(self):
self.app, db_url, self.content_storage_dir = app_client()
with db.connect(db_url) as db_conn:
self.content_sha1_id = hashutil.hex_to_hash('e5ba97de299a0e1e26b4a471b3d67c098d178e6e')
content_sha1_bin = hashutil.hex_to_hash('d5ba97de299a0e1e26b4a471b3d67c098d178e6e')
content_sha256_bin = hashutil.hashdata(b'something-to-hash', ['sha256'])['sha256']
models.add_content(db_conn,
self.content_sha1_id,
content_sha1_bin,
content_sha256_bin,
10)
self.directory_sha1_hex = 'b5ba97de299a0e1e26b4a471b3d67c098d178e6e'
directory_sha1_bin = hashutil.hex_to_hash(self.directory_sha1_hex)
models.add_directory(db_conn, directory_sha1_bin)
self.directory_sha1_put = 'a5ba97de299a0e1e26b4a471b3d67c098d178e6e'
self.directory_sha1_put_bin = hashutil.hex_to_hash(self.directory_sha1_put)
models.add_directory(db_conn, self.directory_sha1_put_bin)
@classmethod
def tearDownClass(self):
app_client_teardown(self.content_storage_dir)
@istest
def get_directory_ok(self):
# when
rv = self.app.get('/vcs/directories/%s' % self.directory_sha1_hex)
# then
- assert rv.status_code == 200
- assert serial.loads(rv.data)['id'] == self.directory_sha1_hex
+ self.assertEquals(rv.status_code, 200)
+ self.assertEquals(serial.loads(rv.data)['id'], self.directory_sha1_hex)
@istest
def get_directory_not_found(self):
# when
rv = self.app.get('/vcs/directories/111111f9dd5dc46ee476a8be155ab049994f7170')
# then
- assert rv.status_code == 404
- assert rv.data == b'Not found!'
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.data, b'Not found!')
@istest
def get_directory_not_found_with_bad_format(self):
# when
rv = self.app.get('/vcs/directories/1')
# then
- assert rv.status_code == 404
- assert rv.data == b'Not found!'
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.data, b'Not found!')
@istest
def put_directory_create_and_update(self):
directory_sha1 = '15ba97de299a0e1e26b4a471b3d67c098d178e6e'
# does not exist
rv = self.app.get('/vcs/directories/%s' % directory_sha1)
# then
- assert rv.status_code == 404
- assert rv.data == b'Not found!'
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.data, b'Not found!')
# we create it
body = serial.dumps({'entry-files': [{'name': 'filename',
'type': storage.Type.directory_entry,
'target-sha1': self.content_sha1_id,
'perms': '000',
'atime': None,
'mtime': None,
'ctime': None}],
'entry-dirs': [{'name': 'dirname',
'type': storage.Type.directory_entry,
'target-sha1': self.directory_sha1_put_bin,
'perms': '012',
'atime': None,
'mtime': None,
'ctime': None}],
'entry-revs': [{'name': "rev-name",
'type': storage.Type.directory_entry,
'target-sha1': hashutil.hex_to_hash('35ba97de299a0e1e26b4a471b3d67c098d178e6e'),
'perms': '000',
'atime': None,
'mtime': None,
'ctime': None}]
})
rv = self.app.put('/vcs/directories/%s' % directory_sha1,
data=body,
headers={'Content-Type': serial.MIMETYPE})
print(rv.status_code)
- assert rv.status_code == 204
- assert rv.data == b''
+ self.assertEquals(rv.status_code, 204)
+ self.assertEquals(rv.data, b'')
# now it exists
rv = self.app.get('/vcs/directories/%s' % directory_sha1)
# then
- assert rv.status_code == 200
- assert serial.loads(rv.data)['id'] == directory_sha1
+ self.assertEquals(rv.status_code, 200)
+ self.assertEquals(serial.loads(rv.data)['id'], directory_sha1)
# we update it
rv = self.app.put('/vcs/directories/%s' % directory_sha1,
data=serial.dumps({'entry-files': 'directory-bar'}),
headers={'Content-Type': serial.MIMETYPE})
- assert rv.status_code == 204
- assert rv.data == b''
+ self.assertEquals(rv.status_code, 204)
+ self.assertEquals(rv.data, b'')
# still the same
rv = self.app.get('/vcs/directories/%s' % directory_sha1)
# then
- assert rv.status_code == 200
- assert serial.loads(rv.data)['id'] == directory_sha1
+ self.assertEquals(rv.status_code, 200)
+ self.assertEquals(serial.loads(rv.data)['id'], directory_sha1)
diff --git a/swh/loader/git/tests/test_api_home.py b/swh/loader/git/tests/test_api_home.py
index 99f7edc..8829c29 100644
--- a/swh/loader/git/tests/test_api_home.py
+++ b/swh/loader/git/tests/test_api_home.py
@@ -1,44 +1,44 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from nose.tools import istest
from nose.plugins.attrib import attr
from test_utils import app_client
@attr('slow')
class HomeTestCase(unittest.TestCase):
@classmethod
def setUpClass(self):
self.app, _, _ = app_client()
@istest
def get_slash(self):
# when
rv = self.app.get('/')
# then
- assert rv.status_code == 200
- assert rv.data == b'Dev SWH API'
+ self.assertEquals(rv.status_code, 200)
+ self.assertEquals(rv.data, b'Dev SWH API')
@istest
def get_404(self):
# when
rv = self.app.get('/nowhere')
# then
- assert rv.status_code == 404
+ self.assertEquals(rv.status_code, 404)
@istest
def get_bad_request(self):
# when
rv = self.app.get('/vcs/not-a-good-type/1')
# then
- assert rv.status_code == 400
- assert rv.data == b'Bad request!'
+ self.assertEquals(rv.status_code, 400)
+ self.assertEquals(rv.data, b'Bad request!')
diff --git a/swh/loader/git/tests/test_api_occurrence.py b/swh/loader/git/tests/test_api_occurrence.py
index f2b8723..6d7f422 100644
--- a/swh/loader/git/tests/test_api_occurrence.py
+++ b/swh/loader/git/tests/test_api_occurrence.py
@@ -1,145 +1,145 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from nose.tools import istest
from nose.plugins.attrib import attr
from swh.loader.git.storage import db, models
from swh.loader.git.protocols import serial
from test_utils import now, app_client, app_client_teardown
from swh.core import hashutil
@attr('slow')
class OccurrenceTestCase(unittest.TestCase):
@classmethod
def setUpClass(self):
self.app, db_url, self.content_storage_dir = app_client()
with db.connect(db_url) as db_conn:
self.directory_sha1_hex = '0876886dc3b49ebe1043e116727ae781be7c8583'
self.directory_sha1_bin = hashutil.hex_to_hash(self.directory_sha1_hex)
models.add_directory(db_conn, self.directory_sha1_bin)
authorAndCommitter = {'name': 'some-name', 'email': 'some-email'}
models.add_person(db_conn, authorAndCommitter['name'], authorAndCommitter['email'])
self.revision_sha1_hex = '1876886dc3b49ebe1043e116727ae781be7c8583'
self.revision_sha1_bin = hashutil.hex_to_hash(self.revision_sha1_hex)
models.add_revision(db_conn,
self.revision_sha1_bin,
now(),
now(),
self.directory_sha1_bin,
"revision message",
authorAndCommitter,
authorAndCommitter)
self.origin_url = "https://github.com/user/repo"
models.add_origin(db_conn, self.origin_url, 'git')
self.branch_name = 'master'
models.add_occurrence_history(db_conn,
self.origin_url,
self.branch_name,
self.revision_sha1_bin,
'softwareheritage')
self.branch_name2 = 'master2'
models.add_occurrence_history(db_conn,
self.origin_url,
self.branch_name2,
self.revision_sha1_bin,
'softwareheritage')
self.revision_sha1_hex_2 = '2876886dc3b49ebe1043e116727ae781be7c8583'
self.revision_sha1_bin_2 = hashutil.hex_to_hash(self.revision_sha1_hex_2)
models.add_revision(db_conn,
self.revision_sha1_bin_2,
now(),
now(),
self.directory_sha1_bin,
"revision message 2",
authorAndCommitter,
authorAndCommitter)
@classmethod
def tearDownClass(self):
app_client_teardown(self.content_storage_dir)
@istest
def get_occurrence_ok(self):
# when
rv = self.app.get('/vcs/occurrences/%s' % self.revision_sha1_hex)
# then
- assert rv.status_code == 200
+ self.assertEquals(rv.status_code, 200)
branches = serial.loads(rv.data)
- assert len(branches) == 2
- assert self.branch_name in branches
- assert self.branch_name2 in branches
+ self.assertEquals(len(branches), 2)
+ self.assertIn(self.branch_name, branches)
+ self.assertIn(self.branch_name2, branches)
@istest
def get_occurrence_not_found(self):
# when
rv = self.app.get('/vcs/occurrences/inexistant-sha1')
# then
- assert rv.status_code == 404
- assert rv.data == b'Not found!'
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.data, b'Not found!')
@istest
def get_occurrence_not_found_with_bad_format(self):
# when
rv = self.app.get('/vcs/occurrences/1')
# then
- assert rv.status_code == 404
- assert rv.data == b'Not found!'
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.data, b'Not found!')
@istest
def put_occurrence_create_and_update(self):
occ_revision_sha1_hex = self.revision_sha1_hex_2
rv = self.app.get('/vcs/occurrences/%s' % occ_revision_sha1_hex)
# then
- assert rv.status_code == 404
- assert rv.data == b'Not found!'
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.data, b'Not found!')
# we create it
body = serial.dumps({'revision': hashutil.hex_to_hash(occ_revision_sha1_hex), # FIXME: redundant with the one from uri..
'branch': 'master',
'authority': 'softwareheritage',
'url-origin': self.origin_url})
rv = self.app.put('/vcs/occurrences/%s' % occ_revision_sha1_hex, # ... here
data=body,
headers={'Content-Type': serial.MIMETYPE})
- assert rv.status_code == 204
- assert rv.data == b''
+ self.assertEquals(rv.status_code, 204)
+ self.assertEquals(rv.data, b'')
# now it exists
rv = self.app.get('/vcs/occurrences/%s' % occ_revision_sha1_hex)
# then
- assert rv.status_code == 200
- assert serial.loads(rv.data) == ['master']
+ self.assertEquals(rv.status_code, 200)
+ self.assertEquals(serial.loads(rv.data), ['master'])
# we update it
# rv = self.app.put('/vcs/occurrences/%s' % occ_revision_sha1_hex,
# data=body,
# headers={'Content-Type': serial.MIMETYPE})
- # assert rv.status_code == 204
- # assert rv.data == b''
+ #self.assertEquals(rv.status_code, 204)
+ #self.assertEquals(rv.data, b'')
# # still the same
# rv = self.app.get('/vcs/occurrences/%s' % occ_revision_sha1_hex)
# # then
# occs = serial.loads(rv.data)
- # assert rv.status_code == 200
- # assert occs == ['master']
+ # self.assertEquals(rv.status_code, 200)
+ # self.assertEquals(occs, ['master'])
diff --git a/swh/loader/git/tests/test_api_origin.py b/swh/loader/git/tests/test_api_origin.py
index f6dfcaa..21d3e2d 100644
--- a/swh/loader/git/tests/test_api_origin.py
+++ b/swh/loader/git/tests/test_api_origin.py
@@ -1,99 +1,99 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from nose.tools import istest
from nose.plugins.attrib import attr
from swh.loader.git.storage import db, models
from swh.loader.git.protocols import serial
from test_utils import app_client
@attr('slow')
class OriginTestCase(unittest.TestCase):
@classmethod
def setUpClass(self):
self.app, db_url, _ = app_client()
with db.connect(db_url) as db_conn:
self.origin_url = 'https://github.com/torvalds/linux.git'
self.origin_type = 'git'
self.origin_id = models.add_origin(db_conn, self.origin_url, self.origin_type)
@istest
def get_origin_ok(self):
# when
payload = {'url': self.origin_url,
'type': self.origin_type}
rv = self.app.post('/vcs/origins/',
data=serial.dumps(payload),
headers={'Content-Type': serial.MIMETYPE})
# then
- assert rv.status_code == 200
- assert serial.loads(rv.data)['id'] == self.origin_id
+ self.assertEquals(rv.status_code, 200)
+ self.assertEquals(serial.loads(rv.data)['id'], self.origin_id)
@istest
def get_origin_not_found(self):
# when
payload = {'url': 'unknown',
'type': 'blah'}
rv = self.app.post('/vcs/origins/',
data=serial.dumps(payload),
headers={'Content-Type': serial.MIMETYPE})
# then
- assert rv.status_code == 404
- assert rv.data == b'Origin not found!'
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.data, b'Origin not found!')
@istest
def get_origin_not_found_with_bad_format(self):
# when
rv = self.app.post('/vcs/origins/',
data=serial.dumps({'url': 'unknown'}),
headers={'Content-Type': serial.MIMETYPE})
# then
- assert rv.status_code == 400
+ self.assertEquals(rv.status_code, 400)
@istest
def put_origin(self):
# when
payload = {'url': 'unknown',
'type': 'blah'}
rv = self.app.post('/vcs/origins/',
data=serial.dumps(payload),
headers={'Content-Type': serial.MIMETYPE})
# then
- assert rv.status_code == 404
- assert rv.data == b'Origin not found!'
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.data, b'Origin not found!')
# when
rv = self.app.put('/vcs/origins/',
data=serial.dumps(payload),
headers={'Content-Type': serial.MIMETYPE})
# then
- assert rv.status_code == 200 # FIXME: 201
- assert serial.loads(rv.data)['id']
+ self.assertEquals(rv.status_code, 200) # FIXME: 201
+ self.assertIsNotNone(serial.loads(rv.data)['id'])
payload = {'url': 'unknown',
'type': 'blah'}
rv = self.app.post('/vcs/origins/',
data=serial.dumps(payload),
headers={'Content-Type': serial.MIMETYPE})
# then
- assert rv.status_code == 200
+ self.assertEquals(rv.status_code, 200)
origin_id = serial.loads(rv.data)['id']
- assert origin_id
+ self.assertIsNotNone(origin_id)
# when
rv = self.app.put('/vcs/origins/',
data=serial.dumps(payload),
headers={'Content-Type': serial.MIMETYPE})
# then
- assert rv.status_code == 200 # FIXME: 204
- assert serial.loads(rv.data)['id'] == origin_id
+ self.assertEquals(rv.status_code, 200) # FIXME: 204
+ self.assertEquals(serial.loads(rv.data)['id'], origin_id)
diff --git a/swh/loader/git/tests/test_api_person.py b/swh/loader/git/tests/test_api_person.py
index 85e87fa..f0b2434 100644
--- a/swh/loader/git/tests/test_api_person.py
+++ b/swh/loader/git/tests/test_api_person.py
@@ -1,99 +1,99 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from nose.tools import istest
from nose.plugins.attrib import attr
from swh.loader.git.storage import db, models
from swh.loader.git.protocols import serial
from test_utils import app_client
@attr('slow')
class PersonTestCase(unittest.TestCase):
@classmethod
def setUpClass(self):
self.app, db_url, _ = app_client()
with db.connect(db_url) as db_conn:
self.person_name = 'some-name'
self.person_email = 'some@mail.git'
self.person_id = models.add_person(db_conn, self.person_name, self.person_email)
@istest
def get_person_ok(self):
# when
person = {'name': self.person_name,
'email': self.person_email}
rv = self.app.post('/vcs/persons/',
data=serial.dumps(person),
headers={'Content-Type': serial.MIMETYPE})
# then
- assert rv.status_code == 200
- assert serial.loads(rv.data)['id'] == self.person_id
+ self.assertEquals(rv.status_code, 200)
+ self.assertEquals(serial.loads(rv.data)['id'], self.person_id)
@istest
def get_person_not_found(self):
# when
person = {'name': 'unknown',
'email': 'blah'}
rv = self.app.post('/vcs/persons/',
data=serial.dumps(person),
headers={'Content-Type': serial.MIMETYPE})
# then
- assert rv.status_code == 404
- assert rv.data == b'Person not found!'
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.data, b'Person not found!')
@istest
def get_person_not_found_with_bad_format(self):
# when
rv = self.app.post('/vcs/persons/',
data=serial.dumps({'name': 'unknown'}),
headers={'Content-Type': serial.MIMETYPE})
# then
- assert rv.status_code == 400
+ self.assertEquals(rv.status_code, 400)
@istest
def put_person(self):
# when
person = {'name': 'unknown',
'email': 'blah'}
rv = self.app.post('/vcs/persons/',
data=serial.dumps(person),
headers={'Content-Type': serial.MIMETYPE})
# then
- assert rv.status_code == 404
- assert rv.data == b'Person not found!'
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.data, b'Person not found!')
# when
rv = self.app.put('/vcs/persons/',
data=serial.dumps([person]),
headers={'Content-Type': serial.MIMETYPE})
# then
- assert rv.status_code == 204
- assert rv.data == b''
+ self.assertEquals(rv.status_code, 204)
+ self.assertEquals(rv.data, b'')
person = {'name': 'unknown',
'email': 'blah'}
rv = self.app.post('/vcs/persons/',
data=serial.dumps(person),
headers={'Content-Type': serial.MIMETYPE})
# then
- assert rv.status_code == 200
+ self.assertEquals(rv.status_code, 200)
person_id = serial.loads(rv.data)['id']
- assert person_id
+ self.assertIsNotNone(person_id)
# when
rv = self.app.put('/vcs/persons/',
data=serial.dumps([person, person]),
headers={'Content-Type': serial.MIMETYPE})
# then
- assert rv.status_code == 204
- assert rv.data == b''
+ self.assertEquals(rv.status_code, 204)
+ self.assertEquals(rv.data, b'')
diff --git a/swh/loader/git/tests/test_api_post_per_type.py b/swh/loader/git/tests/test_api_post_per_type.py
index 0c8b34a..03988ba 100644
--- a/swh/loader/git/tests/test_api_post_per_type.py
+++ b/swh/loader/git/tests/test_api_post_per_type.py
@@ -1,224 +1,224 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from nose.tools import istest
from nose.plugins.attrib import attr
from swh.loader.git.storage import db, models
from swh.loader.git.protocols import serial
from test_utils import now, app_client, app_client_teardown
from swh.core import hashutil
@attr('slow')
class TestPostObjectsPerTypeCase(unittest.TestCase):
@classmethod
def setUpClass(self):
self.app, self.db_url, self.content_storage_dir = app_client()
with db.connect(self.db_url) as db_conn:
self.content_sha1_id = '09a8ca4c0f510fda04a4dfe04d842cdfaafa7d8c'
self.content_sha1_id_bin = hashutil.hex_to_hash(self.content_sha1_id)
self.content_sha256_bin = hashutil.hashdata(b'something-to-hash', ['sha256'])['sha256']
models.add_content(db_conn,
self.content_sha1_id_bin,
self.content_sha1_id_bin,
self.content_sha256_bin,
10)
self.directory_sha1_hex = '19a8ca4c0f510fda04a4dfe04d842cdfaafa7d8c'
self.directory_sha1_bin = hashutil.hex_to_hash(self.directory_sha1_hex)
models.add_directory(db_conn, self.directory_sha1_bin)
authorAndCommitter = {'name': 'some-name', 'email': 'some-email'}
models.add_person(db_conn, authorAndCommitter['name'], authorAndCommitter['email'])
authorAndCommitter2 = {'name': 'tony', 'email': 'tony@dude.org'}
models.add_person(db_conn, authorAndCommitter2['name'], authorAndCommitter2['email'])
self.revision_sha1_hex = '29a8ca4c0f510fda04a4dfe04d842cdfaafa7d8c'
self.revision_sha1_bin = hashutil.hex_to_hash(self.revision_sha1_hex)
models.add_revision(db_conn,
self.revision_sha1_bin,
now(),
now(),
self.directory_sha1_bin,
"revision message",
authorAndCommitter,
authorAndCommitter)
self.revision_sha1_hex2 = '39a8ca4c0f510fda04a4dfe04d842cdfaafa7d8c'
self.revision_sha1_bin2 = hashutil.hex_to_hash(self.revision_sha1_hex2)
models.add_revision(db_conn,
self.revision_sha1_bin2,
now(),
now(),
self.directory_sha1_bin,
"revision message",
authorAndCommitter2,
authorAndCommitter2,
parent_shas=['revision-sha1-to-test-existence9994f717e'])
self.release_sha1_hex = '49a8ca4c0f510fda04a4dfe04d842cdfaafa7d8c'
self.release_sha1_bin = hashutil.hex_to_hash(self.release_sha1_hex)
models.add_release(db_conn,
self.release_sha1_bin,
self.revision_sha1_bin,
now(),
"0.0.1",
"Super release tagged by tony",
authorAndCommitter2)
self.origin_url = "https://github.com/user/repo"
models.add_origin(db_conn, self.origin_url, 'git')
models.add_occurrence_history(db_conn,
self.origin_url,
'master',
self.revision_sha1_bin,
'softwareheritage')
@classmethod
def tearDownClass(self):
app_client_teardown(self.content_storage_dir)
@istest
def post_all_non_presents_contents(self):
# given
# when
payload = [self.content_sha1_id_bin,
hashutil.hex_to_hash('555444f9dd5dc46ee476a8be155ab049994f717e'),
hashutil.hex_to_hash('555444f9dd5dc46ee476a8be155ab049994f717e'),
hashutil.hex_to_hash('666777f9dd5dc46ee476a8be155ab049994f717e')]
query_payload = serial.dumps(payload)
rv = self.app.post('/vcs/contents/',
data=query_payload,
headers={'Content-Type': serial.MIMETYPE})
# then
- assert rv.status_code == 200
+ self.assertEquals(rv.status_code, 200)
sha1s = serial.loads(rv.data)
- assert len(sha1s) is 2 # only 2 sha1s
- assert hashutil.hex_to_hash("666777f9dd5dc46ee476a8be155ab049994f717e") in sha1s
- assert hashutil.hex_to_hash("555444f9dd5dc46ee476a8be155ab049994f717e") in sha1s
+ self.assertEquals(len(sha1s), 2) # only 2 sha1s
+ self.assertIn(hashutil.hex_to_hash("666777f9dd5dc46ee476a8be155ab049994f717e"), sha1s)
+ self.assertIn(hashutil.hex_to_hash("555444f9dd5dc46ee476a8be155ab049994f717e"), sha1s)
@istest
def post_all_non_presents_directories(self):
# given
# when
payload = [self.directory_sha1_bin,
hashutil.hex_to_hash('555444f9dd5dc46ee476a8be155ab049994f717e'),
hashutil.hex_to_hash('555444f9dd5dc46ee476a8be155ab049994f717e'),
hashutil.hex_to_hash('666777f9dd5dc46ee476a8be155ab049994f717e')]
query_payload = serial.dumps(payload)
rv = self.app.post('/vcs/directories/',
data=query_payload,
headers={'Content-Type': serial.MIMETYPE})
# then
- assert rv.status_code == 200
+ self.assertEquals(rv.status_code, 200)
sha1s = serial.loads(rv.data)
- assert len(sha1s) is 2 # only 2 sha1s
- assert hashutil.hex_to_hash("666777f9dd5dc46ee476a8be155ab049994f717e") in sha1s
- assert hashutil.hex_to_hash("555444f9dd5dc46ee476a8be155ab049994f717e") in sha1s
+ self.assertEquals(len(sha1s), 2) # only 2 sha1s
+ self.assertIn(hashutil.hex_to_hash("666777f9dd5dc46ee476a8be155ab049994f717e"), sha1s)
+ self.assertIn(hashutil.hex_to_hash("555444f9dd5dc46ee476a8be155ab049994f717e"), sha1s)
@istest
def post_all_non_presents_revisions(self):
# given
# when
payload = [self.revision_sha1_bin,
self.revision_sha1_bin,
hashutil.hex_to_hash('555444f9dd5dc46ee476a8be155ab049994f717e'),
hashutil.hex_to_hash('555444f9dd5dc46ee476a8be155ab049994f717e'),
hashutil.hex_to_hash('666777f9dd5dc46ee476a8be155ab049994f717e')]
query_payload = serial.dumps(payload)
rv = self.app.post('/vcs/revisions/',
data=query_payload,
headers={'Content-Type': serial.MIMETYPE})
# then
- assert rv.status_code == 200
+ self.assertEquals(rv.status_code, 200)
sha1s = serial.loads(rv.data)
- assert len(sha1s) is 2 # only 2 sha1s
- assert hashutil.hex_to_hash("666777f9dd5dc46ee476a8be155ab049994f717e") in sha1s
- assert hashutil.hex_to_hash("555444f9dd5dc46ee476a8be155ab049994f717e") in sha1s
+ self.assertEquals(len(sha1s), 2)
+ self.assertIn(hashutil.hex_to_hash("666777f9dd5dc46ee476a8be155ab049994f717e"), sha1s)
+ self.assertIn(hashutil.hex_to_hash("555444f9dd5dc46ee476a8be155ab049994f717e"), sha1s)
@istest
def post_all_non_presents_releases(self):
# given
# when
payload = [self.release_sha1_bin,
self.release_sha1_bin,
hashutil.hex_to_hash('555444f9dd5dc46ee476a8be155ab049994f717e'),
hashutil.hex_to_hash('555444f9dd5dc46ee476a8be155ab049994f717e'),
hashutil.hex_to_hash('666777f9dd5dc46ee476a8be155ab049994f717e')]
query_payload = serial.dumps(payload)
rv = self.app.post('/vcs/releases/',
data=query_payload,
headers={'Content-Type': serial.MIMETYPE})
# then
- assert rv.status_code == 400
- assert rv.data == b'Bad request. Type not supported!'
+ self.assertEquals(rv.status_code, 400)
+ self.assertEquals(rv.data, b'Bad request. Type not supported!')
@istest
def post_all_non_presents_occurrences_KO(self):
# given
# when
payload = [self.revision_sha1_bin,
self.revision_sha1_bin,
hashutil.hex_to_hash('555444f9dd5dc46ee476a8be155ab049994f717e'),
hashutil.hex_to_hash('555444f9dd5dc46ee476a8be155ab049994f717e'),
hashutil.hex_to_hash('666777f9dd5dc46ee476a8be155ab049994f717e')]
query_payload = serial.dumps(payload)
rv = self.app.post('/vcs/occurrences/',
data=query_payload,
headers={'Content-Type': serial.MIMETYPE})
# then
- assert rv.status_code == 400
- assert rv.data == b'Bad request. Type not supported!'
+ self.assertEquals(rv.status_code, 400)
+ self.assertEquals(rv.data, b'Bad request. Type not supported!')
@istest
def post_non_presents_objects_empty_payload_so_empty_results(self):
# given
# when
for api_type in ['contents', 'directories', 'revisions']:
rv = self.app.post('/vcs/%s/' % api_type,
data=serial.dumps({}),
headers={'Content-Type': serial.MIMETYPE})
# then
- assert rv.status_code == 200
- assert serial.loads(rv.data) == []
+ self.assertEquals(rv.status_code, 200)
+ self.assertEquals(serial.loads(rv.data), [])
@istest
def post_non_presents_objects_bad_requests_format_pickle(self):
# given
# when
for api_type in ['contents', 'directories', 'revisions']:
rv = self.app.post('/vcs/%s/' % api_type,
data="not pickle -> fail")
# then
- assert rv.status_code == 400
- assert rv.data == b'Bad request. Expected application/octet-stream data!'
+ self.assertEquals(rv.status_code, 400)
+ self.assertEquals(rv.data, b'Bad request. Expected application/octet-stream data!')
diff --git a/swh/loader/git/tests/test_api_release.py b/swh/loader/git/tests/test_api_release.py
index abc9cff..4784409 100644
--- a/swh/loader/git/tests/test_api_release.py
+++ b/swh/loader/git/tests/test_api_release.py
@@ -1,126 +1,126 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from nose.tools import istest
from nose.plugins.attrib import attr
from swh.loader.git.storage import db, models
from swh.loader.git.protocols import serial
from test_utils import now, app_client, app_client_teardown
from swh.core import hashutil
@attr('slow')
class ReleaseTestCase(unittest.TestCase):
@classmethod
def setUpClass(self):
self.app, db_url, self.content_storage_dir = app_client()
with db.connect(db_url) as db_conn:
self.directory_sha1_hex = 'ebefcd8f9c8a1003ee35f7f953c4c1480986e607'
self.directory_sha1_bin = hashutil.hex_to_hash(self.directory_sha1_hex)
models.add_directory(db_conn, self.directory_sha1_bin)
self.tagAuthor = {'name': 'tony', 'email': 'tony@mail.org'}
models.add_person(db_conn, self.tagAuthor['name'], self.tagAuthor['email'])
self.revision_sha1_hex = 'dbefcd8f9c8a1003ee35f7f953c4c1480986e607'
self.revision_sha1_bin = hashutil.hex_to_hash(self.revision_sha1_hex)
models.add_revision(db_conn,
self.revision_sha1_bin,
now(),
now(),
self.directory_sha1_bin,
"revision message",
self.tagAuthor,
self.tagAuthor)
self.release_sha1_hex = 'cbefcd8f9c8a1003ee35f7f953c4c1480986e607'
self.release_sha1_bin = hashutil.hex_to_hash(self.release_sha1_hex)
models.add_release(db_conn,
self.release_sha1_bin,
self.revision_sha1_bin,
now(),
"0.0.1",
"Super release tagged by tony",
self.tagAuthor)
@classmethod
def tearDownClass(self):
app_client_teardown(self.content_storage_dir)
@istest
def get_release_ok(self):
# when
rv = self.app.get('/vcs/releases/%s' % self.release_sha1_hex)
# then
- assert rv.status_code == 200
- assert serial.loads(rv.data)['id'] == self.release_sha1_hex
+ self.assertEquals(rv.status_code, 200)
+ self.assertEquals(serial.loads(rv.data)['id'], self.release_sha1_hex)
@istest
def get_release_not_found(self):
# when
rv = self.app.get('/vcs/releases/inexistant-sha1')
# then
- assert rv.status_code == 404
- assert rv.data == b'Not found!'
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.data, b'Not found!')
@istest
def get_release_not_found_with_bad_format(self):
# when
rv = self.app.get('/vcs/releases/1')
# then
- assert rv.status_code == 404
- assert rv.data == b'Not found!'
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.data, b'Not found!')
@istest
def put_release_create_and_update(self):
release_sha1_hex = 'bbefcd8f9c8a1003ee35f7f953c4c1480986e607'
release_sha1_bin = hashutil.hex_to_hash(release_sha1_hex)
rv = self.app.get('/vcs/releases/%s' % release_sha1_hex)
# then
- assert rv.status_code == 404
- assert rv.data == b'Not found!'
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.data, b'Not found!')
# we create it
body = serial.dumps({'id': release_sha1_bin,
'revision': self.revision_sha1_bin,
'date': now(),
'name': '0.0.1',
'comment': 'super release tagged by ardumont',
'author': self.tagAuthor})
rv = self.app.put('/vcs/releases/%s' % release_sha1_hex,
data=body,
headers={'Content-Type': serial.MIMETYPE})
- assert rv.status_code == 204
- assert rv.data == b''
+ self.assertEquals(rv.status_code, 204)
+ self.assertEquals(rv.data, b'')
# now it exists
rv = self.app.get('/vcs/releases/%s' % release_sha1_hex)
# then
- assert rv.status_code == 200
- assert serial.loads(rv.data)['id'] == release_sha1_hex
+ self.assertEquals(rv.status_code, 200)
+ self.assertEquals(serial.loads(rv.data)['id'], release_sha1_hex)
# we update it
rv = self.app.put('/vcs/releases/%s' % release_sha1_hex,
data=body,
headers={'Content-Type': serial.MIMETYPE})
- assert rv.status_code == 204
- assert rv.data == b''
+ self.assertEquals(rv.status_code, 204)
+ self.assertEquals(rv.data, b'')
# still the same
rv = self.app.get('/vcs/releases/%s' % release_sha1_hex)
# then
- assert rv.status_code == 200
- assert serial.loads(rv.data)['id'] == release_sha1_hex
+ self.assertEquals(rv.status_code, 200)
+ self.assertEquals(serial.loads(rv.data)['id'], release_sha1_hex)
diff --git a/swh/loader/git/tests/test_api_revision.py b/swh/loader/git/tests/test_api_revision.py
index 41192d7..18935bf 100644
--- a/swh/loader/git/tests/test_api_revision.py
+++ b/swh/loader/git/tests/test_api_revision.py
@@ -1,141 +1,141 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from nose.tools import istest
from nose.plugins.attrib import attr
from swh.loader.git.storage import db, models
from swh.loader.git.protocols import serial
from test_utils import now, app_client, app_client_teardown
from swh.core import hashutil
@attr('slow')
class RevisionTestCase(unittest.TestCase):
@classmethod
def setUpClass(self):
self.app, db_url, self.content_storage_dir = app_client()
with db.connect(db_url) as db_conn:
directory_sha1_hex = '13d2a9739ac02431681c317ce449909a46c59554'
self.directory_sha1_bin = hashutil.hex_to_hash(directory_sha1_hex)
models.add_directory(db_conn, self.directory_sha1_bin)
self.authorAndCommitter = {'name': 'some-name', 'email': 'some-email'}
models.add_person(db_conn, self.authorAndCommitter['name'], self.authorAndCommitter['email'])
self.revision_parent_sha1_hex = '23d2a9739ac02431681c317ce449909a46c59554'
self.revision_parent_sha1_bin = hashutil.hex_to_hash(self.revision_parent_sha1_hex)
models.add_revision(db_conn,
self.revision_parent_sha1_bin,
now(),
now(),
self.directory_sha1_bin,
"revision message",
self.authorAndCommitter,
self.authorAndCommitter)
revision_parent_2_sha1_hex = '33d2a9739ac02431681c317ce449909a46c59554'
self.revision_parent_2_sha1_bin = hashutil.hex_to_hash(revision_parent_2_sha1_hex)
models.add_revision(db_conn,
self.revision_parent_2_sha1_bin,
now(),
now(),
self.directory_sha1_bin,
"revision message 2",
self.authorAndCommitter,
self.authorAndCommitter)
revision_parent_3_sha1_hex = '43d2a9739ac02431681c317ce449909a46c59554'
self.revision_parent_3_sha1_bin = hashutil.hex_to_hash(revision_parent_3_sha1_hex)
models.add_revision(db_conn,
self.revision_parent_3_sha1_bin,
now(),
now(),
self.directory_sha1_bin,
"revision message 3",
self.authorAndCommitter,
self.authorAndCommitter)
@classmethod
def tearDownClass(self):
app_client_teardown(self.content_storage_dir)
@istest
def get_revision_ok(self):
# when
rv = self.app.get('/vcs/revisions/%s' % self.revision_parent_sha1_hex)
# then
- assert rv.status_code == 200
- assert serial.loads(rv.data)['id'] == self.revision_parent_sha1_hex
+ self.assertEquals(rv.status_code, 200)
+ self.assertEquals(serial.loads(rv.data)['id'], self.revision_parent_sha1_hex)
@istest
def get_revision_not_found(self):
# when
rv = self.app.get('/vcs/revisions/inexistant-sha1')
# then
- assert rv.status_code == 404
- assert rv.data == b'Not found!'
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.data, b'Not found!')
@istest
def get_revision_not_found_with_bad_format(self):
# when
rv = self.app.get('/vcs/revisions/1')
# then
- assert rv.status_code == 404
- assert rv.data == b'Not found!'
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.data, b'Not found!')
@istest
def put_revision_create_and_update(self):
revision_sha1_hex = '53d2a9739ac02431681c317ce449909a46c59554'
rv = self.app.get('/vcs/revisions/%s' % revision_sha1_hex)
# then
- assert rv.status_code == 404
- assert rv.data == b'Not found!'
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.data, b'Not found!')
# we create it
body = serial.dumps({'date': now(),
'committer-date': now(),
'directory': self.directory_sha1_bin,
'message': 'revision message describing it',
'committer': self.authorAndCommitter,
'author': self.authorAndCommitter,
'parent-sha1s': [self.revision_parent_sha1_bin,
self.revision_parent_3_sha1_bin,
self.revision_parent_2_sha1_bin]})
rv = self.app.put('/vcs/revisions/%s' % revision_sha1_hex,
data=body,
headers={'Content-Type': serial.MIMETYPE})
- assert rv.status_code == 204
- assert rv.data == b''
+ self.assertEquals(rv.status_code, 204)
+ self.assertEquals(rv.data, b'')
# now it exists
rv = self.app.get('/vcs/revisions/%s' % revision_sha1_hex)
# then
- assert rv.status_code == 200
- assert serial.loads(rv.data)['id'] == revision_sha1_hex
+ self.assertEquals(rv.status_code, 200)
+ self.assertEquals(serial.loads(rv.data)['id'], revision_sha1_hex)
# we update it
rv = self.app.put('/vcs/revisions/%s' % revision_sha1_hex,
data=body,
headers={'Content-Type': serial.MIMETYPE})
- assert rv.status_code == 204
- assert rv.data == b''
+ self.assertEquals(rv.status_code, 204)
+ self.assertEquals(rv.data, b'')
# still the same
rv = self.app.get('/vcs/revisions/%s' % revision_sha1_hex)
# then
- assert rv.status_code == 200
- assert serial.loads(rv.data)['id'] == revision_sha1_hex
+ self.assertEquals(rv.status_code, 200)
+ self.assertEquals(serial.loads(rv.data)['id'], revision_sha1_hex)
diff --git a/swh/loader/git/tests/test_date.py b/swh/loader/git/tests/test_date.py
index d8ac1c0..052813d 100644
--- a/swh/loader/git/tests/test_date.py
+++ b/swh/loader/git/tests/test_date.py
@@ -1,56 +1,56 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from nose.tools import istest
from swh.loader.git import date
class DateTestCase(unittest.TestCase):
@istest
def negative_offsets(self):
# when
d0 = date.ts_to_str(1434379797, -120)
- assert d0 == '2015-06-15 12:49:57-02:00'
+ self.assertEquals(d0, '2015-06-15 12:49:57-02:00')
# when
d1 = date.ts_to_str(1434379797, -60)
- assert d1 == '2015-06-15 13:49:57-01:00'
+ self.assertEquals(d1, '2015-06-15 13:49:57-01:00')
# when
d2 = date.ts_to_str(1434379797, -30)
- assert d2 == '2015-06-15 14:19:57-00:30'
+ self.assertEquals(d2, '2015-06-15 14:19:57-00:30')
# when
d3 = date.ts_to_str(1434379797, 30)
- assert d3 == '2015-06-15 15:19:57+00:30'
+ self.assertEquals(d3, '2015-06-15 15:19:57+00:30')
@istest
def positive_offsets(self):
# when
d0 = date.ts_to_str(1434449254, 120)
- assert d0 == '2015-06-16 12:07:34+02:00'
+ self.assertEquals(d0, '2015-06-16 12:07:34+02:00')
# when
d1 = date.ts_to_str(1434449254, 60)
- assert d1 == '2015-06-16 11:07:34+01:00'
+ self.assertEquals(d1, '2015-06-16 11:07:34+01:00')
# when
d2 = date.ts_to_str(1434449254, 0)
- assert d2 == '2015-06-16 10:07:34+00:00'
+ self.assertEquals(d2, '2015-06-16 10:07:34+00:00')
# when
d3 = date.ts_to_str(1434449254, -60)
- assert d3 == '2015-06-16 09:07:34-01:00'
+ self.assertEquals(d3, '2015-06-16 09:07:34-01:00')
diff --git a/swh/loader/git/tests/test_http.py b/swh/loader/git/tests/test_http.py
index c293124..c519df1 100644
--- a/swh/loader/git/tests/test_http.py
+++ b/swh/loader/git/tests/test_http.py
@@ -1,41 +1,42 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from nose.tools import istest
from swh.loader.git.client import http
from swh.loader.git.storage import storage
class TestHttp(unittest.TestCase):
@istest
def url(self):
# when
s = http.compute_simple_url('http://base-url', '/end')
# then
- assert s == 'http://base-url/end'
+ self.assertEquals(s, 'http://base-url/end')
@istest
def url_lookup_per_type(self):
# then
- assert http.url_lookup_per_type == { storage.Type.origin: "/vcs/origins/"
- , storage.Type.content: "/vcs/contents/"
- , storage.Type.directory: "/vcs/directories/"
- , storage.Type.revision: "/vcs/revisions/" }
+ self.assertEquals(http.url_lookup_per_type,
+ {storage.Type.origin: "/vcs/origins/",
+ storage.Type.content: "/vcs/contents/",
+ storage.Type.directory: "/vcs/directories/",
+ storage.Type.revision: "/vcs/revisions/"})
@istest
def url_store_per_type(self):
# then
- assert http.url_store_per_type == { storage.Type.origin: "/vcs/origins/"
- , storage.Type.content: "/vcs/contents/"
- , storage.Type.directory: "/vcs/directories/"
- , storage.Type.revision: "/vcs/revisions/"
- , storage.Type.release: "/vcs/releases/"
- , storage.Type.occurrence: "/vcs/occurrences/"
- , storage.Type.person: "/vcs/persons/"
- }
+ self.assertEquals(http.url_store_per_type,
+ {storage.Type.origin: "/vcs/origins/",
+ storage.Type.content: "/vcs/contents/",
+ storage.Type.directory: "/vcs/directories/",
+ storage.Type.revision: "/vcs/revisions/",
+ storage.Type.release: "/vcs/releases/",
+ storage.Type.occurrence: "/vcs/occurrences/",
+ storage.Type.person: "/vcs/persons/"})
diff --git a/swh/loader/git/tests/test_local_loader.py b/swh/loader/git/tests/test_local_loader.py
index 04daa46..3aac15a 100644
--- a/swh/loader/git/tests/test_local_loader.py
+++ b/swh/loader/git/tests/test_local_loader.py
@@ -1,249 +1,252 @@
# coding: utf-8
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
import pygit2
import tempfile
import shutil
from nose.plugins.attrib import attr
from nose.tools import istest
from swh.core.conf import reader
from swh.loader.git.storage import db, models
from swh.loader.git import loader
import test_initdb
from test_utils import list_files_from
from test_git_utils import create_commit_with_content, create_tag
@attr('slow')
class TestLocalLoader(unittest.TestCase):
def setUp(self):
"""Initialize a git repository for the remaining test to manipulate.
"""
tmp_git_folder_path = tempfile.mkdtemp(prefix='test-sgloader.',
dir='/tmp')
self.tmp_git_repo = pygit2.init_repository(tmp_git_folder_path)
self.conf_back = reader.read('./resources/test/back.ini',
{'port': ('int', 9999)})
self.db_url = self.conf_back['db_url']
self.conf = {
'action': 'load',
'repo_path': self.tmp_git_repo.workdir,
'backend-type': 'local',
'backend': './resources/test/back.ini'
}
def init_db_setup(self):
"""Initialize a git repository for the remaining test to manipulate.
"""
test_initdb.prepare_db(self.db_url)
def tearDown(self):
"""Destroy the test git repository.
"""
shutil.rmtree(self.tmp_git_repo.workdir)
shutil.rmtree(self.conf_back['content_storage_dir'], ignore_errors=True)
@istest
def should_fail_on_bad_action(self):
# when
try:
loader.load({'action': 'unknown'})
+ self.fail()
except:
- pass
+ self.assertTrue(True)
@istest
def should_fail_on_inexistant_folder(self):
# when
try:
loader.load({'action': 'load',
'repo_path': 'something-that-definitely-does-not-exist'})
+ self.fail()
except:
- pass
+ self.assertTrue(True)
@istest
def should_fail_on_inexistant_backend_type(self):
# when
try:
loader.load({'action': 'load',
'repo_path': '.',
'backend-type': 'unknown'}) # only local or remote supported
+ self.fail()
except:
- pass
+ self.assertTrue(True)
@istest
def local_loader(self):
"""Trigger loader and make sure everything is ok.
"""
self.init_db_setup()
# given
commit0 = create_commit_with_content(self.tmp_git_repo, 'blob 0',
'commit msg 0')
commit1 = create_commit_with_content(self.tmp_git_repo, 'blob 1',
'commit msg 1',
[commit0.hex])
commit2 = create_commit_with_content(self.tmp_git_repo, 'blob 2',
'commit msg 2',
[commit1.hex])
commit3 = create_commit_with_content(self.tmp_git_repo, None,
'commit msg 3',
[commit2.hex])
commit4 = create_commit_with_content(self.tmp_git_repo, 'blob 4',
'commit msg 4',
[commit3.hex])
# when
loader.load(self.conf)
# then
nb_files = len(list_files_from(self.conf_back['content_storage_dir']))
self.assertEquals(nb_files, 4, "4 blobs.")
with db.connect(self.db_url) as db_conn:
self.assertEquals(
models.count_revisions(db_conn),
5,
"Should be 5 commits")
self.assertEquals(
models.count_directories(db_conn),
5,
"Should be 5 trees")
self.assertEquals(
models.count_contents(db_conn),
4,
"Should be 4 blobs as we created one commit without data!")
self.assertEquals(
models.count_release(db_conn),
0,
"No tag created so 0 release.")
self.assertEquals(
models.count_occurrence(db_conn),
1,
"Should be 1 reference (master) so 1 occurrence.")
# given
commit5 = create_commit_with_content(self.tmp_git_repo, 'new blob 5',
'commit msg 5',
[commit4.hex])
commit6 = create_commit_with_content(self.tmp_git_repo,
'new blob and last 6',
'commit msg 6',
[commit5.hex])
commit7 = create_commit_with_content(self.tmp_git_repo, 'new blob 7',
'commit msg 7',
[commit6.hex])
# when
loader.load(self.conf)
# then
nb_files = len(list_files_from(self.conf_back['content_storage_dir']))
self.assertEquals(nb_files, 4+3, "3 new blobs.")
with db.connect(self.db_url) as db_conn:
self.assertEquals(
models.count_revisions(db_conn),
8,
"Should be 5+3 == 8 commits now")
self.assertEquals(
models.count_directories(db_conn),
8,
"Should be 5+3 == 8 trees")
self.assertEquals(
models.count_contents(db_conn),
7,
"Should be 4+3 == 7 blobs")
self.assertEquals(
models.count_release(db_conn),
0,
"No tag created so 0 release.")
self.assertEquals(
models.count_occurrence(db_conn),
2,
"Should be 1 reference which changed twice so 2 occurrences (master changed).")
# given
create_commit_with_content(self.tmp_git_repo, None,
'commit 8 with parent 2',
[commit7.hex])
# when
loader.load(self.conf)
# then
nb_files = len(list_files_from(self.conf_back['content_storage_dir']))
self.assertEquals(nb_files, 7, "no new blob.")
with db.connect(self.db_url) as db_conn:
self.assertEquals(
models.count_revisions(db_conn),
9,
"Should be 8+1 == 9 commits now")
self.assertEquals(
models.count_directories(db_conn),
8,
"Should be 8 trees (new commit without blob so no new tree)")
self.assertEquals(
models.count_contents(db_conn),
7,
"Should be 7 blobs (new commit without new blob)")
self.assertEquals(
models.count_release(db_conn),
0,
"No tag created so 0 release.")
self.assertEquals(
models.count_occurrence(db_conn),
3,
"Should be 1 reference which changed thrice so 3 occurrences (master changed again).")
self.assertEquals(
models.count_person(db_conn),
2,
"1 author + 1 committer")
# add tag
create_tag(self.tmp_git_repo, '0.0.1', commit5, 'bad ass release 0.0.1, towards infinity...')
create_tag(self.tmp_git_repo, '0.0.2', commit7, 'release 0.0.2... and beyond')
loader.load(self.conf)
# then
nb_files = len(list_files_from(self.conf_back['content_storage_dir']))
self.assertEquals(nb_files, 7, "no new blob.")
with db.connect(self.db_url) as db_conn:
self.assertEquals(
models.count_revisions(db_conn),
9,
"Should be 8+1 == 9 commits now")
self.assertEquals(
models.count_directories(db_conn),
8,
"Should be 8 trees (new commit without blob so no new tree)")
self.assertEquals(
models.count_contents(db_conn),
7,
"Should be 7 blobs (new commit without new blob)")
self.assertEquals(
models.count_release(db_conn),
2,
"Should be 2 annotated tags so 2 releases")
self.assertEquals(
models.count_occurrence(db_conn),
3,
"master did not change this time so still 3 occurrences")
self.assertEquals(
models.count_person(db_conn),
3,
"1 author + 1 committer + 1 tagger")
diff --git a/swh/loader/git/tests/test_remote_loader.py b/swh/loader/git/tests/test_remote_loader.py
index dcaf401..13a5603 100644
--- a/swh/loader/git/tests/test_remote_loader.py
+++ b/swh/loader/git/tests/test_remote_loader.py
@@ -1,252 +1,251 @@
# coding: utf-8
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
import pygit2
import tempfile
import shutil
import os
from nose.plugins.attrib import attr
from nose.tools import istest
from swh.core.conf import reader
from swh.loader.git.storage import db, models
from swh.loader.git import loader
import test_initdb
from test_git_utils import create_commit_with_content, create_tag
from test_utils import list_files_from
@attr('slow')
class TestRemoteLoader(unittest.TestCase):
def setUp(self):
tmp_git_folder_path = tempfile.mkdtemp(prefix='test-sgloader.',
dir='/tmp')
self.tmp_git_repo = pygit2.init_repository(tmp_git_folder_path)
self.conf = reader.read('./resources/test/back.ini',
{'port': ('int', 9999)})
self.db_url = self.conf['db_url']
self.conf.update({
'action': 'load',
'repo_path': self.tmp_git_repo.workdir,
'backend-type': 'remote',
'backend': 'http://localhost:%s' % self.conf['port']
})
# Not the remote loader in charge of creating the folder, so we do it
if not os.path.exists(self.conf['content_storage_dir']):
os.mkdir(self.conf['content_storage_dir'])
def init_db_setup(self):
"""Initialize a git repository for the remaining test to manipulate.
"""
test_initdb.prepare_db(self.db_url)
def tearDown(self):
"""Destroy the test git repository.
"""
shutil.rmtree(self.tmp_git_repo.workdir)
shutil.rmtree(self.conf['content_storage_dir'])
@istest
def should_fail_on_bad_action(self):
# when
try:
loader.load({'action': 'unknown'})
except:
- # FIXME assert raises
- pass
+ self.assertTrue(True)
@istest
def should_fail_on_inexistant_folder(self):
# when
try:
loader.load({'action': 'load',
'repo_path': 'something-that-definitely-does-not-exist'})
except:
- pass
+ self.assertTrue(True)
@istest
def should_fail_on_inexistant_backend_type(self):
# when
try:
loader.load({'action': 'load',
'repo_path': '.',
'backend-type': 'unknown'}) # only local or remote supported
except:
- pass
+ self.assertTrue(True)
@istest
def remote_loader(self):
"""Trigger loader and make sure everything is ok.
"""
# given
self.init_db_setup()
# given
commit0 = create_commit_with_content(self.tmp_git_repo, 'blob 0',
'commit msg 0')
commit1 = create_commit_with_content(self.tmp_git_repo, 'blob 1',
'commit msg 1',
[commit0.hex])
commit2 = create_commit_with_content(self.tmp_git_repo, 'blob 2',
'commit msg 2',
[commit1.hex])
commit3 = create_commit_with_content(self.tmp_git_repo, None,
'commit msg 3',
[commit2.hex])
commit4 = create_commit_with_content(self.tmp_git_repo, 'blob 4',
'commit msg 4',
[commit3.hex])
# when
loader.load(self.conf)
# then
nb_files = len(list_files_from(self.conf['content_storage_dir']))
self.assertEquals(nb_files, 4, "4 blobs")
with db.connect(self.db_url) as db_conn:
self.assertEquals(
models.count_revisions(db_conn),
5,
"Should be 5 commits")
self.assertEquals(
models.count_directories(db_conn),
5,
"Should be 5 trees")
self.assertEquals(
models.count_contents(db_conn),
4,
"Should be 4 blobs as we created one commit without data!")
self.assertEquals(
models.count_release(db_conn),
0,
"No tag created so 0 release.")
self.assertEquals(
models.count_occurrence(db_conn),
1,
"Should be 1 reference (master) so 1 occurrence.")
# given
commit5 = create_commit_with_content(self.tmp_git_repo, 'new blob 5',
'commit msg 5',
[commit4.hex])
commit6 = create_commit_with_content(self.tmp_git_repo,
'new blob and last 6',
'commit msg 6',
[commit5.hex])
commit7 = create_commit_with_content(self.tmp_git_repo, 'new blob 7',
'commit msg 7',
[commit6.hex])
# when
loader.load(self.conf)
# then
nb_files = len(list_files_from(self.conf['content_storage_dir']))
self.assertEquals(nb_files, 4+3, "3 new blobs")
with db.connect(self.db_url) as db_conn:
self.assertEquals(
models.count_revisions(db_conn),
8,
"Should be 5+3 == 8 commits now")
self.assertEquals(
models.count_directories(db_conn),
8,
"Should be 5+3 == 8 trees")
self.assertEquals(
models.count_contents(db_conn),
7,
"Should be 4+3 == 7 blobs")
self.assertEquals(
models.count_release(db_conn),
0,
"No tag created so 0 release.")
self.assertEquals(
models.count_occurrence(db_conn),
2,
"Should be 1 reference which changed twice so 2 occurrences (master changed).")
# given
create_commit_with_content(self.tmp_git_repo, None,
'commit 8 with parent 2',
[commit7.hex])
# when
loader.load(self.conf)
# then
nb_files = len(list_files_from(self.conf['content_storage_dir']))
self.assertEquals(nb_files, 7, "no new blob")
with db.connect(self.db_url) as db_conn:
self.assertEquals(
models.count_revisions(db_conn),
9,
"Should be 8+1 == 9 commits now")
self.assertEquals(
models.count_directories(db_conn),
8,
"Should be 8 trees (new commit without blob so no new tree)")
self.assertEquals(
models.count_contents(db_conn),
7,
"Should be 7 blobs (new commit without new blob)")
self.assertEquals(
models.count_release(db_conn),
0,
"No tag created so 0 release.")
self.assertEquals(
models.count_occurrence(db_conn),
3,
"Should be 1 reference which changed thrice so 3 occurrences (master changed again).")
self.assertEquals(
models.count_person(db_conn),
2,
"1 author + 1 committer")
# add tag
create_tag(self.tmp_git_repo, '0.0.1', commit5, 'bad ass release 0.0.1, towards infinity...')
create_tag(self.tmp_git_repo, '0.0.2', commit7, 'release 0.0.2... and beyond')
loader.load(self.conf)
# then
nb_files = len(list_files_from(self.conf['content_storage_dir']))
self.assertEquals(nb_files, 7, "no new blob")
with db.connect(self.db_url) as db_conn:
self.assertEquals(
models.count_revisions(db_conn),
9,
"Should be 8+1 == 9 commits now")
self.assertEquals(
models.count_directories(db_conn),
8,
"Should be 8 trees (new commit without blob so no new tree)")
self.assertEquals(
models.count_contents(db_conn),
7,
"Should be 7 blobs (new commit without new blob)")
self.assertEquals(
models.count_release(db_conn),
2,
"Should be 2 annotated tags so 2 releases")
self.assertEquals(
models.count_occurrence(db_conn),
3,
"master did not change this time so still 3 occurrences")
self.assertEquals(
models.count_person(db_conn),
3,
"1 author + 1 committer + 1 tagger")
diff --git a/swh/loader/git/tests/test_swhrepo.py b/swh/loader/git/tests/test_swhrepo.py
index 2555c90..2564d83 100644
--- a/swh/loader/git/tests/test_swhrepo.py
+++ b/swh/loader/git/tests/test_swhrepo.py
@@ -1,53 +1,53 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from nose.tools import istest
from swh.loader.git.data import swhrepo
class SWHRepoTestCase(unittest.TestCase):
@istest
def new_swhrepo(self):
# when
r = swhrepo.SWHRepo()
r.add_origin({'url': 'foobar'})
r.add_content({'id': 'some-con-sha1'})
r.add_content({'id': 'some-con-sha1-2','stuff': 'some-stuff'})
r.add_directory({'id': 'some-dir-sha1'})
r.add_directory({'id': 'some-dir-sha1-2'})
r.add_revision({'id': 'some-rev-sha1'})
r.add_revision({'id': 'some-rev-sha1-2'})
r.add_person('id0', {'name': 'the one'})
r.add_person('id1', {'name': 'another one'})
r.add_occurrence({'id': 'some-occ-sha1'})
r.add_release({'id': 'some-rel-sha1'})
# then
- assert r.get_origin() == {'url': 'foobar'}
- assert r.get_releases() == [{'id': 'some-rel-sha1'}]
- assert r.get_occurrences() == [{'id': 'some-occ-sha1'}]
+ self.assertEquals(r.get_origin(), {'url': 'foobar'})
+ self.assertEquals(r.get_releases(), [{'id': 'some-rel-sha1'}])
+ self.assertEquals(r.get_occurrences(), [{'id': 'some-occ-sha1'}])
for sha in ['some-con-sha1', 'some-con-sha1-2',
'some-dir-sha1', 'some-dir-sha1-2',
'some-rev-sha1', 'some-rev-sha1-2']:
- assert r.already_visited(sha) is True
+ self.assertTrue(r.already_visited(sha))
- assert r.already_visited('some-occ-sha1') is False
- assert r.already_visited('some-rel-sha1') is False
+ self.assertFalse(r.already_visited('some-occ-sha1'))
+ self.assertFalse(r.already_visited('some-rel-sha1'))
- assert r.get_contents() == {'some-con-sha1': {'id': 'some-con-sha1'},
- 'some-con-sha1-2': {'id': 'some-con-sha1-2','stuff': 'some-stuff'}}
- assert r.get_directories() == {'some-dir-sha1': {'id': 'some-dir-sha1'},
- 'some-dir-sha1-2': {'id': 'some-dir-sha1-2'}}
- assert r.get_revisions() == {'some-rev-sha1': {'id': 'some-rev-sha1'},
- 'some-rev-sha1-2': {'id': 'some-rev-sha1-2'}}
+ self.assertEquals(r.get_contents(), {'some-con-sha1': {'id': 'some-con-sha1'},
+ 'some-con-sha1-2': {'id': 'some-con-sha1-2','stuff': 'some-stuff'}})
+ self.assertEquals(r.get_directories(), {'some-dir-sha1': {'id': 'some-dir-sha1'},
+ 'some-dir-sha1-2': {'id': 'some-dir-sha1-2'}})
+ self.assertEquals(r.get_revisions(), {'some-rev-sha1': {'id': 'some-rev-sha1'},
+ 'some-rev-sha1-2': {'id': 'some-rev-sha1-2'}})
- assert len(r.get_persons()) == 2
+ self.assertEquals(len(r.get_persons()), 2)
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Jul 4 2025, 7:51 AM (10 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3253868
Attached To
rDLDG Git loader
Event Timeline
Log In to Comment