by Kuldip Bajwa
Asynchronous communication can offer a clear performance benefit over a synchronous, blocking
paradigm when a method performs several slow operations that do not depend on each other. If a design makes
two Web service calls in the same method and neither call needs the result of the other, for example,
it is a clear candidate for asynchronous communication via Asynchronous Beans.
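To make the idea of two independent calls concrete, here is a minimal plain-JDK sketch (Java 8 or later is assumed). It does not use Asynchronous Beans: CompletableFuture is swapped in purely for illustration, and fetchAccount and fetchCreditStatus are hypothetical stand-ins for the two Web service calls. Inside WAS the work should go through a WorkManager so that the threads are container managed.

```java
import java.util.concurrent.CompletableFuture;

/** Plain-JDK sketch: two independent lookups run concurrently and are then combined. */
final class IndependentCalls {

    // Hypothetical stand-in for the first Web service call.
    static String fetchAccount(String partyId) {
        return "account-of-" + partyId;
    }

    // Hypothetical stand-in for the second Web service call.
    static String fetchCreditStatus(String partyId) {
        return "credit-ok-" + partyId;
    }

    /** Starts both calls without waiting, then blocks once for the combined result. */
    static String fetchBoth(String partyId) {
        CompletableFuture<String> account =
                CompletableFuture.supplyAsync(() -> fetchAccount(partyId));
        CompletableFuture<String> credit =
                CompletableFuture.supplyAsync(() -> fetchCreditStatus(partyId));
        // The combining function runs only after both futures have completed.
        return account.thenCombine(credit, (a, c) -> a + " | " + c).join();
    }

    public static void main(String[] args) {
        System.out.println(fetchBoth("42")); // prints "account-of-42 | credit-ok-42"
    }
}
```

Because neither call reads the other's result, the total wait is roughly that of the slower call rather than the sum of both.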
In this article I will talk about the IBM reference implementation of Asynchronous Beans and show a small code
example to complement it.
Asynchronous Beans is a feature introduced in WebSphere Application Server (WAS) v5.0. It
allows JEE applications to take advantage of the asynchronous communication paradigm by using the
pool of worker threads configured on the WAS server, for example:
#---------------------------------------------------
# Application Server Related Variables
#---------------------------------------------------
serverName = "serverName"
serverNode = "serverNode"
cookieName = "JSESSIONID"
threadPoolMinSize = 10
threadPoolMaxSize = 200
minJVMHeapSize = 768
maxJVMHeapSize = 2560
The main interface that you need to work with is com.ibm.websphere.asynchbeans.WorkManager. It is
declared as a resource-ref of this type on either a Web application or an EJB module and then looked up through JNDI.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import com.ibm.websphere.asynchbeans.WorkManager;
/**
* CodeDairy.com
* AsynchHelper class.
* @author Kuldip Bajwa
* @version 1.0
* */
public class AsynchHelper extends Object{
/**
* WORK_MANAGER_JNDI.
* */
private static final String WORK_MANAGER_JNDI
= "java:comp/env/testWorkManager";
/**
* Global cache of WorkManager instances, keyed by JNDI name.
*
* ConcurrentHashMap does not lock the entire Map, so it
* scales much better than fully synchronized collections
* such as Hashtable or Vector. Individual operations,
* including putIfAbsent, are atomic. The explicit lock in
* getWorkManager guards the compound "check, look up, then
* store" sequence so that the JNDI lookup is not repeated
* by threads that arrive at the same time.
*/
private static final ConcurrentMap<String, WorkManager> JNDI_CACHE
= new ConcurrentHashMap<String, WorkManager>(10);
/**
* The lock.
* */
private static final Lock LOCK = new ReentrantLock();
/**
* Private constructor to prevent instantiation.
* */
private AsynchHelper() { }
/**
* getWorkManager.
* @return the work manager
* @throws NamingException NamingException
* */
public static WorkManager getWorkManager() throws NamingException {
WorkManager workManager = null;
LOCK.lock();
try {
/**
* get WorkManager naming Context from Cache.
*/
workManager = JNDI_CACHE.get(WORK_MANAGER_JNDI);
if (workManager == null) {
Context context = new InitialContext();
workManager = (WorkManager) context.lookup(WORK_MANAGER_JNDI);
/**
* place WorkManager in cache
*/
JNDI_CACHE.putIfAbsent(WORK_MANAGER_JNDI, workManager);
}
} finally {
LOCK.unlock();
}
return (workManager);
}
}
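The "check the cache, look up, then store" sequence can also be expressed without an explicit lock. The following is a minimal sketch, assuming Java 8 or later, that relies on ConcurrentHashMap.computeIfAbsent instead of the ReentrantLock used above. LookupCache and its loader function are hypothetical names introduced for illustration; a real JNDI lookup throws the checked NamingException, which would have to be wrapped inside the loader.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Sketch of a lookup cache that relies on computeIfAbsent instead of an explicit lock. */
final class LookupCache<V> {

    private final ConcurrentHashMap<String, V> cache = new ConcurrentHashMap<>();
    private final Function<String, V> loader;

    LookupCache(Function<String, V> loader) {
        this.loader = loader;
    }

    /** Returns the cached value; the loader is applied only when the key is absent. */
    V get(String name) {
        return cache.computeIfAbsent(name, loader);
    }

    public static void main(String[] args) {
        AtomicInteger lookups = new AtomicInteger();
        // Hypothetical loader standing in for an expensive JNDI lookup.
        LookupCache<String> cache = new LookupCache<>(name -> {
            lookups.incrementAndGet();
            return "resource:" + name;
        });
        cache.get("java:comp/env/testWorkManager");
        cache.get("java:comp/env/testWorkManager");
        System.out.println(lookups.get()); // prints 1
    }
}
```

The trade-off is that the loader runs while the map holds an internal lock for that key, so it should be short and must not touch the same map.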
As an example, let’s take a scenario in which a method needs to fetch a user’s account details and also perform a bureau credit check. In this particular use case neither operation depends on the other, so the account and credit details can be retrieved independently.
So, in this example two units of work are started asynchronously on lightweight threads, each doing one of the following:
- Fetching user’s account details
- Performing a bureau credit check
The APIs will be explained as we walk through the above use case in code, implementing the following steps:
- Each unit of work will be of type com.ibm.websphere.asynchbeans.Work.
- Each unit of work places all of its logic inside the run method defined by the Work interface, which extends java.lang.Runnable. The units of work are joined once they have finished.
- Ideally, but not necessarily (depending on your use case), two separate classes should be created, each implementing com.ibm.websphere.asynchbeans.Work.
The use case code flow is as follows:
- The WorkManager is retrieved at 1.
- Two units of work are started when startWork is called at 2. The startWork method hands each Work object to a thread from the WorkManager's pool, and that thread invokes Work.run(). It is entirely up to the thread scheduler when the thread is actually executed, i.e. moved from the ready-to-run state into the running state.
- Each startWork call returns a com.ibm.websphere.asynchbeans.WorkItem object at 2.
- The WorkItems need to be placed into an ArrayList at 3. Note that the join method takes a java.util.ArrayList rather than a generic collection, so the type of its elements can only be checked at runtime.
- At 4, the WorkManager waits indefinitely for both units of work to leave their running state, which joins the threads. This ensures that the threads have completed their required tasks. See the IBM WorkManager documentation for WorkManager API information.
- Once join returns, the Work types, in this case the AccountHandler and the UserCreditCheckHandler, expose the required data through their public getter methods. This is at 5.
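The start, collect, join and read-state flow above can be tried outside an application server with a plain-JDK analogue. The sketch below swaps in java.util.concurrent.ExecutorService for the WorkManager and Future for WorkItem. It is not the Asynchronous Beans API, and StatusTask is a hypothetical unit of work introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Plain-JDK analogue of the start, collect, join and read-state flow. */
final class StartAndJoinSketch {

    /** Hypothetical unit of work that keeps its result as state, much like a Work object. */
    static final class StatusTask implements Runnable {
        private final String kind;
        private final String partyId;
        private volatile String status;

        StatusTask(String kind, String partyId) {
            this.kind = kind;
            this.partyId = partyId;
        }

        @Override
        public void run() {
            status = kind + "-done-" + partyId;
        }

        String getStatus() {
            return status;
        }
    }

    /** Runs both tasks on pooled threads and waits for both before reading their state. */
    static List<String> runBoth(String partyId) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            StatusTask account = new StatusTask("account", partyId);
            StatusTask credit = new StatusTask("credit", partyId);
            // Start both tasks and keep the handles, as with the WorkItems.
            List<Future<?>> handles = new ArrayList<>();
            handles.add(pool.submit(account));
            handles.add(pool.submit(credit));
            // Wait for every handle: the equivalent of an indefinite "join all".
            for (Future<?> handle : handles) {
                handle.get();
            }
            // Only now is it safe to read the state collected by each task.
            List<String> results = new ArrayList<>();
            results.add(account.getStatus());
            results.add(credit.getStatus());
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling StartAndJoinSketch.runBoth("7") returns the two statuses in the order the tasks were created, regardless of which thread finished first.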
Client
import java.util.ArrayList;
import java.util.List;
import javax.naming.NamingException;
import com.ibm.websphere.asynchbeans.WorkException;
import com.ibm.websphere.asynchbeans.WorkItem;
import com.ibm.websphere.asynchbeans.WorkManager;
public class AccountOpener {
    public AccountOpener() {
        super();
    }
    public List getPartyAccountsAndValidate(final String partyId) {
        try {
            WorkManager workManager = AsynchHelper.getWorkManager(); //1
            WorkItem itemAccountDetails = workManager.startWork(new AccountHandler(partyId)); //2
            WorkItem itemCreditCheck = workManager.startWork(new UserCreditCheckHandler(partyId));
            //Create an ArrayList of the WorkItems to join on
            ArrayList userDetails = new ArrayList(); //3
            userDetails.add(itemAccountDetails);
            userDetails.add(itemCreditCheck);
            //Join them using the WorkManager; blocks until both have finished
            workManager.join(userDetails, WorkManager.JOIN_AND, (int) WorkManager.INDEFINITE); //4
            //getResult() hands back the Work object that was executed
            List accounts = ((AccountHandler) itemAccountDetails.getResult()).getAccounts(); //5
            String checkStatus = ((UserCreditCheckHandler) itemCreditCheck.getResult()).getCheckStatus();
            //checkStatus is available here for validation before the accounts are returned
            return accounts;
        } catch (NamingException nex) {
            throw new RuntimeException(nex);
        } catch (WorkException wex) {
            throw new RuntimeException(wex);
        } catch (IllegalArgumentException iaex) {
            throw new RuntimeException(iaex);
        }
    }
}
Work Item 1: used to fetch user account details
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.naming.NamingException;
import javax.sql.DataSource;
public class AccountHandler implements com.ibm.websphere.asynchbeans.Work {
    private final List userAccounts = new CopyOnWriteArrayList();
    private final String partyId;
    //Data store used for example
    private static final String DATASOURCE_JNDI = "java:comp/env/jdbc/accounts";
    //SQL query
    private static final String SQL_QUERY = "select ACCOUNT_NUMBERS from PARTY_ACCOUNTS where PARTY_ACCOUNTS.PARTY_ID = ?";
    public AccountHandler(String partyId) {
        super();
        this.partyId = partyId;
    }
    public void run() {
        //declared outside the try block so that the finally block can close them
        Connection connection = null;
        PreparedStatement statement = null;
        ResultSet resultSet = null;
        try {
            DataSource dS = DatabaseUtilities.lookup(DATASOURCE_JNDI);
            connection = dS.getConnection();
            statement = connection.prepareStatement(SQL_QUERY);
            //bind variable
            statement.setString(1, this.partyId);
            /**
             * execute SQL.
             */
            resultSet = statement.executeQuery();
            //iterate over the ResultSet; for this illustration each row is mapped to its account number
            while (resultSet.next()) {
                userAccounts.add(resultSet.getString(1));
            }
        } catch (NamingException nEx) {
            throw new RuntimeException(nEx);
        } catch (SQLException sqlEx) {
            throw new RuntimeException(sqlEx);
        } finally {
            /**
             * clean all JDBC resources.
             *
             * order: ResultSet, Statement, Connection Sun best practice.
             */
            try {
                DatabaseUtilities.closeResources(resultSet, statement, connection);
            } catch (SQLException sqlEx) {
                throw new RuntimeException(sqlEx);
            }
        }
    }
    //required by the Work interface; nothing to stop here because run() is short-lived
    public void release() {
    }
    public List getAccounts() {
        return this.userAccounts;
    }
} //end of class
Work Item 2: used to fetch user credit check data
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.naming.NamingException;
import javax.sql.DataSource;
public class UserCreditCheckHandler implements com.ibm.websphere.asynchbeans.Work {
    private volatile String partyCreditStatus;
    private final String partyId;
    //Data store used for example
    private static final String DATASOURCE_JNDI = "java:comp/env/jdbc/credit";
    //SQL query
    private static final String SQL_QUERY = "select STATUS from PARTY_CREDIT_CHECK where PARTY_CREDIT_CHECK.PARTY_ID = ?";
    public UserCreditCheckHandler(String partyId) {
        super();
        this.partyId = partyId;
    }
    public void run() {
        //declared outside the try block so that the finally block can close them
        Connection connection = null;
        PreparedStatement statement = null;
        ResultSet resultSet = null;
        try {
            DataSource dS = DatabaseUtilities.lookup(DATASOURCE_JNDI);
            connection = dS.getConnection();
            statement = connection.prepareStatement(SQL_QUERY);
            //bind variable
            statement.setString(1, this.partyId);
            /**
             * execute SQL.
             */
            resultSet = statement.executeQuery();
            //if several rows match, the status of the last row read is kept
            while (resultSet.next()) {
                this.partyCreditStatus = resultSet.getString(1);
            }
        } catch (NamingException nEx) {
            throw new RuntimeException(nEx);
        } catch (SQLException sqlEx) {
            throw new RuntimeException(sqlEx);
        } finally {
            /**
             * clean all JDBC resources.
             *
             * order: ResultSet, Statement, Connection Sun best practice.
             */
            try {
                DatabaseUtilities.closeResources(resultSet, statement, connection);
            } catch (SQLException sqlEx) {
                throw new RuntimeException(sqlEx);
            }
        }
    }
    //required by the Work interface; nothing to stop here because run() is short-lived
    public void release() {
    }
    public String getCheckStatus() {
        return this.partyCreditStatus;
    }
} //end of class
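Both handlers close their JDBC resources in a finally block, in the order ResultSet, Statement, Connection. On Java 7 or later the same order falls out of try-with-resources, which closes resources in the reverse of their declaration order and still closes the remaining ones if an earlier close fails. The sketch below shows only that ordering. Tracked is a hypothetical stand-in for the real JDBC types so the example runs without a database.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch showing the order in which try-with-resources closes its resources. */
final class CloseOrderSketch {

    /** Hypothetical stand-in for a JDBC resource; records its name when closed. */
    static final class Tracked implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Tracked(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add(name);
        }
    }

    /** Opens three resources and returns the order in which they were closed. */
    static List<String> closeOrder() {
        List<String> log = new ArrayList<>();
        try (Tracked connection = new Tracked("Connection", log);
             Tracked statement = new Tracked("Statement", log);
             Tracked resultSet = new Tracked("ResultSet", log)) {
            // The query would be executed and its rows read here.
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(closeOrder()); // prints [ResultSet, Statement, Connection]
    }
}
```

With real JDBC types the three declarations would be the Connection, the PreparedStatement and the ResultSet, and the surrounding method would still need to handle SQLException.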
A simple JDBC utility class for this illustration:
/*
* J2SE imports
*/
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/*
* JEE imports
*/
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.sql.DataSource;
public class DatabaseUtilities {
/* ---------------- Class Fields -------------- */
/**
* Global cache of DataSource instances, keyed by JNDI name.
*
* ConcurrentHashMap does not lock the entire Map, so it scales much
* better than fully synchronized collections such as Hashtable or
* Vector. Individual operations are atomic; the explicit lock in
* lookup guards the compound "check, look up, then store" sequence.
*/
private static final Map<String, DataSource> jndiCache = new ConcurrentHashMap<String, DataSource>(1);
/**
* Lock implementations provide more extensive locking operations
* than can be obtained using synchronized methods and
* statements. They allow more flexible structuring, may have quite
* different properties, and may support multiple associated
* {@link Condition} objects.
*/
private static final Lock lock = new ReentrantLock();
/**
* private constructor to prevent object instantiation externally.
*/
private DatabaseUtilities() {
super();
}
/**
* This is a utility method to clean up JDBC resources. ResultSet, Statement
* and the Connection. Used in finally block of client calling routine.
*
* Sun best practices recommend freeing the resource in this order.
*
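* @param resultSet the ResultSet to close, may be null
* @param statement the Statement to close, may be null
* @param connection the Connection to close, may be null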
*
* @throws SQLException
*
*/
public static void closeResources(ResultSet resultSet, Statement statement,
Connection connection) throws SQLException {
/**
* close JDBC ResultSet.
*/
if (resultSet != null) {
resultSet.close();
}
/**
* close JDBC Statement.
*/
if (statement != null) {
statement.close();
}
/**
* close JDBC Distributed Connection.
*/
if (connection != null) {
connection.close();
}
}
/**
* This is a utility method to get a DataSource from the server
* via a JNDI lookup. This is for distributed transactions, i.e. two-phase
* commit.
*
* The JNDI resource is defined in the XML Configuration file externally.
*
* @param jndiName the JNDI name of the DataSource resource
*
* @return the DataSource bound to the given JNDI name
*
* @throws NamingException if the JNDI lookup fails
*/
public static DataSource lookup(String jndiName) throws NamingException {
/* ---------------- critical section -------------- */
DataSource dataSource = null;
lock.lock();
try {
/**
* get JDBC Distributed DataSource from Cache.
*/
dataSource = (DataSource) jndiCache.get(jndiName);
/**
* if not available from cache then retrieve DataSource
* from server Connection pool and store in cache.
*/
if (dataSource == null) {
Context context = new InitialContext();
Object obj = context.lookup(jndiName);
dataSource = (DataSource) obj;
/**
* place DataSource in cache
*/
jndiCache.put(jndiName, dataSource);
}
}
/*
* release the explicit lock.
*/
finally {
lock.unlock();
}
return dataSource;
}
}// end of class
Resources
Get more information in the Asynchronous beans programming guide.