python - Saving a PySpark RDD to HBase raises AttributeError
I am trying to write a Spark RDD to an HBase table using PySpark. The RDD looks like the following when printed with the print rdd.take(rdd.count()) command:
    [Decimal('0.39326837'), Decimal('0.03643601'), Decimal('0.06031798'), Decimal('0.08885452')]
When I try to write the RDD to the HBase table using the function saveRecord:
    def saveRecord(tx_fee_rdd):
        host = 'localhost'  # sys.argv[1]
        table = 'tx_fee_table'  # needs to be created beforehand in the hbase shell
        conf = {"hbase.zookeeper.quorum": host,
                "hbase.mapred.outputtable": table,
                "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
                "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
        keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
        valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
        # row key = id, cfamily = tx_fee_col, column_name = tx_fee, column_value = x
        datamap = tx_fee_rdd.map(lambda x: ("tx_fee_col", "tx_fee", x))
        datamap.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)

    tx_fee_rdd.foreach(saveRecord)
I get the following error:
    AttributeError: 'Decimal' object has no attribute 'map'
How do I deal with this?
Following @zeros323's suggestions, I am getting the following error:
    Traceback (most recent call last):
      File "/home/ubuntu/unix_practice/bcrpc/bitcoin-inspector-webserver/bitcoin/bctxfee_text3.py", line 66, in <module>
        saveRecord(tx_fee_rdd)
      File "/home/ubuntu/unix_practice/bcrpc/bitcoin-inspector-webserver/bitcoin/bctxfee_text3.py", line 29, in saveRecord
        datamap.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)
      File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1348, in saveAsNewAPIHadoopDataset
      File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
      File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset.
    : org.apache.spark.SparkException: RDD element of type [Ljava.lang.Object; cannot be used
        at org.apache.spark.api.python.SerDeUtil$.pythonToPairRDD(SerDeUtil.scala:237)
        at org.apache.spark.api.python.PythonRDD$.saveAsHadoopDataset(PythonRDD.scala:801)
        at org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
foreach operates on individual records, so it receives Decimal objects, not RDDs. You cannot call map on these, let alone use the saveAsNewAPIHadoopDataset method. If you want to use saveAsNewAPIHadoopDataset, your function should operate directly on the RDD:
    saveRecord(tx_fee_rdd)
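For contrast, a minimal sketch (reusing the names from the question) of why the original call fails versus the direct call:

    # foreach applies saveRecord once per element on the executors, so the
    # function receives a single Decimal value, which has no .map attribute:
    tx_fee_rdd.foreach(saveRecord)   # AttributeError: 'Decimal' object has no attribute 'map'

    # Calling the function on the driver with the RDD itself keeps the RDD API available:
    saveRecord(tx_fee_rdd)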
Another possible issue is the following part:
    datamap = tx_fee_rdd.map(lambda x: ("tx_fee_col", "tx_fee", x))
saveAsNewAPIHadoopDataset expects pairs, not triplets, and it may not work with Decimal objects either. See the hbase_outputformat.py example for details.
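As a rough illustration only, here is one way the mapping could be reshaped into the (key, [row, column family, qualifier, value]) string pairs that the StringListToPutConverter used in hbase_outputformat.py works with. Using the element's position as the row key and storing the Decimal as a string are assumptions made for this sketch, not part of the original question; conf, keyConv and valueConv are the ones defined in the question's saveRecord:

    # Sketch: build (row_key, [row_key, column_family, qualifier, value]) pairs of strings.
    # The row key here is just the element's index in the RDD (an assumption), and the
    # Decimal value is converted to a string before it reaches the converter.
    datamap = (tx_fee_rdd
               .zipWithIndex()                     # (Decimal, index)
               .map(lambda kv: (str(kv[1]),
                                [str(kv[1]), "tx_fee_col", "tx_fee", str(kv[0])])))

    datamap.saveAsNewAPIHadoopDataset(conf=conf,
                                      keyConverter=keyConv,
                                      valueConverter=valueConv)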