Source code for pyqldb.driver.base_qldb_driver
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
# the License. A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
# and limitations under the License.
from abc import ABC, abstractmethod
from logging import getLogger
from boto3 import client
from botocore.config import Config
from boto3.session import Session
from .. import __version__
from ..communication.session_client import SessionClient
from ..session.qldb_session import QldbSession
logger = getLogger(__name__)
SERVICE_DESCRIPTION = 'QLDB Driver for Python v{}'.format(__version__)
SERVICE_NAME = 'qldb-session'
SERVICE_RETRY = {'max_attempts': 0}
DEFAULT_CONFIG = Config(user_agent_extra=SERVICE_DESCRIPTION, retries=SERVICE_RETRY)
[docs]class BaseQldbDriver(ABC):
"""
An abstract base class representing a factory for creating sessions.
:type ledger_name: str
:param ledger_name: The QLDB ledger name.
:type retry_limit: int
:param retry_limit: The number of automatic retries for statement executions using convenience methods on sessions
when an OCC conflict or retriable exception occurs. This value must not be negative.
:type read_ahead: int
:param read_ahead: The number of read-ahead buffers. Determines the maximum number of statement result pages that
can be buffered in memory. This value must be either 0, to disable read-ahead, or a minimum of 2.
:type executor: :py:class:`concurrent.futures.thread.ThreadPoolExecutor`
:param executor: A specific, optional, executor to be used by the retrieval thread if read-ahead is enabled.
:type region_name: str
:param region_name: See [1].
:type verify: bool/str
:param verify: See [1].
:type endpoint_url: str
:param endpoint_url: See [1].
:type aws_access_key_id: str
:param aws_access_key_id: See [1].
:type aws_secret_access_key: str
:param aws_secret_access_key: See [1].
:type aws_session_token: str
:param aws_session_token: See [1].
:type config: :py:class:`botocore.config.Config`
:param config: See [2]. Note that parameter user_agent_extra will be overwritten.
:type boto3_session: :py:class:`boto3.session.Session`
:param boto3_session: The boto3 session to create the client with (see [1]). The boto3 session is expected to be
configured correctly.
:raises TypeError: When config is not an instance of :py:class:`botocore.config.Config`.
When boto3_session is not an instance of :py:class:`boto3.session.Session`.
:raises ValueError: When `read_ahead` or `retry_limit` is not set to the allowed values specified.
[1]: `Boto3 Session.client Reference <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.client>`_.
[2]: `Botocore Config Reference <https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html>`_.
"""
def __init__(self, ledger_name, retry_limit=4, read_ahead=0, executor=None, region_name=None, verify=None,
endpoint_url=None, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None,
config=None, boto3_session=None):
if retry_limit < 0:
raise ValueError('Value for retry_limit cannot be negative.')
if read_ahead < 2 and read_ahead != 0:
raise ValueError('Value for read_ahead must be 0 or 2 or greater.')
self._ledger_name = ledger_name
self._retry_limit = retry_limit
self._read_ahead = read_ahead
self._executor = executor
self._is_closed = False
if config is not None:
if not isinstance(config, Config):
raise TypeError('config must be of type botocore.config.Config. Found: {}'
.format(type(config).__name__))
self._config = config.merge(DEFAULT_CONFIG)
else:
self._config = DEFAULT_CONFIG
if boto3_session is not None:
if not isinstance(boto3_session, Session):
raise TypeError('boto3_session must be of type boto3.session.Session. Found: {}'
.format(type(boto3_session).__name__))
if region_name is not None or aws_access_key_id is not None or aws_secret_access_key is not None or \
aws_session_token is not None:
logger.warning('Custom parameters were detected while using a specified Boto3 client and will be '
'ignored. Please preconfigure the Boto3 client with those parameters instead.')
self._client = boto3_session.client(SERVICE_NAME, verify=verify, endpoint_url=endpoint_url,
config=self._config)
else:
self._client = client(SERVICE_NAME, region_name=region_name, verify=verify, endpoint_url=endpoint_url,
aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key,
aws_session_token=aws_session_token, config=self._config)
def __enter__(self):
"""
Context Manager function to support the 'with' statement.
"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Context Manager function to support the 'with' statement.
"""
self.close()
[docs] @abstractmethod
def get_session(self):
"""
Retrieve a QldbSession object. This method must be overridden.
"""
pass
@property
def read_ahead(self):
"""
The number of read-ahead buffers to be made available per `StreamCursor` instantiated by this driver.
Determines the maximum number of result pages that can be buffered in memory.
.. seealso:: :py:class:`pyqldb.cursor.stream_cursor.StreamCursor`
"""
return self._read_ahead
@property
def retry_limit(self):
"""
The number of automatic retries for statement executions using convenience methods on sessions when
an OCC conflict or retriable exception occurs.
"""
return self._retry_limit
[docs] def close(self):
"""
Close this driver.
"""
self._is_closed = True
def _create_new_session(self):
"""
Create a new QldbSession object.
"""
session_client = SessionClient.start_session(self._ledger_name, self._client)
return QldbSession(session_client, self._read_ahead, self._retry_limit, self._executor)