Home › Forums › kdb+ › How to deserialize a Kafka topic message with kfk.q › Reply To: How to deserialize a Kafka topic message with kfk.q
-
**Untested** but a basic comsumer for testing for you would look something like:
\l ../kfk.q
kfk_cfg:(!) . flip(
(`metadata.broker.list;`localhost:9092);
(`group.id;`0);
(`fetch.wait.max.ms;`10);
(`statistics.interval.ms;`10000)
);
client:.kfk.Consumer[kfk_cfg];
topic1:`trades;
generalHelper:{[t;d]![t;();0b;key[d]!{(x;y)}'[valued;keyd]]}
castRules:`time`sym`size`side!("P"$;`$;`long$;first)
trades:([] time:`timestamp$();sym:`$();price:`float$();size:`long$();side:`char$())
topcb1:{[msg]
`trades insert generalHelper[enlist .j.k "c"$msg[`data];castRules]
}
.kfk.Metadata[client];
.kfk.Subscribe[client;enlist topic1;enlist .kfk.PARTITION_UA;enlist topcb1]