Founder of the Compass Open Source Project
Shay is the founder of the Compass open source project, a unique solution that brings search capabilities to any application model. He started out working on mission-critical, real-time C/C++ systems, later moving to Java (and never looked back). Within the Java world, Shay has worked on a proprietary implementation of a distributed rule engine (RETE) server, your typical Java-based web projects, and messaging-based projects within the financial industry. Currently, Shay is a System Architect at GigaSpaces, which provides a single platform for end-to-end scalability of high-performance, stateful distributed applications. GigaSpaces' unique approach enables developers to Write their business logic Once and then seamlessly Scale out the application linearly Anywhere.
Presentations by Shay Banon
Rapid Fire: The Compass Enterprise Search Engine Framework
Compass is an open source Java search engine framework that allows search functionality to be integrated into any application. One of Compass's main modules is its Spring integration module, which is heavily used among the Compass user base.
Rapid Fire Session - The Compass Search Engine Framework
Compass is an open source Java search engine framework that allows search functionality to be integrated into any application. One of Compass's main modules is its Spring integration module, which is heavily used among the Compass user base.
KimchyBlog
Here we go. Here comes the Prozac
Sunday, November 23, 2008
The java.util.concurrent ExecutorService offers a simple way to use a thread pool within a Java application. I have seen the following mistake in more than one place (including some fairly well-known open source projects), so I thought it made sense to blog about it.
One of the most common scenarios when using a thread pool is wanting a pool with a minimum and a maximum number of threads, backed by an unbounded queue. With the executor service, you can create the following quite simply:
A fixed size thread pool with unbounded queue. Problematic in our case since we want to set the minimum and maximum thread pool size.
```java
// similar to j.u.c.Executors.newFixedThreadPool(int nThreads)
new ThreadPoolExecutor(nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
```
A cached thread pool that creates threads as needed and reuses them when possible (problematic once the maximum number of threads is bounded, since tasks are then rejected whenever every thread is busy, so it can't be used in our case):
```java
// similar to j.u.c.Executors.newCachedThreadPool()
new ThreadPoolExecutor(0, Integer.MAX_VALUE,
        60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());
```
The problem starts when one needs a thread pool with an unbounded queue and with minimum and maximum thread counts. What most people do is the following:
```java
// compiles fine, but never grows beyond coreThreads:
// the unbounded queue accepts every offered task
new ThreadPoolExecutor(coreThreads, maximumThreads,
        60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>());
```
A thread pool constructed this way will simply not work as expected. The reason is the logic within ThreadPoolExecutor: once the core threads are running, a new thread is added only if offering the task to the queue fails. In our case we use an unbounded LinkedBlockingQueue, to which a task can always be offered. Effectively, the pool never grows beyond the core pool size, so the maximum pool size is never reached.
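To see this for yourself, here is a minimal sketch (the class and method names `NaivePoolDemo` and `observedPoolSize` are made up for this illustration). It builds the pool exactly as above, submits more blocking tasks than there are core threads, and reports how many threads the executor actually created.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the implementation in this post.
final class NaivePoolDemo {

    /** Submits blocking tasks and returns the number of threads the pool created. */
    static int observedPoolSize(int coreThreads, int maximumThreads, int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                coreThreads, maximumThreads, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        final CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                executor.execute(new Runnable() {
                    public void run() {
                        try {
                            release.await(); // keep the worker thread busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
            }
            // Tasks beyond the core threads were queued instead of getting new threads.
            return executor.getPoolSize();
        } finally {
            release.countDown(); // let all tasks finish
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // 20 tasks, core = 2, max = 10
        System.out.println("pool size: " + observedPoolSize(2, 10, 20)); // prints "pool size: 2"
    }
}
```

Even with twenty tasks waiting and a maximum of ten threads, the pool stays at its two core threads and the other eighteen tasks sit in the queue.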
OK, so this does not work; let's try to build one that does. The first thing we want to do is create a blocking queue that is aware of the ThreadPoolExecutor:
```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

public class ScalingQueue<E> extends LinkedBlockingQueue<E> {

    /**
     * The executor this queue belongs to.
     */
    private ThreadPoolExecutor executor;

    /**
     * Creates a <tt>ScalingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public ScalingQueue() {
        super();
    }

    /**
     * Creates a <tt>ScalingQueue</tt> with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue.
     */
    public ScalingQueue(int capacity) {
        super(capacity);
    }

    /**
     * Sets the executor this queue belongs to. Must be called before the
     * first task is submitted.
     */
    public void setThreadPoolExecutor(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    /**
     * Inserts the specified element at the tail of this queue if there is at
     * least one available thread to run the current task. If all pool threads
     * are actively busy, it rejects the offer.
     *
     * @param o the element to add.
     * @return <tt>true</tt> if it was possible to add the element to this
     *         queue, else <tt>false</tt>
     * @see ThreadPoolExecutor#execute(Runnable)
     */
    @Override
    public boolean offer(E o) {
        // threads running a task plus tasks already waiting for a thread
        int allWorkingThreads = executor.getActiveCount() + super.size();
        return allWorkingThreads < executor.getPoolSize() && super.offer(o);
    }
}
```
As you can see, we reject the addition of a new task if there is no idle thread to handle it. This causes the thread pool executor to try to allocate a new thread (up to the maximum number of threads). If the maximum has already been reached, the task is rejected. In our case, when a task is rejected we would like to put it into the queue anyway. This is simple to do with ThreadPoolExecutor, since we can implement our own RejectedExecutionHandler:
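```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class ForceQueuePolicy implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // put() does not go through the overridden offer(), so the task is queued
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            // put() never waits on an unbounded queue, so this is only
            // reached if the submitting thread was interrupted
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new RejectedExecutionException(e);
        }
    }
}
```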
Last, to speed up adding tasks to the queue, we can override the built-in getActiveCount method of ThreadPoolExecutor with a faster one (by default it obtains a lock and iterates over the current workers):
```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ScalingThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * number of threads that are actively executing tasks
     */
    private final AtomicInteger activeCount = new AtomicInteger();

    public ScalingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                     long keepAliveTime, TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public int getActiveCount() {
        return activeCount.get();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        activeCount.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        activeCount.decrementAndGet();
    }
}
```
That is it. Now, we can construct our thread pool (a simple factory method is provided here):
```java
// keepAliveTime is given in milliseconds
public static ExecutorService newScalingThreadPool(int min, int max,
                                                   long keepAliveTime) {
    ScalingQueue<Runnable> queue = new ScalingQueue<Runnable>();
    ThreadPoolExecutor executor = new ScalingThreadPoolExecutor(min, max,
            keepAliveTime, TimeUnit.MILLISECONDS, queue);
    executor.setRejectedExecutionHandler(new ForceQueuePolicy());
    queue.setThreadPoolExecutor(executor);
    return executor;
}
```
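As a rough sanity check of the whole construction, the sketch below condenses the queue, the rejection handler and the executor from this post into a single file. The logic is the same, but `ScalingPoolDemo`, its nested classes and `run` are names made up for this illustration. It submits blocking tasks one at a time, waiting for each to start so the active count is up to date, then reports the pool size and how many tasks ended up in the queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, condensed version of the classes above, for demonstration only.
final class ScalingPoolDemo {

    static final class ScalingQueue extends LinkedBlockingQueue<Runnable> {
        volatile ThreadPoolExecutor executor;

        @Override
        public boolean offer(Runnable r) {
            // refuse the offer unless some pool thread is idle
            int busyOrQueued = executor.getActiveCount() + super.size();
            return busyOrQueued < executor.getPoolSize() && super.offer(r);
        }
    }

    static final class ScalingExecutor extends ThreadPoolExecutor {
        private final AtomicInteger activeCount = new AtomicInteger();

        ScalingExecutor(int min, int max, long keepAliveMillis, ScalingQueue queue) {
            super(min, max, keepAliveMillis, TimeUnit.MILLISECONDS, queue);
            queue.executor = this;
            setRejectedExecutionHandler(new RejectedExecutionHandler() {
                public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                    try {
                        e.getQueue().put(r); // force the task into the queue
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                        throw new RejectedExecutionException(ex);
                    }
                }
            });
        }

        @Override public int getActiveCount() { return activeCount.get(); }
        @Override protected void beforeExecute(Thread t, Runnable r) { activeCount.incrementAndGet(); }
        @Override protected void afterExecute(Runnable r, Throwable t) { activeCount.decrementAndGet(); }
    }

    /** Returns {pool size with max tasks running, queue size after one extra task}. */
    static int[] run(int min, int max) {
        ScalingQueue queue = new ScalingQueue();
        ThreadPoolExecutor executor = new ScalingExecutor(min, max, 1000L, queue);
        final CountDownLatch release = new CountDownLatch(1);
        final Semaphore started = new Semaphore(0);
        Runnable blocker = new Runnable() {
            public void run() {
                started.release(); // signal that a worker picked this task up
                try {
                    release.await(); // then keep the worker busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        try {
            for (int i = 0; i < max; i++) {
                executor.execute(blocker);
                started.acquireUninterruptibly(); // wait until the task is running
            }
            int poolSize = executor.getPoolSize();
            executor.execute(blocker); // pool is at max and fully busy
            return new int[] { poolSize, queue.size() };
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = run(2, 4);
        // prints "pool size: 4, queued: 1"
        System.out.println("pool size: " + result[0] + ", queued: " + result[1]);
    }
}
```

Waiting for each task to start is only there to make the demo deterministic. When tasks are submitted in a quick burst, a freshly created thread may not have bumped the active count yet, so a task can briefly be queued even though the pool could still grow.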
Thanks to Moran, a fellow GigaSpaceyan, for creating such a clean implementation.
Comments are welcome. Enjoy!
Sunday, November 2, 2008
Compass version 2.1 GA has been released. Major features since the RC release include the ability to configure Compass using JSON (both configuration and mappings). Important fixes are included as well, among them the ability to work with a Terracotta-based index and Lucene 2.4. Full release notes can be found here.
As always, feedback is welcome, and work on 2.2 M1 has already begun :). Enjoy!
p.s.
Posts with a breakdown of the most important features will be submitted to the usual suspects. For now, here are links to the previous releases: 2.1.0 M1, 2.1.0 M2, 2.1.0 M3, 2.1.0 M4, 2.1.0 RC.
Wednesday, October 29, 2008
The time between RC and GA in Compass has always been the time for nice-to-have features that I wanted to get in but that had to make way for more important ones. One of these features is the ability to configure Compass using JSON.
Compass itself can be configured using properties/settings based configuration. On top of that, Compass can also be configured using schema-based XML configuration. Compass can now be configured using JSON as well, again on top of the properties-based configuration. Here is a sample JSON configuration:
```json
{
  compass : {
    engine : {
      connection : "test-index"
    },
    event : {
      preCreate : {
        event1 : {
          type : "test.MyEvent1"
        },
        "event2.type" : "test.MyEvent2"
      }
    }
  }
}
```
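One way to read this sample: it mixes a nested key (`event1 : { type : ... }`) with a dotted key (`"event2.type"`), which suggests that nesting is just shorthand for dotted setting names. The sketch below illustrates that reading. It is not Compass's actual parser, and `FlattenSettingsDemo` and its methods are hypothetical names. It builds the sample by hand as nested maps and flattens it into properties-style settings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; this is not how Compass itself parses JSON.
final class FlattenSettingsDemo {

    /** Recursively joins nested keys with '.', so {a: {b: "c"}} becomes a.b=c. */
    static void flatten(String prefix, Map<String, Object> node, Map<String, String> out) {
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) value;
                flatten(key, child, out);
            } else {
                out.put(key, String.valueOf(value));
            }
        }
    }

    /** Tiny helper that builds a map from alternating keys and values. */
    static Map<String, Object> map(Object... keysAndValues) {
        Map<String, Object> result = new LinkedHashMap<String, Object>();
        for (int i = 0; i < keysAndValues.length; i += 2) {
            result.put((String) keysAndValues[i], keysAndValues[i + 1]);
        }
        return result;
    }

    /** The sample configuration from above, hand-built as nested maps. */
    static Map<String, String> sampleSettings() {
        Map<String, Object> root = map("compass", map(
                "engine", map("connection", "test-index"),
                "event", map("preCreate", map(
                        "event1", map("type", "test.MyEvent1"),
                        "event2.type", "test.MyEvent2"))));
        Map<String, String> flat = new LinkedHashMap<String, String>();
        flatten("", root, flat);
        return flat;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, String> e : sampleSettings().entrySet()) {
            System.out.println(e.getKey() + "=" + e.getValue());
        }
    }
}
```

Under that assumption, both event entries come out in the same shape: `compass.event.preCreate.event1.type` and `compass.event.preCreate.event2.type`.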
Compass mappings (RSEM, XSEM, OSEM and JSEM) can now be configured using JSON as well (in addition to XML-based configuration). Here is an OSEM example:
```json
{
  "compass-core-mapping" : {
    package : "org.compass.core.test.simple",
    class : {
      name : "A",
      alias : "A",
      id : {
        name : "id"
      },
      property : [
        {
          name : "value",
          "meta-data" : {
            name : "value"
          }
        },
        {
          name : "otherValue",
          "meta-data" : {
            name : "other-value"
          }
        }
      ]
    }
  }
}
```
And here is a JSEM example:
```json
{
  "compass-core-mapping" : {
    "root-json-object" : [
      {
        alias : "test",
        id : {
          name : "id"
        },
        property : [
          {
            name : "int",
            "value-converter" : "int",
            format : "0000"
          },
          {
            name : "float",
            "value-converter" : "float",
            format : "0000.00",
            store : "compress"
          }
        ],
        content : {
          name : "content"
        }
      }
    ]
  }
}
```
Enjoy!
Sunday, October 19, 2008
Compass version 2.1.0 RC has been released. Major features include an upgrade of Lucene to version 2.4 (make sure to update the Lucene jar files) and usage of Lucene 2.4 features, better cache management of Lucene readers/searchers, pluggable Lucene similarity, and many bug fixes and minor improvements. Hopefully, this will be the only release before GA, which is planned for a couple of weeks from now. Check out the release notes. Enjoy!
Tuesday, September 23, 2008
Compass version 2.1.0 M4 has been released. This is a critical bug fix release for M3, which was released just a few days ago, and it is recommended for all 2.1.0 M3 users. Check out the release notes. Enjoy!