To submit a PySpark job using SSHOperator in Airflow, we need three things:

  • an existing SSH connection to the Spark cluster (a sketch of defining one follows this list)
  • the location of the PySpark script (for example, an S3 location if we use EMR)
  • parameters used by PySpark and the script
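
The ssh_connection id used later in the operator must already exist in Airflow. Below is a minimal sketch of registering such a connection programmatically; the host, login, and port are placeholders for your cluster's master node:

from airflow import settings
from airflow.models import Connection

# placeholder connection details - replace with your Spark/EMR master node
ssh_connection = Connection(
    conn_id='ssh_connection',
    conn_type='ssh',
    host='emr-master-node.example.com',
    login='hadoop',
    port=22,
)

# store the connection in the Airflow metadata database
session = settings.Session()
session.add(ssh_connection)
session.commit()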

The usage of the operator looks like this:

from airflow.contrib.operators.ssh_operator import SSHOperator

script = 's3://some_bucket/script.py'
spark_parameters = '--executor-memory 100G'
# here we can use Airflow templates to define the parameters used in the script
parameters = '--db {{ params.database_instance }} --output_path {{ params.output_path }}'

submit_pyspark_job = SSHOperator(
    task_id='pyspark_submit',
    ssh_conn_id='ssh_connection',
    # set -a exports PYSPARK_PYTHON, so spark-submit uses Python 3 on the cluster
    command='set -a; PYSPARK_PYTHON=python3; /usr/bin/spark-submit --deploy-mode cluster %s %s %s' % (spark_parameters, script, parameters),
    dag=dag
)
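
The Jinja expressions in the parameters string are rendered because command is a templated field of SSHOperator, and their values come from params. A minimal sketch of a DAG that supplies them, assuming made-up values for database_instance and output_path:

from datetime import datetime
from airflow import DAG

dag = DAG(
    dag_id='pyspark_submit_example',
    start_date=datetime(2021, 1, 1),
    schedule_interval='@daily',
    # these values fill in {{ params.database_instance }} and {{ params.output_path }}
    params={
        'database_instance': 'analytics_db',
        'output_path': 's3://some_bucket/output',
    },
)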
