Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_storage.py
Show First 20 Lines • Show All 947 Lines • ▼ Show 20 Lines | def test_revision_get_order(self, swh_storage, sample_data_model): | ||||
assert [Revision.from_dict(r) for r in res1] == [revision, revision2] | assert [Revision.from_dict(r) for r in res1] == [revision, revision2] | ||||
# order 2 | # order 2 | ||||
res2 = swh_storage.revision_get([revision2.id, revision.id]) | res2 = swh_storage.revision_get([revision2.id, revision.id]) | ||||
assert [Revision.from_dict(r) for r in res2] == [revision2, revision] | assert [Revision.from_dict(r) for r in res2] == [revision2, revision] | ||||
def test_revision_log(self, swh_storage, sample_data_model): | def test_revision_log(self, swh_storage, sample_data_model): | ||||
revision3, revision4 = sample_data_model["revision"][2:4] | revision1, revision2, revision3, revision4 = sample_data_model["revision"][:4] | ||||
# data.revision4 -is-child-of-> data.revision3 | # rev4 -is-child-of-> rev3 -> rev1, (rev2 -> rev1) | ||||
swh_storage.revision_add([revision3, revision4]) | swh_storage.revision_add([revision1, revision2, revision3, revision4]) | ||||
# when | # when | ||||
results = list(swh_storage.revision_log([revision4.id])) | results = list(swh_storage.revision_log([revision4.id])) | ||||
# for comparison purposes | # for comparison purposes | ||||
actual_results = [Revision.from_dict(r) for r in results] | actual_results = [Revision.from_dict(r) for r in results] | ||||
assert len(actual_results) == 2 # rev4 -child-> rev3 | assert len(actual_results) == 4 # rev4 -child-> rev3 -> rev1, (rev2 -> rev1) | ||||
assert actual_results == [revision4, revision3] | assert actual_results == [revision4, revision3, revision1, revision2] | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | |||||
("revision", revision3), | |||||
("revision", revision4), | |||||
] | |||||
def test_revision_log_with_limit(self, swh_storage, sample_data_model): | def test_revision_log_with_limit(self, swh_storage, sample_data_model): | ||||
revision3, revision4 = sample_data_model["revision"][2:4] | revision1, revision2, revision3, revision4 = sample_data_model["revision"][:4] | ||||
# data.revision4 -is-child-of-> data.revision3 | # revision4 -is-child-of-> revision3 | ||||
swh_storage.revision_add([revision3, revision4]) | swh_storage.revision_add([revision3, revision4]) | ||||
results = list(swh_storage.revision_log([revision4.id], 1)) | results = list(swh_storage.revision_log([revision4.id], 1)) | ||||
actual_results = [Revision.from_dict(r) for r in results] | actual_results = [Revision.from_dict(r) for r in results] | ||||
assert len(actual_results) == 1 | assert len(actual_results) == 1 | ||||
assert actual_results[0] == revision4 | assert actual_results[0] == revision4 | ||||
def test_revision_log_unknown_revision(self, swh_storage, sample_data_model): | def test_revision_log_unknown_revision(self, swh_storage, sample_data_model): | ||||
revision = sample_data_model["revision"][0] | revision = sample_data_model["revision"][0] | ||||
rev_log = list(swh_storage.revision_log([revision.id])) | rev_log = list(swh_storage.revision_log([revision.id])) | ||||
assert rev_log == [] | assert rev_log == [] | ||||
def test_revision_shortlog(self, swh_storage, sample_data_model): | def test_revision_shortlog(self, swh_storage, sample_data_model): | ||||
revision3, revision4 = sample_data_model["revision"][2:4] | revision1, revision2, revision3, revision4 = sample_data_model["revision"][:4] | ||||
# data.revision4 -is-child-of-> data.revision3 | # rev4 -is-child-of-> rev3 -> (rev1, rev2); rev2 -> rev1 | ||||
swh_storage.revision_add([revision3, revision4]) | swh_storage.revision_add([revision1, revision2, revision3, revision4]) | ||||
# when | |||||
results = list(swh_storage.revision_shortlog([revision4.id])) | results = list(swh_storage.revision_shortlog([revision4.id])) | ||||
actual_results = [[id, tuple(parents)] for (id, parents) in results] | actual_results = [[id, tuple(parents)] for (id, parents) in results] | ||||
assert len(actual_results) == 2 # rev4 -child-> rev3 | assert len(actual_results) == 4 | ||||
assert list(actual_results[0]) == [revision4.id, revision4.parents] | assert actual_results == [ | ||||
assert list(actual_results[1]) == [revision3.id, revision3.parents] | [revision4.id, revision4.parents], | ||||
[revision3.id, revision3.parents], | |||||
[revision1.id, revision1.parents], | |||||
[revision2.id, revision2.parents], | |||||
] | |||||
def test_revision_shortlog_with_limit(self, swh_storage, sample_data_model): | def test_revision_shortlog_with_limit(self, swh_storage, sample_data_model): | ||||
revision3, revision4 = sample_data_model["revision"][2:4] | revision1, revision2, revision3, revision4 = sample_data_model["revision"][:4] | ||||
# data.revision4 -is-child-of-> data.revision3 | # revision4 -is-child-of-> revision3 | ||||
swh_storage.revision_add([revision3, revision4]) | swh_storage.revision_add([revision1, revision2, revision3, revision4]) | ||||
results = list(swh_storage.revision_shortlog([revision4.id], 1)) | results = list(swh_storage.revision_shortlog([revision4.id], 1)) | ||||
actual_results = [[id, tuple(parents)] for (id, parents) in results] | actual_results = [[id, tuple(parents)] for (id, parents) in results] | ||||
assert len(actual_results) == 1 | assert len(actual_results) == 1 | ||||
assert list(actual_results[0]) == [revision4.id, revision4.parents] | assert list(actual_results[0]) == [revision4.id, revision4.parents] | ||||
def test_revision_get(self, swh_storage, sample_data_model): | def test_revision_get(self, swh_storage, sample_data_model): | ||||
revision, revision2 = sample_data_model["revision"][:2] | revision, revision2 = sample_data_model["revision"][:2] | ||||
swh_storage.revision_add([revision]) | swh_storage.revision_add([revision]) | ||||
actual_revisions = list(swh_storage.revision_get([revision.id, revision2.id])) | actual_revisions = list(swh_storage.revision_get([revision.id, revision2.id])) | ||||
assert len(actual_revisions) == 2 | assert len(actual_revisions) == 2 | ||||
assert Revision.from_dict(actual_revisions[0]) == revision | assert Revision.from_dict(actual_revisions[0]) == revision | ||||
assert actual_revisions[1] is None | assert actual_revisions[1] is None | ||||
def test_revision_get_no_parents(self, swh_storage, sample_data_model): | def test_revision_get_no_parents(self, swh_storage, sample_data_model): | ||||
revision = sample_data_model["revision"][2] | revision = sample_data_model["revision"][0] | ||||
swh_storage.revision_add([revision]) | swh_storage.revision_add([revision]) | ||||
get = list(swh_storage.revision_get([revision.id])) | get = list(swh_storage.revision_get([revision.id])) | ||||
assert len(get) == 1 | assert len(get) == 1 | ||||
assert revision.parents == () | |||||
assert tuple(get[0]["parents"]) == () # no parents on this one | assert tuple(get[0]["parents"]) == () # no parents on this one | ||||
def test_revision_get_random(self, swh_storage, sample_data_model): | def test_revision_get_random(self, swh_storage, sample_data_model): | ||||
revision1, revision2, revision3 = sample_data_model["revision"][:3] | revision1, revision2, revision3 = sample_data_model["revision"][:3] | ||||
swh_storage.revision_add([revision1, revision2, revision3]) | swh_storage.revision_add([revision1, revision2, revision3]) | ||||
assert swh_storage.revision_get_random() in { | assert swh_storage.revision_get_random() in { | ||||
▲ Show 20 Lines • Show All 3,102 Lines • Show Last 20 Lines |