Tujuan:
- membaca dari beberapa antrian dengan nama yang sama, tetapi terletak di host / manajer antrian yang berbeda
- tulis respons ke node yang ditentukan secara acak
0. Anggaplah Anda telah menerapkan MQ atau menggunakan MQ orang lain.
1. Kami memuat dependensi ke dalam proyek:
maven
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>mq-jms-spring-boot-starter</artifactId>
<version>2.3.3</version>
</dependency>
gradle
compile group: 'com.ibm.mq', name: 'mq-jms-spring-boot-starter', version: '2.3.3'
2. Buat konfigurasi, masukkan parameter koneksi poin Anda (Anda sudah membuatnya?). Kami menggunakan array, jadi bisa ada koneksi sebanyak yang Anda suka.
mq:
servers:
- queueManager: QM1
channel: DEV.ADMIN.SVRCONN
connName: ibmmq.ru(1414)
user: admin
password: passw0rd
- queueManager: QM2
channel: DEV.ADMIN.SVRCONN
connName: ibmmq.ru(1415)
user: admin
password: passw0rd
queue1: QUEUE1
queue2: QUEUE2
3. Buat kelas untuk membaca properti ini:
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "mq")
@EqualsAndHashCode(callSuper = false)
@Data
public class MqConfig {
private List<ConnectionConfiguration> servers;
private String queue1;
private String queue2;
}
import lombok.Data;
import lombok.EqualsAndHashCode;
@Data
@EqualsAndHashCode(callSuper = false)
public class ConnectionConfiguration {
String queueManager;
String channel;
String connName;
String user;
String password;
}
4. Buat pendengar:
import javax.jms.MessageListener;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MqListener implements MessageListener {
@SneakyThrows
@Override
public void onMessage(@Payload javax.jms.Message message) {
log.info(" <" + message + ">");
//TODO:
}
5. Konfigurasi! Kami menentukan connectionFactors untuk setiap elemen array dari yml-properties. Kami membuat selembar templat untuk mengirim pesan, ke input yang kami beri makan koneksi yang dibuat. Kami membuat pabrik pendengar, yang masukannya kami juga menggunakan koneksiFactories yang dibuat.
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.*;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.QosSettings;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import org.springframework.jms.support.converter.SimpleMessageConverter;
import javax.jms.*;
import java.util.*;
import static javax.jms.DeliveryMode.NON_PERSISTENT;
import static javax.jms.Session.CLIENT_ACKNOWLEDGE;
@Configuration
@EnableJms
@Slf4j
public class MqConfiguration {
@Autowired
MqConfig mqConfig;
@Autowired
private JmsListenerEndpointRegistry registry;
// , connectionFactories
@Bean
public List<JmsListenerContainerFactory> myFactories(
@Qualifier("myConnFactories")
List<CachingConnectionFactory> connectionFactories,
MqListener mqListener) {
List<JmsListenerContainerFactory> factories = new ArrayList<>();
connectionFactories.forEach(connectionFactory -> {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setSessionAcknowledgeMode(CLIENT_ACKNOWLEDGE);
QosSettings qosSettings = new QosSettings();
qosSettings.setDeliveryMode(NON_PERSISTENT);
factory.setReplyQosSettings(qosSettings);
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId("myJmsEndpoint-"+ UUID.randomUUID());
endpoint.setDestination(mqConfig.getQueue1());
endpoint.setMessageListener(mqListener);
registry.registerListenerContainer(endpoint, factory);
factories.add(factory);
});
return factories;
}
// ,
@Bean
@Qualifier("myJmsTemplates")
public List<JmsTemplate> jmsTemplates(
@Qualifier("myConnFactories")
List<CachingConnectionFactory> connectionFactories) {
return getJmsTemplates(new ArrayList<ConnectionFactory>(connectionFactories));
}
public List<JmsTemplate> getJmsTemplates(List<ConnectionFactory> connectionFactories) {
List<JmsTemplate> jmsTemplates = new ArrayList<>();
for (ConnectionFactory connectionFactory : connectionFactories) {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(connectionFactory);
jmsTemplate.setMessageConverter(new SimpleMessageConverter());
jmsTemplate.setDefaultDestinationName(mqConfig.getQueue2());
jmsTemplate.setDeliveryMode(NON_PERSISTENT);
jmsTemplate.setDeliveryPersistent(false);
jmsTemplate.setExplicitQosEnabled(true);
jmsTemplates.add(jmsTemplate);
}
return jmsTemplates;
}
// yml-
@Bean
@Qualifier("myConnFactories")
public List<CachingConnectionFactory> connectionFactories() throws JMSException {
List<CachingConnectionFactory> factories = new ArrayList<>();
for (ConnectionConfiguration server : mqConfig.getServers()) {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
MQConnectionFactory cf = new MQConnectionFactory();
cachingConnectionFactory.setTargetConnectionFactory(cf);
cf.setQueueManager(server.getQueueManager());
cf.setChannel(server.getChannel());
cf.setConnectionNameList(server.getConnName());
cf.setStringProperty(WMQConstants.USERID, server.getUser());
cf.setStringProperty(WMQConstants.PASSWORD, server.getPassword());
cf.setStringProperty("XMSC_WMQ_CONNECTION_MODE", "1");
factories.add(cachingConnectionFactory);
}
return factories;
}
}
endpoint.setMessageListener (mqListener);Di sini kami menunjukkan pendengar (yang dibuat pada langkah 4) untuk menentukan tindakan saat menerima pesan.
6. Mari buat lapisan layanan, di mana katakanlah akan ada beberapa logika dan setelah mengirim tanggapan.
import javax.jms.TextMessage;
public interface MqService {
void sendToMq(TextMessage msg);
}
import javax.jms.TextMessage;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class MqServiceImpl implements MqService {
@Autowired
private MqConfig mqConfig;
@Autowired
@Qualifier("myJmsTemplates")
List<JmsTemplate> jmsTemplates;
@Override
public void sendToMq(TextMessage msg ) {
//-
// / .
int maxIndex = jmsTemplates.size()-1; // - ""
int randomNumber = (int) Math.round(Math.random() * maxIndex);
jmsTemplates.get(randomNumber).convertAndSend(mqConfig.getQueue2(), msg);
}
}
7. Tambahkan pengiriman respons ke pendengar:
@Autowired
MqService mqService;
@SneakyThrows
@Override
public void onMessage(@Payload javax.jms.Message message) {
log.info(" <" + message + ">");
mqService.sentToMq((TextMessage) message);
}
Voila, Anda sudah selesai, Anda bisa memeriksanya.