Reply To: How to deserialize a Kafka topic message with kfk.q

  • rocuinneagain

    Member
    August 26, 2024 at 12:37 pm

    **Untested**, but a basic consumer for testing would look something like this:

    \l ../kfk.q
    kfk_cfg:(!) . flip(
      (`metadata.broker.list;`localhost:9092);
      (`group.id;`0);
      (`fetch.wait.max.ms;`10);
      (`statistics.interval.ms;`10000)
      );
    client:.kfk.Consumer[kfk_cfg];

    topic1:`trades;

    / apply each cast in dictionary d to the matching column of table t
    generalHelper:{[t;d]![t;();0b;key[d]!{(x;y)}'[value d;key d]]}
    castRules:`time`sym`size`side!("P"$;`$;`long$;first)
    trades:([] time:`timestamp$();sym:`$();price:`float$();size:`long$();side:`char$())
    topcb1:{[msg]
      `trades insert generalHelper[enlist .j.k "c"$msg[`data];castRules]
      }
    .kfk.Metadata[client];
    .kfk.Subscribe[client;enlist topic1;enlist .kfk.PARTITION_UA;enlist topcb1]
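
    The decoding step in the callback can be checked without a broker. The sketch below is also untested; it feeds a hand-built message through the same helper. The payload is hypothetical: the field names, their order, and the timestamp format are assumptions, so adjust them to whatever the topic actually carries. Because each message decodes to a single row, `first` on the `side` column yields a one-character vector, which fits the `char` column.

    ```q
    / Self-contained check of the decoding step; no Kafka connection needed.
    generalHelper:{[t;d]![t;();0b;key[d]!{(x;y)}'[value d;key d]]}
    castRules:`time`sym`size`side!("P"$;`$;`long$;first)
    trades:([] time:`timestamp$();sym:`$();price:`float$();size:`long$();side:`char$())

    / Hypothetical payload: field names and timestamp format are assumptions
    json:"{\"time\":\"2024.08.26D12:37:00.000000000\",\"sym\":\"AAPL\",\"price\":226.5,\"size\":100,\"side\":\"B\"}"
    msg:enlist[`data]!enlist `byte$json   / mimic the byte vector held under `data

    raw:.j.k "c"$msg`data                 / dictionary of strings and floats
    row:generalHelper[enlist raw;castRules]
    show meta row                         / inspect the column types after casting
    `trades insert row
    ```

    If the JSON keys arrive in a different order from the `trades` columns, the insert may fail; reordering with something like `cols[trades]#row` before inserting is one way to guard against that.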