Changeset View
Changeset View
Standalone View
Standalone View
swh/search/in_memory.py
| # Copyright (C) 2019-2021 The Software Heritage developers | # Copyright (C) 2019-2021 The Software Heritage developers | ||||
| # See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
| # License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
| # See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
| from collections import defaultdict | from collections import defaultdict | ||||
| from datetime import datetime | |||||
| import itertools | import itertools | ||||
| import re | import re | ||||
| from typing import Any, Dict, Iterable, Iterator, List, Optional | from typing import Any, Dict, Iterable, Iterator, List, Optional | ||||
| from swh.model.identifiers import origin_identifier | from swh.model.identifiers import origin_identifier | ||||
| from swh.search.interface import MinimalOriginDict, OriginDict, PagedResult | from swh.search.interface import MinimalOriginDict, OriginDict, PagedResult | ||||
| _words_regexp = re.compile(r"\w+") | _words_regexp = re.compile(r"\w+") | ||||
| ▲ Show 20 Lines • Show All 45 Lines • ▼ Show 20 Lines | def origin_update(self, documents: Iterable[OriginDict]) -> None: | ||||
| if "url" in document: | if "url" in document: | ||||
| document["_url_tokens"] = set( | document["_url_tokens"] = set( | ||||
| self._url_splitter.split(source_document["url"]) | self._url_splitter.split(source_document["url"]) | ||||
| ) | ) | ||||
| if "visit_types" in document: | if "visit_types" in document: | ||||
| document["visit_types"] = set(source_document["visit_types"]) | document["visit_types"] = set(source_document["visit_types"]) | ||||
| if "visit_types" in self._origins[id_]: | if "visit_types" in self._origins[id_]: | ||||
| document["visit_types"].update(self._origins[id_]["visit_types"]) | document["visit_types"].update(self._origins[id_]["visit_types"]) | ||||
| if "nb_visits" in document: | |||||
| document["nb_visits"] = max( | |||||
| document["nb_visits"], self._origins[id_].get("nb_visits", 0) | |||||
| ) | |||||
| if "last_visit_date" in document: | |||||
| document["last_visit_date"] = max( | |||||
| datetime.fromisoformat(document["last_visit_date"]), | |||||
| datetime.fromisoformat( | |||||
| self._origins[id_] | |||||
| .get("last_visit_date", "0001-01-01T00:00:00.000000Z",) | |||||
| .replace("Z", "+00:00") | |||||
| ), | |||||
| ).isoformat() | |||||
| self._origins[id_].update(document) | self._origins[id_].update(document) | ||||
| if id_ not in self._origin_ids: | if id_ not in self._origin_ids: | ||||
| self._origin_ids.append(id_) | self._origin_ids.append(id_) | ||||
| def origin_search( | def origin_search( | ||||
| self, | self, | ||||
| *, | *, | ||||
| url_pattern: Optional[str] = None, | url_pattern: Optional[str] = None, | ||||
| metadata_pattern: Optional[str] = None, | metadata_pattern: Optional[str] = None, | ||||
| with_visit: bool = False, | with_visit: bool = False, | ||||
| visit_types: Optional[List[str]] = None, | visit_types: Optional[List[str]] = None, | ||||
| page_token: Optional[str] = None, | page_token: Optional[str] = None, | ||||
| min_nb_visits: int = 0, | |||||
| min_last_visit_date: str = "", | |||||
| limit: int = 50, | limit: int = 50, | ||||
| ) -> PagedResult[MinimalOriginDict]: | ) -> PagedResult[MinimalOriginDict]: | ||||
| hits: Iterator[Dict[str, Any]] = ( | hits: Iterator[Dict[str, Any]] = ( | ||||
| self._origins[id_] | self._origins[id_] | ||||
| for id_ in self._origin_ids | for id_ in self._origin_ids | ||||
| if not self._origins[id_].get("blocklisted") | if not self._origins[id_].get("blocklisted") | ||||
| ) | ) | ||||
| Show All 35 Lines | ) -> PagedResult[MinimalOriginDict]: | ||||
| raise ValueError( | raise ValueError( | ||||
| "At least one of url_pattern and metadata_pattern must be provided." | "At least one of url_pattern and metadata_pattern must be provided." | ||||
| ) | ) | ||||
| next_page_token: Optional[str] = None | next_page_token: Optional[str] = None | ||||
| if with_visit: | if with_visit: | ||||
| hits = filter(lambda o: o.get("has_visits"), hits) | hits = filter(lambda o: o.get("has_visits"), hits) | ||||
| if min_nb_visits: | |||||
| hits = filter(lambda o: o.get("nb_visits", 0) >= min_nb_visits, hits) | |||||
| if min_last_visit_date: | |||||
| hits = filter( | |||||
| lambda o: datetime.fromisoformat(o.get("last_visit_date", "")) | |||||
| >= datetime.fromisoformat(min_last_visit_date), | |||||
| hits, | |||||
| ) | |||||
| if visit_types is not None: | if visit_types is not None: | ||||
| visit_types_set = set(visit_types) | visit_types_set = set(visit_types) | ||||
| hits = filter( | hits = filter( | ||||
| lambda o: visit_types_set.intersection(o.get("visit_types", set())), | lambda o: visit_types_set.intersection(o.get("visit_types", set())), | ||||
| hits, | hits, | ||||
| ) | ) | ||||
| Show All 13 Lines | |||||