Fuse Data Grid Integration via Hot Rod with Persistence and Indexing for Advanced Queries

April 07, 2017 ( last updated : April 06, 2017 )
jboss-fuse data-grid hot-rod protobuf lucene

https://github.com/alainpham/techlab-fuse-jdg-simple-hotrod


Abstract

Building integration and service platforms with JBoss Fuse is great. It is even better when you add a distributed in-memory database solution such as JBoss Data Grid to the mix. This article shows how to make both technologies work together using the camel-jbossdatagrid component. We will go through the setup of a JBoss Data Grid server with persistence and see how to use it in a JBoss Fuse application through the remote Hot Rod client. Furthermore, we will see how to take advantage of Protocol Buffers and Lucene to index data and perform content-based queries.

In the example, we will build REST services to store and retrieve some business events.

[Figure: Fuse and Data Grid remote server architecture]

In summary, the steps in this tutorial are the following:

Prerequisites

Versions used

Access modes for JBoss Data Grid

JBoss Data Grid can be accessed in two different modes: Library Mode and Server Mode.

In Server Mode, Data Grid is installed as a remote server and Fuse accesses the objects through the Hot Rod client.

In Library Mode (or embedded mode), the Fuse engine uses its own JVM memory to hold objects that are part of the Data Grid. In other words, the Fuse engine acts as a node of the Data Grid cluster. This mode gives access to advanced features such as transactions and locking, and it is actually quite easy to set up. The downside is that it is not possible to perform queries with the infinispan-embedded-query library when Fuse runs in a Karaf container. The main reason is that infinispan-embedded-query includes its full dependencies, such as Hibernate, which makes it unsuited to an OSGi container such as Karaf.

In this article we will explore the possibilities of Server Mode with the Hot Rod client. Library Mode will be discussed in another article.

Create a Fuse project

[Figure: selecting the version when creating the Fuse project]

[Figure: selecting the empty Spring template when creating the Fuse project]

Edit the project information and add the following dependencies to the pom.xml file

<dependency>
	<groupId>org.apache.camel</groupId>
	<artifactId>camel-jbossdatagrid</artifactId>
	<version>6.5.1.Final-redhat-1</version>
</dependency>

<dependency>
	<groupId>org.apache.camel</groupId>
	<artifactId>camel-netty4-http</artifactId>
</dependency>

<dependency>
	<groupId>org.apache.camel</groupId>
	<artifactId>camel-swagger-java</artifactId>
</dependency>

<dependency>
	<groupId>org.apache.camel</groupId>
	<artifactId>camel-jackson</artifactId>
</dependency>

Cache Container configuration on Data Grid Server

<server xmlns="urn:jboss:domain:1.6">
	...
   <subsystem xmlns="urn:jboss:domain:datasources:1.2">
      <datasources>
         <datasource jndi-name="java:jboss/datasources/JdbcDS" pool-name="JdbcDS" enabled="true" use-java-context="true">
            <connection-url>jdbc:h2:tcp://localhost:8942/dgdb</connection-url>
            <driver>h2</driver>
            <security>
               <user-name>sa</user-name>
               <password />
            </security>
         </datasource>
         <drivers>
            <driver name="h2" module="com.h2database.h2">
               <xa-datasource-class>org.h2.jdbcx.JdbcDataSource</xa-datasource-class>
            </driver>
         </drivers>
      </datasources>
   </subsystem>
   <subsystem xmlns="urn:infinispan:server:core:6.3" default-cache-container="local">
      <cache-container name="local" default-cache="default" statistics="true">
        ...
         <local-cache name="event">
            <eviction strategy="LRU" max-entries="50" /> <!-- At most we have 50 entries in the cache by evicting [l]east-[r]ecently-[u]sed-->
            <indexing index="LOCAL"> <!-- Indexing is activated, with indexes stored locally only -->
               <property name="default.directory_provider">filesystem</property>
               <property name="default.indexBase">ispn_index</property>
            </indexing>
            <!-- A mixed store is created to be able to contain entries with string keys and also binary keys -->
            <mixed-keyed-jdbc-store datasource="java:jboss/datasources/JdbcDS" passivation="false" preload="true" purge="false">
               <binary-keyed-table prefix="ISPN_MIX_BKT" create-on-start="true" drop-on-exit="false">
                  <id-column name="id" type="VARCHAR" />
                  <data-column name="datum" type="BINARY" />
                  <timestamp-column name="version" type="BIGINT" />
               </binary-keyed-table>
               <string-keyed-table prefix="ISPN_MIX_STR" create-on-start="true" drop-on-exit="false">
                  <id-column name="id" type="VARCHAR" />
                  <data-column name="datum" type="BINARY" />
                  <timestamp-column name="version" type="BIGINT" />
               </string-keyed-table>
            </mixed-keyed-jdbc-store>
         </local-cache>
      </cache-container>
      <cache-container name="security" />
   </subsystem>
	 ...
</server>
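The `<eviction strategy="LRU" max-entries="50" />` element keeps at most 50 entries in memory and drops the least recently used one when a new entry arrives. The following JDK-only sketch illustrates that rule with a `LinkedHashMap` in access order; it is not how Data Grid implements eviction, and `LruEvictionSketch` and `newLruMap` are hypothetical names. Since the store is declared with `passivation="false"`, entries are written to the JDBC store as they are stored, so an entry evicted from memory is expected to remain in the H2 tables and be loaded back on demand rather than lost.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LruEvictionSketch {

    // Hypothetical helper: a map that keeps at most maxEntries entries and drops the
    // least-recently-used one, like <eviction strategy="LRU" max-entries="50"/>
    static <K, V> Map<K, V> newLruMap(final int maxEntries) {
        // accessOrder = true: get() and put() move an entry to the "most recent" end
        return new LinkedHashMap<K, V>(16, 0.75f, true) {
            private static final long serialVersionUID = 1L;

            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                // called after each put(); returning true evicts the eldest (LRU) entry
                return size() > maxEntries;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> cache = newLruMap(50);
        for (int i = 1; i <= 50; i++) {
            cache.put("event-" + i, "content " + i);
        }
        cache.get("event-1");                // event-1 becomes the most recently used entry
        cache.put("event-51", "content 51"); // 51 entries > 50, so event-2 is evicted
        System.out.println(cache.size() + " " + cache.containsKey("event-1") + " " + cache.containsKey("event-2"));
        // prints "50 true false"
    }
}
```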

Launch the H2 database

Download the H2 database here:
https://storage.googleapis.com/google-code-archive-downloads/v2/code.google.com/h2database/h2-2012-07-13.zip

Run the database with the following parameters

java -cp h2*.jar org.h2.tools.Server -tcp -tcpAllowOthers -tcpPort 8942 -baseDir ./h2dbstore -web -webAllowOthers -webPort 11112

Now we can start the JBoss Data Grid Server by going to its installation folder and running:

bin/standalone.sh

Create a model class for events and annotate for indexing

Create a class and annotate it so that it is indexed by Data Grid. To enable indexing through Hot Rod, POJOs need to be serialized as Protocol Buffers.

package techlab.model;

import java.io.Serializable;
import java.util.Date;

import org.infinispan.protostream.annotations.ProtoDoc;
import org.infinispan.protostream.annotations.ProtoField;

@ProtoDoc("@Indexed")
public class Event implements Serializable{
	private static final long serialVersionUID = 1L;
	private String uid;
	private Date timestmp;
	private String name;
	private String content;

	@ProtoField(number = 1)
	public String getUid() {
		return uid;
	}

	public void setUid(String uid) {
		this.uid = uid;
	}

	@ProtoDoc("@IndexedField(index = true, store = false)")
	@ProtoField(number = 2)
	public Date getTimestmp() {
		return timestmp;
	}

	public void setTimestmp(Date timestmp) {
		this.timestmp = timestmp;
	}

	@ProtoDoc("@IndexedField(index = true, store = false)")
	@ProtoField(number = 3)
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}

	@ProtoField(number = 4)
	public String getContent() {
		return content;
	}

	public void setContent(String content) {
		this.content = content;
	}
}
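The `number` attribute of `@ProtoField` is more than a label: Protocol Buffers writes it on the wire as part of each field's tag, computed as `(fieldNumber << 3) | wireType`, which is why these numbers should stay stable once data has been stored. The JDK-only sketch below hand-encodes one string field (wire type 2, length-delimited) to make this concrete; it is an illustration of the Protocol Buffers encoding, not the ProtoStream API, and the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class ProtoWireSketch {

    // Protobuf wire type used for strings, bytes and embedded messages
    static final int WIRETYPE_LENGTH_DELIMITED = 2;

    // A field's tag combines its number (as in @ProtoField(number = ...)) and its wire type
    static int tag(int fieldNumber, int wireType) {
        return (fieldNumber << 3) | wireType;
    }

    // Base-128 varint: 7 bits per byte, high bit set on every byte except the last
    static void writeVarint(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Encodes one string field: tag, byte length, then the UTF-8 bytes
    static byte[] encodeStringField(int fieldNumber, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(out, tag(fieldNumber, WIRETYPE_LENGTH_DELIMITED));
        writeVarint(out, utf8.length);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // "name" is field number 3 in the Event class
        byte[] encoded = encodeStringField(3, "incident");
        System.out.println("tag=" + encoded[0] + " length=" + encoded[1] + " total=" + encoded.length);
        // prints "tag=26 length=8 total=10"
    }
}
```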

Create REST services to get and put entries into the cache

Write a RemoteCacheManager factory. Note that the Protobuf schema can be generated from the annotated class and then registered on the Data Grid server in the reserved metadata cache.

package techlab.factory;

import java.io.IOException;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.marshall.ProtoStreamMarshaller;
import org.infinispan.protostream.SerializationContext;
import org.infinispan.protostream.annotations.ProtoSchemaBuilder;
import org.infinispan.protostream.annotations.ProtoSchemaBuilderException;
import org.infinispan.query.remote.client.ProtobufMetadataManagerConstants;

import techlab.model.Event;

public class RemoteCacheManagerFactory {

	ConfigurationBuilder clientBuilder;

	public RemoteCacheManagerFactory(String hostname, int port) {
		clientBuilder = new ConfigurationBuilder();
		clientBuilder.addServer()
		.host(hostname)
		.port(port)
		.marshaller(new ProtoStreamMarshaller());
	}

	public RemoteCacheManager newRemoteCacheManager() throws ProtoSchemaBuilderException, IOException {
		RemoteCacheManager remoteCacheManager = new RemoteCacheManager(clientBuilder.build());

		SerializationContext ctx = ProtoStreamMarshaller.getSerializationContext(remoteCacheManager);

		ProtoSchemaBuilder protoSchemaBuilder = new ProtoSchemaBuilder();

		//create a protobuf schema file from the annotated class. Protobuf marshallers and unmarshallers are generated automatically
		String eventSchema = protoSchemaBuilder
				.fileName("event.proto")
				.packageName("techlab")
				.addClass(Event.class)
				.build(ctx);

		//register the protobuf schema in the remote cache
		RemoteCache<String, String> metadataCache = remoteCacheManager.getCache(ProtobufMetadataManagerConstants.PROTOBUF_METADATA_CACHE_NAME);
		metadataCache.put("event.proto", eventSchema);

		//check if there is an error with the schemas
		String errors = metadataCache.get(ProtobufMetadataManagerConstants.ERRORS_KEY_SUFFIX);
		if (errors != null) {
			throw new IllegalStateException("Some Protobuf schema files contain errors:\n" + errors);
		}

		return remoteCacheManager;
	}
}

Configure the factory in the Spring context

<beans>
	...
	<!-- ########################################################### -->
	<!-- Definition of remote cache Manager -->
	<!-- ########################################################### -->
	<bean class="techlab.factory.RemoteCacheManagerFactory" id="remoteCacheManagerFactory">
		<constructor-arg value="localhost" />
		<constructor-arg value="11222" />
	</bean>
	<bean factory-bean="remoteCacheManagerFactory" factory-method="newRemoteCacheManager"
		id="cacheManager" />
	...
</beans>

Add a reusable endpoint to the Camel context

<camelContext id="techlab-fuse-jdg-library-mode" xmlns="http://camel.apache.org/schema/spring">
	<!-- Data Grid endpoint -->
	<endpoint id="datagrid" uri="infinispan://?cacheContainer=#cacheManager" />
</camelContext>

Use the REST DSL to create routes and expose services for the basic get and put operations

<camelContext id="techlab-fuse-jdg-library-mode" xmlns="http://camel.apache.org/schema/spring">
	<!-- Data Grid endpoint -->
	<endpoint id="datagrid" uri="infinispan://?cacheContainer=#cacheManager" />

	<restConfiguration bindingMode="json" component="netty4-http"
		enableCORS="true" port="7123" apiContextPath="/api-doc">
		<dataFormatProperty key="prettyPrint" value="true" />
	</restConfiguration>

	<rest id="svc" path="">
		<get id="getOp" uri="{cacheName}/{uid}">
			<description>Get an entry with an ID from a cache</description>
			<to uri="direct:getOp" />
		</get>
		<put id="putOp" uri="{cacheName}/{uid}" type="techlab.model.Event">
			<description>Inserts an entry with the given ID and content in a cache</description>
			<to uri="direct:putOp" />
		</put>
	</rest>

	<!-- rest service to get an entry with the key -->
	<route id="getOpRoute">
		<from id="getOpStarter" uri="direct:getOp" />
		<setHeader headerName="CamelInfinispanKey" id="getOpRouteSetKey">
			<simple>${headers.uid}</simple>
		</setHeader>
		<setHeader headerName="CamelInfinispanCacheName" id="getOpRouteSetCacheName">
			<simple>${headers.cacheName}</simple>
		</setHeader>
		<setHeader headerName="CamelInfinispanOperation" id="getOpRouteSetOperation">
			<constant>CamelInfinispanOperationGet</constant>
		</setHeader>
		<to id="getOpRouteToDataGrid" uri="ref:datagrid" />
		<setBody id="getOpRouteSetResponse">
			<simple>${header.CamelInfinispanOperationResult}</simple>
		</setBody>
	</route>

	<!-- rest service to put entries into a cache -->
	<route id="putOpRoute">
		<from id="putOpStarter" uri="direct:putOp" />
		<setHeader headerName="CamelInfinispanKey" id="putOpRouteSetKey">
			<simple>${headers.uid}</simple>
		</setHeader>
		<setHeader headerName="CamelInfinispanCacheName" id="putOpRouteSetCacheName">
			<simple>${headers.cacheName}</simple>
		</setHeader>
		<setHeader headerName="CamelInfinispanOperation" id="putOpRouteSetOperation">
			<constant>CamelInfinispanOperationPut</constant>
		</setHeader>
		<setHeader headerName="CamelInfinispanValue" id="putOpRouteSetValue">
			<simple>${body}</simple>
		</setHeader>
		<to id="putOpRouteToDataGrid" uri="ref:datagrid" />
		<setBody id="putOpRouteSetResponse">
			<simple>Value inserted</simple>
		</setBody>
	</route>
</camelContext>

Create a query service with dynamic parameters

Define a REST service that accepts any HTTP query parameters
(e.g. http://localhost:7123/query/event/techlab.model.Event?timestmp=1462208399999&name=ended)

<get id="queryOp" uri="query/{cacheName}/{type}">
	<description>Allows to query based on object fields using lucene search engine</description>
	<to uri="direct:queryOp" />
</get>
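With the REST DSL, the placeholders of the `uri` template (`{cacheName}`, `{type}`) and the HTTP query parameters end up as message headers: the routes read them with `${headers.cacheName}`, and `GenerateQuery` further down passes the whole header map to the query builder. The JDK-only sketch below mimics that mapping for the example URL to make the resulting header map concrete; it is a simplified illustration, not Camel's own implementation, and `RequestToHeadersSketch`, `pathParams` and `queryParams` are hypothetical names (it also assumes Java 10+ for `URLDecoder.decode(String, Charset)`).

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class RequestToHeadersSketch {

    // Hypothetical helper: matches a template such as "query/{cacheName}/{type}" against
    // a request path and returns the placeholder values, or null when it does not match
    static Map<String, Object> pathParams(String template, String path) {
        String[] t = template.split("/");
        String[] p = path.split("/");
        if (t.length != p.length) {
            return null;
        }
        Map<String, Object> params = new LinkedHashMap<>();
        for (int i = 0; i < t.length; i++) {
            if (t[i].startsWith("{") && t[i].endsWith("}")) {
                params.put(t[i].substring(1, t[i].length() - 1), p[i]);
            } else if (!t[i].equals(p[i])) {
                return null; // a literal segment differs
            }
        }
        return params;
    }

    // Hypothetical helper: decodes "name=incident&timestmp=1491607080000" into a map
    static Map<String, Object> queryParams(String query) {
        Map<String, Object> params = new LinkedHashMap<>();
        if (query == null || query.isEmpty()) {
            return params;
        }
        for (String pair : query.split("&")) {
            String[] kv = pair.split("=", 2);
            String key = URLDecoder.decode(kv[0], StandardCharsets.UTF_8);
            String value = kv.length > 1 ? URLDecoder.decode(kv[1], StandardCharsets.UTF_8) : "";
            params.put(key, value);
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = pathParams("query/{cacheName}/{type}", "query/event/techlab.model.Event");
        headers.putAll(queryParams("name=incident&timestmp=1491607080000"));
        System.out.println(headers);
        // prints {cacheName=event, type=techlab.model.Event, name=incident, timestmp=1491607080000}
    }
}
```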

Create classes that generate Data Grid queries. Note that these classes are fairly generic: they use reflection and are suitable for any data model.

package techlab.dg;

import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Date;
import java.util.Map;

import org.apache.camel.component.infinispan.InfinispanQueryBuilder;
import org.infinispan.query.dsl.FilterConditionContext;
import org.infinispan.query.dsl.Query;
import org.infinispan.query.dsl.QueryBuilder;
import org.infinispan.query.dsl.QueryFactory;

public class GenericQuery implements InfinispanQueryBuilder {

	private Map<String,Object> params;
	private BeanInfo info;
	private Class<?> type;

	public GenericQuery(String typeName, Map<String, Object> params) throws ClassNotFoundException, IntrospectionException {
		//inspect the searched class in order to get the properties that can be queried
		//(stopping at Object.class so that getClass() is not listed as a property)
		type = Class.forName(typeName);
		info = Introspector.getBeanInfo(type, Object.class);
		this.params = params;

	}

	@Override
	public Query build(QueryFactory<Query> queryFactory) {


		QueryBuilder<Query> qb = queryFactory.from(type);

		FilterConditionContext ctx=null;

		// for each property of the class we look if a parameter has been set
		for ( PropertyDescriptor pd : info.getPropertyDescriptors() ){

			Object searchValue = this.params.get(pd.getName());

			//only add a search criterion when a header with the same name as the property has been set.
			//Note: there is no check that the property is actually indexed, so only query fields annotated with @IndexedField
			if (searchValue!=null){

				//if field is a date convert the type explicitly
				if (pd.getPropertyType().equals(Date.class)){
					searchValue = new Date(Long.parseLong((String)searchValue));
				}

				if (ctx==null){ 	//first condition
					ctx = qb.having(pd.getName()).eq(searchValue);
				}else{ 				//additional conditions with and operator
					ctx.and().having(pd.getName()).eq(searchValue);
				}
			}
		}

		return qb.build();
	}
}

package techlab.dg;

import java.beans.IntrospectionException;

import org.apache.camel.Exchange;
import org.apache.camel.component.infinispan.InfinispanQueryBuilder;

public class GenerateQuery {

	public InfinispanQueryBuilder getBuilder(Exchange ex) throws ClassNotFoundException, IntrospectionException {

		InfinispanQueryBuilder qb = new GenericQuery(ex.getIn().getHeader("type",String.class),ex.getIn().getHeaders());

		return qb;
	}
}
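To see which conditions `GenericQuery` ends up adding for a given request, its reflection loop can be run on its own. The sketch below reproduces the same walk over the `PropertyDescriptor`s using only the JDK and collects property/value pairs instead of calling `having(...).eq(...)` on an Infinispan `QueryBuilder`; `ConditionSketch` and its stand-in `Event` class are hypothetical. It makes visible that headers not matching a property (such as `cacheName`) are ignored and that date parameters are converted from epoch milliseconds.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class ConditionSketch {

    // Minimal stand-in for techlab.model.Event: getters alone are enough for the Introspector
    public static class Event {
        public String getUid() { return null; }
        public Date getTimestmp() { return null; }
        public String getName() { return null; }
        public String getContent() { return null; }
    }

    // Same selection logic as GenericQuery.build(), but returns property -> typed value
    // pairs instead of building an Infinispan query
    static Map<String, Object> conditions(Class<?> type, Map<String, Object> params) {
        BeanInfo info;
        try {
            // stop at Object.class so that getClass() is not reported as a "class" property
            info = Introspector.getBeanInfo(type, Object.class);
        } catch (IntrospectionException e) {
            throw new IllegalArgumentException("Cannot introspect " + type.getName(), e);
        }
        Map<String, Object> result = new LinkedHashMap<>();
        for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
            Object searchValue = params.get(pd.getName());
            if (searchValue == null) {
                continue; // no header with this property's name: no condition
            }
            if (Date.class.equals(pd.getPropertyType())) {
                // dates arrive as epoch milliseconds in the query string
                searchValue = new Date(Long.parseLong((String) searchValue));
            }
            result.put(pd.getName(), searchValue);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("cacheName", "event"); // not an Event property: ignored
        headers.put("name", "incident");
        headers.put("timestmp", "1491607080000");
        Map<String, Object> conds = conditions(Event.class, headers);
        System.out.println(conds.size() + " conditions, name=" + conds.get("name"));
        // prints "2 conditions, name=incident"
    }
}
```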

Declare the beans, service endpoint and route in the Camel context

<beans>
	...
	<bean id="generateQuery" class="techlab.dg.GenerateQuery" />
	...
	<camelContext id="techlab-fuse-jdg-library-mode" xmlns="http://camel.apache.org/schema/spring">
		...
		<rest id="svc" path="" >
			...
			<get id="queryOp" uri="query/{cacheName}/{type}">
				<description>Allows to query based on object fields using lucene search engine</description>
				<to uri="direct:queryOp" />
			</get>
		</rest>
		...
		<!-- rest service to query caches with any indexed field -->
		<route id="queryOpRoute">
			<from id="queryOpStarter" uri="direct:queryOp" />
			<log message="Query headers : ${headers}"></log>
			<setHeader headerName="CamelInfinispanCacheName" id="queryOpRouteSetCacheName">
				<simple>${headers.cacheName}</simple>
			</setHeader>
			<setHeader headerName="CamelInfinispanOperation" id="queryOpRouteSetOperation">
				<constant>CamelInfinispanOperationQuery</constant>
			</setHeader>
			<setHeader headerName="CamelInfinispanQueryBuilder" id="queryOpRouteSetBuilder">
				<method ref="generateQuery" method="getBuilder" />
			</setHeader>
			<to id="queryOpRouteToDataGrid" uri="ref:datagrid" />
			<setBody id="queryOpRouteSetResponse">
				<simple>${header.CamelInfinispanOperationResult}</simple>
			</setBody>
		</route>
	</camelContext>
</beans>

Test the project

Run the Fuse project on your developer machine

mvn clean package camel:run

Insert a few entries by running the following curl commands

curl -X PUT --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{
 "uid": "1",
 "timestmp": "2017-04-07T19:30:00.000Z",
 "name": "start",
 "content": "party started" }' 'http://localhost:7123/event/1'

curl -X PUT --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{
	"uid": "2",
	"timestmp": "2017-04-07T22:15:00.000Z",
	"name": "incident",
	"content": "police arrived" }' 'http://localhost:7123/event/2'

curl -X PUT --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{
	"uid": "3",
	"timestmp": "2017-04-07T23:18:00.000Z",
	"name": "incident",
	"content": "host arrested" }' 'http://localhost:7123/event/3'

curl -X PUT --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{
 "uid": "4",
 "timestmp": "2017-04-07T23:20:00.000Z",
 "name": "end",
 "content": "party ended" }' 'http://localhost:7123/event/4'

List all events through a query without parameters

curl -X GET --header 'Accept: application/json' 'http://localhost:7123/query/event/techlab.model.Event'
[ {
  "uid" : "1",
  "timestmp" : 1491593400000,
  "name" : "start",
  "content" : "party started"
}, {
  "uid" : "2",
  "timestmp" : 1491603300000,
  "name" : "incident",
  "content" : "police arrived"
}, {
  "uid" : "3",
  "timestmp" : 1491607080000,
  "name" : "incident",
  "content" : "host arrested"
}, {
  "uid" : "4",
  "timestmp" : 1491607200000,
  "name" : "end",
  "content" : "party ended"
} ]

List all incidents through a query with a parameter

curl -X GET --header 'Accept: application/json' 'http://localhost:7123/query/event/techlab.model.Event?name=incident'
[ {
  "uid" : "2",
  "timestmp" : 1491603300000,
  "name" : "incident",
  "content" : "police arrived"
}, {
  "uid" : "3",
  "timestmp" : 1491607080000,
  "name" : "incident",
  "content" : "host arrested"
} ]

List the incidents that occurred at a given time, using 2 parameters

curl -X GET --header 'Accept: application/json' 'http://localhost:7123/query/event/techlab.model.Event?name=incident&timestmp=1491607080000'
[ {
  "uid" : "3",
  "timestmp" : 1491607080000,
  "name" : "incident",
  "content" : "host arrested"
} ]
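The `timestmp` values sent as ISO-8601 strings in the PUT requests come back as epoch milliseconds (UTC), and the query service expects that same numeric form, since `GenericQuery` parses the parameter with `Long.parseLong`. A quick way to compute the value to put in the query string is `java.time.Instant`, as in this small sketch (the class and method names are hypothetical):

```java
import java.time.Instant;

public class EpochMillisSketch {

    // Converts an ISO-8601 UTC timestamp into the epoch-milliseconds form used in the query URL
    static long toEpochMillis(String isoTimestamp) {
        return Instant.parse(isoTimestamp).toEpochMilli();
    }

    public static void main(String[] args) {
        long millis = toEpochMillis("2017-04-07T23:18:00.000Z");
        System.out.println("http://localhost:7123/query/event/techlab.model.Event?name=incident&timestmp=" + millis);
        // prints http://localhost:7123/query/event/techlab.model.Event?name=incident&timestmp=1491607080000
    }
}
```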

Alternatively, you can use Swagger UI to test the services

[Figure: Swagger UI for REST service testing]

Deploy Fuse project to Fuse Server (Karaf)

In the pom.xml file of the Fuse project, add a DynamicImport-Package instruction to the maven-bundle-plugin configuration.

<plugin>
	<groupId>org.apache.felix</groupId>
	<artifactId>maven-bundle-plugin</artifactId>
	<version>${version.maven-bundle-plugin}</version>
	<extensions>true</extensions>
	<configuration>
		<instructions>
			<Bundle-SymbolicName>techlab-fuse-jdg-simple-hotrod</Bundle-SymbolicName>
			<Bundle-Name>techlab-fuse-jdg-simple-hotrod</Bundle-Name>
			<DynamicImport-Package>*</DynamicImport-Package>
		</instructions>
	</configuration>
</plugin>

Generate bundle for deployment

mvn clean package

Connect to the Fuse console and run these commands to install the required dependencies

features:install camel-swagger-java camel-netty4-http camel-jackson
features:addurl mvn:org.apache.camel/camel-jbossdatagrid/6.5.1.Final-redhat-1/xml/features
features:install camel-jbossdatagrid

Install our Fuse project bundle

osgi:install -s file:<PATH_TO_PROJECT>/techlab-fuse-jdg-simple-hotrod/target/techlab-fuse-jdg-simple-hotrod-1.0.0-SNAPSHOT.jar

That's it: we now have a running Data Grid with persistence and indexing, and we can access it from a Fuse project.

Thanks for reading
