Spring Batch and MongoDB - codecentric AG Blog


#springbatch #mongodb #nosql

Spring Batch

Spring Batch is a Spring-based framework for enterprise Java batch processing. An important aspect of Spring Batch is the separation between reading from and writing to resources and the processing of a single record, called an item in Spring Batch lingo. There are many existing item readers and writers for a wide range of resources such as JDBC databases, JMS messaging systems, flat files, etc. If the resource of your choice is not supported out of the box, it is easy to implement your own reader and writer, as we will see in a minute.

MongoDB

MongoDB is a popular NoSQL datastore. It stores so-called documents (basically an ordered set of key/value pairs, where a value can be a simple data type like a string or an integer, but also an array of values or a subdocument). MongoDB is optimized for heavy write throughput and horizontal scaling.
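For illustration, a (made-up) document with simple values, an array and a subdocument might look like this:

```json
{
	"_id": "u123",
	"name": "Jane",
	"n": 42,
	"tags": ["admin", "beta"],
	"address": { "city": "Solingen", "zip": "42697" }
}
```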

Since I am a big fan of MongoDB on the one hand and am introducing the Spring Batch framework at one of my customers on the other, why not implement a Spring Batch item reader and writer for MongoDB and publish it on GitHub so that everybody can use it: github.com/ttrelle/spring-batch-mongodb-support.

MongoDB Item Reader

Implementing the item reader was straightforward. It was merely a matter of passing parameters to the underlying MongoDB driver API. The usage is very simple:

<bean id="itemReader1"
	class="org.springframework.batch.item.mongodb.MongoDBItemReader"
	scope="step" 
	p:mongo-ref="mongod" 
	p:db="#{jobParameters['db']}"
	p:collection="#{jobParameters['collection']}" 
 
	p:query="{a: {$gt: 5}}"
	p:keys="{_id:0, a:1, b:1}"
 
	p:sort="{b: -1}"
	p:batchSize="20"
	p:limit="100"
	p:skip="5"
	p:snapshot="true"
/>


We have three kinds of parameters:

  • mongo, db and collection determine the MongoDB connection and what collection to read from. These parameters are required, all others are optional.
  • query and keys make up the MongoDB query. The former is the query itself, the latter selects the fields to read. If you don’t set a query string, all documents from the collection are read.
  • sort, batchSize, limit, skip and snapshot are parameters of the cursor that is used to iterate over the result set.
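These parameters map more or less directly onto the underlying MongoDB Java driver API. As a rough sketch (assuming the legacy 2.x driver and a running MongoDB instance; this is an illustration, not the exact implementation of the reader):

```java
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.util.JSON;

// query and keys form the find() call,
// the remaining parameters configure the cursor
DBObject query = (DBObject) JSON.parse("{a: {$gt: 5}}");
DBObject keys  = (DBObject) JSON.parse("{_id:0, a:1, b:1}");

DBCursor cursor = collection.find(query, keys)
	.sort((DBObject) JSON.parse("{b: -1}"))
	.batchSize(20)
	.limit(100)
	.skip(5)
	.snapshot();

while (cursor.hasNext()) {
	DBObject item = cursor.next(); // what the item reader emits
}
```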

By default, the item reader emits DBObject instances that come from the MongoDB driver API. These objects are basically ordered hashmaps. If you want to use another representation of your data in the item processor, you can write a custom converter …

public class DocumentUserConverter implements Converter<DBObject, User> {
 
	@Override
	public User convert(DBObject document) {
		User usr = new User();
 
		usr.setId((String)document.get("_id"));
		usr.setName((String)document.get("name"));
		usr.setLoginCount((Integer)document.get("n"));
 
		return usr;
	}
}


… and put it into the reader:

<bean id="user-converter" class="[package].DocumentUserConverter" />
 
<bean id="itemReader1"
	class="org.springframework.batch.item.mongodb.MongoDBItemReader"
	scope="step" 
	p:mongo-ref="mongod" 
	p:db="#{jobParameters['db']}"
	p:collection="#{jobParameters['collection']}" 
 
	p:converter-ref="user-converter"
        ...
/>


MongoDB Item Writer

My first approach to the item writer was very naive. I simply took the list of (optionally converted) DBObject items and inserted the documents into the target collection. This can be done with the following configuration:

<bean id="itemWriter1" 
	class="org.springframework.batch.item.mongodb.MongoDBItemWriter"
	scope="step"
	p:mongo-ref="mongod" 
	p:db="#{jobParameters['db']}"
	p:collection="#{jobParameters['collection']}"
 
	p:transactional="true"
	p:writeConcern="WriteConcern.JOURNAL_SAFE"
	p:checkWriteResult="true"
/>


These are possible parameters:

  • mongo, db and collection determine the MongoDB connection and what collection to write to. These parameters are required, all others are optional.
  • transactional lets the writer act (more or less) transactionally (more on that later). Defaults to true.
  • writeConcern lets you use a write concern that is different from the one specified on the MongoDB connection.
  • checkWriteResult determines whether to check for errors after writing (the default behaviour of the Java driver is fire-and-forget). Defaults to true.
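With checkWriteResult enabled, the writer essentially does what the following fragment shows (again assuming the legacy 2.x Java driver; an illustration, not the exact implementation):

```java
import com.mongodb.WriteConcern;
import com.mongodb.WriteResult;

// insert with an explicit write concern and inspect the result
WriteResult result = collection.insert(document, WriteConcern.JOURNAL_SAFE);
if (result.getError() != null) {
	// the exception rolls back the surrounding Spring Batch transaction
	throw new RuntimeException("MongoDB write failed: " + result.getError());
}
```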

As with the reader you can also specify a converter for this writer that optionally converts from some other representation to DBObject instances.

TX or no TX?

In Spring Batch, all jobs run within an active transaction, even if they write to non-transactional resources like files, SMTP servers, etc. My colleague Tobias Flohre (who is a Spring Batch expert) helped me adapt the writer to that aspect of Spring Batch. Basically, the writer now …

a) delays the insertion of the documents into the MongoDB collection until the end of the transaction. This is a common pattern for non-transactional resources in Spring Batch. The advantage of this behaviour is obvious: if another writing resource (e.g. a JDBC writer) fails and causes a rollback, no documents are inserted into MongoDB.

b) throws an exception that causes the rollback of the surrounding transaction if the write to MongoDB fails.

Such an implementation now mimics a nearly transactional behaviour, even when writing to a MongoDB collection.

Of course, this does not turn MongoDB into a transactional database!

If you insert more than one document into a collection and one of these inserts fails, the remaining inserts are not (and cannot be) rolled back. Let’s assume the commit-interval of our chunk is set to 3. The MongoDB item writer will try to write all three documents in a single batch. If the write of the second document fails (maybe because of an index violation), the first document has already been inserted.

To achieve a more transactional behaviour you have to set commit-interval="1". Inserting a single document is an atomic operation. If we check for errors after each insert operation (which is the default behaviour of the writer), we can tell whether the insert was successful or not. From a performance point of view, a commit-interval of 1 is not the best option, of course.
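The commit-interval is configured on the chunk of the step. A minimal (hypothetical) job configuration writing one document per transaction could look like this, assuming the Spring Batch XML namespace is declared as batch:

```xml
<batch:job id="mongoJob">
	<batch:step id="copyStep">
		<batch:tasklet>
			<!-- commit-interval="1" makes each insert its own transaction -->
			<batch:chunk reader="itemReader1" writer="itemWriter1"
				commit-interval="1" />
		</batch:tasklet>
	</batch:step>
</batch:job>
```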

Summary

With the help of the MongoDB item reader and writer you can access a MongoDB datastore within your Spring Batch jobs. The item reader is straightforward to use. If you want to write documents, the writer provides an implementation that is as transactional as you can get with MongoDB.

Feel free to use the MongoDB item reader and writer and let me know if it is useful to you.