Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/storage.py
Show First 20 Lines • Show All 1,156 Lines • ▼ Show 20 Lines | def origin_metadata_add( | ||||
discovery_date: datetime.datetime, | discovery_date: datetime.datetime, | ||||
authority: Dict[str, Any], | authority: Dict[str, Any], | ||||
fetcher: Dict[str, Any], | fetcher: Dict[str, Any], | ||||
format: str, | format: str, | ||||
metadata: bytes, | metadata: bytes, | ||||
db=None, | db=None, | ||||
cur=None, | cur=None, | ||||
) -> None: | ) -> None: | ||||
authority_id = db.metadata_authority_get_id( | self._object_metadata_add( | ||||
authority["type"], authority["url"], cur | "origin", | ||||
ardumont: could you add a test for that part? | |||||
origin_url, | |||||
discovery_date, | |||||
authority, | |||||
fetcher, | |||||
format, | |||||
metadata, | |||||
db, | |||||
cur, | |||||
) | ) | ||||
if not authority_id: | |||||
raise StorageArgumentException(f"Unknown authority {authority}") | def _object_metadata_add( | ||||
fetcher_id = db.metadata_fetcher_get_id( | self, | ||||
fetcher["name"], fetcher["version"], cur | object_type: str, | ||||
id: str, | |||||
discovery_date: datetime.datetime, | |||||
authority: Dict[str, Any], | |||||
fetcher: Dict[str, Any], | |||||
format: str, | |||||
metadata: bytes, | |||||
db, | |||||
cur, | |||||
) -> None: | |||||
authority_id = self._get_authority_id(authority, db, cur) | |||||
fetcher_id = self._get_fetcher_id(fetcher, db, cur) | |||||
if not isinstance(metadata, bytes): | |||||
raise StorageArgumentException( | |||||
"metadata must be bytes, not %r" % (metadata,) | |||||
) | ) | ||||
if not fetcher_id: | |||||
raise StorageArgumentException(f"Unknown fetcher {fetcher}") | db.object_metadata_add( | ||||
try: | object_type, | ||||
db.origin_metadata_add( | id, | ||||
origin_url, | |||||
discovery_date, | discovery_date, | ||||
authority_id, | authority_id, | ||||
fetcher_id, | fetcher_id, | ||||
format, | format, | ||||
metadata, | metadata, | ||||
cur, | cur, | ||||
) | ) | ||||
except psycopg2.ProgrammingError as e: | |||||
raise StorageArgumentException(*e.args) | send_metric( | ||||
send_metric("origin_metadata:add", count=1, method_name="origin_metadata_add") | f"{object_type}_metadata:add", | ||||
count=1, | |||||
method_name=f"{object_type}_metadata_add", | |||||
) | |||||
@timed | @timed | ||||
@db_transaction(statement_timeout=500) | @db_transaction(statement_timeout=500) | ||||
def origin_metadata_get( | def origin_metadata_get( | ||||
self, | self, | ||||
origin_url: str, | origin_url: str, | ||||
authority: Dict[str, str], | authority: Dict[str, str], | ||||
after: Optional[datetime.datetime] = None, | after: Optional[datetime.datetime] = None, | ||||
page_token: Optional[bytes] = None, | page_token: Optional[bytes] = None, | ||||
limit: int = 1000, | limit: int = 1000, | ||||
db=None, | db=None, | ||||
cur=None, | cur=None, | ||||
) -> Dict[str, Any]: | ) -> Dict[str, Any]: | ||||
result = self._object_metadata_get( | |||||
"origin", origin_url, authority, after, page_token, limit, db, cur | |||||
Not Done · Inline Actions · ardumont: same here. | |||||
) | |||||
for res in result["results"]: | |||||
res.pop("id") | |||||
res["origin_url"] = origin_url | |||||
return result | |||||
def _object_metadata_get( | |||||
self, | |||||
object_type: str, | |||||
id: str, | |||||
authority: Dict[str, str], | |||||
after: Optional[datetime.datetime], | |||||
page_token: Optional[bytes], | |||||
limit: int, | |||||
db, | |||||
cur, | |||||
) -> Dict[str, Any]: | |||||
if page_token: | if page_token: | ||||
(after_time, after_fetcher) = msgpack_loads(page_token) | (after_time, after_fetcher) = msgpack_loads(page_token) | ||||
if after and after_time < after: | if after and after_time < after: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
"page_token is inconsistent with the value of 'after'." | "page_token is inconsistent with the value of 'after'." | ||||
) | ) | ||||
else: | else: | ||||
after_time = after | after_time = after | ||||
after_fetcher = None | after_fetcher = None | ||||
authority_id = db.metadata_authority_get_id( | authority_id = db.metadata_authority_get_id( | ||||
authority["type"], authority["url"], cur | authority["type"], authority["url"], cur | ||||
) | ) | ||||
if not authority_id: | if not authority_id: | ||||
return { | return { | ||||
"next_page_token": None, | "next_page_token": None, | ||||
"results": [], | "results": [], | ||||
} | } | ||||
rows = db.origin_metadata_get( | rows = db.object_metadata_get( | ||||
origin_url, authority_id, after_time, after_fetcher, limit + 1, cur | object_type, id, authority_id, after_time, after_fetcher, limit + 1, cur | ||||
) | ) | ||||
rows = [dict(zip(db.origin_metadata_get_cols, row)) for row in rows] | rows = [dict(zip(db.object_metadata_get_cols, row)) for row in rows] | ||||
results = [] | results = [] | ||||
for row in rows: | for row in rows: | ||||
row = row.copy() | row = row.copy() | ||||
row.pop("metadata_fetcher.id") | row.pop("metadata_fetcher.id") | ||||
results.append( | result = { | ||||
{ | |||||
"origin_url": row.pop("origin.url"), | |||||
"authority": { | "authority": { | ||||
"type": row.pop("metadata_authority.type"), | "type": row.pop("metadata_authority.type"), | ||||
"url": row.pop("metadata_authority.url"), | "url": row.pop("metadata_authority.url"), | ||||
}, | }, | ||||
"fetcher": { | "fetcher": { | ||||
"name": row.pop("metadata_fetcher.name"), | "name": row.pop("metadata_fetcher.name"), | ||||
"version": row.pop("metadata_fetcher.version"), | "version": row.pop("metadata_fetcher.version"), | ||||
}, | }, | ||||
**row, | **row, | ||||
} | } | ||||
) | |||||
results.append(result) | |||||
if len(results) > limit: | if len(results) > limit: | ||||
results.pop() | results.pop() | ||||
assert len(results) == limit | assert len(results) == limit | ||||
last_returned_row = rows[-2] # rows[-1] corresponds to the popped result | last_returned_row = rows[-2] # rows[-1] corresponds to the popped result | ||||
next_page_token: Optional[bytes] = msgpack_dumps( | next_page_token: Optional[bytes] = msgpack_dumps( | ||||
( | ( | ||||
last_returned_row["discovery_date"], | last_returned_row["discovery_date"], | ||||
▲ Show 20 Lines • Show All 59 Lines • ▼ Show 20 Lines | |||||
def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None: | def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None: | ||||
"""Do nothing | """Do nothing | ||||
""" | """ | ||||
return None | return None | ||||
def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: | def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: | ||||
return {} | return {} | ||||
def _get_authority_id(self, authority: Dict[str, Any], db, cur): | |||||
authority_id = db.metadata_authority_get_id( | |||||
authority["type"], authority["url"], cur | |||||
) | |||||
if not authority_id: | |||||
raise StorageArgumentException(f"Unknown authority {authority}") | |||||
return authority_id | |||||
def _get_fetcher_id(self, fetcher: Dict[str, Any], db, cur): | |||||
fetcher_id = db.metadata_fetcher_get_id( | |||||
fetcher["name"], fetcher["version"], cur | |||||
) | |||||
if not fetcher_id: | |||||
raise StorageArgumentException(f"Unknown fetcher {fetcher}") | |||||
return fetcher_id |
could you add a test for that part?