作业1:Kafka Message 的序列化方式可以有很多种,试试其他的序列化方式 ?
自定义一个序列化方式吧,参考StringSerializer和StringDeserializer:
实体类
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
private Integer id;
private String username;
private String password;
}
序列化与反序列化
public class UserSerializer implements Serializer<User> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
/**
* 序列化
* @param s
* @param user
* @return
*/
@Override
public byte[] serialize(String s, User user) {
return JSON.toJSONBytes(user);
}
@Override
public void close() {
}
}
public class UserDeserializer implements Deserializer<User> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
/**
* 反序列化
* @param s
* @param bytes
* @return
*/
@Override
public User deserialize(String s, byte[] bytes) {
return JSON.parseObject(bytes, User.class);
}
@Override
public void close() {
}
}
producer关键代码
static {
Properties producerProperties = new Properties();
producerProperties.put("bootstrap.servers", "120.77.xxx.xxx:9092");
producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProperties.put("value.serializer", "com.me.kafkastudy.customserializer.UserSerializer");
producer = new KafkaProducer<>(producerProperties);
}
private static void sendMessage() {
User user = new User(10, "小明", "123456");
ProducerRecord<String, User> record = new ProducerRecord<>(
"customSerializer","myname", user
);
producer.send(record);
producer.close();
}
consumer关键代码
static {
consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers", "120.77.xxx.xxx:9092");
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProperties.put("value.deserializer", "com.me.kafkastudy.customserializer.UserDeserializer");
consumerProperties.put("group.id", "kafka-study");
}
private static void generalMessageAsyncCommit() {
consumerProperties.put("auto.commit.offset", false);
consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Collections.singleton("customSerializer"));
while (true) {
boolean flag = true;
ConsumerRecords<String, User> consumerRecords = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, User> record : consumerRecords) {
System.out.println(String.format(
"topic = %s, partition = %s, key = %s, value = %s",
record.topic(), record.partition(), record.key(), record.value()
));
}
consumer.commitAsync();
}
}
运行结果
作业2:关于消息区分,如果想象成数据库的分库分表,你会使用什么 key 去控制分区呢 ?
答: 根据不同的微服务,或者不同的业务来控制吧, 将相同服务的消息存放到相同的partition, 或者根据topic来控制也可以吧,相同topic存入相同的分区?
##########################
这一章学完我还有两个问题,请求老师解答一下
1. 自定义serializer实现了Serializer接口,其中有三个方法,configure ,serialize,和close ; 其中serialize是完成序列化的,那configure和close一般是干嘛用的呢? 自定义序列化好像不配置也没有影响
2. 为什么value为string类型的反序列化时,可以通过如下代码来判断已经监听完成了呢,我们发送的消息明明没有含有done的字符串,这是kafka为我们自动添加的吗?
if ("done".equals(record.value())) {
falg = false;
break;
}