13 Comments
Jul 30, 2023 · Liked by Aurimas Griciūnas

Hello, I have some questions:

1) Rule of thumb: set the number of Cores to be two times less than files being read. This way you are very likely to avoid idling cores due to file size skew. Adjust according to your situation.

So let us consider a situation in which I have to read 196 files, each 800 MB and gzip-compressed.

I use the Spark RDD API to read all files, on EMR Serverless.

So the total number of cores should be no more than 98, right, since these files are not splittable?

But let's say I read them and repartition into 2000 partitions to increase parallelism in further stages. If my cores are limited, then all of the following stages and tasks would be slow, right?

Author

If you know that all files are of the same size, e.g. 800 MB, you can safely go with 196 cores, as the read time will be pretty much equal for all files. The rule of thumb applies when you are not sure about the distribution of file sizes.

2000 partitions for 98 cores is still somewhat OK, I would say. It also really depends on how much compute you can afford; naturally, having somewhere around 500 cores would be ideal, but that would require a huge cluster.
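As a rough illustration of the scenario discussed above (a sketch only; the bucket path is made up):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("gz-read-sketch").getOrCreate()

# Hypothetical input: 196 gzip files of ~800 MB each (the path is made up).
# Each .gz file is not splittable, so the read stage has at most 196 tasks,
# one per file.
rdd = spark.sparkContext.textFile("s3://my-bucket/input/*.gz")

# Repartition after the read so later stages can use more parallelism.
# With 98-196 cores, each core simply works through several of the
# 2000 partitions in sequence.
repartitioned = rdd.repartition(2000)
```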


Here I have a question: let's say the partitions are of equal size and I use around 500 cores, would that result in a higher cost?

Jul 30, 2023 · Liked by Aurimas Griciūnas

Hello, great post, but I have a couple of questions:

1) Why haven't we made any reference to the number of Spark executors?

2) Certain files, such as .gz, are not splittable when we read them via the RDD API, so how do things change in that scenario?

Author

I would usually suggest having ~3-4 cores per executor. Going above 5 has been shown to reduce performance due to I/O bottlenecks. The number of executors can then be calculated naturally from the size of the cluster you are running on.
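To make that calculation concrete, here is a rough sketch (the cluster size is made up; the config keys are standard Spark settings):

```python
from pyspark.sql import SparkSession

# Hypothetical cluster: 10 workers with 16 cores and 64 GB RAM each (made-up numbers).
# With ~4 cores per executor and 1 core per node left for the OS/daemons:
#   executors per node = (16 - 1) // 4 = 3  ->  ~30 executors in total.
spark = (
    SparkSession.builder
    .appName("executor-sizing-sketch")
    .config("spark.executor.cores", "4")
    .config("spark.executor.instances", "30")
    .config("spark.executor.memory", "18g")  # roughly (64 GB minus overhead) / 3 executors
    .getOrCreate()
)
```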

A non-splittable file can only be read as a whole. The remaining rules still apply, but you can control how the executor partitions the data in memory, meaning that even if you read a non-splittable 800 MB file, you can chunk it into 128 MB pieces as it is being read.
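One way to approximate that in PySpark (a sketch only, not necessarily the exact mechanism meant above; it simply repartitions right after the read, and the 800 MB / 128 MB numbers come from the example in this thread):

```python
import math

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("non-splittable-chunking").getOrCreate()

# A single ~800 MB gzip file comes in as one partition, since gzip is not splittable.
df = spark.read.text("s3://my-bucket/input/big-file.txt.gz")  # hypothetical path

# Break it into ~128 MB chunks so that downstream tasks stay small.
file_size_mb = 800            # known or estimated size of the input
target_partition_mb = 128
num_partitions = math.ceil(file_size_mb / target_partition_mb)  # 7

chunked = df.repartition(num_partitions)
```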


Great post. Actually, we can also consider partitioning as a special type of bucketing, where the modulo is the cardinality of the partition column. So naturally, if the cardinality is high, bucketing will be the better choice, since there will be fewer small partition files.

Regarding spark.default.parallelism and spark.sql.shuffle.partitions: they are quite close to each other and a little bit confusing. Could you talk a little more about them in the future? Thanks in advance.

Author
Jul 3, 2023 · edited Jul 3, 2023

Sure, I will cover spark.default.parallelism and spark.sql.shuffle.partitions as a passage in part 2 then (the article series might even have 3 parts, given how many topics there are to cover). If you have any other topics that you would like to see covered or questions answered, let me know.

When it comes to using bucketing instead of partitioning, you should also consider that you might want the ability to visually organise files on disk for later movement outside of the Spark framework; partitioning allows for that.

When it comes to the column on which you partition/bucket, there should not be much impact on the resulting storage needed, as bucketed files would only have a single unique value for that column and RLE should compress it to almost nothing. Which is good news :)
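For reference, a minimal sketch of the two write paths being compared (paths, table and column names are made up):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("partition-vs-bucket-sketch")
    .enableHiveSupport()   # bucketed tables need a metastore-backed saveAsTable
    .getOrCreate()
)

df = spark.read.parquet("s3://my-bucket/events/")  # hypothetical input

# Partitioning: one directory per distinct value of the column. The layout is
# visible on disk and easy to move around outside of Spark; best suited to
# low-cardinality columns.
df.write.partitionBy("event_date").parquet("s3://my-bucket/events_partitioned/")

# Bucketing: rows are hashed into a fixed number of buckets (hash(user_id) % 32),
# so a high-cardinality column does not explode into many tiny files.
df.write.bucketBy(32, "user_id").sortBy("user_id").saveAsTable("events_bucketed")
```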


Hello, regarding keeping cores per executor at 5: can you please give a use case where more cores per executor leads to better results? I have seen that in compute-intensive operations, keeping more cores leads to better results.

Also, please refer to https://aws.amazon.com/blogs/big-data/amazon-emr-serverless-supports-larger-worker-sizes-to-run-more-compute-and-memory-intensive-workloads/

and let me know your comments.


Aurimas Griciūnas, any comments on this? I have the same doubt.


Great post!! However, I read on Stack Overflow that spark.sql.shuffle.partitions applies only to joins and shuffles, and not when writing data to disk using the DataFrame writer. Could you please comment? Thanks.
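Not an authoritative answer, but a small sketch of how the two knobs are usually distinguished (paths and values are made up): spark.sql.shuffle.partitions targets the shuffle in joins/aggregations, while the output file count follows the DataFrame's partition count at write time.

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("shuffle-partitions-vs-output-files")
    .config("spark.sql.shuffle.partitions", "400")  # used by joins/aggregations
    .getOrCreate()
)

orders = spark.read.parquet("s3://my-bucket/orders/")  # hypothetical inputs
users = spark.read.parquet("s3://my-bucket/users/")

# The shuffle in this join targets 400 partitions (AQE may coalesce them on Spark 3.x).
joined = orders.join(users, "user_id")

# The writer emits one file per partition of the DataFrame at write time, so the
# output file count is controlled with coalesce/repartition, not the setting above.
joined.coalesce(50).write.parquet("s3://my-bucket/joined/")
```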


You haven't talked about spark.dynamicAllocation. I believe this is one of the key parameters for cost optimisation.
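For reference, a minimal sketch of the dynamic allocation settings that are usually involved (the values are placeholders):

```python
from pyspark.sql import SparkSession

# Illustrative values only; tune min/max executors to the workload and budget.
spark = (
    SparkSession.builder
    .appName("dynamic-allocation-sketch")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "50")
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
    # Dynamic allocation needs either an external shuffle service or,
    # on Spark 3.x, shuffle tracking:
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    .getOrCreate()
)
```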


Another scenario for:

1) Rule of thumb: set the number of Cores to be two times less than files being read. This way you are very likely to avoid idling cores due to file size skew. Adjust according to your situation. - Now the file sizes range from 200 MB to 800 MB, all files are non-splittable, and there are 196 files in total, so in this case the number of cores should be 98?
