How can I use Spark Dataset with Thrift?


My data format is defined with Apache Thrift, and the code is generated with Scrooge. I store it in Spark using Parquet, as explained in a blog post.

I can read the data into a DataFrame easily by doing:

val df = sqlContext.read.parquet("/path/to/data")

And I can read it into an RDD with a bit more gymnastics:

// Assumes sc (the SparkContext) and jobConf (a Hadoop JobConf) are already in scope.
// Imports assume the org.apache.parquet artifacts; older builds use the com.twitter packages.
import scala.reflect.ClassTag
import org.apache.thrift.TBase
import org.apache.spark.rdd.{NewHadoopRDD, RDD}
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.parquet.hadoop.thrift.{ParquetThriftInputFormat, ThriftReadSupport}

def loadRdd[V <: TBase[_, _]](inputDirectory: String, vClass: Class[V]): RDD[V] = {
  // Capture a ClassTag so newAPIHadoopFile can materialize the value class.
  implicit val ctagV: ClassTag[V] = ClassTag(vClass)
  ParquetInputFormat.setReadSupportClass(jobConf, classOf[ThriftReadSupport[V]])
  ParquetThriftInputFormat.setThriftClass(jobConf, vClass)
  val rdd = sc.newAPIHadoopFile(
    inputDirectory, classOf[ParquetThriftInputFormat[V]], classOf[Void], vClass, jobConf)
  // Keys are Void; keep only the Thrift values.
  rdd.asInstanceOf[NewHadoopRDD[Void, V]].values
}

loadRdd("/path/to/data", classOf[MyThriftClass])

My question is: how can I access this data with the new Dataset API released in Spark 1.6? The reason I want it is the Dataset API's main selling point: type safety with the same speed as DataFrames.

I understand that some sort of encoder is needed, and that encoders are provided for primitive types and case classes, but what I have is Thrift-generated code (either the Java or the Scala one fits the bill), which looks a lot like a case class but is not really one.
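To illustrate what the built-in machinery does accept, here is a minimal sketch with a hand-written case class standing in for my Thrift struct (MyRange and its fields are made up for illustration):

// Hypothetical hand-written mirror of the Thrift struct, as a plain case class.
case class MyRange(start: Long, end: Long)

import sqlContext.implicits._  // brings the Product-type encoders into scope
// Works, but only because MyRange is a real case class; the schema is duplicated by hand.
val rangesDs = sqlContext.read.parquet("/path/to/data").as[MyRange]

Maintaining such mirrors by hand for every struct would defeat the point of generating the code with Scrooge in the first place.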

I tried the obvious options, and they did not work:

val df = sqlContext.read.parquet("/path/to/data")

df.as[MyJavaThriftClass]

<console>:25: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._
Support for serializing other types will be added in future releases.

df.as[MyScalaThriftClass]

scala.ScalaReflectionException: <none> is not a term
  at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
  at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:492)
  at org.apache.spark.sql.catalyst.ScalaReflection$.extractorsFor(ScalaReflection.scala:394)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:54)
  at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)
  ... 48 elided

df.as[MyScalaThriftClass.Immutable]

java.lang.UnsupportedOperationException: no encoder found for org.apache.thrift.protocol.TField
- field (class: "org.apache.thrift.protocol.TField", name: "field")
- array element class: "com.twitter.scrooge.TFieldBlob"
- field (class: "scala.collection.immutable.Map", name: "_passthroughFields")
- root class: "com.worldsense.scalathrift.ThriftRange.Immutable"
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:597)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:509)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:502)
  at scala.collection.immutable.List.flatMap(List.scala:327)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:502)
  at org.apache.spark.sql.catalyst.ScalaReflection$.toCatalystArray$1(ScalaReflection.scala:419)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:537)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:509)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:502)
  at scala.collection.immutable.List.flatMap(List.scala:327)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:502)
  at org.apache.spark.sql.catalyst.ScalaReflection$.extractorsFor(ScalaReflection.scala:394)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:54)
  at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)
  ... 48 elided

It seems shapeless works fine with Thrift-generated code, and I am wondering if it could be used to generate something the current encoders API will accept.
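For completeness, the only fallback I can see today is the generic Kryo encoder that also shipped in 1.6; it yields a typed Dataset but serializes each record into a single binary column, so it gives up the columnar speed I am after. A sketch, reusing loadRdd from above:

import org.apache.spark.sql.{Dataset, Encoders}

// Kryo treats the whole Thrift object as an opaque blob: the Dataset is typed,
// but Catalyst cannot prune columns or optimize, so most of the benefit is lost.
implicit val thriftEncoder = Encoders.kryo[MyThriftClass]
val ds: Dataset[MyThriftClass] =
  sqlContext.createDataset(loadRdd("/path/to/data", classOf[MyThriftClass]))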

Any hints?

