Published: April 30, 2025
In today’s data-driven world, making informed decisions is crucial for business success. However, for many new data analytics users, the complexity of tying technologies together can be daunting. We believe this can be simplified with the KX kdb Insights Stream Processor, an all-in-one engine to ingest, transform, and stream data, making data analytics accessible to everyone.
In this post, I will walk you through using the Stream Processor for a simple use case: continuously polling data (for the seasoned, a feed handler) from a REST API endpoint, and show how simple it is to ingest, transform, and serve the data, making it easier for your team to unlock actionable insights within a day.
*Although this setup is not production ready as-is, chaining the Stream Processor with Reliable Transport addresses production concerns such as redundancy out of the box.
Before diving into the technical details, let’s explore the business benefits of effective data analytics:
Now, let’s look at how the kdb Insights tech stack delivers these benefits with simplicity and ease, based on a close-to-home use case in rainy Singapore.
It is assumed that you have the following:
If you want to follow along, you can clone my repo to get the right base YAML (provided by KX) and spec.q, or you can code it from scratch.
As a quick introduction, KX launched kdb Insights (KX’s cloud-native, production-ready platform) in March 2021, and the Stream Processor is one of its containerized microservices. To learn more, please visit KX.
For our demo, we will use the base Stream Processor Docker image from KX, designed for seamless data ingestion, transformation, and persistence. Here’s a breakdown of how it works:
The kdb Insights Stream Processor comes packaged with Kurl, a cron-style timer, and a full-fledged ETL pipeline framework, all within a single Docker worker container.
\d .
//You can make the API link dynamic, or use the Python API via PyKX
.meteo.apiLink:"https://api.open-meteo.com/v1/forecast?latitude=1.3236&longitude=103.9273&current=temperature_2m,rain&timezone=Asia%2FSingapore";
.meteo.tab:flip `latitude`longitude`generationtime_ms`timezone`elevation`time`interval`temperature_2m`rain!"FFF*JPJFF"$\:();
//aggregated table starts with three empty columns
.meteo.agg:flip `time`temperature_2m`rain!(();();());
.meteo.keys:cols .meteo.tab;
.meteo.pull:{
  //use the KX Kurl library to pull from the HTTP endpoint
  res:.kurl.sync (.meteo.apiLink;`GET;::);
  //if the status code is 200, publish the response body into the pipeline
  if[200=res[0];
    publish res[1];
    ]
 };
//in-built cron-style timer framework to mimic a real-time feed on a 5-minute interval
.tm.add[`.meteo.pull;(`.meteo.pull;());300000;0];
//one-shot timer to stop polling and tear down the pipeline after 24 hours
.tm.add1shot[`stop; ({.tm.del `.meteo.pull; .qsp.teardown[]}; ()); 1D00:00:00];
The above function .meteo.pull is then put on a timer to mimic a data feed streaming into our pipeline at a regular interval as a JSON string, kicking off the data transformation in the pipeline.
Once the data is ingested via publish, it flows from node to node through the Stream Processor framework, where it is transformed to fit your analysis needs. This could involve cleaning, aggregating, or restructuring data. The simplicity of the Stream Processor allows you to define transformation rules concisely, using the chained operator syntax seen below. To learn more about the supported nodes, please visit: https://code.kx.com/insights/1.12/api/stream-processor/q/index.html.
//First layer of resolving the input from JSON to kdb
input:.qsp.read.fromCallback[`publish]
  .qsp.decode.json[]
  .qsp.map[{enlist .meteo.keys!(_[-2;x],x[`current]).meteo.keys}]
  .qsp.transform.schema[.meteo.tab]
  .qsp.split[]
//Second layer: writing directly to .meteo.tab
//Cache the latest event time and check if the incoming event time is newer;
//if so, update the cache and store the data
directWritePipe:input
  .qsp.map[{[op;md;data]
    lastCacheTime: .qsp.get[op;md];
    currentTime: first data`time;
    $[=[lastCacheTime;0] or lastCacheTime < currentTime;
      .qsp.set[op;md;currentTime];
      data:select from data where time > currentTime
      ];
    data
    };.qsp.use ``state!(::;0)]
  .qsp.write.toVariable[`.meteo.tab; `upsert]
//Second layer: performing the window function
//The tumbling window creates a new window every hour based on time; the data is cached in memory,
//and when a new event arrives with the next hour's time, the cached data is aggregated and stored in .meteo.agg.
//The cache is then discarded, so .meteo.agg holds the max temperature and rain for every hour
windowPipe: input
  .qsp.map[{select time, temperature_2m, rain from x}]
  .qsp.window.tumbling[01:00:00;`time]
  //round time down to the hour
  .qsp.map[{select max temperature_2m, max rain by time.date + 60 xbar time.minute from x}]
  .qsp.write.toVariable[`.meteo.agg; `upsert]
//Activate pipeline
.qsp.run (directWritePipe; windowPipe);
With the node-based design of the Stream Processor, we have chained together 11 small nodes to achieve our use case. For those new to the API, .qsp.decode.json is a node, and it links directly into .qsp.map, which performs the kdb dictionary indexing.
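To make that map node more concrete, here is a small, illustrative sketch of what the dictionary indexing does to a single decoded payload; the sample dictionary below is an abbreviated, made-up stand-in for the Open-Meteo response rather than a verbatim copy of it:
//Illustrative only: an abbreviated stand-in for the decoded Open-Meteo response
sample:`latitude`longitude`generationtime_ms`timezone`elevation`current_units`current!
  (1.3236;103.9273;0.05;"Asia/Singapore";15f;
  `time`interval`temperature_2m`rain!("iso8601";"seconds";"degC";"mm");
  `time`interval`temperature_2m`rain!("2025-04-30T10:00";900;31.2;0.4));
//_[-2;sample] drops the trailing current_units and current entries,
//,sample[`current] appends the nested readings, and indexing by .meteo.keys
//re-orders the values to match .meteo.tab before enlisting them into a one-row table
enlist .meteo.keys!(_[-2;sample],sample[`current]) .meteo.keys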
Fig 1. Pipeline specification of ETL of Meteo Weather Data for our use case
Through the above graph, we can see how simple it is to chain data operations so that we end up with usable insights rather than just raw data at the end of the pipeline. In advanced pipelines, we can spin up multiple workers for parallelization. Our proposed pipeline performs two key actions: 1) store the incoming data and deduplicate it via a cache, and 2) stream the data over a 1-hour window and produce aggregated analytics that can drive decision making.
After transformation, the processed data is cached for easy access by downstream applications. This not only enhances performance but also ensures that your analytics tools are always working with the latest data.
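As an illustrative check (not part of the pipeline spec itself), once the pipeline is running you can query the cached variables straight from the worker's q session:
//Illustrative queries against the cached variables while the pipeline is running
select from .meteo.tab                                /raw readings collected so far
select from .meteo.agg                                /hourly max temperature and rainfall
select last temperature_2m, last rain from .meteo.tab /latest raw reading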
Here’s a simplified code snippet demonstrating how to serve the data via a REST API. This uses a framework within kdb Insights that simplifies data consumption for downstream clients.
//custom analytic function to determine if a brolly is needed based on the last hour's rainfall data
.meteo.bringBrolly:{
  rainData:last[.meteo.agg]`rain;
  //arbitrary threshold; you can build more complex logic here via ML or other means
  bringBrolly:$[rainData > 2;"Yes";"No"];
  bringBrolly
 };
//in-built REST framework (.com_kx_rest) to allow this single process to also act as a REST API server
.rest:.com_kx_rest;
.rest.init enlist[`autoBind]!enlist[1b];
.rest.register[`get;"/weather_raw";"API to pull weather update collected so far";{select from .meteo.tab};()];
.rest.register[`get;"/weather_agg";"API to pull aggregated weather update collected so far";{select from .meteo.agg};()];
.rest.register[`get;"/weather_agent";"API to determine yes or no to bring a brolly";.meteo.bringBrolly;()];
To enable insights, we created a function, .meteo.bringBrolly, that helps us decide whether we should bring a brolly given the latest aggregated stats.
We then expose this function by registering it under the REST API framework in just one line. This allows us to build a client application that calls our URL and gets a reply.
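Any HTTP client will do; as a minimal sketch from another q session, you could hit the route with .Q.hg, assuming a placeholder host and port (the actual port depends on how autoBind resolved it in your deployment):
//Hypothetical client call; replace localhost:8080 with wherever the REST server is bound
.Q.hg "http://localhost:8080/weather_agent"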
Fig 2. Chrome as a client running the weather agent API route
Implementing our tech stack is straightforward. With just a few commands, you can start polling your desired API, transforming the data, caching analytics, and exposing insights for application use. You no longer need to build kdb architecture from scratch; instead, you can focus on building business logic for swifter business decisions, with increased confidence and positive impact.
For advanced developers who want to take it further, try creating a more complex statistical workflow that calculates the odds of rain requiring a brolly. As a tourist-goods peddler selling an assortment of goods, I could then make the just-in-time decision to stock more brollies instead of toys from my warehouse and maximize my profit on a rainy day.
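As a hedged starting point, a very crude version of those odds could simply be the share of aggregated hours so far that crossed the brolly threshold; the 2mm cutoff, the .meteo.rainOdds name, and the /rain_odds route below are arbitrary, illustrative choices rather than anything prescribed by the platform:
//Illustrative only: crude empirical odds that an hour needs a brolly,
//measured as the share of hours in .meteo.agg with more than 2mm of rain
.meteo.rainOdds:{avg 2 < exec rain from .meteo.agg};
//optionally expose it alongside the other routes
.rest.register[`get;"/rain_odds";"Illustrative odds of needing a brolly";.meteo.rainOdds;()];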
At First Derivative, we’re committed to empowering new data analytics users with the tools they need to succeed through consulting on KX Services. The KX tech stack not only offers business practicality but also simplifies the technical implementation process, improving speed to business impact. Whether you’re a seasoned developer or just starting out, our approach allows you to focus on what really matters—extracting valuable insights from your data.
Ready to dive deeper? Check out the KX documentation for more details or reach out to First Derivative for support!
To pull the kdb Insights image, you will need a username and password; check in with KX for access to the portal.
docker login portal.dl.kx.com -u username -p password
Due to how kdb Insights containers pick up the license, you will need to base64-encode your provided kc.lic from KX.
export KDB_LICENSE_B64=$(base64 path-to/kc.lic)
How is publish, as seen in .meteo.pull, defined?
This is defined by the .qsp framework when we declare a callback for `publish
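As a quick illustrative sanity check, you can also call that callback by hand from the worker's q session; the JSON string below is a trimmed, made-up payload shaped like the Open-Meteo response, not a real captured message:
//Illustrative manual test: push a hand-crafted JSON message through the pipeline
msg:raze("{\"latitude\":1.3236,\"longitude\":103.9273,\"generationtime_ms\":0.05,";
  "\"timezone\":\"Asia/Singapore\",\"elevation\":15,";
  "\"current_units\":{\"time\":\"iso8601\",\"interval\":\"seconds\",\"temperature_2m\":\"degC\",\"rain\":\"mm\"},";
  "\"current\":{\"time\":\"2025-04-30T10:00\",\"interval\":900,\"temperature_2m\":31.2,\"rain\":0.4}}");
publish msg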
Reach out to First Derivative today for tailored support with KX kdb
Guan Yu Lim
Engineering Services
KX Services
First Derivative LinkedIn profile