Thursday, February 17, 2011

WebLogic JMS: pause and resume consumption

This program is an example of connecting to WebLogic over JMX and manipulating its JMS queues.
The example pauses and then resumes message consumption on a selected queue.


/*
* $Revision: $
* $Date: $
* $Author: $
*/

package com.test;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Hashtable;
import java.util.Set;
import java.util.Vector;

import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueueManipulator {
Logger logger = LoggerFactory.getLogger(this.getClass());
public static final String SYSTEM_OSB = "OSB";

/**
 * Command-line entry point: builds a {@code QueueManipulator} and runs the
 * pause/resume demonstration once.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    new QueueManipulator().run();
}

/**
 * Runs the demonstration: pauses consumption on the example queue and then
 * immediately resumes it. The collections returned by the two calls are
 * discarded.
 */
public void run() {
    final String[] targetQueues = new String[] { "PurchaseRequisitionCreateQueue" };
    final boolean pauseRequested = true;

    // First pass pauses consumption ...
    updatePauseStatus(targetQueues, pauseRequested);
    // ... second pass resumes it.
    updatePauseStatus(targetQueues, !pauseRequested);
}

/**
 * Pauses or resumes message consumption for the given queues on every
 * configured server.
 *
 * <p>For each server a JMX connection is opened, each queue is located in the
 * JMS module and the {@code pauseConsumption} / {@code resumeConsumption}
 * operation is invoked on it. A failure on one queue aborts the remaining
 * queues of that server (the error is logged) and processing continues with
 * the next server. The JMX connection is always closed.
 *
 * @param queueNames names of the queues to update
 * @param pause      {@code true} to pause consumption, {@code false} to resume it
 * @return one {@code QueueInfo} per successful invocation (a queue appears once
 *         per server on which it was updated); never {@code null}
 */
private Collection<QueueInfo> updatePauseStatus(String[] queueNames,
        boolean pause) {
    // NOTE(review): connection details are hard-coded because this is example
    // code; real credentials must not be kept in source.
    String jmsModuleName = "ApplicationModule";
    String username = "khyland";
    String password = "Password1";

    Collection<Server> serverList = new ArrayList<Server>();
    serverList.add(new Server("ms_soa1", "osb1tst2:7001"));
    // Further cluster members can be added here, e.g.
    // serverList.add(new Server("ms_soa2", "soa2tst2:7001"));

    // Name of the MBean operation to invoke; also used in the log messages.
    String action = pause ? "pauseConsumption" : "resumeConsumption";
    Collection<QueueInfo> updatedQueues = new ArrayList<QueueInfo>();

    for (Server server : serverList) {
        JMXConnector jmxCon = null;
        try {
            jmxCon = getJMXConnector(server, username, password);
            jmxCon.connect();
            MBeanServerConnection con = jmxCon.getMBeanServerConnection();

            for (String queueName : queueNames) {
                try {
                    ObjectName destination = findQueueObjectName(con,
                            jmsModuleName, queueName);
                    // The operation takes no arguments, hence the null
                    // parameter and signature arrays.
                    Object result = con.invoke(destination, action, null, null);
                    logger.info("Result from " + action + ": " + result);
                    updatedQueues.add(new QueueInfo(SYSTEM_OSB, queueName));
                } catch (Exception e) {
                    // Identify the failing queue here; the stack trace is
                    // logged once by the per-server handler below.
                    logger.error("Failed to " + action + " on queue "
                            + jmsModuleName + ":" + queueName);
                    throw e;
                }
            }
        } catch (Exception e) {
            // Deliberately not rethrown: keep working with the remaining
            // servers even if this one failed.
            logger.error("Error during " + action + " on " + server, e);
        } finally {
            if (jmxCon != null) {
                try {
                    jmxCon.close();
                } catch (IOException e) {
                    logger.error("Error while closing JMX connection", e);
                }
            }
        }
    }
    return updatedQueues;
}

/**
 * Creates a (not yet connected) JMX connector for the runtime MBean server of
 * the given server, using the IIOP protocol.
 *
 * <p>The caller is responsible for calling {@code connect()} and
 * {@code close()} on the returned connector.
 *
 * @param server   server whose {@code getUrl()} value is inserted into the JMX
 *                 service URL
 * @param username principal passed in the connection environment
 * @param password credentials passed in the connection environment
 * @return a new, unconnected connector
 * @throws IOException if the service URL is malformed or the connector cannot
 *                     be created
 */
private static JMXConnector getJMXConnector(Server server, String username,
        String password) throws IOException {
    JMXServiceURL serviceUrl = new JMXServiceURL("service:jmx:iiop://"
            + server.getUrl()
            + "/jndi/weblogic.management.mbeanservers.runtime");

    Hashtable<String, Object> env = new Hashtable<String, Object>();
    // Tells the connector factory which package provides the protocol
    // implementation.
    env.put(JMXConnectorFactory.PROTOCOL_PROVIDER_PACKAGES,
            "weblogic.management.remote");
    env.put(javax.naming.Context.SECURITY_PRINCIPAL, username);
    env.put(javax.naming.Context.SECURITY_CREDENTIALS, password);

    return JMXConnectorFactory.newJMXConnector(serviceUrl, env);
}

/**
 * Looks up the single {@code JMSDestinationRuntime} MBean that matches the
 * given JMS module and queue name.
 *
 * <p>The query pattern is {@code Name=<module>!*<queue>*}, so any text may
 * appear between the {@code !} and the queue name, and after the queue name.
 * Exactly one MBean must match.
 *
 * @param con           connection to the MBean server to query
 * @param jmsModuleName name of the JMS module that owns the queue
 * @param queueName     name of the queue to find
 * @return the object name of the one matching destination
 * @throws Exception if the pattern is not a valid object name, the query
 *                   fails, no destination matches, or more than one matches
 */
private ObjectName findQueueObjectName(MBeanServerConnection con,
        String jmsModuleName, String queueName) throws Exception {
    ObjectName queryPattern = new ObjectName(
            "com.bea:Type=JMSDestinationRuntime,Name=" + jmsModuleName
                    + "!*" + queueName + "*,*");
    Set<ObjectName> objectNames = con.queryNames(queryPattern, null);

    if (objectNames == null || objectNames.isEmpty()) {
        throw new Exception(
                "NotFoundException: Error while getting a reference to the queue (queue "
                        + queueName + " not found)");
    }
    if (objectNames.size() > 1) {
        throw new Exception(
                "DuplicateFoundException: Error while getting a reference to the queue (more than one queue found named "
                        + queueName + ")");
    }
    return objectNames.iterator().next();
}

/**
 * Simple mutable value holder pairing a server name with its address.
 * In this file the address is used as the host part of a JMX service URL.
 */
class Server implements Serializable {
    private static final long serialVersionUID = -466143988049844989L;

    private String name;
    private String url;

    /** Creates a server whose name and url are both unset ({@code null}). */
    public Server() {
    }

    /**
     * Creates a server with the given name and url.
     *
     * @param name server name
     * @param url  server address
     */
    public Server(String name, String url) {
        setName(name);
        setUrl(url);
    }

    /** @return the server name, possibly {@code null} */
    public String getName() {
        return name;
    }

    /** @param name the new server name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the server address, possibly {@code null} */
    public String getUrl() {
        return url;
    }

    /** @param url the new server address */
    public void setUrl(String url) {
        this.url = url;
    }

    /** @return the name followed by the url in square brackets */
    @Override
    public String toString() {
        return getName() + " [" + getUrl() + "]";
    }
}

/**
 * Describes a queue by owning system, queue name and JNDI name, together with
 * a collection of per-server details.
 *
 * <p>Ordering, equality and hash code are all based on the queue name only.
 */
public class QueueInfo implements Comparable<QueueInfo> {

    private String system;
    private String queueName;
    private String jndiName;
    private Collection<SingleServerQueueInfo> serverQueueInfo =
            new Vector<SingleServerQueueInfo>();

    /**
     * Creates a queue description with an empty JNDI name.
     *
     * @param system    identifier of the system the queue belongs to
     * @param queueName name of the queue
     */
    public QueueInfo(String system, String queueName) {
        this(system, queueName, "");
    }

    /**
     * Creates a queue description.
     *
     * @param system    identifier of the system the queue belongs to
     * @param queueName name of the queue
     * @param jndiName  JNDI name of the queue
     */
    public QueueInfo(String system, String queueName, String jndiName) {
        this.system = system;
        this.queueName = queueName;
        this.jndiName = jndiName;
    }

    /** @return the system identifier */
    public String getSystem() {
        return system;
    }

    /** @param system the new system identifier */
    public void setSystem(String system) {
        this.system = system;
    }

    /** @return the queue name */
    public String getQueueName() {
        return queueName;
    }

    /** @param queueName the new queue name */
    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    /** @return the JNDI name */
    public String getJndiName() {
        return jndiName;
    }

    /** @param jndiName the new JNDI name */
    public void setJndiName(String jndiName) {
        this.jndiName = jndiName;
    }

    /** @return the live (not copied) collection of per-server details */
    public Collection<SingleServerQueueInfo> getServerQueueInfo() {
        return serverQueueInfo;
    }

    /** @param serverQueueInfo collection that replaces the current per-server details */
    public void setServerQueueInfo(
            Collection<SingleServerQueueInfo> serverQueueInfo) {
        this.serverQueueInfo = serverQueueInfo;
    }

    /**
     * Appends the details for one server.
     *
     * @param serverName            not used by this implementation
     * @param singleServerQueueInfo details to append
     */
    public void addSingleServerQueueInfo(String serverName,
            SingleServerQueueInfo singleServerQueueInfo) {
        serverQueueInfo.add(singleServerQueueInfo);
    }

    /** Orders queue descriptions by queue name. */
    @Override
    public int compareTo(QueueInfo o) {
        return queueName.compareTo(o.getQueueName());
    }

    /** Two queue descriptions are equal when their queue names are equal. */
    @Override
    public boolean equals(Object o) {
        // instanceof is false for null, so no separate null check is needed.
        if (!(o instanceof QueueInfo)) {
            return false;
        }
        return this.compareTo((QueueInfo) o) == 0;
    }

    /** Hash of the queue name, consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        return queueName.hashCode();
    }

    /** @return {@code system:queueName:jndiName} */
    @Override
    public String toString() {
        return getSystem() + ":" + getQueueName() + ":" + getJndiName();
    }
}

/**
 * Mutable bean holding the details of one queue on one server: identifying
 * names, message and consumer counters, and pause flags.
 *
 * <p>NOTE(review): the fields presumably mirror attributes of the server-side
 * JMS destination runtime; nothing in this file populates them, so confirm
 * against the code that does.
 */
public class SingleServerQueueInfo {

    // Identification
    private String serverName;
    private String queueName;
    private String jndiName;

    // Counters
    private long messagesCurrent;
    private long messagesHigh;
    private long messagesPending;
    private long consumersCurrent;
    private long consumersHigh;

    // Pause flags
    private boolean insertionPaused;
    private boolean consumptionPaused;
    private boolean productionPaused;

    /** @return the server name */
    public String getServerName() {
        return this.serverName;
    }

    /** @param serverName the new server name */
    public void setServerName(String serverName) {
        this.serverName = serverName;
    }

    /** @return the queue name */
    public String getQueueName() {
        return this.queueName;
    }

    /** @param queueName the new queue name */
    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    /** @return the JNDI name */
    public String getJndiName() {
        return this.jndiName;
    }

    /** @param jndiName the new JNDI name */
    public void setJndiName(String jndiName) {
        this.jndiName = jndiName;
    }

    /** @return the stored "messages current" counter */
    public long getMessagesCurrent() {
        return this.messagesCurrent;
    }

    /** @param messagesCurrent the new "messages current" counter */
    public void setMessagesCurrent(long messagesCurrent) {
        this.messagesCurrent = messagesCurrent;
    }

    /** @return the stored "messages high" counter */
    public long getMessagesHigh() {
        return this.messagesHigh;
    }

    /** @param messagesHigh the new "messages high" counter */
    public void setMessagesHigh(long messagesHigh) {
        this.messagesHigh = messagesHigh;
    }

    /** @return the stored "messages pending" counter */
    public long getMessagesPending() {
        return this.messagesPending;
    }

    /** @param messagesPending the new "messages pending" counter */
    public void setMessagesPending(long messagesPending) {
        this.messagesPending = messagesPending;
    }

    /** @return the stored "consumers current" counter */
    public long getConsumersCurrent() {
        return this.consumersCurrent;
    }

    /** @param consumersCurrent the new "consumers current" counter */
    public void setConsumersCurrent(long consumersCurrent) {
        this.consumersCurrent = consumersCurrent;
    }

    /** @return the stored "consumers high" counter */
    public long getConsumersHigh() {
        return this.consumersHigh;
    }

    /** @param consumersHigh the new "consumers high" counter */
    public void setConsumersHigh(long consumersHigh) {
        this.consumersHigh = consumersHigh;
    }

    /** @return the stored insertion-paused flag */
    public boolean isInsertionPaused() {
        return this.insertionPaused;
    }

    /** @param insertionPaused the new insertion-paused flag */
    public void setInsertionPaused(boolean insertionPaused) {
        this.insertionPaused = insertionPaused;
    }

    /** @return the stored consumption-paused flag */
    public boolean isConsumptionPaused() {
        return this.consumptionPaused;
    }

    /** @param consumptionPaused the new consumption-paused flag */
    public void setConsumptionPaused(boolean consumptionPaused) {
        this.consumptionPaused = consumptionPaused;
    }

    /** @return the stored production-paused flag */
    public boolean isProductionPaused() {
        return this.productionPaused;
    }

    /** @param productionPaused the new production-paused flag */
    public void setProductionPaused(boolean productionPaused) {
        this.productionPaused = productionPaused;
    }

    /** @return the server name and queue name joined by {@code ::} */
    @Override
    public String toString() {
        return new StringBuilder()
                .append(getServerName())
                .append("::")
                .append(getQueueName())
                .toString();
    }
}
}

No comments: