经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库运维 » Spark » 查看文章
spark+phoenix
来源:cnblogs  作者:dlyhm  时间:2018/10/16 9:24:12  对本文有异议

 

phoenix作为查询引擎,为了提高查询效率,为phoenix表创建了二级索引,而数据是sparkstreaming通过hbase api直接向hbase插数据。那么问题来了,对于phoenix的二级索引,直接插入底层hbase的源表,不会引起二级索引的更新,从而导致phoenix索引数据和hbase源表数据不一致。而对于spark+phoenix的写入方式,官方有文档说明,但是有版本限制,以下是官方原文:


    • To ensure that all requisite Phoenix / HBase platform dependencies are available on the classpath for the Spark executors and drivers, set both ‘spark.executor.extraClassPath’ and ‘spark.driver.extraClassPath’ in spark-defaults.conf to include the ‘phoenix-<version>-client.jar’

    • Note that for Phoenix versions 4.7 and 4.8 you must use the ‘phoenix-<version>-client-spark.jar’. As of Phoenix 4.10, the ‘phoenix-<version>-client.jar’ is compiled against Spark 2.x. If compability with Spark 1.x if needed, you must compile Phoenix with the spark16 maven profile.

所以只能考虑用jdbc的方式做。

我使用的版本信息:

  • spark:2.2.1

  • phoenix:4.13.2

jar包引入:

    1.  <dependency>
    2.             <groupId>org.apache.phoenix</groupId>
    3.             <artifactId>phoenix-core</artifactId>
    4.             <version>4.13.1-HBase-1.2</version>
    5.         </dependency>
    6.         <dependency>
    7.             <groupId>org.apache.phoenix</groupId>
    8.             <artifactId>phoenix-spark</artifactId>
    9.             <version>4.13.1-HBase-1.2</version>
    10.         </dependency>

     

phoenixUtil类:

    1. public class PhoenixUtil {private static LinkedList<Connection> connectionQueue;static {try {
    2.             Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
    3.         } catch (ClassNotFoundException e) {
    4.             e.printStackTrace();
    5.         }
    6.     }public synchronized static Connection getConnection() throws SQLException {try {if (connectionQueue == null){
    7.                 connectionQueue = new LinkedList<Connection>();for (int i = 0;< 3;i++){
    8.                     Connection conn = DriverManager.getConnection("jdbc:phoenix:hostname:2181");
    9.  
    10.                     connectionQueue.push(conn);
    11.                 }
    12.             }
    13.         }catch (Exception e1){
    14.             e1.printStackTrace();
    15.         }return connectionQueue.poll();
    16.     }public static void returnConnection(Connection conn){
    17.         connectionQueue.push(conn);
    18.     }

     

在sparkstreaming中引入phoenixUtil类(由于业务关系,这里使用的是statement):

saveLines.foreachRDD(rdd -> {            rdd.foreachPartition(p -> {                Connection conn = PhoenixUtil.getConnection();                Statement stmt = conn.createStatement();                conn.setAutoCommit(false);             //业务逻辑             //sql                    }                    stmt.addBatch(sql);                }                stmt.executeBatch();                conn.commit();                stmt.close();                PhoenixUtil.returnConnection(conn);                ZkKafkaUtil.updateOffset(offsetRanges, GROUP_ID, TOPIC);            });        });

最后,如果大家有更好的方式处理这个问题,欢迎指教。

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号