Source code for pyqldb.cursor.stream_cursor

# Copyright 2019, Inc. or its affiliates. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
# the License. A copy of the License is located at
# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
# and limitations under the License.
from amazon.ion.simpleion import loads

from ..errors import ResultClosedError

[docs]class StreamCursor: """ An iterable class representing a stream cursor on a statement's result set. :type page: dict :param page: The page containing the initial result set data dictionary of the statement's execution. :type session: :py:class:`pyqldb.communication.session_client.SessionClient` :param session: The parent session that represents the communication channel to QLDB. :type transaction_id: str :param transaction_id: The ID of this cursor's parent transaction, required to fetch pages. """ def __init__(self, page, session, transaction_id): self._page = page self._session = session self._index = 0 self._is_open = True self._transaction_id = transaction_id def __iter__(self): """ Iterator function to implement the iterator protocol. """ return self def __next__(self): """ Iterator function to implement the iterator protocol. Pulls the next page containing the results being iterated on when the end of the current is reached. :rtype: :py:class:`amazon.ion.simple_types.IonSymbol` :return: The Ion value in the row that the cursor is on. :raises StopIteration: When there are no more results. """ if not self._is_open: raise ResultClosedError(self._session.token) if self._index >= len(self._page.get('Values')): if not self._are_there_more_results(): raise StopIteration self._next_page() while len(self._page.get('Values')) == 0 and self._are_there_more_results(): self._next_page() if len(self._page.get('Values')) == 0: raise StopIteration row = self._page.get('Values')[self._index] ion_value = self._value_holder_to_ion_value(row) self._index += 1 return ion_value
[docs] def close(self): """ Close this stream cursor object. """ self._is_open = False
def _are_there_more_results(self): """ Check if there are more results. """ return self._page.get('NextPageToken') is not None def _next_page(self): """ Get the next page using this cursor's session. """ statement_result = self._session._fetch_page(self._transaction_id, self._page.get('NextPageToken')) page = statement_result.get('Page') self._page = page self._index = 0 @staticmethod def _value_holder_to_ion_value(value): """ Get the Ion binary out from a value holder, and convert it into an Ion value. """ binary = value.get('IonBinary') ion_value = loads(binary) return ion_value