# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
# the License. A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
# and limitations under the License.
from logging import getLogger
from amazon.ion.simple_types import IonPyBool, IonPyBytes, IonPyDecimal, IonPyDict, IonPyFloat, IonPyInt, IonPyList, \
IonPyNull, IonPySymbol, IonPyText, IonPyTimestamp
from amazon.ion.simpleion import dumps
from botocore.exceptions import ClientError
logger = getLogger(__name__)
[docs]class SessionClient:
"""
A class representing an independent session to a QLDB ledger that handles endpoint requests. This class is used in
:py:class:`pyqldb.driver.base_qldb_driver.BaseQldbDriver` and :py:class:`pyqldb.session.qldb_session.QldbSession`.
This class is not meant to be used directly by developers.
:type ledger_name: str
:param ledger_name: The QLDB ledger name.
:type token: str
:param token: The initial session token representing the session connection.
:type client: :py:class:`botocore.client.BaseClient`
:param: The low level service client.
:type session_id: str
:param: The session ID.
"""
def __init__(self, ledger_name, token, client, session_id):
self._ledger_name = ledger_name
self._token = token
self._client = client
self._session_id = session_id
def __enter__(self):
"""
Context Manager function to support the 'with' statement.
"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Context Manager function to support the 'with' statement.
"""
self.close()
@property
def client(self):
"""
The **read-only** low level service client.
"""
return self._client
@property
def id(self):
"""
The **read-only** ID of this session.
"""
return self._session_id
@property
def ledger_name(self):
"""
The **read-only** ledger name.
"""
return self._ledger_name
@property
def token(self):
"""
The **read-only** token.
"""
return self._token
[docs] def abort_transaction(self):
"""
Send request to abort the currently active transaction.
:rtype: dict
:return: The abort transaction result response from the endpoint.
"""
request = {'SessionToken': self.token, 'AbortTransaction': {}}
result = self._send_command(request)
abort_transaction_output = result.get('AbortTransaction')
return abort_transaction_output
[docs] def close(self):
"""
Close this session.
"""
try:
self.end_session()
except ClientError as ce:
# We will only log issues closing the session, as QLDB will clean them after a timeout.
logger.warning('Errors closing session: {}'.format(ce))
[docs] def commit_transaction(self, transaction_id, commit_digest):
"""
Send request to commit the currently active transaction.
:type transaction_id: str
:param transaction_id: The ID of the transaction.
:type commit_digest: str
:param commit_digest: The digest hash of the transaction to commit.
:rtype: dict
:return: The commit transaction result response from the endpoint.
"""
request = {'SessionToken': self.token, 'CommitTransaction': {'TransactionId': transaction_id,
'CommitDigest': commit_digest}}
result = self._send_command(request)
commit_transaction_output = result.get('CommitTransaction')
return commit_transaction_output
[docs] def end_session(self):
"""
Send request to end the independent session represented by the instance of this class.
:rtype: dict
:return: The end session result response from the endpoint.
"""
request = {'SessionToken': self.token, 'EndSession': {}}
result = self._send_command(request)
end_session_output = result.get('EndSession')
return end_session_output
[docs] def execute_statement(self, transaction_id, statement, parameters):
"""
Send an execute request with parameters to QLDB.
:type transaction_id: str
:param transaction_id: The ID of the transaction.
:type statement: str
:param statement: The statement to execute.
:type parameters: list
:param parameters: List of Ion values to fill in parameters of the statement.
:rtype: dict
:return: The statement result response from the endpoint.
"""
parameters = list(map(self._to_value_holder, parameters))
request = {'SessionToken': self.token, 'ExecuteStatement': {'TransactionId': transaction_id,
'Statement': statement,
'Parameters': parameters}}
result = self._send_command(request)
statement_result = result.get('ExecuteStatement')
return statement_result
[docs] def fetch_page(self, transaction_id, next_page_token):
"""
Send fetch result request to QLDB, retrieving the next chunk of data for the result.
:type transaction_id: str
:param transaction_id: The ID of the transaction.
:type next_page_token: str
:param next_page_token: The next page token.
:rtype: dict
:return: The fetch result response from the endpoint.
"""
request = {'SessionToken': self.token, 'FetchPage': {'TransactionId': transaction_id,
'NextPageToken': next_page_token}}
result = self._send_command(request)
statement_result = result.get('FetchPage')
return statement_result
[docs] def start_transaction(self):
"""
Send request to start a transaction.
:rtype: dict
:return: The start transaction response.
"""
request = {'SessionToken': self.token, 'StartTransaction': {}}
result = self._send_command(request)
return result.get('StartTransaction')
def _send_command(self, request):
"""
Call send_command method of the low level client.
"""
logger.debug('Sending request: {}'.format(request))
result = self._client.send_command(**request)
logger.debug('Received response: {}'.format(result))
return result
[docs] @staticmethod
def start_session(ledger_name, client):
"""
Factory method for constructing a new :py:class:`pyqldb.communication.session_client.SessionClient`, creating a
new session to QLDB on construction.
:type ledger_name: str
:param ledger_name: The ledger name to create session.
:type client: :py:class:`botocore.client.BaseClient`
:param client: The low level service client.
:rtype: :py:class:`pyqldb.communication.session_client.SessionClient`
:return: A new SessionClient object.
"""
logger.debug('Initiating new session.')
result = client.send_command(StartSession={'LedgerName': ledger_name})
token = result.get('StartSession').get('SessionToken')
session_id = result.get('ResponseMetadata').get('HTTPHeaders').get('x-amzn-requestid')
return SessionClient(ledger_name, token, client, session_id)
@staticmethod
def _to_value_holder(parameter):
"""
Convert Python or Ion value to binary, and store in a value holder.
"""
parameter_binary = dumps(parameter)
value_holder = {'IonBinary': parameter_binary}
return value_holder