Source code for pyqldb.cursor.read_ahead_cursor
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
# the License. A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
# and limitations under the License.
from logging import getLogger
from queue import Empty, Full, Queue
from threading import Thread

from botocore.exceptions import ClientError

from .stream_cursor import StreamCursor
from ..errors import ResultClosedError
# Module-scoped logger; named after this module's dotted path so handlers can filter on the package hierarchy.
logger = getLogger(name=__name__)
class ReadAheadCursor(StreamCursor):
    """
    An iterable class representing a read ahead cursor on a statement's result set. Pages are fetched asynchronously
    (on a new daemon thread, or on the supplied executor) and buffered in a bounded queue holding at most
    `read_ahead - 1` pages; the page currently being iterated is held outside the queue.

    Any exception hit by the background retriever is placed in the queue and re-raised to the consumer on the next
    page request, instead of being lost with the background thread.

    :type statement_result: dict
    :param statement_result: The initial result set data dictionary of the statement execution.

    :type session: :py:class:`pyqldb.communication.session_client.SessionClient`
    :param session: The parent session that represents the communication channel to QLDB.

    :type transaction_id: str
    :param transaction_id: The ID of this cursor's parent transaction, required to fetch pages.

    :type read_ahead: int
    :param read_ahead: The number of pages to read-ahead and buffer in this cursor. Expected to be 2 or greater: a value
                       of 1 yields ``Queue(0)``, which is unbounded.

    :type executor: :py:class:`concurrent.futures.thread.ThreadPoolExecutor`
    :param executor: The optional executor for asynchronous retrieval. If none specified, a new thread is created.
    """

    def __init__(self, statement_result, session, transaction_id, read_ahead, executor):
        super().__init__(statement_result, session, transaction_id)
        # The page being iterated lives in self._page, so the queue only buffers the remaining read_ahead - 1 pages.
        # NOTE(review): read_ahead == 1 gives Queue(0), i.e. unbounded; presumably the caller rejects that — confirm.
        self._queue = Queue(read_ahead - 1)
        if executor is None:
            # Daemon thread so an abandoned cursor does not keep the interpreter alive at shutdown.
            thread = Thread(target=self._populate_queue, daemon=True)
            thread.start()
        else:
            executor.submit(self._populate_queue)

    def _are_there_more_results(self):
        """
        Check if there are more results.

        :rtype: bool
        :return: False only when the current page has no 'NextPageToken' and the buffer queue is empty; True otherwise,
                 including when the retriever has queued an exception.
        """
        return not (self._page.get('NextPageToken') is None and self._queue.empty())

    def _next_page(self):
        """
        Get the next page from the buffer queue, blocking until the retriever supplies one.

        :raises Exception: Re-raises any exception the read-ahead retriever placed in the queue, e.g.
                           :py:class:`botocore.exceptions.ClientError` or :py:class:`pyqldb.errors.ResultClosedError`.
        """
        queue_result = self._queue.get()
        if isinstance(queue_result, Exception):
            raise queue_result
        super()._accumulate_query_stats(queue_result)
        self._page = queue_result.get('Page')
        self._index = 0

    def _populate_queue(self):
        """
        Fill the buffer queue with the statement_result fetched for each page, until no 'NextPageToken' remains.

        If any exception occurs — a :py:class:`botocore.exceptions.ClientError` from fetching, the
        :py:class:`pyqldb.errors.ResultClosedError` raised here once the cursor is closed, or anything unexpected —
        the buffered pages are discarded, the exception is put in the queue for the consumer, and execution stops.
        Forwarding every exception (not just the expected ones) keeps the consumer from blocking forever in
        :py:meth:`_next_page` when this retriever dies.
        """
        try:
            next_page_token = self._page.get('NextPageToken')
            while next_page_token is not None:
                statement_result = self._session._fetch_page(self._transaction_id, next_page_token)
                while True:
                    try:
                        # Short 50ms timeout so the loop periodically re-checks whether the cursor was closed.
                        self._queue.put(statement_result, timeout=0.05)
                        break
                    except Full:
                        # When timeout is reached, check if the read-ahead retrieval thread should end.
                        if not self._is_open:
                            logger.debug('Cursor was closed; read-ahead retriever thread stopping.')
                            raise ResultClosedError(self._session.token)
                next_page_token = statement_result.get('Page').get('NextPageToken')
        except Exception as error:  # Thread/executor boundary: hand every failure to the consumer.
            # Drop buffered pages so the consumer sees the error next. The consumer may be taking items concurrently,
            # so an empty() check followed by get_nowait() can race and raise Empty; stop on Empty instead.
            while True:
                try:
                    self._queue.get_nowait()
                except Empty:
                    break
            logger.debug('Queued an exception: %s', error)
            self._queue.put(error)