In this article, I show how to use the SSHHook in a PythonOperator to connect to a remote server from Airflow using SSH and execute a command.
First, I have to define the SSH connection in Airflow because I will pass the connection parameters using the Airflow connection id instead of defining the host, port, username, and password in the Python code.
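You don't have to use the Airflow UI to define the connection. Airflow can also read a connection from an environment variable whose name is AIRFLOW_CONN_ followed by the upper-cased connection id. The value is a URI such as ssh://login:password@host:port. This variable must be set in the environment of the scheduler and the workers. The sketch below only builds the variable name and value. The helper name and the example values are hypothetical. Special characters in the login and password must be URL-encoded.

```python
from typing import Tuple
from urllib.parse import quote


def ssh_connection_env(conn_id: str, host: str, port: int,
                       login: str, password: str) -> Tuple[str, str]:
    """Build the (name, value) pair of an AIRFLOW_CONN_* variable for an SSH connection.

    Hypothetical helper: Airflow itself only reads the resulting environment variable.
    """
    # Airflow looks the variable up as AIRFLOW_CONN_<CONN_ID>, upper-cased.
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    # safe='' also percent-encodes '/', ':' and '@', which would otherwise break the URI.
    uri = f"ssh://{quote(login, safe='')}:{quote(password, safe='')}@{host}:{port}"
    return name, uri


name, value = ssh_connection_env("my_ssh_server", "example.com", 22, "airflow", "p@ss:word")
print(f"export {name}='{value}'")
```

The password p@ss:word ends up as p%40ss%3Aword. Without this encoding, the @ and : characters would be read as URI separators.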
When that part is done, I can create the SSHHook that uses this connection:
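from airflow.contrib.hooks.ssh_hook import SSHHook  # Airflow 2: from airflow.providers.ssh.hooks.ssh import SSHHook

AIRFLOW_CONNECTION_ID = 'my_ssh_connection'  # hypothetical: the id of the connection defined in Airflow
ssh = SSHHook(ssh_conn_id=AIRFLOW_CONNECTION_ID)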
In the next step, I open a new connection and execute the command. In this example, I use touch to create a new file. Opening the connection is not enough, though. The SSHHook does not close the connection automatically when we no longer need it, so I have to put the code in a try...finally block and close the client myself:
ssh_client = None
try:
    ssh_client = ssh.get_conn()
    ssh_client.load_system_host_keys()
    ssh_client.exec_command('touch file_name')
finally:
    if ssh_client:
        ssh_client.close()
In the try block, I called the load_system_host_keys function to import the OpenSSH known_hosts file. This file contains the public keys of the remote hosts that the machine already knows, and it serves as a rudimentary way to prevent man-in-the-middle attacks.
After that, I called the exec_command function to create the new file. In this example, I don't care about the server response, so I do not assign the values returned by exec_command to any variable.
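The try...finally block can also be written with contextlib.closing from the Python standard library. It calls close() on the wrapped object when the with block exits, even if an exception was raised. If get_conn() itself fails, nothing is opened, so there is nothing to close. This matches the original if ssh_client check. In the sketch below, FakeSSHClient is a hypothetical stand-in for the client returned by get_conn(). It lets the snippet run without an SSH server.

```python
from contextlib import closing


class FakeSSHClient:
    """Hypothetical stand-in for the client returned by SSHHook.get_conn()."""

    def __init__(self):
        self.closed = False
        self.commands = []

    def load_system_host_keys(self):
        # The real client reads ~/.ssh/known_hosts here; the fake does nothing.
        pass

    def exec_command(self, command):
        self.commands.append(command)

    def close(self):
        self.closed = True


def touch_file(get_conn):
    # closing() calls ssh_client.close() when the block exits, like the finally clause.
    with closing(get_conn()) as ssh_client:
        ssh_client.load_system_host_keys()
        ssh_client.exec_command('touch file_name')


client = FakeSSHClient()
touch_file(lambda: client)
print(client.commands, client.closed)
```

With the real hook, the call would be touch_file(ssh.get_conn).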
If I wanted to get a response, I would have to assign the returned values to three variables:
stdin, stdout, stderr = ssh_client.exec_command('touch file_name')
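The paramiko client returned by get_conn() does not wait inside exec_command for the command to finish. The three returned values are file-like objects connected to the remote process. To get the response, one option is to read stdout and stderr first. You can then call stdout.channel.recv_exit_status(), which blocks until the remote command exits and returns its exit code. The sketch below wraps this in a hypothetical run_command helper. The fake classes are assumptions that mimic only the parts of the paramiko objects the helper uses.

```python
class FakeChannel:
    """Hypothetical stand-in for a paramiko Channel."""

    def __init__(self, exit_status):
        self._exit_status = exit_status

    def recv_exit_status(self):
        return self._exit_status


class FakeFile:
    """Hypothetical stand-in for the file-like objects returned by exec_command."""

    def __init__(self, data, channel):
        self._data = data
        self.channel = channel

    def read(self):
        return self._data


class FakeSSHClient:
    def exec_command(self, command):
        channel = FakeChannel(0)
        return FakeFile(b'', channel), FakeFile(b'hello\n', channel), FakeFile(b'', channel)


def run_command(ssh_client, command):
    """Run a command and return (exit_status, stdout_text, stderr_text)."""
    _stdin, stdout, stderr = ssh_client.exec_command(command)
    # Read the output first so the remote process is not blocked on a full buffer.
    out = stdout.read().decode()
    err = stderr.read().decode()
    # Blocks until the remote command finishes, then returns its exit code.
    exit_status = stdout.channel.recv_exit_status()
    return exit_status, out, err


status, out, err = run_command(FakeSSHClient(), 'echo hello')
print(status, repr(out), repr(err))
```

With a real client, run_command(ssh_client, 'touch file_name') could replace the bare exec_command call inside the try block. A non-zero exit status could then be turned into an exception so that the Airflow task fails.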
SSHHook in PythonOperator
Because I want to run this code with the PythonOperator, I have to put all of it in a function:
def create_the_file():
    ssh = SSHHook(ssh_conn_id=AIRFLOW_CONNECTION_ID)
    ssh_client = None
    try:
        ssh_client = ssh.get_conn()
        ssh_client.load_system_host_keys()
        ssh_client.exec_command('touch file_name')
    finally:
        if ssh_client:
            ssh_client.close()
Then I define a PythonOperator that calls my function:
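from airflow.operators.python_operator import PythonOperator  # Airflow 2: airflow.operators.python

call_ssh_task = PythonOperator(
    task_id='call_ssh_task',
    python_callable=create_the_file,
    dag=dag  # the DAG object defined elsewhere in the DAG file
)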
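The function above hard-codes both the connection id and the command. Below is a more reusable variant, sketched under my own assumptions. It takes the connection id and the command as arguments, so the PythonOperator can supply them through its op_kwargs parameter. It also returns the command output, which recent Airflow versions push to XCom by default. The function name and the hook_factory parameter are hypothetical. hook_factory exists only so the snippet can be exercised without Airflow. When hook_factory is omitted, the function imports the Airflow 2 SSHHook from the SSH provider package.

```python
def run_remote_command(conn_id, command, hook_factory=None):
    """Run `command` over SSH, return its stdout, and raise if it exits non-zero."""
    if hook_factory is None:
        # Airflow 2 location of the hook (apache-airflow-providers-ssh package).
        from airflow.providers.ssh.hooks.ssh import SSHHook
        hook_factory = SSHHook
    ssh = hook_factory(ssh_conn_id=conn_id)
    ssh_client = None
    try:
        ssh_client = ssh.get_conn()
        _stdin, stdout, stderr = ssh_client.exec_command(command)
        out = stdout.read().decode()
        err = stderr.read().decode()
        # Wait for the remote command to finish before the connection is closed.
        exit_status = stdout.channel.recv_exit_status()
        if exit_status != 0:
            raise RuntimeError(f'{command!r} exited with {exit_status}: {err}')
        return out
    finally:
        if ssh_client:
            ssh_client.close()

# Hypothetical usage inside the DAG file:
# call_ssh_task = PythonOperator(
#     task_id='call_ssh_task',
#     python_callable=run_remote_command,
#     op_kwargs={'conn_id': AIRFLOW_CONNECTION_ID, 'command': 'touch file_name'},
#     dag=dag,
# )
```

Raising an exception on a non-zero exit status marks the Airflow task as failed. The original version would report success even if touch failed on the remote server.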