# Source code for pyqldb.session.qldb_session

# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
# the License. A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
# and limitations under the License.
from logging import getLogger

from botocore.exceptions import ClientError

from ..cursor.buffered_cursor import BufferedCursor
from ..cursor.stream_cursor import StreamCursor
from ..errors import ExecuteError, is_invalid_session_exception, is_occ_conflict_exception, is_retriable_exception, \
    is_transaction_expired_exception
from ..execution.executor import Executor
from ..transaction.transaction import Transaction

# Module-level logger, named after this module via __name__.
logger = getLogger(__name__)


class QldbSession:
    """
    The QldbSession is meant for internal use only.

    A class representing a session to a QLDB ledger for interacting with QLDB. A QldbSession is linked to the specified
    ledger in the parent driver of the instance of the QldbSession. In any given QldbSession, only one transaction can
    be active at a time. This object can have only one underlying session to QLDB, and therefore the lifespan of a
    QldbSession is tied to the underlying session, which is not indefinite, and on expiry this QldbSession will become
    invalid, and a new QldbSession needs to be created from the parent driver in order to continue usage.

    When a QldbSession is no longer needed, :py:meth:`pyqldb.session.qldb_session.QldbSession._end_session` should be
    invoked in order to clean up any resources.

    See :py:class:`pyqldb.driver.qldb_driver.QldbDriver` for an example of session lifecycle management, allowing the
    re-use of sessions when possible. There should only be one thread interacting with a session at any given time.

    :type session: :py:class:`pyqldb.communication.session_client.SessionClient`
    :param session: The session object representing a communication channel with QLDB.

    :type read_ahead: int
    :param read_ahead: The number of pages to read-ahead and buffer when retrieving results.

    :type executor: :py:class:`concurrent.futures.thread.ThreadPoolExecutor`
    :param executor: The executor to be used by the retrieval thread.
    """

    def __init__(self, session, read_ahead, executor):
        # Flipped to False once the session is ended or found to be unusable.
        self._is_alive = True
        self._read_ahead = read_ahead
        self._executor = executor
        self._session = session

    @property
    def ledger_name(self):
        """
        The **read-only** ledger name.
        """
        return self._session.ledger_name

    @property
    def session_id(self):
        """
        The **read-only** session ID.
        """
        return self._session.id

    @property
    def session_token(self):
        """
        The **read-only** session token.
        """
        return self._session.token

    def _end_session(self):
        """
        End this session. No-op if already closed.
        """
        if self._is_alive:
            # Mark the session dead before closing so a repeated call is a no-op.
            self._is_alive = False
            self._session._close()

    def _execute_lambda(self, query_lambda):
        """
        Implicitly start a transaction, execute the lambda function, and commit the transaction.

        :type query_lambda: function
        :param query_lambda: The lambda function to execute. A lambda function cannot have any side effects as it may be
                             invoked multiple times, and the result cannot be trusted until the transaction is
                             committed.

        :rtype: :py:class:`pyqldb.cursor.buffered_cursor.BufferedCursor`/object
        :return: The return value of the lambda function which could be a
                 :py:class:`pyqldb.cursor.buffered_cursor.BufferedCursor` on the result set of a statement within the
                 lambda.

        :raises ExecuteError: Error containing the context of a failure during execute. The original exception is
                              attached as its explicit cause.
        """
        transaction = None
        transaction_id = None
        try:
            transaction = self._start_transaction()
            result = query_lambda(Executor(transaction))
            if isinstance(result, StreamCursor):
                # If someone accidentally returned a StreamCursor object which would become invalidated by the
                # commit, automatically buffer it to allow them to use the result anyway.
                result = BufferedCursor(result)
            transaction._commit()
            return result
        except Exception as e:
            is_retryable = is_retriable_exception(e)
            is_session_invalid = is_invalid_session_exception(e)
            if is_session_invalid and not is_transaction_expired_exception(e):
                # Underlying session is dead on InvalidSessionException except for transaction expiry.
                self._is_alive = False
            elif not is_occ_conflict_exception(e):
                # OCC does not need session state reset as the transaction is implicitly closed.
                self._no_throw_abort()
            # The transaction ID is only known if the transaction was started successfully.
            if transaction is not None:
                transaction_id = transaction.transaction_id
            raise ExecuteError(e, is_retryable, is_session_invalid, transaction_id) from e
        finally:
            # Runs on both the success and failure paths, whenever a transaction was created.
            if transaction is not None:
                transaction._close_child_cursors()

    def _start_transaction(self):
        """
        Start a transaction using an available database session.

        :rtype: :py:class:`pyqldb.transaction.transaction.Transaction`
        :return: A new transaction.
        """
        transaction_id = self._session._start_transaction().get('TransactionId')
        transaction = Transaction(self._session, self._read_ahead, transaction_id, self._executor)
        return transaction

    def _no_throw_abort(self):
        """
        Send an abort request which will not throw on failure.

        Any exception raised by the abort request is logged as a warning and swallowed, and this session is marked as
        no longer alive.
        """
        try:
            self._session._abort_transaction()
        except Exception as e:
            # Best-effort abort: this is called while another exception is already being handled, so no failure
            # here (service error, connection error, or otherwise) may escape and replace the original one.
            self._is_alive = False
            logger.warning('Ignored error aborting transaction during execution: {}'.format(e))