Spark UDF exception when accessing broadcast variable
I'm having difficulty accessing a scala.collection.immutable.Map inside a Spark UDF.

I'm broadcasting the map:
import scala.io.Source

// read "<id>,<value>" lines into a Map[Int, String] and broadcast it
val browserLangMap = sc.broadcast(Source.fromFile(browserLangFilePath).getLines.map(_.split(",")).map(e => (e(0).toInt, e(1))).toMap)
Creating a UDF that accesses the map:
def addBrowserCode = udf((browserLang: Int) => if (browserLangMap.value.contains(browserLang)) browserLangMap.value(browserLang) else "")
Using the UDF to add a new column:
val joinedDF = rawDF.join(broadcast(geoDF).as("geo"), $"start_ip" === $"geo.start_ip_num", "left_outer")
  .withColumn("browser_code", addBrowserCode($"browser_language"))
  .selectExpr(getSelectQuery: _*)
Full stack trace: https://www.dropbox.com/s/p1d5322fo9cxro6/stack_trace.txt?dl=0
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetadataSchema$
Serialization stack:
    - object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetadataSchema$, value: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetadataSchema$@30b4ba52)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: MetadataSchema$module, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetadataSchema$)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(browser_language#235))
    - field (class: org.apache.spark.sql.catalyst.expressions.If, name: falseValue, type: class org.apache.spark.sql.catalyst.expressions.Expression)
    - object (class org.apache.spark.sql.catalyst.expressions.If, if (isnull(browser_language#235)) null else UDF(browser_language#235))
    - field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
    - object (class org.apache.spark.sql.catalyst.expressions.Alias, if (isnull(browser_language#235)) null else UDF(browser_language#235) AS browser_language#507)
    - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@5ae38c4e)
    - writeObject data (class: scala.collection.immutable.$colon$colon)
    - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@5ae38c4e))
    - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 80 more
I know that accessing the broadcast map is what causes this; when I remove the reference to it in the UDF, there is no exception.
def addBrowserCode = udf((browserLang: Int) => browserLang.toString()) // test UDF without accessing the broadcast map; this works
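For what it's worth, a commonly suggested pattern for this kind of shell closure-capture problem is to pass the broadcast into a wrapper function, so the UDF closure captures only the function parameter rather than the enclosing shell line object. A minimal sketch (the makeAddBrowserCode name is mine, just for illustration):

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.functions.udf

// Sketch only: the lambda references just the parameter langMap, so
// serializing the UDF should not drag in the enclosing $iwC shell object
// (and with it the non-serializable MetadataSchema$).
def makeAddBrowserCode(langMap: Broadcast[Map[Int, String]]) =
  udf((browserLang: Int) => langMap.value.getOrElse(browserLang, ""))

val addBrowserCodeSafe = makeAddBrowserCode(browserLangMap)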
Spark version: 1.6
I also found strange behavior with ":paste" in the spark-shell. The exception only happens when I paste the entire code in a single multi-line :paste.
The same code works if I paste the broadcast and UDF creation in one :paste first, and then paste the join + save-to-file step in a separate :paste.
It may be a Scala shell issue; I don't know. A sketch of the working split is below.
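For illustration, the split that works for me looks roughly like this (same definitions as above; the final save-to-file step is omitted):

// :paste block 1 -- broadcast and UDF creation, then Ctrl+D
val browserLangMap = sc.broadcast(Source.fromFile(browserLangFilePath).getLines.map(_.split(",")).map(e => (e(0).toInt, e(1))).toMap)
def addBrowserCode = udf((browserLang: Int) => if (browserLangMap.value.contains(browserLang)) browserLangMap.value(browserLang) else "")

// :paste block 2 -- join, new column, and save, then Ctrl+D
val joinedDF = rawDF.join(broadcast(geoDF).as("geo"), $"start_ip" === $"geo.start_ip_num", "left_outer")
  .withColumn("browser_code", addBrowserCode($"browser_language"))
  .selectExpr(getSelectQuery: _*)
// ...followed by my save-to-file step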