Phoenix is our query engine, and to speed up queries we created secondary indexes on the Phoenix tables. The data, however, is written by Spark Streaming directly into HBase through the HBase API. That raises a problem: writing straight into the underlying HBase table of a Phoenix secondary index does not update the index, so the Phoenix index data drifts out of sync with the HBase source table. For writing from Spark through Phoenix there is official documentation, but it comes with version restrictions. Here is the official text:
To ensure that all requisite Phoenix / HBase platform dependencies are available on the classpath for the Spark executors and drivers, set both ‘spark.executor.extraClassPath’ and ‘spark.driver.extraClassPath’ in spark-defaults.conf to include the ‘phoenix-<version>-client.jar’
Note that for Phoenix versions 4.7 and 4.8 you must use the ‘phoenix-<version>-client-spark.jar’. As of Phoenix 4.10, the ‘phoenix-<version>-client.jar’ is compiled against Spark 2.x. If compability with Spark 1.x if needed, you must compile Phoenix with the spark16 maven profile.
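For completeness, the classpath setup described above would look roughly like this in spark-defaults.conf (the path is only a placeholder for wherever the Phoenix client jar lives on your nodes):

spark.executor.extraClassPath  /opt/phoenix/phoenix-<version>-client.jar
spark.driver.extraClassPath    /opt/phoenix/phoenix-<version>-client.jar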
Because of those restrictions, the only practical option for us was to write through JDBC.
The versions I used:
spark:2.2.1
phoenix:4.13.2
Jar dependencies:
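With Maven, the JDBC approach only needs the Phoenix client library on the classpath; a rough sketch of the dependency (treat the version as a placeholder and match it to the exact Phoenix/HBase build on your cluster):

<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <!-- placeholder: use the version/classifier that matches your cluster -->
    <version>4.13.2</version>
</dependency>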
The PhoenixUtil class:
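PhoenixUtil is essentially a thin wrapper around the Phoenix JDBC driver. A minimal sketch of what it might look like, assuming a hypothetical ZooKeeper quorum address and a simple queue-backed connection pool:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.LinkedBlockingQueue;

public class PhoenixUtil {
    // zk1,zk2,zk3 is a placeholder for the real ZooKeeper quorum
    private static final String URL = "jdbc:phoenix:zk1,zk2,zk3:2181";
    // small bounded pool so idle connections can be reused across batches
    private static final LinkedBlockingQueue<Connection> POOL = new LinkedBlockingQueue<>(16);

    public static Connection getConnection() throws SQLException {
        Connection conn = POOL.poll();
        // hand back a pooled connection if one is available, otherwise open a new one
        return (conn != null && !conn.isClosed()) ? conn : DriverManager.getConnection(URL);
    }

    public static void returnConnection(Connection conn) {
        // put the connection back into the pool, or close it if the pool is full
        if (conn != null && !POOL.offer(conn)) {
            try { conn.close(); } catch (SQLException ignored) { }
        }
    }
}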
The PhoenixUtil class is then used inside the Spark Streaming job (for business reasons a plain Statement is used here; the business logic that builds each SQL statement is elided):
saveLines.foreachRDD(rdd -> {
    rdd.foreachPartition(p -> {
        Connection conn = PhoenixUtil.getConnection();
        Statement stmt = conn.createStatement();
        conn.setAutoCommit(false);
        while (p.hasNext()) {
            // business logic: build the Phoenix UPSERT statement for this record
            String sql = buildUpsertSql(p.next()); // buildUpsertSql is a placeholder for the business SQL
            stmt.addBatch(sql);
        }
        stmt.executeBatch();
        conn.commit();
        stmt.close();
        PhoenixUtil.returnConnection(conn);
        ZkKafkaUtil.updateOffset(offsetRanges, GROUP_ID, TOPIC);
    });
});
Finally, if anyone has a better way to handle this problem, suggestions are welcome.