scala - Broadcast not happening while joining DataFrames in Spark 1.6
Below is the sample code I am running. When the Spark job runs, the DataFrame join happens using a SortMergeJoin instead of a BroadcastJoin.
def joinedDf(sqlContext: SQLContext,
             txnTable: DataFrame,
             countriesDfBroadcast: Broadcast[DataFrame]): DataFrame = {
  txnTable.as("df1").join(
    countriesDfBroadcast.value
      .withColumnRenamed("cntry_id", "dw_cntry_id")
      .as("countries"),
    $"df1.user_cntry_id" === $"countries.dw_cntry_id",
    "inner")
}

joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp")
The BroadcastJoin does not happen even when I specify the broadcast() hint in the join statement. The optimizer hash-partitions the DataFrames, causing data skew. Has anyone seen this behavior?

I am running on YARN using Spark 1.6 with HiveContext as the SQLContext. The Spark job runs on 200 executors. The data size of txnTable is 240 GB and the data size of countriesDf is 5 MB.
Both the way you broadcast the DataFrame and the way you access it are incorrect.

Standard broadcasts cannot be used to handle distributed data structures. If you want to perform a broadcast join on a DataFrame, you should use the broadcast function, which marks the given DataFrame for broadcasting:

import org.apache.spark.sql.functions.broadcast

val countriesDf: DataFrame = ???
val tmp: DataFrame = broadcast(
  countriesDf.withColumnRenamed("cntry_id", "dw_cntry_id").as("countries")
)

txnTable.as("df1").join(
  broadcast(tmp),
  $"df1.user_cntry_id" === $"countries.dw_cntry_id",
  "inner")
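To confirm the hint took effect, one can inspect the physical plan (a quick check, assuming txnTable and tmp from the snippet above are in scope); a successful broadcast join shows up as BroadcastHashJoin rather than SortMergeJoin:

```scala
// Hypothetical check, assuming txnTable and tmp as defined above.
// In Spark 1.6, explain() prints the physical plan to stdout.
val joined = txnTable.as("df1").join(
  broadcast(tmp),
  $"df1.user_cntry_id" === $"countries.dw_cntry_id",
  "inner")

joined.explain()  // look for BroadcastHashJoin in the output

// Separately, tables under spark.sql.autoBroadcastJoinThreshold
// (10 MB by default) are broadcast even without an explicit hint,
// so a 5 MB table should qualify; the threshold can be raised if needed:
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold",
                   (20 * 1024 * 1024).toString)
```

If the plan still shows SortMergeJoin after the broadcast hint, the hint was likely lost before planning, as in the Broadcast[DataFrame] pattern from the question.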
Internally Spark will collect tmp without converting it from the internal representation, and broadcast it afterwards.

Join arguments are eagerly evaluated. Even if it were possible to use SparkContext.broadcast with a distributed data structure, the broadcast value would be evaluated locally before join is called. That's why your function worked at all but didn't perform a broadcast join.
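The eager-evaluation point holds in plain Scala, independent of Spark (a minimal sketch with illustrative names, produce and call): a by-value argument is fully evaluated before the method body runs, just as countriesDfBroadcast.value is materialized on the driver before join ever sees it.

```scala
// Minimal sketch of call-by-value evaluation order (illustrative names).
object EagerArgs {
  val log = scala.collection.mutable.ListBuffer.empty[String]

  def produce(): Int = { log += "argument evaluated"; 42 }

  def call(x: Int): Int = { log += "body runs"; x }

  def main(args: Array[String]): Unit = {
    call(produce())
    // The argument is evaluated first, before the method body executes:
    println(log.mkString(", "))  // argument evaluated, body runs
  }
}
```

By the same mechanism, countriesDfBroadcast.value.withColumnRenamed(...) is computed as a local DataFrame before join runs, so the planner never sees anything it can turn into a broadcast join.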