Combining Apache Cassandra with Apache Karaf - codecentric AG Blog

Getting the best of Apache Cassandra inside Apache Karaf: this blog post will describe how easy it was to embed the NoSQL database inside the runtime. This can be helpful while developing OSGi-related applications with Karaf that work together with Cassandra.
The entire project for the examples can be found on GitHub.

Working with Apache Cassandra is fun, though sometimes you just need a simple embedded server with a connected client to test something for a POC or to explore the APIs. So it should be easy to install and use this embedded server. Combining this with the power of Apache Karaf, an OSGi Container, makes an unbeatable team.

Apache Karaf

Apache Karaf is an application server for OSGi applications. In this showcase we use Karaf version 4.0.0-SNAPSHOT. It is still unreleased, but it comes with so many great improvements that we chose the SNAPSHOT anyway.

The main goal of Apache Karaf is to provide a complete infrastructure for running OSGi-related applications without the hassle of integrating and fine-tuning the components. For example, it provides standard logging functionality, a shell that can also be reached via SSH, and some more infrastructure bundles. One of its biggest pluses is certainly the shell, which even supports auto-completion and behaves like a regular Unix shell.

Apache Cassandra Embedded

Running an embedded Apache Cassandra isn’t that hard: you just need to instantiate an org.apache.cassandra.service.CassandraDaemon. Without a configuration, though, the daemon would fail to start. Providing such a configuration is quite easy with Karaf, or with OSGi in general: implement a ManagedService that manages the lifecycle (start, stop, configure) of the wrapped CassandraDaemon.

As a ManagedService, the newly implemented OsgiEmbeddedCassandra class just needs to implement the updated method of the ManagedService API; it is then informed whenever a new configuration for the embedded Cassandra service is available. This method takes care of stopping and starting the CassandraDaemon.

The service is registered under the persistent ID de.nierbeck.cassandra.embedded. Whenever a new configuration becomes available for this ID, the location of the cassandra.yaml file it contains is stored in the service, and the daemon is stopped and started again.

public class OsgiEmbeddedCassandra implements Server, CassandraService,
		ManagedService {
...
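	@Override
	public void updated(Dictionary<String, ?> properties)
			throws ConfigurationException {
		// a running daemon has to be stopped before it is reconfigured
		if (isRunning())
			stop();
		// properties is null as long as no configuration exists
		if (properties != null) {
			cassandraConfig = (String) properties.get("cassandra.yaml");
		}
		// (re)start with the configuration currently known
		start();
	}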
...
 
}
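The restart-on-update flow can be tried out without OSGi or Cassandra on the classpath. The following is a minimal sketch, not the showcase code: RestartingService is a hypothetical stand-in that only records its lifecycle calls, and the path in the demo is made up. It assumes, as the OSGi Configuration Admin specification describes, that updated is called with null while no configuration exists.

```java
import java.util.ArrayList;
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.List;

/** Hypothetical stand-in for the embedded server; the real class wraps a CassandraDaemon. */
class RestartingService {
    final List<String> events = new ArrayList<>(); // records lifecycle calls in order
    private boolean running;
    String cassandraConfig; // stays null until a configuration supplies a value

    boolean isRunning() { return running; }
    void start() { running = true; events.add("start"); }
    void stop() { running = false; events.add("stop"); }

    /** Same control flow as updated() above: stop if running, take over config, start. */
    void updated(Dictionary<String, ?> properties) {
        if (isRunning()) {
            stop();
        }
        if (properties != null) {
            cassandraConfig = (String) properties.get("cassandra.yaml");
        }
        start();
    }
}

class RestartingServiceDemo {
    public static void main(String[] args) {
        RestartingService service = new RestartingService();
        service.updated(null); // no configuration yet: plain start

        Dictionary<String, Object> props = new Hashtable<>();
        props.put("cassandra.yaml", "/tmp/cassandra.yaml"); // hypothetical path
        service.updated(props); // new configuration: stop, then start again

        System.out.println(service.events + " " + service.cassandraConfig);
    }
}
```

The first call only starts the service, while the second one produces a stop followed by a start, which is the behaviour described for a changed configuration.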

Additionally, the service implements the methods to start and stop the embedded Cassandra, as declared by the implemented interface.

...
public class OsgiEmbeddedCassandra implements Server, CassandraService,
		ManagedService {
...
	@Override
	public void start() {
		logger.info("starting Cassandra in Embedded mode");

		// tell Cassandra where to find its cassandra.yaml, if one is configured
		if (cassandraConfig != null) {
			System.setProperty("cassandra.config", "file://" + cassandraConfig);
		}
		System.setProperty("cassandra-foreground", "false");

		cassandraDaemon = new CassandraDaemon();
		try {
			logger.info("initializing cassandra daemon");
			cassandraDaemon.init(null);
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
		logger.info("starting cassandra daemon");
		cassandraDaemon.start();

		logger.info("cassandra up and running");
	}

	@Override
	public void stop() {
		logger.info("Stopping cassandra daemon");
		logger.info("cleaning up the Schema keys");
		Schema.instance.clear();
		logger.info("stopping cassandra");
		cassandraDaemon.stop();
		logger.info("destroying the cassandra daemon");
		cassandraDaemon.destroy();
		logger.info("cassandra is removed");
		cassandraDaemon = null;

		// unregister the snitch MBean, presumably so that it can be
		// registered again on the next start
		logger.info("removing MBean");
		MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
		try {
			mbs.unregisterMBean(new ObjectName(
					"org.apache.cassandra.db:type=DynamicEndpointSnitch"));
		} catch (MBeanRegistrationException | InstanceNotFoundException
				| MalformedObjectNameException e) {
			logger.warn("Couldn't remove MBean");
		}

	}
...
}
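The start method builds the value of cassandra.config by prepending "file://" to the configured path, which works for absolute Unix-style paths. For paths that contain spaces or Windows drive letters, a more defensive variant could let the JDK build the URL. The following is only a sketch under that assumption and not part of the showcase; the class name, the method name and the example path are hypothetical.

```java
import java.nio.file.Paths;

class CassandraConfigUrl {
    /** Converts a file system path into a file: URL string with proper escaping. */
    static String toConfigUrl(String yamlPath) {
        // toUri() resolves the path against the working directory if it is relative
        return Paths.get(yamlPath).toUri().toString();
    }

    public static void main(String[] args) {
        // hypothetical location, containing a space on purpose
        String url = toConfigUrl("/opt/karaf home/etc/cassandra.yaml");
        System.out.println(url);
        // the result could then be handed over like this:
        // System.setProperty("cassandra.config", url);
    }
}
```

The space in the directory name is percent-encoded in the result, which plain string concatenation would not do.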

Creating Karaf – Cassandra administration commands

The interactive Karaf command shell is a powerful tool that helps a lot with everyday debugging and administration tasks. So it is only natural to have some administrative commands available to control the embedded Cassandra service.
In this showcase we’ll create four commands to manage the embedded Cassandra service. The StopService command, as the name states, lets you stop a running embedded Cassandra service. The StartService command starts the service if it isn’t already running. The IsServiceRunning command gives you feedback about the current state. As an embedded instance is usually used for a POC or a showcase, you will probably do some trial and error, so a way to clean up the mess comes in handy; this is what the CleanupKeyspace command is for. The following StartService command shows how easy it is to create such a command; this and all other commands can be found in the sources.

@Command(scope = "cassandra-admin", name = "start", description = "Start the embedded Cassandra server")
@Service
public class StartService implements Action {
 
	@Reference
	CassandraService cassandraServer;
 
	@Override
	public Object execute() throws Exception {
		if (cassandraServer.isRunning()) {
			System.err.println("Embedded Cassandra is already started");
			return null;
		}
 
		cassandraServer.start();
		System.out.println("Embedded Cassandra started.");
		return null;
	}
 
}

It’s a very simple class; all of the infrastructure needed is hidden behind annotations. The @Command annotation marks this service as a command. Its properties define the scope and the name of the command on the shell, which make it available as cassandra-admin:start. The scope is useful for grouping related commands.
The @Service annotation declares this class to be a service, so it will be registered in the service registry of the OSGi framework. The embedded Cassandra service is referenced through the @Reference annotation. Karaf takes care of injecting the service if it is available; otherwise the command won’t be accessible from the shell. As this command doesn’t take any arguments, only the execute method is needed. It starts the service if the underlying CassandraDaemon is stopped; otherwise it prints a message to the error stream. The execute method, which is called by the shell implementation, expects a return value, which would be printed to the shell. But since we want to be in control of the output, especially in case of an error, the method just returns null.
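The guard-then-act logic of execute carries over to the other commands. The following plain-Java sketch mirrors it for stopping the service. It is not the StopService class from the sources: EmbeddedService is a hypothetical stand-in for the CassandraService interface, the messages are made up, and the Karaf annotations are left out so that the sketch runs without Karaf on the classpath.

```java
/** Hypothetical stand-in for the CassandraService interface of the showcase. */
interface EmbeddedService {
    boolean isRunning();
    void start();
    void stop();
}

/** Sketch of a stop command; a real command would carry @Command, @Service and @Reference. */
class StopServiceSketch {
    private final EmbeddedService cassandraServer;

    StopServiceSketch(EmbeddedService cassandraServer) {
        this.cassandraServer = cassandraServer;
    }

    /** Mirrors StartService.execute(): guard first, then act, always return null. */
    Object execute() {
        if (!cassandraServer.isRunning()) {
            System.err.println("Embedded Cassandra is already stopped");
            return null;
        }
        cassandraServer.stop();
        System.out.println("Embedded Cassandra stopped.");
        return null;
    }
}

class StopServiceDemo {
    public static void main(String[] args) {
        // Minimal in-memory service so the sketch runs without Karaf or Cassandra.
        EmbeddedService fake = new EmbeddedService() {
            private boolean running = true;
            public boolean isRunning() { return running; }
            public void start() { running = true; }
            public void stop() { running = false; }
        };
        StopServiceSketch command = new StopServiceSketch(fake);
        command.execute(); // stops the service
        command.execute(); // second call only reports that it is already stopped
    }
}
```

Returning null in both branches keeps the shell from printing a return value, so the command alone decides what the user sees.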

Installation in Karaf

Karaf uses a concept called features to simplify the grouping and deployment of bundles onto the server. A Karaf feature describes a set of bundles to be installed in one go. The feature descriptor is an XML structure.
To install the bundles required for running an embedded Cassandra and interacting with it via the shell, you just need to create a feature descriptor. The following snippet shows a fragment of the Karaf feature descriptor used to install the showcase:

<features xmlns="http://karaf.apache.org/xmlns/features/v1.3.0" name="Karaf-Cassandra-Feature-1.0.0-SNAPSHOT">
...
    <feature name="Karaf-Cassandra-Embedded" description="Karaf-Cassandra-Feature" version="1.0.0.SNAPSHOT">
        <bundle>mvn:de.nierbeck.cassandra/Karaf-Cassandra-Service/1.0.0-SNAPSHOT</bundle>
        <bundle>mvn:de.nierbeck.cassandra/Karaf-Cassandra-Embedded/1.0.0-SNAPSHOT</bundle>
        <bundle>mvn:de.nierbeck.cassandra/Karaf-Cassandra-Admin/1.0.0-SNAPSHOT</bundle>
    </feature>
...
</features>

To install this feature, go to the Karaf shell and issue the following commands. The first command registers the feature definition, and the second one installs the feature.

feature:repo-add mvn:de.nierbeck.cassandra/Karaf-Cassandra-Feature/1.0.0-SNAPSHOT/xml/features
feature:install Karaf-Cassandra-Embedded

After this feature is installed, you are able to use the commands we defined previously. For example, issuing the following command:

cassandra-admin:isRunning

will return true, as the embedded Cassandra is running.

Some more shell commands

As it is pretty boring to just start and stop an embedded Cassandra server, we will add some more commands to our library. Those commands don’t necessarily need an embedded Cassandra server; they can be used to connect to any Cassandra cluster.
First, some Cassandra client bundles need to be installed. The previously mentioned feature descriptor also defines a feature for the Cassandra client bundles: just install the Karaf-Cassandra-Client feature as shown before. Now we need some client commands, for example to connect to a Cassandra cluster or to run CQL scripts. The following commands become available once you install the last remaining feature in the feature descriptor, the Karaf-Cassandra-Shell feature.

  • cassandra:connect:
    This command connects to any known cluster; just pass the name or IP address of a node known to run a Cassandra instance. If the port isn’t the default one, use the -p option to specify an alternative port. To connect to the embedded Cassandra server, issue the following:
    cassandra:connect -p 9142 localhost
  • cassandra:disconnect:
    Disconnects the current shell session from the remote Cassandra cluster. If there is no active session, it will tell you so.
  • cassandra:isConnected:
    Tells you, with true or false, whether there is a connection bound to the shell. If a USE command for a certain keyspace has been issued, isConnected returns true:keyspace_name
  • cassandra:cql:
    This command takes either an argument or an option. If you give an argument like the following:
    cassandra:cql "select * from foo.bar;"
    it will print a table with the selection from table bar in keyspace foo.
    If you call:
    cassandra:cql -f /absolute/path/to/select.cql
    it will parse the file and execute the contained CQL script, printing tables whenever there are rows to print.
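How the showcase parses a script file for the -f option is not shown in this post. As an illustration of what executing a CQL script involves, the following naive sketch splits a script into single statements at each semicolon while leaving semicolons inside single-quoted string literals alone. The class CqlScriptSplitter is hypothetical, and comments or other CQL quoting styles are not handled.

```java
import java.util.ArrayList;
import java.util.List;

class CqlScriptSplitter {
    /** Splits a script on ';' while ignoring semicolons inside single-quoted strings. */
    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inString = false;
        for (char c : script.toCharArray()) {
            if (c == '\'') {
                inString = !inString; // an escaped '' toggles twice, so the state is kept
            }
            if (c == ';' && !inString) {
                addIfNotBlank(statements, current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        addIfNotBlank(statements, current.toString()); // trailing statement without ';'
        return statements;
    }

    private static void addIfNotBlank(List<String> target, String statement) {
        String trimmed = statement.trim();
        if (!trimmed.isEmpty()) {
            target.add(trimmed);
        }
    }

    public static void main(String[] args) {
        String script = "USE foo;\n"
                + "INSERT INTO bar (id, txt) VALUES (1, 'a;b');\n"
                + "SELECT * FROM bar;";
        // each resulting statement could then be sent to the cluster one by one
        split(script).forEach(System.out::println);
    }
}
```

Run on the demo script, this yields three statements, and the semicolon inside 'a;b' stays part of the INSERT statement.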

These are some nice commands, but sometimes when debugging an issue you don’t really know where to start, so a little help from the system is needed. For this, some extra commands are available in the cassandra:cqlsh subshell. The cqlsh subshell is intentional, because the following commands are supposed to give the same feeling as cqlsh, which is a Python script. Those commands have completers that take the current scope, such as the session or the selected keyspace or table, into account.
To use those commands, either switch to the corresponding subshells by issuing cassandra and cqlsh, or prefix the commands with cassandra:cqlsh:

  • USE:
    The USE command followed by a keyspace name selects this keyspace for the current Cassandra session, which is bound to the shell. This command has a completer that offers the available keyspace names.
  • DESCRIBE:
    Issued with KEYSPACES, this command lists the available keyspaces. Used with TABLES, it lists the known tables, and combined with TABLE table_name, it shows the details of the selected table. Again, this command has a completer that helps with the keyspaces and tables.
  • DROP:
    The DROP command drops either a table or a keyspace, depending on the input. This command also provides a completer for navigating to the right command.
  • CREATE:
    Supports creating either a keyspace or a table structure. Together with the completer, it should provide CQLSH-like completion and handling.
  • INSERT:
    This command inserts new data into a selected table. A completer helps with finding the right syntax for it.
  • SELECT:
    The SELECT command is sent to the cluster; if there is a valid result, it is printed as a table view. Building the select term is supported by completion, just like in CQLSH.

Most of the power of this showcase comes from the CQLSH-like commands, which help you dig into the data contained in your Cassandra database. Together with the embedded Cassandra, they provide a nice combination of features to start and play with a Cassandra database for a new project or POC. Karaf itself, as a lightweight container, is just there to start everything and to help with the infrastructure.

To run all this yourself, follow the description of the showcase at:

https://github.com/ANierbeck/Karaf-Cassandra/