In this article, I show how to use the SSHHook in a PythonOperator to connect to a remote server from Airflow using SSH and execute a command.

Table of Contents

  1. SSHHook in PythonOperator

First, I have to define the SSH connection in Airflow because I will pass the connection parameters using the Airflow connection id instead of defining the host, port, username, and password in the Python code.

When that part is done, I can define the function that connects to SSH:

from airflow.contrib.hooks.ssh_hook import SSHHook

ssh = SSHHook(ssh_conn_id=AIRFLOW_CONNECTION_ID)

In the next step, I open a new connection and execute the command (in this example, I will use touch to create a new file). Creating a new connection, however, is not enough. We have to do it in the try...finally block because the SSHHook does not close it automatically when we no longer need it:

ssh_client = None
try:
    ssh_client = ssh.get_conn()
    ssh_client.load_system_host_keys()
    ssh_client.exec_command('touch file_name')
finally:
    if ssh_client:
        ssh_client.close()

In the try block, I called the load_sytem_host_keys function to import the OpenSSH known_hosts file, which contains the remote hosts’ identifiers and is used as a rudiment method of man-in-the-middle attack prevention.

After that, I executed the exec_command function to create a new file. In this example, I don’t care about the server response, so I do not assign the values returned by exec_command to any variable.

If I wanted to get a response, I would have to assign the returned values to three variables:

stdin, stdout, stderr = ssh_client.exec_command('touch file_name')

Want to build AI systems that actually work?

Download my expert-crafted GenAI Transformation Guide for Data Teams and discover how to properly measure AI performance, set up guardrails, and continuously improve your AI solutions like the pros.

SSHHook in PythonOperator

In this article, I want to show how to use the SSHHook with the PythonOperator. Therefore, I have to put the whole code in a function:

def create_the_file():
    ssh = SSHHook(ssh_conn_id=AIRFLOW_CONNECTION_ID)
    ssh_client = None
    try:
        ssh_client = ssh.get_conn()
        ssh_client.load_system_host_keys()
        ssh_client.exec_command('touch file_name')
    finally:
        if ssh_client:
            ssh_client.close()

and define a PythonOperator that calls my function:

call_ssh_task = PythonOperator(
    task_id='call_ssh_task',
    python_callable=create_the_file,
    dag=dag
)

Want to build AI systems that actually work?

Download my expert-crafted GenAI Transformation Guide for Data Teams and discover how to properly measure AI performance, set up guardrails, and continuously improve your AI solutions like the pros.

Older post

Use the ROW_NUMBER() function to get top rows by partition in Hive

How to calculate row number by partition in Hive and use it to filter rows

Newer post

Remove a directory from S3 using Airflow S3Hook

How to remove files with a common prefix from S3

Are you looking for an experienced AI consultant? Do you need assistance with your RAG or Agentic Workflow?
Book a Quick Consultation, send me a message on LinkedIn. Book a Quick Consultation or send me a message on LinkedIn

>