Reply To: How to deserialize a Kafka topic message with kfk.q
Thank you very much for your response.
I have tried to adapt the helper to the model I am using, and it seems to be working.
The problem I am encountering is that I still cannot populate the hdb table with the incoming data from the Kafka topic (which should now be correctly deserialized).
Could you please take a look and let me know if you notice any errors?
Thank you very much for your patience.
schema – (sym.q)

trade:([]time:`timestamp$();sym:`symbol$();price:`float$();size:`long$();side:`char$())

consumer.q

\l kfk.q

kfk_cfg:`metadata.broker.list`group.id`fetch.wait.max.ms`statistics.interval.ms!
 ("localhost:9092"; "consumer-group-1"; "10"; "1000")

client: .kfk.Consumer kfk_cfg;

topic_trades: `trades;
trades:([] time:`timestamp$();sym:`symbol$();price:`float$();size:`long$();side:`char$())

data_trades: ()

h: neg hopen `::5010

// Json Helper
generalHelper:{[t;d]![t;();0b;key[d]!{(x;y)}'[value d;key d]]}

castRules:`time`sym`price`size`side!("P"$;`$;`float$;`long$;first)

topcb_trades: {[msg]
 msgDict: .j.k msg[`data];
 convertedMsg: generalHelper[enlist msgDict; castRules];

 -1 "convertedMsg Content:";
 -1 .Q.s convertedMsg;

 if[not 98h=type convertedMsg; // check if table
  -1 "Error: convertedMsg is not a table.";
  :() // Exit
  ];
 if[0=count convertedMsg; // check if empty
  -1 "Error: convertedMsg is empty.";
  :() // Exit
  ];

 // Check data type
 -1 "Data type:";
 -1 raze ("time: ", string type each convertedMsg`time),
  (" sym: ", string type each convertedMsg`sym),
  (" price: ", string type each convertedMsg`price),
  (" size: ", string type each convertedMsg`size),
  (" side: ", string type each convertedMsg`side);

 `trades insert convertedMsg;
 -1 "Insert in local table trades --> done.";

 h(".u.upd"; `trade; convertedMsg);
 -1 "remote tickerplant update --> done.";
 -1 .Q.s convertedMsg;
 }

-1 "Subscribing to topic trades ...";
.kfk.Subscribe[client; (topic_trades); enlist .kfk.PARTITION_UA; (topcb_trades)]
pollKafkaAsync: {
 while[1b;
  .kfk.Poll[client; 0; 0];
  if[0<count data_trades; // q evaluates right to left, so count first and then compare
   -1 .Q.s last data_trades;
   ];
  ];
 }

pollKafkaAsync[]
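
To rule out the deserialization step, the helper and cast rules from consumer.q can be exercised on their own, without Kafka or the tickerplant. The snippet below is only a sketch: the `sample` payload and the `res` variable are assumptions made for illustration, not a real message from the topic.

```q
// Same helper and cast rules as in consumer.q
generalHelper:{[t;d]![t;();0b;key[d]!{(x;y)}'[value d;key d]]}
castRules:`time`sym`price`size`side!("P"$;`$;`float$;`long$;first)

// Hypothetical JSON payload; the field names follow the trade schema
sample:"{\"time\":\"2024.01.01D09:30:00.000000000\",\"sym\":\"AAPL\",\"price\":101.5,\"size\":100,\"side\":\"B\"}"

// Decode the JSON into a dictionary, wrap it as a one-row table, apply the casts
res:generalHelper[enlist .j.k sample;castRules]

// Inspect the resulting column types and compare them with the trade schema
show meta res
```

If the types shown by `meta res` match the schema in sym.q, the conversion itself is probably not what prevents the table from being populated.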
rdb.q