有序列化就會(huì)有反序列化,反序列化的操作是在Kafka Consumer中完成的,使用起來(lái)只需要配置一下key.deserializer和value.deseriaizer。對(duì)應(yīng)上面自定義的Company類(lèi)型的Deserializer就需要實(shí)現(xiàn)org.apache.kafka.common.serialization.Deserializer接口,這個(gè)接口同樣有三個(gè)方法:
創(chuàng)新互聯(lián)是一家集網(wǎng)站建設(shè),武陵企業(yè)網(wǎng)站建設(shè),武陵品牌網(wǎng)站建設(shè),網(wǎng)站定制,武陵網(wǎng)站建設(shè)報(bào)價(jià),網(wǎng)絡(luò)營(yíng)銷(xiāo),網(wǎng)絡(luò)優(yōu)化,武陵網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競(jìng)爭(zhēng)力??沙浞譂M足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時(shí)我們時(shí)刻保持專業(yè)、時(shí)尚、前沿,時(shí)刻以成就客戶成長(zhǎng)自我,堅(jiān)持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實(shí)用型網(wǎng)站。public void configure(Map<String, ?> configs, boolean isKey):用來(lái)配置當(dāng)前類(lèi)。
public byte[] serialize(String topic, T data):用來(lái)執(zhí)行反序列化。如果data為null建議處理的時(shí)候直接返回null而不是拋出一個(gè)異常。
public void close():用來(lái)關(guān)閉當(dāng)前序列化器。
下面就來(lái)看一下DemoSerializer對(duì)應(yīng)的反序列化的DemoDeserializer,詳細(xì)代碼如下:
public class DemoDeserializer implements Deserializer<Company> {
public void configure(Map<String, ?> configs, boolean isKey) {}
public Company deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
if (data.length < 8) {
throw new SerializationException("Size of data received by DemoDeserializer is shorter than expected!");
}
ByteBuffer buffer = ByteBuffer.wrap(data);
int nameLen, addressLen;
String name, address;
nameLen = buffer.getInt();
byte[] nameBytes = new byte[nameLen];
buffer.get(nameBytes);
addressLen = buffer.getInt();
byte[] addressBytes = new byte[addressLen];
buffer.get(addressLen);
try {
name = new String(nameBytes, "UTF-8");
address = new String(addressBytes, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error occur when deserializing!");
}
return new Company(name,address);
}
public void close() {}
}
有些讀者可能對(duì)新版的Consumer不是很熟悉,這里順帶著舉一個(gè)完整的消費(fèi)示例,并以DemoDeserializer作為消息Value的反序列化器。
Properties properties = new Properties();
properties.put("bootstrap.servers", brokerList);
properties.put("group.id", consumerGroup);
properties.put("session.timeout.ms", 10000);
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "com.hidden.client.DemoDeserializer");
properties.put("client.id", "hidden-consumer-client-id-zzh-2");
KafkaConsumer<String, Company> consumer = new KafkaConsumer<String, Company>(properties);
consumer.subscribe(Arrays.asList(topic));
try {
while (true) {
ConsumerRecords<String, Company> records = consumer.poll(100);
for (ConsumerRecord<String, Company> record : records) {
String info = String.format("topic=%s, partition=%s, offset=%d, consumer=%s, country=%s",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
System.out.println(info);
}
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
String error = String.format("Commit failed for offsets {}", offsets, exception);
System.out.println(error);
}
}
});
}
} finally {
consumer.close();
}
有些時(shí)候自定義的類(lèi)型還可以和Avro、ProtoBuf等聯(lián)合使用,而且這樣更加的方便快捷,比如我們將前面Company的Serializer和Deserializer用Protostuff包裝一下,由于篇幅限制,筆者這里只羅列出對(duì)應(yīng)的serialize和deserialize方法,詳細(xì)參考如下:
public byte[] serialize(String topic, Company data) {
if (data == null) {
return null;
}
Schema schema = (Schema) RuntimeSchema.getSchema(data.getClass());
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
byte[] protostuff = null;
try {
protostuff = ProtostuffIOUtil.toByteArray(data, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
return protostuff;
}
public Company deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
Schema schema = RuntimeSchema.getSchema(Company.class);
Company ans = new Company();
ProtostuffIOUtil.mergeFrom(data, ans, schema);
return ans;
}
如果Company的字段很多,我們使用Protostuff進(jìn)一步封裝一下的方式就顯得簡(jiǎn)潔很多。不過(guò)這個(gè)不是最主要的,而最主要的是經(jīng)過(guò)Protostuff包裝之后,這個(gè)Serializer和Deserializer可以向前兼容(新加字段采用默認(rèn)值)和向后兼容(忽略新加字段),這個(gè)特性Avro和Protobuf也都具備。
自定義的類(lèi)型有一個(gè)不得不面對(duì)的問(wèn)題就是Kafka Producer和Kafka Consumer之間的序列化和反序列化的兼容性,試想對(duì)于StringSerializer來(lái)說(shuō),Kafka Consumer可以順其自然的采用StringDeserializer,不過(guò)對(duì)于Company這種專用類(lèi)型,某個(gè)服務(wù)使用DemoSerializer進(jìn)行了序列化之后,那么下游的消費(fèi)者服務(wù)必須也要實(shí)現(xiàn)對(duì)應(yīng)的DemoDeserializer。再者,如果上游的Company類(lèi)型改變,下游也需要跟著重新實(shí)現(xiàn)一個(gè)新的DemoSerializer,這個(gè)后面所面臨的難題可想而知。所以,如無(wú)特殊需要,筆者不建議使用自定義的序列化和反序列化器;如有業(yè)務(wù)需要,也要使用通用的Avro、Protobuf、Protostuff等序列化工具包裝,盡可能的實(shí)現(xiàn)得更加通用且向前后兼容。
題外話,對(duì)于Kafka的“深耕者”Confluent來(lái)說(shuō),還有其自身的一套序列化和反序列化解決方案(io.confluent.kafka.serializer.KafkaAvroSerializer),GitHub上有相關(guān)資料,讀者如有興趣可以自行擴(kuò)展學(xué)習(xí)。
本文的重點(diǎn)是你有沒(méi)有收獲與成長(zhǎng),其余的都不重要,希望讀者們能謹(jǐn)記這一點(diǎn)。同時(shí)我經(jīng)過(guò)多年的收藏目前也算收集到了一套完整的學(xué)習(xí)資料,包括但不限于:分布式架構(gòu)、高可擴(kuò)展、高性能、高并發(fā)、Jvm性能調(diào)優(yōu)、Spring,MyBatis,Nginx源碼分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個(gè)知識(shí)點(diǎn)高級(jí)進(jìn)階干貨,希望對(duì)想成為架構(gòu)師的朋友有一定的參考和幫助
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。
新聞標(biāo)題:Kafka消息序列化和反序列化(下)-創(chuàng)新互聯(lián)
文章出自:http://m.newbst.com/article10/cejcdo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供響應(yīng)式網(wǎng)站、企業(yè)網(wǎng)站制作、定制開(kāi)發(fā)、軟件開(kāi)發(fā)、靜態(tài)網(wǎng)站、關(guān)鍵詞優(yōu)化
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容