Sunday, December 13, 2009

Executors and Implementations JSR 166

Executors and Implementations JSR 166

We do face OutOfMemory error not only because of Heap size, eventually because of number of threads get created in JVM. Managing threads lifecycle will cost lot than actual business implementation. JSR 166 comes with interfaces Executor, ExecutorService, and ScheduledExecutorService.

ScheduledExecutorService subinterface of ExecutorService and ExecutorService is subinterface of Executor.

Executor has execute(Runnable command) API.

Prior to JSR166, we create a new thread and set the context and/or classloader bringing the thread to runnable state will take considerable time. Similarly, garbage collector has to clear the dead thread references.

In JSR166, Executor takes and treats all new thread(Thread object, Runnable implementation object) as a job and the same will get run and produce result with using available or predefined worker threads.

java.util.concurrent.Executors class has n number of factory methods which helps to create the thread pools based on our need.


  • newFixedThreadPool(...) fixed number of threads will be created and used to run all the job

  • newSingleThreadExecutor() only one thread will do job one by one

  • newCachedThreadPool() Integer.MAX_VALUE threads possible to create automatically and reuse based on the jobs load. Once job is done then thread may die if no more jobs found in queue

  • newSingleThreadScheduledExecutor() schedule the job time to run in a single thread

  • newScheduledThreadPool(int corePoolSize) schedule the job time to run, based on corePoolSize threads will be created and reused

  • unconfigurableExecutorService(ExecutorService) this ExecutorService freeze thread pool configuration and allows to executes job based on ExecutorService delegated in this.




ThreadPoolExecutor



Above mentioned ExecutorService are not enough for the realtime business. For instance, my application wants to establish 300 HTTP connection at a time, means that 300 runnable has to run in a given time, there could be more than 300 jobs may wait in queue. Instance of java.util.concurrent.ThreadPoolExecutor class will help us to do this

new ThreadPoolExecutor(300, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue());

where 1st argument is corePoolSize, 2nd maxPoolSize, 3rd keepAlive, 4th Queue object

Based on Queueing mechanism, Executor behaves differently

  • Direct handoffs(eg: SynchronousQueue) - if no thread available to run new task then fails. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection.

  • Unbounded queues (eg: LinkedBlockingQueue)- no use of specifying maxPoolSize, upto maximum of corePoolSize threads will be created and kept for keepalive time if no new job found. No rejection possible until Integer.MAX_VALUE reached. But, keep in mind if this grows, we get OutOfMemory error for Heapspace.

  • Bounded queues (eg: ArrayBlockingQueue) - Rejection possible if it tasks count in queue increase more than maxPoolSize



ThreadFactory



In general, threads will be created with normal priority and with default context. Or else, we may need to do some manipulation in the time of thread creation as a pre-thread creation activity, then ThreadFactory interface helps to do that.


class myThreadFactory implements ThreadFactory
{
Thread newThread(Runnable r)
{
System.out.println("new thread is created");
Thread t =new Thread(r);
t.setPriority(Thread.MAX_PRIORITY); // maximum priority set to threads created for this pool
}

}



Callable and Future interfaces



I have submitted job and I would like to check the status of the task and also wants to get the result from the task completion. To achieve this, instead of Runnable interface implementation, we have to implement Callable implementation which has a method call(...) with return type.


FutureTask<String> future =
new FutureTask<String>(new Callable<String>() {
public String call() {
return "your job result goes here";
}});
executor.execute(future);

or else


Future<String> future = executor.submit(new Callable<String>() {
public String call() {
return searcher.search(target);
}});
executor.submit(future);



future.get() returns the result of task once it get completed.

Assume I need to cancel, after enqueued job in future queue, then FutureTask class will be useful.

No comments:

Post a Comment

Recent Posts

Unix Commands | List all My Posts

Texts

This blog intended to share the knowledge and contribute to JAVA Community such a way that by providing samples and pointing right documents/webpages. We try to give our knowledege level best and no guarantee can be claimed on truth. Copyright and Terms of Policy refer blogspot.com