Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
| Show All 13 Lines | |||||
| from collections import defaultdict | from collections import defaultdict | ||||
| from datetime import timedelta | from datetime import timedelta | ||||
| from typing import ( | from typing import ( | ||||
| Any, | Any, | ||||
| Callable, | Callable, | ||||
| Dict, | Dict, | ||||
| Generic, | Generic, | ||||
| Hashable, | |||||
| Iterable, | Iterable, | ||||
| Iterator, | Iterator, | ||||
| List, | List, | ||||
| Optional, | Optional, | ||||
| Tuple, | Tuple, | ||||
| TypeVar, | TypeVar, | ||||
| Union, | Union, | ||||
| ) | ) | ||||
| ▲ Show 20 Lines • Show All 63 Lines • ▼ Show 20 Lines | class SortedList(collections.UserList, Generic[SortedListKey, SortedListItem]): | ||||
| def iter_from(self, start_key: SortedListKey) -> Iterator[SortedListItem]: | def iter_from(self, start_key: SortedListKey) -> Iterator[SortedListItem]: | ||||
| """Returns an iterator over all the elements whose key is greater | """Returns an iterator over all the elements whose key is greater | ||||
| or equal to `start_key`. | or equal to `start_key`. | ||||
| (This is an efficient equivalent to: | (This is an efficient equivalent to: | ||||
| `(x for x in L if key(x) >= start_key)`) | `(x for x in L if key(x) >= start_key)`) | ||||
| """ | """ | ||||
| from_index = bisect.bisect_left(self.data, (start_key,)) | from_index = bisect.bisect_left(self.data, (start_key,)) | ||||
| for (k, item) in itertools.islice(self.data, from_index, None): | for (k, item) in itertools.islice(self.data, from_index, None): | ||||
ardumont: Missing a rebase on latest master
That's D2987, ain't it? | |||||
| yield item | yield item | ||||
| class InMemoryStorage: | class InMemoryStorage: | ||||
| def __init__(self, journal_writer=None): | def __init__(self, journal_writer=None): | ||||
| self.reset() | self.reset() | ||||
| self.journal_writer = JournalWriter(journal_writer) | self.journal_writer = JournalWriter(journal_writer) | ||||
| def reset(self): | def reset(self): | ||||
| self._contents = {} | self._contents = {} | ||||
| self._content_indexes = defaultdict(lambda: defaultdict(set)) | self._content_indexes = defaultdict(lambda: defaultdict(set)) | ||||
| self._skipped_contents = {} | self._skipped_contents = {} | ||||
| self._skipped_content_indexes = defaultdict(lambda: defaultdict(set)) | self._skipped_content_indexes = defaultdict(lambda: defaultdict(set)) | ||||
| self._directories = {} | self._directories = {} | ||||
| self._revisions = {} | self._revisions = {} | ||||
| self._releases = {} | self._releases = {} | ||||
| self._snapshots = {} | self._snapshots = {} | ||||
| self._origins = {} | self._origins = {} | ||||
| self._origins_by_id = [] | self._origins_by_id = [] | ||||
| self._origins_by_sha1 = {} | self._origins_by_sha1 = {} | ||||
| self._origin_visits = {} | self._origin_visits = {} | ||||
| self._origin_visit_statuses: Dict[Tuple[str, int], List[OriginVisitStatus]] = {} | self._origin_visit_statuses: Dict[Tuple[str, int], List[OriginVisitStatus]] = {} | ||||
| self._persons = [] | self._persons = [] | ||||
| self._origin_metadata = defaultdict(list) | |||||
| self._tools = {} | |||||
| self._metadata_providers = {} | |||||
| self._objects = defaultdict(list) | |||||
| # {origin_url: {authority: [metadata]}} | |||||
| self._origin_metadata: Dict[ | |||||
| str, Dict[Hashable, SortedList[datetime.datetime, Dict[str, Any]]] | |||||
| ] = defaultdict( | |||||
| lambda: defaultdict(lambda: SortedList(key=lambda x: x["discovery_date"])) | |||||
| ) # noqa | |||||
| self._metadata_fetchers: Dict[Hashable, Dict[str, Any]] = {} | |||||
| self._metadata_authorities: Dict[Hashable, Dict[str, Any]] = {} | |||||
| self._objects = defaultdict(list) | |||||
| self._sorted_sha1s = SortedList[bytes, bytes]() | self._sorted_sha1s = SortedList[bytes, bytes]() | ||||
| self.objstorage = ObjStorage({"cls": "memory", "args": {}}) | self.objstorage = ObjStorage({"cls": "memory", "args": {}}) | ||||
| def check_config(self, *, check_write): | def check_config(self, *, check_write): | ||||
| return True | return True | ||||
| def _content_add(self, contents: Iterable[Content], with_data: bool) -> Dict: | def _content_add(self, contents: Iterable[Content], with_data: bool) -> Dict: | ||||
| self.journal_writer.content_add(contents) | self.journal_writer.content_add(contents) | ||||
| content_add = 0 | content_add = 0 | ||||
| content_add_bytes = 0 | |||||
| if with_data: | if with_data: | ||||
| summary = self.objstorage.content_add( | summary = self.objstorage.content_add( | ||||
| c for c in contents if c.status != "absent" | c for c in contents if c.status != "absent" | ||||
| ) | ) | ||||
| content_add_bytes = summary["content:add:bytes"] | content_add_bytes = summary["content:add:bytes"] | ||||
| for content in contents: | for content in contents: | ||||
| key = self._content_key(content) | key = self._content_key(content) | ||||
| ▲ Show 20 Lines • Show All 884 Lines • ▼ Show 20 Lines | def stat_counters(self): | ||||
| for (obj_type, obj_id) in itertools.chain(*self._objects.values()) | for (obj_type, obj_id) in itertools.chain(*self._objects.values()) | ||||
| ) | ) | ||||
| ) | ) | ||||
| return stats | return stats | ||||
| def refresh_stat_counters(self): | def refresh_stat_counters(self): | ||||
| pass | pass | ||||
| def origin_metadata_add(self, origin_url, ts, provider, tool, metadata): | def origin_metadata_add( | ||||
| self, | |||||
| origin_url: str, | |||||
| discovery_date: datetime.datetime, | |||||
| authority: Dict[str, Any], | |||||
| fetcher: Dict[str, Any], | |||||
| format: str, | |||||
| metadata: bytes, | |||||
| ) -> None: | |||||
| if not isinstance(origin_url, str): | if not isinstance(origin_url, str): | ||||
| raise TypeError("origin_id must be str, not %r" % (origin_url,)) | raise StorageArgumentException( | ||||
| "origin_id must be str, not %r" % (origin_url,) | |||||
| if isinstance(ts, str): | ) | ||||
| ts = dateutil.parser.parse(ts) | authority_key = self._metadata_authority_key(authority) | ||||
| if authority_key not in self._metadata_authorities: | |||||
| raise StorageArgumentException(f"Unknown authority {authority}") | |||||
| fetcher_key = self._metadata_fetcher_key(fetcher) | |||||
| if fetcher_key not in self._metadata_fetchers: | |||||
| raise StorageArgumentException(f"Unknown fetcher {fetcher}") | |||||
| origin_metadata = { | origin_metadata = { | ||||
| "origin_url": origin_url, | "origin_url": origin_url, | ||||
| "discovery_date": ts, | "discovery_date": discovery_date, | ||||
| "tool_id": tool, | "authority": authority_key, | ||||
| "fetcher": fetcher_key, | |||||
| "format": format, | |||||
| "metadata": metadata, | "metadata": metadata, | ||||
| "provider_id": provider, | |||||
| } | } | ||||
| self._origin_metadata[origin_url].append(origin_metadata) | self._origin_metadata[origin_url][authority_key].add(origin_metadata) | ||||
| return None | return None | ||||
| def origin_metadata_get_by(self, origin_url, provider_type=None): | def origin_metadata_get( | ||||
| self, | |||||
| origin_url: str, | |||||
| authority: Dict[str, str], | |||||
| after: Optional[datetime.datetime] = None, | |||||
| limit: Optional[int] = None, | |||||
| ) -> List[Dict[str, Any]]: | |||||
| if not isinstance(origin_url, str): | if not isinstance(origin_url, str): | ||||
| raise TypeError("origin_url must be str, not %r" % (origin_url,)) | raise TypeError("origin_url must be str, not %r" % (origin_url,)) | ||||
| metadata = [] | |||||
| for item in self._origin_metadata[origin_url]: | |||||
| item = copy.deepcopy(item) | |||||
| provider = self.metadata_provider_get(item["provider_id"]) | |||||
| for attr_name in ("name", "type", "url"): | |||||
| item["provider_" + attr_name] = provider["provider_" + attr_name] | |||||
| metadata.append(item) | |||||
| return metadata | |||||
| def tool_add(self, tools): | |||||
| inserted = [] | |||||
| for tool in tools: | |||||
| key = self._tool_key(tool) | |||||
| assert "id" not in tool | |||||
| record = copy.deepcopy(tool) | |||||
| record["id"] = key # TODO: remove this | |||||
| if key not in self._tools: | |||||
| self._tools[key] = record | |||||
| inserted.append(copy.deepcopy(self._tools[key])) | |||||
| return inserted | authority_key = self._metadata_authority_key(authority) | ||||
| def tool_get(self, tool): | if after is None: | ||||
| return self._tools.get(self._tool_key(tool)) | entries = iter(self._origin_metadata[origin_url][authority_key]) | ||||
| else: | |||||
| entries = self._origin_metadata[origin_url][authority_key].iter_from(after) | |||||
| if limit: | |||||
| entries = itertools.islice(entries, 0, limit) | |||||
| def metadata_provider_add( | results = [] | ||||
| self, provider_name, provider_type, provider_url, metadata | for entry in entries: | ||||
| ): | authority = self._metadata_authorities[entry["authority"]] | ||||
| provider = { | fetcher = self._metadata_fetchers[entry["fetcher"]] | ||||
| "provider_name": provider_name, | results.append( | ||||
| "provider_type": provider_type, | { | ||||
| "provider_url": provider_url, | **entry, | ||||
| "authority": {"type": authority["type"], "url": authority["url"],}, | |||||
| "fetcher": { | |||||
| "name": fetcher["name"], | |||||
| "version": fetcher["version"], | |||||
| }, | |||||
| } | |||||
| ) | |||||
| return results | |||||
| def metadata_fetcher_add( | |||||
| self, name: str, version: str, metadata: Dict[str, Any] | |||||
| ) -> None: | |||||
| fetcher = { | |||||
| "name": name, | |||||
| "version": version, | |||||
| "metadata": metadata, | |||||
| } | |||||
| key = self._metadata_fetcher_key(fetcher) | |||||
| if key not in self._metadata_fetchers: | |||||
| self._metadata_fetchers[key] = fetcher | |||||
| def metadata_fetcher_get(self, name: str, version: str) -> Optional[Dict[str, Any]]: | |||||
| return self._metadata_fetchers.get( | |||||
| self._metadata_fetcher_key({"name": name, "version": version}) | |||||
| ) | |||||
| def metadata_authority_add( | |||||
| self, type: str, url: str, metadata: Dict[str, Any] | |||||
| ) -> None: | |||||
| authority = { | |||||
| "type": type, | |||||
| "url": url, | |||||
| "metadata": metadata, | "metadata": metadata, | ||||
| } | } | ||||
| key = self._metadata_provider_key(provider) | key = self._metadata_authority_key(authority) | ||||
| provider["id"] = key | self._metadata_authorities[key] = authority | ||||
| self._metadata_providers[key] = provider | |||||
| return key | def metadata_authority_get(self, type: str, url: str) -> Optional[Dict[str, Any]]: | ||||
| return self._metadata_authorities.get( | |||||
| def metadata_provider_get(self, provider_id): | self._metadata_authority_key({"type": type, "url": url}) | ||||
| return self._metadata_providers.get(provider_id) | ) | ||||
| def metadata_provider_get_by(self, provider): | |||||
| key = self._metadata_provider_key(provider) | |||||
| return self._metadata_providers.get(key) | |||||
| def _get_origin_url(self, origin): | def _get_origin_url(self, origin): | ||||
| if isinstance(origin, str): | if isinstance(origin, str): | ||||
| return origin | return origin | ||||
| else: | else: | ||||
| raise TypeError("origin must be a string.") | raise TypeError("origin must be a string.") | ||||
| def _person_add(self, person): | def _person_add(self, person): | ||||
| Show All 10 Lines | |||||
| @staticmethod | @staticmethod | ||||
| def _content_key(content): | def _content_key(content): | ||||
| """ A stable key and the algorithm for a content""" | """ A stable key and the algorithm for a content""" | ||||
| if isinstance(content, BaseContent): | if isinstance(content, BaseContent): | ||||
| content = content.to_dict() | content = content.to_dict() | ||||
| return tuple((key, content.get(key)) for key in sorted(DEFAULT_ALGORITHMS)) | return tuple((key, content.get(key)) for key in sorted(DEFAULT_ALGORITHMS)) | ||||
| @staticmethod | @staticmethod | ||||
| def _tool_key(tool): | def _metadata_fetcher_key(fetcher: Dict) -> Hashable: | ||||
| return "%r %r %r" % ( | return (fetcher["name"], fetcher["version"]) | ||||
| tool["name"], | |||||
| tool["version"], | |||||
| tuple(sorted(tool["configuration"].items())), | |||||
| ) | |||||
| @staticmethod | @staticmethod | ||||
| def _metadata_provider_key(provider): | def _metadata_authority_key(authority: Dict) -> Hashable: | ||||
| return "%r %r" % (provider["provider_name"], provider["provider_url"]) | return (authority["type"], authority["url"]) | ||||
| def diff_directories(self, from_dir, to_dir, track_renaming=False): | def diff_directories(self, from_dir, to_dir, track_renaming=False): | ||||
| raise NotImplementedError("InMemoryStorage.diff_directories") | raise NotImplementedError("InMemoryStorage.diff_directories") | ||||
| def diff_revisions(self, from_rev, to_rev, track_renaming=False): | def diff_revisions(self, from_rev, to_rev, track_renaming=False): | ||||
| raise NotImplementedError("InMemoryStorage.diff_revisions") | raise NotImplementedError("InMemoryStorage.diff_revisions") | ||||
| def diff_revision(self, revision, track_renaming=False): | def diff_revision(self, revision, track_renaming=False): | ||||
| Show All 10 Lines | |||||
Missing a rebase on latest master
That's D2987, ain't it?