'use strict';
/* eslint-disable no-restricted-syntax */
/*
* Copyright (c) 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*/
const cbor = require('cbor');
const net = require('net');
const smData = require('aws-greengrass-core-sdk/stream-manager/data');
const exceptions = require('./exceptions');
const utilInternal = require('./utilInternal');
const util = require('./util');
// Consts
// SDK version reported to the server in the ConnectRequest (via withSdkVersion).
// NOTE(review): this comment used to say "Java SDK" — presumably carried over from the Java client; confirm.
// NOTE: When you bump this version,
// consider adding the old version to OLD_SUPPORTED_PROTOCOL_VERSIONS (if you intend to support it)
const SDK_VERSION = '1.1.0';
// List of older protocol versions that are still supported.
// These are meant to be used for graceful degradation if the server does not support the current SDK version.
const OLD_SUPPORTED_PROTOCOL_VERSIONS = ['1.0.0'];
// Connect version: written as a single byte ahead of the ConnectRequest frame; the first byte of the
// server's reply must carry the same value or the connection attempt is rejected.
const CONNECT_VERSION = 1;
/**
 * Removes the first occurrence of `f` from `arr`, mutating the array in place.
 * Does nothing when `f` is not present.
 *
 * @param {Array} arr Array to remove the entry from.
 * @param {*} f Entry to remove (matched with strict equality, as `indexOf` does).
 */
const removeFromArray = (arr, f) => {
    const position = arr.indexOf(f);
    if (position === -1) {
        return;
    }
    arr.splice(position, 1);
};
/**
 * Stream manager client
 *
 * @class
 * @memberOf aws-greengrass-core-sdk.StreamManager
 *
 * @example <caption>StreamManager Usage</caption>
 * const { StreamManagerClient } = require('aws-greengrass-core-sdk/stream-manager');
 * const client = new StreamManagerClient();
 * client.onConnected(async () => {
 *     // Do work with the client here.
 * });
 */
class StreamManagerClient {
    // Set by close(). A closed client refuses to connect, read or send again.
    #closed = false;

    /**
     * Socket of the established connection. Only assigned once the connect handshake succeeded.
     * @type {module:net.Socket}
     */
    #socket = null;

    // Token taken from AWS_CONTAINER_AUTHORIZATION_TOKEN and sent in the ConnectRequest.
    #authToken = null;

    // True from a successful handshake until the socket emits 'close'.
    #connected = false;

    /**
     * @typedef Logger
     * @type {Object}
     * @property {function(...*)} error
     * @property {function(...*)} info
     * @property {function(...*)} debug
     * @property {function(...*)} [warn] Optional; <tt>info</tt> is used instead when it is missing.
     */
    /**
     * @type {?Logger}
     */
    #logger = null;

    // Requests awaiting a response: request id -> function settling the caller's promise.
    #requestMap = new Map();

    // Callbacks to run once the client is connected. Each one is removed after it ran.
    connectCallbacks = [];

    // Callbacks invoked with the error whenever a connection or read error is reported.
    errorCallbacks = [];

    #defaultParams = {
        port: null,
        host: '127.0.0.1',
        onConnectCb: null,
        onErrorCb: null,
        logger: {
            error: console.error,
            debug: console.debug,
            info: console.info,
            warn: console.warn,
        },
    };

    /**
     * Constructs a new Stream Manager client. Once connected, <tt>onConnectCb</tt> will be called and
     * the client can then be used.
     *
     * The connection attempt starts immediately. If it fails, the error is logged and handed to the
     * error callbacks; it is never thrown from the constructor.
     *
     * @param {Object?} opts All these options are optional.
     * @param {Number} opts.port Defaults to the STREAM_MANAGER_SERVER_PORT environment variable, or 8088.
     * @param {String} opts.host Defaults to 127.0.0.1.
     * @param {Function?} opts.onConnectCb Optional callback to be called once the client has connected.
     * @param {Function(Error)?} opts.onErrorCb Optional, but highly suggested callback to be called when a connection error occurs.
     * @param {Logger?} opts.logger Defaults to the console.
     */
    constructor(opts) {
        const {
            port, host, onConnectCb, onErrorCb, logger,
        } = {
            // Apply defaults
            ...this.#defaultParams,
            // Then possibly override them with what the user set
            ...opts,
        };

        // An explicit `logger: null/undefined` must not leave the client without a logger.
        this.#logger = logger || this.#defaultParams.logger;
        // Treat an explicit `port: undefined` like "not set" as well.
        this.port = (port === null || port === undefined)
            ? parseInt(process.env.STREAM_MANAGER_SERVER_PORT || 8088, 10)
            : port;
        this.host = host;
        this.#authToken = process.env.AWS_CONTAINER_AUTHORIZATION_TOKEN || null;

        if (typeof onConnectCb === 'function') {
            this.onConnected(onConnectCb);
        }
        if (typeof onErrorCb === 'function') {
            this.onError(onErrorCb);
        }

        // __connect() already logs the failure and notifies the error callbacks before rethrowing.
        // Nobody can await a constructor, so swallow the rethrown error here instead of turning it
        // into an unhandled promise rejection.
        this.__connect().catch(() => {});
    }

    /**
     * Opens the socket and performs the connect handshake, then marks the client as connected and
     * runs (and removes) the pending connect callbacks. Resolves immediately when already connected.
     *
     * On failure the error is logged, passed to every error callback exactly once, and rethrown.
     *
     * NOTE(review): calls made while an earlier attempt is still in flight each open their own socket.
     *
     * @returns {Promise<void>}
     */
    async __connect() {
        try {
            await new Promise((resolve, reject) => {
                if (this.#closed) {
                    reject(new exceptions.StreamManagerException('Client is closed and cannot be reopened'));
                    return;
                }
                if (this.#connected) {
                    resolve();
                    return;
                }

                // Tracks whether this attempt has already been resolved or rejected, so the socket
                // 'error' handler knows whether the catch block below will report the error for it.
                let settled = false;
                const succeed = () => {
                    settled = true;
                    resolve();
                };
                const fail = (err) => {
                    settled = true;
                    reject(err);
                };

                const newSock = net.createConnection({
                    port: this.port,
                    host: this.host,
                    // Set high water mark so that we can read 1 full packet (1GB) at a time instead of needing to
                    // try to read multiple times and combine the results. The HWM adjusts how much the socket will
                    // buffer when reading.
                    readableHighWaterMark: utilInternal.MAX_PACKET_SIZE,
                }, async () => {
                    try {
                        // Connection started
                        this.#logger.debug(`Opening connection to ${this.host}:${this.port}`);
                        this.#connected = false;

                        const request = new smData.ConnectRequest()
                            .withProtocolVersion(smData.VersionInfo.PROTOCOL_VERSION.asMap())
                            .withSdkVersion(SDK_VERSION)
                            .withAuthToken(this.#authToken)
                            .withOtherSupportedProtocolVersions(OLD_SUPPORTED_PROTOCOL_VERSIONS)
                            .withRequestId(utilInternal.uuidv4());

                        // Write the connect version
                        newSock.write(utilInternal.intToBuffer(CONNECT_VERSION, 1));

                        // Write request to socket
                        const frame = new smData.MessageFrame(smData.Operation.Connect, cbor.encode(request.asMap()));
                        const byteFrame = utilInternal.encodeFrame(frame);
                        newSock.write(byteFrame.header);
                        newSock.write(byteFrame.payload);

                        // Not connected yet, so this reads and validates the connect response
                        await this.__read(newSock);

                        // Only now that we're connected should we set/replace the socket
                        this.#socket = newSock;
                        succeed();
                    } catch (errors) {
                        // The handshake failed, so this socket will never be used: release it.
                        newSock.destroy();
                        fail(errors);
                    }
                });

                newSock.on('error', (e) => {
                    newSock.end();
                    if (!settled) {
                        // Still connecting: the catch block below logs and notifies the error
                        // callbacks, so doing it here as well would report the error twice.
                        fail(e);
                        return;
                    }
                    this.#logger.error(e);
                    this.errorCallbacks.forEach((f) => f(e));
                });
                newSock.on('end', () => {
                    this.#logger.info('Socket is ending');
                });
                newSock.on('close', () => {
                    newSock.destroy();
                    this.#connected = false;
                });
            });

            // Set us to be in connected mode
            this.#connected = true;
            this.#logger.info('Successfully connected');

            // Iterate over a snapshot: removing entries from the array being iterated would make
            // forEach skip every second callback.
            [...this.connectCallbacks].forEach((f) => {
                try {
                    f();
                } finally {
                    // After calling the connect callback remove it so we don't call it multiple times.
                    // A client should only connect once.
                    removeFromArray(this.connectCallbacks, f);
                }
            });
        } catch (e) {
            this.#logger.error(e);
            this.errorCallbacks.forEach((f) => f(e));
            throw e;
        }
    }

    /**
     * Reads exactly <tt>n</tt> bytes from the socket, waiting for 'readable' events until that many
     * bytes are available.
     *
     * Called without <tt>resolve</tt>/<tt>reject</tt> it returns a promise for the bytes. The
     * four-argument form is the internal retry step and settles the given promise instead.
     *
     * @param {Number} n Number of bytes to read.
     * @param {module:net.Socket} socket Socket to read from.
     * @param {Function?} resolve Internal: resolver of the pending read.
     * @param {Function?} reject Internal: rejecter of the pending read.
     * @returns {Promise<Buffer>|undefined} Rejects when the client has been closed.
     */
    __readSocket(n, socket, resolve = null, reject = null) {
        if (!(resolve && reject)) {
            return new Promise((res, rej) => {
                this.__readSocket(n, socket, res, rej);
            });
        }

        if (this.#closed) {
            // Stop here: do not touch the socket or re-arm the 'readable' listener after close().
            reject(new exceptions.StreamManagerException('Client is closed'));
            return undefined;
        }

        const chunk = socket.read(n);
        if (chunk === null) {
            // Fewer than n bytes are buffered so far; try again once more data has arrived.
            socket.once('readable', () => {
                this.__readSocket(n, socket, resolve, reject);
            });
            return undefined;
        }

        resolve(chunk);
        return undefined;
    }

    /**
     * Reads and processes one message, then schedules the next read on the client's current socket.
     *
     * While not connected this performs the handshake: it checks the connect version byte and the
     * ConnectResponse. Once connected it reads a frame and hands it to the waiting request.
     *
     * @param {module:net.Socket} socket Socket to read from; defaults to the client's socket.
     * @returns {Promise<void>}
     * @throws {ConnectFailedException} When the handshake response is not acceptable.
     */
    async __read(socket = this.#socket) {
        if (this.#connected) {
            const frame = await this.__readMessageFrame(socket);
            this.__handleReadResponse(cbor.decodeFirstSync(frame.payload), frame);
        } else {
            // Read connect version
            const connectResponseVersion = utilInternal.intFromBuffer(await this.__readSocket(1, socket));
            if (connectResponseVersion !== CONNECT_VERSION) {
                this.#logger.error('Unexpected response from the server, Connect version:', connectResponseVersion);
                throw new exceptions.ConnectFailedException('Failed to establish connection with the server');
            }

            // Read connect response
            const responseFrame = await this.__readMessageFrame(socket);
            if (responseFrame.operation !== smData.Operation.ConnectResponse) {
                this.#logger.error('Received data with unexpected operation', responseFrame.operation);
                throw new exceptions.ConnectFailedException('Failed to establish connection with the server');
            }

            const response = smData.ConnectResponse.fromMap(cbor.decodeFirstSync(responseFrame.payload));
            this.#logger.debug('Received ConnectResponse from server:', response);

            if (response.status !== smData.ResponseStatusCode.Success) {
                this.#logger.error('Received ConnectResponse with unexpected status', response.status);
                throw new exceptions.ConnectFailedException('Failed to establish connection with the server');
            }

            if (response.protocolVersion !== smData.VersionInfo.PROTOCOL_VERSION.asMap()) {
                // `warn` is optional on user supplied loggers, fall back to `info` when it is missing.
                const warn = typeof this.#logger.warn === 'function' ? this.#logger.warn : this.#logger.info;
                warn.call(this.#logger, 'SDK with version %s using Protocol version %s is not fully compatible with '
                    + 'Server with version %s. '
                    + 'Client has connected in a compatibility mode using protocol version %s. '
                    + 'Some features will not work as expected', SDK_VERSION, smData.VersionInfo.PROTOCOL_VERSION.asMap(),
                response.serverVersion, response.protocolVersion);
            }
        }

        // Put ourselves back in the event loop to handle the next messages
        setImmediate(async () => {
            try {
                await this.__read();
            } catch (e) {
                // Only bubble up the errors when we're actually connected and not closed
                if (this.#connected && !this.#closed) {
                    this.errorCallbacks.forEach((f) => f(e));
                }
            }
        });
    }

    /**
     * Reads one frame: a 4 byte length, a 1 byte operation and then <tt>length - 1</tt> payload bytes
     * (the length counts the operation byte). Unrecognised operations are mapped to
     * <tt>Operation.Unknown</tt>.
     *
     * @param {module:net.Socket} socket Socket to read from.
     * @returns {Promise<MessageFrame>}
     */
    async __readMessageFrame(socket) {
        const length = utilInternal.intFromBuffer(await this.__readSocket(4, socket));
        const operation = utilInternal.intFromBuffer(await this.__readSocket(1, socket));

        let op = smData.Operation.fromMap(operation);
        if (typeof op === 'undefined') {
            this.#logger.error('Found unknown operation', operation);
            op = smData.Operation.Unknown;
        }

        return new smData.MessageFrame(op, await this.__readSocket(length - 1, socket));
    }

    /**
     * Converts a decoded response into its response type and passes it to the request waiting for it.
     *
     * A response whose request id has nobody waiting is logged and dropped, so that a stray or
     * duplicated response cannot throw and stop the read loop.
     *
     * @param {Object} data CBOR decoded payload of the frame.
     * @param {MessageFrame} frame The frame the payload came from.
     */
    __handleReadResponse(data, frame) {
        const { operation } = frame;

        // Hands the response to the pending request, if there is one
        const deliver = (requestId, response) => {
            const settle = this.#requestMap.get(requestId);
            if (typeof settle !== 'function') {
                this.#logger.error('Received response for unknown request id', requestId);
                return;
            }
            settle(response);
        };

        // Response types that are all handled the same way. Each name is both the key in
        // smData.Operation and the name of the response class in smData.
        const responseTypes = [
            'ReadMessagesResponse',
            'CreateMessageStreamResponse',
            'UpdateMessageStreamResponse',
            'DeleteMessageStreamResponse',
            'AppendMessageResponse',
            'ListStreamsResponse',
            'DescribeMessageStreamResponse',
        ];
        const typeName = responseTypes.find((name) => operation === smData.Operation[name]);

        if (typeName !== undefined) {
            const response = smData[typeName].fromMap(data);
            if (typeName === 'ReadMessagesResponse') {
                // The frame is left out of this log line, unlike the other response types
                this.#logger.debug(`Received ${typeName} from server`);
            } else {
                this.#logger.debug(`Received ${typeName} from server`, frame);
            }
            deliver(response.requestId, response);
        } else if (operation === smData.Operation.UnknownOperationError) {
            this.#logger.error('Received response with UnknownOperation Error from server. You should update your server version');
            const response = smData.UnknownOperationError.fromMap(data);
            deliver(response.requestId, response);
        } else if (operation === smData.Operation.Unknown) {
            this.#logger.error('Received response with unknown operation from server', frame);
            try {
                const { requestId } = cbor.decodeFirstSync(frame.payload);
                deliver(requestId, frame);
            } catch {
                // We tried our best to figure out the request id, but it failed.
                // We already logged the unknown smData.Operation, so there's nothing
                // else we can do at this point
            }
        } else {
            this.#logger.error('Received data with unhandled operation', operation);
        }
    }

    /**
     * Sends a request and resolves with the matching response. Reconnects first when the client is
     * not connected. A request id is generated when <tt>data.requestId</tt> is null.
     *
     * @param operation The smData.Operation of the request.
     * @param data Request object; it must expose <tt>requestId</tt> and <tt>asMap()</tt>.
     * @returns {Promise<*>} The response object sent back by the server.
     * @throws {StreamManagerException} When the client has been closed.
     * @throws {ValidationException} When the request fails validation.
     * @throws {ClientException} When the server answered with an unknown operation.
     */
    async _sendAndReceive(operation, data) {
        if (this.#closed) {
            throw new exceptions.StreamManagerException('Client is closed and cannot be reopened');
        }

        if (data.requestId === null) {
            // eslint-disable-next-line no-param-reassign
            data.requestId = utilInternal.uuidv4();
        }

        const validation = utilInternal.isInvalid(data);
        if (validation) {
            throw new exceptions.ValidationException(validation);
        }

        // If we're not connected, immediately try to reconnect
        if (!this.#connected) {
            await this.__connect();
        }

        const { requestId } = data;
        const promise = new Promise((resolve, reject) => {
            this.#requestMap.set(requestId, (result) => {
                // The request is answered, nobody is waiting for this id any more
                this.#requestMap.delete(requestId);
                if (result instanceof smData.MessageFrame && result.operation === smData.Operation.Unknown) {
                    reject(new exceptions.ClientException('Received response with unknown operation from server'));
                    return;
                }
                resolve(result);
            });
        });

        try {
            // Write request to socket
            const frame = new smData.MessageFrame(operation, cbor.encode(data.asMap()));
            const byteFrame = utilInternal.encodeFrame(frame);
            this.#socket.write(byteFrame.header);
            this.#socket.write(byteFrame.payload);
        } catch (e) {
            // The request never went out, so do not keep waiting for its response
            this.#requestMap.delete(requestId);
            throw e;
        }

        return promise;
    }

    /**
     * Validates the options given to readMessages. <tt>null</tt> means "no options" and is accepted.
     *
     * @param options {?aws-greengrass-core-sdk.StreamManager.ReadMessagesOptions}
     * @throws {ValidationException} When the options are of the wrong type, fail validation, or
     * minMessageCount is greater than maxMessageCount.
     */
    static __validateReadMessagesOptions(options) {
        if (options === null) {
            return;
        }

        if (!(options instanceof smData.ReadMessagesOptions)) {
            throw new exceptions.ValidationException('options argument to read_messages must be a ReadMessageOptions object');
        }

        const validation = utilInternal.isInvalid(options);
        if (validation) {
            throw new exceptions.ValidationException(validation);
        }

        const { minMessageCount, maxMessageCount } = options;
        if (minMessageCount !== null && maxMessageCount !== null && minMessageCount > maxMessageCount) {
            throw new exceptions.ValidationException('minMessageCount must be less than or equal to maxMessageCount');
        }
    }

    /**
     * Append a message into the specified message stream. Returns the sequence number of the message
     * if it was successfully appended.
     *
     * @param streamName {string} The name of the stream to append to.
     * @param data {Buffer} Buffer containing the data to be written.
     * @returns {Promise<Number>}
     */
    async appendMessage(streamName, data) {
        const request = new smData.AppendMessageRequest().withName(streamName).withPayload(data);
        const result = await this._sendAndReceive(smData.Operation.AppendMessage, request);
        utilInternal.throwOnErrorResponse(result);
        return result.sequenceNumber;
    }

    /**
     * Create a message stream with a given definition.
     *
     * @param definition {aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition}
     * @returns {Promise<void>}
     */
    async createMessageStream(definition) {
        if (!(definition instanceof smData.MessageStreamDefinition)) {
            throw new exceptions.ValidationException('definition argument to create_stream must be a MessageStreamDefinition object');
        }
        const request = new smData.CreateMessageStreamRequest().withDefinition(definition);
        const result = await this._sendAndReceive(smData.Operation.CreateMessageStream, request);
        utilInternal.throwOnErrorResponse(result);
    }

    /**
     * Updates a message stream with the new definition.
     * Minimum version requirements: StreamManager server version 1.1 (or AWS IoT Greengrass Core 1.11.0)
     *
     * @param definition {aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition}
     * @returns {Promise<void>}
     */
    async updateMessageStream(definition) {
        if (!(definition instanceof smData.MessageStreamDefinition)) {
            throw new exceptions.ValidationException('definition argument to update_stream must be a MessageStreamDefinition object');
        }
        const request = new smData.UpdateMessageStreamRequest().withDefinition(definition);
        const result = await this._sendAndReceive(smData.Operation.UpdateMessageStream, request);
        utilInternal.throwOnErrorResponse(result);
    }

    /**
     * Deletes a message stream based on its name. Nothing is returned if the request succeeds.
     * An error will be thrown if the request fails.
     *
     * @param streamName {string} The name of the stream to be deleted.
     * @returns {Promise<void>}
     */
    async deleteMessageStream(streamName) {
        const request = new smData.DeleteMessageStreamRequest().withName(streamName);
        const result = await this._sendAndReceive(smData.Operation.DeleteMessageStream, request);
        utilInternal.throwOnErrorResponse(result);
    }

    /**
     * Read message(s) from a chosen stream with options. If no options are specified it will try to read
     * 1 message from the stream.
     *
     * @param streamName {string} The name of the stream to read from.
     * @param options {aws-greengrass-core-sdk.StreamManager.ReadMessagesOptions?} Options used when reading from the stream of type {@linkcode aws-greengrass-core-sdk.StreamManager.ReadMessagesOptions}.
     * Defaults are:
     * <ul>
     * <li>desiredStartSequenceNumber: 0</li>
     * <li>minMessageCount: 1</li>
     * <li>maxMessageCount: 1</li>
     * <li>readTimeoutMillis: 0 <pre>// Where 0 here represents that the server will immediately return the messages
     * // or an exception if there were not enough messages available.</pre></li>
     * </ul>
     * <p>
     * If desiredStartSequenceNumber is specified in the options and is less
     * than the current beginning of the stream, returned messages will start
     * at the beginning of the stream and not necessarily the desiredStartSequenceNumber.
     * </p>
     * @returns {Promise<aws-greengrass-core-sdk.StreamManager.Message[]>} List of one or more messages.
     */
    async readMessages(streamName, options = null) {
        StreamManagerClient.__validateReadMessagesOptions(options);
        const request = new smData.ReadMessagesRequest().withStreamName(streamName).withReadMessagesOptions(options);
        const result = await this._sendAndReceive(smData.Operation.ReadMessages, request);
        utilInternal.throwOnErrorResponse(result);
        return result.messages;
    }

    /**
     * List the streams in StreamManager. Returns a list of their names.
     *
     * @returns {Promise<String[]>}
     */
    async listStreams() {
        const request = new smData.ListStreamsRequest();
        const result = await this._sendAndReceive(smData.Operation.ListStreams, request);
        utilInternal.throwOnErrorResponse(result);
        return result.streams;
    }

    /**
     * Describe a message stream to get metadata including the stream's definition,
     * size, and exporter statuses.
     *
     * @param streamName {string} The name of the stream to describe
     * @returns {Promise<aws-greengrass-core-sdk.StreamManager.MessageStreamInfo>}
     */
    async describeMessageStream(streamName) {
        const request = new smData.DescribeMessageStreamRequest().withName(streamName);
        const result = await this._sendAndReceive(smData.Operation.DescribeMessageStream, request);
        utilInternal.throwOnErrorResponse(result);
        return result.messageStreamInfo;
    }

    /**
     * Add a callback for when the client connects. When the client is already connected the callback
     * is called right away instead of being queued.
     * @param f {function}
     */
    onConnected(f) {
        if (this.#connected) {
            f();
        } else {
            this.connectCallbacks.push(f);
        }
    }

    /**
     * Add a callback for when an error occurs.
     * @param f {function}
     */
    onError(f) {
        this.errorCallbacks.push(f);
    }

    /**
     * Close the connection. A closed client cannot be reopened.
     */
    close() {
        if (this.#socket) {
            this.#socket.end();
        }
        this.#closed = true;
    }
}
module.exports = {
...smData,
StreamManagerClient: StreamManagerClient,
...exceptions,
util,
};