Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 90 additions & 16 deletions samcli/local/lambda_service/local_lambda_invoke_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import json
import logging
import io
import threading

from uuid import uuid4
from flask import Flask, request

from samcli.lib.utils.stream_writer import StreamWriter
Expand Down Expand Up @@ -101,10 +103,12 @@ def validate_request():
"log-type: {} is not supported. None is only supported.".format(log_type))

invocation_type = request_headers.get('X-Amz-Invocation-Type', 'RequestResponse')
if invocation_type != 'RequestResponse':
LOG.warning("invocation-type: %s is not supported. RequestResponse is only supported.", invocation_type)
if invocation_type not in ['Event', 'RequestResponse']:
LOG.warning("invocation-type: %s is not supported. Supported types: Event, RequestResponse.",
invocation_type)
return LambdaErrorResponses.not_implemented_locally(
"invocation-type: {} is not supported. RequestResponse is only supported.".format(invocation_type))
("invocation-type: {} is not supported. " +
"Supported types: Event, RequestResponse.").format(invocation_type))

def _construct_error_handling(self):
"""
Expand All @@ -115,48 +119,118 @@ def _construct_error_handling(self):
self._app.register_error_handler(404, LambdaErrorResponses.generic_path_not_found)
self._app.register_error_handler(405, LambdaErrorResponses.generic_method_not_allowed)

def _invoke_request_handler(self, function_name):
def _invoke_async_request(self, function_name, request_data, stdout, stderr):
"""
Request Handler for the Local Lambda Invoke path. This method is responsible for understanding the incoming
request and invoking the Local Lambda Function
Asynchronous Request Handler for the Local Lambda Invoke path. Flask response is returned immediately while the
lambda function is run in a separate thread.

Parameters
----------
function_name str
Name of the function to invoke
Name of function to invoke

request_data str
Parameters to pass to lambda function

stdout io.BaseIO
Output stream that stdout should be written to

stderr io.BaseIO
Output stream that stderr should be written to

Returns
-------
A Flask Response response object as if it was returned from Lambda
A flask Response response object as if it was returned from Lambda
"""
flask_request = request
request_id = uuid4()

request_data = flask_request.get_data()
thread = threading.Thread(target=self._invoke_sync_request,
args=(function_name, request_data),
kwargs={'stdout': stdout, 'stderr': stderr})
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These can be in args as well.

thread.daemon = True
thread.start()

if not request_data:
request_data = b'{}'
LOG.debug('Async invocation: %s, requestId=%s', function_name, request_id)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This requestId will not match the requestId in the container. This will be very confusing when reading the output. Can we not add the request id?

Ideally, if we could set the id from SAM CLI in the docker container, it would be much better but last time I checked, their was a way to do this.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I just checked awslambda.js in the docker container and it looks like awsRequestId is set from invokeId. invokeId is set to uuid(). If we could just allow an environment variable to overwrite this value then we could pass it from SAM CLI. I don't know where the code for awslambda.js is coming from otherwise I'd make this change myself:

var fs = require('fs')
var crypto = require('crypto')

var HANDLER = process.argv[2] || process.env.AWS_LAMBDA_FUNCTION_HANDLER || process.env._HANDLER || 'index.handler'
var EVENT_BODY = process.argv[3] || process.env.AWS_LAMBDA_EVENT_BODY ||
  (process.env.DOCKER_LAMBDA_USE_STDIN && fs.readFileSync('/dev/stdin', 'utf8')) || '{}'

var FN_NAME = process.env.AWS_LAMBDA_FUNCTION_NAME || 'test'
var VERSION = process.env.AWS_LAMBDA_FUNCTION_VERSION || '$LATEST'
var MEM_SIZE = process.env.AWS_LAMBDA_FUNCTION_MEMORY_SIZE || '1536'
var TIMEOUT = process.env.AWS_LAMBDA_FUNCTION_TIMEOUT || '300'
var REGION = process.env.AWS_REGION || process.env.AWS_DEFAULT_REGION || 'us-east-1'
var ACCOUNT_ID = process.env.AWS_ACCOUNT_ID || randomAccountId()
var ACCESS_KEY_ID = process.env.AWS_ACCESS_KEY_ID || 'SOME_ACCESS_KEY_ID'
var SECRET_ACCESS_KEY = process.env.AWS_SECRET_ACCESS_KEY || 'SOME_SECRET_ACCESS_KEY'
var SESSION_TOKEN = process.env.AWS_SESSION_TOKEN
var INVOKED_ARN = process.env.AWS_LAMBDA_FUNCTION_INVOKED_ARN || arn(REGION, ACCOUNT_ID, FN_NAME)
var INVOKE_ID = process.env.AWS_LAMBDA_INVOKE_ID || uuid() // TODO: add this line

function consoleLog(str) {
  process.stderr.write(formatConsole(str))
}

function systemLog(str) {
  process.stderr.write(formatSystem(str) + '\n')
}

function systemErr(str) {
  process.stderr.write(formatErr(str) + '\n')
}

function handleResult(resultStr, cb) {
  if (!process.stdout.write('\n' + resultStr + '\n')) {
    process.stdout.once('drain', cb)
  } else {
    process.nextTick(cb)
  }
}

// Don't think this can be done in the Docker image
process.umask(2)

process.env.AWS_LAMBDA_FUNCTION_NAME = FN_NAME
process.env.AWS_LAMBDA_FUNCTION_VERSION = VERSION
process.env.AWS_LAMBDA_FUNCTION_MEMORY_SIZE = MEM_SIZE
process.env.AWS_LAMBDA_LOG_GROUP_NAME = '/aws/lambda/' + FN_NAME
process.env.AWS_LAMBDA_LOG_STREAM_NAME = new Date().toISOString().slice(0, 10).replace(/-/g, '/') +
  '/[' + VERSION + ']' + crypto.randomBytes(16).toString('hex')
process.env.AWS_REGION = REGION
process.env.AWS_DEFAULT_REGION = REGION
process.env._HANDLER = HANDLER

var OPTIONS = {
  initInvokeId: uuid(),
  invokeId: INVOKE_ID, // TODO: update this
  handler: HANDLER,
  suppressInit: true,
  credentials: {
    key: ACCESS_KEY_ID,
    secret: SECRET_ACCESS_KEY,
    session: SESSION_TOKEN,
  },
  eventBody: EVENT_BODY,
  contextObjects: {
    // clientContext: '{}',
    // cognitoIdentityId: undefined,
    // cognitoPoolId: undefined,
  },
  invokedFunctionArn: INVOKED_ARN,
}

return self.service_response(None,
{'x-amzn-requestid': request_id},
202)

request_data = request_data.decode('utf-8')
def _invoke_sync_request(self, function_name, request_data, stdout, stderr):
"""
Synchronous Request Handler for the Local Lambda Invoke path.

stdout_stream = io.BytesIO()
stdout_stream_writer = StreamWriter(stdout_stream, self.is_debugging)

try:
self.lambda_runner.invoke(function_name, request_data, stdout=stdout_stream_writer, stderr=self.stderr)
Parameters
----------
function_name str
Name of function to invoke

request_data str
Parameters to pass to lambda function

stdout io.BaseIO
Output stream that stdout should be written to

stderr io.BaseIO
Output stream that stderr should be written to

Returns
-------
A flask Response response object as if it was returned from Lambda
"""

try:
self.lambda_runner.invoke(function_name, request_data, stdout=stdout, stderr=stderr)
except FunctionNotFound:
LOG.debug('%s was not found to invoke.', function_name)
return LambdaErrorResponses.resource_not_found(function_name)

lambda_response, lambda_logs, is_lambda_user_error_response = \
LambdaOutputParser.get_lambda_output(stdout_stream)
LambdaOutputParser.get_lambda_output(stdout)

if self.stderr and lambda_logs:
if stderr and lambda_logs:
# Write the logs to stderr if available.
self.stderr.write(lambda_logs)
stderr.write(lambda_logs)

if is_lambda_user_error_response:
LOG.debug('Lambda error response: %s', lambda_response)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really needed if we are sending this data back in the next line? My opinion is to remove this.

return self.service_response(lambda_response,
{'Content-Type': 'application/json', 'x-amz-function-error': 'Unhandled'},
200)

LOG.debug('Lambda returned success: %s', lambda_response)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the above comment on LOG.debug

return self.service_response(lambda_response, {'Content-Type': 'application/json'}, 200)

def _invoke_request_handler(self, function_name):
"""
Determines what type of reuest handler should be used to invoke the Local Lambda.

Parameters
----------
function_name str
Name of the function to invoke

Returns
-------
A Flask Response response object as if it was returned from Lambda
"""
flask_request = request

request_data = flask_request.get_data()
invocation_type = flask_request.headers.get('X-Amz-Invocation-Type', 'RequestResponse')

if not request_data:
request_data = b'{}'

request_data = request_data.decode('utf-8')

stdout_stream = io.BytesIO()

if invocation_type == 'Event':
return self._invoke_async_request(function_name, request_data, stdout_stream, self.stderr)

return self._invoke_sync_request(function_name, request_data, stdout_stream, self.stderr)
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def test_invoke_with_log_type_not_None(self):

def test_invoke_with_invocation_type_not_RequestResponse(self):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add explicit integ test for Invoke with --invocation-type Event.

expected_error_message = "An error occurred (NotImplemented) when calling the Invoke operation: " \
"invocation-type: DryRun is not supported. RequestResponse is only supported."
"invocation-type: DryRun is not supported. Supported types: Event, RequestResponse."

with self.assertRaises(ClientError) as error:
self.lambda_client.invoke(FunctionName="EchoEventFunction", InvocationType="DryRun")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,25 @@ def test_invoke_request_handler(self, request_mock, lambda_output_parser_mock, s
lambda_runner_mock.invoke.assert_called_once_with('HelloWorld', '{}', stdout=ANY, stderr=None)
service_response_mock.assert_called_once_with('hello world', {'Content-Type': 'application/json'}, 200)

@patch('samcli.local.lambda_service.local_lambda_invoke_service.LocalLambdaInvokeService.service_response')
@patch('samcli.local.lambda_service.local_lambda_invoke_service.LambdaOutputParser')
@patch('samcli.local.lambda_service.local_lambda_invoke_service.request')
def test_invoke_request_handler_async(self, request_mock, lambda_output_parser_mock, service_response_mock):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be mocking out Thread here too and asserting the mock is called correctly.

lambda_output_parser_mock.get_lambda_output.return_value = 'hello world', None, False
service_response_mock.return_value = 'event'
request_mock.headers = {'X-Amz-Invocation-Type': 'Event'}
request_mock.get_data.return_value = b'{}'

lambda_runner_mock = Mock()
service = LocalLambdaInvokeService(lambda_runner=lambda_runner_mock, port=3000, host='localhost')

response = service._invoke_request_handler(function_name='HelloWorld')

self.assertEquals(response, 'event')

lambda_runner_mock.invoke.assert_called_once_with('HelloWorld', '{}', stdout=ANY, stderr=None)
service_response_mock.assert_called_with(None, {'x-amzn-requestid': ANY}, 202)

@patch('samcli.local.lambda_service.local_lambda_invoke_service.LambdaErrorResponses')
@patch('samcli.local.lambda_service.local_lambda_invoke_service.request')
def test_invoke_request_handler_on_incorrect_path(self, request_mock, lambda_error_responses_mock):
Expand Down Expand Up @@ -224,7 +243,7 @@ def test_request_log_type_not_None(self, flask_request, lambda_error_responses_m

@patch('samcli.local.lambda_service.local_lambda_invoke_service.LambdaErrorResponses')
@patch('samcli.local.lambda_service.local_lambda_invoke_service.request')
def test_request_invocation_type_not_ResponseRequest(self, flask_request, lambda_error_responses_mock):
def test_request_invocation_type_not_supported(self, flask_request, lambda_error_responses_mock):
flask_request.get_data.return_value = None
flask_request.headers = {'X-Amz-Invocation-Type': 'DryRun'}
flask_request.content_type = 'application/json'
Expand All @@ -237,7 +256,7 @@ def test_request_invocation_type_not_ResponseRequest(self, flask_request, lambda
self.assertEquals(response, "NotImplementedLocally")

lambda_error_responses_mock.not_implemented_locally.assert_called_once_with(
"invocation-type: DryRun is not supported. RequestResponse is only supported.")
"invocation-type: DryRun is not supported. Supported types: Event, RequestResponse.")

@patch('samcli.local.lambda_service.local_lambda_invoke_service.request')
def test_request_with_no_data(self, flask_request):
Expand Down