Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7123291
D3627.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
22 KB
Subscribers
None
D3627.diff
View Options
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,3 @@
-swh.core[db,http] >= 0.1.0
+swh.core[db,http] >= 0.2.0
swh.model >= 0.4.0
swh.objstorage >= 0.0.40
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -32,6 +32,7 @@
MetadataTargetType,
RawExtrinsicMetadata,
)
+from swh.storage.interface import PagedResult
from swh.storage.objstorage import ObjStorage
from swh.storage.writer import JournalWriter
from swh.storage.utils import map_optional, now
@@ -844,15 +845,39 @@
def origin_visit_get(
self,
origin: str,
- last_visit: Optional[int] = None,
- limit: Optional[int] = None,
+ page_token: Optional[str] = None,
order: str = "asc",
- ) -> Iterable[Dict[str, Any]]:
- rows = self._cql_runner.origin_visit_get(origin, last_visit, limit, order)
+ limit: int = 10,
+ ) -> PagedResult[OriginVisit]:
+ order = order.lower()
+ allowed_orders = ["asc", "desc"]
+ if order not in allowed_orders:
+ raise StorageArgumentException(
+ f"order must be one of {', '.join(allowed_orders)}."
+ )
+ if page_token and not isinstance(page_token, str):
+ raise StorageArgumentException("page_token must be a string.")
+
+ next_page_token = None
+ visit_from = page_token and int(page_token)
+ visits: List[OriginVisit] = []
+ extra_limit = limit + 1
+ rows = self._cql_runner.origin_visit_get(origin, visit_from, extra_limit, order)
for row in rows:
- visit = self._format_origin_visit_row(row)
- yield self._origin_visit_apply_last_status(visit)
+ visits.append(converters.row_to_visit(row))
+
+ assert len(visits) <= extra_limit
+ if len(visits) == extra_limit:
+ last_visit = visits[limit]
+ visits = visits[:limit]
+ assert last_visit is not None and last_visit.visit is not None
+ if order == "asc":
+ next_page_token = str(last_visit.visit - 1)
+ else:
+ next_page_token = str(last_visit.visit + 1)
+
+ return PagedResult(results=visits, next_page_token=next_page_token)
def origin_visit_find_by_date(
self, origin: str, visit_date: datetime.datetime
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -481,6 +481,8 @@
+ [jsonize(visit_status.metadata)],
)
+ origin_visit_cols = ["origin", "visit", "date", "type"]
+
def origin_visit_add_with_id(self, origin_visit: OriginVisit, cur=None) -> None:
"""Insert origin visit when id are already set
@@ -488,12 +490,11 @@
ov = origin_visit
assert ov.visit is not None
cur = self._cursor(cur)
- origin_visit_cols = ["origin", "visit", "date", "type"]
query = """INSERT INTO origin_visit ({cols})
VALUES ((select id from origin where url=%s), {values})
ON CONFLICT (origin, visit) DO NOTHING""".format(
- cols=", ".join(origin_visit_cols),
- values=", ".join("%s" for col in origin_visit_cols[1:]),
+ cols=", ".join(self.origin_visit_cols),
+ values=", ".join("%s" for col in self.origin_visit_cols[1:]),
)
cur.execute(query, (ov.origin, ov.visit, ov.date, ov.type))
@@ -618,6 +619,37 @@
cur.execute(query, tuple(query_params))
yield from cur
+ def origin_visit_get_range(
+ self, origin: str, visit_from: int, order: str, limit: int, cur=None,
+ ):
+ assert order in ["asc", "desc"]
+ cur = self._cursor(cur)
+
+ origin_visit_cols = ["o.url as origin", "ov.visit", "ov.date", "ov.type"]
+ query_parts = [
+ f"SELECT {', '.join(origin_visit_cols)} FROM origin_visit ov ",
+ "INNER JOIN origin o ON o.id = ov.origin ",
+ ]
+ query_parts.append("WHERE o.url = %s")
+ query_params: List[Any] = [origin]
+
+ if visit_from > 0:
+ op_comparison = ">" if order == "asc" else "<"
+ query_parts.append(f"and ov.visit {op_comparison} %s")
+ query_params.append(visit_from)
+
+ if order == "asc":
+ query_parts.append("ORDER BY ov.visit ASC")
+ elif order == "desc":
+ query_parts.append("ORDER BY ov.visit DESC")
+
+ query_parts.append("LIMIT %s")
+ query_params.append(limit)
+
+ query = "\n".join(query_parts)
+ cur.execute(query, tuple(query_params))
+ yield from cur
+
def origin_visit_get(self, origin_id, visit_id, cur=None):
"""Retrieve information on visit visit_id of origin origin_id.
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -51,6 +51,7 @@
RawExtrinsicMetadata,
)
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
+from swh.storage.interface import PagedResult
from swh.storage.objstorage import ObjStorage
from swh.storage.utils import now
@@ -862,31 +863,47 @@
def origin_visit_get(
self,
origin: str,
- last_visit: Optional[int] = None,
- limit: Optional[int] = None,
+ page_token: Optional[str] = None,
order: str = "asc",
- ) -> Iterable[Dict[str, Any]]:
+ limit: int = 10,
+ ) -> PagedResult[OriginVisit]:
+ next_page_token = None
+ page_token = page_token or "0"
order = order.lower()
- assert order in ["asc", "desc"]
+ allowed_orders = ["asc", "desc"]
+ if order not in allowed_orders:
+ raise StorageArgumentException(
+ f"order must be one of {', '.join(allowed_orders)}."
+ )
+ if not isinstance(page_token, str):
+ raise StorageArgumentException("page_token must be a string.")
+
+ visit_from = int(page_token)
origin_url = self._get_origin_url(origin)
- if origin_url in self._origin_visits:
- visits = self._origin_visits[origin_url]
- visits = sorted(visits, key=lambda v: v.visit, reverse=(order == "desc"))
- if last_visit is not None:
- if order == "asc":
- visits = [v for v in visits if v.visit > last_visit]
- else:
- visits = [v for v in visits if v.visit < last_visit]
- if limit is not None:
- visits = visits[:limit]
- for visit in visits:
- if not visit:
- continue
- visit_id = visit.visit
+ extra_limit = limit + 1
+ visits = sorted(
+ self._origin_visits.get(origin_url, []),
+ key=lambda v: v.visit,
+ reverse=(order == "desc"),
+ )
+
+ if visit_from > 0 and order == "asc":
+ visits = [v for v in visits if v.visit > visit_from]
+ elif visit_from > 0 and order == "desc":
+ visits = [v for v in visits if v.visit < visit_from]
+ visits = visits[:extra_limit]
+
+ assert len(visits) <= extra_limit
+ if len(visits) == extra_limit:
+ last_visit = visits[limit]
+ visits = visits[:limit]
+ assert last_visit is not None and last_visit.visit is not None
+ if order == "asc":
+ next_page_token = str(last_visit.visit - 1)
+ else:
+ next_page_token = str(last_visit.visit + 1)
- visit_update = self._origin_visit_get_updated(origin_url, visit_id)
- assert visit_update is not None
- yield visit_update
+ return PagedResult(results=visits, next_page_token=next_page_token)
def origin_visit_find_by_date(
self, origin: str, visit_date: datetime.datetime
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -5,9 +5,10 @@
import datetime
-from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
+from typing import Dict, Iterable, List, Optional, Tuple, TypeVar, Union
from swh.core.api import remote_api_endpoint
+from swh.core.api.classes import PagedResult as CorePagedResult
from swh.model.identifiers import SWHID
from swh.model.model import (
Content,
@@ -32,6 +33,10 @@
return f
+TResult = TypeVar("TResult")
+PagedResult = CorePagedResult[TResult, str]
+
+
class StorageInterface:
@remote_api_endpoint("check_config")
def check_config(self, *, check_write):
@@ -793,22 +798,24 @@
def origin_visit_get(
self,
origin: str,
- last_visit: Optional[int] = None,
- limit: Optional[int] = None,
+ page_token: Optional[str] = None,
order: str = "asc",
- ) -> Iterable[Dict[str, Any]]:
- """Retrieve all the origin's visit's information.
+ limit: int = 10,
+ ) -> PagedResult[OriginVisit]:
+ """Retrieve page of OriginVisit information.
Args:
origin: The visited origin
- last_visit: Starting point from which listing the next visits
- Default to None
- limit: Number of results to return from the last visit.
- Default to None
+ page_token: opaque string used to get the next results of a search
order: Order on visit id fields to list origin visits (default to asc)
+ limit: Number of visits to return
- Yields:
- List of visits.
+ Raises:
+ StorageArgumentException if the order is wrong or the page_token type is
+ mistyped.
+
+ Returns: Page of OriginVisit data model objects. if next_page_token is None,
+ there is no longer data to retrieve.
"""
...
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -45,6 +45,7 @@
RawExtrinsicMetadata,
)
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
+from swh.storage.interface import PagedResult
from swh.storage.objstorage import ObjStorage
from swh.storage.utils import now
@@ -877,22 +878,55 @@
return OriginVisitStatus.from_dict(row)
@timed
- @db_transaction_generator(statement_timeout=500)
+ @db_transaction(statement_timeout=500)
def origin_visit_get(
self,
origin: str,
- last_visit: Optional[int] = None,
- limit: Optional[int] = None,
+ page_token: Optional[str] = None,
order: str = "asc",
+ limit: int = 10,
db=None,
cur=None,
- ) -> Iterable[Dict[str, Any]]:
- assert order in ["asc", "desc"]
- lines = db.origin_visit_get_all(
- origin, last_visit=last_visit, limit=limit, order=order, cur=cur
- )
- for line in lines:
- yield dict(zip(db.origin_visit_get_cols, line))
+ ) -> PagedResult[OriginVisit]:
+ page_token = page_token or "0"
+ order = order.lower()
+ allowed_orders = ["asc", "desc"]
+ if order not in allowed_orders:
+ raise StorageArgumentException(
+ f"order must be one of {', '.join(allowed_orders)}."
+ )
+ if not isinstance(page_token, str):
+ raise StorageArgumentException("page_token must be a string.")
+
+ next_page_token = None
+ visit_from = int(page_token)
+ visits: List[OriginVisit] = []
+ extra_limit = limit + 1
+ for row in db.origin_visit_get_range(
+ origin, visit_from=visit_from, order=order, limit=extra_limit, cur=cur
+ ):
+ row_d = dict(zip(db.origin_visit_cols, row))
+ visits.append(
+ OriginVisit(
+ origin=row_d["origin"],
+ visit=row_d["visit"],
+ date=row_d["date"],
+ type=row_d["type"],
+ )
+ )
+
+ assert len(visits) <= extra_limit
+
+ if len(visits) == extra_limit:
+ last_visit = visits[limit]
+ visits = visits[:limit]
+ assert last_visit is not None and last_visit.visit is not None
+ if order == "asc":
+ next_page_token = str(last_visit.visit - 1)
+ else:
+ next_page_token = str(last_visit.visit + 1)
+
+ return PagedResult(results=visits, next_page_token=next_page_token)
@timed
@db_transaction(statement_timeout=500)
diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py
--- a/swh/storage/tests/test_retry.py
+++ b/swh/storage/tests/test_retry.py
@@ -269,16 +269,15 @@
swh_storage.origin_add([origin])
- origins = list(swh_storage.origin_visit_get(origin.url))
+ origins = swh_storage.origin_visit_get(origin.url).results
assert not origins
origin_visit = swh_storage.origin_visit_add([visit])[0]
assert origin_visit.origin == origin.url
assert isinstance(origin_visit.visit, int)
- origin_visit = next(swh_storage.origin_visit_get(origin.url))
- assert origin_visit["origin"] == origin.url
- assert isinstance(origin_visit["visit"], int)
+ actual_visit = swh_storage.origin_visit_get(origin.url).results[0]
+ assert actual_visit == visit
def test_retrying_proxy_swh_storage_origin_visit_add_retry(
@@ -303,7 +302,7 @@
[visit],
]
- origins = list(swh_storage.origin_visit_get(origin.url))
+ origins = swh_storage.origin_visit_get(origin.url).results
assert not origins
r = swh_storage.origin_visit_add([visit])
@@ -327,7 +326,7 @@
visit = sample_data.origin_visit
assert visit.origin == origin.url
- origins = list(swh_storage.origin_visit_get(origin.url))
+ origins = swh_storage.origin_visit_get(origin.url).results
assert not origins
with pytest.raises(StorageArgumentException, match="Refuse to add"):
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -41,7 +41,7 @@
from swh.storage import get_storage
from swh.storage.converters import origin_url_to_sha1 as sha1
from swh.storage.exc import HashCollision, StorageArgumentException
-from swh.storage.interface import StorageInterface
+from swh.storage.interface import StorageInterface, PagedResult # noqa
from swh.storage.utils import content_hex_hashes, now
@@ -1253,10 +1253,29 @@
visits.append(date_visit)
return visits
+ def test_origin_visit_get__unknown_origin(self, swh_storage):
+ actual_page = swh_storage.origin_visit_get("foo")
+ assert actual_page.next_page_token is None
+ assert actual_page.results == []
+ assert actual_page == PagedResult()
+
+ def test_origin_visit_get__validation_failure(self, swh_storage, sample_data):
+ origin = sample_data.origin
+ swh_storage.origin_add([origin])
+ with pytest.raises(
+ StorageArgumentException, match="page_token must be a string"
+ ):
+ swh_storage.origin_visit_get(origin.url, page_token=10) # not bytes
+
+ with pytest.raises(
+ StorageArgumentException, match="order must be one of asc, desc"
+ ):
+ swh_storage.origin_visit_get(origin.url, order="foobar") # wrong order
+
def test_origin_visit_get_all(self, swh_storage, sample_data):
origin = sample_data.origin
swh_storage.origin_add([origin])
- visits = swh_storage.origin_visit_add(
+ ov1, ov2, ov3 = swh_storage.origin_visit_add(
[
OriginVisit(
origin=origin.url,
@@ -1275,59 +1294,86 @@
),
]
)
- ov1, ov2, ov3 = [
- {**v.to_dict(), "status": "created", "snapshot": None, "metadata": None,}
- for v in visits
- ]
# order asc, no pagination, no limit
- all_visits = list(swh_storage.origin_visit_get(origin.url))
- assert all_visits == [ov1, ov2, ov3]
+ actual_page = swh_storage.origin_visit_get(origin.url)
+ assert actual_page.next_page_token is None
+ assert actual_page == PagedResult(results=[ov1, ov2, ov3])
# order asc, no pagination, limit
- all_visits2 = list(swh_storage.origin_visit_get(origin.url, limit=2))
- assert all_visits2 == [ov1, ov2]
+ actual_page = swh_storage.origin_visit_get(origin.url, limit=2)
+ next_page_token = actual_page.next_page_token
+ assert next_page_token is not None
+ assert actual_page.results == [ov1, ov2]
# order asc, pagination, no limit
- all_visits3 = list(
- swh_storage.origin_visit_get(origin.url, last_visit=ov1["visit"])
+ actual_page = swh_storage.origin_visit_get(
+ origin.url, page_token=next_page_token
+ )
+ assert actual_page.next_page_token is None
+ assert actual_page.results == [ov3]
+ assert actual_page == PagedResult(results=[ov3])
+
+ next_page_token = str(ov1.visit)
+ actual_page = swh_storage.origin_visit_get(
+ origin.url, page_token=next_page_token
)
- assert all_visits3 == [ov2, ov3]
+ assert actual_page.next_page_token is None
+ assert actual_page == PagedResult(results=[ov2, ov3])
# order asc, pagination, limit
- all_visits4 = list(
- swh_storage.origin_visit_get(origin.url, last_visit=ov2["visit"], limit=1)
+ actual_page = swh_storage.origin_visit_get(
+ origin.url, page_token=next_page_token, limit=2
)
- assert all_visits4 == [ov3]
+ assert actual_page.next_page_token is None
+ assert actual_page.results == [ov2, ov3]
+ assert actual_page == PagedResult(results=[ov2, ov3])
+
+ next_page_token = str(ov2.visit)
+ actual_page = swh_storage.origin_visit_get(
+ origin.url, page_token=next_page_token, limit=1
+ )
+ assert actual_page.next_page_token is None
+ assert actual_page == PagedResult(results=[ov3])
# order desc, no pagination, no limit
- all_visits5 = list(swh_storage.origin_visit_get(origin.url, order="desc"))
- assert all_visits5 == [ov3, ov2, ov1]
+ actual_page = swh_storage.origin_visit_get(origin.url, order="desc")
+ assert actual_page.next_page_token is None
+ assert actual_page == PagedResult(results=[ov3, ov2, ov1])
# order desc, no pagination, limit
- all_visits6 = list(
- swh_storage.origin_visit_get(origin.url, limit=2, order="desc")
+ actual_page = swh_storage.origin_visit_get(origin.url, limit=2, order="desc")
+ next_page_token = actual_page.next_page_token
+ assert next_page_token is not None
+ assert actual_page.results == [ov3, ov2]
+
+ actual_page = swh_storage.origin_visit_get(
+ origin.url, page_token=next_page_token, order="desc"
)
- assert all_visits6 == [ov3, ov2]
+ assert actual_page.next_page_token is None
+ assert actual_page.results == [ov1]
+ assert actual_page == PagedResult(results=[ov1])
# order desc, pagination, no limit
- all_visits7 = list(
- swh_storage.origin_visit_get(
- origin.url, last_visit=ov3["visit"], order="desc"
- )
+ next_page_token = str(ov3.visit)
+ actual_page = swh_storage.origin_visit_get(
+ origin.url, page_token=next_page_token, order="desc"
)
- assert all_visits7 == [ov2, ov1]
+ assert actual_page.next_page_token is None
+ assert actual_page == PagedResult(results=[ov2, ov1])
# order desc, pagination, limit
- all_visits8 = list(
- swh_storage.origin_visit_get(
- origin.url, last_visit=ov3["visit"], order="desc", limit=1
- )
+ actual_page = swh_storage.origin_visit_get(
+ origin.url, page_token=next_page_token, order="desc", limit=1
)
- assert all_visits8 == [ov2]
+ next_page_token = actual_page.next_page_token
+ assert next_page_token is not None
+ assert actual_page.results == [ov2]
- def test_origin_visit_get__unknown_origin(self, swh_storage):
- assert [] == list(swh_storage.origin_visit_get("foo"))
+ actual_page = swh_storage.origin_visit_get(
+ origin.url, page_token=next_page_token, order="desc"
+ )
+ assert actual_page == PagedResult(results=[ov1])
def test_origin_visit_status_get_random(self, swh_storage, sample_data):
origins = sample_data.origins[:2]
@@ -1562,21 +1608,16 @@
snapshot=None,
)
- actual_origin_visits = list(swh_storage.origin_visit_get(origin1.url))
- expected_visits = [
- {**ovs1.to_dict(), "type": ov1.type},
- {**ovs2.to_dict(), "type": ov2.type},
- ]
-
- assert len(expected_visits) == len(actual_origin_visits)
-
+ actual_visits = swh_storage.origin_visit_get(origin1.url).results
+ expected_visits = [ov1, ov2]
+ assert len(expected_visits) == len(actual_visits)
for visit in expected_visits:
- assert visit in actual_origin_visits
+ assert visit in actual_visits
actual_objects = list(swh_storage.journal_writer.journal.objects)
expected_objects = list(
[("origin", origin1)]
- + [("origin_visit", visit) for visit in [ov1, ov2]] * 2
+ + [("origin_visit", visit) for visit in expected_visits] * 2
+ [("origin_visit_status", ovs) for ovs in [ovs1, ovs2]]
)
@@ -1719,7 +1760,7 @@
status="created",
snapshot=None,
)
- date_visit_now = now()
+ date_visit_now = round_to_milliseconds(now())
visit_status1 = OriginVisitStatus(
origin=ov1.origin,
visit=ov1.visit,
@@ -1732,13 +1773,8 @@
# second call will ignore existing entries (will send to storage though)
swh_storage.origin_visit_status_add([visit_status1])
- origin_visits = list(swh_storage.origin_visit_get(ov1.origin))
-
- assert len(origin_visits) == 1
- origin_visit1 = origin_visits[0]
- assert origin_visit1
- assert origin_visit1["status"] == "full"
- assert origin_visit1["snapshot"] == snapshot.id
+ visit_status = swh_storage.origin_visit_status_get_latest(ov1.origin, ov1.visit)
+ assert visit_status == visit_status1
actual_objects = list(swh_storage.journal_writer.journal.objects)
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Wed, Dec 18, 12:57 PM (10 h, 55 m ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3217866
Attached To
D3627: storage*: origin_visit_get(...) -> PagedResult[OriginVisit]
Event Timeline
Log In to Comment