Spark 1.x - RDD framework; Spark 2.x - Datasets / DataFrames
Important Spark links - DZone
Keep in mind that repartitioning your data is a fairly expensive operation.
Spark also has an optimized version of repartition() called coalesce() that avoids data movement,
but only if you are decreasing the number of RDD partitions.
It avoids a full shuffle. If the partition count is known to be decreasing, the executors can safely keep data on the minimum number of partitions, only moving the data off the extra nodes onto the nodes that are kept.
So, it would go something like this:
Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12
Then coalesce down to 2 partitions:
Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)
Notice that Node 1 and Node 3 did not require their original data to move.
coalesce uses existing partitions to minimize the amount of data that's shuffled; repartition creates new partitions and does a full shuffle. coalesce results in partitions with different amounts of data (sometimes of very different sizes), whereas repartition results in roughly equal-sized partitions.
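A minimal sketch of the difference (the DataFrame and partition counts are just illustrative):
df = spark.range(0, 1000000, numPartitions=12)
evenly = df.repartition(4)        # full shuffle, roughly equal-sized partitions
merged = df.coalesce(4)           # merges existing partitions, no full shuffle, sizes may be uneven
print(evenly.rdd.getNumPartitions(), merged.rdd.getNumPartitions())   # 4 4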
MSCK REPAIR TABLE
MSCK REPAIR TABLE can be a costly operation, because it needs to scan the table's sub-tree in the file system (the S3 bucket). Multiple levels of partitioning make it more costly, as it needs to traverse additional sub-directories. Assuming all potential combinations of partition values occur in the data set, this can turn into a combinatorial explosion.
If you are adding new partitions to an existing table, you may find that it's more efficient to run ALTER TABLE ADD PARTITION commands for the individual new partitions. This avoids the need to scan the table's entire sub-tree in the file system. It is less convenient than simply running MSCK REPAIR TABLE, but sometimes the optimization is worth it. A viable strategy is often to use MSCK REPAIR TABLE for an initial import, and then ALTER TABLE ADD PARTITION for ongoing maintenance as new data gets added into the table.
If it's really not feasible to use ALTER TABLE ADD PARTITION to manage the partitions directly, then the execution time might be unavoidable. Reducing the number of partitions might reduce execution time, because it won't need to traverse as many directories in the file system. Of course, the partitioning is then different, which might impact query execution time, so it's a trade-off.
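A sketch of the two approaches run through spark.sql (the table name, partition columns, and S3 path are hypothetical):
spark.sql("MSCK REPAIR TABLE events")   # initial import: scans the whole table sub-tree
spark.sql("""
    ALTER TABLE events ADD IF NOT EXISTS
    PARTITION (year=2021, month=7, day=14)
    LOCATION 's3://my-bucket/events/year=2021/month=7/day=14/'
""")                                    # ongoing maintenance: registers just the new partition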
For transformations, Spark adds them to a DAG of computation, and only when the driver requests some data does this DAG actually get executed.
One advantage of this is that Spark can make many optimization decisions after it has had a chance to look at the DAG in its entirety. This would not be possible if it executed everything as soon as it got it.
For example: if you executed every transformation eagerly, you would have to materialize that many intermediate datasets in memory. This is evidently not efficient; for one, it will increase your GC costs. (You're really not interested in those intermediate results as such; those are just convenient abstractions for you while writing the program.) So what you do instead is tell Spark the eventual answer you're interested in, and it figures out the best way to get there.
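For example (the S3 path is a placeholder and sparkContext is the SparkContext): nothing below runs until the action at the end, so Spark sees the whole DAG before deciding how to execute it.
words = (sparkContext.textFile("s3://../..")          # transformation: just recorded in the DAG
         .flatMap(lambda line: line.split())          # transformation
         .map(lambda word: (word, 1))                 # transformation
         .reduceByKey(lambda a, b: a + b))            # transformation
top10 = words.takeOrdered(10, key=lambda wc: -wc[1])  # action: triggers the whole job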
Spark Transformation vs Action
Cluster
- Driver
- Executor (Memory / Disk) (Cores / Task Slots)
- Action -> Job -> Stages (1 or more) -> Tasks (1 or more); each job is built from one or more transformations.
- Only tasks interact with the hardware. Tasks in the same stage perform the same transformation, but on different data.
- If a different operation / transformation needs to be performed, it needs to be in a different stage.
- 1 Task == 1 partition == 1 slot == 1 core.
spark.sql.files.maxPartitionBytes
spark.sql.shuffle.partitions
df.write.option("maxRecordsPerFile", N)
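A sketch of how these are typically set (the numbers and output path are illustrative):
spark.conf.set("spark.sql.files.maxPartitionBytes", 128 * 1024 * 1024)   # input split size when reading files
spark.conf.set("spark.sql.shuffle.partitions", 1050)                     # number of partitions after a shuffle
df.write.option("maxRecordsPerFile", 1000000).parquet("s3://../..")      # caps records (and thus roughly size) per output file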
Output target size for a shuffle partition: ~200 MB
partition count for shuffle = input size to the shuffle stage / output target size
ex:
shuffle input size: 210 GB
shuffle partitions = 210000 MB / 200 MB = 1050
so the shuffle stage should use 1050 partitions.
But if there are 2000 cores available, then we should use 2000 as the shuffle partition count.
Optimize further by rounding up to a multiple of the core count:
total partitions for shuffle = Math.ceil((input size to shuffle / target size) / total cores) * total cores
This way you ensure that you are always using all the cores effectively.
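A small helper capturing that formula (the target size and core count are example values):
import math

def shuffle_partition_count(input_bytes, target_bytes=200 * 1024 * 1024, total_cores=2000):
    # round the size-based count up to a whole multiple of the core count
    return math.ceil((input_bytes / target_bytes) / total_cores) * total_cores

spark.conf.set("spark.sql.shuffle.partitions", shuffle_partition_count(210 * 1024**3))  # -> 2000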
df.localCheckpoint().repartition(n).write()...
localCheckpoint() creates a local stage barrier (it truncates the lineage).
df.cache() is the same as df.persist(MEMORY_AND_DISK).
Remember that persist uses the same internal memory that is used by other operations like shuffle and join, so persist with care.
spark.sql.autoBroadcastJoinThreshold: default 10 MB.
A risk associated with BCJ is blowing past spark.driver.maxResultSize, since the broadcast table is first collected to the driver. Make sure you have validation functions so that oversized broadcasts are caught.
Note: when the broadcast table is deserialized and its rows are decompressed, it takes more space than what we initially assumed, so account for that.
from pyspark.sql.functions import broadcast
data1.join(broadcast(data2), data1.id == data2.id)
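Sketch of the surrounding settings (the 50 MB threshold is just an example value):
from pyspark import StorageLevel

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  # bytes; -1 disables auto broadcast
data1.persist(StorageLevel.MEMORY_AND_DISK)   # what df.cache() does for a DataFrame
barrier = data1.localCheckpoint()             # truncates lineage; acts as a local stage barrier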
skewed data example with a salting fix:
# skewed version
df.groupBy("city", "state").agg(f(x)).orderBy(col, ascending=False)
# salted version: add a random salt per row in [0, spark.sql.shuffle.partitions - 1]
# (rand comes from pyspark.sql.functions)
n = int(spark.conf.get("spark.sql.shuffle.partitions"))
df.withColumn("salt", (rand() * n).cast("int"))
  .groupBy("city", "state", "salt")
  .agg(f(x))
  .drop("salt")
  .orderBy(col, ascending=False)
# then re-aggregate over ("city", "state") to combine the per-salt partial results
Try to use null-safe equality (<=> in SQL, eqNullSafe in the DataFrame API) when join keys contain nulls.
Use isolated salting for null-heavy keys: only apply high salting to the rows whose key columns are null.
You should use NOT EXISTS instead of NOT IN in SQL (NOT IN returns no rows at all if the subquery produces a NULL).
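Sketch of both ideas in the DataFrame API (reusing the data1/data2 names from above):
null_safe = data1.join(data2, data1.id.eqNullSafe(data2.id))        # <=> in SQL
not_exists = data1.join(data2, data1.id == data2.id, "left_anti")   # NOT EXISTS semantics, null-safe unlike NOT IN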
Range join optimization?? Not explained; check reference.
approxCountDistinct() has a ~5% margin of error by default.
dropDuplicates before a JOIN or groupBy; dropDuplicates() with no arguments is an alias of distinct().
If you can use explode and the built-in sql.functions instead of doing a map or flatMap, that is usually better.
If you are using primitives in UDFs, the UDF is usually not vectorized and is not going to perform that well. Try to use sql.functions wherever possible; use pandas (Arrow) UDFs when what you need is not available as a built-in function.
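A few sketches of the above (column names and the UDF are hypothetical; the pandas UDF uses the Spark 3.x type-hint style):
import pandas as pd
from pyspark.sql.functions import approx_count_distinct, explode, split, pandas_udf

df.select(approx_count_distinct("user_id", rsd=0.05))            # ~5% relative error
words = df.select(explode(split(df.text, " ")).alias("word"))    # prefer explode over rdd.flatMap

@pandas_udf("double")
def c_to_f(c: pd.Series) -> pd.Series:                           # Arrow-backed, vectorized
    return c * 9.0 / 5.0 + 32.0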
Summary
solve(problem):
    if problem is small enough:
        solve problem directly (sequential algorithm)
    else:
        for part in subdivide(problem):
            fork subtask to solve(part)
        join all subtasks spawned in previous loop
        return combined results
Java implementation for fork join
Question: Say we have a partitioned data set and code that runs a series of Spark SQL queries, all of them operating over the columns rather than the rows of the data set. One of these queries does an aggregate operation over each of the columns; even though that one is quick, the other queries in the process are going to take a lot of time. So, first, in this situation should I be looking into the scheduling of these tasks themselves? And second, when processing columns rather than rows, should we be looking at some sort of vertical partitioning rather than just the (row-wise) repartitioning that Spark offers?
Someone else’s notes on Daniel lecture
## Memory Tuning for Spark
Basic memory configuration:
JVM heap memory
- Spark Memory (0.75 of heap)
  1. Execution Memory: mainly used to store temporary data in the calculation process of shuffle, join, sort, aggregation, etc.
  2. Storage Memory: mainly used to store Spark cache data, such as RDD cache, broadcast variables, unroll data, and so on.
- User Memory: mainly used to store the data needed for RDD conversion operations, such as the information for RDD dependencies (~0.25 of heap).
- Reserved Memory: reserved for the system and Spark's internal objects (300 MB).
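These regions can be sized when the session is created; a sketch with illustrative values (the fraction defaults differ across Spark versions):
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("memory-tuning")
         .config("spark.executor.memory", "8g")
         .config("spark.memory.fraction", "0.75")         # Spark (execution + storage) share of usable heap
         .config("spark.memory.storageFraction", "0.5")   # portion of that protected for storage/cache
         .getOrCreate())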
## CI/CD for Spark
Links: Databricks CI/CD, Databricks video, At Metacog using AWS and GitHub Actions
Join strategies: Broadcast Hash Join, Shuffle Hash Join, Shuffle Sort Merge Join, Iterative Broadcast Join
Troubleshooting shuffle / uneven sharding: some tasks take a lot of time, and speculative tasks get triggered.
ShuffleHashJoin (Impt)
Problems with shuffle joins usually come from skewed, low-cardinality join keys or from shuffling a huge table to join it with a small one, e.g.:
join_df = sqlContext.sql("SELECT * FROM people_in_the_US JOIN states ON people_in_the_US.state = states.state")
sqlContext.sql("SELECT * FROM people_in_cali LEFT JOIN all_people_in_world ON people_in_cali.id = all_people_in_world.id")
BroadcastHashJoin (Impt)
SortMergeJoin
CartesianJoin - not much explained here.
One-to-many joins - Parquet took care of it?
Theta joins - the join is based on a condition: do a full cartesian join and then apply the condition, or generate buckets that match the condition and join the buckets?? Not clear about this either.
join(self, other, on=None, how=None)
how=left,right,inner,outer
ex : empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id, "inner")
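On Spark 3.x the join strategy can be nudged with join hints; a sketch reusing the empDF/deptDF example above:
empDF.join(deptDF.hint("broadcast"), empDF.emp_dept_id == deptDF.dept_id, "inner")     # Broadcast Hash Join
empDF.join(deptDF.hint("shuffle_hash"), empDF.emp_dept_id == deptDF.dept_id, "inner")  # Shuffle Hash Join
empDF.join(deptDF.hint("merge"), empDF.emp_dept_id == deptDF.dept_id, "inner")         # Shuffle Sort Merge Join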
Group by Key
sparkContext.textFile("s3://../..")
.flatMap(lambda line: line.split())
.map(lambda word: (word,1))
.groupByKey()
.map(lambda wc: (wc[0], sum(wc[1])))   # Python 3: no tuple unpacking in lambdas
Reduce By Key
sparkContext.textFile("s3://../..")
.flatMap(lambda line: line.split())
.map(lambda word: (word,1))
.reduceByKey(lambda a, b: a + b)
reduceByKey will perform better because it combines the results on each node before sending them over the network, whereas with groupByKey all the records have to be moved to their respective partitions on the appropriate nodes.
We cannot always apply reduceByKey, as it requires combining the values into another value of the exact same type.
aggregateByKey, foldByKey, combineByKey
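Example: a per-key average needs a (sum, count) accumulator, a different type from the values, so reduceByKey alone does not fit; aggregateByKey still combines map-side (names are illustrative):
pairs = sparkContext.parallelize([("a", 1), ("a", 3), ("b", 5)])
sums_counts = pairs.aggregateByKey(
    (0, 0),                                       # zero value: (sum, count)
    lambda acc, v: (acc[0] + v, acc[1] + 1),      # fold one value into the accumulator
    lambda a, b: (a[0] + b[0], a[1] + b[1]))      # merge accumulators across partitions
averages = sums_counts.mapValues(lambda p: p[0] / p[1])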
The recommended number of partitions is around 3 or 4 times the number of CPUs in the cluster so that the work gets distributed more evenly among the CPUs.
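ex (sketch): defaultParallelism is usually the total core count of the cluster, so:
df = df.repartition(spark.sparkContext.defaultParallelism * 3)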
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([StructField("author", StringType(), False), StructField("title", StringType(), False), StructField("pages", IntegerType(), False)])
spark.createDataFrame(vals, schema=schema)
spark = (SparkSession
.builder
.appName("Example-3_6")
.getOrCreate())
file = "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
df = spark.read.format("orc").option("path", file).load()
df.show(10, False)
path = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
binary_files_df = (spark.read.format("binaryFile")
.option("pathGlobFilter", "*.jpg")
.load(path))
binary_files_df.show(5)
df.write.parquet(...)
from pyspark.sql.functions import when, to_timestamp, lit, regexp_extract, concat_ws, row_number, lead, lag, desc
from pyspark.sql.window import Window
df_nextdate = df_nextdate.withColumn('next_timestamp', when(df_nextdate.next_timestamp.isNull(),
                                     to_timestamp(lit("2038-01-19 03:14:07")))
                                     .otherwise(to_timestamp(df_nextdate.next_timestamp)))
df_gmsd = df_gmsd.withColumn('market_legacyname',
regexp_extract(concat_ws(',', 'attributes'), 'Market:([^,]+)',1))
df_lastrec = df_lastrec.withColumn('rnk', row_number()
.over(Window
.partitionBy(
"zipcode")
.orderBy(desc("update_id"))))
df_final = df_final.withColumn('end_date', lead('eventtimestamp_utc' , 1)
.over(Window
.partitionBy('zipcode','propertytype')
.orderBy('zipcode', 'propertytype', 'eventtimestamp_utc', desc('status'), 'ruleId')))
df_final = df_final.withColumn('md5val_prev', lag('md5val', 1, 0)
.over(Window
.partitionBy('zipcode', 'propertytype')
.orderBy('zipcode', 'propertytype','eventtimestamp_utc', 'status', 'ruleId')))