Article Directory
- Spark SQL DataFrame/DataSet:
- 1. Reduce small files
- 2. Accessing cluster resources in local mode under Windows
- 3. Spark on Hive
- 4. Join optimization
- 5. Memory optimization
- 6. Parameters and partition control
- 7. Data skew optimization
- Spark Streaming:
- How to ensure that Spark Streaming data is not lost
- Solve thread safety issues
Spark SQL DataFrame/DataSet:
1. Reduce small files
Use the coalesce() operator to reduce the number of partitions.
When inserting into a Hive table, choose the write mode carefully: insertInto is compatible with Hive and matches the Hive table's columns by field order, while saveAsTable is not compatible in the same way and matches columns by field name.
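A minimal sketch of both points, assuming an existing DataFrame df and a Hive table result_table (both names are made up):

// shrink the number of partitions before writing to avoid one small file per partition
// (coalesce avoids a full shuffle, unlike repartition)
val compacted = df.coalesce(4) // 4 is only an illustrative target

// insertInto matches the existing Hive table by column ORDER
compacted.write.mode("append").insertInto("result_table")

// saveAsTable matches columns by NAME and creates/manages the table itself,
// so it does not behave like a plain insert into an existing Hive table
compacted.write.mode("append").saveAsTable("result_table_spark")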
2. Accessing cluster resources in local mode under Windows
Put the cluster configuration files into the project's resources directory.
HA Support:
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val ssc = sparkSession.sparkContext
ssc.hadoopConfiguration.set("fs.defaultFS", "hdfs://mycluster")
ssc.hadoopConfiguration.set("dfs.nameservices", "mycluster")
3. Spark on Hive
Just copy the Hive configuration file (hive-site.xml) into Spark's conf directory.
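Once Spark can see the Hive configuration, Hive tables can be queried directly; a minimal sketch (the database and table names are made up):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("spark-on-hive-demo")
  .enableHiveSupport() // needed so Spark uses the Hive metastore
  .getOrCreate()

// query an existing Hive table with plain SQL
spark.sql("select count(*) from demo_db.demo_table").show()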
4. Join optimization
Spark SQL joins fall into three types:
(1) Hash join: used by older versions of Spark; its conditions are strict, and it has effectively been deprecated.
(2) Broadcast hash join: suitable for joining a small table with a large table; only the small table can be broadcast.
(3) Sort merge join: Spark's default join, used for joining a large table with another large table.
Broadcast join, API method:
Import the package: import org.apache.spark.sql.functions.broadcast
broadcast(table).join(table)
Spark SQL method:
Controlled by spark.sql.autoBroadcastJoinThreshold, which defaults to 10 MB; that is, tables of 10 MB or less are all treated as small tables.
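A minimal sketch of both approaches, assuming DataFrames largeDf and smallDf and a SparkSession named spark (all three names are made up):

import org.apache.spark.sql.functions.broadcast

// API approach: force a broadcast hash join by explicitly broadcasting the small table
val joined = largeDf.join(broadcast(smallDf), Seq("id"))

// Spark SQL approach: raise the automatic broadcast threshold
// (spark.sql.autoBroadcastJoinThreshold defaults to 10 MB; the value is in bytes)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 20 * 1024 * 1024)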
5. Memory optimization
Spark memory is divided into two parts:
(1) Storage memory: used to store cached data.
(2) Execution (shuffle) memory: used for computation such as join, groupBy, and reduce.
Before Spark 1.6 the two regions were managed statically; since 1.6 they are managed dynamically, so it is enough to allocate sufficient executor memory.
RDD vs Dataset/DataFrame cache size comparison:
Example: RDD with the default Java-serialization cache: 1.7 GB
RDD with Kryo serialization and a serialized (SER) cache: the size drops to 269.6 MB
Dataset with the default cache: 34.2 MB
Dataset with a serialized (SER) cache: 33.8 MB
The Storage tab of the Spark UI can be used to monitor storage memory usage.
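A sketch of how the cache variants above are set up, assuming a case class Stu, an RDD stuRdd and a Dataset stuDs (all names are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel

case class Stu(id: Long, name: String)

val conf = new SparkConf()
  .setAppName("cache-size-demo")
  // switch the serializer from Java to Kryo and register the cached class
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Stu]))

// RDD: cache in serialized form, then check the size on the Spark UI Storage tab
stuRdd.persist(StorageLevel.MEMORY_ONLY_SER)

// Dataset: the default cache() already uses a compact columnar format
stuDs.cache()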
6. Parameters and partition control
The number of partitions should be 2 to 3 times the total number of CPU cores in order to make full use of the CPUs.
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class --queue spark com_atguigu_sparkstreaming-1.
--num-executors 2 * --executor-cores 2 = 4 is the total number of CPU cores, so the optimal number of Spark partitions is 2 to 3 times that number.
One parameter controls the amount of data the reduce side pulls from the map side at a time.
Another parameter controls the shuffle file size.
Optimization effect: about 5%.
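A sketch of matching the partition count to the submit command above (2 executors * 2 cores = 4 cores, so 8 to 12 partitions; the values are illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.default.parallelism", "12")      // RDD shuffle parallelism
  .set("spark.sql.shuffle.partitions", "12")   // Spark SQL shuffle parallelism

// or tune a single RDD/DataFrame explicitly
val tuned = someRdd.repartition(12) // someRdd is a placeholder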
7. Data skew optimization
Solutions for data skew:
(1) Break up (salt) the keys of the large table and expand the small table accordingly (see the sketch at the end of this section). Experimental result: although the data skew is resolved, it actually takes even more time.
(2) Broadcast join (map-side/local join): it can solve data skew and is also one of the most commonly used Spark optimizations.
Optimizing a large-table-to-large-table join:
SMB (sort merge bucket) join is an optimization for joins on bucketed tables. The buckets must be kept reasonably large (do not create too many of them), otherwise a large number of small files will be generated.
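A rough sketch of option (1) from the data skew list above: salting the join key of the big table and expanding the small table n ways (n, the column names, and the DataFrame names bigDf/smallDf are all made up):

import org.apache.spark.sql.functions._

val n = 10 // salt factor

// big table: append a random salt 0..n-1 to the join key
val saltedBig = bigDf.withColumn("salted_key",
  concat_ws("_", col("key"), (rand() * n).cast("int")))

// small table: replicate every row n times, once per possible salt value
val saltedSmall = smallDf
  .withColumn("salt", explode(array((0 until n).map(i => lit(i)): _*)))
  .withColumn("salted_key", concat_ws("_", col("key"), col("salt")))

// join on the salted key: the hot key is now spread across n different partitions
val joined = saltedBig.join(saltedSmall, Seq("salted_key"))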
Spark Streaming:
- Not used, because it produces a large number of small files, one file per partition.
- By default, the number of partitions in the stream is the same as the number of partitions in the Kafka topic.
- A parameter that must be set in development (spark.streaming.kafka.maxRatePerPartition): it controls how many records per second Spark Streaming consumes from each Kafka partition; 1,000 to 3,000 is fine.
- Not used, because it is based on checkpointing.
How to ensure that Spark Streaming data is not lost
- 1. Set offsets to be committed manually.
- 2. Control the order of operations so that offsets are committed only after the business data has been processed.
stream.foreachRDD(rdd => {
  val sqlProxy = new SqlProxy()
  val client = DataSourceUtil.getConnection
  try {
    // read this batch's offset ranges and persist them only after the business logic is done
    val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    for (or <- offsetRanges) {
      sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
        Array(groupid, or.topic, or.partition.toString, or.untilOffset))
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    sqlProxy.shutdown(client) // release the connection via the project's SqlProxy helper
  }
})
When the stream is initialized, query MySQL for the saved offsets. If offsets exist in MySQL, start consuming from them; if there are none, this is the first start, so consume from the default position.
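A sketch of that recovery logic with the spark-streaming-kafka-0-10 API, assuming the offsets read from the offset_manager table were collected into offsetMap: Map[TopicPartition, Long] and that ssc, topics and kafkaParams already exist (all of those names are assumptions):

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val stream = if (offsetMap.isEmpty) {
  // first start: nothing saved in MySQL yet, consume from the default position
  KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
} else {
  // restart: resume from the offsets saved in MySQL
  KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsetMap))
}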
Back pressure (spark.streaming.backpressure.enabled): solves the problem of Spark Streaming being overwhelmed by its input by pulling data at a dynamically adjusted rate; the upper limit of that rate is determined by the rate parameter above.
Graceful shutdown (spark.streaming.stopGracefullyOnShutdown): once enabled, the application does not stop immediately when it receives a kill signal; it waits until the current batch of data has been processed cleanly before shutting down.
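A sketch of the rate limit, back pressure, and graceful shutdown settings discussed above, as they would typically be set on the SparkConf (the rate value is just an example):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("streaming-conf-demo")
  // upper limit: records consumed per second from each Kafka partition (1,000-3,000 is reasonable)
  .set("spark.streaming.kafka.maxRatePerPartition", "2000")
  // back pressure: pull data at a dynamically adjusted rate, capped by the limit above
  .set("spark.streaming.backpressure.enabled", "true")
  // graceful shutdown: finish the current batch before stopping on a kill signal
  .set("spark.streaming.stopGracefullyOnShutdown", "true")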
Spark Streaming operating on a database:
resultDStream.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    // create one database connection per partition
    partition.foreach(record => {
      // use the connection to process each record
    })
    // close the connection once the whole partition has been processed
  })
})
Solve thread safety issues
- 1. Add a distributed lock (ZooKeeper or Redis both work), but adding one will definitely affect efficiency.
- 2. Or use groupBy / groupByKey to first aggregate identical keys into the same partition; then the query for a given key is issued only once (see the sketch after this list).
- When submitting, spark-submit --num-executors 5 --executor-cores 2: the product of the two parameters equals the number of partitions (CPU cores to partitions is 1:1), and then Spark Streaming runs at its fastest. Do not deliberately increase or decrease the number of partitions in Spark Streaming, because increasing them causes a shuffle and decreasing them reduces parallelism, so neither is done deliberately.
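A rough sketch of approach 2 from the list above: group the records of a batch by key so that the external lookup for any given key happens only once (the record type, its key field, and the lookup itself are made up for illustration):

resultDStream.foreachRDD { rdd =>
  rdd.map(record => (record.key, record))  // key every record
    .groupByKey()                          // all records with the same key land in one group
    .foreachPartition { partition =>
      // open one connection for the whole partition here
      partition.foreach { case (key, records) =>
        // query the external store once for `key`,
        // then apply the result to every record in `records`
        println(s"$key -> ${records.size} records") // placeholder for the real lookup/write
      }
      // close the connection after the partition is done
    }
}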