Source code for pyqldb.session.qldb_session

# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
# the License. A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
# and limitations under the License.
from logging import getLogger
from time import sleep

from botocore.exceptions import ClientError, EndpointConnectionError, ConnectionClosedError, ConnectTimeoutError, \
    ReadTimeoutError

from pyqldb.util.retry import Retry
from ..errors import is_bad_request_exception, is_invalid_session_exception, is_occ_conflict_exception, \
    is_retriable_exception, SessionClosedError, LambdaAbortedError, StartTransactionError
from ..cursor.buffered_cursor import BufferedCursor
from ..cursor.stream_cursor import StreamCursor
from ..execution.executor import Executor
from ..transaction.transaction import Transaction

logger = getLogger(__name__)
SLEEP_CAP_MS = 5000
SLEEP_BASE_MS = 10

RETRYABLE_HTTP_ERRORS = (
    ReadTimeoutError, EndpointConnectionError, ConnectionClosedError, ConnectTimeoutError
)


[docs]class QldbSession: """ The QldbSession is meant for internal use only. A class representing a session to a QLDB ledger for interacting with QLDB. A QldbSession is linked to the specified ledger in the parent driver of the instance of the QldbSession. In any given QldbSession, only one transaction can be active at a time. This object can have only one underlying session to QLDB, and therefore the lifespan of a QldbSession is tied to the underlying session, which is not indefinite, and on expiry this QldbSession will become invalid, and a new QldbSession needs to be created from the parent driver in order to continue usage. When a QldbSession is no longer needed, :py:meth:`pyqldb.session.qldb_session.QldbSession._end_session` should be invoked in order to clean up any resources. See :py:class:`pyqldb.driver.qldb_driver.QldbDriver` for an example of session lifecycle management, allowing the re-use of sessions when possible. There should only be one thread interacting with a session at any given time. :type session: :py:class:`pyqldb.communication.session_client.SessionClient` :param session: The session object representing a communication channel with QLDB. :type read_ahead: int :param read_ahead: The number of pages to read-ahead and buffer when retrieving results. :type executor: :py:class:`concurrent.futures.thread.ThreadPoolExecutor` :param executor: The executor to be used by the retrieval thread. :type return_session_to_pool: :function :param return_session_to_pool: A callback that describes how the session will be returned to the pool. """ def __init__(self, session, read_ahead, executor, return_session_to_pool): self._is_closed = False self._read_ahead = read_ahead self._executor = executor self._session = session self._return_session_to_pool = return_session_to_pool def __enter__(self): """ Context Manager function to support the 'with' statement. """ return self def __exit__(self, exc_type, exc_val, exc_tb): """ Context Manager function to support the 'with' statement. """ self._release() @property def ledger_name(self): """ The **read-only** ledger name. """ return self._session.ledger_name @property def session_id(self): """ The **read-only** session ID. """ return self._session.id @property def session_token(self): """ The **read-only** session token. """ return self._session.token def _close(self): """ Close this `QldbSession`. """ self._is_closed = True def _end_session(self): """ End this session. No-op if already closed. """ if not self._is_closed: self._is_closed = True self._session._close() def _execute_lambda(self, query_lambda, retry_config, context): """ Implicitly start a transaction, execute the lambda function, and commit the transaction, retrying up to the retry limit if an OCC conflict or retriable exception occurs. :type query_lambda: function :param query_lambda: The lambda function to execute. A lambda function cannot have any side effects as it may be invoked multiple times, and the result cannot be trusted until the transaction is committed. :type retry_config: :py:class:`pyqldb.config.retry_config.RetryConfig` :param retry_config: Config to specify max number of retries, base and custom backoff strategy for retries. :type context: :py:class:`pyqldb.driver.qldb_driver._LambdaExecutionContext` :param context: The LambdaExecutionContext instance is used for storing execution context across a lifespan of a lambda (including retries due to session expiry). :rtype: :py:class:`pyqldb.cursor.buffered_cursor.BufferedCursor`/object :return: The return value of the lambda function which could be a :py:class:`pyqldb.cursor.buffered_cursor.BufferedCursor` on the result set of a statement within the lambda. :raises IllegalStateError: When the commit digest from commit transaction result does not match. :raises ClientError: When there is an error executing against QLDB. :raises LambdaAbortedError: If the lambda function calls :py:class:`pyqldb.execution.executor.Executor.abort`. """ while True: transaction = None error = None try: transaction = self._start_transaction() result = query_lambda(Executor(transaction)) if isinstance(result, StreamCursor): # If someone accidentally returned a StreamCursor object which would become invalidated by the # commit, automatically buffer it to allow them to use the result anyway. result = BufferedCursor(result) transaction._commit() return result except LambdaAbortedError as lae: self._no_throw_abort(transaction) raise lae except StartTransactionError as ste: error = ste.error self._no_throw_abort(transaction) if context.execution_attempt >= retry_config.retry_limit: # raise wrapped Error raise ste.error except RETRYABLE_HTTP_ERRORS as rhe: error = rhe self._no_throw_abort(transaction) if context.execution_attempt >= retry_config.retry_limit: raise rhe logger.warning('Retryable HTTP error occurred: {}, retrying transaction'.format(rhe)) except ClientError as ce: error = ce if is_invalid_session_exception(ce): self._is_closed = True raise ce if not is_occ_conflict_exception(ce): self._no_throw_abort(transaction) if is_occ_conflict_exception(ce) or is_retriable_exception(ce): if context.execution_attempt >= retry_config.retry_limit: raise ce logger.warning('OCC conflict or retriable exception occurred: {}'.format(ce)) else: raise ce except Exception as e: self._no_throw_abort(transaction) raise e context.increment_execution_attempt() transaction_id = None if transaction is None else transaction.transaction_id self._retry_sleep(retry_config, context.execution_attempt, error, transaction_id) def _start_transaction(self): """ Start a transaction using an available database session. :rtype: :py:class:`pyqldb.transaction.transaction.Transaction` :return: A new transaction. :raises StartTransactionError: When this session fails to start a new transaction on the session. """ try: transaction_id = self._session._start_transaction().get('TransactionId') transaction = Transaction(self._session, self._read_ahead, transaction_id, self._executor) return transaction except ClientError as ce: if is_bad_request_exception(ce): logger.warning('Error occurred while starting transaction: {}'.format(ce)) raise StartTransactionError(ce) raise ce def _throw_if_closed(self): """ Check and throw if this session is closed. """ if self._is_closed: logger.error(SessionClosedError()) raise SessionClosedError @staticmethod def _retry_sleep(retry_config, execution_attempt, error, transaction_id): sleep(Retry.calculate_backoff(retry_config, execution_attempt, error, transaction_id) / 1000) def _no_throw_abort(self, transaction): """ Send an abort request which will not throw on failure. """ try: if transaction is None: self._session._abort_transaction() else: transaction._abort() except ClientError as ce: logger.warning('Ignored error aborting transaction during execution: {}'.format(ce)) def _release(self): """ Return this `QldbSession` to the pool. """ self._return_session_to_pool(self)