Consume Kafka Events And Store To Mysql
Java program that consumes orders as events from Kafka and stores them in a MySQL database
Here is an example of a Java program that consumes order-related events from Kafka and stores them in a MySQL database:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class OrderEventConsumer {
public static void main(String[] args) {
// Set up MySQL connection and prepared statement
String url = "jdbc:mysql://localhost:3306/mydatabase";
String user = "username";
String password = "password";
Connection conn = null;
PreparedStatement stmt = null;
// Set up Kafka consumer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "my-group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));
// Consume and process events
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// Connect to MySQL
try {
conn = DriverManager.getConnection(url, user, password);
// Insert event into database
stmt = conn.prepareStatement("INSERT INTO orders (order_id, product, quantity, price) VALUES (?, ?, ?, ?)");
stmt.setString(1, record.key());
stmt.setString(2, record.value().split(",")[0]);
stmt.setInt(3, Integer.parseInt(record.value().split(",")[1]));
stmt.setDouble(4, Double.parseDouble(record.value().split(",")[2]));
stmt.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
} finally {
// Close the connection and statement
if (stmt != null) stmt.close();
if (conn != null) conn.close();
}
}
}
}
}
Written on December 5, 2022