经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Apache Kafka » 查看文章
Kafka的接口回调 +自定义分区、拦截器
来源:cnblogs  作者:给你一个公主抱  时间:2019/3/1 9:11:06  对本文有异议

一、接口回调+自定义分区

  1.接口回调:在使用消费者的send方法时添加Callback回调

 

  1. producer.send(new ProducerRecord<String, String>("xinnian", "20" + i + "年新年好!"), new Callback() {
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    if (recordMetadata!=null){
    System.out.println(recordMetadata.topic()+"-----"+recordMetadata.offset()+"-----"+recordMetadata.partition());
    }
    }
     2.自定义分区:定义类实现Patitioner接口,实现接口的方法:
       设置configure、分区逻辑partitionreturn 1;)、释放资源close、在生产者的配置过程中添加入分区属性。
     在定义生产者属性时添加分区的属性即可
  1. /**
  2. * @author: PrincessHug
  3. * @date: 2019/2/28, 16:24
  4. * @Blog: https://www.cnblogs.com/HelloBigTable/
  5. */
  6. public class PartitionDemo implements Partitioner {
  7. public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
  8. return 1;
  9. }
  10.  
  11. public void close() {
  12.  
  13. }
  14.  
  15. public void configure(Map<String, ?> map) {
  16.  
  17. }
  18. }
  19.  
  20. public class ProducerDemo {
  21. public static void main(String[] args) {
  22. Properties prop = new Properties();
  23.  
  24. //参数配置
  25. //kafka节点的地址
  26. prop.put("bootstrap.servers", "192.168.126.128:9092");
  27. //发送消息是否等待应答
  28. prop.put("acks", "all");
  29. //配置发送消息失败重试
  30. prop.put("retries", "0");
  31. //配置批量处理消息大小
  32. prop.put("batch.size", "10241");
  33. //配置批量处理数据延迟
  34. prop.put("linger.ms","5");
  35. //配置内存缓冲大小
  36. prop.put("buffer.memory", "12341235");
  37. //消息在发送前必须序列化
  38. prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  39. prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  40. prop.put("partitioner.class", "PartitionDemo");
  41.  
  42. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
  43.  
  44. for (int i=10;i<100;i++){
  45. producer.send(new ProducerRecord<String, String>("xinnian", "20" + i + "年新年好!"), new Callback() {
  46. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  47. if (recordMetadata!=null){
  48. System.out.println(recordMetadata.topic()+"-----"+recordMetadata.offset()+"-----"+recordMetadata.partition());
  49. }
  50. }
  51. });
  52. }
  53. producer.close();
  54. }
  55. }

  注意:在自定义分区后,你的消费者会收不到消息,因为消费者默认接收的分区为0。

 

二、拦截器

  1)创建生产者类;
     2)创建自定义拦截器类实现ProducerInterceptor接口,重写抽象方法;
     3)在业务逻辑方法ProducerRecord方法中,修改返回值,
        return new ProducerRecord<String,String>(
        record.topic(),
        record.partiiton(),
        record.key(),
        System.currentTimeMillis() + "-" + record.value() + "-" + record.topic());
     4)在生产者类中将自定义拦截器生效
       prop.put(ProducerConfig.INTERCEPTOR_CLASSEA_CONFIG,"com.wyh.com.wyh.kafka.interceptor.TimeInterceptor");
     5)运行生产者main方法,或者在linux端用shell测试。

  1. /**
  2. * @author: PrincessHug
  3. * @date: 2019/2/28, 20:59
  4. * @Blog: https://www.cnblogs.com/HelloBigTable/
  5. */
  6. public class TimeInterceptor implements ProducerInterceptor<String, String> {
  7. //业务逻辑
  8. public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
  9. return new ProducerRecord<String,String>(
  10. producerRecord.topic(),
  11. producerRecord.partition(),
  12. producerRecord.key(),
  13. System.currentTimeMillis()+"--"+producerRecord.value()
  14. );
  15. }
  16. //发送失败调用
  17. public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
  18. }
  19. //释放资源
  20. public void close() {
  21. }
  22. //获取配置信息
  23. public void configure(Map<String, ?> map) {
  24. }
  25. }
  26. public class ItctorProducer {
  27. public static void main(String[] args) {
  28. //配置生产者属性
  29. Properties prop = new Properties();
  30. //kafka节点的地址
  31. prop.put("bootstrap.servers", "192.168.126.128:9092");
  32. //发送消息是否等待应答
  33. prop.put("acks", "all");
  34. //配置发送消息失败重试
  35. prop.put("retries", "0");
  36. //配置批量处理消息大小
  37. prop.put("batch.size", "1024");
  38. //配置批量处理数据延迟
  39. prop.put("linger.ms","5");
  40. //配置内存缓冲大小
  41. prop.put("buffer.memory", "12341235");
  42. //消息在发送前必须序列化
  43. prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  44. prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  45. //添加拦截器
  46. ArrayList<String> inList = new ArrayList<String>();
  47. inList.add("interceptor.TimeInterceptor");
  48. prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,inList);
  49. //实例化producer
  50. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
  51. //发送消息
  52. for (int i=0;i<99;i++){
  53. producer.send(new ProducerRecord<String, String>("xinnian","You are genius!"+i));
  54. }
  55. //释放资源
  56. producer.close();
  57. }
  58. }

 


  1.   

原文链接:http://www.cnblogs.com/HelloBigTable/p/10453884.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号