Kafka streams to build many to one connectivity – Part 2/2

If you have not seen it already, I explained the use case in this post:

Kafka streams to build many to one connectivity – Part 1/2

Environment Setup

Please go through my online notes to set up the following:

  1. Basic Kafka development server setup
  2. Basic IBM MQ development server setup in docker

GitHub Repository




Source Connectors
  1. Read a message from the application over IBM MQ
  2. Format the message into a custom Kafka message format and enrich it with the key, source name, etc.
  3. Post it to the accounting service request Kafka topic
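
The enrichment in step 2 can be sketched in plain Java as below. This is only an illustration, not the connector code from the repository: the class name SourceEnrichmentSketch, the method toKafkaMessage, the correlation ID used as the key, and the "field=value;field=value" message layout are all assumptions made for this example.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical sketch of source-side enrichment; not the repository's connector code.
class SourceEnrichmentSketch {

    // Builds a key/value pair for the request topic.
    // Assumption: the key is a correlation ID that the response will reuse,
    // and the value is the MQ payload prefixed with a "source" field.
    static Map.Entry<String, String> toKafkaMessage(String sourceName,
                                                    String correlationId,
                                                    String mqPayload) {
        String value = "source=" + sourceName + ";" + mqPayload;
        return new SimpleImmutableEntry<>(correlationId, value);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> message =
                toKafkaMessage("Retail", "req-42", "account=123");
        System.out.println(message.getKey() + " -> " + message.getValue());
    }
}
```

Carrying the source name inside the value is what lets a later stage decide where the response should be routed.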

Note: Confluent, a licensed distribution of Kafka, comes with out-of-the-box source connectors for standard message format transformations and for endpoints such as MQ, Kafka, etc.

Accounting Service
  1. Consumes the request from the accounting info service Kafka topic
  2. Simulates the accounting info service response and produces it to the accounting service response topic

Streams Processor

This is the main module we are going to discuss in this post.

  • Use of GlobalKTable: when the join is performed, the matching request message can be on any partition, so the lookup has to cover all partitions. A GlobalKTable makes the full contents of the topic available to every application instance.
  • While the join is performed and the response stream is created, we can manipulate the stream output. In this example a local function, getUpdatedResponse, copies the source name into the response message, where it is the key input to the routing.
  • Once the output stream is created it can be posted to the destination Kafka topic, and the topic name can be derived dynamically. Here the function topicNameExtractor reads the source name from the response message and derives the destination topic, e.g. responseTopicAccInfo+Retail

// 1.) Create the request topic as a GlobalKTable, since the matching request can be on any partition. Read the response topic as a KStream, since response messages trigger the join

GlobalKTable<String, String> requestStream = builder.globalTable(accountingRequestTopic);
KStream<String, String> responseStream = builder.stream(accountingResponseTopic);

// 2.) Define the output stream by joining the response stream with the request table

KStream<String, String> processStream = responseStream.join(requestStream, (key, value) -> key,
        (res, req) -> getUpdatedResponse(req, res));

// 3.) Destination route derivation and posting: derive the topic name per record (e.g. topicNameExtractor)

processStream.to((key, value, recordContext) -> topicNameExtractor(value));
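
The two helpers used above, getUpdatedResponse and topicNameExtractor, are not shown in this post. A minimal sketch of what they might look like follows. The method names come from the snippet above, but the bodies are assumptions: the "field=value;field=value" message layout, the "source" field name, the RoutingSketch class, and the plain concatenation of the responseTopicAccInfo prefix with the source name are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper implementations; the real ones live in the repository.
class RoutingSketch {

    // Assumed prefix, taken from the example topic name in the text.
    static final String RESPONSE_TOPIC_PREFIX = "responseTopicAccInfo";

    // Parses "k1=v1;k2=v2" into an ordered map (assumed wire format).
    static Map<String, String> parse(String message) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String pair : message.split(";")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                fields.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return fields;
    }

    // Serializes the map back into "k1=v1;k2=v2".
    static String format(Map<String, String> fields) {
        StringJoiner joiner = new StringJoiner(";");
        for (Map.Entry<String, String> e : fields.entrySet()) {
            joiner.add(e.getKey() + "=" + e.getValue());
        }
        return joiner.toString();
    }

    // Copies the source name from the request into the response.
    static String getUpdatedResponse(String request, String response) {
        Map<String, String> res = parse(response);
        String source = parse(request).get("source");
        if (source != null) {
            res.put("source", source);
        }
        return format(res);
    }

    // Derives the destination topic from the source name in the response.
    static String topicNameExtractor(String response) {
        String source = parse(response).get("source");
        if (source == null) {
            throw new IllegalArgumentException("response has no source field");
        }
        return RESPONSE_TOPIC_PREFIX + source;
    }

    public static void main(String[] args) {
        String updated = getUpdatedResponse("source=Retail;account=123",
                                            "account=123;balance=100");
        System.out.println(updated);
        System.out.println(topicNameExtractor(updated));
    }
}
```

In the topology above, getUpdatedResponse would run inside the join in step 2, and topicNameExtractor would be called once per record in step 3.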

Sink Connectors
  1. Read the routed response from the Streams processor on the legacy application-specific Kafka topic
  2. Extract and parse the accounting service response into a format the source application understands
  3. Post it to the application-specific IBM MQ response queue

Note: Confluent, a licensed distribution of Kafka, comes with out-of-the-box sink connectors for standard message format transformations and for endpoints such as MQ, Kafka, etc.

Quick tools

  • Please refer to this for some handy scripts for working with Kafka

