Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7124412
D3565.id12566.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
10 KB
Subscribers
None
D3565.id12566.diff
View Options
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -58,12 +58,9 @@
yield db, cur
-def normalize_entity(obj: Union[Dict, Revision]) -> Dict:
+def normalize_entity(obj: Union[Revision, Release]) -> Dict:
"""Normalize entity model object (revision, release)"""
- if isinstance(obj, Revision):
- entity = obj.to_dict()
- else:
- entity = obj
+ entity = obj.to_dict()
for key in ("date", "committer_date"):
if key in entity:
entity[key] = identifiers.normalize_timestamp(entity[key])
@@ -1151,66 +1148,66 @@
revision3.id,
}
- def test_release_add(self, swh_storage):
- init_missing = swh_storage.release_missing(
- [data.release["id"], data.release2["id"]]
- )
- assert [data.release["id"], data.release2["id"]] == list(init_missing)
+ def test_release_add(self, swh_storage, sample_data_model):
+ release, release2 = sample_data_model["release"][:2]
+
+ init_missing = swh_storage.release_missing([release.id, release2.id])
+ assert list(init_missing) == [release.id, release2.id]
- actual_result = swh_storage.release_add([data.release, data.release2])
+ actual_result = swh_storage.release_add([release, release2])
assert actual_result == {"release:add": 2}
- end_missing = swh_storage.release_missing(
- [data.release["id"], data.release2["id"]]
- )
+ end_missing = swh_storage.release_missing([release.id, release2.id])
assert list(end_missing) == []
assert list(swh_storage.journal_writer.journal.objects) == [
- ("release", Release.from_dict(data.release)),
- ("release", Release.from_dict(data.release2)),
+ ("release", release),
+ ("release", release2),
]
# already present so nothing added
- actual_result = swh_storage.release_add([data.release, data.release2])
+ actual_result = swh_storage.release_add([release, release2])
assert actual_result == {"release:add": 0}
swh_storage.refresh_stat_counters()
assert swh_storage.stat_counters()["release"] == 2
- def test_release_add_from_generator(self, swh_storage):
+ def test_release_add_from_generator(self, swh_storage, sample_data_model):
+ release, release2 = sample_data_model["release"][:2]
+
def _rel_gen():
- yield data.release
- yield data.release2
+ yield release
+ yield release2
actual_result = swh_storage.release_add(_rel_gen())
assert actual_result == {"release:add": 2}
assert list(swh_storage.journal_writer.journal.objects) == [
- ("release", Release.from_dict(data.release)),
- ("release", Release.from_dict(data.release2)),
+ ("release", release),
+ ("release", release2),
]
swh_storage.refresh_stat_counters()
assert swh_storage.stat_counters()["release"] == 2
- def test_release_add_no_author_date(self, swh_storage):
- release = data.release
-
- release["author"] = None
- release["date"] = None
+ def test_release_add_no_author_date(self, swh_storage, sample_data_model):
+ full_release = sample_data_model["release"][0]
+ release = attr.evolve(full_release, author=None, date=None)
actual_result = swh_storage.release_add([release])
assert actual_result == {"release:add": 1}
- end_missing = swh_storage.release_missing([data.release["id"]])
+ end_missing = swh_storage.release_missing([release.id])
assert list(end_missing) == []
assert list(swh_storage.journal_writer.journal.objects) == [
- ("release", Release.from_dict(release))
+ ("release", release)
]
- def test_release_add_validation(self, swh_storage):
- rel = copy.deepcopy(data.release)
+ def test_release_add_validation(self, swh_storage, sample_data_model):
+ release = sample_data_model["release"][0]
+
+ rel = release.to_dict()
rel["date"]["offset"] = 2 ** 16
with pytest.raises(StorageArgumentException, match="offset") as cm:
@@ -1219,7 +1216,7 @@
if type(cm.value) == psycopg2.DataError:
assert cm.value.pgcode == psycopg2.errorcodes.NUMERIC_VALUE_OUT_OF_RANGE
- rel = copy.deepcopy(data.release)
+ rel = release.to_dict()
rel["author"] = None
with pytest.raises(StorageArgumentException, match="date") as cm:
@@ -1228,92 +1225,91 @@
if type(cm.value) == psycopg2.IntegrityError:
assert cm.value.pgcode == psycopg2.errorcodes.CHECK_VIOLATION
- def test_release_add_validation_type(self, swh_storage):
- rel = copy.deepcopy(data.release)
-
+ def test_release_add_validation_type(self, swh_storage, sample_data_model):
+ release = sample_data_model["release"][0]
+ rel = release.to_dict()
rel["date"]["offset"] = "toto"
with pytest.raises(StorageArgumentException):
swh_storage.release_add([rel])
- def test_release_add_twice(self, swh_storage):
- actual_result = swh_storage.release_add([data.release])
+ def test_release_add_twice(self, swh_storage, sample_data_model):
+ release, release2 = sample_data_model["release"][:2]
+
+ actual_result = swh_storage.release_add([release])
assert actual_result == {"release:add": 1}
assert list(swh_storage.journal_writer.journal.objects) == [
- ("release", Release.from_dict(data.release))
+ ("release", release)
]
- actual_result = swh_storage.release_add(
- [data.release, data.release2, data.release, data.release2]
- )
+ actual_result = swh_storage.release_add([release, release2, release, release2])
assert actual_result == {"release:add": 1}
assert set(swh_storage.journal_writer.journal.objects) == set(
- [
- ("release", Release.from_dict(data.release)),
- ("release", Release.from_dict(data.release2)),
- ]
+ [("release", release), ("release", release2),]
)
- def test_release_add_name_clash(self, swh_storage):
- release1 = data.release.copy()
- release2 = data.release2.copy()
+ def test_release_add_name_clash(self, swh_storage, sample_data_model):
+ release, release2 = [
+ attr.evolve(
+ c,
+ author=Person(
+ fullname=b"John Doe <john.doe@example.com>",
+ name=b"John Doe",
+ email=b"john.doe@example.com",
+ ),
+ )
+ for c in sample_data_model["release"][:2]
+ ]
- release1["author"] = {
- "fullname": b"John Doe <john.doe@example.com>",
- "name": b"John Doe",
- "email": b"john.doe@example.com",
- }
- release2["author"] = {
- "fullname": b"John Doe <john.doe@example.com>",
- "name": b"John Doe ",
- "email": b"john.doe@example.com ",
- }
- actual_result = swh_storage.release_add([release1, release2])
+ actual_result = swh_storage.release_add([release, release2])
assert actual_result == {"release:add": 2}
- def test_release_get(self, swh_storage):
+ def test_release_get(self, swh_storage, sample_data_model):
+ release, release2, release3 = sample_data_model["release"][:3]
+
# given
- swh_storage.release_add([data.release, data.release2])
+ swh_storage.release_add([release, release2])
# when
- actual_releases = list(
- swh_storage.release_get([data.release["id"], data.release2["id"]])
- )
+ actual_releases = list(swh_storage.release_get([release.id, release2.id]))
# then
for actual_release in actual_releases:
if "id" in actual_release["author"]:
del actual_release["author"]["id"] # hack: ids are generated
- assert [normalize_entity(data.release), normalize_entity(data.release2)] == [
- actual_releases[0],
- actual_releases[1],
+ assert [actual_releases[0], actual_releases[1],] == [
+ normalize_entity(release),
+ normalize_entity(release2),
]
- unknown_releases = list(swh_storage.release_get([data.release3["id"]]))
-
+ unknown_releases = list(swh_storage.release_get([release3.id]))
assert unknown_releases[0] is None
- def test_release_get_order(self, swh_storage):
- add_result = swh_storage.release_add([data.release, data.release2])
+ def test_release_get_order(self, swh_storage, sample_data_model):
+ release, release2 = sample_data_model["release"][:2]
+
+ add_result = swh_storage.release_add([release, release2])
assert add_result == {"release:add": 2}
# order 1
- res1 = swh_storage.release_get([data.release["id"], data.release2["id"]])
- assert list(res1) == [data.release, data.release2]
+ res1 = swh_storage.release_get([release.id, release2.id])
+ assert list(res1) == [release.to_dict(), release2.to_dict()]
# order 2
- res2 = swh_storage.release_get([data.release2["id"], data.release["id"]])
- assert list(res2) == [data.release2, data.release]
+ res2 = swh_storage.release_get([release2.id, release.id])
+ assert list(res2) == [release2.to_dict(), release.to_dict()]
+
+ def test_release_get_random(self, swh_storage, sample_data_model):
+ release, release2, release3 = sample_data_model["release"][:3]
- def test_release_get_random(self, swh_storage):
- swh_storage.release_add([data.release, data.release2, data.release3])
+ swh_storage.release_add([release, release2, release3])
assert swh_storage.release_get_random() in {
- data.release["id"],
- data.release2["id"],
- data.release3["id"],
+ release.id,
+ release2.id,
+ release3.id,
}
def test_origin_add_one(self, swh_storage):
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Dec 21 2024, 10:37 AM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3227543
Attached To
D3565: test_storage: release: Use data model object
Event Timeline
Log In to Comment