经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Hadoop » 查看文章
hadoop2-HBase的Java API操作
来源:cnblogs  作者:Hongten  时间:2018/11/9 11:08:45  对本文有异议

Hbase提供了丰富的Java API,以及线程池操作,下面我用线程池来展示一下使用Java API操作Hbase。

项目结构如下:

我使用的Hbase的版本是

hbase-0.98.9-hadoop2-bin.tar.gz

大家下载后,可以拿到里面的lib目录下面的jar文件,即上所示的hbase-lib资源。

 

接口类:

/hbase-util/src/com/b510/hbase/util/dao/HbaseDao.java

  1. 1 package com.b510.hbase.util.dao;
  2. 2
  3. 3 import java.util.List;
  4. 4
  5. 5 import org.apache.hadoop.hbase.client.HTableInterface;
  6. 6
  7. 7
  8. 8 /**
  9. 9 * @author Hongten
  10. 10 * @created 7 Nov 2018
  11. 11 */
  12. 12 public interface HbaseDao {
  13. 13
  14. 14 // initial table
  15. 15 public HTableInterface getHTableFromPool(String tableName);
  16. 16
  17. 17 // check if the table is exist
  18. 18 public boolean isHTableExist(String tableName);
  19. 19
  20. 20 // create table
  21. 21 public void createHTable(String tableName, String[] columnFamilys);
  22. 22
  23. 23 // insert new row
  24. 24 public void addRow(String tableName, String rowKey, String columnFamily, String column, String value);
  25. 25
  26. 26 // get row by row key
  27. 27 public void getRow(String tableName, String rowKey);
  28. 28
  29. 29 public void getAllRows(String tableName);
  30. 30
  31. 31 // get rows by giving range
  32. 32 public void getRowsByRange(String tableName, String startRowKey, String endRowKey);
  33. 33
  34. 34 //delete row
  35. 35 public void delRow(String tableName, String rowKey);
  36. 36
  37. 37 //delete rows by row keys
  38. 38 public void delRowsByRowKeys(String tableName, List<String> rowKeys);
  39. 39
  40. 40 // auto flush data when close
  41. 41 public void closeAutoFlush(HTableInterface table);
  42. 42
  43. 43 // close table
  44. 44 public void closeTable(HTableInterface table);
  45. 45
  46. 46 // close pool connection
  47. 47 public void closePoolConnection();
  48. 48
  49. 49 // delete table
  50. 50 public void deleteHTable(String tableName);
  51. 51 }

 

实现类:

/hbase-util/src/com/b510/hbase/util/dao/impl/HbaseDaoImpl.java

  1. 1 package com.b510.hbase.util.dao.impl;
  2. 2
  3. 3 import java.io.IOException;
  4. 4 import java.util.List;
  5. 5
  6. 6 import org.apache.hadoop.conf.Configuration;
  7. 7 import org.apache.hadoop.hbase.Cell;
  8. 8 import org.apache.hadoop.hbase.CellUtil;
  9. 9 import org.apache.hadoop.hbase.HBaseConfiguration;
  10. 10 import org.apache.hadoop.hbase.HColumnDescriptor;
  11. 11 import org.apache.hadoop.hbase.HTableDescriptor;
  12. 12 import org.apache.hadoop.hbase.MasterNotRunningException;
  13. 13 import org.apache.hadoop.hbase.TableName;
  14. 14 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
  15. 15 import org.apache.hadoop.hbase.client.Delete;
  16. 16 import org.apache.hadoop.hbase.client.Get;
  17. 17 import org.apache.hadoop.hbase.client.HBaseAdmin;
  18. 18 import org.apache.hadoop.hbase.client.HTableInterface;
  19. 19 import org.apache.hadoop.hbase.client.HTablePool;
  20. 20 import org.apache.hadoop.hbase.client.Put;
  21. 21 import org.apache.hadoop.hbase.client.Result;
  22. 22 import org.apache.hadoop.hbase.client.ResultScanner;
  23. 23 import org.apache.hadoop.hbase.client.Scan;
  24. 24
  25. 25 import com.b510.hbase.util.dao.HbaseDao;
  26. 26
  27. 27 /**
  28. 28 * @author Hongten
  29. 29 * @created 7 Nov 2018
  30. 30 */
  31. 31 @SuppressWarnings("deprecation")
  32. 32 public class HbaseDaoImpl implements HbaseDao {
  33. 33
  34. 34 private static Configuration conf = null;
  35. 35 private static HBaseAdmin hAdmin;
  36. 36 private static HTablePool pool;
  37. 37
  38. 38 private static int defaultPoolSize = 5;
  39. 39
  40. 40 public HbaseDaoImpl(int poolSize) {
  41. 41 conf = HBaseConfiguration.create();
  42. 42 conf.set("hbase.zookeeper.quorum", "node1:2888,node2:2888,node3:2888");
  43. 43 try {
  44. 44 hAdmin = new HBaseAdmin(conf);
  45. 45 // the default pool size is 5.
  46. 46 pool = new HTablePool(conf, poolSize <= 0 ? defaultPoolSize : poolSize);
  47. 47 } catch (MasterNotRunningException e) {
  48. 48 e.printStackTrace();
  49. 49 } catch (ZooKeeperConnectionException e) {
  50. 50 e.printStackTrace();
  51. 51 } catch (IOException e) {
  52. 52 e.printStackTrace();
  53. 53 }
  54. 54 }
  55. 55
  56. 56 @Override
  57. 57 public HTableInterface getHTableFromPool(String tableName) {
  58. 58 HTableInterface table = pool.getTable(tableName);
  59. 59 return table;
  60. 60 }
  61. 61
  62. 62 @Override
  63. 63 public boolean isHTableExist(String tableName) {
  64. 64 try {
  65. 65 return hAdmin.tableExists(tableName);
  66. 66 } catch (IOException e) {
  67. 67 e.printStackTrace();
  68. 68 }
  69. 69 return false;
  70. 70 }
  71. 71
  72. 72 @Override
  73. 73 public void createHTable(String tableName, String[] columnFamilys) {
  74. 74 if (!isHTableExist(tableName)) {
  75. 75 HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
  76. 76 // The Hbase suggested the number of column family should be less than 3.
  77. 77 // Normally, there only have 1 column family.
  78. 78 for (String cfName : columnFamilys) {
  79. 79 HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cfName);
  80. 80 tableDescriptor.addFamily(hColumnDescriptor);
  81. 81 }
  82. 82 try {
  83. 83 hAdmin.createTable(tableDescriptor);
  84. 84 } catch (IOException e) {
  85. 85 e.printStackTrace();
  86. 86 }
  87. 87 System.out.println("The table [" + tableName + "] is created.");
  88. 88 } else {
  89. 89 System.out.println("The table [" + tableName + "] is existing already.");
  90. 90 }
  91. 91
  92. 92 }
  93. 93
  94. 94 @Override
  95. 95 public void addRow(String tableName, String rowKey, String columnFamily, String column, String value) {
  96. 96 if (isHTableExist(tableName)) {
  97. 97 HTableInterface table = getHTableFromPool(tableName);
  98. 98 Put put = new Put(rowKey.getBytes());
  99. 99 put.add(columnFamily.getBytes(), column.getBytes(), value.getBytes());
  100. 100 try {
  101. 101 table.put(put);
  102. 102 } catch (IOException e) {
  103. 103 e.printStackTrace();
  104. 104 }
  105. 105 System.out.println("Insert into table [" + tableName + "], Rowkey=[" + rowKey + "], Column=[" + columnFamily + ":" + column + "], Vlaue=[" + value + "].");
  106. 106 closeTable(table);
  107. 107 } else {
  108. 108 System.out.println("The table [" + tableName + "] does not exist.");
  109. 109 }
  110. 110 }
  111. 111
  112. 112 @Override
  113. 113 public void getRow(String tableName, String rowKey) {
  114. 114 if (isHTableExist(tableName)) {
  115. 115 HTableInterface table = getHTableFromPool(tableName);
  116. 116 Get get = new Get(rowKey.getBytes());
  117. 117 Result result;
  118. 118 try {
  119. 119 result = table.get(get);
  120. 120 String columnName = "";
  121. 121 String timeStamp = "";
  122. 122 String columnFamily = "";
  123. 123 String value = "";
  124. 124 for (Cell cell : result.rawCells()) {
  125. 125 timeStamp = String.valueOf(cell.getTimestamp());
  126. 126 columnFamily = new String(CellUtil.cloneFamily(cell));
  127. 127 columnName = new String(CellUtil.cloneQualifier(cell));
  128. 128 value = new String(CellUtil.cloneValue(cell));
  129. 129
  130. 130 System.out.println("Get from table [" + tableName + "], Rowkey=[" + rowKey + "], Column=[" + columnFamily + ":" + columnName + "], Timestamp=[" + timeStamp + "], Vlaue=[" + value + "].");
  131. 131 }
  132. 132 } catch (IOException e) {
  133. 133 e.printStackTrace();
  134. 134 }
  135. 135 closeTable(table);
  136. 136 } else {
  137. 137 System.out.println("The table [" + tableName + "] does not exist.");
  138. 138 }
  139. 139 }
  140. 140
  141. 141 @Override
  142. 142 public void getAllRows(String tableName) {
  143. 143 if (isHTableExist(tableName)) {
  144. 144 Scan scan = new Scan();
  145. 145 scanHTable(tableName, scan);
  146. 146 } else {
  147. 147 System.out.println("The table [" + tableName + "] does not exist.");
  148. 148 }
  149. 149 }
  150. 150
  151. 151 private void scanHTable(String tableName, Scan scan) {
  152. 152 try {
  153. 153 HTableInterface table = getHTableFromPool(tableName);
  154. 154 ResultScanner results = table.getScanner(scan);
  155. 155 for (Result result : results) {
  156. 156 String rowKey = "";
  157. 157 String columnName = "";
  158. 158 String timeStamp = "";
  159. 159 String columnFamily = "";
  160. 160 String value = "";
  161. 161 for (Cell cell : result.rawCells()) {
  162. 162 rowKey = new String(CellUtil.cloneRow(cell));
  163. 163 timeStamp = String.valueOf(cell.getTimestamp());
  164. 164 columnFamily = new String(CellUtil.cloneFamily(cell));
  165. 165 columnName = new String(CellUtil.cloneQualifier(cell));
  166. 166 value = new String(CellUtil.cloneValue(cell));
  167. 167
  168. 168 System.out.println("Get from table [" + tableName + "], Rowkey=[" + rowKey + "], Column=[" + columnFamily + ":" + columnName + "], Timestamp=[" + timeStamp + "], Vlaue=[" + value + "].");
  169. 169 }
  170. 170 }
  171. 171 closeTable(table);
  172. 172 } catch (IOException e) {
  173. 173 e.printStackTrace();
  174. 174 }
  175. 175 }
  176. 176
  177. 177 @Override
  178. 178 public void getRowsByRange(String tableName, String startRowKey, String endRowKey) {
  179. 179 if (isHTableExist(tableName)) {
  180. 180 Scan scan = new Scan();
  181. 181 scan.setStartRow(startRowKey.getBytes());
  182. 182 // not equals Stop Row Key, it mean the result does not include the stop row record(exclusive).
  183. 183 // the hbase version is 0.98.9
  184. 184 scan.setStopRow(endRowKey.getBytes());
  185. 185 scanHTable(tableName, scan);
  186. 186 } else {
  187. 187 System.out.println("The table [" + tableName + "] does not exist.");
  188. 188 }
  189. 189 }
  190. 190
  191. 191 @Override
  192. 192 public void delRow(String tableName, String rowKey) {
  193. 193 if (isHTableExist(tableName)) {
  194. 194 HTableInterface table = getHTableFromPool(tableName);
  195. 195 deleteRow(table, rowKey);
  196. 196 } else {
  197. 197 System.out.println("The table [" + tableName + "] does not exist.");
  198. 198 }
  199. 199 }
  200. 200
  201. 201 private void deleteRow(HTableInterface table, String rowKey) {
  202. 202 Delete del = new Delete(rowKey.getBytes());
  203. 203 try {
  204. 204 table.delete(del);
  205. 205 System.out.println("Delete from table [" + new String(table.getTableName()) + "], Rowkey=[" + rowKey + "].");
  206. 206 closeTable(table);
  207. 207 } catch (IOException e) {
  208. 208 e.printStackTrace();
  209. 209 }
  210. 210 }
  211. 211
  212. 212 @Override
  213. 213 public void delRowsByRowKeys(String tableName, List<String> rowKeys) {
  214. 214 if (rowKeys != null && rowKeys.size() > 0) {
  215. 215 for (String rowKey : rowKeys) {
  216. 216 delRow(tableName, rowKey);
  217. 217 }
  218. 218 }
  219. 219 }
  220. 220
  221. 221 @Override
  222. 222 public void deleteHTable(String tableName) {
  223. 223 if (isHTableExist(tableName)) {
  224. 224 try {
  225. 225 hAdmin.disableTable(tableName.getBytes());
  226. 226 hAdmin.deleteTable(tableName.getBytes());
  227. 227 System.out.println("The table [" + tableName + "] is deleted.");
  228. 228 } catch (IOException e) {
  229. 229 e.printStackTrace();
  230. 230 }
  231. 231 } else {
  232. 232 System.out.println("The table [" + tableName + "] does not exist.");
  233. 233 }
  234. 234
  235. 235 }
  236. 236
  237. 237 @Override
  238. 238 public void closeAutoFlush(HTableInterface table) {
  239. 239 table.setAutoFlush(false, false);
  240. 240 }
  241. 241
  242. 242 @Override
  243. 243 public void closeTable(HTableInterface table) {
  244. 244 try {
  245. 245 table.close();
  246. 246 } catch (IOException e) {
  247. 247 e.printStackTrace();
  248. 248 }
  249. 249 }
  250. 250
  251. 251 @Override
  252. 252 public void closePoolConnection() {
  253. 253 try {
  254. 254 pool.close();
  255. 255 } catch (IOException e) {
  256. 256 e.printStackTrace();
  257. 257 }
  258. 258 }
  259. 259
  260. 260 }

 

测试类:

/hbase-util/src/com/b510/hbase/util/dao/test/HbaseDaoTest.java

  1. 1 package com.b510.hbase.util.dao.test;
  2. 2
  3. 3 import java.util.ArrayList;
  4. 4 import java.util.List;
  5. 5
  6. 6 import org.junit.Test;
  7. 7
  8. 8 import com.b510.hbase.util.dao.HbaseDao;
  9. 9 import com.b510.hbase.util.dao.impl.HbaseDaoImpl;
  10. 10
  11. 11 /**
  12. 12 * @author Hongten
  13. 13 * @created 7 Nov 2018
  14. 14 */
  15. 15 public class HbaseDaoTest {
  16. 16
  17. 17 HbaseDao dao = new HbaseDaoImpl(4);
  18. 18
  19. 19 public static final String tableName = "t_test";
  20. 20 public static final String columnFamilyName = "cf1";
  21. 21 public static final String[] CFs = { columnFamilyName };
  22. 22
  23. 23 public static final String COLUMN_NAME_NAME = "name";
  24. 24 public static final String COLUMN_NAME_AGE = "age";
  25. 25
  26. 26 @Test
  27. 27 public void main() {
  28. 28 createTable();
  29. 29 addRow();
  30. 30 getRow();
  31. 31 getAllRows();
  32. 32 getRowsByRange();
  33. 33 delRow();
  34. 34 delRowsByRowKeys();
  35. 35 deleteHTable();
  36. 36 }
  37. 37
  38. 38 public void createTable() {
  39. 39 System.out.println("=== create table ====");
  40. 40 dao.createHTable(tableName, CFs);
  41. 41 }
  42. 42
  43. 43 public void addRow() {
  44. 44 System.out.println("=== insert record ====");
  45. 45 dao.addRow(tableName, "12345566", columnFamilyName, COLUMN_NAME_NAME, "Hongten");
  46. 46 dao.addRow(tableName, "12345566", columnFamilyName, COLUMN_NAME_AGE, "22");
  47. 47
  48. 48 dao.addRow(tableName, "12345567", columnFamilyName, COLUMN_NAME_NAME, "Tom");
  49. 49 dao.addRow(tableName, "12345567", columnFamilyName, COLUMN_NAME_AGE, "25");
  50. 50
  51. 51 dao.addRow(tableName, "12345568", columnFamilyName, COLUMN_NAME_NAME, "Jone");
  52. 52 dao.addRow(tableName, "12345568", columnFamilyName, COLUMN_NAME_AGE, "30");
  53. 53
  54. 54 dao.addRow(tableName, "12345569", columnFamilyName, COLUMN_NAME_NAME, "Jobs");
  55. 55 dao.addRow(tableName, "12345569", columnFamilyName, COLUMN_NAME_AGE, "24");
  56. 56 }
  57. 57
  58. 58 public void getRow() {
  59. 59 System.out.println("=== get record ====");
  60. 60 dao.getRow(tableName, "12345566");
  61. 61 }
  62. 62
  63. 63 public void getAllRows() {
  64. 64 System.out.println("=== scan table ====");
  65. 65 dao.getAllRows(tableName);
  66. 66 }
  67. 67
  68. 68 public void getRowsByRange() {
  69. 69 System.out.println("=== scan record by giving range ====");
  70. 70 // it will return the '12345567' and '12345568' rows.
  71. 71 dao.getRowsByRange(tableName, "12345567", "12345569");
  72. 72 }
  73. 73
  74. 74 public void delRow() {
  75. 75 System.out.println("=== delete record ====");
  76. 76 dao.delRow(tableName, "12345568");
  77. 77 // only '12345567' row.
  78. 78 getRowsByRange();
  79. 79 }
  80. 80
  81. 81 public void delRowsByRowKeys() {
  82. 82 System.out.println("=== delete batch records ====");
  83. 83 List<String> rowKeys = new ArrayList<String>();
  84. 84 rowKeys.add("12345566");
  85. 85 rowKeys.add("12345569");
  86. 86 dao.delRowsByRowKeys(tableName, rowKeys);
  87. 87 // can not find the '12345566' and '12345569'
  88. 88 getAllRows();
  89. 89 }
  90. 90
  91. 91 public void deleteHTable() {
  92. 92 System.out.println("=== delete table ====");
  93. 93 dao.deleteHTable(tableName);
  94. 94 }
  95. 95 }

 

测试结果:

  1. log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
  2. log4j:WARN Please initialize the log4j system properly.
  3. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
  4. === create table ====
  5. The table [t_test] is created.
  6. === insert record ====
  7. Insert into table [t_test], Rowkey=[12345566], Column=[cf1:name], Vlaue=[Hongten].
  8. Insert into table [t_test], Rowkey=[12345566], Column=[cf1:age], Vlaue=[22].
  9. Insert into table [t_test], Rowkey=[12345567], Column=[cf1:name], Vlaue=[Tom].
  10. Insert into table [t_test], Rowkey=[12345567], Column=[cf1:age], Vlaue=[25].
  11. Insert into table [t_test], Rowkey=[12345568], Column=[cf1:name], Vlaue=[Jone].
  12. Insert into table [t_test], Rowkey=[12345568], Column=[cf1:age], Vlaue=[30].
  13. Insert into table [t_test], Rowkey=[12345569], Column=[cf1:name], Vlaue=[Jobs].
  14. Insert into table [t_test], Rowkey=[12345569], Column=[cf1:age], Vlaue=[24].
  15. === get record ====
  16. Get from table [t_test], Rowkey=[12345566], Column=[cf1:age], Timestamp=[1541652952697], Vlaue=[22].
  17. Get from table [t_test], Rowkey=[12345566], Column=[cf1:name], Timestamp=[1541652952626], Vlaue=[Hongten].
  18. === scan table ====
  19. Get from table [t_test], Rowkey=[12345566], Column=[cf1:age], Timestamp=[1541652952697], Vlaue=[22].
  20. Get from table [t_test], Rowkey=[12345566], Column=[cf1:name], Timestamp=[1541652952626], Vlaue=[Hongten].
  21. Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25].
  22. Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom].
  23. Get from table [t_test], Rowkey=[12345568], Column=[cf1:age], Timestamp=[1541652952834], Vlaue=[30].
  24. Get from table [t_test], Rowkey=[12345568], Column=[cf1:name], Timestamp=[1541652952807], Vlaue=[Jone].
  25. Get from table [t_test], Rowkey=[12345569], Column=[cf1:age], Timestamp=[1541652952928], Vlaue=[24].
  26. Get from table [t_test], Rowkey=[12345569], Column=[cf1:name], Timestamp=[1541652952869], Vlaue=[Jobs].
  27. === scan record by giving range ====
  28. Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25].
  29. Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom].
  30. Get from table [t_test], Rowkey=[12345568], Column=[cf1:age], Timestamp=[1541652952834], Vlaue=[30].
  31. Get from table [t_test], Rowkey=[12345568], Column=[cf1:name], Timestamp=[1541652952807], Vlaue=[Jone].
  32. === delete record ====
  33. Delete from table [t_test], Rowkey=[12345568].
  34. === scan record by giving range ====
  35. Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25].
  36. Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom].
  37. === delete batch records ====
  38. Delete from table [t_test], Rowkey=[12345566].
  39. Delete from table [t_test], Rowkey=[12345569].
  40. === scan table ====
  41. Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25].
  42. Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom].
  43. === delete table ====
  44. The table [t_test] is deleted.

 

源码下载:

hbase-util.zip

 

========================================================

More reading,and english is important.

I'm Hongten

 

  1. 大哥哥大姐姐,觉得有用打赏点哦!你的支持是我最大的动力。谢谢。
    Hongten博客排名在100名以内。粉丝过千。
    Hongten出品,必是精品。

E | hongtenzone@foxmail.com  B | http://www.cnblogs.com/hongten

========================================================

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号