Some Spark applications can benefit from running independent parts of the job in parallel. The use case here is a job submitted with:

--master yarn

--deploy-mode client
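
For illustration only (the class name, jar name and settings-file path below are placeholders consistent with the examples in this article, not taken from a real deployment), such a job might be submitted as:

spark-submit \
  --master yarn \
  --deploy-mode client \
  --class com.cba.spark.hiveSales \
  hiveSales.jar \
  hdfs:///apps/hiveSales/settings.conf

The single application argument is the HDFS path of the settings file, which the Scala example below reads via args(0).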


The following technique proved to be cost- and time-effective while converting Hive applications to Spark:

  1. Skip creating temporary tables; create temporary views from DataFrames instead.
  2. Build independent datasets in separate threads and save their results as temporary views; a view created in one thread is visible to the rest of the application once the threads are joined.

Which approach to use depends on the number of threads needed and on the programming language.

Scala multithreading using Futures

import com.cba.spark.cbaSpark
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql._
import java.util.Calendar
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object hiveSales
{
def main(args: Array[String])
{
val configFile=args(0)

var sparkProperties="hive.metastore.uris=thrift://datanode1.custom-built-apps.com:9083;"
sparkProperties += "spark.driver.maxResultSize=0;spark.sql.session.timeZone=Canada/Eastern;"
sparkProperties += "hive.exec.dynamic.partition=true;hive.exec.dynamic.partition.mode=nonstrict;"

val cbaspark=new cbaSpark("hiveSales",configFile,sparkProperties)
cbaspark.printStart
cbaspark.printSettings

// Remove unnecessary variables or add the ones needed
val source_db=cbaspark.variables("source_db")
val stage_db=cbaspark.variables("stage_db")
val target_db=cbaspark.variables("target_db")
val warehouseLocation=cbaspark.variables("warehouseLocation")
// NOTE: this section must be edited to match the task at hand; it is provided for reference only

///// Create staging tables

val thread1=Future
{

/////////////// products //////////////
var tableName=s"${stage_db}.products_current"
var dataPath=s"${warehouseLocation}/${tableName}"
var location=s""" location '${dataPath}'"""


// drop the table
cbaspark.dropTable(tableName,dataPath)


val strCreateProducts =s"""create table ${tableName}
stored as parquet
${location}
tblproperties('parquet.compression'='SNAPPY')
as
select
id
,productname
from ${source_db}.products_src where iscurrent='Y'
"""
println("[INFO] products_current has been created")

cbaspark.execHQL(strCreateProducts)
}

val thread2=Future
{
/////////////// customers //////////////
var tableName=s"${stage_db}.customers_current"
var dataPath=s"${warehouseLocation}/${tableName}"
var location=s""" location '${dataPath}'"""


// drop the table
cbaspark.dropTable(tableName,dataPath)

val strCreateCustomers =s"""create table ${tableName}
stored as parquet
${location}
tblproperties('parquet.compression'='SNAPPY')
as
select
id
,customername
from ${source_db}.customers_src where iscurrent='Y'
"""
println("[INFO] customers_current has been created")

cbaspark.execHQL(strCreateCustomers)
}
// block until both staging tables exist; sales_current depends on them
Await.result(thread1,Duration.Inf)
Await.result(thread2,Duration.Inf)


/////////////// sales //////////////
var tableName=s"${stage_db}.sales_current"
var dataPath=s"${warehouseLocation}/${tableName}"
var location=s""" location '${dataPath}'"""


// drop the table
cbaspark.dropTable(tableName,dataPath)

val strCreateSales =s"""create table ${tableName}
stored as parquet
${location}
tblproperties('parquet.compression'='SNAPPY')
as
SELECT
customerid
, productid
, amount
, sales_dt
from ${source_db}.sales where
customerid in (select id from ${stage_db}.customers_current)
and productid in (select id from ${stage_db}.products_current)
"""
println("[INFO] sales_current has been created")

cbaspark.execHQL(strCreateSales)

cbaspark.printEnd
cbaspark.stop

} //end of main
} //end of object



///////////////////////// cbaSpark class ////////////////////

//File : cbaSpark.scala
//Author : Boris Alexandrov <boris.alexandrov@custom-built-apps.com>
//Date : Thu Mar 26 15:32:39 EDT 2020
//Project : cbaSpark implementation
//Version : v1.0
//Revision :initial version
// :Testing with scala 2.12.10 and Spark 3.0.0
////////////////////////////////////////////

package com.cba.spark
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql._
import scala.io.Source
import org.apache.log4j.Logger
import org.apache.log4j.Level
import java.util.Calendar
import scala.concurrent.forkjoin._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import org.apache.hadoop.fs.{FileSystem,Path}

class cbaSpark(appName :String, configFile :String, sparkProperties :String) extends Serializable
{

Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

var spark :SparkSession=null
var variables=Map[String,String]()
try
{

// create spark session

val builder=SparkSession.builder()

builder.appName(appName)

///////////////////////// add some spark configuration if any
if(sparkProperties.contains(";") && sparkProperties.contains("="))
{
val sparkConfigPairs=sparkProperties.split(";")
sparkConfigPairs.foreach
{
sparkConfigPair=>
var p = sparkConfigPair.split("=")
//////// set the configuration parameters to spark session
builder.config(p(0),p(1))

}
}
/////// create spark session with hive support based on the builder parameters ///////

spark=builder.enableHiveSupport()
.getOrCreate()


///////// get the settings from the config file into the variables map
////////// the settings file must be in HDFS so that both command-line spark-submit and Oozie can pick it up
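// Example settings file contents (hypothetical values), one key=value pair per line:
//   source_db=sales_source
//   stage_db=sales_stage
//   target_db=sales_mart
//   warehouseLocation=hdfs:///data/warehouse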

val df=spark.read.textFile(configFile)
df.collect.foreach
{
variablePair=>
var p = variablePair.split("=")
variables +=(p(0)→p(1))

}

/////////////// register UDFs to use in spark sql /////////////////
spark.udf.register("myuuid",udf(()=>java.util.UUID.randomUUID().toString))
} //end of try
catch
{
case e : Exception => println("[ERROR] " + e)
}
def printSettings
{
println("///////////// SETTINGS VARIABLES /////////////////////////")
variables.foreach
{
e: (String,String) => println(e._1 + "=" + e._2)
}
println("///////////// END OF SETTINGS VARIABLES //////////////////")

println("////////////////////// SPARK CONFIGURATION ////////////////////////////")
spark.conf.getAll.foreach(println)
println("////////////////////// END OF SPARK CONFIGURATION /////////////////////")

}
def execHiveQuery(strSQL:String) : DataFrame =
{
spark.sql(strSQL)
}

def createView(df:DataFrame,viewName:String)
{
df.createOrReplaceTempView(viewName)
}

def dropTable(tableName:String,dataPath:String)
{
val strDropTable=s"drop table if exists ${tableName}"
spark.sql(strDropTable)
///// remove the table's data directory as well (needed when the table is external)
val fs=FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.delete(new Path(dataPath),true)
}
def printStart
{

println("[START] Starting Application id: " + spark.conf.get("spark.app.id"))
val startTime=Calendar.getInstance().getTime()
println("[INFO] Start time: " + startTime)

}

def printEnd
{

println("[INFO] Completed Application id: " + spark.conf.get("spark.app.id"))
val endTime=Calendar.getInstance().getTime()
println("[END] End time: " + endTime)

}

def stop
{
spark.stop()
}

} //end of Class


Java multithreading using Thread
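
A minimal sketch of the plain-Thread variant follows. The class name testThread is a placeholder (the source tables reuse the names from the Executor example below), and it is assumed that all threads share a single SparkSession. Each task runs one query and registers the result as a temporary view; Thread.join() plays the role that Await.result plays in the Scala version.

//File : testThread.java (illustrative sketch, not part of the original project)
package com.cba.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class testThread implements Runnable
{

SparkSession spark;
String strSQL;
String viewName;

public testThread(SparkSession s,String s1,String v)
{
this.spark=s;
this.strSQL=s1;
this.viewName=v;
}

// each task runs one query and publishes the result as a temporary view
public void run()
{
Dataset<Row> df=this.spark.sql(this.strSQL);
df.createOrReplaceTempView(this.viewName);
}

public static void main(String[] args) throws InterruptedException
{
SparkSession spark=SparkSession
.builder()
.appName("testThread")
.enableHiveSupport()
.getOrCreate();

// one thread per independent dataset (table names taken from the Executor example below)
Thread t1=new Thread(new testThread(spark,"select * from dataexplorer1.hosts","view1"));
Thread t2=new Thread(new testThread(spark,"select * from dataexplorer1.hosts_bk","view2"));

t1.start();
t2.start();

// join the threads; after this point both views are visible to the rest of the application
t1.join();
t2.join();

spark.sql("select count(*) from view1").show();
spark.sql("select count(*) from view2").show();

spark.stop();
}
} //end of class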


Java multithreading using Futures and Executors

//File : testFuture.java
//Author : Boris Alexandrov<boris.alexandrov@custom-built-apps.com>
//Date : Sun Sep 20 17:25:21 EDT 2020
//Project : Spark based project
//Version : v1.0
//Revision : initial version

package com.cba.spark;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.ArrayList;
import java.util.List;
import java.time.temporal.ChronoUnit;
import java.time.LocalDateTime;


import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class testFuture implements Runnable

{

SparkSession spark;
String strSQL;
String viewName;
final static AtomicInteger counter=new AtomicInteger();

public testFuture(SparkSession s,String s1,String v)
{
this.spark=s;
this.strSQL=s1;
this.viewName=v;
}

public static void main(String[] args) throws ExecutionException, InterruptedException
{
String applicationName="testFuture";

try
{

SparkSession spark=SparkSession
.builder()
.appName(applicationName)
.enableHiveSupport()
.config("spark.sql.crossJoin.enabled","true")
.config("hive.exec.dynamic.partition.mode","nonstrict")
.config("hive.exec.dynamic.partition","true")
.getOrCreate();

LocalDateTime startTime=LocalDateTime.now();
System.out.println("Application id: " + spark.sparkContext().applicationId());
System.out.println("Application start time: " + startTime);

// generate array of statements
ArrayList<String> statements=new ArrayList<String>();

String src_db="dataexplorer1";
String SQL="select * from ";
SQL += src_db;
SQL += ".hosts";
statements.add(SQL);

SQL="select * from ";
SQL += src_db;
SQL += ".hosts_bk";
statements.add(SQL);

SQL="select * from ";
SQL += src_db;
SQL += ".myfirsttab";
statements.add(SQL);

ArrayList<String>views=new ArrayList<String>();
views.add("view1");
views.add("view2");
views.add("view3");


///////////// RUN SQL IN SEVERAL THREADS //////////
//
int poolSize=5;
ExecutorService service = Executors.newFixedThreadPool(poolSize);
List<Future<Runnable>> futures = new ArrayList<Future<Runnable>>();
// create 3 threads to run sql simultaneously
//
for (int i=0;i<views.size();i++)
{
Future f = service.submit(new testFuture(spark,statements.get(i),views.get(i)));
futures.add(f);

}

// wait for all tasks to complete before continuing
for (Future<Runnable> f : futures)
{
f.get();
}

Dataset<Row> df=spark.sql("select * from view1");
df.collectAsList().forEach(
row->
{
System.out.println(row.getInt(0) + " " + row.getString(1));
}
);


Dataset<Row> df1=spark.sql("select * from view2");
df1.collectAsList().forEach(
row->
{
System.out.println(row.getInt(0) + " " + row.getString(1));
}
);


Dataset<Row> df3=spark.sql("select * from view3");
df3.collectAsList().forEach(
row->
{
System.out.println(row.getInt(0) + " " + row.getString(1));
}

);
// Create a join on the 3 views
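// For example (the join keys and column names below are hypothetical, for illustration only):
// Dataset<Row> joined=spark.sql(
//     "select v1.* from view1 v1 join view2 v2 on v1.id=v2.id join view3 v3 on v1.id=v3.id");
// joined.createOrReplaceTempView("joined_view");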

// create another set of Futures
//
//
//shut down the executor service so that this thread can exit
service.shutdownNow();

LocalDateTime completionTime=LocalDateTime.now();
long diff = ChronoUnit.SECONDS.between(startTime,completionTime);
System.out.println("Application completion time :" + completionTime);
System.out.println("Application run time: " + diff + " seconds.");


//clean the spark session
spark.stop();
}
catch(Exception e)
{

System.out.println(e.getMessage());
}
}//end of main

public void run()
{
Dataset<Row> df=this.spark.sql(this.strSQL);
df.createOrReplaceTempView(this.viewName);
counter.incrementAndGet();
System.out.println(counter +"." + viewName + " completed");
}
} //end of class