Changeset View
Changeset View
Standalone View
Standalone View
swh/model/tests/test_merkle.py
# Copyright (C) 2017-2020 The Software Heritage developers | # Copyright (C) 2017-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import unittest | import unittest | ||||
from swh.model import merkle | from swh.model import merkle | ||||
class MerkleTestNode(merkle.MerkleNode): | class MerkleTestNode(merkle.MerkleNode): | ||||
object_type = "tested_merkle_node_type" | object_type = "tested_merkle_node_type" | ||||
def __init__(self, data): | def __init__(self, data): | ||||
super().__init__(data) | super().__init__(data) | ||||
self.compute_hash_called = 0 | self.compute_hash_called = 0 | ||||
def compute_hash(self): | def compute_hash(self) -> bytes: | ||||
self.compute_hash_called += 1 | self.compute_hash_called += 1 | ||||
child_data = [child + b"=" + self[child].hash for child in sorted(self)] | child_data = [child + b"=" + self[child].hash for child in sorted(self)] | ||||
return b"hash(" + b", ".join([self.data.get("value", b"")] + child_data) + b")" | |||||
return b"hash(" + b", ".join([self.data["value"]] + child_data) + b")" | |||||
class MerkleTestLeaf(merkle.MerkleLeaf): | class MerkleTestLeaf(merkle.MerkleLeaf): | ||||
object_type = "tested_merkle_leaf_type" | object_type = "tested_merkle_leaf_type" | ||||
def __init__(self, data): | def __init__(self, data): | ||||
super().__init__(data) | super().__init__(data) | ||||
self.compute_hash_called = 0 | self.compute_hash_called = 0 | ||||
def compute_hash(self): | def compute_hash(self): | ||||
self.compute_hash_called += 1 | self.compute_hash_called += 1 | ||||
return b"hash(" + self.data["value"] + b")" | return b"hash(" + self.data.get("value", b"") + b")" | ||||
class TestMerkleLeaf(unittest.TestCase): | class TestMerkleLeaf(unittest.TestCase): | ||||
def setUp(self): | def setUp(self): | ||||
self.data = {"value": b"value"} | self.data = {"value": b"value"} | ||||
self.instance = MerkleTestLeaf(self.data) | self.instance = MerkleTestLeaf(self.data) | ||||
def test_equality(self): | def test_equality(self): | ||||
Show All 14 Lines | class TestMerkleLeaf(unittest.TestCase): | ||||
def test_data(self): | def test_data(self): | ||||
self.assertEqual(self.instance.get_data(), self.data) | self.assertEqual(self.instance.get_data(), self.data) | ||||
def test_collect(self): | def test_collect(self): | ||||
collected = self.instance.collect() | collected = self.instance.collect() | ||||
self.assertEqual( | self.assertEqual( | ||||
collected, | collected, | ||||
{ | {self.instance}, | ||||
self.instance.object_type: { | |||||
self.instance.hash: self.instance.get_data(), | |||||
}, | |||||
}, | |||||
) | ) | ||||
collected2 = self.instance.collect() | collected2 = self.instance.collect() | ||||
self.assertEqual(collected2, {}) | self.assertEqual(collected2, set()) | ||||
self.instance.reset_collect() | self.instance.reset_collect() | ||||
collected3 = self.instance.collect() | collected3 = self.instance.collect() | ||||
self.assertEqual(collected, collected3) | self.assertEqual(collected, collected3) | ||||
def test_leaf(self): | def test_leaf(self): | ||||
with self.assertRaisesRegex(ValueError, "is a leaf"): | with self.assertRaisesRegex(ValueError, "is a leaf"): | ||||
self.instance[b"key1"] = "Test" | self.instance[b"key1"] = "Test" | ||||
Show All 37 Lines | def setUp(self): | ||||
{ | { | ||||
"value": value3, | "value": value3, | ||||
} | } | ||||
) | ) | ||||
node2[j] = node3 | node2[j] = node3 | ||||
self.nodes[value3] = node3 | self.nodes[value3] = node3 | ||||
def test_equality(self): | def test_equality(self): | ||||
node1 = merkle.MerkleNode({"foo": b"bar"}) | node1 = MerkleTestNode({"value": b"bar"}) | ||||
node2 = merkle.MerkleNode({"foo": b"bar"}) | node2 = MerkleTestNode({"value": b"bar"}) | ||||
node3 = merkle.MerkleNode({}) | node3 = MerkleTestNode({}) | ||||
self.assertEqual(node1, node2) | self.assertEqual(node1, node2) | ||||
self.assertNotEqual(node1, node3, node1 == node3) | self.assertNotEqual(node1, node3, node1 == node3) | ||||
node1["foo"] = node3 | node1[b"a"] = node3 | ||||
self.assertNotEqual(node1, node2) | self.assertNotEqual(node1, node2) | ||||
node2["foo"] = node3 | node2[b"a"] = node3 | ||||
self.assertEqual(node1, node2) | self.assertEqual(node1, node2) | ||||
def test_hash(self): | def test_hash(self): | ||||
for node in self.nodes.values(): | for node in self.nodes.values(): | ||||
self.assertEqual(node.compute_hash_called, 0) | self.assertEqual(node.compute_hash_called, 0) | ||||
# Root hash will compute hash for all the nodes | # Root hash will compute hash for all the nodes | ||||
hash = self.root.hash | hash = self.root.hash | ||||
Show All 28 Lines | def test_hash(self): | ||||
# update_hash also invalidates all parents | # update_hash also invalidates all parents | ||||
if key in (b"root", b"root/a") or key.startswith(b"root/a/b"): | if key in (b"root", b"root/a") or key.startswith(b"root/a/b"): | ||||
self.assertEqual(node.compute_hash_called, 2) | self.assertEqual(node.compute_hash_called, 2) | ||||
else: | else: | ||||
self.assertEqual(node.compute_hash_called, 1) | self.assertEqual(node.compute_hash_called, 1) | ||||
def test_collect(self): | def test_collect(self): | ||||
collected = self.root.collect() | collected = self.root.collect() | ||||
self.assertEqual(len(collected[self.root.object_type]), len(self.nodes)) | self.assertEqual(collected, set(self.nodes.values())) | ||||
olasd: When we implement `__eq__` on `MerkleNode`, this might work. | |||||
for node in self.nodes.values(): | for node in self.nodes.values(): | ||||
self.assertTrue(node.collected) | self.assertTrue(node.collected) | ||||
collected2 = self.root.collect() | collected2 = self.root.collect() | ||||
self.assertEqual(collected2, {}) | self.assertEqual(collected2, set()) | ||||
def test_iter_tree_with_deduplication(self): | def test_iter_tree_with_deduplication(self): | ||||
nodes = list(self.root.iter_tree()) | nodes = list(self.root.iter_tree()) | ||||
self.assertCountEqual(nodes, self.nodes.values()) | self.assertCountEqual(nodes, self.nodes.values()) | ||||
def test_iter_tree_without_deduplication(self): | def test_iter_tree_without_deduplication(self): | ||||
# duplicate existing hash in merkle tree | # duplicate existing hash in merkle tree | ||||
self.root[b"d"] = MerkleTestNode({"value": b"root/c/c/c"}) | self.root[b"d"] = MerkleTestNode({"value": b"root/c/c/c"}) | ||||
▲ Show 20 Lines • Show All 52 Lines • ▼ Show 20 Lines | def test_update(self): | ||||
for key, node in self.nodes.items(): | for key, node in self.nodes.items(): | ||||
if key in (b"root", b"root/b"): | if key in (b"root", b"root/b"): | ||||
self.assertEqual(node.compute_hash_called, 2) | self.assertEqual(node.compute_hash_called, 2) | ||||
else: | else: | ||||
self.assertEqual(node.compute_hash_called, 1) | self.assertEqual(node.compute_hash_called, 1) | ||||
# Ensure we collected root, root/b, and both new children | # Ensure we collected root, root/b, and both new children | ||||
collected_after_update = self.root.collect() | collected_after_update = self.root.collect() | ||||
self.assertCountEqual( | self.assertEqual( | ||||
Not Done Inline Actionscan now be an assertEqual olasd: can now be an `assertEqual` | |||||
collected_after_update[MerkleTestNode.object_type], | collected_after_update, | ||||
[ | { | ||||
self.nodes[b"root"].hash, | self.nodes[b"root"], | ||||
self.nodes[b"root/b"].hash, | self.nodes[b"root/b"], | ||||
new_children[b"c"].hash, | new_children[b"c"], | ||||
new_children[b"d"].hash, | new_children[b"d"], | ||||
], | }, | ||||
) | ) | ||||
# test that noop updates doesn't invalidate anything | # test that noop updates doesn't invalidate anything | ||||
self.root[b"a"][b"b"].update({}) | self.root[b"a"][b"b"].update({}) | ||||
self.assertEqual(self.root.collect(), {}) | self.assertEqual(self.root.collect(), set()) |
When we implement __eq__ on MerkleNode, this might work.