How can I use Spark Dataset with Thrift?
My data format is defined with Apache Thrift, and the code is generated by Scrooge. I store it in Spark using Parquet, much like it is explained in a blog post.
I can read this data into a DataFrame easily, by doing:
```scala
val df = sqlContext.read.parquet("/path/to/data")
```
And I can read it into an RDD with a bit more gymnastics:
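```scala
import scala.reflect.ClassTag

import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.parquet.hadoop.thrift.{ParquetThriftInputFormat, ThriftReadSupport}
import org.apache.spark.rdd.{NewHadoopRDD, RDD}
import org.apache.thrift.TBase

// Assumes `sc` (the SparkContext) and `jobConf` are defined elsewhere.
def loadRdd[V <: TBase[_, _]](inputDirectory: String, vClass: Class[V]): RDD[V] = {
  // `.values` below needs a ClassTag for the value type.
  implicit val ctagV: ClassTag[V] = ClassTag(vClass)
  // Make Parquet materialize each record as an instance of the Thrift class.
  ParquetInputFormat.setReadSupportClass(jobConf, classOf[ThriftReadSupport[V]])
  ParquetThriftInputFormat.setThriftClass(jobConf, vClass)
  val rdd = sc.newAPIHadoopFile(
    inputDirectory, classOf[ParquetThriftInputFormat[V]], classOf[Void], vClass, jobConf)
  // Keys are always Void; keep only the Thrift values.
  rdd.asInstanceOf[NewHadoopRDD[Void, V]].values
}

loadRdd("/path/to/data", classOf[MyThriftClass])
```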
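One way to get some kind of typed Dataset out of the RDD that `loadRdd` returns is the generic Kryo encoder that ships with Spark 1.6 (`Encoders.kryo`). This is a minimal sketch, not a full answer: it assumes Kryo can serialize the Thrift class, and the helper name `toKryoDataset` is hypothetical. The Kryo encoder stores each object as a single opaque binary column, so it provides type safety but not the per-column optimizations that make DataFrames fast.

```scala
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoders, SQLContext}

object KryoDatasets {
  // Wraps an RDD of arbitrary objects (for example the Thrift instances that
  // loadRdd returns) in a typed Dataset. Encoders.kryo serializes each whole
  // object into one binary column, so no per-field schema is derived.
  def toKryoDataset[V: ClassTag](sqlContext: SQLContext, rdd: RDD[V]): Dataset[V] =
    sqlContext.createDataset(rdd)(Encoders.kryo[V])
}
```

Usage would look like `KryoDatasets.toKryoDataset(sqlContext, loadRdd("/path/to/data", classOf[MyThriftClass]))`. Typed `map` and `filter` then work on the Thrift objects, but every operation has to deserialize the full object first.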
My question is: how can I access the data through the new Dataset API released in Spark 1.6? The reason I want this is the benefits of the Dataset API: type safety with the same speed as DataFrames.
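I understand that some sort of encoder is needed, and that encoders are already provided for primitive types and case classes. But what I have is Thrift-generated code (either the Java or the Scala one, whichever fits the bill), which looks a lot like a case class but is not one.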
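Short of a derived encoder, one workaround is to keep a hand-written case class that mirrors the Thrift struct and convert between the two, so that the built-in case class encoder applies. The sketch below is only illustrative: `ThriftRangeLike` is a hypothetical stand-in for real Scrooge output (a plain class with a passthrough-fields map, similar in spirit to the `_passthroughFields` member in the error below), and `RangeRow`, `ThriftConversions`, `start` and `end` are placeholder names.

```scala
// Hypothetical stand-in for a Scrooge-generated struct: a plain class (not a
// case class) exposing its fields as members, plus a passthrough-fields map
// of a type Spark has no encoder for.
class ThriftRangeLike(
    val start: Long,
    val end: Long,
    val passthroughFields: Map[Short, Array[Byte]])

// Hand-written mirror holding only the fields Spark can encode.
case class RangeRow(start: Long, end: Long)

object ThriftConversions {
  // Copy the encodable fields; passthrough fields are dropped.
  def fromThrift(t: ThriftRangeLike): RangeRow = RangeRow(t.start, t.end)

  // Rebuild the Thrift-side object, with no passthrough fields.
  def toThrift(r: RangeRow): ThriftRangeLike =
    new ThriftRangeLike(r.start, r.end, Map.empty)
}
```

With `import sqlContext.implicits._` in scope, mapping `fromThrift` over an RDD of such objects gives an `RDD[RangeRow]`, and `.toDS()` turns that into a `Dataset[RangeRow]`. The cost is keeping the mirror in sync by hand whenever the Thrift definition changes.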
I tried the obvious options, and they did not work:
```
val df = sqlContext.read.parquet("/path/to/data")

df.as[MyJavaThriftClass]

<console>:25: error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._  Support for serializing other types will be added in future releases.

df.as[MyScalaThriftClass]

scala.ScalaReflectionException: <none> is not a term
  at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
  at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:492)
  at org.apache.spark.sql.catalyst.ScalaReflection$.extractorsFor(ScalaReflection.scala:394)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:54)
  at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)
  ... 48 elided

df.as[MyScalaThriftClass.Immutable]

java.lang.UnsupportedOperationException: No Encoder found for org.apache.thrift.protocol.TField
- field (class: "org.apache.thrift.protocol.TField", name: "field")
- array element class: "com.twitter.scrooge.TFieldBlob"
- field (class: "scala.collection.immutable.Map", name: "_passthroughFields")
- root class: "com.worldsense.scalathrift.ThriftRange.Immutable"
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:597)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:509)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:502)
  at scala.collection.immutable.List.flatMap(List.scala:327)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:502)
  at org.apache.spark.sql.catalyst.ScalaReflection$.toCatalystArray$1(ScalaReflection.scala:419)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:537)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:509)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:502)
  at scala.collection.immutable.List.flatMap(List.scala:327)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:502)
  at org.apache.spark.sql.catalyst.ScalaReflection$.extractorsFor(ScalaReflection.scala:394)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:54)
  at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)
  ... 48 elided
```
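Both runtime failures come from `ScalaReflection` trying to derive a product encoder from the Scrooge classes themselves, which breaks on members such as `_passthroughFields`. For comparison, here is a sketch of the path the built-in encoders do support, assuming a hand-written case class whose fields match the Parquet columns by name and by compatible type. `RangeRow`, `ReadAsDataset`, `start` and `end` are hypothetical placeholders for the real Thrift field names, and optional Thrift fields would presumably need `Option[...]` fields in the mirror.

```scala
import org.apache.spark.sql.{Dataset, SQLContext}

// Hypothetical case class mirroring the Thrift struct: one field per Parquet
// column, with the same names and compatible types as the Thrift fields.
case class RangeRow(start: Long, end: Long)

object ReadAsDataset {
  // Reads the Parquet data written from Thrift and views it as a typed Dataset.
  // `as[RangeRow]` matches the DataFrame's columns to the case class fields by name.
  def read(sqlContext: SQLContext, path: String): Dataset[RangeRow] = {
    import sqlContext.implicits._ // brings the product Encoder for RangeRow into scope
    sqlContext.read.parquet(path).as[RangeRow]
  }
}
```

This keeps the columnar Parquet representation, unlike a Kryo-based encoder, at the price of duplicating the Thrift schema as a case class.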
It seems that Shapeless works fine with Thrift-generated code, and I am wondering if maybe I can use it to generate something that the current encoders API will accept.
Any hints?