python - Saving a PySpark RDD to an HBase table raises an AttributeError
I am trying to write a Spark RDD to an HBase table using PySpark. The RDD looks like the following when printed with the print rdd.take(rdd.count()) command:

[Decimal('0.39326837'), Decimal('0.03643601'), Decimal('0.06031798'), Decimal('0.08885452')]

When I try to write the RDD to the HBase table using the function saveRecord:
def saveRecord(tx_fee_rdd):
    host = 'localhost'  # sys.argv[1]
    table = 'tx_fee_table'  # needs to be created beforehand in the HBase shell
    conf = {"hbase.zookeeper.quorum": host,
            "hbase.mapred.outputtable": table,
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    # row key = id, cfamily = tx_fee_col, column_name = tx_fee, column_value = x
    datamap = tx_fee_rdd.map(lambda x: ("tx_fee_col", "tx_fee", x))
    datamap.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)

tx_fee_rdd.foreach(saveRecord)

I get the following error:
AttributeError: 'Decimal' object has no attribute 'map'

How should I deal with this?
Following @zeros323's suggestions, I am now getting the following error:
Traceback (most recent call last):
  File "/home/ubuntu/unix_practice/bcrpc/bitcoin-inspector-webserver/bitcoin/bctxfee_text3.py", line 66, in <module>
    saveRecord(tx_fee_rdd)
  File "/home/ubuntu/unix_practice/bcrpc/bitcoin-inspector-webserver/bitcoin/bctxfee_text3.py", line 29, in saveRecord
    datamap.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1348, in saveAsNewAPIHadoopDataset
  File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset.
: org.apache.spark.SparkException: RDD element of type [Ljava.lang.Object; cannot be used
    at org.apache.spark.api.python.SerDeUtil$.pythonToPairRDD(SerDeUtil.scala:237)
    at org.apache.spark.api.python.PythonRDD$.saveAsHadoopDataset(PythonRDD.scala:801)
    at org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
foreach operates on individual records, hence it receives Decimal objects, not RDDs. You cannot map over these, not to mention use the saveAsNewAPIHadoopDataset method.
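For illustration, here is a minimal sketch of why the error appears; the SparkContext sc and the sample values are assumed for the example and are not part of the fix:

from decimal import Decimal

def inspect(x):
    # foreach hands the function one record at a time, so x is a Decimal,
    # not an RDD; calling x.map(...) here is what raises the AttributeError above.
    print(type(x))

tx_fee_rdd = sc.parallelize([Decimal('0.39326837'), Decimal('0.03643601')])
tx_fee_rdd.foreach(inspect)  # runs once per record; output goes to the executor logs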
If you want to use the saveAsNewAPIHadoopDataset method, the function should operate directly on the RDD:
saveRecord(tx_fee_rdd)

Another possible issue is the following part:
datamap = tx_fee_rdd.map(lambda x: ("tx_fee_col", "tx_fee", x))

saveAsNewAPIHadoopDataset expects pairs, not triplets, and it may not work with Decimal objects. See the hbase_outputformat.py example for details.
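As a rough sketch (not tested against the question's HBase setup), the mapping could instead produce (key, [row key, column family, qualifier, value]) pairs of strings, which is the layout StringListToPutConverter consumes in the hbase_outputformat.py example. The zipWithIndex-based row key and the str() conversion of the Decimal values are assumptions made for this example, not something taken from the question:

# Sketch only: build (key, [row key, column family, qualifier, value]) string pairs.
# Deriving the row key from zipWithIndex is an assumed choice for illustration.
datamap = (tx_fee_rdd
           .zipWithIndex()  # yields (Decimal value, index) tuples
           .map(lambda kv: (str(kv[1]),
                            [str(kv[1]), "tx_fee_col", "tx_fee", str(kv[0])])))

datamap.saveAsNewAPIHadoopDataset(conf=conf,
                                  keyConverter=keyConv,
                                  valueConverter=valueConv)

This reuses conf, keyConv and valueConv from the question and assumes the Spark examples jar, which provides the converter classes, is on the classpath.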