Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/storage.py
Show First 20 Lines • Show All 1,136 Lines • ▼ Show 20 Lines | ) -> PagedResult[Origin]: | ||||
assert len(origins) <= limit | assert len(origins) <= limit | ||||
return PagedResult(results=origins, next_page_token=next_page_token) | return PagedResult(results=origins, next_page_token=next_page_token) | ||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def origin_count( | def origin_count( | ||||
self, url_pattern, regexp=False, with_visit=False, db=None, cur=None | self, | ||||
): | url_pattern: str, | ||||
regexp: bool = False, | |||||
with_visit: bool = False, | |||||
db=None, | |||||
cur=None, | |||||
) -> int: | |||||
return db.origin_count(url_pattern, regexp, with_visit, cur) | return db.origin_count(url_pattern, regexp, with_visit, cur) | ||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
@db_transaction() | @db_transaction() | ||||
def origin_add(self, origins: List[Origin], db=None, cur=None) -> Dict[str, int]: | def origin_add(self, origins: List[Origin], db=None, cur=None) -> Dict[str, int]: | ||||
urls = [o.url for o in origins] | urls = [o.url for o in origins] | ||||
known_origins = set(url for (url,) in db.origin_get_by_url(urls, cur)) | known_origins = set(url for (url,) in db.origin_get_by_url(urls, cur)) | ||||
▲ Show 20 Lines • Show All 232 Lines • Show Last 20 Lines |