Differences between MQ Java client mode and MQ Java bindings mode

Python018

In bindings mode, the MQ classes for Java use JNI (Java Native Interface) to call the queue manager directly, much as a native MQ server application does.

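Bindings mode is the fastest way for an MQSeries Java program to connect to the server, but it requires the MQ Java application and the MQ server to run on the same machine. Because bindings mode avoids the overhead of establishing a network connection, choose it when connection cost has a large effect on performance.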

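Client mode connects through a server-connection channel defined on the server, and the server must have a listener running. Use client mode when the Java client program and the server are not on the same machine.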
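To make the difference in setup concrete: with the default transport setting, the MQ classes for Java connect in bindings mode when no host name is configured and in client mode when one is. The sketch below models only that selection rule in plain Java. `MqConnectionSettings` and everything in it are hypothetical names for illustration, not part of the IBM API, and the host and channel values in the usage note are placeholders.

```java
/** Hypothetical holder for MQ connection settings (illustration only, not an IBM class). */
final class MqConnectionSettings {
    enum Mode { BINDINGS, CLIENT }

    private final String hostname; // null means "same machine as the queue manager"
    private final String channel;  // server-connection channel, client mode only
    private final int port;        // listener port, client mode only

    private MqConnectionSettings(String hostname, String channel, int port) {
        this.hostname = hostname;
        this.channel = channel;
        this.port = port;
    }

    /** Local connection: no host, channel, or listener involved. */
    static MqConnectionSettings bindings() {
        return new MqConnectionSettings(null, null, 0);
    }

    /** Remote connection: needs a host, a server-connection channel, and a listener port. */
    static MqConnectionSettings client(String hostname, String channel, int port) {
        if (hostname == null || hostname.isEmpty()) {
            throw new IllegalArgumentException("client mode needs a host name");
        }
        if (channel == null || channel.isEmpty()) {
            throw new IllegalArgumentException("client mode needs a channel name");
        }
        return new MqConnectionSettings(hostname, channel, port);
    }

    /** Mirrors the rule "no host name set means bindings mode". */
    Mode mode() {
        return hostname == null ? Mode.BINDINGS : Mode.CLIENT;
    }

    String describe() {
        return mode() == Mode.BINDINGS
                ? "bindings: local JNI calls, no listener or channel needed"
                : "client: TCP/IP to " + hostname + ":" + port + " via channel " + channel;
    }
}
```

For example, `MqConnectionSettings.client("mqhost.example.com", "SYSTEM.DEF.SVRCONN", 1414).describe()` reports a client connection, while `MqConnectionSettings.bindings().describe()` reports a local one. In real code the same decision is expressed by setting or leaving unset `MQEnvironment.hostname` and `MQEnvironment.channel`.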

For a client connection, configure the MQEnvironment class, for example by setting MQEnvironment.hostname.

The following is a client connection example:

// ===========================================================================
//
// Licensed Materials - Property of IBM
//
// 5639-C34
//
// (c) Copyright IBM Corp. 1995,1999
//
// ===========================================================================
// WebSphere MQ classes for Java sample applet
//
// This sample runs as an applet using the appletviewer and HTML file,
// using the command :-
//         appletviewer MQSample.html
// Output is to the command line, NOT the applet viewer window.
//
// Note. If you receive WebSphere MQ error 2 reason 2059 and you are sure your
//       WebSphere MQ and TCP/IP setup is correct,
//       you should click on the "Applet" selection in the Applet viewer window
//       select properties, and change "Network access" to unrestricted.

import com.ibm.mq.*;            // Include the WebSphere MQ classes for Java package

public class MQSample extends java.applet.Applet
{
  private String hostname = "your_hostname";   // define the name of your
                                               // host to connect to
  private String channel  = "server_channel";  // define name of channel
                                               // for client to use
                                               // Note. assumes WebSphere MQ Server
                                               // is listening on the default
                                               // TCP/IP port of 1414
  private String qManager = "your_Q_manager";  // define name of queue
                                               // manager object to
                                               // connect to.
  private MQQueueManager qMgr;                 // define a queue manager object

  // When the class is called, this initialization is done first.
  public void init()
  {
    // Set up WebSphere MQ environment
    MQEnvironment.hostname = hostname;         // Could have put the
                                               // hostname & channel
    MQEnvironment.channel  = channel;          // string directly here!
    MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,  // Set TCP/IP or server
                                 MQC.TRANSPORT_MQSERIES); // connection
  } // end of init

  public void start()
  {
    try {
      // Create a connection to the queue manager
      qMgr = new MQQueueManager(qManager);

      // Set up the options on the queue we wish to open...
      // Note. All WebSphere MQ Options are prefixed with MQC in Java.
      int openOptions = MQC.MQOO_INPUT_AS_Q_DEF |
                        MQC.MQOO_OUTPUT;

      // Now specify the queue that we wish to open, and the open options...
      MQQueue system_default_local_queue =
              qMgr.accessQueue("SYSTEM.DEFAULT.LOCAL.QUEUE",
                               openOptions);

      // Define a simple WebSphere MQ message, and write some text in UTF format..
      MQMessage hello_world = new MQMessage();
      hello_world.writeUTF("Hello World!");

      // specify the message options...
      MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the defaults,
                                                           // same as
                                                           // MQPMO_DEFAULT
                                                           // constant

      // put the message on the queue
      system_default_local_queue.put(hello_world, pmo);

      // get the message back again...
      // First define WebSphere MQ message buffer to receive the message into..
      MQMessage retrievedMessage = new MQMessage();
      retrievedMessage.messageId = hello_world.messageId;

      // Set the get message options..
      MQGetMessageOptions gmo = new MQGetMessageOptions(); // accept the defaults
                                                           // same as
                                                           // MQGMO_DEFAULT

      // get the message off the queue..
      system_default_local_queue.get(retrievedMessage, gmo);

      // And prove we have the message by displaying the UTF message text
      String msgText = retrievedMessage.readUTF();
      System.out.println("The message is: " + msgText);

      // Close the queue
      system_default_local_queue.close();

      // Disconnect from the queue manager
      qMgr.disconnect();
    }
    // If an error has occurred in the above, try to identify what went wrong.
    // Was it WebSphere MQ error?
    catch (MQException ex)
    {
      System.out.println("WebSphere MQ error occurred : Completion code " +
                         ex.completionCode +
                         " Reason code " + ex.reasonCode);
    }
    // Was it a Java buffer space error?
    catch (java.io.IOException ex)
    {
      System.out.println("An error occurred whilst writing to the message buffer: " + ex);
    }
  } // end of start
} // end of sample

MQException

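This class contains the definitions of the WebSphere MQ completion code and reason code constants. Constants beginning with MQCC_ are WebSphere MQ completion codes, and constants beginning with MQRC_ are WebSphere MQ reason codes. An MQException is thrown whenever a WebSphere MQ error occurs.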

MQGetMessageOptions

This class contains the options that control the behavior of the MQQueue.get() method.

MQManagedObject

This class is the superclass of MQQueueManager, MQQueue, and MQProcess. It provides the ability to inquire and set the attributes of these resources.
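Since an MQException carries both a completion code and a reason code, a small lookup helper can make log output easier to read. The following is a minimal sketch in plain Java, so it runs without the IBM jar. `MqCodes` is a hypothetical helper class; the numeric values mirror the corresponding MQCC_ and MQRC_ constants, and only a handful of reason codes are listed.

```java
/** Hypothetical helper that turns MQ completion/reason codes into readable text. */
final class MqCodes {
    // Completion codes (same values as the MQCC_* constants).
    static final int MQCC_OK = 0;
    static final int MQCC_WARNING = 1;
    static final int MQCC_FAILED = 2;

    // A few common reason codes (same values as the MQRC_* constants).
    static final int MQRC_NONE = 0;
    static final int MQRC_NO_MSG_AVAILABLE = 2033;
    static final int MQRC_Q_MGR_NOT_AVAILABLE = 2059;
    static final int MQRC_UNKNOWN_OBJECT_NAME = 2085;

    private MqCodes() { }

    static String describe(int completionCode, int reasonCode) {
        String cc;
        switch (completionCode) {
            case MQCC_OK:      cc = "OK";      break;
            case MQCC_WARNING: cc = "WARNING"; break;
            case MQCC_FAILED:  cc = "FAILED";  break;
            default:           cc = "UNKNOWN(" + completionCode + ")";
        }
        String rc;
        switch (reasonCode) {
            case MQRC_NONE:                rc = "MQRC_NONE";                break;
            case MQRC_NO_MSG_AVAILABLE:    rc = "MQRC_NO_MSG_AVAILABLE";    break;
            case MQRC_Q_MGR_NOT_AVAILABLE: rc = "MQRC_Q_MGR_NOT_AVAILABLE"; break;
            case MQRC_UNKNOWN_OBJECT_NAME: rc = "MQRC_UNKNOWN_OBJECT_NAME"; break;
            default:                       rc = "unlisted reason";
        }
        return cc + ": " + rc + " (" + reasonCode + ")";
    }
}
```

In a catch block this would be called as `MqCodes.describe(ex.completionCode, ex.reasonCode)`, using the two public fields that the sample above already prints.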

------Solution--------------------

Attempt a get once: if it fails with reason code 2033 (MQRC_NO_MSG_AVAILABLE), no message matches your criteria.
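One way to structure that check is to treat reason code 2033 as an ordinary "nothing there" result and let every other failure propagate. The sketch below uses stand-in types so that it runs without the IBM jar: `MqFailure` plays the role of MQException (it is unchecked here only to keep the example short) and `MessageSource` plays the role of a call to MQQueue.get(). Both names are hypothetical.

```java
import java.util.Optional;

/** Stand-in for MQException; only the reason code matters for this sketch. */
final class MqFailure extends RuntimeException {
    final int reasonCode;

    MqFailure(int reasonCode) {
        super("MQ reason code " + reasonCode);
        this.reasonCode = reasonCode;
    }
}

/** Stand-in for "perform one get with the selection criteria already set". */
interface MessageSource {
    String get();
}

final class QueuePoller {
    static final int MQRC_NO_MSG_AVAILABLE = 2033;

    private QueuePoller() { }

    /**
     * Performs one get. Returns the message text if one matched,
     * an empty Optional if the failure was 2033, and rethrows anything else.
     */
    static Optional<String> tryGet(MessageSource source) {
        try {
            return Optional.of(source.get());
        } catch (MqFailure e) {
            if (e.reasonCode == MQRC_NO_MSG_AVAILABLE) {
                return Optional.empty(); // no matching message: not an error
            }
            throw e; // a real problem, e.g. the connection was lost
        }
    }
}
```

With the real classes the same shape applies: catch MQException around `queue.get(message, gmo)` and compare `ex.reasonCode` with 2033.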

Use PCF to query the queue's attributes:

// This method is a fragment of a larger class: checkStatus2, queueManager,
// getInputQueue, logger and QueueInfo are defined elsewhere in that class.
// It needs com.ibm.mq.MQMessage plus the PCF classes (MQCFH, MQCFST, MQCFIL,
// PCFParameter, PCFMessageAgent) and the CMQC / CMQCFC constants; the exact
// package names depend on the MQ version in use.

/**
 * @return attributes (name, depth, open counts, type, inhibit flags) of the
 *         currently connected queue, or null if no response was received.
 * @throws Exception if the PCF command fails
 */
public QueueInfo queryQueueInfo() throws Exception {
    if (!checkStatus2(this.queueManager)) {
        throw new IllegalStateException("Not Connected to queue manager.");
    }
    PCFMessageAgent agent = null;
    try {
        agent = new PCFMessageAgent(this.queueManager);
        // Inquire queue name, current depth and related attributes.
        int[] attrs = {
            CMQC.MQCA_Q_NAME, CMQC.MQIA_CURRENT_Q_DEPTH,
            CMQC.MQIA_OPEN_INPUT_COUNT, CMQC.MQIA_OPEN_OUTPUT_COUNT,
            CMQC.MQIA_Q_TYPE, CMQC.MQIA_DEFINITION_TYPE, CMQC.MQIA_INHIBIT_GET,
            CMQC.MQIA_INHIBIT_PUT };
        PCFParameter[] parameters = {
            new MQCFST(CMQC.MQCA_Q_NAME, getInputQueue().getText().trim()),
            new MQCFIL(CMQCFC.MQIACF_Q_ATTRS, attrs) };
        // logger.log("Querying current depth of current queue.");
        MQMessage[] responses = agent.send(CMQCFC.MQCMD_INQUIRE_Q, parameters);
        QueueInfo info = new QueueInfo();
        for (int i = 0; i < responses.length; i++) {
            MQCFH cfh = new MQCFH(responses[i]);
            // Check the PCF header (MQCFH) in the response message
            if (cfh.reason == 0) {
                // Extract what we want from the returned attributes
                for (int j = 0; j < cfh.parameterCount; j++) {
                    PCFParameter p = PCFParameter.nextParameter(responses[i]);
                    switch (p.getParameter()) {
                    case CMQC.MQCA_Q_NAME:
                        info.name = (String) p.getValue();
                        break;
                    case CMQC.MQIA_CURRENT_Q_DEPTH:
                        info.depth = ((Integer) p.getValue()).intValue();
                        break;
                    case CMQC.MQIA_OPEN_INPUT_COUNT:
                        info.inputCount = ((Integer) p.getValue()).intValue();
                        break;
                    case CMQC.MQIA_OPEN_OUTPUT_COUNT:
                        info.outputCount = ((Integer) p.getValue()).intValue();
                        break;
                    case CMQC.MQIA_Q_TYPE:
                        info.type = ((Integer) p.getValue()).intValue();
                        break;
                    case CMQC.MQIA_DEFINITION_TYPE:
                        info.definitionType = ((Integer) p.getValue()).intValue();
                        break;
                    case CMQC.MQIA_INHIBIT_PUT:
                        // 1 means put operations are inhibited
                        info.putNotAllowed = ((Integer) p.getValue()).intValue() == 1;
                        break;
                    case CMQC.MQIA_INHIBIT_GET:
                        // 1 means get operations are inhibited
                        info.getNotAllowed = ((Integer) p.getValue()).intValue() == 1;
                        break;
                    default:
                        break;
                    }
                }
                // System.out.println("Queue " + info.name + " curdepth " + info.depth);
                return info;
            } else {
                System.out.println("PCF error:\n" + cfh);
                // Walk through the returned parameters describing the error
                for (int j = 0; j < cfh.parameterCount; j++) {
                    System.out.println(PCFParameter.nextParameter(responses[i]));
                }
                throw new Exception("PCF Error [reason :" + cfh.reason + "]");
            }
        }
        return null;
    } finally {
        // Always release the agent, even when the inquiry failed.
        if (agent != null) {
            try {
                agent.disconnect();
            } catch (Exception e) {
                logger.log(e);
            }
        }
    }
}

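It is not implemented with threads; it is implemented through a register-and-notify mechanism. Among the design patterns used in Java, the Observer pattern works in a similar way. The example below is a simple listener that runs an action whenever the data changes.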

1. First define an interface that multiple listeners can implement:

public interface IDataListen {
    public void update(Object event, Object msg);
}

2. Implement a listener:

public class DataListen implements IDataListen {
    @Override
    public void update(Object event, Object arg) {
        System.out.println("The data has changed");
    }
}

3. The object being listened to:

import java.util.ArrayList;
import java.util.List;

public class DataManager {
    // Registered listeners, notified in registration order
    private List<IDataListen> listenList = new ArrayList<>();

    public void notifyListen(Object event, Object msg) {
        for (IDataListen dataListen : listenList) {
            // Pass the event and message through to each listener
            dataListen.update(event, msg);
        }
    }

    public void addListen(IDataListen dataListen) {
        listenList.add(dataListen);
    }

    public void updateData(Object msg) {
        this.notifyListen(null, msg);
    }

    public static void main(String[] args) {
        DataManager dataManager = new DataManager();
        IDataListen dataListen1 = new DataListen();
        IDataListen dataListen2 = new DataListen();
        dataManager.addListen(dataListen1);
        dataManager.addListen(dataListen2);
        dataManager.updateData("aaa");
    }
}

The main method shows the listeners in use. With this in place, the updateData call on DataManager is observed: whenever the data changes, every registered listener is notified immediately.
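The JDK also ships a ready-made version of this register-and-notify mechanism in java.beans.PropertyChangeSupport. As a minimal sketch of the same idea using that class rather than a hand-written listener list: `ObservableData` and the property name "data" are hypothetical names chosen for this illustration.

```java
import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport;

/** Hypothetical observable holder built on the JDK's PropertyChangeSupport. */
final class ObservableData {
    private final PropertyChangeSupport support = new PropertyChangeSupport(this);
    private Object data;

    void addListener(PropertyChangeListener listener) {
        support.addPropertyChangeListener(listener);
    }

    void removeListener(PropertyChangeListener listener) {
        support.removePropertyChangeListener(listener);
    }

    void updateData(Object newValue) {
        Object oldValue = data;
        data = newValue;
        // Notifies every registered listener on the calling thread.
        // No event is fired when old and new values are equal and non-null.
        support.firePropertyChange("data", oldValue, newValue);
    }

    public static void main(String[] args) {
        ObservableData observable = new ObservableData();
        observable.addListener(evt ->
                System.out.println(evt.getPropertyName() + " changed to " + evt.getNewValue()));
        observable.updateData("aaa");
    }
}
```

Compared with the hand-written version, this adds listener removal and passes both the old and the new value to each listener, at the cost of tying the event type to PropertyChangeEvent.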