Spark Connect is a relatively new component of the Spark ecosystem that allows thin clients to run Spark applications on a remote Spark cluster. This technology can offer some benefits to Spark applications that use the DataFrame API. Spark has long allowed running SQL queries on a remote Thrift JDBC server. However, the ability to remotely run client applications written in any supported language (Scala, Python) appeared only in Spark 3.4.
In this article, I will share our experience using Spark Connect (version 3.5). I will talk about the benefits we gained, the technical details of running Spark client applications, and some tips on how to make your Spark Connect setup more efficient and stable.
Spark is one of the key components of the analytics platform at Joom. We have a large number of internal users and over 1000 custom Spark applications. These applications run at different times of day, have different complexity, and require very different amounts of computing resources (ranging from a few cores for a couple of minutes to over 250 cores for several days). Previously, all of them were always executed as separate Spark applications (with their own driver and executors), which, in the case of small and medium-sized applications (we historically have many such applications), led to noticeable overhead. With the introduction of Spark Connect, it is now possible to set up a shared Spark Connect server and run many Spark client applications on it. Technically, the Spark Connect server is a Spark application with an embedded Spark Connect endpoint.
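For illustration, here is a rough sketch of what such a server could look like. The plugin class and port below are assumptions based on the standard Spark Connect distribution (org.apache.spark.sql.connect.SparkConnectPlugin and the default gRPC port 15002); the actual setup may differ.
import org.apache.spark.sql.SparkSession

object SharedSparkConnectServer {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("shared-spark-connect-server")
      // The plugin embeds a Spark Connect gRPC endpoint into this Spark application.
      .config("spark.plugins", "org.apache.spark.sql.connect.SparkConnectPlugin")
      // Assumed to be the default Spark Connect port; shown only for clarity.
      .config("spark.connect.grpc.binding.port", "15002")
      .getOrCreate()

    // Keep the driver alive so that client applications can connect at any time.
    Thread.currentThread().join()
  }
}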
Here are the benefits we were able to get from this:
- Resource savings
– When running via Spark Connect, client applications do not require their own Spark driver (which typically uses over 1.5 GB of memory). Instead, they use a thin client with a typical memory consumption of 200 MB.
– Executor utilization improves since any executor can run the tasks of multiple client applications. For example, suppose some Spark application, at some point in its execution, starts using significantly fewer cores and memory than initially requested. There are many reasons why this can happen. Then, in the case of a separate Spark application, the currently unused resources are often wasted, since dynamic allocation often does not provide efficient scale-down. However, with the Spark Connect server, the freed-up cores and memory can immediately be used to run tasks of other client applications.
- Reduced startup wait time
– For various reasons, we have to limit the number of simultaneously running separate Spark applications, and they may wait in the queue for quite a long time if all slots are currently occupied. It can negatively affect data readiness time and user experience. In the case of the Spark Connect server, we have so far been able to avoid such limitations, and all Spark Connect client applications start running immediately after launch.
– For ad-hoc executions, it is desirable to minimize the time to get results as much as possible and avoid keeping people waiting. In the case of separate Spark applications, launching a client application often requires provisioning additional EC2 nodes for its driver and executors, as well as initializing the driver and executors. All of this together can take more than 4 minutes. In the case of the Spark Connect server, at least its driver is always up and ready to accept requests, so it is only a matter of waiting for additional executors, and often executors are already available. This may significantly reduce the wait time for ad-hoc applications to be ready.
Our constraints
At the moment, we do not run long-running heavy applications on Spark Connect for the following reasons:
- They may cause failure or unstable behavior of the Spark Connect server (e.g., by overflowing disks on executor nodes). This can lead to large-scale problems for the entire platform.
- They often require unique memory settings and use specific optimization techniques (e.g., custom extraStrategies).
- We currently have a problem with giving the Spark Connect server a large number of executors to handle a very large simultaneous load (this is related to the behavior of the Spark Task Scheduler and is beyond the scope of this article).
Therefore, heavy applications still run as separate Spark applications.
We use Spark on Kubernetes/EKS and Airflow. Some code examples will be specific to this environment.
We have too many different, constantly changing Spark applications, and it would take too much time to manually determine for each one whether it should run on Spark Connect according to our criteria or not. In addition, the list of applications running on Spark Connect needs to be updated regularly. For example, suppose today some application is light enough, so we have decided to run it on Spark Connect. But tomorrow, its developers may add several large joins, making it quite heavy. Then, it will be preferable to run it as a separate Spark application. The reverse situation is also possible.
Eventually, we created a service that automatically determines how to launch each particular client application. This service analyzes the history of previous runs for each application, evaluating metrics such as Total Task Time, Shuffle Write, Disk Spill, and others (this data is collected using SparkListener). Custom parameters set for the applications by developers (e.g., memory settings of drivers and executors) are also taken into account. Based on this data, the service automatically determines for each application whether it should be run this time on the Spark Connect server or as a separate Spark application. Thus, all our applications should be ready to run in either of the two ways.
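A simplified sketch of such a decision (with hypothetical metric names and thresholds; the real service takes more signals into account) could look like this:
// Hypothetical aggregated statistics of the previous runs of one application.
case class AppRunStats(
  totalTaskTimeHours: Double,
  shuffleWriteGb: Double,
  diskSpillGb: Double,
  hasCustomMemorySettings: Boolean
)

// The thresholds are illustrative only; real values depend on your cluster.
def shouldRunOnSparkConnect(stats: AppRunStats): Boolean = {
  !stats.hasCustomMemorySettings &&
    stats.totalTaskTimeHours < 50 &&
    stats.shuffleWriteGb < 100 &&
    stats.diskSpillGb < 10
}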
In our environment, each client application is built independently of the others and has its own JAR file containing the application code, as well as specific dependencies (for example, ML applications often use third-party libraries like CatBoost and so on). The problem is that the SparkSession API for Spark Connect is somewhat different from the SparkSession API used for separate Spark applications (Spark Connect clients use the spark-connect-client-jvm artifact). Therefore, we are supposed to know at the build time of each client application whether it will run via Spark Connect or not. But we do not know that. The following describes our approach to launching client applications, which eliminates the need to build and manage two versions of the JAR artifact for the same application.
For each Spark client application, we build only one JAR file containing the application code and specific dependencies. This JAR is used both when running on Spark Connect and when running as a separate Spark application. Therefore, these client JARs do not contain specific Spark dependencies. The appropriate Spark dependencies (spark-core/spark-sql or spark-connect-client-jvm) will be provided later in the Java classpath, depending on the run mode. In any case, all client applications use the same Scala code to initialize SparkSession, which operates depending on the run mode. All client application JARs are built for the regular Spark API. So, in the part of the code intended for Spark Connect clients, the SparkSession methods specific to the Spark Connect API (remote, addArtifact) are called via reflection:
val sparkConnectUri: Option[String] = Option(System.getenv("SPARK_CONNECT_URI"))
val isSparkConnectMode: Boolean = sparkConnectUri.isDefined

def createSparkSession(): SparkSession = {
  if (isSparkConnectMode) {
    createRemoteSparkSession()
  } else {
    SparkSession.builder
      // Whatever you need to do to configure SparkSession for a separate
      // Spark application.
      .getOrCreate
  }
}

private def createRemoteSparkSession(): SparkSession = {
  val uri = sparkConnectUri.getOrElse(throw new Exception(
    "Required environment variable 'SPARK_CONNECT_URI' is not set."))

  val builder = SparkSession.builder
  // Reflection is used here because the regular SparkSession API does not
  // contain these methods. They are only available in the SparkSession API
  // version for Spark Connect.
  classOf[SparkSession.Builder]
    .getDeclaredMethod("remote", classOf[String])
    .invoke(builder, uri)

  // A set of identifiers for this application (to be used later).
  val scAppId = s"spark-connect-${UUID.randomUUID()}"
  val airflowTaskId = Option(System.getenv("AIRFLOW_TASK_ID"))
    .getOrElse("unknown_airflow_task_id")

  val session = builder
    .config("spark.joom.scAppId", scAppId)
    .config("spark.joom.airflowTaskId", airflowTaskId)
    .getOrCreate()

  // If the client application uses your Scala code (e.g., custom UDFs),
  // then you must add the jar artifact containing this code so that it
  // can be used on the Spark Connect server side.
  val addArtifact = Option(System.getenv("ADD_ARTIFACT_TO_SC_SESSION"))
    .forall(_.toBoolean)

  if (addArtifact) {
    val mainApplicationFilePath =
      System.getenv("SPARK_CONNECT_MAIN_APPLICATION_FILE_PATH")
    classOf[SparkSession]
      .getDeclaredMethod("addArtifact", classOf[String])
      .invoke(session, mainApplicationFilePath)
  }

  Runtime.getRuntime.addShutdownHook(new Thread() {
    override def run(): Unit = {
      session.close()
    }
  })

  session
}
In the case of Spark Connect mode, this client code can be run as a regular Java application anywhere. Since we use Kubernetes, it runs in a Docker container. All dependencies specific to Spark Connect are packed into a Docker image used to run client applications (a minimal example of this image can be found here). The image contains not only the spark-connect-client-jvm artifact but also other common dependencies used by almost all client applications (e.g., hadoop-aws, since we almost always interact with S3 storage on the client side).
FROM openjdk:11-jre-slim

WORKDIR /app

# Here, we copy the common artifacts required for any of our Spark Connect
# clients (primarily spark-connect-client-jvm, as well as spark-hive,
# hadoop-aws, scala-library, etc.).
COPY build/libs/* /app/
COPY src/main/docker/entrypoint.sh /app/
RUN chmod +x ./entrypoint.sh
ENTRYPOINT ["./entrypoint.sh"]
This common Docker image is used to run all our client applications when it comes to running them via Spark Connect. At the same time, it does not contain client JARs with the code of particular applications and their dependencies, because there are many such applications that are constantly updated and may depend on arbitrary third-party libraries. Instead, when a particular client application is launched, the location of its JAR file is passed via an environment variable, and that JAR is downloaded during initialization in entrypoint.sh:
#!/bin/bash
set -eo pipefail

# This variable will also be used in the SparkSession builder inside
# the application code.
export SPARK_CONNECT_MAIN_APPLICATION_FILE_PATH="/tmp/$(uuidgen).jar"

# Download the JAR with the code and specific dependencies of the client
# application to be run. All such JAR files are stored in S3, and when
# creating a client Pod, the path to the required JAR is passed to it
# via environment variables.
java -cp "/app/*" com.joom.analytics.sc.client.S3Downloader \
  ${MAIN_APPLICATION_FILE_S3_PATH} ${SPARK_CONNECT_MAIN_APPLICATION_FILE_PATH}

# Launch the client application. Any MAIN_CLASS initializes a SparkSession
# at the beginning of its execution using the code provided above.
java -cp ${SPARK_CONNECT_MAIN_APPLICATION_FILE_PATH}:"/app/*" ${MAIN_CLASS} "$@"
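The S3Downloader class mentioned above is our own small utility; a minimal sketch of what it could look like (assuming the S3A FileSystem provided by the hadoop-aws dependency that is already in the image) is shown below:
package com.joom.analytics.sc.client

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Downloads a single object from S3 to a local file using the Hadoop S3A FileSystem.
// args(0) is the source S3 path, args(1) is the local destination path.
// Paths are expected in the s3a:// scheme in this sketch.
object S3Downloader {
  def main(args: Array[String]): Unit = {
    val Array(s3Path, localPath) = args
    val conf = new Configuration()
    conf.set("fs.s3a.aws.credentials.provider",
      "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
    val fs = FileSystem.get(new URI(s3Path), conf)
    fs.copyToLocalFile(new Path(s3Path), new Path(localPath))
  }
}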
Finally, when it comes time to launch the application, our custom SparkAirflowOperator automatically determines the execution mode (Spark Connect or separate) based on the statistics of previous runs of this application.
- In the case of Spark Connect, we use KubernetesPodOperator to launch the client Pod of the application. KubernetesPodOperator takes as parameters the previously described Docker image, as well as the environment variables (MAIN_CLASS, JAR_PATH, and others), which will be available for use inside entrypoint.sh and the application code. There is no need to allocate many resources to the client Pod (for example, its typical consumption in our environment is 200 MB of memory and 0.15 vCPU).
- In the case of a separate Spark application, we use our custom AirflowOperator, which runs Spark applications using spark-on-k8s-operator and the official Spark Docker image. Let's skip the details about our Spark AirflowOperator for now, as it is a big topic deserving a separate article.
Not all existing Spark applications can be successfully executed on Spark Connect, since its SparkSession API is different from the SparkSession API used for separate Spark applications. For example, if your code uses sparkSession.sparkContext or sparkSession.sessionState, it will fail in the Spark Connect client because the Spark Connect version of SparkSession does not have these properties.
In our case, the most common cause of problems was using sparkSession.sessionState.catalog and sparkSession.sparkContext.hadoopConfiguration. In some cases, sparkSession.sessionState.catalog can be replaced with sparkSession.catalog, but not always. sparkSession.sparkContext.hadoopConfiguration may be needed if the code executed on the client side contains operations on your data storage, such as this:
def delete(path: Path, recursive: Boolean = true)
          (implicit hadoopConfig: Configuration): Boolean = {
  val fs = path.getFileSystem(hadoopConfig)
  fs.delete(path, recursive)
}
Fortunately, it is possible to create a standalone SessionCatalog for use within the Spark Connect client. In this case, the classpath of the Spark Connect client must also include org.apache.spark:spark-hive_2.12, as well as libraries for interacting with your storage (since we use S3, in our case it is org.apache.hadoop:hadoop-aws).
import org.apache.spark.SparkConf
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.hive.StandaloneHiveExternalCatalog
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalogWithListener, SessionCatalog}

// This is just an example of what the required properties might look like.
// All of them should already be set for existing Spark applications in one
// way or another, and their full list can be found in the UI of any
// running separate Spark application on the Environment tab.
val sessionCatalogConfig = Map(
  "spark.hadoop.hive.metastore.uris" -> "thrift://metastore.spark:9083",
  "spark.sql.catalogImplementation" -> "hive",
  "spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.delta.catalog.DeltaCatalog",
)

val hadoopConfig = Map(
  "hive.metastore.uris" -> "thrift://metastore.spark:9083",
  "fs.s3.impl" -> "org.apache.hadoop.fs.s3a.S3AFileSystem",
  "fs.s3a.aws.credentials.provider" -> "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
  "fs.s3a.endpoint" -> "s3.amazonaws.com",
  // and others...
)

def createStandaloneSessionCatalog(): (SessionCatalog, Configuration) = {
  val sparkConf = new SparkConf().setAll(sessionCatalogConfig)
  val hadoopConfiguration = new Configuration()
  hadoopConfig.foreach {
    case (key, value) => hadoopConfiguration.set(key, value)
  }

  val externalCatalog = new StandaloneHiveExternalCatalog(
    sparkConf, hadoopConfiguration)
  val sessionCatalog = new SessionCatalog(
    new ExternalCatalogWithListener(externalCatalog)
  )
  (sessionCatalog, hadoopConfiguration)
}
You also need to create a wrapper for HiveExternalCatalog accessible in your code (because the HiveExternalCatalog class is private to the org.apache.spark package):
package org.apache.spark.sql.hive

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
class StandaloneHiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
  extends HiveExternalCatalog(conf, hadoopConf)
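Here is an example of how the standalone catalog and the Hadoop configuration can then be used on the client side, for instance, to find the location of a table and remove it with the delete() helper shown earlier (the database and table names are placeholders):
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.TableIdentifier

val (sessionCatalog, hadoopConfiguration) = createStandaloneSessionCatalog()

// Look up the storage location of a table through the standalone catalog.
val tableMetadata = sessionCatalog.getTableMetadata(
  TableIdentifier("some_table", Some("some_database")))
val tableLocation = new Path(tableMetadata.location)

// Use this configuration instead of sparkSession.sparkContext.hadoopConfiguration.
implicit val hadoopConfig: Configuration = hadoopConfiguration
delete(tableLocation)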
Additionally, it is often possible to replace code that does not work on Spark Connect with an alternative, for example:
sparkSession.createDataFrame(sparkSession.sparkContext.parallelize(data), schema)
==> sparkSession.createDataFrame(data.toList.asJava, schema)

sparkSession.sparkContext.getConf.get("some_property")
==> sparkSession.conf.get("some_property")
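For completeness, here is what the first replacement can look like in context (the schema and data here are made up, and sparkSession is assumed to be an already created session):
import scala.collection.JavaConverters._

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType)
))
val data = Seq(Row(1, "a"), Row(2, "b"))

// Works both in a separate Spark application and through the Spark Connect
// client, because it does not rely on sparkSession.sparkContext.
val df = sparkSession.createDataFrame(data.toList.asJava, schema)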
Fallback to a separate Spark application
Unfortunately, it is not always easy to fix a particular Spark application to make it work as a Spark Connect client. For example, third-party Spark components used in the project pose a significant risk, as they are often written without considering compatibility with Spark Connect. Since, in our environment, any Spark application can be automatically launched on Spark Connect, we found it reasonable to implement a fallback to a separate Spark application in case of failure. Simplified, the logic is as follows:
- If some application fails on Spark Connect, we immediately try to rerun it as a separate Spark application. At the same time, we increment the counter of failures that occurred during execution on Spark Connect (each client application has its own counter).
- The next time this application is launched, we check the failure counter of this application:
– If there are fewer than 3 failures, we assume that the last time the application may have failed not because of incompatibility with Spark Connect but due to other possible temporary reasons. So, we try to run it on Spark Connect again. If it completes successfully this time, the failure counter of this client application is reset to zero.
– If there are already 3 failures, we assume that the application cannot work on Spark Connect and stop attempting to run it there for now. From then on, it will be launched only as a separate Spark application.
- If the application has 3 failures on Spark Connect, but the last one was more than 2 months ago, we try to run it on Spark Connect again (in case something has changed in it during that time, making it compatible with Spark Connect). If it succeeds this time, we reset the failure counter to zero again. If unsuccessful again, the next attempt will be in another 2 months.
This approach is much simpler than maintaining code that identifies the reasons for failures from logs, and it works well in most cases. Attempts to run incompatible applications on Spark Connect usually do not have any significant negative impact because, in the vast majority of cases, if an application is incompatible with Spark Connect, it fails immediately after launch without wasting time and resources. However, it is important to mention that all our applications are idempotent.
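A simplified sketch of this decision logic (the failure information storage itself is hypothetical and lives in the service mentioned earlier):
import java.time.{Duration, Instant}

// A hypothetical record of Spark Connect failures for one client application.
case class FailureInfo(count: Int, lastFailure: Instant)

val maxFailures = 3
val retryAfter = Duration.ofDays(60) // roughly 2 months

def shouldTrySparkConnect(failures: Option[FailureInfo]): Boolean =
  failures match {
    case None => true
    case Some(info) if info.count < maxFailures => true
    // After 3 failures, retry only if the last one is old enough.
    case Some(info) =>
      Duration.between(info.lastFailure, Instant.now()).compareTo(retryAfter) > 0
  }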
As I already mentioned, we collect Spark statistics for each Spark application (most of our platform optimizations and alerts depend on them). This is easy when the application runs as a separate Spark application. In the case of Spark Connect, the stages and tasks of each client application need to be separated from the stages and tasks of all other client applications that run simultaneously within the shared Spark Connect server.
You can pass any identifiers to the Spark Connect server by setting custom properties for the client SparkSession:
val session = builder
  .config("spark.joom.scAppId", scAppId)
  .config("spark.joom.airflowTaskId", airflowTaskId)
  .getOrCreate()
Then, in the SparkListener on the Spark Connect server side, you can retrieve all the passed information and associate each stage/task with the particular client application.
class StatsReportingSparkListener extends SparkListener {

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    val stageId = stageSubmitted.stageInfo.stageId
    val stageAttemptNumber = stageSubmitted.stageInfo.attemptNumber()
    val scAppId = stageSubmitted.properties.getProperty("spark.joom.scAppId")
    // ...
  }
}
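As an illustration of how this information can be used further, a possible continuation (not our actual listener) maps stages to client applications and aggregates task time per client:
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted, SparkListenerTaskEnd}

class PerClientTaskTimeListener extends SparkListener {
  // Listener events are delivered on a single bus thread, so plain mutable maps are enough here.
  private val stageToClient = mutable.Map.empty[(Int, Int), String]
  private val taskTimeByClient = mutable.Map.empty[String, Long]

  override def onStageSubmitted(e: SparkListenerStageSubmitted): Unit = {
    val scAppId = Option(e.properties)
      .flatMap(p => Option(p.getProperty("spark.joom.scAppId")))
      .getOrElse("unknown")
    stageToClient((e.stageInfo.stageId, e.stageInfo.attemptNumber())) = scAppId
  }

  override def onTaskEnd(e: SparkListenerTaskEnd): Unit = {
    val scAppId = stageToClient.getOrElse((e.stageId, e.stageAttemptId), "unknown")
    taskTimeByClient(scAppId) = taskTimeByClient.getOrElse(scAppId, 0L) + e.taskInfo.duration
  }
}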
Here, you can find the code for the StatsReportingSparkListener we use to collect statistics. You might also be interested in this free tool for finding performance issues in your Spark applications.
The Spark Connect server is a permanently running Spark application where a large number of clients can run their Jobs. Therefore, it may be worthwhile to customize its properties, which can make it more reliable and prevent waste of resources. Here are some settings that turned out to be useful in our case:
// Using dynamicAllocation is important for the Spark Connect server
// because the workload can be very unevenly distributed over time.
spark.dynamicAllocation.enabled: true // default: false

// This pair of parameters is responsible for the timely removal of idle
// executors:
spark.dynamicAllocation.cachedExecutorIdleTimeout: 5m // default: infinity
spark.dynamicAllocation.shuffleTracking.timeout: 5m // default: infinity

// To create new executors only when the existing ones cannot handle
// the received tasks for a significant amount of time. This allows you
// to save resources when a small number of tasks arrive at some point
// in time, which do not require many executors for timely processing.
// With increased schedulerBacklogTimeout, unnecessary executors do not
// have the chance to appear by the time all incoming tasks are
// completed. The time to complete the tasks increases slightly with this,
// but in most cases, this increase is not significant.
spark.dynamicAllocation.schedulerBacklogTimeout: 30s // default: 1s

// If, for some reason, you need to stop the execution of a client
// application (and free up resources), you can forcibly terminate the client.
// Currently, even explicitly closing the client SparkSession does not
// immediately end the execution of its corresponding Jobs on the server.
// They will continue to run for a duration equal to 'detachedTimeout'.
// Therefore, it may be reasonable to reduce it.
spark.connect.execute.manager.detachedTimeout: 2m // default: 5m

// We have encountered a situation when killed tasks may hang for
// an unpredictable amount of time, leading to bad consequences for their
// executors. In this case, it is better to remove the executor on which
// this problem occurred.
spark.task.reaper.enabled: true // default: false
spark.task.reaper.killTimeout: 300s // default: -1

// The Spark Connect server can run for an extended period of time. During
// this time, executors may fail, including for reasons beyond our control
// (e.g., AWS Spot interruptions). This option is needed to prevent
// the entire server from failing in such cases.
spark.executor.maxNumFailures: 1000

// In our experience, BroadcastJoin can lead to very serious performance
// issues in some cases. So, we decided to disable broadcasting.
// Disabling this option usually does not result in a noticeable performance
// degradation for our typical applications anyway.
spark.sql.autoBroadcastJoinThreshold: -1 // default: 10MB

// For many of our client applications, we have to add an artifact to
// the client session (method sparkSession.addArtifact()).
// Using 'useFetchCache=true' results in double space consumption for
// the application JAR files on executors' disks, as they are also duplicated
// in a local cache folder. Sometimes, this even causes disk overflow with
// subsequent problems for the executor.
spark.files.useFetchCache: false // default: true

// To ensure fair resource allocation when multiple applications are
// running concurrently.
spark.scheduler.mode: FAIR // default: FIFO
For example, after we adjusted the idle timeout properties, resource utilization changed noticeably: idle executors were now removed in a timely manner instead of being kept indefinitely.
Preventive restart
In our environment, the Spark Connect server (version 3.5) may become unstable after a few days of continuous operation. Most often, we face client application jobs randomly hanging for an infinite amount of time, but there may be other problems as well. Also, over time, the probability of a random failure of the entire Spark Connect server increases dramatically, and this can happen at the wrong moment.
As this component evolves, it will likely become more stable (or we will find out that we have done something wrong in our Spark Connect setup). But at the moment, the simplest solution has turned out to be a daily preventive restart of the Spark Connect server at a suitable moment (i.e., when no client applications are running on it). An example of what the restart code might look like can be found here.
In this article, I described our experience using Spark Connect to run a large number of diverse Spark applications.
To summarize the above:
- This component can help save resources and reduce the wait time for the execution of Spark client applications.
- It is better to be careful about which applications should be run on the shared Spark Connect server, as resource-intensive applications may cause problems for the entire system.
- You can create an infrastructure for launching client applications so that the decision on how to run any application (either as a separate Spark application or as a Spark Connect client) can be made automatically at the moment of launch.
- It is important to note that not all applications will be able to run on Spark Connect, but the number of such cases can be significantly reduced. If there is a chance of running applications that have not been tested for compatibility with the Spark Connect version of the SparkSession API, it is worth implementing a fallback to separate Spark applications.
- It is worth paying attention to the Spark properties that can improve resource utilization and increase the overall stability of the Spark Connect server. It may also be reasonable to set up a periodic preventive restart of the Spark Connect server to reduce the probability of accidental failure and unwanted behavior.
Overall, we have had a positive experience using Spark Connect in our company. We will continue to follow the development of this technology with great interest, and there is a plan to expand its use.