AWS gives us a few ways to refresh Athena table partitions. We can use the user interface, run the MSCK REPAIR TABLE statement in Athena, or use a Glue Crawler. This article will show you how to create a new crawler and use it to refresh an Athena table.
First, we have to install and import boto3, and create a Glue client:
import boto3

# assumes aws_access_key_id and aws_secret_access_key are defined earlier
glue = boto3.client('glue', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
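Hardcoding access keys works for a quick experiment, but in most environments you would rely on boto3's default credential chain instead. A minimal sketch (the region name is an example value):

# picks up credentials from the default chain
# (environment variables, ~/.aws/credentials, an instance profile, etc.)
glue = boto3.client('glue', region_name='us-east-1')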
If the crawler already exists, we can reuse it. To get the existing crawler, we have to use the get_crawler function. Note that, instead of returning None, the function raises an EntityNotFoundException if there is no crawler with the given name. We will not use the instance returned by the get_crawler function; we call it just to check whether we should create the crawler or not.
try:
    glue.get_crawler(Name=crawler_name)
except glue.exceptions.EntityNotFoundException:
    # the crawler does not exist, so we have to create it
    pass
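If you need this check in more than one place, it can be wrapped in a small helper (a sketch; the crawler_exists name is our own):

def crawler_exists(glue_client, name):
    """Return True if a Glue crawler with the given name exists."""
    try:
        glue_client.get_crawler(Name=name)
        return True
    except glue_client.exceptions.EntityNotFoundException:
        return False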
Creating a new crawler
To create a new crawler which refreshes table partitions, we need a few pieces of information:
- the database and the name of the existing Athena table
- the desired behavior in case of schema changes
- the IAM role that allows the crawler to access the files in S3 and modify the Glue Data Catalog
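In the snippets below, we will assume the following placeholder values (hypothetical names; adjust them to your environment):

# placeholder values used throughout this article
crawler_name = 'refresh_partitions_crawler'
database_name = 'schema_name'
glue_crawler_service_role = 'arn:aws:iam::123456789012:role/GlueCrawlerRole'  # role with Glue and S3 permissions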
Let’s start with crawler targets. In this example, we want to refresh tables which are already defined in the Glue Data Catalog, so we are going to use the CatalogTargets property and leave the other targets empty:
table_names = [{'DatabaseName': 'schema_name', 'Tables': ['table_A', 'table_B']}]
crawler_targets = {'S3Targets': [], 'JdbcTargets': [], 'DynamoDBTargets': [],
                   'CatalogTargets': table_names}
In addition to that, we want to detect and add new partitions or columns, but we don’t want to remove anything automatically, so our SchemaChangePolicy should look like this:
schema_change_policy = {'UpdateBehavior': 'UPDATE_IN_DATABASE', 'DeleteBehavior': 'LOG'}
We also have to instruct the crawler to use the table metadata when adding or updating the columns (so it does not change the types of the columns) and combine all partitions’ schemas. It will allow us to remove a column in the future without breaking the schema (we will get nulls when the data is missing).
crawler_config = '''{
    "Version": 1.0,
    "CrawlerOutput": {"Partitions": {"AddOrUpdateBehavior": "InheritFromTable"}},
    "Grouping": {"TableGroupingPolicy": "CombineCompatibleSchemas"}
}'''
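If you prefer not to hand-write JSON inside a string, the same configuration can be built with json.dumps:

import json

# produces the same JSON configuration string as above
crawler_config = json.dumps({
    'Version': 1.0,
    'CrawlerOutput': {'Partitions': {'AddOrUpdateBehavior': 'InheritFromTable'}},
    'Grouping': {'TableGroupingPolicy': 'CombineCompatibleSchemas'},
})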
Finally, we can create a new crawler:
glue.create_crawler(Name=crawler_name,
                    Role=glue_crawler_service_role,
                    DatabaseName=database_name,
                    Description='some description',
                    Targets=crawler_targets,
                    Classifiers=[],
                    TablePrefix='',
                    SchemaChangePolicy=schema_change_policy,
                    Configuration=crawler_config)
Starting a crawler
Starting a crawler is trivial. All we have to do is call the start_crawler function:
glue.start_crawler(Name=crawler_name)
If the crawler is already running, we will get a CrawlerRunningException.
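If that is acceptable (for example, because another process has already triggered the refresh), we can simply swallow the exception. A minimal sketch:

try:
    glue.start_crawler(Name=crawler_name)
except glue.exceptions.CrawlerRunningException:
    # the crawler is already running, so there is nothing to do
    pass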
Waiting until a crawler finishes running
If we want to wait until a crawler finishes its job, we should check the status of the crawler:
crawler_status = glue.get_crawler(Name=crawler_name)['Crawler']['State']
finished = crawler_status == 'READY'
We can run this code in a loop, but make sure that it has a second exit condition (for example, waiting no longer than 10 minutes in total) in case the crawler gets stuck.
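A polling loop with such a hard limit could look like this (the 10-minute timeout and 30-second interval are arbitrary example values):

import time

deadline = time.time() + 10 * 60  # give up after 10 minutes
finished = False
while time.time() < deadline:
    crawler_status = glue.get_crawler(Name=crawler_name)['Crawler']['State']
    if crawler_status == 'READY':
        finished = True
        break
    time.sleep(30)  # wait a while before checking again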