
Reply To: How to deserialize a Kafka topic message with kfk.q

  • rgiu70

    Member
    August 27, 2024 at 10:16 am

    Thank you very much for your response.

    I have tried to adapt the helper to the model I am using, and it seems to be working…

    The problem I am encountering is that I still cannot populate the hdb table with the incoming data from the Kafka topic (which should now be correctly deserialized).

    Could you please take a look and let me know if you notice any errors?

    Thank you very much for your patience.

    schema – (sym.q)

    trade:([]time:`timestamp$();sym:`symbol$();price:`float$();size:`long$();side:`char$())

    consumer.q

    \l kfk.q

    kfk_cfg: `metadata.broker.list`group.id`fetch.wait.max.ms`statistics.interval.ms!
             ("localhost:9092"; "consumer-group-1"; "10"; "1000")

    client: .kfk.Consumer kfk_cfg;

    topic_trades: `trades;
    trades:([] time:`timestamp$();sym:`symbol$();price:`float$();size:`long$();side:`char$())

    data_trades: ()

    h: neg hopen `::5010

    // Json Helper
    generalHelper:{[t;d]![t;();0b;key[d]!{(x;y)}'[value d;key d]]}

    castRules: `time`sym`price`size`side!("P"$;`$;`float$;`long$;first)

    topcb_trades: {[msg]
        msgDict: .j.k msg[`data];
        convertedMsg: generalHelper[enlist msgDict; castRules];

        -1 "convertedMsg Content:";
        -1 .Q.s convertedMsg;

        if[not 98h=type convertedMsg; // check if table
            -1 "Error: convertedMsg is not a table.";
            :() // Exit
        ];
        if[0=count convertedMsg; // check if empty
            -1 "Error: convertedMsg is empty.";
            :() // Exit
        ];

        // Check data type
        -1 "Data type:";
        -1 raze ("time: ", string type each convertedMsg`time),
            (" sym: ", string type each convertedMsg`sym),
            (" price: ", string type each convertedMsg`price),
            (" size: ", string type each convertedMsg`size),
            (" side: ", string type each convertedMsg`side);

        `trades insert convertedMsg;
        -1 "Insert in local table trades --> done.";

        h(".u.upd"; `trade; convertedMsg);
        -1 "remote tickerplant update --> done.";

        -1 .Q.s convertedMsg;
    }

    -1 "Subscribing to topic trades...";
    .kfk.Subscribe[client; (topic_trades); enlist .kfk.PARTITION_UA; (topcb_trades)]

    pollKafkaAsync: {
        while[1b;
            .kfk.Poll[client; 0; 0];

            if[count data_trades > 0;
                -1 .Q.s last data_trades;
            ];
        ];
    }

    pollKafkaAsync[]

    rdb.q