To get a Slack notification when a new file is uploaded to an S3 bucket, we have to configure the bucket notifications feature. We can do this using Terraform. In the following example, I assume that you have already defined the bucket and the SNS topic:
resource "aws_s3_bucket_notification" "bucket_notification" {
  bucket = aws_s3_bucket.some-bucket.id

  topic {
    topic_arn = aws_sns_topic.topic-name.arn
    events    = ["s3:ObjectCreated:*"]
  }
}
After that, we have to define a Lambda function that reacts to the messages sent to the SNS topic. I will use the Serverless Framework, so the YAML configuration of my Lambda function looks like this (this is only the part that concerns the function; the whole file is longer):
plugins:
  - serverless-python-requirements
custom:
  pythonRequirements:
    dockerizePip: non-linux
    layer: true
functions:
  function_name:
    handler: file_name.function_name
    memorySize: 128
    layers:
      - {Ref: PythonRequirementsLambdaLayer}
    events:
      - sns: sns_arn
In the Python function, we have to extract the S3 notification from every SNS message and parse it:
import requests
import json
import boto3


def _parse_message(raw_message):
    # The SNS message body is the S3 notification serialized as a JSON string.
    message_records = json.loads(raw_message)
    for message in message_records['Records']:
        bucket_name = message['s3']['bucket']['name']
        file_name = message['s3']['object']['key']
        file_size = message['s3']['object']['size']
        # S3 reports the size in bytes; convert it to megabytes.
        file_size_in_mb = '{:.5f}'.format(file_size / (1024**2))
        yield f"File {file_name} uploaded to bucket {bucket_name} (size: {file_size_in_mb} MB)"


def _extract_file_info(record):
    # Every record of the Lambda event wraps one SNS message.
    sns_event = record['Sns']
    raw_message = sns_event['Message']
    message = list(_parse_message(raw_message))
    return message
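To see what the parsing step works with, here is a minimal sketch that runs a similar parser against a hand-written event. The function name `parse_s3_records` and the sample event are hypothetical and heavily trimmed; real events carry many more fields. The sketch also covers two details the functions above skip: S3 URL-encodes object keys, and it publishes an `s3:TestEvent` message without a `Records` key when the notification is first configured.

```python
import json
from urllib.parse import unquote_plus


def parse_s3_records(raw_message):
    """Yield (bucket, key, size_in_bytes) for each record in an S3 notification."""
    body = json.loads(raw_message)
    # The "s3:TestEvent" message has no 'Records' key, so treat it as empty.
    for record in body.get('Records', []):
        s3 = record['s3']
        # Object keys arrive URL-encoded (for example, a space becomes '+').
        key = unquote_plus(s3['object']['key'])
        yield s3['bucket']['name'], key, s3['object'].get('size', 0)


# Hypothetical, trimmed S3 notification wrapped in an SNS event.
s3_notification = {
    'Records': [
        {'s3': {'bucket': {'name': 'example-bucket'},
                'object': {'key': 'reports/2020+q1.csv', 'size': 1048576}}}
    ]
}
sample_event = {'Records': [{'Sns': {'Message': json.dumps(s3_notification)}}]}

parsed = [
    item
    for record in sample_event['Records']
    for item in parse_s3_records(record['Sns']['Message'])
]
print(parsed)  # [('example-bucket', 'reports/2020 q1.csv', 1048576)]
```

Returning plain tuples instead of formatted strings is just a choice made for this sketch; it keeps the parsing easy to check separately from the message formatting.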
We use those functions in the handler function to build the message:
def function_name(event, context):
    updated_file = [_extract_file_info(record) for record in event['Records']]
    message = '\n'.join([item for sublist in updated_file for item in sublist])
    ...
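The handler gets one list of lines per SNS record, so it has to flatten a list of lists before joining. As a small illustration, the same flattening can be written with `itertools.chain.from_iterable`; the input below is made-up data in the shape that `_extract_file_info` returns.

```python
from itertools import chain

# Hypothetical per-record results: one inner list for each SNS record.
per_record_lines = [
    ['File a.csv uploaded to bucket example-bucket (size: 1.00000 MB)'],
    ['File b.csv uploaded to bucket example-bucket (size: 0.50000 MB)',
     'File c.csv uploaded to bucket example-bucket (size: 2.00000 MB)'],
]

# Flatten the nested lists and put every file on its own line.
message = '\n'.join(chain.from_iterable(per_record_lines))
print(message)
```

Both versions produce the same text; which one reads better is a matter of taste.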
In the end, we have to send the message to the Slack webhook URL. I assume that you have the URL defined as a constant. In a real application, it is best to pass it as an environment variable or read it from AWS Secrets Manager:
requests.post(SLACK_WEBHOOK_URL, json={'text': message, 'link_names': 1})
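If you prefer the environment variable approach, a minimal sketch could look like the one below. It assumes a variable named `SLACK_WEBHOOK_URL` (the name and both helper functions are my assumptions), and it uses the standard library's `urllib` instead of `requests` only to keep the example free of dependencies. Building the request is separated from sending it, so the payload can be checked without calling Slack.

```python
import json
import os
import urllib.request


def build_slack_request(message, webhook_url):
    """Build (but do not send) the POST request for a Slack incoming webhook."""
    payload = json.dumps({'text': message, 'link_names': 1}).encode('utf-8')
    return urllib.request.Request(
        webhook_url,
        data=payload,
        headers={'Content-Type': 'application/json'},
        method='POST',
    )


def send_to_slack(message):
    # SLACK_WEBHOOK_URL is an assumed environment variable name.
    webhook_url = os.environ['SLACK_WEBHOOK_URL']
    request = build_slack_request(message, webhook_url)
    # The timeout keeps a slow response from blocking the Lambda function
    # until its own time limit is reached.
    with urllib.request.urlopen(request, timeout=5) as response:
        return response.status
```

In the handler, `send_to_slack(message)` would then take the place of the `requests.post` call.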