When using Apache Camel with Quarkus as of today, we are limited number of Camel connectors. One important one being the Apache Kafka connector. The Kafka connector provided through the Smallrye Kafka Extension is available for Quarkus though. So in this article, I will show how to wire Smallrye Kafka connector and Camel together. We will also be using the camel-servlet component to reuse the undertow http endpoint provided by Quarkus and to spice things up a little we will use the XML DSL of Camel. All of this will be natively compiled in the end.
In a previous article I showed how to create a hello world using Camel and Quarkus. Here We will try to do something more advanced. Lets start by setting up the project
Add the following dependencies to the pom.xml file.
Configure application properties in src/main/resources/application.properties
quarkus.camel.servlet.url-patterns = /camel/* %dev.quarkus.camel.routes-uris = camel/routes.xml quarkus.camel.routes-uris=file:/camel/routes.xml quarkus.camel.defer-init-phase = true
We will match all calls that go to /camel/* to the camel servlet component. In dev mode we will scan the file routes.xml for camel routes located in the classpath, in the resource folder camel.
When launched in "production mode" with the native executable the resources are not included by default in the binary. I'm externalizing the routes.xml file to the folder /camel so that it can be set and changed dynamically. This is actually quite interesting because we get to have a configurable native executable that we can fine tune by switching the routes during run time. For some when I tried to change the property quarkus.camel.routes-uris during native run time, it's new value doesn't seem to be taken into account. So this file path is kind of like being hard coded right now..
Edit July 6,2019 : I discovered that there are different Configuration Phases : BUILD_TIME, BUILD_AND_RUN_TIME_FIXED & RUN_TIME. It turns out this property is set to BUILD_AND_RUN_TIME_FIXED, so it is not dynamically read during native run time for the moment. More details here.
Quarkus.camel.defer-init-phase property is set to true because it seems that the camel context is created before all the CDI bean wiring and dependency injection happens. I need to do further digging to understand entirely how this works.
Lets set up the routes in the src/main/resources/camel/routes.xml file.
The first route is to consume messages coming from Kafka. We will wire later on a camel producer template to send messages to this route from any other class in application
The second route listens to http requests coming on the path http://0.0.0.0:8080/camel/send. It will use a bean that incorporates an emitter to send messages.
mp.messaging.incoming.in-events.connector=smallrye-kafka mp.messaging.incoming.in-events.topic=events mp.messaging.incoming.in-events.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer mp.messaging.outgoing.out-events.connector=smallrye-kafka mp.messaging.outgoing.out-events.topic=events mp.messaging.outgoing.out-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer
We will configure a smallrye-kafka connector for incoming messages and outgoing messages
Now lets go ahead and create a consumer with the @Incoming annotation. We will need to inject a Camel ProducerTemplate instance into the consumer so that the message can be forwarded to our Camel direct endpoint
Now lets create a producer. Note that this producer will be used by the camel xml context. So it needs to support reflection to find the different methods dynamically. We need to register this class for reflection so that it can work once compiled to native code.
Now that we have everything configured. We need to wire everything together. Register the SmallRye producer into the camel context and inject a camel producer template in the Quarkus CDI context.
As mentioned earlier, it seems that the Camel context is started earlier than the CDI context, so when Camel tries to look for a bean called smallrye-producer it will fail to start because it can't find it. This also comes from the fact that ApplicationScoped beans are instantiated in a lazy fashion. Since the smallrye-producer is never called in the CDI context, it will never be initialized. For these reasons we need to observe the Camel InitializedEvent and register the SmallRye producer at that specific time.
The @Produces annotation provides the Camel producer template to the CDI context.
Now lets compile it to native code. The compilation is quite memory & CPU intensive. I guess this is due to the costly compilation of XML support libraries. At peak I observed 9.8 GB of memory consumption. The whole compilation process took about 7 to 8 minutes
git clone https://github.com/alainpham/quarkus-kafka-camel-servlet.git cd quarkus-kafka-camel-servlet mvn package -Pnative
If you are in a rush. I have already built a container image for you. You can run everything wih only one command using the docker-compose.yml file. It will boot a zookeeper and a kafka broker using images provided by the Strimzi project and also a container with the current project.
You can also get the container image directly here :
curl http://localhost:8080/camel/send -X POST -H "Content-Type: text/plain" -d 'Hello there!!'
The output should be
quarkus-kafka-camel-servlet_1 | 2019-07-05 13:12:58,025 INFO [send-kafka-events] (executor-thread-1) Sending quarkus-kafka-camel-servlet_1 | 2019-07-05 13:12:58,027 INFO [io.sma.rea.mes.kaf.KafkaSink] (vert.x-eventloop-thread-1) Message org.eclipse.microprofile.reactive.messaging.Message$$Lambda$5e1799c60041209b57937765b33186671a72f6f8@7f35f7425e98 sent successfully to Kafka topic 'events' quarkus-kafka-camel-servlet_1 | 2019-07-05 13:12:58,027 INFO [receive-kafka-events] (vert.x-eventloop-thread-0) Received from kafka broker : Hello there!
Thanks for reading !
Resources I used for inspiration
Originally published July 04, 2019
Latest update July 06, 2019
Related posts :