Quarkus, Kafka, Camel Servlet, Wiring it Together

July 04, 2019 ( last updated : July 06, 2019 )
quarkus kubernetes camel kafka smallrye

https://github.com/alainpham/quarkus-kafka-camel-servlet


Abstract

As of today, Apache Camel on Quarkus supports only a limited number of Camel connectors, and an important one is missing: the Apache Kafka connector. A Kafka connector is available for Quarkus, however, through the SmallRye Kafka extension. In this article I will show how to wire the SmallRye Kafka connector and Camel together. We will also use the camel-servlet component to reuse the Undertow HTTP endpoint provided by Quarkus. To spice things up a little, we will use Camel's XML DSL. In the end, all of this will be compiled to a native executable.

Setup a project

In a previous article I showed how to create a hello world application using Camel and Quarkus. Here we will try something more advanced. Let's start by setting up the project.

mkdir quarkus-kafka-camel-servlet

cd quarkus-kafka-camel-servlet

mvn io.quarkus:quarkus-maven-plugin:0.18.0:create \
-DprojectGroupId=agilabs \
-DprojectArtifactId=quarkus-kafka-camel-servlet \
-DclassName="agilabs.GreetingResource" \
-Dpath="/hello"

Add the following dependencies to the pom.xml file.

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-camel-core</artifactId>
</dependency>
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-camel-servlet</artifactId>
</dependency>

Configure Camel

Application properties

Configure application properties in src/main/resources/application.properties

quarkus.camel.servlet.url-patterns = /camel/*
%dev.quarkus.camel.routes-uris = camel/routes.xml
quarkus.camel.routes-uris=file:/camel/routes.xml
quarkus.camel.defer-init-phase = true

All calls to /camel/* are routed to the Camel servlet component. In dev mode, Camel routes are loaded from the routes.xml file on the classpath, in the camel resource folder.
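To make the %dev. prefix concrete, here is a minimal plain-Java sketch of how a profile-prefixed key can shadow the unprefixed one. It only models the lookup order and is not Quarkus's configuration code; the class ProfileLookup and its resolve method are hypothetical names introduced for illustration.

```java
import java.util.Map;

// Hypothetical model of profile-aware property lookup; not Quarkus's implementation.
final class ProfileLookup {

    // Returns the value of "%<profile>.<key>" when present, otherwise the plain key's value.
    static String resolve(Map<String, String> props, String profile, String key) {
        String profiled = props.get("%" + profile + "." + key);
        return profiled != null ? profiled : props.get(key);
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of(
            "%dev.quarkus.camel.routes-uris", "camel/routes.xml",
            "quarkus.camel.routes-uris", "file:/camel/routes.xml");
        // The dev profile picks the classpath resource, any other profile the external file.
        System.out.println(resolve(props, "dev", "quarkus.camel.routes-uris"));
        System.out.println(resolve(props, "prod", "quarkus.camel.routes-uris"));
    }
}
```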

When launched in "production mode" with the native executable, resources are not included in the binary by default. I'm externalizing the routes.xml file to the /camel folder so that it can be set and changed dynamically. This is quite interesting, because we get a configurable native executable that we can fine-tune by switching the routes at run time. For some reason, when I tried to change the quarkus.camel.routes-uris property at native run time, its new value did not seem to be taken into account. So this file path is effectively hard-coded for now.

Edit, July 6, 2019: I discovered that there are different configuration phases: BUILD_TIME, BUILD_AND_RUN_TIME_FIXED and RUN_TIME. It turns out this property is set to BUILD_AND_RUN_TIME_FIXED, so for the moment it is not read dynamically at native run time. More details here.
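The effect of the phase can be pictured with a small plain-Java model. This is a sketch under the assumption that only RUN_TIME properties are re-read at startup; PhaseModel, its nested Phase enum and the override path file:/tmp/other.xml are hypothetical and are not part of the Quarkus API.

```java
// Hypothetical model of the three configuration phases; not the actual Quarkus API.
final class PhaseModel {

    enum Phase { BUILD_TIME, BUILD_AND_RUN_TIME_FIXED, RUN_TIME }

    // In this model, only RUN_TIME properties are re-read when the application starts.
    static boolean honorsRuntimeOverride(Phase phase) {
        return phase == Phase.RUN_TIME;
    }

    // Value the application ends up with, given the value baked in at build time
    // and an optional override supplied at run time (null when absent).
    static String effectiveValue(Phase phase, String buildValue, String runtimeOverride) {
        boolean useOverride = honorsRuntimeOverride(phase) && runtimeOverride != null;
        return useOverride ? runtimeOverride : buildValue;
    }

    public static void main(String[] args) {
        // The override is ignored because the property is fixed at build time.
        System.out.println(effectiveValue(
            Phase.BUILD_AND_RUN_TIME_FIXED, "file:/camel/routes.xml", "file:/tmp/other.xml"));
    }
}
```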

The quarkus.camel.defer-init-phase property is set to true because it seems that the Camel context is created before the CDI bean wiring and dependency injection happen. I need to dig further to fully understand how this works.

Camel Routes XML Files

Let's set up the routes in the src/main/resources/camel/routes.xml file.

<?xml version="1.0" encoding="UTF-8" ?>
<routes xmlns="http://camel.apache.org/schema/spring" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

    <route id="receive-kafka-events">
        <from uri="direct:receive-events"></from>
        <log message="Received from kafka broker : ${body}"></log>
    </route>

    <route id="send-kafka-events">
        <from uri="servlet:send"></from>
        <log message="Sending"></log>
        <convertBodyTo type="java.lang.String"></convertBodyTo>
        <bean ref="smallrye-producer" method="produce(${body})"></bean>
        <setBody>
            <constant>Message sent</constant>
        </setBody>
    </route>
</routes>

The first route consumes messages coming from Kafka. Later on, we will wire a Camel producer template so that any other class in the application can send messages to this route.

The second route listens for HTTP requests on the path http://0.0.0.0:8080/camel/send. It uses a bean that wraps an emitter to send messages.
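The /camel/send path comes from combining the servlet mapping /camel/* with the endpoint servlet:send. The following plain-Java sketch models that combination in a simplified way (it handles only prefix patterns ending in /*); ServletPathModel and endpointPath are hypothetical names, not part of Camel or the Servlet API.

```java
// Hypothetical, simplified model of how a "/prefix/*" servlet mapping
// leaves the remainder of the path for the servlet endpoint to match.
final class ServletPathModel {

    // Returns the part of the request path after the mapping prefix,
    // or null when the pattern is unsupported or the path does not match.
    static String endpointPath(String urlPattern, String requestPath) {
        if (!urlPattern.endsWith("/*")) {
            return null; // only prefix patterns are modeled here
        }
        String prefix = urlPattern.substring(0, urlPattern.length() - 1); // e.g. "/camel/"
        return requestPath.startsWith(prefix)
            ? requestPath.substring(prefix.length())
            : null;
    }

    public static void main(String[] args) {
        System.out.println(endpointPath("/camel/*", "/camel/send"));
        System.out.println(endpointPath("/camel/*", "/hello"));
    }
}
```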

Configure sinks for Kafka topics

Application properties

mp.messaging.incoming.in-events.connector=smallrye-kafka
mp.messaging.incoming.in-events.topic=events
mp.messaging.incoming.in-events.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

mp.messaging.outgoing.out-events.connector=smallrye-kafka
mp.messaging.outgoing.out-events.topic=events
mp.messaging.outgoing.out-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer

We configure one smallrye-kafka connector for incoming messages and one for outgoing messages.
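The keys above follow the shape mp.messaging.<direction>.<channel>.<attribute>, and the channel names (in-events, out-events) are the ones referenced later from the Java code. As a rough illustration, this plain-Java sketch extracts the topic for each channel from such keys; ChannelTopics and topicsByChannel are hypothetical helpers, not part of SmallRye.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper that reads the key layout used in application.properties.
final class ChannelTopics {

    private static final String PREFIX = "mp.messaging.";
    private static final String SUFFIX = ".topic";

    // Maps "<direction>.<channel>" to its topic for keys shaped like
    // mp.messaging.<direction>.<channel>.topic; other keys are ignored.
    static Map<String, String> topicsByChannel(Map<String, String> props) {
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, String> entry : props.entrySet()) {
            String key = entry.getKey();
            boolean longEnough = key.length() > PREFIX.length() + SUFFIX.length();
            if (longEnough && key.startsWith(PREFIX) && key.endsWith(SUFFIX)) {
                String channel = key.substring(PREFIX.length(), key.length() - SUFFIX.length());
                result.put(channel, entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of(
            "mp.messaging.incoming.in-events.connector", "smallrye-kafka",
            "mp.messaging.incoming.in-events.topic", "events",
            "mp.messaging.outgoing.out-events.topic", "events");
        System.out.println(topicsByChannel(props));
    }
}
```

Both channels point at the same topic, events, which is why a message sent through the outgoing channel is later received on the incoming one.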

Create a consumer

Now let's go ahead and create a consumer with the @Incoming annotation. We need to inject a Camel ProducerTemplate instance into the consumer so that the message can be forwarded to our Camel direct endpoint.

package agilabs;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.apache.camel.ProducerTemplate;
import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class Consumer {
    
    @Inject
    ProducerTemplate camelProducer;

    @Incoming("in-events")
    public void consume(String message) {              
        camelProducer.sendBody("direct:receive-events", message);
    }
}

Create a producer

Now let's create a producer. Note that this producer is used from the Camel XML context, which looks up its methods dynamically through reflection. We therefore need to register the class for reflection so that it still works once compiled to native code.
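To illustrate what "finding methods dynamically" means, here is a minimal plain-Java sketch of a by-name method call using java.lang.reflect. It is not Camel's bean component; ReflectiveCall and FakeProducer are hypothetical stand-ins. A lookup of this kind is what requires the class to be registered for reflection in a native image.

```java
import java.lang.reflect.Method;

// Hypothetical sketch of a by-name method invocation, similar in spirit to
// what a bean reference with method="produce(...)" needs at run time.
final class ReflectiveCall {

    // Stand-in for the article's Producer; records the message instead of sending it to Kafka.
    public static final class FakeProducer {
        public String last;

        public void produce(String message) {
            last = message;
        }
    }

    // Looks up a public method by name and invokes it with one String argument.
    static void invokeByName(Object bean, String methodName, String arg) {
        try {
            Method method = bean.getClass().getMethod(methodName, String.class);
            method.invoke(bean, arg);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot call " + methodName, e);
        }
    }

    public static void main(String[] args) {
        FakeProducer producer = new FakeProducer();
        invokeByName(producer, "produce", "Hello there!!");
        System.out.println(producer.last);
    }
}
```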

package agilabs;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import io.quarkus.runtime.annotations.RegisterForReflection;
import io.smallrye.reactive.messaging.annotations.Emitter;
import io.smallrye.reactive.messaging.annotations.Stream;

@ApplicationScoped
@RegisterForReflection
public class Producer {
    
    @Inject
    @Stream("out-events")
    Emitter<String> emitter;

    public void produce(String message) {
        emitter.send(message);       
    }
}

Wiring it all together

Now that everything is configured, we need to wire it all together: register the SmallRye producer in the Camel context and inject a Camel producer template into the Quarkus CDI context.

As mentioned earlier, it seems that the Camel context is started earlier than the CDI context, so when Camel tries to look for a bean called smallrye-producer it will fail to start because it can't find it. This also comes from the fact that ApplicationScoped beans are instantiated in a lazy fashion. Since the smallrye-producer is never called in the CDI context, it will never be initialized. For these reasons we need to observe the Camel InitializedEvent and register the SmallRye producer at that specific time.
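The timing problem described above can be modeled in a few lines of plain Java. This is only a sketch of the idea (a lazily created bean that is bound into a registry when an "initialized" callback fires); LazyWiringModel and its members are hypothetical and do not use CDI or Camel.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: a lazily created bean is bound into a name-based
// registry only when the "initialized" callback runs.
final class LazyWiringModel {

    // Stand-in for the Camel registry: bean name -> instance.
    final Map<String, Object> registry = new HashMap<>();

    // Counts how many times the lazy bean was actually created.
    int creations = 0;

    private Object producer;

    // Stand-in for a lazily instantiated application-scoped bean.
    Object getOrCreateProducer() {
        if (producer == null) {
            producer = new Object();
            creations++;
        }
        return producer;
    }

    // Stand-in for the observer method: touching the bean forces its creation,
    // and binding it makes the name resolvable from the routes.
    void onInitialized() {
        registry.put("smallrye-producer", getOrCreateProducer());
    }

    public static void main(String[] args) {
        LazyWiringModel model = new LazyWiringModel();
        System.out.println(model.registry.containsKey("smallrye-producer")); // before the event
        model.onInitialized();
        System.out.println(model.registry.containsKey("smallrye-producer")); // after the event
    }
}
```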

The @Produces annotation provides the Camel producer template to the CDI context.

package agilabs;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import io.quarkus.camel.core.runtime.InitializedEvent;

public class CamelWiring {

    @Inject
    Producer smallRyeProducer;

    @Inject
    CamelContext camelContext;

    // Bind the SmallRye producer under the name used by <bean ref="smallrye-producer"> in routes.xml.
    public void registerSmallryeEmitter(@Observes InitializedEvent ev) throws Exception {
        camelContext.getRegistry().bind("smallrye-producer", smallRyeProducer);
    }

    // Expose a Camel ProducerTemplate to CDI so it can be injected into the Consumer.
    @Produces
    public ProducerTemplate buildCamelProducerTemplate() throws Exception {
        return camelContext.createProducerTemplate();
    }
}

Compiling and testing the project

Now let's compile it to native code. The compilation is quite memory and CPU intensive. I guess this is due to the costly compilation of the XML support libraries. At peak I observed 9.8 GB of memory consumption, and the whole compilation process took about 7 to 8 minutes.

git clone https://github.com/alainpham/quarkus-kafka-camel-servlet.git
cd quarkus-kafka-camel-servlet
mvn package -Pnative

If you are in a rush, I have already built a container image for you. You can run everything with a single command using the docker-compose.yml file. It boots a ZooKeeper instance and a Kafka broker using images provided by the Strimzi project, along with a container running the current project.

docker-compose up

You can also get the container image directly here :
https://hub.docker.com/r/alainpham/quarkus-kafka-camel-servlet

Test it!

curl http://localhost:8080/camel/send  -X POST -H "Content-Type: text/plain"  -d 'Hello there!!'

The output should be:

quarkus-kafka-camel-servlet_1  | 2019-07-05 13:12:58,025 INFO  [send-kafka-events] (executor-thread-1) Sending
quarkus-kafka-camel-servlet_1  | 2019-07-05 13:12:58,027 INFO  [io.sma.rea.mes.kaf.KafkaSink] (vert.x-eventloop-thread-1) Message org.eclipse.microprofile.reactive.messaging.Message$$Lambda$5e1799c60041209b57937765b33186671a72f6f8@7f35f7425e98 sent successfully to Kafka topic 'events'
quarkus-kafka-camel-servlet_1  | 2019-07-05 13:12:58,027 INFO  [receive-kafka-events] (vert.x-eventloop-thread-0) Received from kafka broker : Hello there!
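The same request as the curl command can also be sent from Java. This is a minimal sketch assuming Java 11 or later (for java.net.http) and the application listening on localhost:8080 as above; the class name SendTest is hypothetical.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical test client; assumes Java 11+ and the service running on localhost:8080.
final class SendTest {

    // Builds the same POST request as the curl command: text/plain body sent to /camel/send.
    static HttpRequest buildRequest(String body) {
        return HttpRequest.newBuilder(URI.create("http://localhost:8080/camel/send"))
            .header("Content-Type", "text/plain")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    }

    public static void main(String[] args) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(buildRequest("Hello there!!"), HttpResponse.BodyHandlers.ofString());
        // Print the HTTP status and whatever body the route returned.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```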

Thanks for reading!
