Updated February 27, 2023
Introduction to Spark Accumulator
Apache Spark uses shared variables. When the driver sends a task to an executor, each node of the cluster receives a copy of the shared variables. Apache Spark supports two basic types of shared variables – accumulators and broadcast variables. Apache Spark is a widely used, open-source cluster computing framework that comes with features such as machine learning, streaming APIs, and graph processing algorithms. Accumulators are variables that are only added to through associative operations. Implementing sums and counters are typical examples of accumulator tasks, and there are many others. Spark supports accumulators of numeric types out of the box, but programmers can add support for other types.
Syntax:
PySpark also exposes an Accumulator class; in Scala, an accumulator is created from the SparkContext as follows:
val acc = sc.accumulator(v)
Here v is the initial value, which is typically zero when performing a sum or a count operation.
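As the introduction mentioned, programmers can add accumulator support for non-numeric types. Below is a minimal sketch of how this might look with the same sc.accumulator API, assuming the older AccumulatorParam mechanism; the object name VectorParam and the List[Double] element type are illustrative choices, not part of the original article:

import org.apache.spark.AccumulatorParam

// Illustrative sketch: an accumulator over List[Double], summed element-wise.
object VectorParam extends AccumulatorParam[List[Double]] {
  // The "zero" value that each per-task copy starts from.
  def zero(initial: List[Double]): List[Double] = initial.map(_ => 0.0)
  // How two partial results are merged together.
  def addInPlace(v1: List[Double], v2: List[Double]): List[Double] =
    v1.zip(v2).map { case (a, b) => a + b }
}

// The param can be passed explicitly, as here, or provided implicitly.
val vecAcc = sc.accumulator(List(0.0, 0.0, 0.0))(VectorParam)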
Why do we Use Spark Accumulator?
We use a Spark accumulator when we want to perform commutative or associative operations on the data. Accumulators can be created with or without a name. A named accumulator is displayed in the Spark UI, which is useful for understanding the progress of running stages (named accumulators are not supported in Python). Similar to a Spark broadcast variable, an accumulator is created by calling SparkContext.accumulator(v), where v is the initial value. Accumulators are used to implement sums and counters, much like counters in MapReduce functions.
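For instance, mirroring MapReduce-style counters, two accumulators can count records and sum a field in a single pass over an RDD. Here is a small sketch; the names recordCount and amountTotal and the sample values are assumptions made for illustration:

val recordCount = sc.accumulator(0, "Record Count")   // named, so it shows up in the Spark UI
val amountTotal = sc.accumulator(0.0, "Amount Total")

sc.parallelize(Seq(10.0, 20.0, 30.0)).foreach { amount =>
  recordCount += 1
  amountTotal += amount
}

println(s"count = ${recordCount.value}, total = ${amountTotal.value}")

The next example shows why a plain variable is not enough for this kind of counting.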
Code:
package org.spark.accumulator.crowd.now.aggregator.sample2
var lalaLines: Int = 0
sc.textFile("some log file", 4)
  .foreach { line =>
    if (line.length() == 0) lalaLines += 1
  }
println(s"Lala Lines are from the above code = $lalaLines")
In the above code, the printed value will be zero even if the file contains blank lines. Because Spark ships the code to each executor, the variable lalaLines becomes local to that executor, and its updated value is never sent back to the driver. Making lalaLines an accumulator solves this problem, because the updates from every executor are merged and sent back to the driver.
Rewritten with an accumulator, the code looks like this:
Code:
package org.spark.accumulator.crowd.now.aggregator.sample2
val lalaLines = sc.accumulator(0, "Lala Lines")
sc.textFile("some log file", 4)
  .foreach { line =>
    if (line.length() == 0) lalaLines += 1
  }
println(s"\tLala Lines are from the above code = ${lalaLines.value}")
This code makes sure that the lalaLines accumulator is updated on every executor and that the merged value is relayed back to the driver.
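One related point worth noting, which is standard Spark behavior rather than part of the original example: accumulator updates are only guaranteed to be applied exactly once when they happen inside an action such as foreach; inside a transformation such as map they may be re-applied if a stage is recomputed. A brief sketch:

// Updated inside an action: each task's update is applied exactly once.
val safeCounter = sc.accumulator(0, "Safe Counter")
sc.textFile("some log file", 4).foreach { line =>
  if (line.length() == 0) safeCounter += 1
}

// Updated inside a transformation: the update may run more than once
// if the stage is recomputed, so the final count is not guaranteed.
val riskyCounter = sc.accumulator(0, "Risky Counter")
val lengths = sc.textFile("some log file", 4).map { line =>
  if (line.length() == 0) riskyCounter += 1
  line.length()
}
lengths.count()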
How Does Spark Accumulator Work?
Broadcast variables allow Spark developers to keep a read-only variable cached on every node instead of shipping a copy of it with each task. They can be used to give every node a copy of a large input dataset without wasting time on network input and output. Spark distributes broadcast variables using efficient broadcast algorithms, which reduces communication cost.
Spark actions are executed in a series of stages, separated by shuffle operations. Within each stage, Spark automatically broadcasts the common data that the tasks need; that data is cached in serialized form and deserialized on every node before each task runs. For this reason, broadcast variables only need to be created explicitly when tasks across multiple stages require the same data.
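As an illustration of the broadcast variables described above, here is a minimal sketch using the standard SparkContext.broadcast call; the small status-code lookup table is an assumption made up for this example:

// A read-only lookup table shipped once to every executor.
val statusNames = sc.broadcast(Map(200 -> "OK", 404 -> "Not Found", 500 -> "Server Error"))

val codes = sc.parallelize(Seq(200, 404, 200, 500))
val named = codes.map(code => statusNames.value.getOrElse(code, "Unknown"))
println(named.collect().mkString(", "))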
An accumulator itself is created by calling the function SparkContext.accumulator, as the following code shows:
Code:
val accum = sc.accumulator(0)
accum: spark.Accumulator[Int] = 0
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
accum.value
res2: Int = 10
package org.spark.accumulator.crowd.now.aggregator.sample2

import org.apache.spark.{SparkConf, SparkContext}

object Boot {

  def main(args: Array[String]): Unit = {
    val sparkConfiguration = new SparkConf(true)
      .setMaster("local[4]")
      .setAppName("Analyzer Spark")
    val sparkContext = new SparkContext(sparkConfiguration)

    // Read-only reference data, shipped once to every executor.
    // populateHttpStatusList is assumed to be defined elsewhere in the project.
    val httpStatusList = sparkContext.broadcast(populateHttpStatusList)

    // Named accumulators, one per class of HTTP status code.
    val httpInfo = sparkContext.accumulator(0, "HTTP a")
    val httpSuccess = sparkContext.accumulator(0, "HTTP b")
    val httpRedirect = sparkContext.accumulator(0, "HTTP c")
    val httpClientError = sparkContext.accumulator(0, "HTTP d")
    val httpServerError = sparkContext.accumulator(0, "HTTP e")

    // Read the access log bundled as a resource and update the counters.
    sparkContext.textFile(getClass.getResource("log").getPath)
      .foreach { line =>
        // ... classify the line's HTTP status and update the matching accumulator ...
      }

    println("THE_START")
    println("HttpStatusCodes are going to be printed in result from access the log parse")
    println("Http Status Info: " + httpInfo.value)
    println("Status Success: " + httpSuccess.value)
    println("Status Redirect: " + httpRedirect.value)
    println("Client Error: " + httpClientError.value)
    println("Server Error: " + httpServerError.value)
    println("THE_END")

    sparkContext.stop()
  }
}
A broadcast variable stores the user data and exposes it through its value field, which returns the broadcast value. The accumulator example can also be written with the PySpark API:
from pyspark import SparkContext

sc = SparkContext("local", "Accumulator app")

accum = sc.accumulator(0)
rdd = sc.parallelize([1, 2, 3, 4])

def f(x):
    global accum
    accum += x

rdd.foreach(f)
print(accum.value)
The command line for submitting the accumulator example is given below:
$SPARK_HOME/bin/spark-submit accumulator.py
Advantages and Uses of Spark Accumulator
- Memory access is very direct.
- Garbage-collection overhead during processing is minimal.
- The in-memory format is compact and columnar.
- Queries benefit from Catalyst optimization.
- Whole-stage code generation is used.
- Datasets offer compile-time type safety, an advantage over DataFrames.
Conclusion
We have seen the concept of the Spark accumulator. Spark uses shared variables for parallel processing. Accumulator variables are used to aggregate information through commutative and associative operations; for example, they can implement the sums and counters familiar from MapReduce. In Spark, the accumulator variable is mutable, but its value can only be read by the driver program, not by the executors. It is similar to a counter in Java MapReduce.
Recommended Articles
This is a guide to Spark Accumulator. Here we discuss the introduction to Spark Accumulator and how it works, along with its advantages and uses. You can also go through our other suggested articles to learn more –