Monday, January 17, 2011

JMS lifecycle and Searching Filters (Message Selector)

The JMS message lifecycle can be summarized as below with respect to the following two states:

1. A message sent by a JMS producer
* Without any associated transaction:
It is immediately current.
* Within a JTA transaction or transacted JMS session:
It remains pending until the transaction is committed or rolled back. If the transaction is committed, the message becomes available, and if the transaction is rolled back, the message is removed from the destination.
* With a specified TimeToDeliver property:
It remains pending until the TimeToDeliver period expires. This is expected because the purpose of the TimeToDeliver property is to keep the message unavailable to consumers until the specified time period.

2. A message received by a JMS consumer
* Without any associated JTA transaction and in a non-transacted JMS session with an acknowledgement mode of NO_ACKNOWLEDGE or MULTICAST_NO_ACKNOWLEDGE:
It is immediately removed from the destination.
* Within a JTA transaction or transacted JMS session:
It becomes pending until the transaction or session is committed or rolled back. On commit, the message is removed from the destination, and on rollback, the message becomes available again, unless a Redelivery delay is specified, in which case it continues to remain pending until the Redelivery delay.
* Within a non-transacted JMS session with the acknowledgement mode of AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE, or CLIENT_ACKNOWLEDGE:
It becomes pending until the acknowledgement is received.

Message Selector

When searching for messages in JMS queue. There is an expression language for specifying parameters. This is known as Message Selector

This is the format. This is an example from weblogic that I was using.
JMSMessageId = 'ID:<982769.1294675696539.0>'

What confused me was the ID section. I assumed that was the variable name, so I was changing the format to ID=<> etc.

Note can also do logical operations, see http://download.oracle.com/javaee/6/api/javax/jms/Message.html
e.g.
"JMSType = 'car' AND color = 'blue' AND weight > 2500"

State String
The current state of a message, which could be one of DELAYED, EXPIRED, ORDERED, PAUSED, RECEIVE, REDELIVERY_COUNT_EXCEEDED, SEND, TRANSACTION, or VISIBLE.

Note with Weblogic Unit of Order
We were having problems. A standalone utility program is used to display Jms message see this post . It was having problems displaying some messages. One common theme appeared to be that they were in the Ordered state, as opposed to the visible state. This is due to the message using the Unit of Order to ensure ordered delivery.

Another blog post noted (http://forum.springsource.org/showthread.php?t=69398 )that if the message is in the Receive State, then it may be caused by having 2 Weblogic servers with the same name, where one weblogic server is consuming from a queue serverd from another Weblogic server.

Note also the Xml manipulation required on the JMSMessage







private static final Integer JMS_ALL_STATES = new Integer(0x7fffffff);

public JmsMessage[] getMEssages(){
try {
jmxCon = getJMXConnector(server, username, password);
jmxCon.connect();
MBeanServerConnection con = jmxCon.getMBeanServerConnection();
ObjectName destination = findQueueObjectName(con, jmsModuleName, queueName);
String newCursor = (String) con.invoke(destination, "getMessages", new Object[] { "JMSMessageID='"+msgId+"'", QueuePersistenceJmxImpl.BROWSE_TIMEOUT, JMS_ALL_STATES}, OP_GET_MESSAGES_ALL_SIGNATURE);
result.setCursor(newCursor);
// Should only be one value so no need to sort
//con.invoke(destination, "sort", new Object[] { newCursor, POSITION_UNDEFINED, DEFAULT_ORDERING_ATTRS, DEFAULT_ORDER }, OP_SORT_SIGNATURE);
result.setNumMessages((Long) con.invoke(destination, "getCursorSize", new Object[] { newCursor }, OP_GET_CURSOR_SIZE_SIGNATURE));
Long initialPosition =new Long(0);
Integer pageSize=new Integer(100);
data = (CompositeData[]) con.invoke(destination, "getItems", new Object[] { newCursor, initialPosition, pageSize }, OP_GET_ITEMS_SIGNATURE);
if (data != null) {
JmsMessage[] ret = toJmsMessages(data, new CursorParam(destination, con, newCursor));
if(ret!=null){
if(ret.length>1)
logger.warn("findMessage (by messageId) returned more than one result"+JavaUtil.toString(ret));
return ret[0];
}
return null;
}
} catch (Exception e) {
// throw new RuntimeException("Error while browsing messages",
// e);
// ignore the error and try the next node
logger.error("Error searching for message " + msgId + " on server " + server, e);
}

private static JMXConnector getJMXConnector(Server server, String username, String password) throws IOException {
String fullServerURL = "service:jmx:iiop://" + server.getUrl() + "/jndi/weblogic.management.mbeanservers.runtime";
JMXConnector jmxCon = null;

JMXServiceURL serviceUrl = new JMXServiceURL(fullServerURL);

Hashtable env = new Hashtable();
env.put(JMXConnectorFactory.PROTOCOL_PROVIDER_PACKAGES, "weblogic.management.remote");
env.put(javax.naming.Context.SECURITY_PRINCIPAL, username);
env.put(javax.naming.Context.SECURITY_CREDENTIALS, CryptoUtilWrapper.decrypt(password));

jmxCon = JMXConnectorFactory.newJMXConnector(serviceUrl, env);

return jmxCon;
}

private JmsMessage[] toJmsMessages(CompositeData[] data, CursorParam params) {
XPathUtils xpathUtils = null;
Document d;
JmsMessage[] ret = new JmsMessage[data.length];
int i=0;
MBeanServerConnection con = params.getConnection();
ObjectName destination = params.getDestination();
try {
for (CompositeData resItem : data) {
JmsMessage msg = new JmsMessage();
d = XMLUtils.stringToDocument((String) resItem.get("MessageXMLText"));
xpathUtils = new XPathUtils(d, QueuePersistenceJmxImpl.JMS_MESSAGE_NS_CTX);
xpathUtils.setDocument(d);
msg.setId(xpathUtils.findValue("/mes:WLJMSMessage/mes:Header/mes:JMSMessageID"));
msg.setTimestamp(new Date(Long.parseLong(xpathUtils.findValue("/mes:WLJMSMessage/mes:Header/mes:JMSTimestamp"))));
// Get BOdy
CompositeData body = (CompositeData) con.invoke(destination, "getMessage", new Object[] { params.getCursor(), msg.getId() }, OP_CURSOR_GET_MESSAGE_SIGNATURE);
d = XMLUtils.stringToDocument((String)body.get("MessageXMLText"));
xpathUtils = new XPathUtils(d, QueuePersistenceJmxImpl.JMS_MESSAGE_NS_CTX);
xpathUtils.setDocument(d);
msg.setBody(xpathUtils.findValue("/mes:WLJMSMessage/mes:Body/mes:Text"));
ret[i++]=msg;
}
return ret;
} catch (Exception e) {
throw new RuntimeException("Error while transforming CompositeData to JmsMessage", e);
}
}

No comments: