Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_storage.py
Show First 20 Lines • Show All 1,412 Lines • ▼ Show 20 Lines | def test_snapshot_get_latest(self): | ||||
origin_id = self.storage.origin_add_one(self.origin) | origin_id = self.storage.origin_add_one(self.origin) | ||||
origin_visit1 = self.storage.origin_visit_add(origin_id, | origin_visit1 = self.storage.origin_visit_add(origin_id, | ||||
self.date_visit1) | self.date_visit1) | ||||
visit1_id = origin_visit1['visit'] | visit1_id = origin_visit1['visit'] | ||||
origin_visit2 = self.storage.origin_visit_add(origin_id, | origin_visit2 = self.storage.origin_visit_add(origin_id, | ||||
self.date_visit2) | self.date_visit2) | ||||
visit2_id = origin_visit2['visit'] | visit2_id = origin_visit2['visit'] | ||||
# Add a visit with the same date as the previous one | |||||
origin_visit3 = self.storage.origin_visit_add(origin_id, | |||||
self.date_visit2) | |||||
visit3_id = origin_visit3['visit'] | |||||
# Two visits, both with no snapshot: latest snapshot is None | # Two visits, both with no snapshot: latest snapshot is None | ||||
self.assertIsNone(self.storage.snapshot_get_latest(origin_id)) | self.assertIsNone(self.storage.snapshot_get_latest(origin_id)) | ||||
# Add snapshot to visit1, latest snapshot = visit 1 snapshot | # Add snapshot to visit1, latest snapshot = visit 1 snapshot | ||||
self.storage.snapshot_add(origin_id, visit1_id, self.complete_snapshot) | self.storage.snapshot_add(origin_id, visit1_id, self.complete_snapshot) | ||||
self.assertEqual(self.complete_snapshot, | self.assertEqual(self.complete_snapshot, | ||||
self.storage.snapshot_get_latest(origin_id)) | self.storage.snapshot_get_latest(origin_id)) | ||||
Show All 19 Lines | def test_snapshot_get_latest(self): | ||||
# Check that the status filter is still working | # Check that the status filter is still working | ||||
self.assertEqual( | self.assertEqual( | ||||
self.complete_snapshot, | self.complete_snapshot, | ||||
self.storage.snapshot_get_latest(origin_id, | self.storage.snapshot_get_latest(origin_id, | ||||
allowed_statuses=['full']), | allowed_statuses=['full']), | ||||
) | ) | ||||
# Add snapshot to visit3 (same date as visit2) and check that | |||||
# the new snapshot is returned | |||||
self.storage.snapshot_add(origin_id, visit3_id, self.complete_snapshot) | |||||
self.assertEqual(self.complete_snapshot, | |||||
self.storage.snapshot_get_latest(origin_id)) | |||||
def test_stat_counters(self): | def test_stat_counters(self): | ||||
expected_keys = ['content', 'directory', | expected_keys = ['content', 'directory', | ||||
'origin', 'person', 'revision'] | 'origin', 'person', 'revision'] | ||||
# Initially, all counters are 0 | # Initially, all counters are 0 | ||||
self.storage.refresh_stat_counters() | self.storage.refresh_stat_counters() | ||||
counters = self.storage.stat_counters() | counters = self.storage.stat_counters() | ||||
▲ Show 20 Lines • Show All 750 Lines • Show Last 20 Lines |