How to get a notification when a new file is uploaded to an S3 bucket

To get a Slack notification when a new file is uploaded to an S3 bucket, we have to configure the bucket notifications feature. We can do this using Terraform. In the following example, I assume that you have already defined the bucket and the SNS topic:

resource "aws_s3_bucket_notfication" "bucket_notification" {
    bucket = aws_s3_bucket.some-bucket.id

    topic {
        topic_arn = aws_sns_queue.queue-name.arn
        events = ["s3:ObjectCreated:*"]
    }
}

After that, we have to define a Lambda function that reacts to the messages sent to the SNS topic. I will use Serverless, so the yml configuration of my lambda function looks like this (it is only the part about the function; the whole file is longer):

plugins:
  - serverless-python-requirements

custom:
  pythonRequirements:
    dockerizePip: non-linux
    layer: true

functions:
  function_name:
    handler: file_name.function_name
    memorySize: 128
    layers:
      - {Ref: PythonRequirementsLambdaLayer}
    events:
      - sns: sns_arn

In the Python function, we have to extract the data from the SNS message and parse the messages:

import requests
import json
import boto3


def _parse_message(raw_message):
    message_records = json.loads(raw_message)
    for message in message_records['Records']:
        bucket_name = message['s3']['bucket']['name']
        file_name = message['s3']['object']['key']
        file_size = message['s3']['object']['size']
        file_size_in_mb = '{:.5f}'.format(file_size / (1024**2))
        yield f"File {file_name} uploaded to bucket {bucket_name} (size: {file_size_in_mb} MB)"

def _extract_file_info(record):
    sns_event = record['Sns']
    raw_message = sns_event['Message']
    message = list(_parse_message(raw_message))
    return message

We use those functions in the handler function to build the message:

def function_name(event, context):
    updated_file = [_extract_file_info(record) for record in event['Records']]
    message = '\n'.join([item for sublist in updated_file for item in sublist])

    ...

In the end, we have to send the message to the Slack webhook URL. I assume that you have the URL defined as a constant. In a real application, it is best to pass it as an environment variable or read from the Secrets Manager:

    requests.post(SLACK_WEBHOOK_URL, json={'text': message, 'link_names': 1})
Older post

Get an XCom value in the Airflow on_failure_callback function

How to get the task instance in the on_failure_callback to get access to XCom

Newer post

How to enable S3 bucket versioning using Terraform

How to configure S3 bucket versioning in Terraform

Are you looking for an experienced AI consultant? Do you need assistance with your RAG or Agentic Workflow?
Schedule a call, send me a message on LinkedIn. Schedule a call or send me a message on LinkedIn

>