In this article, I show how to use the SSHHook in a PythonOperator to connect to a remote server from Airflow using SSH and execute a command.

Table of Contents

  1. SSHHook in PythonOperator

First, I have to define the SSH connection in Airflow because I will pass the connection parameters using the Airflow connection id instead of defining the host, port, username, and password in the Python code.
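The connection can be created in the Airflow UI (Admin → Connections), but Airflow also reads any environment variable named AIRFLOW_CONN_<CONN_ID> as a connection URI. A minimal sketch of that approach, assuming a made-up connection id my_ssh_server (the credentials are placeholders):

```python
import os
from urllib.parse import quote

def ssh_conn_uri(user, password, host, port=22):
    """Build an Airflow connection URI for an SSH connection;
    quoting guards against special characters in the credentials."""
    return f"ssh://{quote(user, safe='')}:{quote(password, safe='')}@{host}:{port}"

# Airflow resolves conn_id 'my_ssh_server' from this variable
# (the id and credentials here are placeholders).
os.environ["AIRFLOW_CONN_MY_SSH_SERVER"] = ssh_conn_uri(
    "deploy", "s3cret!", "example.com"
)
```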

When that part is done, I can instantiate the SSHHook using that connection id:

from airflow.contrib.hooks.ssh_hook import SSHHook

ssh = SSHHook(ssh_conn_id=AIRFLOW_CONNECTION_ID)

In the next step, I open a new connection and execute the command (in this example, I use touch to create a new file). Opening the connection, however, is not enough. The SSHHook does not close it automatically when we no longer need it, so we have to wrap the code in a try...finally block:

ssh_client = None
try:
    ssh_client = ssh.get_conn()
    ssh_client.load_system_host_keys()
    ssh_client.exec_command('touch file_name')
finally:
    if ssh_client:
        ssh_client.close()

In the try block, I called the load_system_host_keys function to import the OpenSSH known_hosts file, which contains the identifiers of known remote hosts and serves as a rudimentary defense against man-in-the-middle attacks.

After that, I called exec_command to create a new file. In this example, I don’t care about the server response, so I do not assign the values returned by exec_command to any variable.
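As an aside, the try...finally bookkeeping can be written more compactly with contextlib.closing from the standard library, which calls close() on exit even when the command raises an exception. A sketch (the helper name run_over_ssh is my own, not part of Airflow):

```python
from contextlib import closing

def run_over_ssh(ssh_hook, command):
    """Open a connection from an SSHHook, run a single command,
    and always close the client afterwards."""
    # closing() works with any object that has a close() method,
    # including the client returned by SSHHook.get_conn().
    with closing(ssh_hook.get_conn()) as ssh_client:
        ssh_client.load_system_host_keys()
        ssh_client.exec_command(command)
```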

If I wanted to get a response, I would have to assign the returned values to three variables:

stdin, stdout, stderr = ssh_client.exec_command('touch file_name')
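The three returned values are file-like channel objects (SSHHook.get_conn() returns a paramiko SSHClient, so this is the paramiko interface). A sketch of a helper that waits for the command to finish and collects the response (the name read_command_output is mine, not part of Airflow):

```python
def read_command_output(ssh_client, command):
    """Run a command over an open SSH client and return
    (exit_status, stdout_text, stderr_text)."""
    stdin, stdout, stderr = ssh_client.exec_command(command)
    # recv_exit_status() blocks until the remote command terminates,
    # so the streams are complete when we read them.
    exit_status = stdout.channel.recv_exit_status()
    return exit_status, stdout.read().decode(), stderr.read().decode()
```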

SSHHook in PythonOperator

To use the SSHHook with the PythonOperator, I have to put the whole code in a function:

def create_the_file():
    ssh = SSHHook(ssh_conn_id=AIRFLOW_CONNECTION_ID)
    ssh_client = None
    try:
        ssh_client = ssh.get_conn()
        ssh_client.load_system_host_keys()
        ssh_client.exec_command('touch file_name')
    finally:
        if ssh_client:
            ssh_client.close()

and define a PythonOperator that calls my function:

from airflow.operators.python_operator import PythonOperator

call_ssh_task = PythonOperator(
    task_id='call_ssh_task',
    python_callable=create_the_file,
    dag=dag
)
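For completeness: the dag object passed to the operator has to be defined somewhere in the same file. A minimal sketch of that definition, where dag_id, start_date, and schedule_interval are placeholder values I chose:

```python
from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id='ssh_file_example',        # placeholder id
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,           # run only when triggered manually
)
```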
