Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/tests/test_provenance_db.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
from swh.model.tests.swh_model_data import TEST_OBJECTS | from swh.model.tests.swh_model_data import TEST_OBJECTS | ||||
from swh.provenance.origin import OriginEntry | from swh.provenance.origin import OriginEntry | ||||
from swh.provenance.provenance import origin_add, revision_add | from swh.provenance.provenance import origin_add, revision_add | ||||
from swh.provenance.revision import RevisionEntry | from swh.provenance.revision import RevisionEntry | ||||
from swh.provenance.tests.conftest import synthetic_result | |||||
def ts2dt(ts: dict) -> datetime.datetime: | def ts2dt(ts: dict) -> datetime.datetime: | ||||
timestamp = datetime.datetime.fromtimestamp( | timestamp = datetime.datetime.fromtimestamp( | ||||
ts["timestamp"]["seconds"], | ts["timestamp"]["seconds"], | ||||
datetime.timezone(datetime.timedelta(minutes=ts["offset"])), | datetime.timezone(datetime.timedelta(minutes=ts["offset"])), | ||||
) | ) | ||||
return timestamp.replace(microsecond=ts["timestamp"]["microseconds"]) | return timestamp.replace(microsecond=ts["timestamp"]["microseconds"]) | ||||
▲ Show 20 Lines • Show All 136 Lines • ▼ Show 20 Lines | for expected in first_expected_content: | ||||
(blob, rev, date, path) = provenance.content_find_first(contentid) | (blob, rev, date, path) = provenance.content_find_first(contentid) | ||||
if isinstance(expected["path"], tuple): | if isinstance(expected["path"], tuple): | ||||
assert bytes(path).decode() in expected["path"] | assert bytes(path).decode() in expected["path"] | ||||
else: | else: | ||||
assert bytes(path).decode() == expected["path"] | assert bytes(path).decode() == expected["path"] | ||||
assert bytes(blob) == contentid | assert bytes(blob) == contentid | ||||
assert bytes(rev).hex() == expected["rev"] | assert bytes(rev).hex() == expected["rev"] | ||||
assert int(date.timestamp()) == expected["date"] | assert int(date.timestamp()) == expected["date"] | ||||
def test_provenance_db(provenance, storage_and_CMDBTS, archive_pg): | |||||
storage, data = storage_and_CMDBTS | |||||
revisions = {rev["id"]: rev for rev in data["revision"]} | |||||
rows = { | |||||
"content": set(), | |||||
"content_in_dir": set(), | |||||
"content_early_in_rev": set(), | |||||
"directory": set(), | |||||
"directory_in_rev": set(), | |||||
"location": set(), | |||||
"revision": set(), | |||||
} | |||||
def db_count(table): | |||||
provenance.cursor.execute(f"SELECT count(*) FROM {table}") | |||||
return provenance.cursor.fetchone()[0] | |||||
for synth_rev in synthetic_result("synthetic_noroot_lower.txt"): | |||||
revision = revisions[synth_rev["sha1"]] | |||||
entry = RevisionEntry( | |||||
archive_pg, | |||||
id=revision["id"], | |||||
date=ts2dt(revision["date"]), | |||||
root=revision["directory"], | |||||
parents=revision["parents"], | |||||
) | |||||
revision_add(provenance, archive_pg, entry) | |||||
# each "entry" in the synth file is one new revision | |||||
rows["revision"].add(synth_rev["sha1"]) | |||||
assert len(rows["revision"]) == db_count("revision") | |||||
# this revision might have added new content objects | |||||
rows["content"] |= set(x["dst"] for x in synth_rev["R_C"]) | |||||
rows["content"] |= set(x["dst"] for x in synth_rev["D_C"]) | |||||
assert len(rows["content"]) == db_count("content") | |||||
# check for R-C (direct) entries | |||||
rows["content_early_in_rev"] |= set( | |||||
(x["src"], x["dst"], x["path"]) for x in synth_rev["R_C"] | |||||
) | |||||
assert len(rows["content_early_in_rev"]) == db_count("content_early_in_rev") | |||||
# check directories | |||||
rows["directory"] |= set(x["dst"] for x in synth_rev["R_D"]) | |||||
assert len(rows["directory"]) == db_count("directory") | |||||
# check for R-D entries | |||||
rows["directory_in_rev"] |= set( | |||||
(x["src"], x["dst"], x["path"]) for x in synth_rev["R_D"] | |||||
) | |||||
assert len(rows["directory_in_rev"]) == db_count("directory_in_rev") | |||||
# check for D-C entries | |||||
rows["content_in_dir"] |= set( | |||||
(x["src"], x["dst"], x["path"]) for x in synth_rev["D_C"] | |||||
) | |||||
assert len(rows["content_in_dir"]) == db_count("content_in_dir") | |||||
# check for location entries | |||||
rows["location"] |= set(x["path"] for x in synth_rev["R_C"]) | |||||
rows["location"] |= set(x["path"] for x in synth_rev["D_C"]) | |||||
rows["location"] |= set(x["path"] for x in synth_rev["R_D"]) | |||||
assert len(rows["location"]) == db_count("location") |