Reakt Kafka Example
Reakt-Kakfa adapts Kafka to Reakt promises and streams.
Reakt has promise libraries for Vert.x, Netty, Guava, and Cassandra.
Using Promises with Kafka Producers
final AsyncProducer<Long, String> producer = new AsyncProducer<>(createProducer());
...
producer.send(TOPIC, key, value)
.catchError(throwable -> {
System.err.println("Trouble sending record " + throwable.getLocalizedMessage());
throwable.printStackTrace(System.err);
})
.then(recordMetadata -> {
System.out.printf("%d %d %s \n", recordMetadata.offset(),
recordMetadata.partition(), recordMetadata.topic());
}).invoke();
Using Streams with Kafka Consumers
final StreamConsumer<Long, String> stream = StreamConsumer.subscribe(createConsumer(), TOPIC, result -> {
result.then(consumerRecords -> {
System.out.println("Got message " + consumerRecords.count());
consumerRecords.forEach(record -> {
countDownLatch.countDown();
});
result.request(1); //calls commitAsync
}).catchError(throwable -> {
System.err.println("Trouble Getting record " + throwable.getLocalizedMessage());
throwable.printStackTrace(System.err);
result.cancel();
});
});
stream.close();
Full integration test and example for Reakt Kafka
package io.advantageous.reakt.kafka;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
public class IntegrationTest {
private final static String TOPIC = "my-test-topic";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static int SEND_RECORD_COUNT = 10_000;
@Test
public void test() throws Exception {
final KafkaServer kafkaServer = new KafkaServer();
System.out.println("Starting server");
Thread.sleep(10_000);
final ExecutorService executorService = Executors.newSingleThreadExecutor();
final AsyncProducer<Long, String> producer = new AsyncProducer<>(createProducer());
executorService.execute(() -> {
for (int i = 0; i < SEND_RECORD_COUNT; i++) {
if (i % 1000 == 0) System.out.println("Sending message " + i);
producer.send(TOPIC, 1L * i, "value " + i)
.catchError(throwable -> {
System.err.println("Trouble sending record " + throwable.getLocalizedMessage());
throwable.printStackTrace(System.err);
})
.then(recordMetadata -> {
if (recordMetadata.offset() % 1000 ==0)
System.out.printf("%d %d %s \n", recordMetadata.offset(),
recordMetadata.partition(), recordMetadata.topic());
})
.invoke();
}
producer.flush();
});
final CountDownLatch countDownLatch = new CountDownLatch(SEND_RECORD_COUNT);
final StreamConsumer<Long, String> stream = StreamConsumer.subscribe(createConsumer(), TOPIC, result -> {
result.then(consumerRecords -> {
System.out.println("Got message " + consumerRecords.count());
consumerRecords.forEach(record -> {
countDownLatch.countDown();
});
result.request(1);
}).catchError(throwable -> {
System.err.println("Trouble Getting record " + throwable.getLocalizedMessage());
throwable.printStackTrace(System.err);
result.cancel();
});
});
Thread.sleep(3_000);
countDownLatch.await(10, TimeUnit.SECONDS);
assertEquals(0, countDownLatch.getCount());
stream.close();
producer.close();
executorService.shutdown();
Thread.sleep(3_000);
kafkaServer.shutdown();
Thread.sleep(3_000);
}
private static Producer<Long, String> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(props);
}
private static Consumer<Long, String> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
return new KafkaConsumer<>(props);
}
}
Notice that we use an embedded version of Kafka.
Kafka embedded
package io.advantageous.reakt.kafka;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import java.io.IOException;
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
public class KafkaServer {
private final ZooKeeperServerMain zooKeeperServer;
private final KafkaServerStartable kafkaServer;
public KafkaServer() {
final Properties zkProperties = new Properties();
final Properties kafkaProperties = new Properties();
try {
//load properties
kafkaProperties.load(Class.class.getResourceAsStream("/io/advantageous/reakt/kafka/kafka.properties"));
zkProperties.load(Class.class.getResourceAsStream("/io/advantageous/reakt/kafka/zookeeper.properties"));
} catch (Exception e){
throw new RuntimeException(e);
}
QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
try {
quorumConfiguration.parseProperties(zkProperties);
} catch(Exception e) {
throw new RuntimeException(e);
}
zooKeeperServer = new ZooKeeperServerMain();
final ServerConfig configuration = new ServerConfig();
configuration.readFrom(quorumConfiguration);
new Thread(() -> {
try {
zooKeeperServer.runFromConfig(configuration);
} catch (IOException e) {
e.printStackTrace(System.err);
}
}).start();
//start local kafka broker
kafkaServer = new KafkaServerStartable(new KafkaConfig(kafkaProperties));
kafkaServer.startup();
}
public void shutdown() {
kafkaServer.shutdown();
}
public static void main(String[] args) {
new KafkaServer();
}
}
I like your post very much. It is nice useful for my research. I wish for you to share more info about this. Keep blogging Apache Kafka Training in Electronic City
ReplyDelete