CD Cloud Logix

Quick Guide on How to Implement a Lambda Function – Enforcing Block Public Access on S3 Buckets Automatically

Olivier Butterbach

CEO of CD Cloud Logix


Here is a quick guide on how to implement a lambda function that scans all of your S3 Buckets and automatically enforces the Public Access Block configuration wherever it is missing. Once completed, an email is triggered indicating which S3 Buckets were modified. This project uses Terraform 0.12 for deployment.

This documentation is based on the following GitHub repository; if you have any questions, don’t hesitate to raise an issue.

GitHub: Terraform S3 Public Notifications

I. Terraform Providers

Start by configuring your main AWS account as a Terraform provider as follows:

provider "aws" {
  region  = "eu-west-1"
  version = "~> 2.6"
}

Once completed, add your Terraform backend:

terraform {
  required_version = "~> 0.12.0"

  backend "s3" {
    acl     = "bucket-owner-full-control"
    bucket  = "tf-state-cdcloudlogix"
    encrypt = true
    key     = "lambda_s3_public.tfstate"
    region  = "eu-west-1"
  }
}

Don’t forget to modify the S3 bucket name as well as the key name to match your environment.

II. Modules

This project uses a single module:

  • s3-scan-public

This module installs the following resources:

  • IAM roles and policies
  • CloudWatch Events
  • Lambda
  • SNS Notification
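A minimal invocation of this module might look like the following sketch (the variable names and module path are assumptions based on the repository layout; match them to the module's actual inputs):

```hcl
module "s3-public-lambda" {
  source = "./modules/s3-scan-public"

  # Illustrative inputs - check the module's variables.tf for the real names.
  stack_name = "s3-public-sns-stack"
  protocol   = "email"
}
```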

III. Lambda function

This project uses a bash script for installing dependencies, testing, and zipping our lambda function:

#!/usr/bin/env bash
# Create python packages for lambda
#title          :python_packages.sh
#description    :This script will install pip packages, run pylint and pytest and finally zip python scripts
#author         :Oli
#date           :25/04/2020
#version        :0.1
#bash_version   :3.2.57(1)-release
#===================================================================================

set -o errexit
set -o pipefail
set -o nounset

function install_packages() {
  pip install -r requirements_test.txt
}

function zip_python() {
  pushd s3-scan-public/
  zip -r ../s3-public-payload.zip s3_public.py
  popd
  mv s3-public-payload.zip ../
}

function test_python() {
  pushd s3-scan-public/
  pylint s3_public.py
  popd
}

install_packages
test_python
zip_python

Before applying the Terraform plan, the zip file must be present on the local file system.

Prepare a Python 3 virtual environment before running the script in your terminal:

$ virtualenv -p python3 venv
Running virtualenv with interpreter /usr/local/bin/python3
Using base prefix '/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7'
New python executable in /Users/oli/Documents/venv/bin/python3.7
Also creating executable in /Users/oli/Documents/venv/bin/python
Installing setuptools, pip, wheel...
done.
$ source venv/bin/activate

IV. SSM Parameters

To avoid exposing email addresses in a GitHub repository, this project makes use of the following SSM parameters:

  • s3-public-exception-list: List of S3 Buckets excluded from the scan (StringList type)
  • default-source-email: Default name used as a source (String type)
  • default-destination-emails: List of email recipients (StringList type)

Configure each of these SSM parameters in the AWS Systems Manager Parameter Store before deploying.
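StringList parameters come back from SSM as a single comma-separated string, which the lambda splits into a Python list. Here is a quick local sketch of that parsing step (the helper name is illustrative, not part of the repository):

```python
def parse_string_list(value):
    """Split an SSM StringList value into a list of trimmed, non-empty names."""
    return [item.strip() for item in value.split(',') if item.strip()]

# Example: an exception list of two buckets stored as one StringList value
print(parse_string_list("my-public-site,assets.example.com"))
# -> ['my-public-site', 'assets.example.com']
```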

V. Deploy

Package your lambda script first (Truncated output):

21:12 $ ./python_packages.sh
Requirement already satisfied: pytest in /Users/oli/Documents/terraform_s3_public_notifications/venv/lib/python3.7/site-packages 
[...]
~/Documents/terraform_s3_public_notifications/modules/s3-scan-public ~/Documents/terraform_s3_public_notifications/modules
-------------------------------------------------------------------
Your code has been rated at 10.00/10 (previous run: 9.22/10, +0.78)
~/Documents/terraform_s3_public_notifications/modules
~/Documents/terraform_s3_public_notifications/modules/s3-scan-public ~/Documents/terraform_s3_public_notifications/modules
  adding: s3_public.py (deflated 70%)
~/Documents/terraform_s3_public_notifications/modules

Then execute terraform init command (Truncated output):

21:13 $ terraform init
Initializing modules...
Initializing the backend...
Initializing provider plugins...
The following providers do not have any version constraints in configuration,
so the latest version was installed.
[...]

Finally, apply Terraform (Truncated output):

(venv) ✔ ~/Documents/terraform_s3_public_notifications [master|✔]
21:51 $ terraform apply
module.s3-public-lambda.data.aws_ssm_parameter.s3_public_emails: Refreshing state...
module.s3-public-lambda.data.aws_caller_identity.current: Refreshing state...
module.s3-public-lambda.data.aws_ssm_parameter.display_name: Refreshing state...
module.s3-public-lambda.data.template_file.cloudformation_sns_stack: Refreshing state...
[...]
Plan: 15 to add, 0 to change, 0 to destroy.
Do you want to perform these actions?
  Terraform will perform the actions described above.
  Only 'yes' will be accepted to approve.
Enter a value: yes
module.s3-public-lambda.aws_iam_policy.s3_public_log_policy: Creating...
module.s3-public-lambda.aws_cloudwatch_event_rule.schedule: Creating...
module.s3-public-lambda.aws_iam_policy.access_ssm_policy: Creating...
module.s3-public-lambda.aws_iam_policy.access_s3_policy: Creating...
module.s3-public-lambda.aws_iam_role.lambda_s3_public_role: Creating...
module.s3-public-lambda.aws_cloudformation_stack.sns_topic: Creating...
[...]

If you’re happy with this configuration, confirm by typing yes.

After deploying, each recipient will receive the following email from AWS; don’t forget to confirm the subscription:

Confirmation email of subscription

VI. Details of this repository

The following lambda function will:

  • List all your existing buckets
  • Compare this list with the exception list of S3 Buckets stored in your SSM parameter
  • Check whether the Public Access Block configuration is applied to each of your buckets
  • Apply the Public Access Block configuration to any bucket where it is missing
  • Notify the result by SNS
"""
# Function:
# Purpose:  A Python function to list out any AWS S3 buckets in the account that have
# public access based on their ACLs, either Read or Write permissions.
"""

import os
import logging
from botocore.exceptions import ClientError
import boto3

SNS_TOPIC_ARN = os.getenv('SNS_TOPIC_ARN')
S3_EXCEPTION = os.getenv('S3_EXCEPTION')
AWS_ACCOUNT = os.getenv('AWS_ACCOUNT')
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)

def lambda_handler(event, _):
    """
    Lambda handler function
    """
    LOGGER.info('Event: %s', event)
    private_buckets = []
    exception_buckets = []
    exception_buckets = ssm_s3_list(S3_EXCEPTION)
    if exception_buckets is None:
        LOGGER.error('SSM Parameter missing')
        sns_notify_public_bucket(private_buckets)
        return None
    client = boto3.client('s3')
    list_bucket_response = list_buckets(client)
    for bucket_info in list_bucket_response:
        if bucket_info['Name'] not in exception_buckets:
            response = retrieve_block_access(client, bucket_info['Name'])
            if block_configuration(client, bucket_info['Name'], response):
                private_buckets.append(bucket_info['Name'])
    LOGGER.info('Private Buckets: %s', private_buckets)
    if private_buckets:
        LOGGER.info('Sending sns message')
        sns_notify_public_bucket(private_buckets)
    return None

def list_buckets(client):
    """
    Return list of buckets
    """
    return client.list_buckets()['Buckets']

def ssm_s3_list(ssm_name):
    """
    Return list of buckets from SSM Parameters
    """
    ssmclient = boto3.client('ssm', region_name='eu-west-1')
    s3_exception_list = []
    if ssm_name:
        try:
            s3_exception_list = ssmclient.get_parameter(
                Name=ssm_name
                )['Parameter']['Value'].split(',')
            LOGGER.info('Buckets in list of exception: %s', s3_exception_list)
        except ClientError as client_error:
            LOGGER.error('No SSM parameter found: %s', client_error)
            return None
    else:
        return None
    return s3_exception_list

def retrieve_block_access(client, bucket_name):
    """
    Return public access block if exists
    """
    try:
        return client.get_public_access_block(Bucket=bucket_name)
    except ClientError as client_error:
        LOGGER.error('Get Access Block Exception: %s', client_error)
        return None

def block_configuration(client, bucket_name, response):
    """
    Return True if PublicAccessBlockConfiguration is missing
    """
    if response is not None:
        for _, value in response['PublicAccessBlockConfiguration'].items():
            if str(value) == "False":
                apply_block_access(client, bucket_name)
                return True
    else:
        apply_block_access(client, bucket_name)
        return True
    return False

def apply_block_access(client, bucket_name):
    """
    Apply public access block
    """
    try:
        LOGGER.info('Put Access Block on S3 Bucket: %s', bucket_name)
        return client.put_public_access_block(
            Bucket=bucket_name,
            PublicAccessBlockConfiguration={
                'BlockPublicAcls': True,
                'IgnorePublicAcls': True,
                'BlockPublicPolicy': True,
                'RestrictPublicBuckets': True
                })
    except ClientError as client_error:
        LOGGER.error('Put Access Block Exception: %s', client_error)
        return None

def sns_notify_public_bucket(private_buckets):
    """
    Notify the list of buckets where Public Access Block has been turned ON
    """
    sns_client = boto3.client('sns', region_name='eu-west-1')
    subject = 'AWS Account - {} S3 Bucket Public Status'.format(AWS_ACCOUNT)
    message_body = ''
    if private_buckets:
        message_body = '\n Public Access Block configuration applied to: {}'.format(private_buckets)
        message_body += '\n Configuration applied to {} buckets'.format(len(private_buckets))
        message_body += '\n Add your S3 Bucket to exception list if it is supposed to be public'
    else:
        message_body = 'Missing SSM Parameter, please configure it'
    sns_client.publish(TopicArn=SNS_TOPIC_ARN, Message=message_body, Subject=subject)
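block_configuration above flags a bucket whenever any of the four Public Access Block settings is False, or when no configuration exists at all. That decision can be exercised locally with a plain dict and no AWS calls (needs_block is an illustrative stand-in, not part of the repository):

```python
def needs_block(config):
    """Return True when the Public Access Block configuration is missing or
    has any of its four settings disabled - mirroring block_configuration."""
    if config is None:
        return True
    return any(str(value) == "False" for value in config.values())

partial = {'BlockPublicAcls': True, 'IgnorePublicAcls': False,
           'BlockPublicPolicy': True, 'RestrictPublicBuckets': True}
print(needs_block(partial))   # one setting is off, so the bucket gets fixed -> True
print(needs_block(None))      # no configuration at all -> True
```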

This lambda runs every week, from Monday to Friday:

# -----------------------------------------------------------
# set up AWS Cloudwatch Event every Monday to Friday at 9am
# -----------------------------------------------------------

resource "aws_cloudwatch_event_rule" "schedule" {
  name                = "event-invoke-s3-public-lambda"
  schedule_expression = "cron(0 9 ? * MON-FRI *)"
}
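The expression cron(0 9 ? * MON-FRI *) fires at 09:00 UTC on weekdays. A small sketch of the equivalent check in Python, purely for illustration (EventBridge evaluates the cron expression itself; nothing like this runs in the project):

```python
from datetime import datetime

def matches_schedule(dt):
    """True when dt falls on the cron(0 9 ? * MON-FRI *) schedule:
    09:00, Monday (weekday 0) through Friday (weekday 4)."""
    return dt.weekday() < 5 and dt.hour == 9 and dt.minute == 0

print(matches_schedule(datetime(2020, 4, 27, 9, 0)))   # Monday 09:00 -> True
print(matches_schedule(datetime(2020, 4, 26, 9, 0)))   # Sunday 09:00 -> False
```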

Terraform doesn’t natively support email subscriptions for SNS topics, which is why we use a CloudFormation template for this purpose:

# -----------------------------------------------------------
# AWS SNS topic (https://www.terraform.io/docs/providers/aws/r/sns_topic_subscription.html#email)
# -----------------------------------------------------------

resource "aws_cloudformation_stack" "sns_topic" {
  name          = var.stack_name
  template_body = data.template_file.cloudformation_sns_stack.rendered
}

# -----------------------------------------------------------
# Use Cloudformation template for EMAIL SNS Topic
# -----------------------------------------------------------

data "template_file" "cloudformation_sns_stack" {
  template = file("${path.module}/email-sns-stack.json.tpl")

  vars = {
    display_name  = data.aws_ssm_parameter.display_name.value
    subscriptions = "${join("," , formatlist("{ \"Endpoint\": \"%s\", \"Protocol\": \"%s\" }", split(",", data.aws_ssm_parameter.s3_public_emails.value), var.protocol))}"
  }
}
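The join/formatlist expression above turns the comma-separated SSM email list into a JSON fragment of subscription objects. The same transformation in Python, to make the template output concrete (the function name and email addresses are placeholders):

```python
def build_subscriptions(emails_csv, protocol="email"):
    """Mimic Terraform's formatlist + join: one JSON object per recipient."""
    return ",".join(
        '{{ "Endpoint": "{0}", "Protocol": "{1}" }}'.format(email, protocol)
        for email in emails_csv.split(',')
    )

print(build_subscriptions("alice@example.com,bob@example.com"))
# -> { "Endpoint": "alice@example.com", "Protocol": "email" },{ "Endpoint": "bob@example.com", "Protocol": "email" }
```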

Combine with the following template file:

{
  "AWSTemplateFormatVersion": "2010-09-09",
  "Resources" : {
    "EmailSNSTopic": {
      "Type" : "AWS::SNS::Topic",
      "Properties" : {
        "DisplayName" : "${display_name}",
        "Subscription": [
          ${subscriptions}
        ]
      }
    }
  },
  "Outputs" : {
    "ARN" : {
      "Description" : "Email SNS Topic ARN",
      "Value" : { "Ref" : "EmailSNSTopic" }
    }
  }
}

VII. Testing

You can test your lambda from the console by using the Test button.

This function will start running and apply the Public Access Block configuration to each of your existing buckets. Once completed, you should receive a confirmation email.
