Source code for greengrasssdk.Lambda

#
# Copyright 2010-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#

import logging
import re

from io import BytesIO

from greengrass_common.function_arn_fields import FunctionArnFields
from greengrass_ipc_python_sdk.ipc_client import IPCClient, IPCException
from greengrasssdk.utils.testing import mock

# Log messages in the SDK are part of customer's log because they're helpful for debugging
# customer's lambdas. Since we configured the root logger to log to customer's log and set the
# propagate flag of this logger to True. The log messages submitted from this logger will be
# sent to the customer's local Cloudwatch handler.
customer_logger = logging.getLogger(__name__)
customer_logger.propagate = True

valid_base64_regex = '^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{4}|[A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)$'


[docs]class InvocationException(Exception): pass
[docs]class Client: def __init__(self, endpoint='localhost', port=None): """ :param endpoint: Endpoint used to connect to IPC. :type endpoint: str :param port: Deprecated. Will not be used. :type port: None """ self.ipc = IPCClient(endpoint=endpoint)
[docs] def invoke(self, **kwargs): r""" Invokes Lambda function of the given name. :Keyword Arguments: * *ClientContext* (``bytes``) -- Optional Base64-encoded data about the invoking client to pass to the Lambda function * *FunctionName* (``string``) -- [REQUIRED] The Amazon Resource Name (ARN) of the Lambda function to invoke. Name formats: * Qualified ARN - The function ARN with the version suffix. e.g. arn:aws:lambda:aws-region:acct-id:function:helloworld:1 * Unqualified ARN - The function ARN without the version suffix. e.g. arn:aws:lambda:aws-region:acct-id:function:helloworld * *InvocationType* (``string``) -- Choose from the following options. * ``RequestResponse`` (default) - Invoke the Lambda synchronously. Block until the function returns a response or times out. * ``Event`` - Invoke the Lambda asynchronously. The response only includes empty payload. * *Payload* (``bytes``) -- Optional input for the Lambda function to invoke. * *Qualifier* (``string``) -- Optional parameter to specify a Lambda function version if it was not included in the FunctionName field. If you specify a function version, the API uses the qualified function ARN to invoke a specific Lambda function. :returns: (``dict``) -- * *FunctionError* (``string``) -- If present, indicates that an error occurred while executing the Lambda function. If an error occurred, this field will have one of two values, ``Handled`` or ``Unhandled``. ``Handled`` errors are errors that are reported by the function while the ``Unhandled`` errors are those detected and reported by Greengrass Core. ``Unhandled`` errors include out of memory errors and function timeouts. Error details are provided in the Payload. * *Payload* (``bytes or StreamingBody object``) -- It is the result returned by the Lambda function. This is present only if the invocation type is ``RequestResponse``. In the event of a function error this field contains a message describing the error. """ # FunctionName is a required parameter if 'FunctionName' not in kwargs: raise ValueError( '"FunctionName" argument of Lambda.Client.invoke is a required argument but was not provided.' ) arn_fields = FunctionArnFields(kwargs['FunctionName']) arn_qualifier = arn_fields.qualifier # A Function qualifier can be provided as part of the ARN in FunctionName, or it can be provided here. The # behavior of the cloud is to throw an exception if both are specified but not equal extraneous_qualifier = kwargs.get('Qualifier', '') if extraneous_qualifier and arn_qualifier and arn_qualifier != extraneous_qualifier: raise ValueError('The derived qualifier from the function name does not match the specified qualifier.') final_qualifier = arn_qualifier if arn_qualifier else extraneous_qualifier try: # GGC v1.9.0 or newer function_arn = FunctionArnFields.build_function_arn(arn_fields.unqualified_arn, final_qualifier) except AttributeError: # older GGC version raise AttributeError('class FunctionArnFields has no attribute \'build_function_arn\'. build_function_arn ' 'is introduced in GGC v1.9.0. Please check your GGC version.') # ClientContext must be base64 if given, but is an option parameter try: client_context = kwargs.get('ClientContext', b'').decode() except AttributeError as e: customer_logger.exception(e) raise ValueError( '"ClientContext" argument must be a byte string or support a decode method which returns a string' ) if client_context: if not re.match(valid_base64_regex, client_context): raise ValueError('"ClientContext" argument of Lambda.Client.invoke must be base64 encoded.') # Payload is an optional parameter payload = kwargs.get('Payload', b'') invocation_type = kwargs.get('InvocationType', 'RequestResponse') customer_logger.debug('Invoking local lambda "{}" with payload "{}" and client context "{}"'.format( function_arn, payload, client_context)) # Post the work to IPC and return the result of that work return self._invoke_internal(function_arn, payload, client_context, invocation_type)
@mock def _invoke_internal(self, function_arn, payload, client_context, invocation_type="RequestResponse"): """ This private method is seperate from the main, public invoke method so that other code within this SDK can give this Lambda client a raw payload/client context to invoke with, rather than having it built for them. This lets you include custom ExtensionMap_ values like subject which are needed for our internal pinned Lambdas. """ customer_logger.debug('Invoking Lambda function "{}" with Greengrass Message "{}"'.format(function_arn, payload)) try: invocation_id = self.ipc.post_work(function_arn, payload, client_context, invocation_type) if invocation_type == "Event": # TODO: Properly return errors based on BOTO response # https://boto3.readthedocs.io/en/latest/reference/services/lambda.html#Lambda.Client.invoke return {'Payload': b'', 'FunctionError': ''} work_result_output = self.ipc.get_work_result(function_arn, invocation_id) if not work_result_output.func_err: output_payload = StreamingBody(work_result_output.payload) else: output_payload = work_result_output.payload invoke_output = { 'Payload': output_payload, 'FunctionError': work_result_output.func_err, } return invoke_output except IPCException as e: customer_logger.exception(e) raise InvocationException('Failed to invoke function due to ' + str(e))
[docs]class StreamingBody(object): """Wrapper class for http response payload This provides a consistent interface to AWS Lambda Python SDK """ def __init__(self, payload): self._raw_stream = BytesIO(payload) self._amount_read = 0
[docs] def read(self, amt=None): """Read at most amt bytes from the stream. If the amt argument is omitted, read all data. """ chunk = self._raw_stream.read(amt) self._amount_read += len(chunk) return chunk
[docs] def close(self): self._raw_stream.close()