请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

12-10 作业回答

作业1:Kafka Message 的序列化方式可以有很多种,试试其他的序列化方式 ?
自定义一个序列化方式吧,参考StringSerializer和StringDeserializer:
实体类

@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
    private Integer id;
    private String username;
    private String password;
}

序列化与反序列化

public class UserSerializer implements Serializer<User> {
    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }
    /**
     * 序列化
     * @param s
     * @param user
     * @return
     */
    @Override
    public byte[] serialize(String s, User user) {
        return JSON.toJSONBytes(user);
    }
    
    @Override
    public void close() {

    }
}
public class UserDeserializer implements Deserializer<User> {
    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    /**
     * 反序列化
     * @param s
     * @param bytes
     * @return
     */
    @Override
    public User deserialize(String s, byte[] bytes) {
        return JSON.parseObject(bytes, User.class);
    }

    @Override
    public void close() {

    }
}

producer关键代码

   static {
        Properties producerProperties = new Properties();
        producerProperties.put("bootstrap.servers", "120.77.xxx.xxx:9092");
        producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProperties.put("value.serializer", "com.me.kafkastudy.customserializer.UserSerializer");
        producer = new KafkaProducer<>(producerProperties);
    }

    private static void sendMessage() {
        User user = new User(10, "小明", "123456");
        ProducerRecord<String, User> record = new ProducerRecord<>(
                "customSerializer","myname", user
        );
        producer.send(record);
        producer.close();
    }

consumer关键代码

 static {
        consumerProperties = new Properties();
        consumerProperties.put("bootstrap.servers", "120.77.xxx.xxx:9092");
        consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProperties.put("value.deserializer", "com.me.kafkastudy.customserializer.UserDeserializer");
        consumerProperties.put("group.id", "kafka-study");
    }

    private static void generalMessageAsyncCommit() {
        consumerProperties.put("auto.commit.offset", false);
        consumer = new KafkaConsumer<>(consumerProperties);
        consumer.subscribe(Collections.singleton("customSerializer"));

        while (true) {
            boolean flag = true;
            ConsumerRecords<String, User> consumerRecords = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, User> record : consumerRecords) {
                System.out.println(String.format(
                        "topic = %s, partition = %s, key = %s, value = %s",
                        record.topic(), record.partition(), record.key(), record.value()
                ));

            }
            consumer.commitAsync();
        }
    }

运行结果
图片描述

作业2:关于消息区分,如果想象成数据库的分库分表,你会使用什么 key 去控制分区呢 ?

答: 根据不同的微服务,或者不同的业务来控制吧, 将相同服务的消息存放到相同的partition, 或者根据topic来控制也可以吧,相同topic存入相同的分区?

##########################
这一章学完我还有两个问题,请求老师解答一下
1. 自定义serializer实现了Serializer接口,其中有三个方法,configure ,serialize,和close ; 其中serialize是完成序列化的,那configure和close一般是干嘛用的呢? 自定义序列化好像不配置也没有影响
2. 为什么value为string类型的反序列化时,可以通过如下代码来判断已经监听完成了呢,我们发送的消息明明没有含有done的字符串,这是kafka为我们自动添加的吗?

   if ("done".equals(record.value())) {
       falg = false;
       break;
   }

正在回答

1回答

同学你好:

    非常感谢你的回答,答案给出的也非常好。我对你的实现和回复做出一些解释:

    作业1:Kafka Message 的序列化方式可以有很多种,试试其他的序列化方式 ?

    除了常见的 JSON 形式的序列化,Avro、Thrift、PB 等也是非常常用的序列化方法。你这里实现的也是可用的,非常好!


    作业2:关于消息区分,如果想象成数据库的分库分表,你会使用什么 key 去控制分区呢 ?    

    通常情况下,我们不需要去考虑消息分区。但是对于一些特定的场景,比如 AB Test,我们需要根据用户 id 作为分区的 key,把 A 类和 B 类用户的消息分开处理计算。


    1. 自定义serializer实现了Serializer接口,其中有三个方法,configure ,serialize,和 close ; 其中serialize是完成序列化的,那configure和close一般是干嘛用的呢? 

    configure 方法的入参 Map<String, ?> configs 是当前 Kakfa 的相关配置,你可以从中获取一些信息做一些特定的操作。但是,大多数情况下,你不需要实现这个方法。

    close 方法是说 serialize 完成之后,你可能做的一些收尾工作。通常,不需要实现。


    2. 为什么value为string类型的反序列化时,可以通过如下代码来判断已经监听完成了呢,我们发送的消息明明没有含有done的字符串,这是kafka为我们自动添加的吗?

    哈哈哈,这个是我故意设定的一个消息。当向 Kafka 发送 done 字符串的时候,退出循环。是自己定义的规则!

    

    欢迎来 QQ 群随时交流、讨论,也非常感谢同学的支持! 

    


4 回复 有任何疑惑可以回复我~
  • 提问者 mapper #1
    感谢老师的解答,,其。实第二个问题问完我就想明白了。。感觉自己问了一个很蠢的问题,哈哈哈  有点尴尬 : )
    回复 有任何疑惑可以回复我~ 2019-02-28 21:34:55
问题已解决,确定采纳
还有疑问,暂不采纳
意见反馈 帮助中心 APP下载
官方微信