python - Saving a PySpark RDD to HBase raises AttributeError -


I am trying to write a Spark RDD to an HBase table using PySpark. The RDD looks like the following when printed with the print rdd.take(rdd.count()) command:

[Decimal('0.39326837'), Decimal('0.03643601'), Decimal('0.06031798'), Decimal('0.08885452')]

When I try to write the RDD to the HBase table using the function saveRecord

def saveRecord(tx_fee_rdd):
    host = 'localhost'  # sys.argv[1]
    table = 'tx_fee_table'  # needs to be created beforehand in the hbase shell
    conf = {"hbase.zookeeper.quorum": host,
            "hbase.mapred.outputtable": table,
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    # row key = id, cfamily = tx_fee_col, column_name = tx_fee, column_value = x
    datamap = tx_fee_rdd.map(lambda x: ("tx_fee_col", "tx_fee", x))
    datamap.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)

tx_fee_rdd.foreach(saveRecord)

I get the following error:

AttributeError: 'Decimal' object has no attribute 'map'

How do I deal with this?

Following @zeros323's suggestions, I am getting the following error:

Traceback (most recent call last):
  File "/home/ubuntu/unix_practice/bcrpc/bitcoin-inspector-webserver/bitcoin/bctxfee_text3.py", line 66, in <module>
    saveRecord(tx_fee_rdd)
  File "/home/ubuntu/unix_practice/bcrpc/bitcoin-inspector-webserver/bitcoin/bctxfee_text3.py", line 29, in saveRecord
    datamap.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1348, in saveAsNewAPIHadoopDataset
  File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset.
: org.apache.spark.SparkException: RDD element of type [Ljava.lang.Object; cannot be used
    at org.apache.spark.api.python.SerDeUtil$.pythonToPairRDD(SerDeUtil.scala:237)
    at org.apache.spark.api.python.PythonRDD$.saveAsHadoopDataset(PythonRDD.scala:801)
    at org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

foreach operates on individual records and hence receives Decimal objects, not RDDs. You cannot call map on these, not to mention use the saveAsNewAPIHadoopDataset method.
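A quick way to see this (a minimal sketch, assuming a SparkContext named sc is already available):

from decimal import Decimal

def inspect(x):
    # foreach hands each element to the function, so x is a single Decimal here,
    # not an RDD; calling x.map(...) would raise the AttributeError above
    print(type(x))

sc.parallelize([Decimal('0.39326837'), Decimal('0.03643601')]).foreach(inspect)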

If you want to use the saveAsNewAPIHadoopDataset function, it should operate directly on the RDD:

saveRecord(tx_fee_rdd)

Another possible issue is the following part:

datamap = tx_fee_rdd.map(lambda x: ("tx_fee_col", "tx_fee", x))

saveAsNewAPIHadoopDataset expects pairs, not triplets, and it may not work with Decimal objects. See the hbase_outputformat.py example for details.
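For reference, here is a minimal sketch of saveRecord reworked along those lines, producing (row key, [row key, column family, column qualifier, value]) string lists as in the hbase_outputformat.py example. The row key derived from zipWithIndex and the str() conversion of the Decimal values are assumptions for illustration, not something taken from the original code:

def saveRecord(tx_fee_rdd):
    host = 'localhost'
    table = 'tx_fee_table'  # must already exist in HBase
    conf = {"hbase.zookeeper.quorum": host,
            "hbase.mapred.outputtable": table,
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    # (row key, [row key, column family, column qualifier, value]) - all strings;
    # zipWithIndex only fabricates a row key here, use whatever key suits your data
    datamap = tx_fee_rdd.zipWithIndex().map(
        lambda xi: (str(xi[1]), [str(xi[1]), "tx_fee_col", "tx_fee", str(xi[0])]))
    datamap.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)

saveRecord(tx_fee_rdd)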

