How to deserialize a Kafka topic message with kfk.q
Hello everyone,
I have configured the kfk.q library to interact with Kafka topics in q. I created a simple producer with Python that generates market data in a format like this:
C:\Users\Administrator\cod\kdb+\scripts\KdbArchitecture>docker exec -it broker0 /bin/bash
[appuser@broker0 ~]$ kafka-console-consumer --bootstrap-server localhost:29092 --topic trades --from-beginning
{"time": "2024-08-25T09:56:43.291893", "sym": "ABC", "price": 113.16, "size": 18, "side": "A"}
{"time": "2024-08-25T09:56:50.443880", "sym": "ABC", "price": 116.89, "size": 51, "side": "B"}
{"time": "2024-08-25T09:57:09.795277", "sym": "ABC", "price": 154.67, "size": 47, "side": "B"}
{"time": "2024-08-25T09:57:45.319374", "sym": "ABC", "price": 154.37, "size": 94, "side": "A"}
{"time": "2024-08-25T09:58:04.654672", "sym": "ABC", "price": 191.05, "size": 69, "side": "B"}
The issue I'm facing is that I can't properly deserialize the JSON message.
What I would like to achieve is to deserialize the message and, for example, build a q table to work with the data. Alternatively, I would like to deserialize the message so that I can pass the data to the feed (feed.q) within the kdb+ tickerplant architecture. Any advice on how to do this?
Here's an example of "consumer.q" that reads the message and prints it, but does not deserialize it correctly for the tasks mentioned above.
\l kfk.q

kfk_cfg:`metadata.broker.list`group.id`fetch.wait.max.ms`statistics.interval.ms!("localhost:9092";"consumer-group-1";"10";"1000")
client:.kfk.Consumer kfk_cfg;
topic_trades:`trades;
data_trades:();

// callback: decode the JSON payload and append it to the global list
topcb_trades:{[msg]
  msgDict:.j.k "c"$msg[`data];   / cast the payload to chars before parsing the JSON
  data_trades,::enlist msgDict;
  -1 .Q.s msgDict;
  }

-1 "Subscribing to topics called trades...";
.kfk.Subscribe[client;`trades;enlist .kfk.PARTITION_UA;topcb_trades]

// polling
pollKafkaAsync:{
  while[1b;
    .kfk.Poll[client;0;0];
    if[0<count data_trades;      / q evaluates right to left, so count first, then compare
      -1 .Q.s last data_trades]]
  }
pollKafkaAsync[]
The consumer prints the same decoded dictionary repeatedly:
time | "2024-08-25T10:50:10.743928"
sym  | "ABC"
price| 117.4
size | 67f
side | ,"B"
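To make the goal concrete, here is a minimal sketch of the kind of conversion I am after. It assumes every message carries exactly the five keys shown above, and that `.j.k` returns strings for text fields and floats for numbers, as the printed dictionary suggests. The names `castTrade`, `sample` and `trades` are hypothetical and are not part of kfk.q.

```q
// Hypothetical helper: cast one .j.k dictionary to typed q values.
// Assumes the keys time, sym, price, size and side are always present.
castTrade:{[d]
  `time`sym`price`size`side!(
    "P"$d[`time];      / ISO-8601 string -> timestamp
    `$d[`sym];         / string -> symbol
    "f"$d[`price];     / JSON number, already a float
    "j"$d[`size];      / JSON numbers arrive as floats; cast to long
    `$d[`side])}       / one-character string -> symbol

// A sample message shaped like the ones on the trades topic
sample:"{\"time\": \"2024-08-25T09:56:43.291893\", \"sym\": \"ABC\", \"price\": 113.16, \"size\": 18, \"side\": \"A\"}"

// enlist turns the typed dictionary into a one-row table
trades:enlist castTrade .j.k sample
```

If something like this is reasonable, the callback could presumably append `castTrade msgDict` to a typed table instead of collecting raw dictionaries, but I am not sure whether this is the idiomatic approach or how it should be wired into feed.q.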
Thank you.