  • How to deserialize a Kafka topic message with kfk.q

    Posted by rgiu70 on August 25, 2024 at 10:51 am

    Hello everyone,

    I have configured the kfk.q library to interact with Kafka topics in q. I created a simple producer with Python that generates market data in a format like this:

    C:\Users\Administrator\cod\kdb+\scripts\KdbArchitecture>docker exec -it broker0 /bin/bash

    [appuser@broker0 ~]$ kafka-console-consumer --bootstrap-server localhost:29092 --topic trades --from-beginning

    {"time": "2024-08-25T09:56:43.291893", "sym": "ABC", "price": 113.16, "size": 18, "side": "A"}

    {"time": "2024-08-25T09:56:50.443880", "sym": "ABC", "price": 116.89, "size": 51, "side": "B"}

    {"time": "2024-08-25T09:57:09.795277", "sym": "ABC", "price": 154.67, "size": 47, "side": "B"}

    {"time": "2024-08-25T09:57:45.319374", "sym": "ABC", "price": 154.37, "size": 94, "side": "A"}

    {"time": "2024-08-25T09:58:04.654672", "sym": "ABC", "price": 191.05, "size": 69, "side": "B"}

    The issue I’m facing is that I can’t properly deserialize the message in JSON format.

    What I would like to achieve is to deserialize the message and, for example, create a q
    table to interact with the data.

    Alternatively, I would like to deserialize the message to feed the data into the feed
    (feed.q) within the kdb+ tickerplant architecture.

    Any advice on how to do this?

    Here’s an example of “consumer.q” that reads the message, prints it, but does not deserialize it correctly to perform the tasks mentioned above.

    \l kfk.q

    kfk_cfg: `metadata.broker.list`group.id`fetch.wait.max.ms`statistics.interval.ms!("localhost:9092"; "consumer-group-1"; "10"; "1000")
    client: .kfk.Consumer kfk_cfg;

    topic_trades: `trades;
    data_trades: ();

    topcb_trades: {[msg]
      msgDict: .j.k msg[`data];
      data_trades,:: enlist msgDict;
      -1 .Q.s msgDict;
      }

    -1 "Subscribing to topics called trades...";
    .kfk.Subscribe[client; (`trades); enlist .kfk.PARTITION_UA; (topcb_trades)]

    // polling
    pollKafkaAsync: {
      while[1b;
        .kfk.Poll[client; 0; 0];
        if[count data_trades > 0;
          -1 .Q.s last data_trades;
          ];
        ];
      }

    pollKafkaAsync[]

    time | "2024-08-25T10:50:10.743928"
    sym  | "ABC"
    price| 117.4
    size | 67f
    side | ,"B"

    time | "2024-08-25T10:50:10.743928"
    sym  | "ABC"
    price| 117.4
    size | 67f
    side | ,"B"

    time | "2024-08-25T10:50:10.743928"
    sym  | "ABC"
    price| 117.4
    size | 67f
    side | ,"B"

    time | "2024-08-25T10:50:10.743928"
    sym  | "ABC"
    price| 117.4
    size | 67f
    side | ,"B"

    time | "2024-08-25T10:50:10.743928"
    sym  | "ABC"
    price| 117.4
    size | 67f
    side | ,"B"

    Thank you.

  • rgiu70

    Member
    August 25, 2024 at 10:54 am
  • rocuinneagain

    Member
    August 26, 2024 at 12:34 pm

    If you review this blog post you will find it useful:

    Kdb+/q Insights: Parsing JSON Files | KX

    //Create a sample msg for testing
    q)msg:`time`sym`price`size`side!("2024-08-25T10:50:10.743928";"ABC";117.4;67f;enlist "B")
    q)msg
    time | "2024-08-25T10:50:10.743928"
    sym  | "ABC"
    price| 117.4
    size | 67f
    side | ,"B"
    //Enlisting a dictionary becomes a single row table
    q)enlist msg
    time                         sym   price size side
    --------------------------------------------------
    "2024-08-25T10:50:10.743928" "ABC" 117.4 67   ,"B"
    //The datatypes are not what we want
    q)meta enlist msg
    c    | t f a
    -----| -----
    time | C
    sym  | C
    price| f
    size | f
    side | C
    //Use the function from the linked blog
    q)generalHelper:{[t;d]![t;();0b;key[d]!{(x;y)}'[value d;key d]]}
    //Define the casting rules - adjust to what you would like
    q)castRules:`time`sym`size`side!("P"$;`$;`long$;first)
    //Now the msg can be handled to the datatypes we require
    q)generalHelper[enlist msg;castRules]
    time                          sym price size side
    -------------------------------------------------
    2024.08.25D10:50:10.743928000 ABC 117.4 67   B
    q)meta generalHelper[enlist msg;castRules]
    c    | t f a
    -----| -----
    time | p
    sym  | s
    price| f
    size | j
    side | c
    //Create an empty table to test saving the data
    q)trades:([] time:`timestamp$();sym:`$();price:`float$();size:`long$();side:`char$())
    q)trades
    time sym price size side
    ------------------------
    //We can insert the rows as they arrive
    q)`trades insert generalHelper[enlist msg;castRules]
    ,0
    q)trades
    time                          sym price size side
    -------------------------------------------------
    2024.08.25D10:50:10.743928000 ABC 117.4 67   B
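
    The casts are needed because .j.k cannot know the intended q types. As a minimal sketch, assuming a raw JSON string shaped like the producer output in the question (the names raw and d are illustrative only), the parsed types can be inspected and cast one field at a time:

    ```q
    // raw is a hypothetical payload copied from the console consumer output
    raw:"{\"time\": \"2024-08-25T09:56:43.291893\", \"sym\": \"ABC\", \"price\": 113.16, \"size\": 18, \"side\": \"A\"}"
    d:.j.k raw
    // .j.k returns JSON strings as char vectors (10h) and JSON numbers as floats (-9h)
    type each d
    // Apply the same casts as castRules, one field at a time
    "P"$d`time      // timestamp
    `$d`sym         // symbol
    `long$d`size    // long
    first d`side    // char atom
    ```

    This is why size shows as 67f and side as ,"B" before any casting is applied.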

  • rocuinneagain

    Member
    August 26, 2024 at 12:37 pm

    **Untested**, but a basic consumer for you to test with would look something like:

    \l ../kfk.q
    kfk_cfg:(!) . flip(
      (`metadata.broker.list;`localhost:9092);
      (`group.id;`0);
      (`fetch.wait.max.ms;`10);
      (`statistics.interval.ms;`10000)
      );
    client:.kfk.Consumer[kfk_cfg];

    topic1:`trades;

    generalHelper:{[t;d]![t;();0b;key[d]!{(x;y)}'[value d;key d]]}
    castRules:`time`sym`size`side!("P"$;`$;`long$;first)
    trades:([] time:`timestamp$();sym:`$();price:`float$();size:`long$();side:`char$())
    topcb1:{[msg]
      `trades insert generalHelper[enlist .j.k "c"$msg[`data];castRules]
      }
    .kfk.Metadata[client];
    .kfk.Subscribe[client;enlist topic1;enlist .kfk.PARTITION_UA;enlist topcb1]
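
    The callback logic can also be checked without a broker. A rough sketch, assuming the message passed to the callback is a dictionary whose `data field holds the payload as a byte vector (fakeMsg is a hypothetical stand-in, not something kfk.q provides):

    ```q
    generalHelper:{[t;d]![t;();0b;key[d]!{(x;y)}'[value d;key d]]}
    castRules:`time`sym`size`side!("P"$;`$;`long$;first)
    trades:([] time:`timestamp$();sym:`$();price:`float$();size:`long$();side:`char$())
    topcb1:{[msg]
      `trades insert generalHelper[enlist .j.k "c"$msg[`data];castRules]
      }

    // fakeMsg is hypothetical: only the `data field is populated, as bytes
    fakeMsg:enlist[`data]!enlist `byte$"{\"time\": \"2024-08-25T09:56:43.291893\", \"sym\": \"ABC\", \"price\": 113.16, \"size\": 18, \"side\": \"A\"}"
    topcb1 fakeMsg
    // Inspect the result: one typed row should now be present
    trades
    meta trades
    ```

    If the insert fails here, the problem is in the casting rules rather than in the Kafka subscription.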
  • rgiu70

    Member
    August 27, 2024 at 10:16 am

    Thank you very much for your response.

    I have tried to adapt the helper to the model I am using, and it seems to be working…

    The problem I am encountering is that I still cannot populate the hdb table with the incoming data from the Kafka topic (which should now be correctly deserialized).

    Could you please take a look and let me know if you notice any errors?

    Thank you very much for your patience.

    schema – (sym.q)

    trade:([]time:`timestamp$();sym:`symbol$();price:`float$();size:`long$();side:`char$())

    consumer.q

    \l kfk.q

    kfk_cfg: `metadata.broker.list`group.id`fetch.wait.max.ms`statistics.interval.ms!
             ("localhost:9092"; "consumer-group-1"; "10"; "1000")

    client: .kfk.Consumer kfk_cfg;

    topic_trades: `trades;
    trades:([] time:`timestamp$();sym:`symbol$();price:`float$();size:`long$();side:`char$())

    data_trades: ()

    h: neg hopen `::5010

    // Json Helper
    generalHelper:{[t;d]![t;();0b;key[d]!{(x;y)}'[value d;key d]]}

    castRules: `time`sym`price`size`side!("P"$;`$;`float$;`long$;first)

    topcb_trades: {[msg]
      msgDict: .j.k msg[`data];
      convertedMsg: generalHelper[enlist msgDict; castRules];
      -1 "convertedMsg Content:";
      -1 .Q.s convertedMsg;
      if[not 98h=type convertedMsg; // check if table
        -1 "Error: convertedMsg is not a table.";
        :() // Exit
        ];
      if[0=count convertedMsg; // check if empty
        -1 "Error: convertedMsg is empty.";
        :() // Exit
        ];
      // Check data type
      -1 "Data type:";
      -1 raze ("time: ", string type each convertedMsg`time),
        (" sym: ", string type each convertedMsg`sym),
        (" price: ", string type each convertedMsg`price),
        (" size: ", string type each convertedMsg`size),
        (" side: ", string type each convertedMsg`side);
      `trades insert convertedMsg;
      -1 "Insert in local table trades --> done.";
      h(".u.upd"; `trade; convertedMsg);
      -1 "remote tickerplant update --> done.";
      -1 .Q.s convertedMsg;
      }

    -1 "Subscribing to topic trades...";
    .kfk.Subscribe[client; (topic_trades); enlist .kfk.PARTITION_UA; (topcb_trades)]

    pollKafkaAsync: {
      while[1b;
        .kfk.Poll[client; 0; 0];
        if[count data_trades > 0;
          -1 .Q.s last data_trades;
          ];
        ];
      }

    pollKafkaAsync[]

    rdb.q

  • rgiu70

    Member
    August 27, 2024 at 10:21 am
    • rocuinneagain

      Member
      August 27, 2024 at 10:46 am

      You do not need pollKafkaAsync.

      Otherwise things look fine.
