Environment Setup
Please go through my online notes to set up the following:
GitHub Repository
https://github.com/abhinotes/kafka-many-to-one-connectivity
Flow
Requests travel from application-specific IBM MQ queues through source connectors onto a shared request topic; the accounting service produces responses, the streams processor joins and routes each response, and sink connectors deliver it back to the originating application's MQ queue.
Components
Source Connectors
- Read messages from the source application over IBM MQ
- Format each message into the custom Kafka message format and enrich it with the key, source name, etc.
- Post it to the accounting service request Kafka topic
Note: Confluent Platform, the licensed distribution of Kafka, ships with out-of-the-box source connectors for standard message formats, transformations, and endpoints such as IBM MQ. A minimal sketch of the bridging logic follows.
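For illustration only, here is a minimal sketch of what such a source bridge does, written against plain JMS and the Kafka producer API. The queue name ACC.REQUEST.QUEUE, the topic requestTopicAccInfo, the broker address, and the pipe-delimited message format are hypothetical placeholders, and a pre-configured IBM MQ ConnectionFactory (JMS 2.0) is assumed:

import javax.jms.*;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MqToKafkaBridge {
    // connectionFactory is assumed to be configured for IBM MQ (host, channel, queue manager)
    public static void run(ConnectionFactory connectionFactory) throws JMSException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        try (Connection connection = connectionFactory.createConnection();
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
            connection.start();
            MessageConsumer consumer =
                    session.createConsumer(session.createQueue("ACC.REQUEST.QUEUE")); // hypothetical MQ queue
            while (true) {
                TextMessage mqMessage = (TextMessage) consumer.receive();
                // Enrich with a key and the source name before producing
                String key = mqMessage.getJMSMessageID();
                String enriched = "sourceName=Retail|" + mqMessage.getText(); // hypothetical custom format
                producer.send(new ProducerRecord<>("requestTopicAccInfo", key, enriched));
            }
        }
    }
}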
Accounting Service
- Consumes requests from the accounting info service request Kafka topic
- Simulates the accounting info service response and produces it to the accounting service response topic (a minimal sketch follows)
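A minimal sketch of this simulator, assuming the hypothetical topic names requestTopicAccInfo and responseTopicAccInfo, a local broker, and an arbitrary status field to stand in for the real response payload:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AccountingService {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "accounting-service");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("requestTopicAccInfo")); // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Simulate the accounting info lookup: echo the request with a status field
                    String response = record.value() + "|status=OK";
                    producer.send(new ProducerRecord<>("responseTopicAccInfo", record.key(), response));
                }
            }
        }
    }
}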
Streams Processor
This is the main module we are going to discuss in this post.
- Use of GlobalKTable: while the join is performed, the matching request message can be on any partition, so the lookup must span all partitions
- While the join runs and a response stream is created, we can manipulate the stream's output; in this example a local function, getUpdatedResponse, updates the source name in the response message, which is the key input to the routing
- Once the output stream is created, it can be posted to the destination Kafka topic, whose name we can derive dynamically; here the function topicNameExtractor reads the source name from the response message and derives the destination topic, e.g. responseTopicAccInfo + Retail (sketches of both helpers follow the snippet below)
StreamsBuilder builder = new StreamsBuilder();

// 1) Request topic as a GlobalKTable, since the matching request can sit on any partition;
//    response topic as a KStream, since responses trigger the join
GlobalKTable<String, String> requestTable = builder.globalTable(accountingRequestTopic);
KStream<String, String> responseStream = builder.stream(accountingResponseTopic);

// 2) Join each response with its request (matched by key) and enrich the
//    response with the source name via getUpdatedResponse
KStream<String, String> processStream = responseStream.join(requestTable,
        (key, value) -> key,
        (response, request) -> getUpdatedResponse(request, response));

// 3) Destination route derivation and posting: derive the topic name per record
//    (e.g. responseTopicAccInfo + "Retail") via topicNameExtractor
processStream.to((key, value, recordContext) -> topicNameExtractor(value));
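The two helpers referenced above live in the repository; here is a minimal sketch of what they could look like, assuming the hypothetical pipe-delimited format with a sourceName=<name> field used throughout this post (the repository's actual format may differ):

// Minimal sketches of the two helpers; the sourceName=<name> field is an assumption
private static String getUpdatedResponse(String request, String response) {
    // Copy the source name from the request into the response so routing can use it
    for (String field : request.split("\\|")) {
        if (field.startsWith("sourceName=")) {
            return field + "|" + response;
        }
    }
    return response; // no source name found; leave the response unchanged
}

private static String topicNameExtractor(String response) {
    // Derive the destination topic from the source name, e.g. responseTopicAccInfo + "Retail"
    for (String field : response.split("\\|")) {
        if (field.startsWith("sourceName=")) {
            return "responseTopicAccInfo" + field.substring("sourceName=".length());
        }
    }
    return "responseTopicAccInfoUnknown"; // hypothetical fallback topic
}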
Sink Connectors
- Read the routed response from the streams processor on the legacy application-specific Kafka topic
- Extract and parse the accounting service response into a format the source application understands
- Post it to the application-specific IBM MQ response queue
Note: Confluent Platform, the licensed distribution of Kafka, ships with out-of-the-box sink connectors for standard message formats, transformations, and endpoints such as IBM MQ. A minimal sketch of the sink side follows.
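This sketch mirrors the source bridge above; the topic responseTopicAccInfoRetail, the queue ACC.RESPONSE.QUEUE, and the payload-stripping rule are hypothetical, and again a pre-configured IBM MQ ConnectionFactory (JMS 2.0) is assumed:

import javax.jms.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaToMqBridge {
    // connectionFactory is assumed to be configured for IBM MQ
    public static void run(ConnectionFactory connectionFactory) throws JMSException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "retail-sink");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             Connection connection = connectionFactory.createConnection();
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
            MessageProducer mqProducer =
                    session.createProducer(session.createQueue("ACC.RESPONSE.QUEUE")); // hypothetical queue
            consumer.subscribe(Collections.singletonList("responseTopicAccInfoRetail")); // routed topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Strip the routing field and forward the payload in the application's format
                    String payload = record.value().replaceFirst("^sourceName=[^|]*\\|", "");
                    mqProducer.send(session.createTextMessage(payload));
                }
            }
        }
    }
}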
Quick tools
- Please refer to this for some handy scripts for operating Kafka