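The fastest way for an MQSeries Java application to connect to the server is MQ Java Binding mode, which requires the MQ Java application and the MQ Server to be on the same machine. Binding mode avoids the overhead of establishing a network connection, so it should be chosen when connection cost has a large impact on performance.
MQ Java Client mode connects through a server-connection channel defined on the server side, and the server must have a listener running. Client mode is used when the Java client program and the server are not on the same machine.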
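The choice between the two modes comes down to whether the queue manager host is the local machine. The following is only a minimal sketch of that decision using the standard library; the class `ConnectionModeChooser`, the `Mode` enum, and the method names are hypothetical and are not part of the MQ classes for Java.

```java
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;

/** Hypothetical helper; not part of the MQ classes for Java. */
public class ConnectionModeChooser {

    /** The two connection modes described in the text. */
    public enum Mode { BINDINGS, CLIENT }

    /** Returns true if the host name or IP literal refers to this machine. */
    public static boolean isLocalHost(String host) {
        try {
            InetAddress addr = InetAddress.getByName(host);
            if (addr.isLoopbackAddress() || addr.isAnyLocalAddress()) {
                return true;
            }
            // Otherwise check whether a local network interface owns the address.
            return NetworkInterface.getByInetAddress(addr) != null;
        } catch (UnknownHostException | SocketException e) {
            // Treat lookup failures as "not local".
            return false;
        }
    }

    /** Bindings mode only works on the same machine; otherwise use client mode. */
    public static Mode choose(String queueManagerHost) {
        return isLocalHost(queueManagerHost) ? Mode.BINDINGS : Mode.CLIENT;
    }

    public static void main(String[] args) {
        System.out.println(choose("127.0.0.1"));  // loopback address
        System.out.println(choose("192.0.2.1"));  // documentation-only address range
    }
}
```

In a real application the result would decide whether to set the MQEnvironment client fields at all; that wiring is left out here because it depends on the installed MQ version.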
For a client connection, set the fields of the MQEnvironment class:
MQEnvironment.hostname
The following is a client connection example:
// ===========================================================================
//
// Licensed Materials - Property of IBM
//
// 5639-C34
//
// (c) Copyright IBM Corp. 1995,1999
//
// ===========================================================================
// WebSphere MQ Client for Java sample applet
//
// This sample runs as an applet using the appletviewer and HTML file,
// using the command :-
// appletviewer MQSample.html
// Output is to the command line, NOT the applet viewer window.
//
// Note. If you receive WebSphere MQ error 2 reason 2059 and you are sure your
// WebSphere MQ and TCP/IP setup is correct,
// you should click on the "Applet" selection in the Applet viewer window
// select properties, and change "Network access" to unrestricted.
import com.ibm.mq.*;            // Include the WebSphere MQ classes for Java package
public class MQSample extends java.applet.Applet
{
    private String hostname = "your_hostname";   // define the name of your
                                                 // host to connect to
    private String channel  = "server_channel";  // define name of channel
                                                 // for client to use
                                                 // Note. assumes WebSphere MQ Server
                                                 // is listening on the default
                                                 // TCP/IP port of 1414
    private String qManager = "your_Q_manager";  // define name of queue
                                                 // manager object to
                                                 // connect to.
    private MQQueueManager qMgr;                 // define a queue manager object
    // When the class is called, this initialization is done first.
    public void init()
    {
        // Set up WebSphere MQ environment
        MQEnvironment.hostname = hostname;       // Could have put the
                                                 // hostname & channel
        MQEnvironment.channel  = channel;        // string directly here!
        MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,  // Set TCP/IP or server
                                     MQC.TRANSPORT_MQSERIES); // Connection
    } // end of init
    public void start()
    {
        try {
            // Create a connection to the queue manager
            qMgr = new MQQueueManager(qManager);
            // Set up the options on the queue we wish to open...
            // Note. All WebSphere MQ Options are prefixed with MQC in Java.
            int openOptions = MQC.MQOO_INPUT_AS_Q_DEF |
                              MQC.MQOO_OUTPUT;
            // Now specify the queue that we wish to open, and the open options...
            MQQueue system_default_local_queue =
                qMgr.accessQueue("SYSTEM.DEFAULT.LOCAL.QUEUE",
                                 openOptions);
            // Define a simple WebSphere MQ message, and write some text in UTF format..
            MQMessage hello_world = new MQMessage();
            hello_world.writeUTF("Hello World!");
            // specify the message options...
            MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
                                                                 // same as
                                                                 // MQPMO_DEFAULT
                                                                 // constant
            // put the message on the queue
            system_default_local_queue.put(hello_world, pmo);
            // get the message back again...
            // First define WebSphere MQ message buffer to receive the message into..
            MQMessage retrievedMessage = new MQMessage();
            retrievedMessage.messageId = hello_world.messageId;
            // Set the get message options..
            MQGetMessageOptions gmo = new MQGetMessageOptions(); // accept the defaults
                                                                 // same as
                                                                 // MQGMO_DEFAULT
            // get the message off the queue..
            system_default_local_queue.get(retrievedMessage, gmo);
            // And prove we have the message by displaying the UTF message text
            String msgText = retrievedMessage.readUTF();
            System.out.println("The message is: " + msgText);
            // Close the queue
            system_default_local_queue.close();
            // Disconnect from the queue manager
            qMgr.disconnect();
        }
        // If an error has occurred in the above, try to identify what went wrong.
        // Was it WebSphere MQ error?
        catch (MQException ex)
        {
            System.out.println("WebSphere MQ error occurred : Completion code " +
                               ex.completionCode +
                               " Reason code " + ex.reasonCode);
        }
        // Was it a Java buffer space error?
        catch (java.io.IOException ex)
        {
            System.out.println("An error occurred whilst writing to the " +
                               "message buffer: " + ex);
        }
    } // end of start
} // end of sample
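MQException
This class contains the definitions of the WebSphere MQ completion code and error code constants. Constants beginning with MQCC_ are WebSphere MQ completion codes, and constants beginning with MQRC_ are WebSphere MQ reason codes. An MQException is thrown whenever a WebSphere MQ error occurs.
MQGetMessageOptions
This class contains options that control the behavior of the MQQueue.get() method.
MQManagedObject
This class is the superclass of MQQueueManager, MQQueue, and MQProcess. It provides the ability to inquire and set the attributes of these resources.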
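The completion code and reason code reported by an MQException are plain integers, so logs are easier to read when they are mapped back to the MQCC_ and MQRC_ names. The following is a minimal sketch that covers only a handful of common codes; the class `MqCodeNames` and its methods are hypothetical helpers and do not belong to the MQ classes for Java.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical lookup helper; covers only a few well-known codes. */
public class MqCodeNames {

    private static final Map<Integer, String> REASONS = new HashMap<>();
    static {
        REASONS.put(0,    "MQRC_NONE");
        REASONS.put(2009, "MQRC_CONNECTION_BROKEN");
        REASONS.put(2033, "MQRC_NO_MSG_AVAILABLE");
        REASONS.put(2035, "MQRC_NOT_AUTHORIZED");
        REASONS.put(2059, "MQRC_Q_MGR_NOT_AVAILABLE");
        REASONS.put(2085, "MQRC_UNKNOWN_OBJECT_NAME");
    }

    /** Maps a completion code (0, 1 or 2) to its MQCC_ name. */
    public static String completionName(int completionCode) {
        switch (completionCode) {
            case 0:  return "MQCC_OK";
            case 1:  return "MQCC_WARNING";
            case 2:  return "MQCC_FAILED";
            default: return "unlisted completion code " + completionCode;
        }
    }

    /** Maps a reason code to its MQRC_ name when this sketch knows it. */
    public static String reasonName(int reasonCode) {
        String name = REASONS.get(reasonCode);
        return name != null ? name : "unlisted reason code " + reasonCode;
    }

    /** Builds a one-line description suitable for a log message. */
    public static String describe(int completionCode, int reasonCode) {
        return completionName(completionCode) + " / " + reasonName(reasonCode);
    }

    public static void main(String[] args) {
        // The "error 2 reason 2059" case mentioned in the sample's header comment.
        System.out.println(describe(2, 2059));
    }
}
```

In a catch block for MQException, the call would be `describe(ex.completionCode, ex.reasonCode)`.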
------Solution--------------------
Try a get once; if you receive error 2033, no message matches your criteria.
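The control flow behind this advice can be sketched without the MQ libraries: attempt one get, treat reason code 2033 (MQRC_NO_MSG_AVAILABLE) as "nothing matched", and rethrow any other failure. Everything below is a stand-in for illustration: `ReasonException` plays the role of MQException, and `MessageSource` plays the role of a queue; neither is a real MQ type.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

public class NoMessageCheck {

    public static final int MQRC_NO_MSG_AVAILABLE = 2033;

    /** Hypothetical stand-in for MQException; carries only the reason code. */
    public static class ReasonException extends Exception {
        public final int reasonCode;
        public ReasonException(int reasonCode) {
            super("reason " + reasonCode);
            this.reasonCode = reasonCode;
        }
    }

    /** Hypothetical stand-in for a queue whose get() fails with a reason code. */
    public interface MessageSource {
        String get() throws ReasonException;
    }

    /** Tries one get; an empty result means reason 2033 was reported. */
    public static Optional<String> tryGet(MessageSource source) throws ReasonException {
        try {
            return Optional.of(source.get());
        } catch (ReasonException ex) {
            if (ex.reasonCode == MQRC_NO_MSG_AVAILABLE) {
                return Optional.empty();   // no matching message, not a real error
            }
            throw ex;                      // any other reason is a genuine failure
        }
    }

    /** In-memory source used only to exercise the sketch. */
    public static MessageSource inMemory(Queue<String> messages) {
        return () -> {
            String next = messages.poll();
            if (next == null) {
                throw new ReasonException(MQRC_NO_MSG_AVAILABLE);
            }
            return next;
        };
    }

    public static void main(String[] args) throws ReasonException {
        Queue<String> messages = new ArrayDeque<>();
        messages.add("hello");
        MessageSource source = inMemory(messages);
        System.out.println(tryGet(source).isPresent());  // one message is waiting
        System.out.println(tryGet(source).isPresent());  // queue is now empty
    }
}
```

Note that a real get removes the message it returns, so this check is destructive unless the queue is opened for browsing.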
Use PCF to query the queue information:
/**
 * @return information (name, current depth, open counts, ...) about the
 *         queue that is currently connected.
 * @throws Exception
 */
public QueueInfo queryQueueInfo() throws Exception {
    if (!checkStatus2(this.queueManager)) {
        throw new IllegalStateException("Not Connected to queue manager.");
    }
    PCFMessageAgent agent = null;
    try {
        agent = new PCFMessageAgent(this.queueManager);
        // Inquire Queue Name & Current Depth.
        int[] attrs = {
            CMQC.MQCA_Q_NAME, CMQC.MQIA_CURRENT_Q_DEPTH,
            CMQC.MQIA_OPEN_INPUT_COUNT, CMQC.MQIA_OPEN_OUTPUT_COUNT,
            CMQC.MQIA_Q_TYPE, CMQC.MQIA_DEFINITION_TYPE, CMQC.MQIA_INHIBIT_GET,
            CMQC.MQIA_INHIBIT_PUT };
        PCFParameter[] parameters = {
            new MQCFST(CMQC.MQCA_Q_NAME, getInputQueue().getText().trim()),
            new MQCFIL(CMQCFC.MQIACF_Q_ATTRS, attrs) };
        // logger.log("Querying current depth of current queue.");
        MQMessage[] responses = agent.send(CMQCFC.MQCMD_INQUIRE_Q, parameters);
        QueueInfo info = new QueueInfo();
        for (int i = 0; i < responses.length; i++) {
            MQCFH cfh = new MQCFH(responses[i]);
            // Check the PCF header (MQCFH) in the response message
            if (cfh.reason == 0) {
                String name = "";
                Integer depth = Integer.valueOf(0);
                // Extract what we want from the returned attributes
                for (int j = 0; j < cfh.parameterCount; j++) {
                    PCFParameter p = PCFParameter.nextParameter(responses[i]);
                    switch (p.getParameter()) {
                    case CMQC.MQCA_Q_NAME:
                        name = (String) p.getValue();
                        info.name = name;
                        break;
                    case CMQC.MQIA_CURRENT_Q_DEPTH:
                        depth = (Integer) p.getValue();
                        info.depth = depth.intValue();
                        break;
                    case CMQC.MQIA_OPEN_INPUT_COUNT:
                        Integer inputCount = (Integer) p.getValue();
                        info.inputCount = inputCount.intValue();
                        break;
                    case CMQC.MQIA_OPEN_OUTPUT_COUNT:
                        Integer outputCount = (Integer) p.getValue();
                        info.outputCount = outputCount.intValue();
                        break;
                    case CMQC.MQIA_Q_TYPE:
                        info.type = ((Integer) p.getValue()).intValue();
                        break;
                    case CMQC.MQIA_DEFINITION_TYPE:
                        info.definitionType = ((Integer) p.getValue()).intValue();
                        break;
                    case CMQC.MQIA_INHIBIT_PUT:
                        info.putNotAllowed = ((Integer) p.getValue()).intValue() == 1;
                        break;
                    case CMQC.MQIA_INHIBIT_GET:
                        info.getNotAllowed = ((Integer) p.getValue()).intValue() == 1;
                        break;
                    default:
                        break;
                    }
                }
                // System.out.println("Queue " + name + " curdepth " + depth);
                return info;
            } else {
                System.out.println("PCF error:\n" + cfh);
                // Walk through the returned parameters describing the error
                // (read from the same response whose header was just parsed)
                for (int j = 0; j < cfh.parameterCount; j++) {
                    System.out.println(PCFParameter.nextParameter(responses[i]));
                }
                throw new Exception("PCF Error [reason :" + cfh.reason + "]");
            }
        }
        return null;
    } catch (Exception e) {
        throw e;
    } finally {
        // Always release the PCF agent, even when the inquiry failed
        if (agent != null) {
            try {
                agent.disconnect();
            } catch (Exception e) {
                logger.log(e);
            }
        }
    }
}
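It is not implemented with threads; it is implemented through a register-and-notify mechanism. Among design patterns there is one called the Observer pattern, and this works in a similar way. As an example, here is a simple listener that reacts when data changes.
1. First define an interface that multiple listeners can implement:
public interface IDataListen {
    public void update(Object event, Object msg);
}
2. Implement a listener: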
public class DataListen implements IDataListen {
    @Override
    public void update(Object event, Object arg) {
        // TODO Auto-generated method stub
        System.out.println("The data has changed");
    }
}
3. The observed object:
import java.util.ArrayList;
import java.util.List;

public class DataManager {
    private List<IDataListen> listenList = new ArrayList<>();
    // Notify every registered listener, passing the event and message through
    public void notifyListen(Object event, Object msg) {
        for (IDataListen dataListen : listenList) {
            dataListen.update(event, msg);
        }
    }
    // Register a listener
    public void addListen(IDataListen dataListen) {
        listenList.add(dataListen);
    }
    // Changing the data triggers the notification
    public void updateData(Object msg) {
        this.notifyListen(null, msg);
    }
    public static void main(String[] args) {
        DataManager dataManager = new DataManager();
        IDataListen dataListen1 = new DataListen();
        IDataListen dataListen2 = new DataListen();
        dataManager.addListen(dataListen1);
        dataManager.addListen(dataListen2);
        dataManager.updateData("aaa");
    }
}
The main method shows the listeners in use. In this way the updateData behavior of DataManager can be observed: whenever the data changes, the registered listeners are notified immediately.