Airflow - Deploy and Execute a Spark Job Using YARN Cluster Mode

2018 was an exciting year, and I may have been a bit lazy about not sharing enough. To close out 2018, I decided to share a few useful tips based on some experiments I did. Here is one on Airflow DAGs and spark-submit.

Recently I was trying to create an Airflow DAG. Looks simple :-).
The slight twist: I wanted this DAG to submit the job through YARN (deploy-mode cluster) and execute it on Spark. Searching didn't yield much that addressed the challenge, hence this blog.

When we talk about Airflow, Python comes to mind, and for Spark the famous PySpark came to my mind. After a few experiments I realised the PySpark shell is meant for interactive use only, which means it can be used only in client mode. The pyspark command internally launches an interactive shell :-(, so it will not work in cluster mode. At this point I was almost convinced I needed to write my own Python operator (and tried a bit), and then found the existing SparkSubmitOperator.

SparkSubmitOperator, in simple terms, is a wrapper around spark-submit. This Python-based operator allows you to deploy your job to the Spark cluster in cluster mode. I'll share other deploy-mode options in upcoming blogs.

Now here are some details on SparkSubmitOperator. Please include the following import:
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
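The operator further down also needs a DAG object to attach to (it is referenced as dag=dag). Here is a minimal sketch of a DAG definition, assuming Airflow 1.x; the dag_id, owner, start_date and schedule below are illustrative only, so adjust them to your setup.

from datetime import datetime, timedelta
from airflow import DAG

# Minimal DAG for the SparkSubmitOperator task to attach to.
# All values (dag_id, owner, dates, schedule) are illustrative only.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 12, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='spark_submit_yarn_cluster',
    default_args=default_args,
    schedule_interval=None,  # trigger manually while testing
)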


# Define the different input parameters; add more based on your spark-submit requirements.
# SparkSubmitOperator configuration settings

__config = {
    'driver_memory': '2g',        # spark-submit equivalent: --driver-memory / spark.driver.memory
    'executor_memory': '512m',    # spark-submit equivalent: --executor-memory
    'num_executors': 3,           # spark-submit equivalent: --num-executors
    'executor_cores': 3,          # spark-submit equivalent: --executor-cores
    'verbose': True,
    'name': 'Scapidb2TrxUpcase',  # application name, i.e. --name
    'java_class': 'com.rkg.spark.scala.generated.Scapidb2TrxUpcase',  # spark-submit main class, i.e. --class
    'application': '/Users/sethugupta/Scapidb2TrxUpcase.jar',         # application jar file
    'application_args': ['yarn', '172.18.0.2'],  # application arguments supplied along with the jar
}  # My application jar takes two input arguments: yarn and 172.18.0.2
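If you want to tune these settings per environment without editing the DAG file, one option is to pull them from Airflow Variables. This is only a sketch; the variable names spark_driver_memory and spark_num_executors are hypothetical and not part of my setup.

from airflow.models import Variable

# Hypothetical Airflow Variables; fall back to the defaults defined above.
__config['driver_memory'] = Variable.get('spark_driver_memory', default_var='2g')
__config['num_executors'] = int(Variable.get('spark_num_executors', default_var=3))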

# Using SparkSubmitOperator

spark_submit_task = SparkSubmitOperator(
    task_id='sparksubmit_job',  # any name: your Airflow task ID
    conn_id='spark_hdp',        # Spark connection in Airflow; you can use spark_default
    conf={  # properties you want to pass as --conf during the spark-submit call
        'spark.hadoop.yarn.timeline-service.enabled': 'false',  # works around a recent YARN/HDP issue
        'spark.authenticate': 'false',
        'spark.driver.host': '172.18.0.2',
        'spark.driver.extraClassPath': 'abc.jar:xyz.jar',     # driver extra classpath (':'-separated)
        'spark.jars': 'file:/tmp/bdf.jar,file:/tmp/pqr.jar',  # dependent jar files (','-separated)
    },  # you can add any other --conf property here
    dag=dag,    # the DAG defined above
    **__config  # the properties defined above, to keep the call simpler
)
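From here the task behaves like any other Airflow task and can be wired into dependencies as usual. Below is a small illustrative example assuming a made-up upstream DummyOperator task; it is not part of my original DAG.

from airflow.operators.dummy_operator import DummyOperator

# Hypothetical upstream task; the Spark job runs only after it succeeds.
start = DummyOperator(task_id='start', dag=dag)

start >> spark_submit_task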

Here are the details of my Spark connection, created under the Airflow Admin tab. Use yarn as the host. You can leave the Connection Type empty. You can add more information under the connection's Extra field for anything that doesn't fit within the config or --conf:
{"queue":"default", "deploy-mode":"cluster", "spark-home":"My Spark Home", "spark-binary":"spark-submit", "namespace":"default"}
Here are the details about the SparkSubmitOperator itself that I used, along with its source. Please do let me know if you need additional help. I'll find a mechanism to publish a working sample soon. I am also exploring helper operators to address a few gaps this operator has around getting real-time log information from the cluster back to Airflow.

-Ritesh
Disclaimer: “The postings on this site are my own and don’t necessarily represent IBM’s positions, strategies or opinions.”