Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
Show First 20 Lines • Show All 132 Lines • ▼ Show 20 Lines | def reset(self): | ||||
self._origins = {} | self._origins = {} | ||||
self._origins_by_id = [] | self._origins_by_id = [] | ||||
self._origins_by_sha1 = {} | self._origins_by_sha1 = {} | ||||
self._origin_visits = {} | self._origin_visits = {} | ||||
self._origin_visit_statuses: Dict[Tuple[str, int], List[OriginVisitStatus]] = {} | self._origin_visit_statuses: Dict[Tuple[str, int], List[OriginVisitStatus]] = {} | ||||
self._persons = {} | self._persons = {} | ||||
# {origin_url: {authority: [metadata]}} | # {origin_url: {authority: [metadata]}} | ||||
self._origin_metadata: Dict[ | self._object_metadata: Dict[ | ||||
str, | str, | ||||
Dict[ | Dict[ | ||||
Hashable, | Hashable, | ||||
SortedList[Tuple[datetime.datetime, FetcherKey], Dict[str, Any]], | SortedList[Tuple[datetime.datetime, FetcherKey], Dict[str, Any]], | ||||
], | ], | ||||
] = defaultdict( | ] = defaultdict( | ||||
lambda: defaultdict( | lambda: defaultdict( | ||||
lambda: SortedList(key=lambda x: (x["discovery_date"], x["fetcher"])) | lambda: SortedList(key=lambda x: (x["discovery_date"], x["fetcher"])) | ||||
▲ Show 20 Lines • Show All 858 Lines • ▼ Show 20 Lines | def stat_counters(self): | ||||
collections.Counter( | collections.Counter( | ||||
obj_type | obj_type | ||||
for (obj_type, obj_id) in itertools.chain(*self._objects.values()) | for (obj_type, obj_id) in itertools.chain(*self._objects.values()) | ||||
) | ) | ||||
) | ) | ||||
return stats | return stats | ||||
def refresh_stat_counters(self):
    """Do nothing; present for interface compatibility.

    NOTE(review): presumably a no-op because ``stat_counters`` derives its
    numbers directly from ``self._objects`` on every call -- confirm
    against the storage interface.
    """
ardumont: Remove the extra pass ;) | |||||
vlorentz (done): I'm blaming git
ardumont: :D
def content_metadata_add(
    self,
    id: str,
    discovery_date: datetime.datetime,
    authority: Dict[str, Any],
    fetcher: Dict[str, Any],
    format: str,
    metadata: bytes,
) -> None:
    """Attach a metadata blob to the content object identified by ``id``.

    Thin wrapper that forwards every argument to
    ``self._object_metadata_add`` with the object type ``"content"``.

    Args:
        id: identifier of the content object.
        discovery_date: when the metadata was discovered.
        authority: dict describing the metadata authority.
        fetcher: dict describing the metadata fetcher.
        format: name of the format of ``metadata``.
        metadata: the raw metadata.

    Raises:
        StorageArgumentException: if ``id`` is not a ``str``.
    """
    # Validate the identifier up front, as origin_metadata_add does for its
    # URL, so a bad id surfaces as a storage argument error.
    if not isinstance(id, str):
        raise StorageArgumentException("id must be str, not %r" % (id,))

    self._object_metadata_add(
        "content", id, discovery_date, authority, fetcher, format, metadata,
    )
def origin_metadata_add(
    self,
    origin_url: str,
    discovery_date: datetime.datetime,
    authority: Dict[str, Any],
    fetcher: Dict[str, Any],
    format: str,
    metadata: bytes,
) -> None:
    """Attach a metadata blob to the origin identified by ``origin_url``.

    Checks the type of ``origin_url`` and then forwards every argument to
    ``self._object_metadata_add`` with the object type ``"origin"``.

    Args:
        origin_url: URL of the origin the metadata is about.
        discovery_date: when the metadata was discovered.
        authority: dict describing the metadata authority.
        fetcher: dict describing the metadata fetcher.
        format: name of the format of ``metadata``.
        metadata: the raw metadata.

    Raises:
        StorageArgumentException: if ``origin_url`` is not a ``str``.
    """
    if not isinstance(origin_url, str):
        raise StorageArgumentException(
            "origin_url must be str, not %r" % (origin_url,)
        )

    self._object_metadata_add(
        "origin", origin_url, discovery_date, authority, fetcher, format, metadata,
    )
def _object_metadata_add(
    self,
    object_type: str,
    id: str,
    discovery_date: datetime.datetime,
    authority: Dict[str, Any],
    fetcher: Dict[str, Any],
    format: str,
    metadata: bytes,
) -> None:
    """Store one metadata entry for the object ``id``.

    An entry is identified, within one (``id``, authority) pair, by its
    fetcher key and discovery date: adding an entry whose fetcher and
    discovery date match an existing one overwrites that entry's fields
    instead of creating a second one.

    Args:
        object_type: kind of object (``"content"``, ``"origin"``, ...).
        id: identifier of the object the metadata is about.
        discovery_date: when the metadata was discovered.
        authority: dict describing the metadata authority.
        fetcher: dict describing the metadata fetcher.
        format: name of the format of ``metadata``.
        metadata: the raw metadata.

    Raises:
        StorageArgumentException: if ``metadata`` is not ``bytes``, or if
            the authority or the fetcher is not already registered.
    """
    if not isinstance(metadata, bytes):
        raise StorageArgumentException(
            "metadata must be bytes, not %r" % (metadata,)
        )

    authority_key = self._metadata_authority_key(authority)
    if authority_key not in self._metadata_authorities:
        raise StorageArgumentException(f"Unknown authority {authority}")

    fetcher_key = self._metadata_fetcher_key(fetcher)
    if fetcher_key not in self._metadata_fetchers:
        raise StorageArgumentException(f"Unknown fetcher {fetcher}")

    # NOTE(review): object_type does not take part in the storage key, so
    # two objects of different types sharing the same id would share their
    # entries -- confirm ids are unique across object types.
    object_metadata_list = self._object_metadata[id][authority_key]

    object_metadata: Dict[str, Any] = {
        "id": id,
        "discovery_date": discovery_date,
        "authority": authority_key,
        "fetcher": fetcher_key,
        "format": format,
        "metadata": metadata,
    }

    for existing_object_metadata in object_metadata_list:
        if (
            existing_object_metadata["fetcher"] == fetcher_key
            and existing_object_metadata["discovery_date"] == discovery_date
        ):
            # Duplicate of an existing one; replace it. The fields the list
            # is sorted on (discovery date and fetcher) are unchanged by the
            # update, so the in-place overwrite keeps the ordering valid.
            existing_object_metadata.update(object_metadata)
            return

    # No entry with this (discovery_date, fetcher) yet: insert a new one.
    object_metadata_list.add(object_metadata)
def origin_metadata_get(
    self,
    origin_url: str,
    authority: Dict[str, str],
    after: Optional[datetime.datetime] = None,
    page_token: Optional[bytes] = None,
    limit: int = 1000,
) -> Dict[str, Any]:
    """Return a page of metadata entries recorded for an origin.

    Delegates to ``self._object_metadata_get`` with the object type
    ``"origin"``, then renames the generic ``"id"`` key of every result to
    ``"origin_url"``.

    Args:
        origin_url: URL of the origin to look up.
        authority: dict describing the metadata authority to filter on.
        after: forwarded unchanged to ``_object_metadata_get``.
        page_token: forwarded unchanged to ``_object_metadata_get``.
        limit: forwarded unchanged to ``_object_metadata_get``.

    Returns:
        The dict returned by ``_object_metadata_get``, whose ``"results"``
        list is replaced by a deep copy with ``"id"`` renamed to
        ``"origin_url"`` in each entry.

    Raises:
        TypeError: if ``origin_url`` is not a ``str``.
    """
    if not isinstance(origin_url, str):
        raise TypeError("origin_url must be str, not %r" % (origin_url,))

    res = self._object_metadata_get(
        "origin", origin_url, authority, after, page_token, limit
    )

    # Work on a deep copy so that renaming the key below cannot alter the
    # dicts handed back by the helper (which may be shared with the store).
    res["results"] = copy.deepcopy(res["results"])
    for result in res["results"]:
        result["origin_url"] = result.pop("id")

    return res
def _object_metadata_get( | |||||
self, | |||||
object_type: str, | |||||
id: str, | |||||
authority: Dict[str, str], | |||||
after: Optional[datetime.datetime] = None, | |||||
page_token: Optional[bytes] = None, | |||||
limit: int = 1000, | |||||
) -> Dict[str, Any]: | |||||
authority_key = self._metadata_authority_key(authority) | authority_key = self._metadata_authority_key(authority) | ||||
if page_token is not None: | if page_token is not None: | ||||
(after_time, after_fetcher) = msgpack_loads(page_token) | (after_time, after_fetcher) = msgpack_loads(page_token) | ||||
after_fetcher = tuple(after_fetcher) | after_fetcher = tuple(after_fetcher) | ||||
if after is not None and after > after_time: | if after is not None and after > after_time: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
"page_token is inconsistent with the value of 'after'." | "page_token is inconsistent with the value of 'after'." | ||||
) | ) | ||||
entries = self._origin_metadata[origin_url][authority_key].iter_after( | entries = self._object_metadata[id][authority_key].iter_after( | ||||
(after_time, after_fetcher) | (after_time, after_fetcher) | ||||
) | ) | ||||
elif after is not None: | elif after is not None: | ||||
entries = self._origin_metadata[origin_url][authority_key].iter_from( | entries = self._object_metadata[id][authority_key].iter_from((after,)) | ||||
(after,) | |||||
) | |||||
entries = (entry for entry in entries if entry["discovery_date"] > after) | entries = (entry for entry in entries if entry["discovery_date"] > after) | ||||
else: | else: | ||||
entries = iter(self._origin_metadata[origin_url][authority_key]) | entries = iter(self._object_metadata[id][authority_key]) | ||||
if limit: | if limit: | ||||
entries = itertools.islice(entries, 0, limit + 1) | entries = itertools.islice(entries, 0, limit + 1) | ||||
results = [] | results = [] | ||||
for entry in entries: | for entry in entries: | ||||
authority = self._metadata_authorities[entry["authority"]] | authority = self._metadata_authorities[entry["authority"]] | ||||
fetcher = self._metadata_fetchers[entry["fetcher"]] | fetcher = self._metadata_fetchers[entry["fetcher"]] | ||||
▲ Show 20 Lines • Show All 110 Lines • Show Last 20 Lines |
Remove the extra pass ;)