如果你想创建一个状态存储,你需要提供一个串行器 and 解串器class 为您要使用的类型。在 Kafka Stream 中,有一个称为Serde将序列化器和反序列化器包装在一个类中。
如果你使用.withValues(Class<K> keyClass)
它必须坚持这一点
@param keyClass 键的类,它必须是 Kafka 内置 serdes 的类型之一
因为没有内置Serdes
for HashMap
你需要先实现一个(可能称为HashMapSerde
) 并将此类赋予该方法.withValues(Serde<K> keySerde)
。此外,您必须实现实际的序列化器和反序列化器HashMap
, 也。如果您知道 HashMap 的通用类型,则应该指定它们(这使得序列化器和反序列化器的实现更加简单。
像这样的东西(只是一个草图;省略了泛型类型):
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;
public class HashMapSerde implements Serde<HashMap> {
void configure(Map<String, ?> configs, boolean isKey) {
/* put your code here */
}
void close() {
/* put your code here */
}
Serializer<HashMap> serializer() {
return new Serializer<HashMap>() {
public void configure(Map<String, ?> configs, boolean isKey) {
/* put your code here */
}
public byte[] serialize(String topic, T data) {
/* put your code here */
}
public void close() {
/* put your code here */
}
};
}
Deserializer<HashMap> deserializer() {
return new Deserializer<HashMap>() {
public void configure(Map<String, ?> configs, boolean isKey) {
/* put your code here */
}
public T deserialize(String topic, byte[] data) {
/* put your code here */
}
public void close() {
/* put your code here */
}
};
}
}
如果您想查看如何实现(反)序列化器和Serde
, 看看https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/serialization https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/serialization and https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java