ֿ

Overview

Galaxy is a high-performance in-memory data-grid (IMDG) that can serve as a basis for building distributed applications that require fine-tuned control over data placement and/or custom distributed data-structures.

What makes Galaxy different from other IMDGs is the way it assigns data items to cluster node. Instead of sharding the data on one of the keys using a consistent hashing scheme, Galaxy dynamically moves objects from one node to another as needed using a cache-coherence protocol similar to that found in CPUs. This makes Galaxy suitable for applications with predictable data access patterns, i.e. applications where the data items behave according to some proximity metric, and items that are “closer” together are more likely to be accessed together than items that are “far” apart.

Galaxy is not a key-value store, though it can be used to build one. It’s meant to be used as a low-level platform for building distributed data structures.

Galaxy uses ZooKeeper or JGroups for cluster management, and BerkeleyDB or any SQL database for optional persistence.

Note: Galaxy is currently in ALPHA and considered experimental. Please submit bug reports and feature requests to the issue tracker.

Quasar is developed by Parallel Universe and released as free software, dual-licensed under the Eclipse Public License and the GNU Lesser General Public License.

News

July 23, 2014

Galaxy 1.4 has been released.

January 22, 2014

Galaxy 1.3 has been released.

August 9, 2012

Galaxy’s Networking. Part 3 of the Galaxy Internals blog post series.

August 3, 2012

How Galaxy Handles Failures. Part 2 of the Galaxy Internals blog post series.

July 26, 2012

How Galaxy Maintains Data Consistency in the Grid.

July 10, 2012

The first public version of Galaxy, 1.0-alpha1, has been released.

See the announcement on our blog.

Getting Started

System requirements

Galaxy requires that the Java Runtime Environment (JRE) version 7 will be installed on your machine.

Downloading Galaxy

You can download the Galaxy distribution from here: Release 1.0

Building Galaxy

To build galaxy, simply cd to the Galaxy directory, then run:

gradle

If you don’t have gradle installed on your machine then run instead:

./gradlew

To build the documentation, you need to have Sphinx and lessc installed. Then run:

gradle generateDocs

Using Maven

Add the following dependency to Maven:

<dependency>
    <groupId>co.paralleluniverse</groupId>
    <artifactId>galaxy</artifactId>
    <version>1.4</version>
</dependency>

Using the pre-built configurations

The user manual explains, at length, how to configure Galaxy (see Configuring and Monitoring Galaxy). However, for your convenience, a number of pre-built configurations are included with the Galaxy distribution, and can be found in the config directory.

While normally you’d have one large XML file with Galaxy’s configuration, these sample configurations are divided into snippet XML files (those XML files starting with an underscore in the config directory) that you can mix and match.

Peer configuration

Regular Galaxy cluster nodes are called “peers”, and to configure a peer, take a look at peer.xml in the config directory. It contains three or four XML import elements:

  1. _peer.xml. This import is required for all peer configurations.

  2. Cluster management product. Can be one of:
    • _jgroups.xml - to use JGroups.
    • _zookeeper.xml - to use Apache ZooKeeper. If selected, configure zkConnectString in _zookeeper.xml to match your ZooKeeper configuration.
  3. One of:
    • _with_server.xml - if you’d like your cluster to have a server node for persistence (see Configuring, Running and Monitoring the Server for an explanation about servers).
    • _with_cloud_server.xml - if you want a server but are running Galaxy in a cloud environment that does not allow multicast. In that case, you must use _zookeeper.xml.* _with_dumb_server - if you’d like to use a server that isn’t a Galaxy node but a simple SQL database for persistence. In this case, you must also import _sql.xml as item 4 (see Dumb servers for an explanation about dumb servers).
    • _no_server.xml- if you don’t want a server at all.
  4. Possibly _sql.xml, if and only if you’ve chosen _with_dumb_server.If chosen, edit _sql.xml with your database connection information. See Using SQL for more information about configuring the RDBMS integration.

Note: It is recommended that you configure your cluster to use a server node or a dumb server. See Should you use a server?.

In addition to this file, you’ll need to edit an additional .properties file. You must provide these properties (you can edit peer.properties in the config directory):

  1. galaxy.nodeId - this will identify the node in the cluster. Two or more nodes with the same id will form a “backup group” (see Backup groups).

  2. galaxy.port - this is the UDP port that Galaxy will use to send messages among peer nodes.

  3. galaxy.slave_port - if there are more than one nodes with the same id, the slaves of the peer group will connect to this TCP port on the master to receive backup data.

  4. galaxy.multicast.address - the IP address to use for multicast (not used when _with_cloud_server is chosen).

  5. galaxy.multicast.port - the IP port to use for multicast (not used when _with_cloud_server is chosen).

Properties 4 and 5 must be the same for all peer nodes. Properties 2 and 3 may be different for each node (this is especially useful when running several nodes on the same machine for testing). The nodeId property should be different or each node (but the same for nodes in the same backup group).

Note: If you create several peer properties files with different ports, you can run several peers on a single machine!

Server configuration

If you heed our recommendation and want to use Galaxy with a server node (or more, though, currently, only a single server node is supported), you’re going to need to configure it. Just like with peers, you should start by looking at the server.xml file that’s in the config directory. It is comprised of three XML import elements:

  1. _server.xml. This import is required for all peer configurations.

  2. Cluster management product. Can be one of:
    • _jgroups.xml - to use JGroups.
    • _zookeeper.xml - to use ZooKeeper. If selected, configure zkConnectString in _zookeeper.xml to match your ZooKeeper configuration.
  3. Persistence layer. Can be one of:
    • _bdb.xml - to use Berkeley DB, Java Edition as the persistence engine. If you choose to use BDB JE, you might want to change the envHome property, defined in the _bdb.xml file, to point to the directory where you want to place the BDB files, and the truncate property (which can be true or false) depending on whether or not you want the database truncated (cleared) upon server startup. See Using BerkeleyDB for more information about configuring BDB.
    • _sql.xml - to use a SQL database with a JDBC driver for persistence.If chosen, edit _sql.xml with your database connection information. See Using SQL for more information about configuring the RDBMS integration. Because you’re using a server node and peers would need to access it over the network, it’s best that you runthe server on the same machine running the DB to save the extra network hop. This should actually provide better performance than using the DB as a dumb server as explained above.

For the server, too, you’ll need to edit server.properties in the config directory. Leave galaxy.nodeId set to 0 - this is what identifies the node as the server. But set galaxy.port to the TCP port you want the peers to use when connecting to the server.

Running Galaxy

The Galaxy server runs as a standalone process. The peers are your application code that calls into the Galaxy library. Note that if you’re using ZooKeeper, you must start the ZooKeeper servers before starting any Galaxy nodes.

Running the server

To run the server, simply run the executable Java class co.paralleluniverse.galaxy.Server, and pass it two command-line arguments: the XML configuration file and the properties file, like so:

java -classpath galaxy.jar co.paralleluniverse.galaxy.Server config/server.xml config/server.properties

Using the peers

In your application code, you need to get an instance of the Grid class , which is the entry point to Galaxy’s API. You do it by calling the getInstance static method, and passing it the location of the XML and properties files you have configured in the configuration step, like this:

Grid grid = Grid.getInstance("config/peer.xml", "config/peer1.properties");

(depending on your current directory you may need to provide a different path to the XML and properties files.)

Usually, your next statement would be to tell the node to go online:

grid.goOnline();

Now you should read the API section of the manual to learn how to use the Galaxy API.

User Manual

This document explains how to use and configure Galaxy. Running Galaxy is about equal parts configuration and API so you’re advised to read this manual in its entirety.

Introduction to Galaxy

Galaxy is an in-memory data-grid. It’s main function is to distribute data objects (stored as simple byte arrays) among cluster nodes for distributed processing. It also provides point-to-point messaging with guaranteed delivery and guaranteed ordering, as well as a cluster configuration management service.

Galaxy Features and Architecture

RAM storage and code/data co-location

Application code runs on the same cluster nodes (called peer nodes), and processes the data objects which are kept in RAM. Unlike other IMDGs that partition data items, and distribute them in such a way that each object is the responsibility of a single node, Galaxy nodes exchange messages to transfer ownership of objects from one node to another. This means that if the application code in one of the nodes regularly updates some data items, those items will be owned by the node, and will be available for processing and update without any network I/O. As the application runs, items will migrate from one node to another (though items can reside on several nodes at once for read-only access). This gives the application precise control over the location of the data in the cluster for maximum performance.

This flexibility, however, is suitable when data access patterns can be predicted and so the data can be optimally placed. If data access is completely random, other IMDGs may be a better choice as they can retrieve or update any item at a cost of one network hop, while Galaxy might need to search the cluster for an unexpected item access.

Because the application code is co-located with the data, and because all of the data is kept in RAM, Galaxy is suitable for low-latency applications.

Consistency

Galaxy stores data in a fully consistent manner, meaning that if data item B has been modified after a change to data item A, no node in the cluster will see A’s new value but B’s old one.

Galaxy achieves consistency by using a cache-coherence protocol similar tothe protocols used to coordinate memory access among CPU cores. However, as Galaxy can guarantee the ordering of coordination messages between nodes no memory-fence operations are requires (as they are in CPUs) to achieve consistency.

Disk persistence and server nodes

The data items can optionally be persisted to disk on one or more special nodes called server nodes.

However, in order to keep latency low, Galaxy (even when configured to use a persistent server node), is not durable. This means that a failure in one or more of the nodes may result in a permanent loss of some recent updates. However, even in cases of such failures the data remains consistent, meaning the lost updates will be lost to all of the nodes (or to none).

High Availability

Galaxy can withstand a failure in one or more of the nodes, providing high-availability. This is achieved by either running Galaxy with a server node (which persists all of the grid data to disk) or by running a slave node (or more) for each of the peers, or both.

If only a server node is used, data is not lost when a peer node fails (except for possibly some recent updates as explained above), and all the data items owned by the failed node are still accessible. However, as the server reads those items from the disk, latency might suffer until all items have been accessed by the peers and kept in RAM.

Alternately, or in combination with a server, you can run one or more slave-nodes for each of the peers, that mirror the data stored in them, so that upon failure of a peer, one of its slaves will take over, already having all of the necessary data items in RAM.

A server node may also have slaves that will take over when it fails.

Messaging

Galaxy provides a point-to-point messaging service that guarantees message delivery and ordering. A message can be sent to a known node or to the unknown (to the application) owner of a given data item. So if Galaxy’s data-item migration makes moving data to code simple, Galaxy’s messages make moving an operation (code) to data just as simple.

The application messages are delivered by the same communication channel that delivers Galaxy’s internal coherence protocol messages, so messages are also guaranteed to be ordered with data operations. Say that node A updates data item X, and then sends a message to the owner of data item Y (which happens to be node B), as a result of which node B reads the value of X. In this case node B is guaranteed to read the value of X after the update done by A before sending the message.

Monitoring

All of Galaxy’s components are monitored to enable full diagnoses of failure or performance problems.

Galaxy’s aim is to give the application full control over data location and processing in the grid, and in-order to provide maximal flexibility with a simple API, it is relatively low-level. It provides no query mechanism whatsoever, imposes no structure on the data (which is kept in memory and on disk as byte arrays), and provides no locking of elements to coordinate complex transactions among different threads on a single node (although each operation is atomic by itself). All of that must be provided by the application.

Note: Use data-store operations to move data to code; use messages to move code to data.

Galaxy Internals

A series of blog posts detailing Galaxy’s inner workings can be found here: part 1, part 2, part 3.

Tips for Achieving High-Performance with Galaxy

  • Reduce contention - just like in all distributed systems (and even inside your CPU), contention invariably requires communication and communication invariably increases latency. Try to avoid multiple nodes all competing to update the same items.
  • The more nodes share an item, the less often it should be updated - even if an item is usually updated by the same node, if the item is shared (for read access), by a large number of nodes, updating it will increase latency (in the reader nodes, not the writer node).
  • Trees are good - Tree data structures (like B-trees and tries) often have the property that the higher up a tree-node is, it will be shared more, but will be updated less often. That’s a great property.
  • Keep your transactions short - this will also reduce contention. Try not to do any blocking operation while in a Galaxy transaction.
  • Consider configuring your Ethernet network to use jumbo frames - if your network supports it, jumbo frames may improve communication speed among Galaxy nodes.

Galaxy’s API

Galaxy’s Grid API (which is fully documented in the Javadoc) is found in the package co.paralleluniverse.galaxy.

Getting the grid instance

To get an instance of the galaxy grid, represented by the Grid class (Javadoc), simply call

Grid grid = Grid.getInstance();

Usually, your next statement would be to tell the node to go online:

grid.goOnline();

See Cluster organization for more details about node states.

Note: See Configuring and Monitoring Galaxy for information about configuring the grid.

Grid services

The Galaxy grid provides three services:

The data store service (accessed through the interface Store) provides all operations on grid data items. To obtain an instance of the Store, call

Store store = grid.store();

The messenger service (accessed through the interface Messenger) allows sending point-to-point messages over the grid that work in concert with the data store service. You can obtain an instance of the Messenger, by calling

Messenger messenger = grid.messenger();

The cluster service (accessed through the Cluster interface), is used internally by Galaxy to manage the cluster (handle membership changes, leader election etc.) but can be used by client application as well. It provides a nice mechanism for sharing configuration data among nodes. The Cluster instance is obtained so:

Cluster cluster = grid.cluster();

Data Store

The grid’s shared data store is accessed through the Store class (Javaoc) which is used for all operations on data items.

Data items

Galaxy data items are simple byte arrays, which are assigned a unique long identifier by the grid. You cannot choose the id given to a data item (Galaxy is not a key value store), so you are responsible for storing the item ids in your object graph (you can think of item ids as the grid version of references). This is all because Galaxy is meant to be used to implement any kind of distributed data structure (you can implement a distributed map on top of Galaxy and thus build your own key-value store).

To get an item’s id, you would either read it from another item (like following a reference), or read it from a message (see Messenger). However, all cluster nodes need a way to easily find the root (or roots) of the object graph, and for this purpose, Galaxy provides root items.

Root items

A root item is a data item which you’d like to access without knowing its id in advance. Roots are found using string identifiers of your own choosing. When a root is first located, it will be allocated, but only by one of the nodes accessing it. So if several cluster nodes are all accessing the same root, one will be responsible for initializing it (if it has not already been created before), and the rest of the nodes will observe the initialized root. This ensures that any node will either find an initialized root, or be assigned the task of initializing it (this will only happen once for each root).

Finding a root is done by calling the getRoot method within a transaction (getRoot is the only Store operation that requires a transaction. For all other operations, transactions are optional. Transactions are fully explained later in this chapter), like so:

long root = -1;
StoreTransaction txn = store.beginTransaction();
try {
    root = store.getRoot("myRootName", txn);
    if (store.isRootCreated(root, txn) 
        store.set(root, initialRootData(), txn); // initialize root
    store.commit(txn);
} catch(Exception ex) {
    store.rollback(txn);
    store.abort(txn);
}

Locating a root by its name can be costly, so only locate a root once (during application startup) and store its id for future accesses.

Note: Do not use the root mechanism as a general key-value store. Roots were designed to be accessed by their string identifiers only rarely (usually only when the application starts). Locating a root by its name is a costly operation.

Serialization and Persistables

In order to represent application objects, you can use any serialization mechanism, such as java.io serialization, Protocol Buffers, [Kryo] or any other. However, for best serialization performance, it is best to have your data objects implement the Persistable interface.

The Persistable interface (Javadoc) provides direct access to galaxy’s internal ByteBuffers, and eschews copying data to and from byte arrays. All of the Store’s data methods (get, set, put etc.) have versions that work with Persistables. Just make sure never to modify the ByteBuffer’s contents inside your implementation of Persistable’s read method.

Allocating items

Root items are allocated automatically. All other items must be allocated explicitly using one of the put methods or with the alloc method.

The alloc method (Javadoc) allocates one or more items. The items allocated are empty (i.e. contain nulls), and can be set with one of theset methods. This method is mostly intended for allocating arrays - a block of items with consecutive ids. The return value is the identifier for the first allocated item, with subsequent ids assigned to the following allocated items.

The put methods allocates a new item and sets its value (there are variants taking values of different types - array, ByteBuffer and Persistable). It returns the newly allocated item id.

Reading items

To read an items value, simply pass its id to one of the get methods.

Hinted reads

Sometimes your application may know which node likely owns a certain item (say if this information was conveyed in a message) or that an item is likely owned by the same node that currently owns a different item (if this is how your distributed data structure behaves). In such cases, you can provide hints to the get method as to the item’s owner, which may sometimes be helpful in improving the running time of the operation (however, even if the hint is wrong and the item is not, in fact, stored on the hinted node, the operation will still perform correctly and find the item wherever it is).

Some variants of the get method take a nodeHint parameter (a short value) that names the (supposed) owning node.

The getFromOwner methods take a second item id that points to an item which is likely owned by the same node that owns the requested item. Note that calling this method may only improve performance if the hinting item (the second parameter) is found on the local node (and so its owner is already known).

Writing items

To write an item’s value, use one of the set methods.

Deleting items

An item can be deleted with the del method (Javadoc).

Trying to access (get or set) a deleted item will result in an exception, but you should not rely on that to detect deleted items (making sure an item is deleted might be costly). Instead, try to only delete items when they are no longer “referenced” by any other item (i.e., they are unreachable).

Item ownership

As explained in the introduction, Galaxy is different from other IMDGs in that item ownership can move between cluster nodes during normal operation. This will now be explained in further detail.

Owned items and shared items

Whenever you access a Galaxy data item in your application, it is sent to the cluster node your code is running on. The item is then stored in RAM in one of two states: owned or shared.

Every Galaxy data item is owned by exactly one node at any point in time, but can be shared by many. All nodes sharing an item can read its value, but only the owning node can write it. The owning node and sharing nodes for each item change based on the operations the program performs. You can check whether an item is shared, owned or non-existent in any particular node by calling the getState method (Javadoc).

Sharing an item

When you call the get method (any of its variants), if the item is not found on the local node (in either a shared or an owned state), it will be fetched from the owning node, and kept in RAM in the shared state, until the owning node invalidates it (when the item value is changed). Any further reads (with get) will complete immediately with no required network operations.

The gets method (all of its variants) is very similar to get, except that the item will remain shared on the current node until it is explicitly released. In other words, the item is pinned to this node in the shared state. You shouldn’t keep the item pinned for long, because as long as it’s pinned to the local node, it’s value cannot be changed by the owning node! (this is not exactly true - see Inner Workings.

To release a pinned item, you must call the release method (Javadoc) or use the gets method in a transaction.

Owning an item

In order for an item to be written (with the set method), it must be owned by the local node, and it must not be shared by any other node (we then say that the item is exclusive in the calling node). So, when you call the set method, ownership of the item is transferred to the calling node, and all sharing nodes are asked to invalidate their copies of the item.

The getx method (all of its variants) reads an item’s value, but first it obtains ownership over it, and invalidates all sharers. In other words, it pins the item to the local node in the exclusive state. As long as the item is pinned, no other node can read or write the item (this, too, is not exactly true - see Inner Workings, so you should release it as soon as possible. getx is essentially a “get for write” operation, used to read the the item with the intent to soon modify it with set.

To release a pinned item, you must call the release method (Javadoc) or use the getx method in a transaction.

Deadlocks

Because getx and gets pin an item to the local node until it is explicitly released, pinning more than one item can result in a deadlock. For example if node A pins item X in a shared mode (using gets) and then wishes to pin item Y in the exclusive mode (with getx), while, at the same time node B pins Y in the shared mode and wishes to pin X in the exclusive mode, a conflict may occur which will result in both nodes A and B unable to complete their operation. This is called a deadlock.

When deadlock occurs, the failed operation will throw a TimeoutException. If this happens, you must undo all writes that have succeeded to relevant items and release all pinned items in order to allow the other node to complete its operation. Then, you may retry the operation. Transactions make dealing with timeouts easier.

See Timeout for instructions on setting the timeout duration.

Note: Inner Workings- When set or getx are called, the caller does not actually wait for all sharers to invalidate the items before modifying it. Galaxy assumes that if a tree falls in the forest and no one is around to hear it, it does not make a sound, so some lengthy operations are allowed to proceed as long as no other node can have access to the item. Therefore, set or getx will complete before all sharers have invalidated their copies, but the item’s new value will not be made available to other nodes until they do so. In fact, this is also done with high availability backup data (to the server or slave nodes). Writes do not wait for the server or slaves to acknowledge the backup, but other nodes cannot read the item’s new value until the backup has been completed. Neither is it entirely true that items pinned in the exclusive mode (with getx) cannot be read by other nodes. In fact Galaxy allows nodes to read an exclusively pinned item’s old value (as it had been before it was pinned), provided that the item is found on that node (because it was once a sharer or an owner of the item), and provided that reading the value will not violate consistency guarantees. In any case, Galaxy never allows reading (or writing) an item in a way that will violate consistency.

Listeners

You can listen for changes in an item’s value by providing a listener to the method setListener (Javadoc), which will get notified of events pertaining to a specific item. Only one listener can be set for a given item, and it may be removed by passing a null listener to setListener.

A listener may be useful, say, for updating a deserialized representation of the item.

The listener implements three methods:

  • received - called when a new value for the item has been received by the node when a get completes after the node’s value has been changed by another node. Received will not be called when the value is modified by the local node, nor will it be called when another node updates the item, but the local node has not requested its value with a get (or gets/x). That is, the listener does not listen for all modifications done to the item, only those which are of interest to this node, namely only when a get/s/x has been issued.
  • invalidated - called when the item’s owner requested the item be invalidated by the local node (because it wants an exclusive ownership for an update). Note that this does not necessarily mean that the item may not be read by the local node, as sometimes Galaxy allows stale reads as long as they don’t break consistency (see Inner Workings).
  • evicted - called when the item has been evicted entirely from the local node, either because it was a shared item that was not accessed recently and Galaxy evicted it to conserve memory, because the item has been deleted, or because Galaxy has determined that it can no longer be read without violating consistency.
Transactions

Transactions are used to make multi-item atomic operations easier to use. An atomic multi-item operation is one that potentially modifies more than one item, and allows other nodes to observe the items’ values either as they were before the transaction started or as they are once the transaction has completed. Internally, transaction simply track which items were pinned, and allows releasing all of them with one simple method call (remember, an item pinned with getx cannot be observed by other nodes).

A transaction is started with the beginTransaction method, completed with the commit or abort method, and is used so:

StoreTransaction txn = store.beginTransaction();
try {
    byte[] valX = store.gets(x, txn);
    byte[] valY = store.getx(y, txn);
    store.set(y, process(valX, valY), txn);
    store.commit(txn);
} catch(TimeoutException e) {
    store.rollback(txn); // or undo writes manually with a series of sets.
    store.abort(txn);
}

Note how you must explicitly undo your changes if the transaction fails - either using rollback or manually using set. By default, transactions support the rollback operation, but this makes them slower (and consume more memory) as they must remember items’ old values. You can disable this “redo log” in the configuration file (See Configuring and Monitoring the Cache).

See Timeout for instructions on setting the timeout duration.

Asynchronous operations

Galaxy works best when most data operations access items that are already stored on the local node (see Performance). However, occasionally operations do require network hops (for ownership transfer etc.), and so may block.

The data-store API provides non-blocking versions to all data operations (called getAsync, getsAsync, getxAsync etc.) that do not block, but instead return a Future. This is especially useful (and will give a significant performance boost) when performing several operations that don’t each require the result of the previous one. In the worst case (when network IO is required) this will result in all network requests being sent together instead of each being sent only after the previous has completed.

Here’s an example:

ListenableFuture<byte[]> valX = store.getsAsync(x, txn);
ListenableFuture<byte[]> valY = store.getxAsync(y, txn);
store.set(y, process(valX.get(), valY.get()), txn); // this call is synchronous

When used in a transaction, commit (and abort) will automatically wait for all futures returned within the transactions (and will so guarantee they are all complete when the transaction ends.

StoreTransaction txn = store.beginTransaction();
    
try {
    ListenableFuture<byte[]> valX = store.getsAsync(x, txn);
    ListenableFuture<byte[]> valY = store.getxAsync(y, txn);
    store.setAsync(y, process(valX.get(), valY.get()), txn);
    store.commit(txn);
} catch(TimeoutException e) {
    store.rollback(txn); // or undo writes manually with a series of sets.
    store.abort(txn);
}
Multithreading

All of Store’s methods are thread safe, and the Store instance may safely be used by multiple threads. However, Galaxy was built to provide inter-node synchronization - not intra-node synchronization - and so pinning an item to the local node entails no locking. Meaning, an item that was pinned with getx on one thread, will result in getx succeeding immediately when called from another. Even transactions (which are a thin management layer over pinning) will easily trample over each other if they touch the same items on different threads. Any synchronization among threads (such as locking) must be done by the application (or another layer of middleware on top of Galaxy).

By leaving locking to the application, Galaxy provides a lot of flexibility. For example, if used carefully, several threads may cooperate in running the same Galaxy transaction.

Performance

To fully enjoy Galaxy’s low-latency processing, abide by the following advice:

  • Reduce contention - just like in all distributed systems (and even inside your CPU), contention invariably requires communication and communication invariably increases latency. Try to avoid multiple nodes all competing to update the same items.
  • The more nodes share an item, the less often it should be updated - even if an item is usually updated by the same node, if the item is shared (for read access), by a large number of nodes, updating it will increase latency (in the reader nodes, not the writer node).
  • Trees are good - Tree data structures (like B-trees and tries) often have the property that the higher up a tree-node is, it will be shared more, but will be updated less often. That’s a great property.
  • Keep your transactions short - this will also reduce contention. Try not to do any blocking operation while in a Galaxy transaction.
  • Use asynchronous operations when appropriate.

Messenger

The Messenger (Javadoc) lets you send point-to-point messages to other nodes in the cluster, and is used alongside the Data store to distribute your application in the grid. While Galaxy’s data-store moves data around the grid to be processed at the appropriate node (moving data to code), messages are used to request nodes to carry out operations without migrating data (moving code to data).

There are two ways of routing your messages. You can send a message to a known node, or you can send it to the (unknown) owner of a specific data item.

Message topics

Messages are sent and received by topics. Topics are the mechanism by which messages are delivered to the appropriate recipient within the node. There are two kinds of topics you can use. long topics and String topics.

To receive messages, you register a listener that will receive all incoming messages sent to a specific topic in the local node. You register receivers like so:

messenger.addMessageListener(topic, myListener);

With topic being either a String or a long, and myListener is an object implementing MessageListener (Javadoc).

When a message sent to myTopic on the local node is received, myListener’s messageReceived method (Javadoc) will be called, and passed the message payload and the sending node.

You remove a listener by calling removeMessageListener.

Routing messages

One way of addressing messages is sending them to a well known node. You would usually use this option when replying to received messages, in which case the original sender is passed to the MessageListener’s messageReceived method (Javadoc).

To send a message to a well known node use one of the send methods, and pass it the node id (a short), the topic (either a long or a String), and the message payload, like so:

messenger.send(node, topic, message);

The other way of addressing messages is sending them to the owner of a known data item. To do that, use one of the sendToOwnerOf methods, and pass it the item id, the topic and the message payload, like so:

messenger.sendToOwnerOf(itemId, topic, message);
Delivery guarantees

Messages are guaranteed to be delivered, and to arrive in the order they were sent (i.e. two messages M and N that are sent in this order from node A to B, will be received by B in the same order, regardless of the messages’ topics).

Messages are delivered by the same communication channel that delivers Galaxy’s internal coherence protocol messages used to move data items around, so messages are also guaranteed to be ordered with data operations.

Say that node A updates data item X, and then sends a message M to the owner of data item Y (which happens to be node B), and as a result of receiving M, node B reads the value of X. In this case node B is guaranteed to read the value of X after the update done by A before sending the message.

These guarantees make it simple to distribute data processing in the grid.

Cluster Management

The Cluster interface (Javadoc) provides information about cluster nodes, and emits cluster events (like nodes joining or leaving the cluster). It also allows for easy configuration management.

While an application employing Galaxy does not have to use this service, it is very useful for all clustered applications. And while the application is free to use any other software for cluster management, this service is being used internally by Galaxy, so when this service detects a node going down - that’s when the data-store and messenger detect is as down, as well.

Internally, this service employs either Apache ZooKeeper (through Netflix’s Curator library) or JGroups (see Configuring the Cluster Component for configuring Galaxy to use either option). You can gain access to the underlying implementation by calling the getUnderlyingResource method (Javadoc) which will return the CuratorZookeeperClient used by Galaxy for cluster management (which, in turn, can be further queried for the underlying ZooKeeper instance) or the JGroups JChannel used for cluster management.

Cluster organization

Every Galaxy node in the cluster has a unique name (a String) which is automatically assigned to it, and an id (a short value) which is assigned in the local configuration file (see Configuring the Cluster Component) and is not unique (it is shared among all nodes in a backup group (see Backup groups). Node id 0 designates server nodes (see Server node).

In addition, every node may expose additional properties (such as IP addresses, ports or any other information) to all other nodes in the cluster. Some properties are used internally by Galaxy, but you can add your own (see Custom properties).

Each node is, at any given moment, in one of three states: offline, joined or online.

  • An offline node is one that is not seen by the other nodes in the cluster, either because it is turned off, disconnected from the network, or suffering from a software failure.
  • A joined node is one which is seen by all other nodes, but does not participate in the data-grid, i.e. it cannot receive messages or data-items. All it can do is observe other nodes’ state.
  • An online node is one which fully participates in the grid, either as a master or a slave (see Backup groups.

When your application first starts, its node is in the offline state. When you get a grid instance (by calling Grid.getInstance()) that’s when the node will try to join the cluster, and become joined.

To go online call grid.goOnline().

Server node

A server node (or nodes) is a special Galaxy node that doesn’t run application code, but is responsible for providing Galaxy data with disk-based persistence. All of the data in the grid is persisted to disk on the server node, and is also automatically served by it in case of a node failure.

The node server always has a node id of 0, and, just like with regular nodes, you can have several server nodes in a backup groups configuration for added availability.

See Servers and Backup Groups and Configuring, Running and Monitoring the Server for more information about configuring and running server node(s).

Backup groups

Galaxy nodes can be configured in backup groups. All nodes assigned the same node id in their configuration file will become part of the same backup group. At any given time, each live backup group has exactly one master node, and zero or more slave nodes. The master node replicates all of its owned data items to its slaves so that they can take over in case it fails.

When the master node fails, one of its slaves will become the master and take over. However, a master node can never become a slave. A slave node is notified of its new master status by an event (see Lifecycle events).

See Servers and Backup Groups for information about configuring backup groups.

Lifecycle events

You can listen for important lifecycle events with the addLifeCycleListener to which you pass your LifeCycleListener (Javadoc). The listener will be notified of the following events:

  • online - This event will get triggered when the node reaches the online state. The single boolean parameter tells the node is a master or a slave (see Backup groups).
  • offline - This event will get triggered when the node goes offline for some reason. Obviously, if the node goes offline due to a power failure, this event will not be triggered.
  • switchToMaster - This event will get triggered at a slave node when it becomes the master (when the previous master has gone offline.

You can also check the current state of the local node with the methods isOnline and isMaster.

You remove a lifecycle listener by calling removeLifecycleListener.

Cluster events

The Cluster service also sends notifications about occuronces in other cluster nodes.

If you hand a NodeChangeListener (Javadoc) to addNodeChangeListener you’ll be notified when a new node (actually, a backup group) comes online, when it goes offline, and when it has a master switchover.

A SlaveConfigurationListener (Javadoc) passed to addSlaveConfigurationListener will notify you when a slave is added to or removed from your local node’s backup group.

Node information

The Cluster interface has methods that return information about the local node and all other cluster nodes.

The getMyNodeId method returns the local node’s id, and the getNodes method returns the ids of all online nodes (actually all online backup groups. Remember, the node id is shared by all nodes in the group).

NodeInfo

The NodeInfo interface (Javadoc) let’s you access the configuration record published by any node in the cluster.

getMyNodeInfo returns the NodeInfo of the local node, so by calling

grid.cluster().getMyNodeInfo().getName()

you can get the local node’s unique name.

There are other methods in the Cluster interface that let you access the NodeInfo of just about any node in the cluster. Refer to the Javadoc for more information.

Custom properties

Other than its name and id, each node’s configuration record can hold any number of additional properties, which can be accessed by NodeInfo’s get or getProperties methods.

You can add a custom node property, and additionally require that nodes that have not yet publish a value for the property will not be in considered online until they do, with the addNodeProperty method (Javadoc). Refer to the Javadoc for more information.

The setNodeProperty method lets you set a property of the local node.

The addMasterNodePropertyListener (Javadoc) and addSlaveNodePropertyListener (Javadoc) methods allow you to listen for changes in the values of configuration record properties of any master node in the cluster or any of your slave nodes respectively.

The distributed tree

Galaxy nodes publish their configuration record via a service called DistributedTree (Javadoc) which implements a distributed filesystem-like tree of information nodes (very similar to the way Apache ZooKeeper works, and in fact, when the cluster is configured to use ZooKeeper, the DistributedTree is just a thin layer on top of it). DistributedTree makes some strong consistency and ordering guarantees about it’s data, which make it suitable for sharing critical configuration data and for coordination tasks among nodes, such as leader election. Galaxy’s NodeInfo records simply wrap the DistributedTree nodes in one of its branches (directories).

If you’d like direct access to the tree, you can obtain the DistributedTree instance by calling the getDistributedTree method.

Configuring and Monitoring Galaxy

Configuring the Galaxy grid is just as important as using its API, as the API is designed to be simple, but the actions Galaxy takes under the hood are very much determined by the way Galaxy is configured.

Galaxy uses the Spring Framework for its configuration, and is configured from an XML file using Spring Beans. For a short tutorial on Spring configuration, see A Spring Primer.

Note: This manual provides detailed instructions to configuring Galaxy. However, for a quicker start, we suggest you take a look at Using the pre-built configurations in the Getting Started guide.

The configuration file(s)

A Galaxy node is configured from one XML file (of course, that file can reference others in accordance with the Spring Beans schema), whose path is passed to the getInstance static method of the Grid class (Javadoc). If you pass null (or use the zero-argument version of getInstance), the default configuration file, galaxy.xml is used, if it’s found somewhere on the classpath.

In addition to the XML file, you may use a Java properties file, and reference those properties in the configuration file like so: ${prop.name}. The name of the properties file may also be passed to getInstance. If a null name is passed (or the zero-argument version of getInstance is used), no properties file gets loaded. The properties file is handy when you want to use the same configuration file for all nodes, with only minor differences (like node ID). In this case, you can just set the different properties in the separate properties file. This is the common case so properties files are recommended.

You define properties in the properties file like so (you can make up your own property names - they have no special meaning to Galaxy:

galaxy.nodeId=3

and you reference them from the configuration file when setting configuration properties like this:

<constructor-arg name="nodeId" value="${galaxy.nodeId}"/> 

Alternatively, you can define properties as JVM system properties by the same name.

The configuration file defines which implementation each Galaxy component is to use, as well as implementation specific configuration parameters for each component.

Galaxy’s configuration is flexible enough to allow running multiple nodes (even server nodes) in the same physical machine. All it requires is setting IP ports and the like carefully, where appropriate.

Note: You must use the documented bean ids for each component. You may not assign the beans other names, as Galaxy depends on the components having specific names. In addition, Spring’s default-autowire mode must be set to constructor.

Here’s how the configuration file should look:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:util="http://www.springframework.org/schema/util"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
          http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
          http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd"
       default-lazy-init="false"
       default-autowire="constructor">

    <!-- bean definitions ... -->
</beans>

Configuring Galaxy monitoring

Most Galaxy components expose monitoring data, and most of those that do let you choose what monitor to use (in the monitoringType constructor-arg). Currently, there are two options:

  • METRICS - uses Yammer’s Metrics library. Data can be exported to JMX MBeans, Ganglia, or Graphite. This option gives very detailed an high-resolution monitoring data, but might be heavy on resources.
  • JMX - uses simple JMX MBeans to export monitoring data. The data is lower-resolution and cruder than the Metrics option, but might be lighter on resources.

The best way to view MBeans is with VisualVM.

Monitoring Galaxy’s components

Most Galaxy components expose monitoring data, and the details about which information is exposed is detailed in the component’s respective manual section. Aside from component-specific monitoring, all components expose their configuration properties and basic status as MBeans. These MBeans are named co.paralleluniverse.galaxy:type=components,name=COMPONENT_NAME, where COMPONENT_NAME is the Spring Bean id.

Logging

Galaxy uses SLF4J for all its logging.

Galaxy’s components

A Spring Primer

The Spring Framework lets applications configure and wire their components in an XML file (it does many other things as well, but Galaxy only uses Spring for configuration). Using Spring or configuration allows very fine-tined control over many aspects of the application, but it can be quite verbose. This section is a short tutorial explaining how to configure and wire components using Spring XML. For a more detailed reference of Spring configuration please see Spring’s IoC Container Documentation.

Spring XML

A Spring configuration file is an XML file containing bean definitions (beans are explained in the next section). Here’s the structure of the configuration file, as should be used for Galaxy configuration. It contains some required Spring schemas as well as a few optional ones that provide some convenient features:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:util="http://www.springframework.org/schema/util"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd"
       default-lazy-init="false"
       default-autowire="constructor">

    <!-- bean definitions ... -->
</beans>
Instantiating beans

Spring beans are application components declared, configured in wired in the Spring XML configuration file as bean XML elements. Each bean corresponds to an instance of a Java object. A bean has a unique ID, which names the component, and a means to instantiate it. Usually, a bean is instantiated by constructing a simple object of a given class, in which case the bean definition would look like this.

<bean id="myBean" class="com.mycompany.myClass"/>

This would instantiate an instance (called myBean) of the class com.mycompany.myClass.

Sometimes, beans are constructed not using a simple constructor but by calling a factory static method. In that case, the bean definition would look like this:

<bean id="myBean" class="com.mycompany.myFactoryClass" factory-method="createInstance"/>

This would instantiate a bean (of some unspecified class) by calling the createInstance static method of the com.mycompany.myFactoryClass class.

Configuring beans

Usually, a bean definition includes some configuration of the bean instance. There are two kinds of configuration hooks: constructor-args and properties. Each bean definition can include constructor-args as well as properties.

constructor-args

constructor-args are passed as either constructor arguments to the class constructor or as arguments to the static factory method, according to the instantiation mechanism used. They are usually required for a successful bean instantiation.

constructor-args can be passed by name, by index, or by type. Here’s how we use them to instantiate a bean, using names:

<bean id="myBean" class="com.mycompany.myClass">
      <constructor-arg name="myName" value="foo" />
      <constructor-arg name="size" value="5" />        
</bean>

Here we’ve passed the value "foo" to the String parameter myName of the constructor, and the value 5 to the size parameter (in the next section we’ll see how Spring knows which value type to use).

Unfortunately, specifying constructor-args by name does not always work - it requires either relevant debug information in the class file or the use of a specific annotation by the class’s author. (Note: all Galaxy components described in this documentation, i.e. those that belong to a co.paralleluniverse.* packages, are annotated and allow specifying constructor-args by name). If named constructor-args don’t work, we can specify them using indices like so:

<bean id="myBean" class="com.mycompany.myClass">
      <constructor-arg index="0" value="foo" />
      <constructor-arg index="1" value="5" />        
</bean>

This also passes the String value "foo" as the first argument of the constructor (which is supposedly myName), and the value 5 as the second argument.

If each parameter is of a different type, we can also pass constructor-args without specifying either name or index, like so:

<bean id="myBean" class="com.mycompany.myClass">
      <constructor-arg value="foo" />
      <constructor-arg value="5" />        
</bean>

This will achieve the same effect, but only if Spring can figure out unambiguously the arguments’ types and which parameter they correspond to; if not, Spring will issue an error.

properties

properties are configurations that are usually optional, and if not included in the bean definition, the bean will use some default values. Internally, properties are set by calling Java Bean setter methods, and can therefore always be specified by name, like so:

<bean id="otherBean" class="com.mycompany.otherClass">
      <property name="foo" value="bar" />
      <property name="colorName" value="blue" />
      <property name="width" value="10" />
      <property name="height" value="100" />
</bean>

Of course, some beans combine both constructor-args and properties.

The p- and c- namespaces

Spring provides two special XML namespaces called p and c that allow for a less verbose configuration in many cases.

Using the p and c namespaces, you can specify configuration values as XML attributes rather than as child elements. So the following definition, using the p and c namespaces:

<bean id="myBean" class="a.b.Foo" c:text="Hello World!" p:height="300" p:color="blue" />

is identical to:

<bean id="myBean" class="a.b.Foo">
      <constructor-arg name="text" value="Hello World!" />
      <property name="height" value="300" />
      <property name="color" value="blue" />
</bean>

If constructor-arg names do not work with a given class, you can use indices with the c namespace, with

<bean id="myBean" class="a.b.Bar" c:_0="100" c:_1="200" />

being the same as

    <bean id="myBean" class="a.b.Bar">
        <constructor-arg index="0" value="100" />
        <constructor-arg index="0" value="200" />
    </bean>
Values

This section explains how Spring interprets property (or constructor-arg) values.

Primitives, strings and enums

Spring uses Java reflection to figure out the type of the property (or constructor-arg), and it attempts to convert the string value in the XML document into the appropriate type. So value="5" would become the integer 5 if the property’s type is int, 5.0 if it’s double, or the string "5" if the property is of type String; The values true and false are thus converted to boolean if the property is of the boolean type, and strings are passed verbatim if the property is of type String.

If a property’s type is a Java enum, then its value is converted to the appropriate enum value according to its name, so value="NFC" would become java.text.Normalizer.Form.NFC if the property’s type is java.text.Normalizer.Form.

Null values

If a property (or a constructor-arg), accept a Java reference, then the null value can be passed like this:

<property name="foo"><null/></property>
Referencing other beans

When a property (or constructor-arg) take an object, we can pass it one of the beans we’ve defined. So, if we have this bean:

<bean id="fooBean" class="a.b.Foo" />

and the Bar class has a property (or constructor-arg) taking a Foo, we can pass it fooBean like so:

<bean id="myBar" class="a.b.Bar" />
      <property name="foo" ref="fooBean" />
</bean>

Notice the use of the ref attribute rather than value.

The same could be done with the c/p- namespaces: p:foo-ref="myFoo", c:foo-ref="myFoo" or c:_0-ref="myFoo".

Compound values (inner beans)

When a property (or a constructor-arg) take an object, we could use a ref, but if the object is to be used only for the purpose of setting this particular property, rather than define a new bean and reference it, it is less verbose, less cluttered, and generally preferable to use an “inner bean”.

Inner beans are bean definitions local to a single property (or constructor arg), that don’t have an id, and therefore cannot be referenced anywhere else.

Here’s an example from a Galaxy configuration file of setting a property that takes a value of type java.net.InetSocketAddress:

<property name="multicastGroup">
    <bean class="java.net.InetSocketAddress">
          <constructor-arg index="0" value="225.0.0.1"/>
          <constructor-arg index="1" value="7050"/>
    </bean>
</property>

Or, more succinctly:

<property name="multicastGroup">
    <bean class="java.net.InetSocketAddress" c:_0="225.0.0.1" c:_1="7050" />
</property>

Configuring the Cluster Component

The cluster component (defined by the Spring Bean named cluster) is the most fundamental of Galaxy’s components. It defines how the Galaxy nodes discover each other and exchange configuration data.

There are currently two implementations of the cluster, one employing Apache ZooKeeper, and the other using JGroups. You may choose whichever you prefer. Note, however, that ZooKeeper requires special server nodes while JGroups doesn’t.

Servers and Backup Groups

Both cluster implementations share a couple of properties.

The first sets the nodeId (constructor-arg, short). This is a required property.
The special node id of 0 is reserved for server nodes (see below).

The other common property is hasServer (property, boolean, default:true). It is an optional property (with the default value true) that specifies whether the cluster has server nodes.

There are two mechanisms by which Galaxy provides high-availability in the face of node failures: server nodes and backup groups.

Server node

A server node (or nodes) is a special Galaxy node that doesn’t run application code, but is responsible for providing Galaxy data with disk-based persistence. All of the data in the grid is persisted to disk on the server node, and is also automatically served by it in case of a node failure.

The node server always has a node id of 0, and, just like with regular nodes, you can have several server nodes in a backup group (see below) configuration for added availability.

See Configuring, Running and Monitoring the Server for more information about configuring and running server node(s).

Backup Groups

Galaxy nodes can be configured in backup groups. All nodes assigned the same node id in their configuration file will become part of the same backup group. At any given time, each live backup group has exactly one master node, and zero or more slave nodes. The master node replicates all of its owned data items to its slaves so that they can take over in case it fails.

When the master node fails, one of its slaves will become the master and take over. However, a master node can never become a slave. A slave node is notified of its new master status by an event (see Lifecycle events).

See Cluster organization for more information.

Note: This version of Galaxy does not yet support a backup group for the server. This version of Galaxy supports backup groups of size 2 only (i.e. one master and one backup for each peer node).

Should you use a server?

Short answer: yes!

While you can configure a Galaxy cluster without a server, such configuration entails a somewhat different cache-coherence protocol. When not using a server node, if two node failures occur (from two different backup groups) within a very short time period, this may result in lost data items (though not in data conflicts).

In addition, it is not yet clear how dependable the grid could be without a server, and how useful a server-less configuration is. It is possible that the server-less configuration will be discontinued in a future version.

Note: In this version, the server-less configuration is not dependable, and may result in lost data, and possibly data conflicts, even if a single node fails!

Using ZooKeeper

The cluster component can use Apache ZooKeeper for cluster management. Galaxy uses ZooKeeper through the Netflix’s Curator library which simplifies ZooKeeper use. Please refer to the ZooKeeper documentation on how to set up ZooKeeper servers.

The ZooKeeper implementation of the cluster component is called co.paralleluniverse.galaxy.zookeeper.ZooKeeperCluster and has (in addition to the common nodeId and hasServer) the following configuration properties:

zkConnectString (constructor-arg, String)
The ZooKeeper connection string, which tells the node how to connect to the ZooKeeper servers. See the ZooKeeper Programmers Guide for details.

sessionTimeoutMs (property, int, default: 15000)
The ZooKeeper session timeout, in milliseconds. The ZooKeeper documentation has the details.

connectionTimeoutMs (property, int, default: 10000)
The Curator connection timeout, in milliseconds.

retryPolicy (property, com.netflix.curator.retry.ExponentialBackoffRetry, default: new ExponentialBackoffRetry(20, 20))
The Curator retry policy for failed ZooKeeper operations. See the example below on how to set this property. Refer to the Curator documentation for details.

Using ZooKeeper in the Cloud

There generally shouldn’t be a problem running Galaxy with ZooKeeper in the cloud. However, using ZooKeeper requires that the UDP implementation of the comm component be used, and that should be configured correctly to work in cloud environments. Please refer to No multicast (using Galaxy in the cloud) for information.

ZooKeeper Configuration Example
<bean id="cluster" class="co.paralleluniverse.galaxy.zookeeper.ZooKeeperCluster">
      <constructor-arg name="nodeId" value="${galaxy.nodeId}"/>
      <property name="hasServer" value="true"/>
      <constructor-arg name="zkConnectString" value="127.0.0.1:2181"/>
      <property name="sessionTimeoutMs" value="1500"/>
      <property name="connectionTimeoutMs" value="1000"/>
      <property name="retryPolicy">
          <bean class="com.netflix.curator.retry.ExponentialBackoffRetry">
              <constructor-arg index="0" value="20"/>
              <constructor-arg index="1" value="20"/>
          </bean>
      </property>
</bean>
Using JGroups

Instead of ZooKeeper, the cluster component can used an implementation (called co.paralleluniverse.galaxy.jgroups.JGroupsCluster) that employs JGroups for cluster management. Unlike ZooKeeper, JGroups manages the cluster in a purely peer-to-peer fashion, and thus does not require any special servers.

In addition to nodeId and hasServer, the JGroups cluster bean requires a few more properties, both must be identical in all Galaxy nodes.

The first, jgroupsClusterName (constructor-arg, String), is a String identifier you choose to give the JGroups cluster. This property must be identical in all of the cluster nodes.

The second is contains the detailed JGroups cluster configuration, and this property, too, must be identical in all nodes.

There are two options for setting it. You can either set the jgroupsConfXML property to point to a JGroups XML configuration file, or set the jgroupsConf property and embed within it the JGroups XML configuration, as in the example below. You must set one of these properties, but not both.

This section of the JGroups manual explains the JGroups configuration in detail, but the JGroups jar file contains several complete XML configuration files you can use as a basis.

Any valid JGroups configuration would do, but you must make two important addition to ensure proper operation of Galaxy (either in the embedded configuration or in the separate XML file).

First, you must add the SEQUENCER protocol to the configuration, so that a complete ordering of configuration messages is enforced. This is done by adding the following XML element to the JGroups configuration, somewhere towards the bottom:

<SEQUENCER />

Second, you must add the COUNTER protocol. Add the following XML element at the very bottom of the JGroups configuration:

<COUNTER bypass_bundling="true"  timeout="5000"/>

Third, if the cluster is configured not to use a server (hasServer is set to false), a locking protocol must be added to the bottom of JGroups configuration:

<CENTRAL_LOCK num_backups="1"/>

Note: Do not forget to add the SEQUENCER and COUNTER (and if no server is used then CENTRAL_LOCK as well) protocols to the JGroups configuration!

The third property, jgroupsThreadPool (property, java.util.concurrent.ThreadPoolExecutor, required), creates the thread pool used by JGroups. Please refer to Thread pools to learn how to configure thread-pools, or look at the example below.

Using JGroups in the Cloud

Some cloud environments (like Amazon EC2) prohibit multicast, so JGroups must be configured to not use multicast if you’re running Galaxy in such an environment.

There are generally two options to use in such cases. The first is to use the UDP JGroups transport, but disable multicasting (by setting to false the ip_mcast property). In addition, you must use a discovery protocol that does not employ multicasting, such as FILE_PING, JDBC_PING, RACKSPACE_PING or S3_PING. See Initial membership discovery in the JGroups documentation for more information on discovery protocols.

The other option is to use the TCP JGroups transport with either the TCPPING or TCPGOSSIP discovery protocols (or any of the ones mentioned above).

Note: When using Galaxy and JGroups in environments that do not support multicasting, you must also configure the comm component appropriately. See No multicast (using Galaxy in the cloud).

JGroups Configuration Example
<bean id="cluster" class="co.paralleluniverse.galaxy.jgroups.JGroupsCluster">
    <constructor-arg name="nodeId" value="${galaxy.nodeId}"/>
    <property name="hasServer" value="true"/>
    <constructor-arg name="jgroupsClusterName" value="cluster1"/>
    <property name="jgroupsConf">
      <value>
        <![CDATA[
        <config xmlns="urn:org:jgroups"
                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.1.xsd">
            <UDP
                mcast_port="${jgroups.udp.mcast_port:45588}"
                tos="8"
                ucast_recv_buf_size="20M"
                ucast_send_buf_size="640K"
                mcast_recv_buf_size="25M"
                mcast_send_buf_size="640K"
                loopback="true"
                discard_incompatible_packets="true"
                max_bundle_size="64K"
                max_bundle_timeout="30"
                ip_ttl="${jgroups.udp.ip_ttl:8}"
                enable_bundling="true"
                enable_diagnostics="true"
                thread_naming_pattern="cl"

                timer_type="new"
                timer.min_threads="4"
                timer.max_threads="10"
                timer.keep_alive_time="3000"
                timer.queue_max_size="500"/>

            <PING timeout="2000"
                  num_initial_members="4"/>
            <MERGE3 max_interval="30000"
                    min_interval="10000"/>
            <FD_SOCK/>
            <FD_ALL/>
            <VERIFY_SUSPECT timeout="1500"  />
            <BARRIER />
            <pbcast.NAKACK2 xmit_interval="1000"
                            xmit_table_num_rows="100"
                            xmit_table_msgs_per_row="2000"
                            xmit_table_max_compaction_time="30000"
                            max_msg_batch_size="500"
                            use_mcast_xmit="false"
                            discard_delivered_msgs="true"/>
            <UNICAST />
            <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                           max_bytes="4M"/>
            <pbcast.GMS print_local_addr="true" join_timeout="3000"
                        view_bundling="true"/>
            <SEQUENCER />
            <UFC max_credits="2M"
                 min_threshold="0.4"/>
            <MFC max_credits="2M"
                 min_threshold="0.4"/>
            <FRAG2 frag_size="60K"  />
            <pbcast.STATE_TRANSFER />
            <COUNTER bypass_bundling="true" 
                     timeout="5000"/>
        </config>
        ]]>
      </value>
    </property>
    <property name="jgroupsThreadPool">
        <bean class="co.paralleluniverse.galaxy.core.ConfigurableThreadPool">
            <constructor-arg name="corePoolSize" value="2"/>
            <constructor-arg name="maxPoolSize" value="8"/>
            <constructor-arg name="keepAliveMillis" value="5000"/>
            <constructor-arg name="maxQueueSize" value="500"/>
        </bean>
    </property>
</bean>

Configuring and Monitoring the Cache

The cache (bean id cache) is the component responsible for the managements of data items in the local node’s RAM, and for the ownership cache-coherence logic.

Configuring the cache

The cache currently has only one implementation (co.paralleluniverse.galaxy.core.Cache), and the following properties:

monitoringType (constructor-arg, String)
Sets the monitor type to use for cache monitoring. Can be either METRICS or JMX (see Configuring Galaxy monitoring).

synchronous (property, boolean, default: false)
Whether or not backups are done synchronously. When set to true, local get operations block until the server and/or slaves have acknowledged the backup.

maxCapacity (constructor-arg, long)
The maximum capacity (in bytes) to be used for storing shared items. If shared items take up more space than that, they will be evicted from the cache. Note that owned items are never evicted.

maxItemSize (property, int, default: 1024)
The maximum size, in bytes of a single data item. If UDPComm is used as the comm implementation (see Configuring and Monitoring the Comm Component), then an item must fit in a single UDP packet with room to spare. Ideally, it would fit in one IP packet, so for larger values of maxItemSize it’s best to configure your network to use jumbo packets. This value must be the same in all nodes.

rollbackSupported (property, boolean, default: true)
Sets whether or not automatic rollbacks for transactions are supported. See Transactions and the Store.rollback() (Javadoc).

compareBeforeWrite (property, boolean, default: true)
Sets whether or not written items should first be compared with their old value before creating a new version (Galaxy maintains a version number for each item to track updates).

reuseLines (property, boolean, default: true)
Sets whether or not the cache should pool and reuse the data-item book-keeping objects.

reuseSharerSets (property, boolean, default: false)
Sets whether or not the cache should pool and reuse the objects used to store data-item sharers.

maxStaleReadMillis (property, long, default: 500)
The maximum amount of time (in milliseconds) allowed to elapse since an item has been invalidated while still allowing the get operation to return the old value. Note that if a get could result in any inconsistency, the fresh value will always be retrieved from the owning node no matter what the value of this property is.

Here’s an example:

<bean id="cache" class="co.paralleluniverse.galaxy.core.Cache">
      <constructor-arg name="monitoringType" value="METRICS"/>
      <constructor-arg name="maxCapacity" value="100000000"/> 
      <property name="maxItemSize" value="1024"/>
      <property name="reuseLines" value="true"/>
      <property name="reuseSharerSets" value="true"/>
      <property name="rollbackSupported" value="true"/>
      <property name="compareBeforeWrite" value="true"/>
</bean>

Note: Synchronous mode is not yet implemented in this version.

Configuring local storage

localStorage is a component related to cache. It is responsible for the actual storage in RAM of the data items. There are currently two implementations - one keeps the items in the Java heap, and one stores them in direct ByteBuffer s.

Heap local storage

The co.paralleluniverse.galaxy.HeapLocalStorage implementation stores items in plain Java byte-arrays, allowing the Java garbage-collector to reclaim them when appropriate.

It has one property - monitoringType (constructor-arg, String), which you can set to either METRICS or JMX (see Configuring Galaxy monitoring).

This is how you configure it:

<bean id="localStorage" class="co.paralleluniverse.galaxy.HeapLocalStorage">
      <constructor-arg name="monitoringType" value="METRICS"/>
</bean>
Off-heap local storage

The other localStorage implementation, co.paralleluniverse.galaxy.core.OffHeapLocalStorage, stores data items in direct ByteBuffer s and manages allocations and de-allocations. It manages blocks of fixed-sized memory pages, each block used for data items of certain sizes. When allocating memory for an item of sized n, the memory buffer returned will be the nearest power-of-two greater-or-equal to n.

It has several configuration properties:

monitoringType (constructor-arg, String)
Can be METRICS or JMX (see Configuring Galaxy monitoring).

pageSize (constructor-arg, int)
The size in kilobytes of each memory page used by the allocator.

maxItemSize (constructor-arg, int)
The maximum size, in bytes of a single data item. Must be set to the same value as the maxItemSize property of the cache component.

maxPagesForConcurrency (property, int, default: Runtime.getRuntime().availableProcessors() * 2)
The maximum number of pages to allocate in each block simply for reducing contention (and not because memory is exhausted).

Here’s an example:

<bean id="localStorage" class="co.paralleluniverse.galaxy.core.OffHeapLocalStorage">
    	<constructor-arg name="monitoringType" value="METRICS"/> 
      <constructor-arg name="pageSize" value="1024"/>
      <constructor-arg name="maxItemSize" value="1024"/>
      <property name="maxPagesForConcurrency" value="4"/>
</bean>

The amount of memory available for direct buffer is determined by the XX:MaxDirectMemorySize command line option passed to the JVM (the java command). For example, to provide 512MB to direct ByteBuffers, you add the following command line option to the java command:

-XX:MaxDirectMemorySize=100M
Configuring Backup

The backup component is responsible for backing up the node’s owned items after modifications to the server and/or slaves. There is currently one implementation of backup - co.paralleluniverse.galaxy.core.BackupImpl" - and it has two configuration properties:

monitoringType (constructor-arg, String)
Sets the monitor type to use for cache monitoring. Can be either METRICS or JMX (see Configuring Galaxy monitoring).

maxDelay (property, int, default: 10)
The maximum duration, in milliseconds, between flushes of backup data (to the server and/or slaves.) It is also roughly the maximum amount of time that can be “lost”, i.e. updates that can disappear if the node goes down. If it’s small, less updates can be lost in a case of failure, but both latency and throughput would suffer.

serverComm (constructor-arg, co.paralleluniverse.galaxy.core.ServerComm, default: autowired)
If you configure your cluster without a server, set this constructor-arg to null(see Null values). Otherwise, don’t set it at all, and Spring will auto-wire it to whatever serverComm component you have defined (see The ServerComm).

Here’s a configuration example:

<bean id="backup" class="co.paralleluniverse.galaxy.core.BackupImpl">
      <constructor-arg name="monitoringType" value="METRICS"/>
      <property name="maxDelay" value="200"/>
</bean>
Monitoring the cache

TBD

Configuring and Monitoring the Messenger

The Messenger component (bean: messenger) is responsible for sending and receiving user messages on the grid (see Messenger).

Configuring the messenger

messenger has only one implementation - co.paralleluniverse.galaxy.core.MessengerImpl, and it takes just one configuration property: threadPool (property, co.paralleluniverse.galaxy.core.NodeOrderedThreadPoolExecutor, required). NodeOrderedThreadPoolExecutor is a special king of java.util.concurrent.ThreadPoolExecutor, and to learn more about configuring it, please see Thread pools for instructions on how to configure a thread-pool, or just take a look at this example:

<bean id="messenger" class="co.paralleluniverse.galaxy.core.MessengerImpl">
    <constructor-arg name="threadPool">
        <bean class="co.paralleluniverse.galaxy.core.NodeOrderedThreadPoolExecutor">
            <constructor-arg name="corePoolSize" value="2"/>
            <constructor-arg name="maximumPoolSize" value="8"/>
            <constructor-arg name="keepAliveTime" value="5000"/>
            <constructor-arg name="unit" value="MILLISECONDS"/>
            <constructor-arg name="maxQueueSize" value="500"/>
            <constructor-arg name="workQueue">
                <bean class="co.paralleluniverse.common.concurrent.SimpleBlockingQueue" c:maxSize="500"/>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>

Configuring and Monitoring the Comm Component

The Comm component (bean id: comm) is responsible for transmitting Galaxy’s internal cache-coherence protocol messages, as well as user messages (sent with the Messenger) over the network. There are currently two implementations for this component. The first uses UDP, and the second uses JGroups and is available only if the cluster is configured to use JGroups (see Using JGroups).

Common comm configurations

Both comm implementations share a couple of very important configuration properties.

Timeout

The timeout configuration (property, long, default: 200), is the duration, in milliseconds to wait for a response to a massage. This, in effect, determines the time it takes for any grid operation to fail and throw a TimeoutException (See Deadlocks and Transactions).

The shorter the timeout is, the faster deadlocks will be detected, but so will more operations fail spuriously due to network latency.

No multicast (using Galaxy in the cloud)

For some internal operations (such as finding the owner of a freshly-encountered item id), both Comm implementations use multicast by default. Some cloud platforms (like Amazon EC2) do not allow multicast, so to run Galaxy without multicast, we can configure it to use the server for node discovery. Of course, in a situation like that a server is necessary.

In deployments where multicast is available, communicating with the server instead of multicasting can have a performance impact - sometimes for the better and sometimes for the worse, depending mostly on your server’s performance.

To tell comm to talk to the server instead of multicasting, set the bean property sendToServerInsteadOfMulticast to true (it’s false by default).

Please not that if you’re using JGroups for your cluster component implementation, you must configure JGroups to avoid multicast as well. See Using JGroups in the Cloud for more information.

The ServerComm

comm makes use of another component, called serverComm to communicate with the server. At the moment, there is just one implementation of server-comm, which uses TCP (so it makes use of the optional bossExecutor, workerExecutor and receiveExecutor properties, explained in Configuring Netty Channels.

So, if your cluster has a server node, define it with the following bean (in this example we do not set the thread-pool properties, so defaults are used):

<bean id="serverComm" class="co.paralleluniverse.galaxy.netty.TcpServerClientComm"/>

and link it to the comm bean by putting this line in the comm bean definition:

<constructor-arg name="serverComm" ref="serverComm"/>

(Actually, there is one more server-comm implementation, one that’s used with something called “dumb servers”. See Dumb servers for details).

If your cluster is configured without a server, set this constructor-arg to null (see Null values).

The SlaveComm

Just like a special component is used to communicate with the server, so too a special component is used to communicate with the slaves in the node’s backup group. The component is slaveComm, and it currently has one implementation that uses TCP called co.paralleluniverse.galaxy.netty.TcpSlaveComm.

In addition to the optional bossExecutor, workerExecutor and receiveExecutor properties explained Configuring Netty Channels, it has one configuration property:

port (constructor-arg, int)
The TCP port used for master-slave communications. The master binds a server socket to this port (and the slaves discover the port using the distributed configuration record, so in principle, this port can be different on each node, as it’s used only when the node is master.)

Here’s an example:

<bean id="slaveComm" class="co.paralleluniverse.galaxy.netty.TcpSlaveComm">
    <constructor-arg name="port" value="${grid.slave_port}"/>
<property name="receiveExecutor">
      <bean class="org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor">
          <constructor-arg index="0" value="8"/> <!-- name="corePoolSize" -->
          <constructor-arg index="1" value="0"/> <!-- name="maxChannelMemorySize" -->
          <constructor-arg index="2" value="0"/> <!-- name="maxTotalMemorySize" -->
          <constructor-arg index="3" value="5000"/> <!-- name="keepAliveTime" -->
          <constructor-arg index="4" value="MILLISECONDS"/> <!-- name="unit" -->
      </bean>
  </property>
</bean>
Using the UDP comm

The UDP implementation of the Comm component uses UDP datagrams for cache-coherence and user messages.

Other than the mentioned common ones, plus the bossExecutor, workerExecutor and receiveExecutor properties explained in [Configuring Netty Channels](#config-comm-netty, this implementation has the following configuration properties:

port (constructor-arg, int)
The UDP port the component will send and receive messages on. This value does not have to be the same for all nodes. (nodes know each other ports by publishing them as a configuration record in the cluster).

multicastGroup (property, java.net.InetSocketAddress, required if sendToServerInsteadOfMulticast is true)
The multicast IP address this node will join for multicast address, and the port to use for sending and receiving multicast messages. This value must be the same in all nodes. See the example below on how to set this property’s value.

multicastNetworkInterface (property, java.net.NetworkInterface, default: null)
The network interface to use for multicast. If set to null (the default), the default interface will be used.

receiveBufferSize (property, int, default: determined by the socket; implementation specific)
The size of the socket receive buffer (SO_RCVBUF). The SO_RCVBUF option is used by the the network implementation as a hint to size the underlying network I/O buffers. The SO_RCVBUF setting may also be used by the network implementation to determine the maximum size of the packet that can be received on this socket.

minimumNodesToMulticast (property, int, default: 3)
The minimum number of nodes in the cluster (not including servers) for this component to use multicast. If there are fewer online nodes than this value, the component will unicast messages to each node.

resendPeriodMillisecs (property, int, default: 20)
The duration in milliseconds to wait between consecutive resending of a message if a reply has not been received. If exponentioalBackoff is turned on (it’s turned on by default), this is the initial duration (between the first time the message is sent and the second).

exponentialBackoff (property, boolean, default: true)
If turned on (which is the default), doubles the duration between resending of messages after each re-send.

jitter (property, boolean, default: false)
If turned on, adds a random small jitter to the duration between resends.

minDelayMicrosecs (property, int, default: 1)
The minimum duration, in microseconds, to wait before transmitting a packet, for other messages to be sent so that they could be added to the same packet.

maxDelayMicrosecs (property, int, default: 10)
The maximum duration, in microseconds, to wait for additional messages (in case they keep arriving), before transmitting a packet.

maxQueueSize (property, int, default: 50)
The maximum number of messages waiting in the comm component’s message queue. If this number is reached, sending an additional message will block until the queue length falls beneath it.

maxPacketSize (property, int, default: 4096)
The maximum size of a single packet the comm component will transmit. Data-item size (defined by the maxItemSize property of the cache component; see Configuring the cache must not exceed this value (and there must also be some room left for headers).

maxRequestOnlyPacketSize (property, int, default: maxPacketSize / 2)
The maximum size of a packet that contains only request messages. Must be less than maxPacketSize. The exact semantics of this property is beyond the scope of this document, but if this value is too close to maxPacketSize a deadlock condition may arise (it will be clearly noted in the logs, so you can recognize it if it happens), and if it’s too small, performance under heavy load may suffer.

<bean id="comm" class="co.paralleluniverse.galaxy.netty.UDPComm">
    <constructor-arg name="serverComm" ref="serverComm"/>
    <property name="timeout" value="500"/>
    <property name="sendToServerInsteadOfMulticast" value="false"/>
    <constructor-arg name="port" value="${grid.port}"/>
    <property name="minimumNodesToMulticast" value="2"/>
    <property name="multicastGroup">
        <bean class="java.net.InetSocketAddress">
            <constructor-arg index="0" value="225.0.0.1"/>
            <constructor-arg index="1" value="7050"/>
        </bean>
    </property>
    <property name="resendPeriodMillisecs" value="35"/>
    <property name="exponentialBackoff" value="true"/>
    <property name="jitter" value="true"/>
    <property name="minDelayMicrosecs" value="500"/>
    <property name="maxDelayMicrosecs" value="2000"/>
    <property name="maxQueueSize" value="10"/>
    <property name="maxPacketSize" value="2048"/>
    <property name="maxRequestOnlyPacketSize" value="400"/>
    <property name="workerExecutor">
        <bean class="co.paralleluniverse.galaxy.core.ConfigurableThreadPool">
            <constructor-arg name="corePoolSize" value="2"/>
            <constructor-arg name="maximumPoolSize" value="8"/>
            <constructor-arg name="keepAliveMillis" value="5000"/>
            <constructor-arg name="maxQueueSize" value="500"/>
        </bean>
    </property>
    <property name="receiveExecutor">
        <bean class="org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor">
            <constructor-arg index="0" value="8"/> <!-- name="corePoolSize" -->
            <constructor-arg index="1" value="0"/> <!-- name="maxChannelMemorySize" -->
            <constructor-arg index="2" value="0"/> <!-- name="maxTotalMemorySize" -->
            <constructor-arg index="3" value="5000"/> <!-- name="keepAliveTime" -->
            <constructor-arg index="4" value="MILLISECONDS"/> <!-- name="unit" -->
        </bean>
    </property>
</bean>
Configuring Netty Channels

Except for configuration messages used by the cluster component, all Galaxy network communication - peer-nodes to peer-nodes, peer-nodes to servers and slaves to masters - uses the Netty library (unless you’ve decided to use JGroups for peer-to-peer communication - see Using the Jgroups comm.

Netty communication channels use various thread-pools. TCP channels use a single “boss thread” taken from a “boss” thread-pool for making or accepting connections, and possibly multiple “worker” threads taken from a different pool, responsible for sending and receiving messages. UDP channels (they’re connectionless) only use worker threads.

Components using TCP, therefore have two properties, bossExecutor and workerExecutor taking an instance of java.util.concurrent.ThreadPoolExecutor. If you don’t set these properties, each uses a default thread-pool (returned from calling java.util.concurrent.Executors.newCachedThreadPool()). Components making use of UDP don’t have the bossThread property.

See Configuring a thread-pool for the thread-pool configuration details.

In addition, all components using Netty can optionally use another thread-pool, passed to the receiveExecutor property. This thread-pool, which must be an instance of org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor (a subclass of java.util.concurrent.ThreadPoolExecutor), and it is used to actually process the receive messages. If you set this property to null (which is the default), all message processing will be done on the channel’s worker thread-pool.

OrderedMemoryAwareThreadPoolExecutor doesn’t take a maximumPoolSize argument (as its core-size is also its maximum size), but it does take two additional arguments:

maxChannelMemorySize (long)
The maximum total size, in bytes, of the queued events per channel (i.e. per cluster node we’re communicating with). A value of 0 disables this limit.

maxTotalMemorySize (long)
The maximum total size, in bytes, of the queued events for this pool. A value of 0 disables this limit.

Unfortunately, named constructor-args don’t seem to work with this class, so we must use argument indexes, and create the instance like so:

<bean class="org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor">
    <constructor-arg index="0" value="8"/> <!-- "corePoolSize" -->
    <constructor-arg index="1" value="0"/> <!-- "maxChannelMemorySize" -->
    <constructor-arg index="2" value="0"/> <!-- "maxTotalMemorySize" -->
    <constructor-arg index="3" value="5000"/> <!-- "keepAliveTime" -->
    <constructor-arg index="4" value="MILLISECONDS"/> <!-- "unit" -->
</bean>
Using the JGroups comm

The JGroups comm implementation is co.paralleluniverse.galaxy.jgroups.JGroupsComm. It is only available when JGroups is chosen as the cluster component implementation (see Using JGroups). Because JGroups is configured in the cluster bean, this bean has no properties other than the common comm ones, and can be defined as simply as:

<bean id="comm" class="co.paralleluniverse.galaxy.jgroups.JGroupsComm">
	<constructor-arg name="serverComm" ref="serverComm"/>
    <property name="sendToServerInsteadOfMulticast" value="false"/>
    <property name="timeout" value="200"/>
</bean>

The timeout and sendToServerInsteadOfMulticast properties are kept at their default values in this example, so they can be dropped entirely.

Monitoring the comm component

TBD

Configuring, Running and Monitoring the Server

The Galaxy server node (or nodes, if more than one is configured in the backup group) provides two features:

  • Disk persistence
  • High availability in case of node failure.

A Galaxy cluster does not have to be configured to have a server. If disk persistence is not required, you can rely on backup groups alone for high-availability (see Backup groups). However, in some circumstances, server nodes can provide other benefits, such as improved performance.

There are two kinds of servers a Galaxy cluster can use. The first, which we’ll call “real servers”, and the other we’ll call “dumb servers”. We will refer to non-server nodes as “peer nodes”.

Real servers

Real servers are server nodes that run the Galaxy server software and (usually) access an embedded database.

Configuring real servers

A real server has a similar configuration file to that used in peer nodes, but with some shared and some different components:

cluster
This is the same as in the peer nodes. See Configuring the Cluster Component for instructions on how to configure this component.

comm
This is the equivalent of the peer nodes’ comm component (see Configuring and Monitoring the Comm Component), but it’s (currently) sole implementation (co.paralleluniverse.galaxy.netty.TcpServerServerComm) - the server side of the peers’ TCP serverComm - takes a port property (constructor-arg, int), plus, optionally, the bossExecutor, workerExecutor and receiveExecutor properties explained in Configuring Netty Channels.

Here’s a configuration example:

<bean id="comm" class="co.paralleluniverse.galaxy.netty.TcpServerServerComm">
    <constructor-arg name="port" value="9675"/>
    <property name="receiveExecutor">
        <bean class="org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor">
            <constructor-arg index="0" value="8"/> <!-- name="corePoolSize" -->
            <constructor-arg index="1" value="0"/> <!-- name="maxChannelMemorySize" -->
            <constructor-arg index="2" value="0"/> <!-- name="maxTotalMemorySize" -->
            <constructor-arg index="3" value="5000"/> <!-- name="keepAliveTime" -->
            <constructor-arg index="4" value="MILLISECONDS"/> <!-- name="unit" -->
        </bean>
    </property>
</bean>

memory
This is the equivalent of the peers’ cache component, and it’s responsible for the server’s data-item logic. It has one implementation (co.paralleluniverse.galaxy.core.MainMemory) that takes just a monitoringType property (see The configuration file(s)). Here’s how it’s defined:

<bean id="memory" class="co.paralleluniverse.galaxy.core.MainMemory">
    <constructor-arg name="monitoringType" value="METRICS"/> <!-- METRICS/JMX -->
</bean>

store
This component is responsible for persisting and retrieving data items using a database. Its configuration is explained below.

Running the server

To run the server, simply run the executable Java class co.paralleluniverse.galaxy.Server, and optionally pass one or two command line arguments specifying the configuration file and, optionally, the properties file, too (see The configuration file(s) for an explanation).

Optionally, if for some reason you’d like to embed the Galaxy server in your own Java process, you may start the server by calling Server’s start method and optionally pas the configuration and properties files.

If no configuration file is specified, the server uses the file called galaxy.xml if it’s found somewhere on the classpath.

You may refer to the Server class Javadoc.

Dumb servers

Dumb servers are machines running some sort of a database server that can be used as Galaxy servers without being Galaxy nodes themselves, i.e., they do not run any Galaxy code - just the database server. Databases that provide network access can be used as dumb servers. When using a dumb server, the logic for accessing and communicating with them is hosted on the peer nodes.

To configure a dumb server, you must make some changes and additions to the peers’ components.

serverPipe
This simple additional bean is responsible for piping all messages sent to the server to the local server proxy. It’s defined thus:

<bean id="serverPipe" class="co.paralleluniverse.galaxy.server.CommPipe"/>

serverComm
Instead of a TCP connection to a real server, we will now be directing messages to the server through the serverPipe so serverComm is now defined so:

<bean id="serverComm" factory-bean="serverPipe" factory-method="getComm1">
    <constructor-arg index="0" value="${grid.nodeId}"/>
</bean>

memory
The memory component (responsible for server logic) now sits at the peers, so it has to be added to the peer configuration, and it receives messages from the other end of the serverPipe:

<bean id="memory" class="co.paralleluniverse.galaxy.core.MainMemory">
    <constructor-arg name="comm">
        <bean factory-bean="serverPipe" factory-method="getComm2">
            <constructor-arg index="0" value="0"/>
        </bean>
    </constructor-arg>
    <constructor-arg name="monitoringType" value="METRICS"/>
</bean>

store
The store is now configured at the peers. See below for instructions on how to configure the store.

Configuring the store

The store is the component responsible for data-item persistence, and is usually implemented on top of some database. At the moment there are two store implementations, one that uses BerkeleyDB Java Edition, and that uses any RDBMS with a JDBC driver.

Using BerkeleyDB

BerkeleyDB Java Edition (BDB JE) can be used as Galaxy’s store. Because BDB JE is an embedded database and does not have a network interface, it can only be used as a dumb server - only as part of a real server. The store implementation that uses BDB JE is co.paralleluniverse.galaxy.berkeleydb.BerkeleyDB, and it has two configuration properties:

envHome (constructor-arg, String)
The path to the directory which will contain the BDB files.

truncate (property, boolean, default: false)
Whether or not the database will be truncated (i.e., all the data-item data be deleted) when the server starts.

durability (property, com.sleepycat.je.Durability.SyncPolicy, default: WRITE_NO_SYNC)
Defines the disk synchronization policy to be used when committing a transaction. There are three possible values: SYNC, WRITE_NO_SYNC, or NO_SYNC, that are fully explained in the BDB JE Javadocs here.

Tuning of BerkeleyDB JE is possible by setting properties in the je.properties file, placed at the environment home directory. Details about BDB JE tuning can be found in the JE documentation here.

Here’s a configuration example:

<bean id="store" class="co.paralleluniverse.galaxy.berkeleydb.BerkeleyDB">
    <constructor-arg name="envHome" value="/usr/bdb/galaxy"/>
    <property name="truncate" value="true"/>
</bean>
Using SQL

Any SQL database that supports transactions and has a JDBC driver can be used as the store. Those that have a network interface can also become dumb servers. The store implementation that uses JDBC is co.paralleluniverse.galaxy.jdbc.SQLDB and here are it’s configuration properties:

dataSource (constructor-arg, javax.sql.DataSource)
The DataSource instance used to construct DB connections. See the example below on how to set this property.

maxItemSize (property, int, default: 1024)
The maximum size, in bytes, of a data-item. Must be the same as the maxItemSize set in the cache component (see Configuring the cache).

useUpdateableCursors (property, boolean, default: false)
Whether updateable cursors should be used in some atomic transactions. Might have a positive, or negative performance impact, depending on the database and driver implementation.

schema (property, String, default: pugalaxy)
The schema that will host the Galaxy table.

tableName (property, String, default: memory)
The name of the table that will store the data-items.

bigintType (property, String, default: queried with DatabaseMetaData if possible)
The name of the database’s SQL type for JDBC’s BIGINT. Should be set if automatic detection does not work.

smallintType (property, String, default: queried with DatabaseMetaData if possible)
The name of the database’s SQL type for JDBC’s SMALLINT. Should be set if automatic detection does not work.

varbinaryType (property, String, default: queried with DatabaseMetaData if possible)
The name of the database’s SQL type for JDBC’s VARBINARY. Should be set if automatic detection does not work.

<bean id="store" class="co.paralleluniverse.galaxy.jdbc.SQLDB">
    <constructor-arg name="dataSource">
        <bean class="org.apache.derby.jdbc.ClientDataSource40">
            <property name="serverName" value="mydbhost"/>
            <property name="portNumber" value="1527"/>
            <property name="databaseName" value="galaxydb"/>
            <property name="createDatabase" value="create"/>
        </bean>
    </constructor-arg>
    <property name="maxItemSize" value="1024"/>
    <property name="useUpdateableCursors" value="false"/>
</bean>

When using a SQL database, if the entire grid is taken down, you must manually either clear the Galaxy table (which is called pugalaxy.memory by default) if you’d like to dispose of the data, or, if you’d like to keep it, you must assign ownership of all data items to the server by running the following SQL command:

UPDATE pugalaxy.memory SET owner=0

Note: Do not forget to clear the database table or set the owner in all rows to 0 before re-starting the grid. If you don’t, havoc will ensue.

Miscellaneous Configurations

Thread pools

Galaxy makes extensive use of thread-pools.

Configuring a thread-pool

Several Galaxy components require you to provide a thread-pool in the form of a java.util.concurrent.ThreadPoolExecutor instance or a subclass of it.

These are ThreadPoolExecutor’s constructor-args:

corePoolSize, index:0, int
The number of threads to keep in the pool, even if they are idle.

maximumPoolSize, index:1, int
The maximum number of threads to allow in the pool.

keepAliveTime, index:2, long
When the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.

unit, index:3, java.util.concurrent.TimeUnit
The time unit for the keepAliveTime argument. Can be NONOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS or DAYS.

workQueue, index:4, java.util.concurrent.BlockingQueue
The queue to use for holding tasks before they are executed.

For the workQueue argument, is is best to use an instance of co.paralleluniverse.common.concurrent.QueueFactory` which constructs the queue most appropriate for the given maximum size. It is defined like this:

<bean class="co.paralleluniverse.common.concurrent.QueueFactory" factory-method="getInstance" c:maxSize="500"/>

And there are two special values for maxSize. -1 designates an unbounded queue, and 0 specifies a handoff “queue” where each producer must wait for a consumer (i.e. there can be no tasks waiting in the queue).

Here’s an example of an inner bean (Compound values (inner beans)) defining a ThreadPoolExecutor:

<bean class="java.util.concurrent.ThreadPoolExecutor">
    <constructor-arg index="0" value="2"/>
    <constructor-arg index="1" value="8"/>
    <constructor-arg index="2" value="5000"/>
    <constructor-arg index="3" value="MILLISECONDS"/>
    <constructor-arg index="4">
        <bean class="co.paralleluniverse.common.concurrent.QueueFactory" factory-method="getInstance" c:maxSize="500"/>
    </constructor-arg>
</bean>

Galaxy provides a convenience class that is a bit simpler to declare, which can be used instead of java.util.concurrent.ThreadPoolExecutor (but not when a specific subtype is required!)

<bean class="co.paralleluniverse.galaxy.core.ConfigurableThreadPool">
    <constructor-arg name="corePoolSize" value="2"/>
    <constructor-arg name="maximumPoolSize" value="8"/>
    <constructor-arg name="keepAliveMillis" value="5000"/>
    <constructor-arg name="maxQueueSize" value="500"/>
</bean>

Some Galaxy components may ask for a co.paralleluniverse.galaxy.core.NodeOrderedThreadPoolExecutor, which is configured so:

<bean class="co.paralleluniverse.galaxy.core.NodeOrderedThreadPoolExecutor">
    <constructor-arg name="corePoolSize" value="2"/>
    <constructor-arg name="maximumPoolSize" value="8"/>
    <constructor-arg name="keepAliveTime" value="5000"/>
    <constructor-arg name="unit" value="MILLISECONDS"/>
    <constructor-arg name="maxQueueSize" value="500"/>
    <constructor-arg name="workQueue">
        <bean class="co.paralleluniverse.common.concurrent.QueueFactory" factory-method="getInstance" c:maxSize="500"/>
    </constructor-arg>
</bean>
Monitoring thread-pools

All of Galaxy’s thread-pools expose monitoring information using MBeans. All of these MBeans are named co.paralleluniverse:type=ThreadPoolExecutor,name=POOL_NAME, and can be found in VisualVM or JConsole in the MBean tree under the co.paralleluniverse/ThreadPoolExecutor node.

documentation

Galaxy

View on Github API File a bug Discuss