博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hbase Client 使用示例
阅读量:4697 次
发布时间:2019-06-09

本文共 3448 字,大约阅读时间需要 11 分钟。

这两天在读淘宝开源出来的DataX,想模仿它写一个离线数据交换组件。读了它读写Hbase的插件的源代码,觉得写得确实比我之前写得好。整理出来,放在这里,向优秀代码学习。关键的地方时它在处理异常的时候考虑的比我周全很多。

先是写Hbase的代码:

/** * (C) 2010-2011 Alibaba Group Holding Limited. * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License  * version 2 as published by the Free Software Foundation.  *  */package com.taobao.datax.plugins.writer.hbasewriter;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HTableDescriptor;import org.apache.hadoop.hbase.client.*;import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException;import java.util.ArrayList;import java.util.List;public class HBaseProxy {	private Configuration config;	private HTable htable;	private HBaseAdmin admin;	private HTableDescriptor descriptor;		private static final int BUFFER_LINE = 1024;		private List
buffer = new ArrayList
(BUFFER_LINE); private Put p; public static HBaseProxy newProxy(String hbase_conf, String table) throws IOException { return new HBaseProxy(hbase_conf, table); } private HBaseProxy(String hbase_conf, String tableName) throws IOException { Configuration conf = new Configuration(); conf.addResource(new Path(hbase_conf)); config = new Configuration(conf); htable = new HTable(config, tableName); admin = new HBaseAdmin(config); descriptor = htable.getTableDescriptor(); } public void setBufferSize(int bufferSize) throws IOException { this.htable.setWriteBufferSize(bufferSize); } public void setHTable(String tableName) throws IOException { this.htable = new HTable(config, tableName); } public void setAutoFlush(boolean autoflush) { this.htable.setAutoFlush(autoflush); } public boolean check() throws IOException { if (!admin.isMasterRunning()) { throw new IllegalStateException("hbase master is not running!"); } if (!admin.tableExists(htable.getTableName())) { throw new IllegalStateException("hbase table " + Bytes.toString(htable.getTableName()) + " is not existed!"); } if (!admin.isTableAvailable(htable.getTableName())) { throw new IllegalStateException("hbase table " + Bytes.toString(htable.getTableName()) + " is not available!"); } if (!admin.isTableEnabled(htable.getTableName())) { throw new IllegalStateException("hbase table " + Bytes.toString(htable.getTableName()) + " is disable!"); } return true; } public void close() throws IOException { htable.close(); } public void deleteTable() throws IOException { Scan s = new Scan(); ResultScanner scanner = htable.getScanner(s); for (Result rr = scanner.next(); rr != null; rr = scanner.next()) { htable.delete(new Delete(rr.getRow())); } scanner.close(); } public void truncateTable() throws IOException { admin.disableTable(htable.getTableName()); admin.deleteTable(htable.getTableName()); admin.createTable(descriptor); } public void flush() throws IOException { if (!buffer.isEmpty()) { htable.put(buffer); buffer.clear(); } htable.flushCommits(); } public void prepare(byte[] rowKey) { this.p = new Put(rowKey); } public Put add(byte[] family, byte[] qualifier, byte[] value) { return this.p.add(family, qualifier, value); } public void insert() throws IOException { buffer.add(this.p); if (buffer.size() >= BUFFER_LINE) { htable.put(buffer); buffer.clear(); } }}

  

转载于:https://www.cnblogs.com/qgxiaoguang/archive/2013/02/21/2921045.html

你可能感兴趣的文章
记一些从数学和程序设计中体会到的思想
查看>>
题目1462:两船载物问题
查看>>
POJ 2378 Tree Cutting(树形DP,水)
查看>>
第二冲刺阶段个人博客5
查看>>
UVA 116 Unidirectional TSP (白书dp)
查看>>
第三方测速工具
查看>>
MySQL 网络访问连接
查看>>
在aws ec2上使用root用户登录
查看>>
数据访问 投票习题
查看>>
CIO知识储备
查看>>
cnblog!i'm coming!
查看>>
使用点符号代替溢出的文本
查看>>
Axios 中文说明
查看>>
fatal: remote origin already exists.
查看>>
gridview 自定义value值
查看>>
2018二月实现计划成果及其三月规划
查看>>
类名.class和getClass()区别
查看>>
12/17面试题
查看>>
LeetCode 242. Valid Anagram
查看>>
JSP表单提交乱码
查看>>