diff --git a/swh/lister/core/indexing_lister.py b/swh/lister/core/indexing_lister.py
deleted file mode 100644
index c1df1f2..0000000
--- a/swh/lister/core/indexing_lister.py
+++ /dev/null
@@ -1,276 +0,0 @@
-# Copyright (C) 2015-2019 the Software Heritage developers
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import abc
-from datetime import datetime
-from itertools import count
-import logging
-from typing import Any, Dict, List, Optional, Tuple, Union
-
-import dateutil
-from requests import Response
-from sqlalchemy import func
-
-from .lister_base import ListerBase
-from .lister_transports import ListerHttpTransport
-
-logger = logging.getLogger(__name__)
-
-
-class IndexingLister(ListerBase):
-    """Lister* intermediate class for any service that follows the pattern:
-
-    - The service must report at least one stable unique identifier, known
-      herein as the UID value, for every listed repository.
-    - If the service splits the list of repositories into sublists, it must
-      report at least one stable and sorted index identifier for every listed
-      repository, known herein as the indexable value, which can be used as
-      part of the service endpoint query to request a sublist beginning from
-      that index. This might be the UID if the UID is monotonic.
-    - Client sends a request to list repositories starting from a given
-      index.
-    - Client receives structured (json/xml/etc) response with information
-      about a sequential series of repositories starting from that index and,
-      if necessary/available, some indication of the URL or index for fetching
-      the next series of repository data.
-
-    See :class:`swh.lister.core.lister_base.ListerBase` for more details.
-
-    This class cannot be instantiated. To create a new Lister for a source
-    code listing service that follows the model described above, you must
-    subclass this class and provide the required overrides in addition to
-    any unmet implementation/override requirements of this class's base
-    (see parent class and member docstrings for details).
-
-    Required Overrides::
-
-        def get_next_target_from_response
-
-    """
-
-    flush_packet_db = 20
-    """Number of iterations in-between write flushes of lister repositories
-    to db (see :func:`run`).
-    """
-    default_min_bound = ""
-    """Default initialization value for the minimum boundary index to use
-    when undefined (see :func:`run`).
-    """
-
-    @abc.abstractmethod
-    def get_next_target_from_response(
-        self, response: Response
-    ) -> Union[Optional[datetime], Optional[str], Optional[int]]:
-        """Find the next server endpoint identifier given the entire response.
-
-        Implementation of this method depends on the server API spec
-        and the shape of the network response object returned by the
-        transport_request method.
-
-        Args:
-            response (transport response): response page from the server
-        Returns:
-            index of next page, possibly extracted from a next href url
-        """
-        pass
-
-    # You probably don't need to override anything below this line.
-
-    def filter_before_inject(
-        self, models_list: List[Dict[str, Any]]
-    ) -> List[Dict[str, Any]]:
-        """Overrides ListerBase.filter_before_inject
-
-        Bounds query results by this Lister's set max_index.
-        """
-        models_list = [
-            m
-            for m in models_list
-            if self.is_within_bounds(m["indexable"], None, self.max_index)
-        ]
-        return models_list
-
-    def db_query_range(self, start, end):
-        """Look in the db for a range of repositories with indexable
-        values in the range [start, end]
-
-        Args:
-            start (model indexable type): start of desired indexable range
-            end (model indexable type): end of desired indexable range
-        Returns:
-            a list of sqlalchemy.ext.declarative.declarative_base objects
-            with indexable values within the given range
-        """
-        retlist = self.db_session.query(self.MODEL)
-        if start is not None:
-            retlist = retlist.filter(self.MODEL.indexable >= start)
-        if end is not None:
-            retlist = retlist.filter(self.MODEL.indexable <= end)
-        return retlist
-
-    def db_partition_indices(
-        self, partition_size: int
-    ) -> List[Tuple[Optional[int], Optional[int]]]:
-        """Describe an index-space compartmentalization of the db table
-        in equal-sized chunks. This is used to describe min and max bounds
-        for parallelizing fetch tasks.
-
-        Args:
-            partition_size (int): desired size to make each partition
-
-        Returns:
-            a list of tuples (begin, end) of indexable values that
-            declare approximately equal-sized ranges of existing
-            repos
-
-        """
-        n = max(self.db_num_entries(), 10)
-        partition_size = min(partition_size, n)
-        n_partitions = n // partition_size
-
-        min_index = self.db_first_index()
-        max_index = self.db_last_index()
-
-        if min_index is None or max_index is None:
-            # Nothing to list
-            return []
-
-        if isinstance(min_index, str):
-
-            def format_bound(bound):
-                return bound.isoformat()
-
-            min_index = dateutil.parser.parse(min_index)
-            max_index = dateutil.parser.parse(max_index)
-        elif isinstance(max_index - min_index, int):
-
-            def format_bound(bound):
-                return int(bound)
-
-        else:
-
-            def format_bound(bound):
-                return bound
-
-        partition_width = (max_index - min_index) / n_partitions
-
-        # Generate n_partitions + 1 bounds for n_partitions partitions
-        bounds = [
-            format_bound(min_index + i * partition_width)
-            for i in range(n_partitions + 1)
-        ]
-
-        # Trim duplicate bounds
-        bounds.append(None)
-        bounds = [cur for cur, next in zip(bounds[:-1], bounds[1:]) if cur != next]
-
-        # Remove bounds for lowest and highest partition
-        bounds[0] = bounds[-1] = None
-
-        return list(zip(bounds[:-1], bounds[1:]))
-
-    def db_first_index(self):
-        """Look in the db for the smallest indexable value
-
-        Returns:
-            the smallest indexable value of all repos in the db
-        """
-        t = self.db_session.query(func.min(self.MODEL.indexable)).first()
-        if t:
-            return t[0]
-        return None
-
-    def db_last_index(self):
-        """Look in the db for the largest indexable value
-
-        Returns:
-            the largest indexable value of all repos in the db
-        """
-        t = self.db_session.query(func.max(self.MODEL.indexable)).first()
-        if t:
-            return t[0]
-        return None
-
-    def disable_deleted_repo_tasks(self, start, end, keep_these):
-        """Disable tasks for repos that no longer exist between start and end.
-
-        Args:
-            start: beginning of range to disable
-            end: end of range to disable
-            keep_these (uid list): do not disable repos with uids in this list
-        """
-        if end is None:
-            end = self.db_last_index()
-
-        if not self.is_within_bounds(end, None, self.max_index):
-            end = self.max_index
-
-        deleted_repos = self.winnow_models(
-            self.db_query_range(start, end), self.MODEL.uid, keep_these
-        )
-        tasks_to_disable = [
-            repo.task_id for repo in deleted_repos if repo.task_id is not None
-        ]
-        if tasks_to_disable:
-            self.scheduler.disable_tasks(tasks_to_disable)
-        for repo in deleted_repos:
-            repo.task_id = None
-
-    def run(self, min_bound=None, max_bound=None):
-        """Main entry function. Sequentially fetches repository data
-        from the service according to the basic outline in the class
-        docstring, continually fetching sublists until either there
-        is no next index reference given or the given next index is greater
-        than the desired max_bound.
-
-        Args:
-            min_bound (indexable type): optional index to start from
-            max_bound (indexable type): optional index to stop at
-        Returns:
-            nothing
-        """
-        status = "uneventful"
-        self.min_index = min_bound
-        self.max_index = max_bound
-
-        def ingest_indexes():
-            index = min_bound or self.default_min_bound
-            for i in count(1):
-                response, injected_repos = self.ingest_data(index)
-                if not response and not injected_repos:
-                    logger.info("No response from api server, stopping")
-                    return
-
-                next_index = self.get_next_target_from_response(response)
-                # Determine if any repos were deleted, and disable their tasks.
-                keep_these = list(injected_repos.keys())
-                self.disable_deleted_repo_tasks(index, next_index, keep_these)
-
-                # termination condition
-                if next_index is None or next_index == index:
-                    logger.info("stopping after index %s, no next link found", index)
-                    return
-                index = next_index
-                logger.debug("Index: %s", index)
-                yield i
-
-        for i in ingest_indexes():
-            if (i % self.flush_packet_db) == 0:
-                logger.debug("Flushing updates at index %s", i)
-                self.db_session.commit()
-                self.db_session = self.mk_session()
-            status = "eventful"
-
-        self.db_session.commit()
-        self.db_session = self.mk_session()
-        return {"status": status}
-
-
-class IndexingHttpLister(ListerHttpTransport, IndexingLister):
-    """Convenience class for ensuring the right lookup and init order
-    when combining IndexingLister and ListerHttpTransport."""
-
-    def __init__(self, url=None, override_config=None):
-        IndexingLister.__init__(self, override_config=override_config)
-        ListerHttpTransport.__init__(self, url=url)
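For context on the trickiest part of the removed module: db_partition_indices turns the span between the smallest and largest known index into roughly equal ranges, deduplicates bounds that collapse together, and leaves the first and last partitions open-ended so repositories outside the currently known index range are still captured. Below is a minimal, self-contained sketch of that computation for plain integer indexables; the standalone function name partition_bounds is our own illustration and deliberately skips the string/datetime handling of the original:

from typing import List, Optional, Tuple


def partition_bounds(
    n_entries: int, min_index: int, max_index: int, partition_size: int
) -> List[Tuple[Optional[int], Optional[int]]]:
    # Mirrors db_partition_indices, restricted to integer indexables.
    n = max(n_entries, 10)
    partition_size = min(partition_size, n)
    n_partitions = n // partition_size

    width = (max_index - min_index) / n_partitions

    # n_partitions + 1 bounds delimit n_partitions partitions.
    bounds = [int(min_index + i * width) for i in range(n_partitions + 1)]

    # Trim duplicate bounds (the small-index-range case in the test diff below).
    bounds.append(None)
    bounds = [cur for cur, nxt in zip(bounds[:-1], bounds[1:]) if cur != nxt]

    # Open-ended first and last partitions.
    bounds[0] = bounds[-1] = None
    return list(zip(bounds[:-1], bounds[1:]))


# Same figures as test_db_partition_indices in the test diff below:
assert partition_bounds(1000, 1, 10001, 100)[0] == (None, 1001)
assert partition_bounds(1000, 1, 10001, 100)[-1] == (9001, None)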
diff --git a/swh/lister/core/page_by_page_lister.py b/swh/lister/core/page_by_page_lister.py
deleted file mode 100644
index 2db8a53..0000000
--- a/swh/lister/core/page_by_page_lister.py
+++ /dev/null
@@ -1,164 +0,0 @@
-# Copyright (C) 2015-2018 the Software Heritage developers
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import abc
-import logging
-
-from .lister_base import ListerBase
-from .lister_transports import ListerHttpTransport
-
-
-class PageByPageLister(ListerBase):
-    """Lister* intermediate class for any service that follows the simple
-    pagination page pattern.
-
-    - Client sends a request to list repositories starting from a
-      given page identifier.
-
-    - Client receives structured (json/xml/etc) response with
-      information about a sequential series of repositories (per page)
-      starting from a given index, and, if available, some indication
-      of the next page index for fetching the remaining repository
-      data.
-
-    See :class:`swh.lister.core.lister_base.ListerBase` for more
-    details.
-
-    This class cannot be instantiated. To create a new Lister for a
-    source code listing service that follows the model described
-    above, you must subclass this class. Then provide the required
-    overrides in addition to any unmet implementation/override
-    requirements of this class's base (see parent class and member
-    docstrings for details).
-
-    Required Overrides::
-
-        def get_next_target_from_response
-
-    """
-
-    @abc.abstractmethod
-    def get_next_target_from_response(self, response):
-        """Find the next server endpoint page given the entire response.
-
-        Implementation of this method depends on the server API spec
-        and the shape of the network response object returned by the
-        transport_request method.
-
-        For example, some APIs can use the Link headers to provide the
-        next page.
-
-        Args:
-            response (transport response): response page from the server
-
-        Returns:
-            index of next page, possibly extracted from a next href url
-
-        """
-        pass
-
-    @abc.abstractmethod
-    def get_pages_information(self):
-        """Find the total number of pages.
-
-        Implementation of this method depends on the server API spec
-        and the shape of the network response object returned by the
-        transport_request method.
-
-        For example, some APIs can use dedicated headers:
-        - x-total-pages to provide the total number of pages
-        - x-total to provide the total number of repositories
-        - x-per-page to provide the number of elements per page
-
-        Returns:
-            tuple (total number of repositories, total number of
-            pages, per_page)
-
-        """
-        pass
-
-    # You probably don't need to override anything below this line.
-
-    def do_additional_checks(self, models_list):
-        """Potentially check for existence of repositories in models_list.
-
-        This will be called only if check_existence is flipped on in
-        the run method below.
-
-        """
-        for m in models_list:
-            sql_repo = self.db_query_equal("uid", m["uid"])
-            if sql_repo:
-                return False
-        return models_list
-
-    def run(self, min_bound=None, max_bound=None, check_existence=False):
-        """Main entry function. Sequentially fetches repository data from the
-        service according to the basic outline in the class
-        docstring. It continually fetches sublists until either there
-        is no next page reference given or the given next page is
-        greater than the desired max_page.
-
-        Args:
-            min_bound: optional page to start from
-            max_bound: optional page to stop at
-            check_existence (bool): optional existence check (for
-                                    incremental listers whose sort
-                                    order is inverted)
-
-        Returns:
-            nothing
-
-        """
-        status = "uneventful"
-        page = min_bound or 0
-        loop_count = 0
-
-        self.min_page = min_bound
-        self.max_page = max_bound
-
-        while self.is_within_bounds(page, self.min_page, self.max_page):
-            logging.info("listing repos starting at %s", page)
-
-            response, injected_repos = self.ingest_data(page, checks=check_existence)
-            if not response and not injected_repos:
-                logging.info("No response from api server, stopping")
-                break
-            elif not injected_repos:
-                logging.info("Repositories already seen, stopping")
-                break
-            status = "eventful"
-
-            next_page = self.get_next_target_from_response(response)
-
-            # termination condition
-
-            if (next_page is None) or (next_page == page):
-                logging.info("stopping after page %s, no next link found", page)
-                break
-            else:
-                page = next_page
-
-            loop_count += 1
-            if loop_count == 20:
-                logging.info("flushing updates")
-                loop_count = 0
-                self.db_session.commit()
-                self.db_session = self.mk_session()
-
-        self.db_session.commit()
-        self.db_session = self.mk_session()
-
-        return {"status": status}
-
-
-class PageByPageHttpLister(ListerHttpTransport, PageByPageLister):
-    """Convenience class for ensuring the right lookup and init order when
-    combining PageByPageLister and ListerHttpTransport.
-
-    """
-
-    def __init__(self, url=None, override_config=None):
-        PageByPageLister.__init__(self, override_config=override_config)
-        ListerHttpTransport.__init__(self, url=url)
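The get_pages_information contract above only names the headers it expects; the parsing itself was left to subclasses. As a rough illustration, assuming a GitLab-style service that reports lowercase x-total, x-total-pages and x-per-page headers, the extraction could look like the sketch below (the helper name pages_information is hypothetical):

from typing import Mapping, Optional, Tuple


def pages_information(
    headers: Mapping[str, str]
) -> Tuple[Optional[int], Optional[int], Optional[int]]:
    # Returns (total repositories, total pages, per_page), matching the
    # tuple shape documented by PageByPageLister.get_pages_information.
    def intval(name: str) -> Optional[int]:
        value = headers.get(name)
        return int(value) if value is not None else None

    return intval("x-total"), intval("x-total-pages"), intval("x-per-page")


# Hypothetical response headers:
assert pages_information(
    {"x-total": "250", "x-total-pages": "13", "x-per-page": "20"}
) == (250, 13, 20)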
diff --git a/swh/lister/core/tests/test_indexing_lister.py b/swh/lister/core/tests/test_indexing_lister.py
deleted file mode 100644
index 3d29ab7..0000000
--- a/swh/lister/core/tests/test_indexing_lister.py
+++ /dev/null
@@ -1,125 +0,0 @@
-# Copyright (C) 2019 the Software Heritage developers
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import datetime
-
-from swh.lister.core.indexing_lister import IndexingLister
-
-
-class MockedIndexingListerDbPartitionIndices(IndexingLister):
-    # Abstract Attribute boilerplate
-    LISTER_NAME = "DbPartitionIndices"
-    MODEL = type(None)
-
-    # ABC boilerplate
-    def get_next_target_from_response(self, *args, **kwargs):
-        pass
-
-    def __init__(self, num_entries, first_index, last_index):
-        self.num_entries = num_entries
-        self.first_index = first_index
-        self.last_index = last_index
-
-    def db_num_entries(self):
-        return self.num_entries
-
-    def db_first_index(self):
-        return self.first_index
-
-    def db_last_index(self):
-        return self.last_index
-
-
-def test_db_partition_indices():
-    m = MockedIndexingListerDbPartitionIndices(
-        num_entries=1000, first_index=1, last_index=10001,
-    )
-    assert m
-
-    partitions = m.db_partition_indices(100)
-
-    # 1000 entries with indices 1 - 10001, partitions of 100 entries
-    assert len(partitions) == 10
-    assert partitions[0] == (None, 1001)
-    assert partitions[-1] == (9001, None)
-
-
-def test_db_partition_indices_zero_first():
-    m = MockedIndexingListerDbPartitionIndices(
-        num_entries=1000, first_index=0, last_index=10000,
-    )
-    assert m
-
-    partitions = m.db_partition_indices(100)
-
-    # 1000 entries with indices 0 - 10000, partitions of 100 entries
-    assert len(partitions) == 10
-    assert partitions[0] == (None, 1000)
-    assert partitions[-1] == (9000, None)
-
-
-def test_db_partition_indices_small_index_range():
-    m = MockedIndexingListerDbPartitionIndices(
-        num_entries=5000, first_index=0, last_index=5,
-    )
-    assert m
-
-    partitions = m.db_partition_indices(100)
-
-    assert partitions == [(None, 1), (1, 2), (2, 3), (3, 4), (4, None)]
-
-
-def test_db_partition_indices_date_indices():
-    # 24 hour delta
-    first = datetime.datetime.fromisoformat("2019-11-01T00:00:00+00:00")
-    last = datetime.datetime.fromisoformat("2019-11-02T00:00:00+00:00")
-
-    m = MockedIndexingListerDbPartitionIndices(
-        # one entry per second
-        num_entries=24 * 3600,
-        first_index=first,
-        last_index=last,
-    )
-    assert m
-
-    # 3600 entries per partition => 1 partition per hour
-    partitions = m.db_partition_indices(3600)
-
-    assert len(partitions) == 24
-
-    expected_bounds = [first + datetime.timedelta(hours=i) for i in range(25)]
-    expected_bounds[0] = expected_bounds[-1] = None
-
-    assert partitions == list(zip(expected_bounds[:-1], expected_bounds[1:]))
-
-
-def test_db_partition_indices_float_index_range():
-    m = MockedIndexingListerDbPartitionIndices(
-        num_entries=10000, first_index=0.0, last_index=1.0,
-    )
-    assert m
-
-    partitions = m.db_partition_indices(1000)
-
-    assert len(partitions) == 10
-
-    expected_bounds = [0.1 * i for i in range(11)]
-    expected_bounds[0] = expected_bounds[-1] = None
-
-    assert partitions == list(zip(expected_bounds[:-1], expected_bounds[1:]))
-
-
-def test_db_partition_indices_uneven_int_index_range():
-    m = MockedIndexingListerDbPartitionIndices(
-        num_entries=5641, first_index=0, last_index=10000,
-    )
-    assert m
-
-    partitions = m.db_partition_indices(500)
-
-    assert len(partitions) == 5641 // 500
-
-    for i, (start, end) in enumerate(partitions):
-        assert isinstance(start, int) or (i == 0 and start is None)
-        assert isinstance(end, int) or (i == 10 and end is None)
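Both removed listers stop when get_next_target_from_response returns None or repeats the current index. As a sketch of the "next href url" extraction their docstrings mention, here is one way to pull a numeric cursor out of the links mapping that requests builds from a Link response header; the function name and the "since" query parameter are hypothetical, loosely modeled on GitHub-style pagination:

from typing import Optional
from urllib.parse import parse_qs, urlparse


def next_index_from_links(links: dict) -> Optional[int]:
    # links is shaped like requests' response.links, e.g.
    # {"next": {"url": "...?since=9001", "rel": "next"}}.
    next_link = links.get("next")
    if next_link is None:
        return None  # no next link: the run() loops above would stop here
    query = parse_qs(urlparse(next_link["url"]).query)
    since = query.get("since")
    return int(since[0]) if since else None


# Hypothetical next link carrying a "since" cursor:
assert next_index_from_links(
    {"next": {"url": "https://api.example.org/repositories?since=9001"}}
) == 9001
assert next_index_from_links({}) is None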