HBase提供了丰富的Java API,其中包括表连接池(HTablePool)。下面我用HTablePool来展示如何通过Java API操作HBase。
项目结构如下:

我使用的Hbase的版本是
hbase-0.98.9-hadoop2-bin.tar.gz
大家下载并解压后,可以在其中的lib目录下拿到所需的jar文件,即上图所示的hbase-lib资源。
接口类:
/hbase-util/src/com/b510/hbase/util/dao/HbaseDao.java
- 1 package com.b510.hbase.util.dao;
- 2
- 3 import java.util.List;
- 4
- 5 import org.apache.hadoop.hbase.client.HTableInterface;
- 6
- 7
- 8 /**
- 9 * @author Hongten
- 10 * @created 7 Nov 2018
- 11 */
- 12 public interface HbaseDao {
- 13
- 14 // initial table
- 15 public HTableInterface getHTableFromPool(String tableName);
- 16
- 17 // check if the table is exist
- 18 public boolean isHTableExist(String tableName);
- 19
- 20 // create table
- 21 public void createHTable(String tableName, String[] columnFamilys);
- 22
- 23 // insert new row
- 24 public void addRow(String tableName, String rowKey, String columnFamily, String column, String value);
- 25
- 26 // get row by row key
- 27 public void getRow(String tableName, String rowKey);
- 28
- 29 public void getAllRows(String tableName);
- 30
- 31 // get rows by giving range
- 32 public void getRowsByRange(String tableName, String startRowKey, String endRowKey);
- 33
- 34 //delete row
- 35 public void delRow(String tableName, String rowKey);
- 36
- 37 //delete rows by row keys
- 38 public void delRowsByRowKeys(String tableName, List<String> rowKeys);
- 39
- 40 // auto flush data when close
- 41 public void closeAutoFlush(HTableInterface table);
- 42
- 43 // close table
- 44 public void closeTable(HTableInterface table);
- 45
- 46 // close pool connection
- 47 public void closePoolConnection();
- 48
- 49 // delete table
- 50 public void deleteHTable(String tableName);
- 51 }
实现类:
/hbase-util/src/com/b510/hbase/util/dao/impl/HbaseDaoImpl.java
- 1 package com.b510.hbase.util.dao.impl;
- 2
- 3 import java.io.IOException;
- 4 import java.util.List;
- 5
- 6 import org.apache.hadoop.conf.Configuration;
- 7 import org.apache.hadoop.hbase.Cell;
- 8 import org.apache.hadoop.hbase.CellUtil;
- 9 import org.apache.hadoop.hbase.HBaseConfiguration;
- 10 import org.apache.hadoop.hbase.HColumnDescriptor;
- 11 import org.apache.hadoop.hbase.HTableDescriptor;
- 12 import org.apache.hadoop.hbase.MasterNotRunningException;
- 13 import org.apache.hadoop.hbase.TableName;
- 14 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
- 15 import org.apache.hadoop.hbase.client.Delete;
- 16 import org.apache.hadoop.hbase.client.Get;
- 17 import org.apache.hadoop.hbase.client.HBaseAdmin;
- 18 import org.apache.hadoop.hbase.client.HTableInterface;
- 19 import org.apache.hadoop.hbase.client.HTablePool;
- 20 import org.apache.hadoop.hbase.client.Put;
- 21 import org.apache.hadoop.hbase.client.Result;
- 22 import org.apache.hadoop.hbase.client.ResultScanner;
- 23 import org.apache.hadoop.hbase.client.Scan;
- 24
- 25 import com.b510.hbase.util.dao.HbaseDao;
- 26
- 27 /**
- 28 * @author Hongten
- 29 * @created 7 Nov 2018
- 30 */
- 31 @SuppressWarnings("deprecation")
- 32 public class HbaseDaoImpl implements HbaseDao {
- 33
- 34 private static Configuration conf = null;
- 35 private static HBaseAdmin hAdmin;
- 36 private static HTablePool pool;
- 37
- 38 private static int defaultPoolSize = 5;
- 39
- 40 public HbaseDaoImpl(int poolSize) {
- 41 conf = HBaseConfiguration.create();
- 42 conf.set("hbase.zookeeper.quorum", "node1:2888,node2:2888,node3:2888");
- 43 try {
- 44 hAdmin = new HBaseAdmin(conf);
- 45 // the default pool size is 5.
- 46 pool = new HTablePool(conf, poolSize <= 0 ? defaultPoolSize : poolSize);
- 47 } catch (MasterNotRunningException e) {
- 48 e.printStackTrace();
- 49 } catch (ZooKeeperConnectionException e) {
- 50 e.printStackTrace();
- 51 } catch (IOException e) {
- 52 e.printStackTrace();
- 53 }
- 54 }
- 55
- 56 @Override
- 57 public HTableInterface getHTableFromPool(String tableName) {
- 58 HTableInterface table = pool.getTable(tableName);
- 59 return table;
- 60 }
- 61
- 62 @Override
- 63 public boolean isHTableExist(String tableName) {
- 64 try {
- 65 return hAdmin.tableExists(tableName);
- 66 } catch (IOException e) {
- 67 e.printStackTrace();
- 68 }
- 69 return false;
- 70 }
- 71
- 72 @Override
- 73 public void createHTable(String tableName, String[] columnFamilys) {
- 74 if (!isHTableExist(tableName)) {
- 75 HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
- 76 // The Hbase suggested the number of column family should be less than 3.
- 77 // Normally, there only have 1 column family.
- 78 for (String cfName : columnFamilys) {
- 79 HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cfName);
- 80 tableDescriptor.addFamily(hColumnDescriptor);
- 81 }
- 82 try {
- 83 hAdmin.createTable(tableDescriptor);
- 84 } catch (IOException e) {
- 85 e.printStackTrace();
- 86 }
- 87 System.out.println("The table [" + tableName + "] is created.");
- 88 } else {
- 89 System.out.println("The table [" + tableName + "] is existing already.");
- 90 }
- 91
- 92 }
- 93
- 94 @Override
- 95 public void addRow(String tableName, String rowKey, String columnFamily, String column, String value) {
- 96 if (isHTableExist(tableName)) {
- 97 HTableInterface table = getHTableFromPool(tableName);
- 98 Put put = new Put(rowKey.getBytes());
- 99 put.add(columnFamily.getBytes(), column.getBytes(), value.getBytes());
- 100 try {
- 101 table.put(put);
- 102 } catch (IOException e) {
- 103 e.printStackTrace();
- 104 }
- 105 System.out.println("Insert into table [" + tableName + "], Rowkey=[" + rowKey + "], Column=[" + columnFamily + ":" + column + "], Vlaue=[" + value + "].");
- 106 closeTable(table);
- 107 } else {
- 108 System.out.println("The table [" + tableName + "] does not exist.");
- 109 }
- 110 }
- 111
- 112 @Override
- 113 public void getRow(String tableName, String rowKey) {
- 114 if (isHTableExist(tableName)) {
- 115 HTableInterface table = getHTableFromPool(tableName);
- 116 Get get = new Get(rowKey.getBytes());
- 117 Result result;
- 118 try {
- 119 result = table.get(get);
- 120 String columnName = "";
- 121 String timeStamp = "";
- 122 String columnFamily = "";
- 123 String value = "";
- 124 for (Cell cell : result.rawCells()) {
- 125 timeStamp = String.valueOf(cell.getTimestamp());
- 126 columnFamily = new String(CellUtil.cloneFamily(cell));
- 127 columnName = new String(CellUtil.cloneQualifier(cell));
- 128 value = new String(CellUtil.cloneValue(cell));
- 129
- 130 System.out.println("Get from table [" + tableName + "], Rowkey=[" + rowKey + "], Column=[" + columnFamily + ":" + columnName + "], Timestamp=[" + timeStamp + "], Vlaue=[" + value + "].");
- 131 }
- 132 } catch (IOException e) {
- 133 e.printStackTrace();
- 134 }
- 135 closeTable(table);
- 136 } else {
- 137 System.out.println("The table [" + tableName + "] does not exist.");
- 138 }
- 139 }
- 140
- 141 @Override
- 142 public void getAllRows(String tableName) {
- 143 if (isHTableExist(tableName)) {
- 144 Scan scan = new Scan();
- 145 scanHTable(tableName, scan);
- 146 } else {
- 147 System.out.println("The table [" + tableName + "] does not exist.");
- 148 }
- 149 }
- 150
- 151 private void scanHTable(String tableName, Scan scan) {
- 152 try {
- 153 HTableInterface table = getHTableFromPool(tableName);
- 154 ResultScanner results = table.getScanner(scan);
- 155 for (Result result : results) {
- 156 String rowKey = "";
- 157 String columnName = "";
- 158 String timeStamp = "";
- 159 String columnFamily = "";
- 160 String value = "";
- 161 for (Cell cell : result.rawCells()) {
- 162 rowKey = new String(CellUtil.cloneRow(cell));
- 163 timeStamp = String.valueOf(cell.getTimestamp());
- 164 columnFamily = new String(CellUtil.cloneFamily(cell));
- 165 columnName = new String(CellUtil.cloneQualifier(cell));
- 166 value = new String(CellUtil.cloneValue(cell));
- 167
- 168 System.out.println("Get from table [" + tableName + "], Rowkey=[" + rowKey + "], Column=[" + columnFamily + ":" + columnName + "], Timestamp=[" + timeStamp + "], Vlaue=[" + value + "].");
- 169 }
- 170 }
- 171 closeTable(table);
- 172 } catch (IOException e) {
- 173 e.printStackTrace();
- 174 }
- 175 }
- 176
- 177 @Override
- 178 public void getRowsByRange(String tableName, String startRowKey, String endRowKey) {
- 179 if (isHTableExist(tableName)) {
- 180 Scan scan = new Scan();
- 181 scan.setStartRow(startRowKey.getBytes());
- 182 // not equals Stop Row Key, it mean the result does not include the stop row record(exclusive).
- 183 // the hbase version is 0.98.9
- 184 scan.setStopRow(endRowKey.getBytes());
- 185 scanHTable(tableName, scan);
- 186 } else {
- 187 System.out.println("The table [" + tableName + "] does not exist.");
- 188 }
- 189 }
- 190
- 191 @Override
- 192 public void delRow(String tableName, String rowKey) {
- 193 if (isHTableExist(tableName)) {
- 194 HTableInterface table = getHTableFromPool(tableName);
- 195 deleteRow(table, rowKey);
- 196 } else {
- 197 System.out.println("The table [" + tableName + "] does not exist.");
- 198 }
- 199 }
- 200
- 201 private void deleteRow(HTableInterface table, String rowKey) {
- 202 Delete del = new Delete(rowKey.getBytes());
- 203 try {
- 204 table.delete(del);
- 205 System.out.println("Delete from table [" + new String(table.getTableName()) + "], Rowkey=[" + rowKey + "].");
- 206 closeTable(table);
- 207 } catch (IOException e) {
- 208 e.printStackTrace();
- 209 }
- 210 }
- 211
- 212 @Override
- 213 public void delRowsByRowKeys(String tableName, List<String> rowKeys) {
- 214 if (rowKeys != null && rowKeys.size() > 0) {
- 215 for (String rowKey : rowKeys) {
- 216 delRow(tableName, rowKey);
- 217 }
- 218 }
- 219 }
- 220
- 221 @Override
- 222 public void deleteHTable(String tableName) {
- 223 if (isHTableExist(tableName)) {
- 224 try {
- 225 hAdmin.disableTable(tableName.getBytes());
- 226 hAdmin.deleteTable(tableName.getBytes());
- 227 System.out.println("The table [" + tableName + "] is deleted.");
- 228 } catch (IOException e) {
- 229 e.printStackTrace();
- 230 }
- 231 } else {
- 232 System.out.println("The table [" + tableName + "] does not exist.");
- 233 }
- 234
- 235 }
- 236
- 237 @Override
- 238 public void closeAutoFlush(HTableInterface table) {
- 239 table.setAutoFlush(false, false);
- 240 }
- 241
- 242 @Override
- 243 public void closeTable(HTableInterface table) {
- 244 try {
- 245 table.close();
- 246 } catch (IOException e) {
- 247 e.printStackTrace();
- 248 }
- 249 }
- 250
- 251 @Override
- 252 public void closePoolConnection() {
- 253 try {
- 254 pool.close();
- 255 } catch (IOException e) {
- 256 e.printStackTrace();
- 257 }
- 258 }
- 259
- 260 }
测试类:
/hbase-util/src/com/b510/hbase/util/dao/test/HbaseDaoTest.java
- 1 package com.b510.hbase.util.dao.test;
- 2
- 3 import java.util.ArrayList;
- 4 import java.util.List;
- 5
- 6 import org.junit.Test;
- 7
- 8 import com.b510.hbase.util.dao.HbaseDao;
- 9 import com.b510.hbase.util.dao.impl.HbaseDaoImpl;
- 10
- 11 /**
- 12 * @author Hongten
- 13 * @created 7 Nov 2018
- 14 */
- 15 public class HbaseDaoTest {
- 16
- 17 HbaseDao dao = new HbaseDaoImpl(4);
- 18
- 19 public static final String tableName = "t_test";
- 20 public static final String columnFamilyName = "cf1";
- 21 public static final String[] CFs = { columnFamilyName };
- 22
- 23 public static final String COLUMN_NAME_NAME = "name";
- 24 public static final String COLUMN_NAME_AGE = "age";
- 25
- 26 @Test
- 27 public void main() {
- 28 createTable();
- 29 addRow();
- 30 getRow();
- 31 getAllRows();
- 32 getRowsByRange();
- 33 delRow();
- 34 delRowsByRowKeys();
- 35 deleteHTable();
- 36 }
- 37
- 38 public void createTable() {
- 39 System.out.println("=== create table ====");
- 40 dao.createHTable(tableName, CFs);
- 41 }
- 42
- 43 public void addRow() {
- 44 System.out.println("=== insert record ====");
- 45 dao.addRow(tableName, "12345566", columnFamilyName, COLUMN_NAME_NAME, "Hongten");
- 46 dao.addRow(tableName, "12345566", columnFamilyName, COLUMN_NAME_AGE, "22");
- 47
- 48 dao.addRow(tableName, "12345567", columnFamilyName, COLUMN_NAME_NAME, "Tom");
- 49 dao.addRow(tableName, "12345567", columnFamilyName, COLUMN_NAME_AGE, "25");
- 50
- 51 dao.addRow(tableName, "12345568", columnFamilyName, COLUMN_NAME_NAME, "Jone");
- 52 dao.addRow(tableName, "12345568", columnFamilyName, COLUMN_NAME_AGE, "30");
- 53
- 54 dao.addRow(tableName, "12345569", columnFamilyName, COLUMN_NAME_NAME, "Jobs");
- 55 dao.addRow(tableName, "12345569", columnFamilyName, COLUMN_NAME_AGE, "24");
- 56 }
- 57
- 58 public void getRow() {
- 59 System.out.println("=== get record ====");
- 60 dao.getRow(tableName, "12345566");
- 61 }
- 62
- 63 public void getAllRows() {
- 64 System.out.println("=== scan table ====");
- 65 dao.getAllRows(tableName);
- 66 }
- 67
- 68 public void getRowsByRange() {
- 69 System.out.println("=== scan record by giving range ====");
- 70 // it will return the '12345567' and '12345568' rows.
- 71 dao.getRowsByRange(tableName, "12345567", "12345569");
- 72 }
- 73
- 74 public void delRow() {
- 75 System.out.println("=== delete record ====");
- 76 dao.delRow(tableName, "12345568");
- 77 // only '12345567' row.
- 78 getRowsByRange();
- 79 }
- 80
- 81 public void delRowsByRowKeys() {
- 82 System.out.println("=== delete batch records ====");
- 83 List<String> rowKeys = new ArrayList<String>();
- 84 rowKeys.add("12345566");
- 85 rowKeys.add("12345569");
- 86 dao.delRowsByRowKeys(tableName, rowKeys);
- 87 // can not find the '12345566' and '12345569'
- 88 getAllRows();
- 89 }
- 90
- 91 public void deleteHTable() {
- 92 System.out.println("=== delete table ====");
- 93 dao.deleteHTable(tableName);
- 94 }
- 95 }
测试结果:
- log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
- log4j:WARN Please initialize the log4j system properly.
- log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
- === create table ====
- The table [t_test] is created.
- === insert record ====
- Insert into table [t_test], Rowkey=[12345566], Column=[cf1:name], Vlaue=[Hongten].
- Insert into table [t_test], Rowkey=[12345566], Column=[cf1:age], Vlaue=[22].
- Insert into table [t_test], Rowkey=[12345567], Column=[cf1:name], Vlaue=[Tom].
- Insert into table [t_test], Rowkey=[12345567], Column=[cf1:age], Vlaue=[25].
- Insert into table [t_test], Rowkey=[12345568], Column=[cf1:name], Vlaue=[Jone].
- Insert into table [t_test], Rowkey=[12345568], Column=[cf1:age], Vlaue=[30].
- Insert into table [t_test], Rowkey=[12345569], Column=[cf1:name], Vlaue=[Jobs].
- Insert into table [t_test], Rowkey=[12345569], Column=[cf1:age], Vlaue=[24].
- === get record ====
- Get from table [t_test], Rowkey=[12345566], Column=[cf1:age], Timestamp=[1541652952697], Vlaue=[22].
- Get from table [t_test], Rowkey=[12345566], Column=[cf1:name], Timestamp=[1541652952626], Vlaue=[Hongten].
- === scan table ====
- Get from table [t_test], Rowkey=[12345566], Column=[cf1:age], Timestamp=[1541652952697], Vlaue=[22].
- Get from table [t_test], Rowkey=[12345566], Column=[cf1:name], Timestamp=[1541652952626], Vlaue=[Hongten].
- Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25].
- Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom].
- Get from table [t_test], Rowkey=[12345568], Column=[cf1:age], Timestamp=[1541652952834], Vlaue=[30].
- Get from table [t_test], Rowkey=[12345568], Column=[cf1:name], Timestamp=[1541652952807], Vlaue=[Jone].
- Get from table [t_test], Rowkey=[12345569], Column=[cf1:age], Timestamp=[1541652952928], Vlaue=[24].
- Get from table [t_test], Rowkey=[12345569], Column=[cf1:name], Timestamp=[1541652952869], Vlaue=[Jobs].
- === scan record by giving range ====
- Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25].
- Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom].
- Get from table [t_test], Rowkey=[12345568], Column=[cf1:age], Timestamp=[1541652952834], Vlaue=[30].
- Get from table [t_test], Rowkey=[12345568], Column=[cf1:name], Timestamp=[1541652952807], Vlaue=[Jone].
- === delete record ====
- Delete from table [t_test], Rowkey=[12345568].
- === scan record by giving range ====
- Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25].
- Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom].
- === delete batch records ====
- Delete from table [t_test], Rowkey=[12345566].
- Delete from table [t_test], Rowkey=[12345569].
- === scan table ====
- Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25].
- Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom].
- === delete table ====
- The table [t_test] is deleted.
源码下载:
hbase-util.zip
========================================================
More reading, and English is important.
I'm Hongten
- 大哥哥大姐姐,觉得有用打赏点哦!你的支持是我最大的动力。谢谢。
Hongten博客排名在100名以内。粉丝过千。
Hongten出品,必是精品。
E | hongtenzone@foxmail.com B | http://www.cnblogs.com/hongten
========================================================