Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
Show First 20 Lines • Show All 679 Lines • ▼ Show 20 Lines | def object_find_by_sha1_git(self, ids): | ||||
missing_ids.remove(sha1_git) | missing_ids.remove(sha1_git) | ||||
if not missing_ids: | if not missing_ids: | ||||
# We found everything, skipping the next queries. | # We found everything, skipping the next queries. | ||||
break | break | ||||
return results | return results | ||||
def origin_get(self, origins): | def origin_get(self, origins: Iterable[str]) -> Iterable[Optional[Origin]]: | ||||
if isinstance(origins, dict): | return [self.origin_get_one(origin) for origin in origins] | ||||
# Old API | |||||
return_single = True | |||||
origins = [origins] | |||||
else: | |||||
return_single = False | |||||
if any("id" in origin for origin in origins): | |||||
raise StorageArgumentException("Origin ids are not supported.") | |||||
results = [self.origin_get_one(origin) for origin in origins] | |||||
if return_single: | |||||
assert len(results) == 1 | |||||
return results[0] | |||||
else: | |||||
return results | |||||
def origin_get_one(self, origin: Dict[str, Any]) -> Optional[Dict[str, Any]]: | def origin_get_one(self, origin_url: str) -> Optional[Origin]: | ||||
if "id" in origin: | """Given an origin url, return the origin if it exists, None otherwise | ||||
raise StorageArgumentException("Origin ids are not supported.") | |||||
if "url" not in origin: | |||||
raise StorageArgumentException("Missing origin url") | |||||
rows = self._cql_runner.origin_get_by_url(origin["url"]) | |||||
rows = list(rows) | """ | ||||
rows = list(self._cql_runner.origin_get_by_url(origin_url)) | |||||
if rows: | if rows: | ||||
assert len(rows) == 1 | assert len(rows) == 1 | ||||
result = rows[0]._asdict() | return Origin(url=rows[0].url) | ||||
return { | |||||
"url": result["url"], | |||||
} | |||||
else: | else: | ||||
return None | return None | ||||
def origin_get_by_sha1(self, sha1s): | def origin_get_by_sha1(self, sha1s): | ||||
results = [] | results = [] | ||||
for sha1 in sha1s: | for sha1 in sha1s: | ||||
rows = self._cql_runner.origin_get_by_sha1(sha1) | rows = self._cql_runner.origin_get_by_sha1(sha1) | ||||
if rows: | if rows: | ||||
Show All 36 Lines | ): | ||||
if with_visit: | if with_visit: | ||||
origins = [orig for orig in origins if orig.next_visit_id > 1] | origins = [orig for orig in origins if orig.next_visit_id > 1] | ||||
return [{"url": orig.url,} for orig in origins[offset : offset + limit]] | return [{"url": orig.url,} for orig in origins[offset : offset + limit]] | ||||
def origin_add(self, origins: Iterable[Origin]) -> Dict[str, int]: | def origin_add(self, origins: Iterable[Origin]) -> Dict[str, int]: | ||||
origins = list(origins) | origins = list(origins) | ||||
known_origins = [ | to_add = [ori for ori in origins if self.origin_get_one(ori.url) is None] | ||||
Origin.from_dict(d) | |||||
for d in self.origin_get([origin.to_dict() for origin in origins]) | |||||
if d is not None | |||||
] | |||||
to_add = [origin for origin in origins if origin not in known_origins] | |||||
self.journal_writer.origin_add(to_add) | self.journal_writer.origin_add(to_add) | ||||
for origin in to_add: | for origin in to_add: | ||||
self._cql_runner.origin_add_one(origin) | self._cql_runner.origin_add_one(origin) | ||||
return {"origin:add": len(to_add)} | return {"origin:add": len(to_add)} | ||||
def origin_visit_add(self, visits: Iterable[OriginVisit]) -> Iterable[OriginVisit]: | def origin_visit_add(self, visits: Iterable[OriginVisit]) -> Iterable[OriginVisit]: | ||||
for visit in visits: | for visit in visits: | ||||
origin = self.origin_get({"url": visit.origin}) | origin = self.origin_get_one(visit.origin) | ||||
if not origin: # Cannot add a visit without an origin | if not origin: # Cannot add a visit without an origin | ||||
raise StorageArgumentException("Unknown origin %s", visit.origin) | raise StorageArgumentException("Unknown origin %s", visit.origin) | ||||
all_visits = [] | all_visits = [] | ||||
nb_visits = 0 | nb_visits = 0 | ||||
for visit in visits: | for visit in visits: | ||||
nb_visits += 1 | nb_visits += 1 | ||||
if not visit.visit: | if not visit.visit: | ||||
Show All 22 Lines | def _origin_visit_status_add(self, visit_status: OriginVisitStatus) -> None: | ||||
self.journal_writer.origin_visit_status_add([visit_status]) | self.journal_writer.origin_visit_status_add([visit_status]) | ||||
self._cql_runner.origin_visit_status_add_one(visit_status) | self._cql_runner.origin_visit_status_add_one(visit_status) | ||||
def origin_visit_status_add( | def origin_visit_status_add( | ||||
self, visit_statuses: Iterable[OriginVisitStatus] | self, visit_statuses: Iterable[OriginVisitStatus] | ||||
) -> None: | ) -> None: | ||||
# First round to check existence (fail early if any is ko) | # First round to check existence (fail early if any is ko) | ||||
for visit_status in visit_statuses: | for visit_status in visit_statuses: | ||||
origin_url = self.origin_get({"url": visit_status.origin}) | origin_url = self.origin_get_one(visit_status.origin) | ||||
if not origin_url: | if not origin_url: | ||||
raise StorageArgumentException(f"Unknown origin {visit_status.origin}") | raise StorageArgumentException(f"Unknown origin {visit_status.origin}") | ||||
for visit_status in visit_statuses: | for visit_status in visit_statuses: | ||||
self._origin_visit_status_add(visit_status) | self._origin_visit_status_add(visit_status) | ||||
def _origin_visit_apply_last_status(self, visit: Dict[str, Any]) -> Dict[str, Any]: | def _origin_visit_apply_last_status(self, visit: Dict[str, Any]) -> Dict[str, Any]: | ||||
"""Retrieve the latest visit status information for the origin visit. | """Retrieve the latest visit status information for the origin visit. | ||||
▲ Show 20 Lines • Show All 347 Lines • Show Last 20 Lines |