使用spark写数据到Hbase的三种方式 您所在的位置:网站首页 spark数据导入hbase 使用spark写数据到Hbase的三种方式

使用spark写数据到Hbase的三种方式

2023-08-14 05:45| 来源: 网络整理| 查看: 265

方式一:直接使用HBase Table的PUT方法

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Table} import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.{SparkConf, SparkContext} /** * Description: Use Put method of Hbase Client insert data into hbase in Spark-streaming. * * Author : Adore Chen * Created: 2017-12-22 */ object SparkPut { /** * insert 100,000 cost 20762 ms * * @param args */ def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("SparkPut") val context = new SparkContext(conf) try { val rdd = context.makeRDD(1 to 100000, 4) // column family val family = Bytes.toBytes("cf") // column counter --> ctr val column = Bytes.toBytes("ctr") println("count is :" + rdd.count()) rdd.take(5).foreach(println) // mapPartition & foreachPartition // mapPartition is a lazy transformation, if no action, there is no result. // foreachPartition is an action rdd.foreachPartition(list => { val table = createTable() list.foreach(value => { val put = new Put(Bytes.toBytes(value)) put.addImmutable(family, column, Bytes.toBytes(value)) table.put(put) }) table.close() }) } finally { context.stop() } } /** * create Hbase Table interface. * * @return */ def createTable(): Table = { val hbaseConf = HBaseConfiguration.create() hbaseConf.set("hbase.zookeeper.quorum", "localhost") hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") hbaseConf.set("hbase.defaults.for.version.skip", "true") val conn = ConnectionFactory.createConnection(hbaseConf) conn.getTable(TableName.valueOf("test_table")) } }

 

 

方式二:Put(List)

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} import org.apache.hadoop.hbase.client.{ConnectionFactory, Put} import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.{SparkConf, SparkContext} import scala.collection.JavaConversions /** * Description: Use Mutator batch insert in spark context. * * Author : Adore Chen * Created: 2017-12-22 */ object SparkPutList { /** * Use mutator batch insert 100,000, mutator.mutator(Put) cost: 22369 * Use put list insert 100,000, cost: 25571 * Use put list by Map 100,000, cost: 21299 * * @param args */ def main(args: Array[String]): Unit = { // putByList() putByMap() } def putByMap(): Unit = { val conf = new SparkConf().setAppName(SparkPutList.getClass().getSimpleName()) val context = new SparkContext(conf) // column family val family = Bytes.toBytes("cf") // column counter --> ctr val column = Bytes.toBytes("ctr") try { val rdd = context.makeRDD(1 to 100000, 4) rdd.map(value => { val put = new Put(Bytes.toBytes(value)) put.addImmutable(family, column, Bytes.toBytes(value)) }).foreachPartition( itr => { val hbaseConf = HBaseConfiguration.create() val conn = ConnectionFactory.createConnection(hbaseConf) val table = conn.getTable(TableName.valueOf("test_table")) table.put(JavaConversions.seqAsJavaList(itr.toSeq)) table.close() }) } finally { context.stop() } } def putByList(): Unit = { val conf = new SparkConf().setAppName(SparkPutList.getClass().getSimpleName()) val context = new SparkContext(conf) // column family val family = Bytes.toBytes("cf") // column counter --> ctr val column = Bytes.toBytes("ctr") try { val rdd = context.makeRDD(1 to 100000, 4) rdd.foreachPartition(list => { val hbaseConf = HBaseConfiguration.create() val conn = ConnectionFactory.createConnection(hbaseConf) val table = conn.getTable(TableName.valueOf("test_table")) val putList = new java.util.LinkedList[Put]() list.foreach(value => { val put = new Put(Bytes.toBytes(value)) put.addImmutable(family, column, Bytes.toBytes(value)) putList.add(put) }) table.put(putList) table.close() }) } finally { context.stop() } } def putByMutator(): Unit = { val conf = new SparkConf().setAppName(SparkPutList.getClass().getSimpleName()) val context = new SparkContext(conf) // column family val family = Bytes.toBytes("cf") // column counter --> ctr val column = Bytes.toBytes("ctr") try { val rdd = context.makeRDD(1 to 100000, 4) rdd.foreachPartition(list => { val hbaseConf = HBaseConfiguration.create() val conn = ConnectionFactory.createConnection(hbaseConf) val mutator = conn.getBufferedMutator(TableName.valueOf("test_table")) list.foreach(value => { val put = new Put(Bytes.toBytes(value)) put.addImmutable(family, column, Bytes.toBytes(value)) mutator.mutate(put) }) mutator.close() }) } finally { context.stop() } } }

 

 

方式三: 使用map reduce job 写入Hbase

import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper import org.apache.spark.{SparkConf, SparkContext} /** * Description: Put data into Hbase by map reduce Job. * * Author : Adore Chen * Created: 2017-12-22 */ object SparkMapJob { /** * insert 100,000 cost 21035 ms * * @param args */ def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("SparkPutByMap") val context = new SparkContext(conf) val hbaseConf =HBaseConfiguration.create() hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "test_table") //IMPORTANT: must set the attribute to solve the problem (can't create path from null string ) hbaseConf.set("mapreduce.output.fileoutputformat.outputdir", "/tmp") val job = Job.getInstance(hbaseConf) job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) job.setOutputKeyClass(classOf[ImmutableBytesWritable]) job.setOutputValueClass(classOf[Put]) try{ val rdd = context.makeRDD(1 to 100000) // column family val family = Bytes.toBytes("cf") // column counter --> ctr val column = Bytes.toBytes("ctr") rdd.map(value => { var put = new Put(Bytes.toBytes(value)) put.addImmutable(family, column, Bytes.toBytes(value)) (new ImmutableBytesWritable(), put) }) .saveAsNewAPIHadoopDataset(job.getConfiguration) }finally{ context.stop() } } }

 

 

测试环境:

 

ext{ sparkVersion = '2.3.0' } dependencies { compile 'org.slf4j:slf4j-api:1.7.25' compile 'org.apache.logging.log4j:log4j-api:2.11.0' compile 'org.apache.logging.log4j:log4j-core:2.11.0' // spark related compile "org.apache.spark:spark-core_2.11:${sparkVersion}" compile "org.apache.spark:spark-streaming_2.11:${sparkVersion}" compile "org.apache.spark:spark-sql_2.11:${sparkVersion}" compile "org.apache.spark:spark-streaming-kafka-0-10_2.11:${sparkVersion}" // hbase related compile 'org.apache.hbase:hbase-client:1.3.1' compile 'org.apache.hbase:hbase-server:1.3.1' // redis related compile 'redis.clients:jedis:2.9.0' // mysql connector compile 'mysql:mysql-connector-java:5.1.46' // hive jdbc compile 'org.apache.hive:hive-jdbc:2.3.3' compile 'org.apache.logging.log4j:log4j:2.11.0' compile 'org.apache.avro:avro:1.8.2' compile 'org.apache.avro:avro-tools:1.8.2' testCompile 'junit:junit:4.12' }

 

关于使用场景的说明:

第一二种场景,主要是独立使用HBase时候使用。

第三种场景,和spark、flink等集成时使用。

注意:代码中仅为显示使用方法,HBase Connection最好使用单例模式和Spark、Flink集成。 

参考HBaseUtil里的代码: 

https://blog.csdn.net/adorechen/article/details/106058924



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有