scrubber_db = <swh.scrubber.db.ScrubberDb object at 0x7f76550ce7f0>
swh_storage = <swh.storage.postgresql.storage.Storage object at 0x7f76550cef98>
def test_many_origins(scrubber_db, swh_storage):
scrubber_db.corrupt_object_add(
CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_
)
origins = [Origin(url=f"http://example.org/{i}") for i in range(1000)]
swh_storage.origin_add(origins)
graph = NaiveGraphClient(
nodes=[CORRUPT_OBJECT.id] + [origin.swhid() for origin in origins],
edges=[(origin.swhid(), CORRUPT_OBJECT.id) for origin in origins],
)
locator = OriginLocator(
db=scrubber_db,
graph=graph,
storage=swh_storage,
start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
)
locator.run()
with scrubber_db.conn.cursor() as cur:
cur.execute("SELECT object_id, origin_url FROM object_origin")
rows = set(cur)
assert rows <= {(str(CORRUPT_OBJECT.id), origin.url) for origin in origins}
> assert len(rows) == 100
E assert 0 == 100
E +0
E -100
.tox/py3/lib/python3.7/site-packages/swh/scrubber/tests/test_origin_locator.py:170: AssertionError
TEST RESULT
TEST RESULT
- Run At
- Mar 25 2022, 3:28 PM