#
# Copyright 2010-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
import base64
import json
import logging
from greengrasssdk import Lambda
from greengrass_common.env_vars import SHADOW_FUNCTION_ARN, ROUTER_FUNCTION_ARN, MY_FUNCTION_ARN
# Log messages in the SDK are part of customer's log because they're helpful for debugging
# customer's lambdas. Since we configured the root logger to log to customer's log and set the
# propagate flag of this logger to True. The log messages submitted from this logger will be
# sent to the customer's local Cloudwatch handler.
customer_logger = logging.getLogger(__name__)
customer_logger.propagate = True
[docs]class ShadowError(Exception):
pass
[docs]class Client:
def __init__(self):
self.lambda_client = Lambda.Client()
[docs] def get_thing_shadow(self, **kwargs):
r"""
Call shadow lambda to obtain current shadow state.
:Keyword Arguments:
* *thingName* (``string``) --
[REQUIRED]
The name of the thing.
:returns: (``dict``) --
The output from the GetThingShadow operation
* *payload* (``bytes``) --
The state information, in JSON format.
"""
thing_name = self._get_required_parameter('thingName', **kwargs)
payload = b''
return self._shadow_op('get', thing_name, payload)
[docs] def update_thing_shadow(self, **kwargs):
r"""
Updates the thing shadow for the specified thing.
:Keyword Arguments:
* *thingName* (``string``) --
[REQUIRED]
The name of the thing.
* *payload* (``bytes or seekable file-like object``) --
[REQUIRED]
The state information, in JSON format.
:returns: (``dict``) --
The output from the UpdateThingShadow operation
* *payload* (``bytes``) --
The state information, in JSON format.
"""
thing_name = self._get_required_parameter('thingName', **kwargs)
payload = self._get_required_parameter('payload', **kwargs)
return self._shadow_op('update', thing_name, payload)
[docs] def delete_thing_shadow(self, **kwargs):
r"""
Deletes the thing shadow for the specified thing.
:Keyword Arguments:
* *thingName* (``string``) --
[REQUIRED]
The name of the thing.
:returns: (``dict``) --
The output from the DeleteThingShadow operation
* *payload* (``bytes``) --
The state information, in JSON format.
"""
thing_name = self._get_required_parameter('thingName', **kwargs)
payload = b''
return self._shadow_op('delete', thing_name, payload)
[docs] def publish(self, **kwargs):
r"""
Publishes state information.
:Keyword Arguments:
* *topic* (``string``) --
[REQUIRED]
The name of the MQTT topic.
* *payload* (``bytes or seekable file-like object``) --
The state information, in JSON format.
* *queueFullPolicy* (``string``) --
The policy for GGC to take when its internal queue is full
:returns: None
"""
topic = self._get_required_parameter('topic', **kwargs)
# payload and queueFullPolicy are optional parameters
payload = kwargs.get('payload', b'')
queue_full_policy = kwargs.get('queueFullPolicy', '')
function_arn = ROUTER_FUNCTION_ARN
client_context = {
'custom': {
'source': MY_FUNCTION_ARN,
'subject': topic
}
}
if queue_full_policy == 'AllOrException':
client_context['custom']['queueFullPolicy'] = 'AllOrError'
elif queue_full_policy in ['BestEffort', '']:
client_context['custom']['queueFullPolicy'] = queue_full_policy
else:
raise ValueError('Invalid value for queueFullPolicy: {queueFullPolicy}'.format(
queueFullPolicy=queue_full_policy
))
customer_logger.debug('Publishing message on topic "{}" with Payload "{}"'.format(topic, payload))
self.lambda_client._invoke_internal(
function_arn,
payload,
base64.b64encode(json.dumps(client_context).encode()),
'Event'
)
def _get_required_parameter(self, parameter_name, **kwargs):
if parameter_name not in kwargs:
raise ValueError('Parameter "{parameter_name}" is a required parameter but was not provided.'.format(
parameter_name=parameter_name
))
return kwargs[parameter_name]
def _shadow_op(self, op, thing_name, payload):
topic = '$aws/things/{thing_name}/shadow/{op}'.format(thing_name=thing_name, op=op)
function_arn = SHADOW_FUNCTION_ARN
client_context = {
'custom': {
'subject': topic
}
}
customer_logger.debug('Calling shadow service on topic "{}" with payload "{}"'.format(topic, payload))
response = self.lambda_client._invoke_internal(
function_arn,
payload,
base64.b64encode(json.dumps(client_context).encode())
)
if response['FunctionError'] != '':
raise ShadowError('Request for shadow state returned FunctionError "{}" with error payload "{}"'.format(
response['FunctionError'], response['Payload']
))
payload = response['Payload'].read()
if response:
response_payload_map = json.loads(payload.decode('utf-8'))
if 'code' in response_payload_map and 'message' in response_payload_map:
raise ShadowError('Request for shadow state returned error code {} with message "{}"'.format(
response_payload_map['code'], response_payload_map['message']
))
return {'payload': payload}