Lambda Extensions with Python

Oct 20, 2020 · Andrew Wyllie
Image credit: Andrew Wyllie

Keeping up with AWS these days is like drinking from a firehose. While a lot of their newer products are designed to make existing products more accessible, every once in a while you get something that helps solve a nagging issue or opens up a whole new world of possibilities on one of their core services.

AWS Lambda extensions, announced on Oct 8, 2020 and currently in preview, are one of these types of products. Think of an extension as a wrapper or “sidecar” that runs in the same execution environment as the lambda’s code. According to the press release, some of the uses for these extensions include:

  • capturing diagnostic information before, during and after the lambda function executes
  • instrumenting code without changing the structure of the code
  • fetching secrets before the function is invoked
  • detecting and alerting on function activity through security agents

That’s a lot to think about, but initializing code and fetching secrets before the function runs seems like it could be a big deal in reducing function runtimes. For example, I often need to load parameters and secrets from SSM Parameter Store. I usually try to do this once and then cache everything so it’s available the next time the function runs. As you can see in the diagram below, the extension runs in the same container as the function. This means that the extension has access to everything in the lambda’s environment, including all of the environment variables, the actual lambda code, code in layers and the 512 MB in /tmp.

A quick overview of how the extensions API fits into the lambda environment

The goal will be to write an extension that grabs a few parameters from SSM and caches them to see how fast it runs and then compare the results with a similar lambda that does not use the extension. The team at AWS (as always) has supplied us with some great sample code (https://github.com/aws-samples/aws-lambda-extensions) to get everything going. I highly recommend working through a few of their simple samples to get a feel for what is going on.

Set Up

The first challenge is that the extension needs to be installed in a layer, in a directory called extensions/ (Lambda unpacks layers under /opt, so the extension ends up in /opt/extensions). Setting up a lambda layer is actually pretty straightforward. If you really want to get fancy, I wrote a post about using GitHub Actions to create a CI/CD pipeline for installing lambda layers (available here: https://www.dilex.net/data-blog/aws-serverless-cicd-with-github-actions), although if you just want to get something running, follow the examples in the aws-samples example code referenced above.
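As a rough sketch of that layout, a layer archive with the extension under extensions/ and shared modules under python/ could be assembled like this. The build_layer_zip helper and the file names (ssm_params, helper.py) are made up for illustration and are not part of any AWS tooling; the point is only the directory structure and the executable bit on the extension file.

```python
import io
import zipfile


def build_layer_zip(extension_name, extension_source, modules):
    """Return the bytes of a layer zip (hypothetical helper for illustration).

    extension_name   -- file name of the extension entry point
    extension_source -- its source code as a string
    modules          -- dict of {path relative to python/: source code}
    """
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, 'w') as zf:
        # Entry point: regular file (0o100000) with mode 755, so it is still
        # executable once the layer is unpacked under /opt/extensions
        entry = zipfile.ZipInfo(f'extensions/{extension_name}')
        entry.create_system = 3  # mark the entry as created on Unix
        entry.external_attr = 0o100755 << 16
        zf.writestr(entry, extension_source)

        # Shared Python modules end up under /opt/python
        for relative_path, source in modules.items():
            info = zipfile.ZipInfo(f'python/{relative_path}')
            info.create_system = 3
            info.external_attr = 0o100644 << 16
            zf.writestr(info, source)
    return buffer.getvalue()


layer = build_layer_zip(
    'ssm_params',
    '#!/usr/bin/env python3\nprint("extension starting")\n',
    {'helper.py': 'VALUE = 1\n'},
)
print(zipfile.ZipFile(io.BytesIO(layer)).namelist())
```

Setting the mode inside the archive plays the same role as running chmod 755 on the file before zipping it.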

My approach was to write an extensions class (basically borrowed from the example code from the aws-samples repo) which looks like this:

import json
import os
import signal
import sys
import requests

'''
This is the base class for lambda extensions.  The actual extensions can be found in the top
level /extensions directory.  Extensions can be used to initialize objects and data before
the lambda starts running as well as handling logging or other types of error reporting without
affecting the code in the lambda itself
'''


class Extension:
    def __init__(self):
        self.name = os.path.basename(sys.argv[0])
        self.extension_id = None
        self.events = ['INVOKE', 'SHUTDOWN']
        self.lambda_api = os.environ['AWS_LAMBDA_RUNTIME_API']

        # catch and handle signals
        signal.signal(signal.SIGINT, self.handle_signal)
        signal.signal(signal.SIGTERM, self.handle_signal)

    def register_extension(self):
        'the extension needs to be registered or lambda will not run it'
        print(f"[{self.name}] Registering extension", flush=True)

        url = f"http://{self.lambda_api}/2020-01-01/extension/register"
        headers = {
            'Lambda-Extension-Name': self.name
        }
        payload = {
            'events': self.events
        }
        response = requests.post(
            url=url,
            headers=headers,
            json=payload
        )
        self.extension_id = response.headers['Lambda-Extension-Identifier']
        print(f"[{self.name}] Registered with ID: {self.extension_id}", flush=True)

    def process_events(self):
        'this listens for events from the lambda service'
        url = f"http://{self.lambda_api}/2020-01-01/extension/event/next"
        headers = {
            'Lambda-Extension-Identifier': self.extension_id
        }

        # set up a loop and wait for incoming requests
        while True:
            print(f"[{self.name}] Waiting for event...", flush=True)
            response = requests.get(
                url=url,
                headers=headers,
                timeout=None
            )
            event = json.loads(response.text)
            if event['eventType'] == 'SHUTDOWN':
                self.exit_processing()
            else:
                self.event_processing(event)

    def event_processing(self, event):
        'runs on every lambda invoke event - override function to do something more interesting'
        print(f'[{self.name}] Received event: {json.dumps(event)}', flush=True)

    def exit_processing(self):
        'do something when the function exits - override this to do something more interesting'
        print(f'[{self.name}] Received SHUTDOWN event. Exiting!', flush=True)
        sys.exit(0)

    def handle_signal(self, sig, frame):
        'if needed, pass this signal down to child process'
        print(f"[{self.name}] Received signal={sig}. Exiting", flush=True)
        sys.exit(0)

Admittedly, this is overkill for this simple test but the idea was to create something I could use to build a few different extensions.

Let’s take a look at a couple of sections of the above code. The __init__ method grabs the name of the file the extension is being run from to establish self.name (the Extensions API expects the registered name to match the extension’s file name) and, by default, this class will run the extension on INVOKE and SHUTDOWN events. The register_extension method connects to the Lambda Extensions API to obtain an extension_id. The process_events method sets up a loop that waits for events from the lambda service.

The event_processing and exit_processing functions are basically placeholders that can be overridden from the extension code. This class was installed in a layer under /opt/python.
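To make the control flow easier to see in isolation, here is a minimal sketch of the same dispatch loop with the HTTP call replaced by a plain callable. The run_event_loop function and its parameter names are hypothetical; in the class above, next_event corresponds to the GET against /extension/event/next and the two callbacks correspond to event_processing and exit_processing.

```python
def run_event_loop(next_event, on_invoke, on_shutdown):
    """Dispatch events until a SHUTDOWN event arrives.

    next_event  -- callable that blocks and returns the next event as a dict
    on_invoke   -- called with every non-SHUTDOWN event
    on_shutdown -- called once with the SHUTDOWN event, then the loop ends
    """
    while True:
        event = next_event()
        if event.get('eventType') == 'SHUTDOWN':
            on_shutdown(event)
            return
        on_invoke(event)


# Canned events standing in for the lambda service (request IDs are made up)
events = iter([
    {'eventType': 'INVOKE', 'requestId': 'example-1'},
    {'eventType': 'INVOKE', 'requestId': 'example-2'},
    {'eventType': 'SHUTDOWN'},
])
seen = []
run_event_loop(
    next_event=lambda: next(events),
    on_invoke=lambda event: seen.append(event['requestId']),
    on_shutdown=lambda event: seen.append('shutdown'),
)
print(seen)  # ['example-1', 'example-2', 'shutdown']
```

Passing the handlers in as callables is just one way to get the same effect as overriding the placeholder methods in a subclass.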

Now, let’s take a look at the extension code:

#!/usr/bin/env python3

import os
import sys

# These need to be imported from /opt/python
sys.path.append('/opt/python')
from AwAws.Lambda.extensions import Extension
from AwAws.SharedResources.parameters import Parameters
from AwAws.Utils.env import Env


class SSMParamsExtension(Extension):
    def __init__(self):
        self.service = None
        super().__init__()

    def store_params(self):
        try:
            Parameters(self.service).create_tmp_dict()
            print(f'[{self.name}] SSM params found for {self.service}')
        except Exception as e:
            print(f'[{self.name}] No SSM params found for {self.service} - check SSM', e)

    # override event_processing in the base class
    def event_processing(self, event):
        pass
        # print(f'[MY {self.name}] Received event: {json.dumps(event)}', flush=True)


ext = SSMParamsExtension()
ext.events = ['SHUTDOWN']
ext.register_extension()

# This extension only runs in the init and shutdown stage
ext.service = Env().get_env('AWAWS_SSM_SERVICE')
if ext.service is not None:
    ext.store_params()
else:
    print('AWAWS_SSM_SERVICE is required for SSM caching - ssm parameter caching turned off')

ext.process_events()

NOTE: make sure the extension file is executable (chmod 755 your_extension_name)

This really just loads a couple of things from my AwAws boto3 wrapper code. The Extension class has been installed in AwAws.Lambda.extensions and I’m pulling in a module that grabs parameters from SSM as well as a utility to deal with environment variables.

Ignoring the class for a moment, this section at the bottom of the extension code:

ext = SSMParamsExtension()
ext.events = ['SHUTDOWN']
ext.register_extension()

# This extension only runs in the init and shutdown stage
ext.service = Env().get_env('AWAWS_SSM_SERVICE')
if ext.service is not None:
    ext.store_params()
else:
    print('AWAWS_SSM_SERVICE is required for SSM caching - ssm parameter caching turned off')

ext.process_events()

is going to instantiate the class, set the extension up so that it will only run on the SHUTDOWN event and then register the extension with the lambda service. Since we are only going to load the parameters once, we don’t need to listen for INVOKE events. Next, we look for an environment variable that is set on the lambda which contains the root of our SSM parameters. For example, if the SSM parameters are organized something like /ServiceName/Param1, /ServiceName/Param2, etc. the env variable would be set to ServiceName.

Now we call the store_params method in the class, which creates a botocore client, fetches the parameters we asked for from SSM and then uses pickle to write them to a file in /tmp.
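The Parameters module itself is not shown in this post, so the following is only a sketch of the kind of work described above, not the actual AwAws code. The function names and the cache file path are assumptions; ssm_client is expected to behave like boto3.client('ssm'), whose get_parameters_by_path paginator is used to walk everything under /ServiceName/.

```python
import os
import pickle
import tempfile


def fetch_params(ssm_client, service):
    """Collect every parameter under /<service>/ into a {name: value} dict."""
    prefix = f'/{service}/'
    params = {}
    paginator = ssm_client.get_paginator('get_parameters_by_path')
    for page in paginator.paginate(Path=prefix, Recursive=True, WithDecryption=True):
        for param in page['Parameters']:
            # strip the prefix so /ServiceName/Param1 is stored as Param1
            params[param['Name'][len(prefix):]] = param['Value']
    return params


def cache_params(params, path):
    """Pickle the dict to a temporary file, then rename it into place."""
    directory = os.path.dirname(path) or '.'
    with tempfile.NamedTemporaryFile('wb', dir=directory, delete=False) as tmp:
        pickle.dump(params, tmp)
    # rename after the file is closed so a reader never sees a partial file
    os.replace(tmp.name, path)


def read_cached_params(path):
    """Load the dict written by cache_params."""
    with open(path, 'rb') as handle:
        return pickle.load(handle)
```

In an extension this might be called as cache_params(fetch_params(boto3.client('ssm'), service), '/tmp/ssm_params.pickle'), where the file name is again just a placeholder.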

Now we can push the layer up to AWS Lambda and start working on the lambda code to run the test.

Testing the Extension

With the layer loaded, we create a very simple Hello World lambda using the SAM toolkit.

import json

from AwAws.Lambda.base import Lambda
from AwAws.SharedResources.parameters import Parameters

class LambdaTestClass(Lambda):
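    def handle(self, event, context):
        print('FUNCTION: started')

        ssm = Parameters('Announce')
        try:
            # fast path: read the parameters the extension cached in /tmp
            params = ssm.read_tmp_dict()
        except Exception:
            # no cache file, so fetch the parameters from SSM directly
            params = ssm.get_param_dictionary()
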
        for name in params.keys():
            print('SSM PARAM:', name, 'Value:', params[name])

        print('FUNCTION:', json.dumps(event))
        print('FUNCTION: done')

        return {
            "statusCode": 200,
            "body": json.dumps({
                "message": "hello world",
                # "location": ip.text.replace("\n", "")
            }),
        }

lambda_class = LambdaTestClass()

# this does not get invoked when initializing
# but only when the lambda service invokes it
lambda_handler = lambda_class.get_handler()

A bit of a sidebar here. I use a base class for all of my lambda functions. All this really means is that my function code is wrapped in the get_handler() method which allows me to do stuff like send an event to the function that keeps the function warm - a bit of legacy code to reduce cold start times.

So, ignoring the class, this function is going to read the parameters from the file in /tmp that was created by the extension. If you recall, we set up an environment variable earlier to specify which parameters we want from SSM. If we don’t set that environment variable, the extension will not get any parameters from SSM, in which case we fall back to setting up the SSM client and getting the parameters in the lambda function itself. By testing it both ways we can get an idea of what impact the extension is having on our start up times.
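Stripped of the wrapper classes, the read-or-fall-back pattern looks roughly like the sketch below. The load_params function and its arguments are hypothetical names; fetch_from_ssm stands in for whatever slow call retrieves the parameters when no cache file is available.

```python
import os
import pickle
import tempfile


def load_params(cache_path, fetch_from_ssm):
    """Return (params, source): the cached dict if readable, else a fresh fetch."""
    try:
        with open(cache_path, 'rb') as handle:
            return pickle.load(handle), 'cache'
    except (OSError, EOFError, pickle.UnpicklingError):
        # no usable cache file (extension disabled or failed): take the slow path
        return fetch_from_ssm(), 'ssm'


# A fresh temporary directory has no cache file, so this takes the slow path
cache_path = os.path.join(tempfile.mkdtemp(), 'params.pickle')
params, source = load_params(cache_path, lambda: {'Param1': 'fetched'})
print(source, params)
```

Catching only the errors that indicate a missing or unreadable cache, rather than every exception, keeps unrelated bugs from being silently masked by the fallback.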

With the Extension

REPORT RequestId: 6b9d5fbb-cb31-4bf1-b961-698b6f654182 Duration: 18.95 ms Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 96 MB Init Duration: 675.52 ms

Without the Extension

REPORT RequestId: bdc9dfe3-c962-4afc-896c-37c2389a9679 Duration: 904.77 ms Billed Duration: 1000 ms Memory Size: 128 MB Max Memory Used: 103 MB Init Duration: 669.66 ms

As expected, running the lambda without the extension caching the parameters first resulted in a much slower execution time. In fact, the Duration is about 48 times longer (904.77 ms versus 18.95 ms), while the Init Duration for both functions is about the same. Now, the astute reader may have noticed that there are a few more things we can do with the non-extension lambda to tune it up a bit. For example, the extension is running in both runs; in the second case it just does not fetch the data from SSM and cache it. It’s also worth pointing out that we should be establishing the botocore client connection before the function runs.
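To put a number on that comparison without reading the log lines by eye, the two REPORT lines above can be parsed with a small regular expression. This is only a convenience sketch; parse_report is a made-up helper and it only extracts the millisecond fields.

```python
import re

# Matches the "Duration", "Billed Duration" and "Init Duration" fields
REPORT_FIELD = re.compile(r'(Billed Duration|Init Duration|Duration): ([\d.]+) ms')


def parse_report(line):
    """Extract the millisecond fields from a Lambda REPORT log line."""
    return {name: float(value) for name, value in REPORT_FIELD.findall(line)}


with_extension = parse_report(
    'REPORT RequestId: 6b9d5fbb-cb31-4bf1-b961-698b6f654182 Duration: 18.95 ms '
    'Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 96 MB '
    'Init Duration: 675.52 ms'
)
without_extension = parse_report(
    'REPORT RequestId: bdc9dfe3-c962-4afc-896c-37c2389a9679 Duration: 904.77 ms '
    'Billed Duration: 1000 ms Memory Size: 128 MB Max Memory Used: 103 MB '
    'Init Duration: 669.66 ms'
)

slowdown = without_extension['Duration'] / with_extension['Duration']
print(f'{slowdown:.1f}x slower without the cached parameters')
```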

Let’s overhaul the function by tuning it up a bit and see what happens. Here’s the new function:

import json

from AwAws.Lambda.base import Lambda
from AwAws.SharedResources.parameters import Parameters

# set up the botocore client connection
ssm = Parameters('Announce')
ssm.get_ssm()

class LambdaTestClass(Lambda):
    def handle(self, event, context):
        print('FUNCTION: started')
        
        # try:
        #    params = ssm.read_tmp_dict()
        # except Exception as e:
        params = ssm.get_param_dictionary()
            
        for name in params.keys():
            print('SSM PARAM:', name, 'Value:', params[name])
        
        print('FUNCTION:', json.dumps(event))
        print('FUNCTION: done')

        return {
            "statusCode": 200,
            "body": json.dumps({
                "message": "hello world",
                # "location": ip.text.replace("\n", "")
            }),
        }


lambda_class = LambdaTestClass()

# this does not get invoked when initializing
# but only when the lambda service invokes it
lambda_handler = lambda_class.get_handler()

Which gives us:

REPORT RequestId: c76f03f9-7292-46bc-9d4e-92c044a24a27 Duration: 240.84 ms Billed Duration: 300 ms Memory Size: 128 MB Max Memory Used: 103 MB Init Duration: 725.15 ms

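This is definitely better (240.84 ms versus 904.77 ms) but still not as fast as the version with the extension (18.95 ms). It’s also bumped up the Init Duration a little bit (725.15 ms), which is not that surprising since the client is now created during initialization.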

Wrapping Up

Obviously, to really test this properly we should set up a framework that reloads and runs the function a few hundred times, but even this quick and dirty analysis gives us some insight into how lambda extensions can be implemented. Extensions have a lot of other nice features too. If you register an extension for the INVOKE event, it is notified on every invocation. Note that the notification carries metadata about the invocation (such as the request ID and the function ARN) rather than the function’s event payload. This allows you to do things like send events to CloudWatch or EventBridge without modifying the function code, or even watch the function’s activity to see if someone is trying something malicious. Definitely a cool feature and worth spending some time learning how to use it.
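For the more thorough test mentioned above, the durations collected from many runs would need to be summarized rather than compared one line at a time. A minimal sketch using the standard library follows; summarize is a hypothetical helper and the sample numbers are placeholders, not measurements.

```python
import statistics


def summarize(durations_ms):
    """Return basic statistics for a list of durations in milliseconds."""
    return {
        'runs': len(durations_ms),
        'median_ms': statistics.median(durations_ms),
        'mean_ms': statistics.fmean(durations_ms),
        'max_ms': max(durations_ms),
    }


# Placeholder values only, to show the shape of the output
print(summarize([10.0, 20.0, 60.0]))
```

The median is usually the more useful figure here, since a single slow cold start can pull the mean up considerably.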

Did I miss something? Leave a note in the comments below!