You may find this blog post useful:
Kdb+/q Insights: Parsing JSON Files | KX
//Create a sample msg for testing
q)msg:`time`sym`price`size`side!("2024-08-25T10:50:10.743928";"ABC";117.4;67f;enlist "B")
q)msg
time | "2024-08-25T10:50:10.743928"
sym  | "ABC"
price| 117.4
size | 67f
side | ,"B"
//Enlisting a dictionary gives a single-row table
q)enlist msg
time                         sym   price size side
--------------------------------------------------
"2024-08-25T10:50:10.743928" "ABC" 117.4 67   ,"B"
//The datatypes are not what we want
q)meta enlist msg
c    | t f a
-----| -----
time | C
sym  | C
price| f
size | f
side | C
//Use the function from the linked blog
q)generalHelper:{[t;d]![t;();0b;key[d]!{(x;y)}'[value d;key d]]}
//Define the casting rules - adjust to what you would like
q)castRules:`time`sym`size`side!("P"$;`$;`long$;first)
//Now the msg can be cast to the datatypes we require
q)generalHelper[enlist msg;castRules]
time                          sym price size side
-------------------------------------------------
2024.08.25D10:50:10.743928000 ABC 117.4 67   B
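If the functional form is hard to read: the helper pairs each casting function with its column name and passes those pairs to a functional update. A minimal sketch of the same casts written as a plain qSQL update, for comparison (`viaHelper` and `viaQsql` are just illustrative names):

```q
// Sample message and helper, repeated so this runs on its own
msg:`time`sym`price`size`side!("2024-08-25T10:50:10.743928";"ABC";117.4;67f;enlist "B")
generalHelper:{[t;d]![t;();0b;key[d]!{(x;y)}'[value d;key d]]}
castRules:`time`sym`size`side!("P"$;`$;`long$;first)

// Functional update driven by the castRules dictionary
viaHelper:generalHelper[enlist msg;castRules]

// The same casts spelled out by hand; price is left untouched in both
viaQsql:update time:"P"$time,sym:`$sym,size:`long$size,side:first side from enlist msg

// Compare the two results
viaHelper~viaQsql
```

The advantage of the dictionary version is that the rules are data, so they can be changed or extended without rewriting the query.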
q)meta generalHelper[enlist msg;castRules]
c    | t f a
-----| -----
time | p
sym  | s
price| f
size | j
side | c
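One thing to watch if you ever cast several messages in one go: each rule is applied to the whole column, so `first` takes the first element of the column rather than the first character of each row. A sketch of an adjusted rule set for that case, assuming `side` is always a one-character string (`batchRules`, `m1`, `m2` and the second message are made up for illustration):

```q
generalHelper:{[t;d]![t;();0b;key[d]!{(x;y)}'[value d;key d]]}

// Same rules as before, but side is unwrapped row by row
batchRules:`time`sym`size`side!("P"$;`$;`long$;first each)

// Two sample messages with the same keys; a list of such dictionaries is a table
m1:`time`sym`price`size`side!("2024-08-25T10:50:10.743928";"ABC";117.4;67f;enlist "B")
m2:`time`sym`price`size`side!("2024-08-25T10:50:11.120000";"DEF";20.5;10f;enlist "S")
msgs:(m1;m2)

// Cast the whole batch at once and inspect the resulting types
batch:generalHelper[msgs;batchRules]
meta batch
```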
//Create an empty table to test saving the data
q)trades:([] time:`timestamp$();sym:`$();price:`float$();size:`long$();side:`char$())
q)trades
time sym price size side
------------------------
//We can insert the rows as they arrive
q)`trades insert generalHelper[enlist msg;castRules]
,0
q)trades
time                          sym price size side
-------------------------------------------------
2024.08.25D10:50:10.743928000 ABC 117.4 67   B
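To tie this back to the Kafka side, here is a minimal sketch of a handler. It assumes the topic carries JSON text and that your consumer callback is passed a dictionary whose `data` entry holds the payload as a byte vector; check that against the kfk.q version you are running. `onMsg` is a hypothetical name, and the incoming message is simulated so this runs without a broker:

```q
// Helper, rules and target table as above, repeated so this runs on its own
generalHelper:{[t;d]![t;();0b;key[d]!{(x;y)}'[value d;key d]]}
castRules:`time`sym`size`side!("P"$;`$;`long$;first)
trades:([] time:`timestamp$();sym:`$();price:`float$();size:`long$();side:`char$())

// Hypothetical handler: bytes -> chars -> JSON dictionary -> typed row -> insert
onMsg:{[x] `trades insert generalHelper[enlist .j.k "c"$x`data;castRules]}

// Simulated message: serialise the sample dictionary to JSON, then to bytes
payload:"x"$.j.j `time`sym`price`size`side!("2024-08-25T10:50:10.743928";"ABC";117.4;67f;enlist "B")
onMsg enlist[`data]!enlist payload

trades
```

In a real consumer you would call something like `onMsg` from whatever callback you register with kfk.q, rather than invoking it by hand.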