经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Apache Kafka » 查看文章
KakfaSpout自定义scheme
来源:cnblogs  作者:夢見貓、  时间:2018/11/23 10:20:40  对本文有异议

一.Mapper和Scheme

scheme:将kafka传到spout里的数据格式进行转化. record->tuple

mapper:将storm传到kafka的数据格式进行转化.tuple->record

二.为什么要自定义消息格式

在很多需求里, 从kafka传递过来的数据并不是单纯的string, 可以是任意对象.当我们需要根据对象的某个属性进行分组时, 默认的new Fields("bytes")就不太适合.但是消息传递的形式还是string.我们可以在传入kafka之前使用fastJson的转化方法将实体对象转化成jsonString.

到了scheme在转换成实体类对象.

三.怎么更改scheme

构建kafkaSpout时我们要配置很多参数, 可以看一下kafkaConfig代码.

  1. public final BrokerHosts hosts; //用以获取Kafka broker和partition的信息
  2. public final String topic;//从哪个topic读取消息
  3. public final String clientId; // SimpleConsumer所用的client id
  4. public int fetchSizeBytes = 1024 * 1024; //发给Kafka的每个FetchRequest中,用此指定想要的response中总的消息的大小
  5. public int socketTimeoutMs = 10000;//与Kafka broker的连接的socket超时时间
  6. public int fetchMaxWait = 10000; //当服务器没有新消息时,消费者会等待这些时间
  7. public int bufferSizeBytes = 1024 * 1024;//SimpleConsumer所使用的SocketChannel的读缓冲区大小
  8. public MultiScheme scheme = new RawMultiScheme();//从Kafka中取出的byte[],该如何反序列化
  9. public boolean forceFromStart = false;//是否强制从Kafka中offset最小的开始读起
  10. public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();//从何时的offset时间开始读,默认为最旧的offset
  11. public long maxOffsetBehind = Long.MAX_VALUE;//KafkaSpout读取的进度与目标进度相差多少,相差太多,Spout会丢弃中间的消息
  12. public boolean useStartOffsetTimeIfOffsetOutOfRange = true;//如果所请求的offset对应的消息在Kafka中不存在,是否使startOffsetTime

 

可以看到, 所有的配置项都是public, 所以当我们实例化一个spoutConfig之后, 可以通过直接引用的方式进行更改属性值.

我们可以看构建kafkaspout的代码:

  1. ZkHosts zkHosts = new ZkHosts(zkHost);
  2. // zk对地址有唯一性标识
  3. String zkRoot = "/" + topic;
  4. String id = UUID.randomUUID().toString();
  5. // 构建spoutConfig
  6. SpoutConfig spoutConf = new SpoutConfig(zkHosts, topic, zkRoot, id);
  7. spoutConf.scheme = new SchemeAsMultiScheme(new SensorDataScheme());
  8. spoutConf.startOffsetTime = OffsetRequest.LatestTime();
  9. KafkaSpout kafkaSpout = new KafkaSpout(spoutConf);

四.怎么自定义scheme

我们有这样一个需求,有一个实体类如下:

  1. public class SensorData implements Serializable {
  2. // 设备Id;
  3. private String deviceId;
  4. // 型号id
  5. private String dmPropertiesId;
  6. // 通道名称;
  7. private String channelName;
  8. // 采集的温度值
  9. private double deviceTemp;
  10. // 采集的时间;
  11. private Date date;
  12. }

数据进来kafka到storm消费时, 根据deviceId进行分组.当然, 我们在写入的时候对数据json化, 使用fastjson把实体对象变成字符串, 而不是直接传实体类对象进入kafka(亲测会报错, 无法进行转换).最终数据会在scheme的declare的方法里处理.

Scheme接口:

  1. public interface Scheme extends Serializable {
  2. List<Object> deserialize(ByteBuffer ser);
  3. public Fields getOutputFields();
  4. }

可以看到有两个需要实现的方法, 一个是传过来的byte数据进行转化, 一个是传入下一层bolt的时候以什么字段分组. 跟踪kafka的源码我们可以看到, 他的declare方法最终会调用scheme的方法来确认字段名.

看一下scheme的整体代码:

  1. package dm.scheme;
  2. import java.nio.ByteBuffer;
  3. import java.nio.charset.Charset;
  4. import java.nio.charset.StandardCharsets;
  5. import java.util.List;
  6. import org.apache.storm.kafka.StringScheme;
  7. import org.apache.storm.spout.Scheme;
  8. import org.apache.storm.tuple.Fields;
  9. import org.apache.storm.tuple.Values;
  10. import org.apache.storm.utils.Utils;
  11. import com.alibaba.fastjson.JSON;
  12. import dm.entity.SensorData;
  13. /**
  14. *
  15. * KafkaRecord 映射 tuple 转化类;
  16. *
  17. * @author chenwen
  18. *
  19. */
  20. public class SensorDataScheme implements Scheme {
  21. /**
  22. *
  23. */
  24. private static final long serialVersionUID = 1L;
  25. private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
  26. /**
  27. *
  28. * 反序列化
  29. */
  30. @Override
  31. public List<Object> deserialize(ByteBuffer byteBuffer) {
  32. // 将kafka消息转化成jsonString
  33. String sensorDataJson = StringScheme.deserializeString(byteBuffer);
  34. SensorData sensorData = JSON.parseObject(sensorDataJson, SensorData.class);
  35. String id = sensorData.getDeviceId();
  36. return new Values(id, sensorData);
  37. }
  38. public static String deserializeString(ByteBuffer byteBuffer) {
  39. if (byteBuffer.hasArray()) {
  40. int base = byteBuffer.arrayOffset();
  41. return new String(byteBuffer.array(), base + byteBuffer.position(), byteBuffer.remaining());
  42. } else {
  43. return new String(Utils.toByteArray(byteBuffer), UTF8_CHARSET);
  44. }
  45. }
  46. @Override
  47. public Fields getOutputFields() {
  48. return new Fields("deviceId", "sensorData"); // 返回字段及其名称;
  49. }
  50. }

 

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号