    From Rain to Results: Navigating Data with kdb Insights

    Published: April 30, 2025

    In today’s data-driven world, making informed decisions is crucial for business success. However, for many new data analytics users, the complexity of tying technologies together can be daunting. We believe this can be simplified with the KX kdb Insights Stream Processor, an all-in-one engine to ingest, transform and stream data, making data analytics accessible to everyone.

    In this post, I will walk you through how to use the Stream Processor on a simple use case: continuously polling data from a REST API endpoint (for the seasoned, a feed handler). You will see how simple it is to ingest, transform, and serve data, making it easier for your team to unlock actionable insights within a day.

    *Although this setup is not production-ready, chaining the Stream Processor with Reliable Transport addresses production concerns such as redundancy out of the box.

    The Business Value of Data Analytics

    Before diving into the technical details, let’s explore the business benefits of effective data analytics:

    • Informed Decision-Making: Real-time access to accurate and enriched data that empowers teams to make decisions based on facts, not guesswork.
    • Cost Efficiency: By optimizing processes and identifying trends, businesses can save the time and resources otherwise spent on technical implementation.
    • Competitive Advantage: Leveraging data analytics helps organizations stay ahead of the curve, responding swiftly to market changes.

    Now, let’s look at how the KX Insights tech stack delivers these benefits with simplicity and ease, based on a close-to-home use case in rainy Singapore.

    Assumptions and Considerations

    It is assumed that you have the following:

    1. Working knowledge of Docker
    2. Access to pull the Stream Processor image into your Docker deployment
    3. A basic understanding of q syntax, because we will be using q scripts to build the pipeline
    4. A working license for KDB+ Cloud: a valid kc.lic, base64-encoded and exported as the environment variable KDB_LICENSE_B64

    If you want to follow along, you can clone my repo to get the right base yaml (provided by KX) and spec.q, or you can code it from scratch.

    Our Tech Stack Overview

    As a quick introduction, KX launched kdb Insights (the KX cloud-native, production-ready platform) in March 2021, and the Stream Processor is one of its containerized microservices. To learn more, please visit KX.

    For our demo, we will use the base Stream Processor Docker image from KX, which is designed for seamless data ingestion, transformation, and persistence. Here’s a breakdown of how it works:

    1. Simple Streaming Data Ingestion

    The kdb Insights Stream Processor comes packaged with Kurl, Cron, and a full-fledged ETL pipeline framework, all within a Docker worker container.


    \d .
    //You can edit api to be dynamic or you can use the python api via pyKX
    .meteo.apiLink:"https://api.open-meteo.com/v1/forecast?latitude=1.3236&longitude=103.9273&current=temperature_2m,rain&timezone=Asia%2FSingapore";
    .meteo.tab:flip `latitude`longitude`generationtime_ms`timezone`elevation`time`interval`temperature_2m`rain!"FFF*JPJFF"$\:();
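    //typed empty schema; a three-name key with an empty value list would signal a length error
    .meteo.agg:flip `time`temperature_2m`rain!"PFF"$\:();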
    .meteo.keys:cols .meteo.tab;
    .meteo.pull:{
        //use KX kurl library to pull from http request; res is (status;body)
        res:.kurl.sync (.meteo.apiLink;`GET;::);
        //if status = 200, hand the JSON body to the pipeline
        if[200=res[0];
            publish res[1]
            ]
        };
    //in-built cron framework to mimic a real-time feed at a 5-minute interval (300000 ms)
    .tm.add[`.meteo.pull;(`.meteo.pull;());300000;0];
    //one-shot job: after 1 day, remove the polling job and tear down the pipeline
    .tm.add1shot[`stop; ({.tm.del `.meteo.pull; .qsp.teardown[]}; ()); 1D00:00:00];

    The function .meteo.pull above is registered on a timer to mimic a data feed: at a regular interval it publishes the response into our pipeline as a JSON string, which kicks off the data transformation.
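
    To make the shape of that JSON string concrete, here is a minimal sketch in plain q (no Stream Processor needed) that parses a hand-written sample with .j.k. The payload is an assumption: a trimmed stand-in for the API response with made-up values, not captured output.

```q
// Hypothetical sample payload: field names mirror .meteo.tab, values are made up
sample:"{\"latitude\":1.375,\"longitude\":103.875,\"generationtime_ms\":0.03,\"timezone\":\"Asia/Singapore\",\"elevation\":8.0,\"current_units\":{\"rain\":\"mm\"},\"current\":{\"time\":\"2025-04-30T14:15\",\"interval\":900,\"temperature_2m\":30.1,\"rain\":0.2}}";
// JSON object -> dictionary keyed by symbols
parsed:.j.k sample;
// top-level fields
key parsed
// nested dictionary holding the actual readings
parsed`current
```

    JSON numbers arrive as floats and JSON strings as character vectors, which is why a schema step is still needed afterwards to cast fields such as time and interval.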

    2. Data Transformation Made Easy

    Once the data is ingested via publish, it flows from node to node within the Stream Processor framework and is transformed to fit your analysis needs. This could involve cleaning, aggregating, or restructuring data. The simplicity of the Stream Processor allows you to define transformation rules concisely, using the chained operator syntax seen below. To learn more about the supported nodes, please visit: https://code.kx.com/insights/1.12/api/stream-processor/q/index.html.


    //First layer: resolve the input from JSON to KDB
    //the map drops the last two entries of the decoded dictionary, merges in the fields nested under `current
    //and keeps only the columns listed in .meteo.keys
    input:.qsp.read.fromCallback[`publish]
        .qsp.decode.json[]
        .qsp.map[{enlist .meteo.keys!(_[-2;x],x[`current]).meteo.keys}]
        .qsp.transform.schema[.meteo.tab]
        .qsp.split[]
    //Second layer (branch 1): write directly to .meteo.tab
    //The latest event time is kept in operator state; if the incoming event time is newer, update the state and store the data
    //otherwise keep only rows newer than the cached time, which drops the duplicate
    directWritePipe:input
        .qsp.map[{[op;md;data]
            lastCacheTime: .qsp.get[op;md];
            currentTime: first data`time;
            $[=[lastCacheTime;0] or lastCacheTime < currentTime;
                .qsp.set[op;md;currentTime];
                data:select from data where time > lastCacheTime
                ];
            data
            };.qsp.use ``state!(::;0)]
        .qsp.write.toVariable[`.meteo.tab; `upsert]
    //Second layer (branch 2): windowed aggregation
    //The tumbling window creates a new window every hour based on time; this data is cached in memory
    //and when a new event arrives with a time in the next hour, the cached data is aggregated and stored in .meteo.agg
    //The cache is then discarded. Hence .meteo.agg holds the max temperature and rain for every hour
    windowPipe: input
        .qsp.map[{select time, temperature_2m, rain from x}]
        .qsp.window.tumbling[01:00:00;`time]
        //round time down to the start of the hour
        .qsp.map[{select max temperature_2m, max rain by time.date + 60 xbar time.minute from x}]
        .qsp.write.toVariable[`.meteo.agg; `upsert]
    //Activate pipeline
    .qsp.run (directWritePipe; windowPipe);

    With the node-based design of the Stream Processor, we have put together 11 small nodes to achieve our use case. For those new to the API language, each call such as .qsp.decode.json is a node; here it is linked directly to .qsp.map, which indexes into the decoded kdb dictionary.
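
    The dictionary indexing done inside that .qsp.map node can be tried on its own in plain q. The sketch below is illustrative only: x is a hypothetical decoded payload with made-up values (including an extra utc_offset_seconds field that the pipeline does not keep), and wanted plays the role of .meteo.keys.

```q
// Hypothetical decoded payload; values are made up for illustration
x:`latitude`longitude`generationtime_ms`utc_offset_seconds`timezone`elevation`current_units`current!(
  1.375;103.875;0.03;28800f;"Asia/Singapore";8f;
  (enlist`rain)!enlist"mm";
  `time`interval`temperature_2m`rain!("2025-04-30T14:15";900f;30.1;0.2));
// stand-in for .meteo.keys
wanted:`latitude`longitude`generationtime_ms`timezone`elevation`time`interval`temperature_2m`rain;
// drop the last two entries (the nested dictionaries), then merge in the fields under current
flat:(-2_x),x`current;
// index by the wanted keys and enlist the dictionary to get a one-row table
row:enlist wanted!flat wanted;
row
```

    The result is a one-row table with exactly the nine wanted columns; the values are still untyped at this point, which is the job of the schema node that follows in the pipeline.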

    Fig 1. Pipeline specification of ETL of Meteo Weather Data for our use case

    The graph above shows how simple it is to chain data operations so that the pipeline ends with usable insights rather than raw data. In advanced pipelines, we can spin up multiple workers for parallelization. Our pipeline performs two key actions: 1) store the incoming data, deduplicating it via a cache, and 2) stream the data over a 1-hour window to create aggregated analytics that can drive decision making.
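
    Both actions can be sketched in plain q outside the Stream Processor. This is a simplified stand-in under stated assumptions, not the pipeline itself: the observations are made up, a global variable (lastSeen) replaces the operator state kept via .qsp.get and .qsp.set, and dedupe is a hypothetical helper name.

```q
// Made-up observations; the 14:15 reading arrives twice, as it can when polling faster than the source updates
obs:([] time:2025.04.30D14:00:00 2025.04.30D14:15:00 2025.04.30D14:15:00 2025.04.30D15:05:00;
  temperature_2m:30.1 30.4 30.4 29.8;
  rain:0 0.2 0.2 2.6);

// 1) Deduplicate: a global stands in for the operator state (assumption)
// a null timestamp sorts below every real timestamp, so the first batch always passes
lastSeen:0Np;
dedupe:{[batch]
  // keep only rows newer than the cached time
  fresh:select from batch where time>lastSeen;
  // advance the cache when something new arrived
  if[count fresh; lastSeen::max fresh`time];
  fresh};
// feed the rows through one at a time, as the timer would
stored:raze dedupe each enlist each obs;

// 2) Aggregate: bucket each row into its hour, then take the max per bucket
hourly:select max temperature_2m, max rain by time:time.date+60 xbar time.minute from stored;
hourly
```

    Here stored ends up with three rows because the repeated 14:15 reading is dropped, and hourly has one row per hour holding the maximum temperature and rain seen in that hour.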

    3. Persistent Caching for Downstream Consumers

    After transformation, the processed data is cached for easy access by downstream applications. This not only enhances performance but also ensures that your analytics tools are always working with the latest data.

    Example Implementation

    Here’s a simplified code snippet demonstrating how to serve the data via a REST API. The REST framework is part of kdb Insights and simplifies data consumption for downstream clients.


    //custom analytic function to determine if a brolly is needed based on the last hour's rainfall data
    .meteo.bringBrolly:{
        rainData:last[.meteo.agg]`rain;
        //arbitrary threshold; you can build more complex logic here via ML or other means
        $[rainData > 2;"Yes";"No"]
        };
    //in-built REST framework that allows this single processing process to also act as a REST API server
    .rest:.com_kx_rest;
    .rest.init enlist[`autoBind]!enlist[1b];
    .rest.register[`get;"/weather_raw";"API to pull weather update collected so far";{select from .meteo.tab};()];
    .rest.register[`get;"/weather_agg";"API to pull aggregated weather update collected so far";{select from .meteo.agg};()];
    .rest.register[`get;"/weather_agent";"API to determine yes or no to bring a brolly";.meteo.bringBrolly;()];

    To enable insights, we created a function, .meteo.bringBrolly, that helps us decide whether to bring a brolly given the last aggregated stats.

    We then expose this function by registering it with the REST API framework in just one line. This allows us to build a client application that calls our URL and gets a reply.
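
    To see how the decision reacts to different data without running the pipeline, the sketch below uses a parameterised variant of the same logic. needBrolly is a hypothetical name, and both tables hold made-up values standing in for .meteo.agg.

```q
// Hypothetical variant of .meteo.bringBrolly that takes the table and threshold as arguments
needBrolly:{[agg;threshold] $[threshold<last agg`rain;"Yes";"No"]};
// made-up aggregates: the latest hour is wet in one table and dry in the other
wet:([] time:2025.04.30D14:00:00 2025.04.30D15:00:00; temperature_2m:30.4 29.8; rain:0.2 2.6);
dry:([] time:enlist 2025.04.30D14:00:00; temperature_2m:enlist 30.4; rain:enlist 0.2);
needBrolly[wet;2]
needBrolly[dry;2]
```

    The first call returns "Yes" and the second "No". Only the most recent row drives the answer, so an empty table also yields "No" because the comparison against a null is false.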

    Fig 2. Chrome as a client running the weather agent API route

    4. Business Impact

    Implementing our tech stack is straightforward. With just a few commands, you can start polling your desired API, transforming the data, caching analytics, and exposing insights for application use. You no longer need to build a KDB architecture from scratch, so you can focus on building business logic that supports swifter, more confident business decisions.

    For advanced developers who want to take this further, try building a more complex statistical workflow that calculates the odds of rain, and therefore of needing a brolly. As a tourist goods peddler selling an assortment of goods, for example, I could use this to make a just-in-time decision to stock more brollies instead of toys from my warehouse and maximize my profit on a rainy day.
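
    As a deliberately naive starting point for that idea, the sketch below estimates the odds as the share of past hourly windows whose maximum rain exceeded a threshold. rainOdds is a hypothetical helper, the history is made up, and a real workflow would need far more data and a proper model.

```q
// Hypothetical helper: fraction of hourly windows with max rain above the threshold
rainOdds:{[agg;threshold] avg threshold<agg`rain};
// made-up hourly history standing in for .meteo.agg
hist:([] time:2025.04.30D12:00:00+0D01:00:00*til 4; rain:0 0.4 3.1 2.6);
rainOdds[hist;2]
```

    With this history, two of the four windows exceed the threshold of 2, so the estimate is 0.5.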

    Conclusion

    At First Derivative, we’re committed to empowering new data analytics users with the tools they need to succeed through consulting on KX Services. The KX tech stack is not only practical for the business but also simplifies technical implementation, shortening the time to business impact. Whether you’re a seasoned developer or just starting out, our approach allows you to focus on what really matters: extracting valuable insights from your data.

    Ready to dive deeper? Check out the KX documentation for more details or reach out to First Derivative for support!

    Technical Support

    To pull the kdb Insights image you need a username and password; check in with KX for access to the portal.

    docker login portal.dl.kx.com -u username -p password

    Because of how kdb Insights containers pick up the license, you will need to base64-encode the kc.lic file provided by KX.

    export KDB_LICENSE_B64=$(base64 path-to/kc.lic)

    FAQ

    How is the publish function used in .meteo.pull defined?

    It is defined by the .qsp framework when we declare a callback for `publish via .qsp.read.fromCallback.

    Guan Yu Lim
    Engineering Services
    KX Services
