Spring-ActiveMQ: Point-to-Point and Topic
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example.activemq</groupId>
  <artifactId>activemq-test</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>activemq-test</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <org.springframework.version>3.1.1.RELEASE</org.springframework.version>
  </properties>

  <repositories>
    <repository>
      <id>kxcomm-maven</id>
      <name>Maven kxcomm Repository</name>
      <url>http://122.13.0.56:8088/nexus/content/groups/public/</url>
      <releases>
        <enabled>true</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>commons-collections</groupId>
      <artifactId>commons-collections</artifactId>
      <version>3.2</version>
    </dependency>
    <dependency>
      <groupId>commons-configuration</groupId>
      <artifactId>commons-configuration</artifactId>
      <version>1.6</version>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>1.3.2</version>
    </dependency>
    <dependency>
      <groupId>commons-logging</groupId>
      <artifactId>commons-logging</artifactId>
      <version>1.1.1</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
    <dependency>
      <groupId>commons-beanutils</groupId>
      <artifactId>commons-beanutils</artifactId>
      <version>1.8.3</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-asm</artifactId>
      <version>${org.springframework.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>${org.springframework.version}</version>
    </dependency>
    <dependency>
      <groupId>com.davidkarlsen.commonstransaction.spring</groupId>
      <artifactId>commons-transaction-spring</artifactId>
      <version>0.9</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.2-beta1</version>
    </dependency>
    <dependency>
      <groupId>fastutil</groupId>
      <artifactId>fastutil</artifactId>
      <version>5.0.9</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>0.9.27</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-core</artifactId>
      <version>0.9.27</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.6.1</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>jcl-over-slf4j</artifactId>
      <version>1.6.1</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.8.0</version>
    </dependency>
  </dependencies>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
         http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd
         http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"
       default-autowire="byName"
       default-lazy-init="true">

  <import resource="activemq-test.xml"/>

</beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <!-- Create the connection factory -->
  <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616" />
  </bean>

  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="rantzDestination" />
  </bean>

  <!-- Point-to-Point -->
  <!-- ActiveMQ message destination: queue -->
  <bean id="rantzDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0" value="rantz.marketing.queue"></constructor-arg>
  </bean>

  <!-- ActiveMQ message destination: topic -->
  <!-- <bean id="rantzDestination" class="org.apache.activemq.command.ActiveMQTopic">-->
  <!--   <constructor-arg index="0" value="rantz.marketing.queue"></constructor-arg>-->
  <!-- </bean>-->

  <bean id="producer" class="activemq.test.p2p.producer.RantzMarketingGatewayImpl">
    <property name="jmsTemplate" ref="jmsTemplate" />
    <property name="destination" ref="rantzDestination" />
  </bean>

  <bean id="consumer" class="activemq.test.p2p.consumer.MarketingReceiverGatewayImpl">
    <property name="jmsTemplate" ref="jmsTemplate" />
  </bean>
  <!-- Point-to-Point End -->

  <!-- Topic -->
  <bean id="topic" class="org.apache.activemq.command.ActiveMQTopic" autowire="constructor">
    <constructor-arg index="0" value="kxcomm.mms.topic" />
  </bean>

  <bean id="control" class="org.apache.activemq.command.ActiveMQTopic" autowire="constructor">
    <constructor-arg index="0" value="kxcomm.mms.control" />
  </bean>

  <bean id="myListener" class="activemq.test.topic.MyListener">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="topic" ref="topic" />
    <property name="control" ref="control" />
  </bean>

  <bean id="myPublisher" class="activemq.test.topic.MyPublisher">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="topic" ref="topic" />
    <property name="control" ref="control" />
  </bean>
  <!-- Topic End -->

</beans>
package activemq.test.model;

import java.io.Serializable;

public class User implements Serializable {

    private static final long serialVersionUID = -3098636047897519268L;

    private String name;
    private String sex;
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User [name=" + name + ", sex=" + sex + ", age=" + age + "]";
    }
}
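PTP Model
The PTP (Point-to-Point) model is queue-based: producers send messages to a queue and consumers receive messages from it, and the queue is what makes asynchronous delivery possible. Like a mailbox in a mail system, a queue can hold all kinds of messages, and the JMS provider supplies tools to manage creating and deleting queues. The JMS PTP model defines how a client sends messages to a queue, receives messages from a queue, and browses the messages in a queue.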
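The three PTP operations described above (send, receive, browse) can be pictured with a small in-memory model. This is only a sketch of the semantics, not the JMS or ActiveMQ API: `PtpSketch` and `ToyQueue` are hypothetical names invented for illustration, and there is no broker, threading, or acknowledgement involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy in-memory model of a point-to-point queue (hypothetical, not the JMS API). */
class PtpSketch {

    /** Stand-in for a JMS queue: each message is handed to exactly one receiver. */
    static final class ToyQueue {
        private final Queue<String> messages = new ArrayDeque<>();

        /** Producer side: store a message; no consumer has to be connected yet. */
        void send(String message) {
            messages.add(message);
        }

        /** Consumer side: remove and return the oldest message, or null if the queue is empty. */
        String receive() {
            return messages.poll();
        }

        /** Browse: return a copy of the pending messages without consuming them. */
        List<String> browse() {
            return new ArrayList<>(messages);
        }
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();

        // The producer sends before any consumer exists (asynchronous hand-off).
        queue.send("m1");
        queue.send("m2");
        queue.send("m3");
        System.out.println("pending before receive: " + queue.browse());

        // Two consumers share the queue; every message goes to only one of them.
        String gotByA = queue.receive();
        String gotByB = queue.receive();
        System.out.println("consumer A got " + gotByA + ", consumer B got " + gotByB);
        System.out.println("pending after receive: " + queue.browse());
    }
}
```

The point of the model is that a received message disappears from the queue, so a second consumer never sees it, while browsing leaves the queue untouched.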
package activemq.test.p2p.consumer;

import org.springframework.jms.core.JmsTemplate;

import activemq.test.model.User;

public class MarketingReceiverGatewayImpl {

    private JmsTemplate jmsTemplate;

    public MarketingReceiverGatewayImpl() {
    }

    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void receiveMotorist() throws Exception {
        // Blocks on the template's default destination and converts the payload back to a User.
        User message = (User) jmsTemplate.receiveAndConvert();
        System.out.println("received msg is:" + message.toString());
    }
}
package activemq.test.p2p.consumer;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class StartConsumer {

    public static void main(String[] args) {
        /* Load the Spring configuration file */
        ApplicationContext context =
                new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");
        MarketingReceiverGatewayImpl rantzMarketingGateway =
                (MarketingReceiverGatewayImpl) context.getBean("consumer");
        System.out.println("Receive Start ...");
        try {
            // Keep pulling messages from the queue until an exception occurs.
            while (true) {
                rantzMarketingGateway.receiveMotorist();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package activemq.test.p2p.producer;

public interface IRantzMarketingGateway {

    /**
     * Sends a text message.
     *
     * @author zhangjh, added 2013-9-20
     * @since smsc-gateway
     */
    public void sendMotoristInfo();

    /**
     * Sends an object message.
     *
     * @author zhangjh, added 2013-9-20
     * @since smsc-gateway
     */
    public void sendObjectInfo();
}
package activemq.test.p2p.producer;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import activemq.test.model.User;

public class RantzMarketingGatewayImpl implements IRantzMarketingGateway {

    private JmsTemplate jmsTemplate;
    private Destination destination;

    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public Destination getDestination() {
        return destination;
    }

    public void setDestination(Destination destination) {
        this.destination = destination;
    }

    public void sendMotoristInfo() {
        // Build a text message and send it to the injected destination.
        MessageCreator msg = new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("This is a test," + System.currentTimeMillis());
            }
        };
        jmsTemplate.send(destination, msg);
    }

    public void sendObjectInfo() {
        User u = new User();
        u.setAge(17);
        u.setName("yuky" + System.currentTimeMillis());
        u.setSex("female");
        // Uses the template's default destination; User is Serializable, so it is sent as an object message.
        jmsTemplate.convertAndSend(u);
    }
}
package activemq.test.p2p.producer;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class StartProducer {

    public static void main(String[] args) {
        /* Load the Spring configuration file */
        ApplicationContext context =
                new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");
        IRantzMarketingGateway rantzMarketingGateway =
                (RantzMarketingGatewayImpl) context.getBean("producer");
        // Send ten object messages to the queue.
        for (int i = 0; i < 10; i++) {
            rantzMarketingGateway.sendObjectInfo();
            System.out.println("Start ...");
        }
    }
}
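PUB/SUB Model
Topic subscriptions are either non-durable or durable. A non-durable subscriber receives messages sent to a topic only while it is active, that is, while it keeps a connection to the JMS provider; messages published to the topic while the client is offline are lost and will never be delivered to it. With a durable subscription, the client registers an ID that identifies it with the JMS provider. While that client is offline, the provider stores every message sent to the topic for that ID, and when the client reconnects it receives, based on its ID, all the messages that were sent while it was offline.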
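The difference between the two subscription types can be shown with a small in-memory model. As with any toy model, this is a sketch under stated assumptions rather than real broker behaviour: `TopicSketch`, `ToyTopic`, and the subscriber IDs are hypothetical names, and the model ignores connections, sessions, persistence, and message expiry.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy in-memory model of durable vs non-durable topic subscriptions (hypothetical, not the JMS API). */
class TopicSketch {

    static final class ToyTopic {
        // One inbox per live subscription, keyed by subscriber ID.
        private final Map<String, List<String>> inboxes = new LinkedHashMap<>();
        // IDs that registered a durable subscription; their inbox survives a disconnect.
        private final Set<String> durableIds = new HashSet<>();

        /** Connect (or reconnect) a subscriber; a durable ID keeps any backlog it already has. */
        void connect(String id, boolean durable) {
            if (durable) {
                durableIds.add(id);
            }
            inboxes.putIfAbsent(id, new ArrayList<>());
        }

        /** Disconnect a subscriber; a non-durable subscription ends and its inbox is dropped. */
        void disconnect(String id) {
            if (!durableIds.contains(id)) {
                inboxes.remove(id);
            }
        }

        /** Publish to every live subscription, including durable ones whose client is offline. */
        void publish(String message) {
            for (List<String> inbox : inboxes.values()) {
                inbox.add(message);
            }
        }

        /** Called by a connected subscriber: return and clear everything waiting for this ID. */
        List<String> drain(String id) {
            List<String> inbox = inboxes.get(id);
            if (inbox == null) {
                return new ArrayList<>();
            }
            List<String> delivered = new ArrayList<>(inbox);
            inbox.clear();
            return delivered;
        }
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.connect("durable-1", true);
        topic.connect("plain-1", false);

        // Both subscribers go offline, then a message is published.
        topic.disconnect("durable-1");
        topic.disconnect("plain-1");
        topic.publish("sent while offline");

        // Both come back; only the durable subscriber sees the missed message.
        topic.connect("durable-1", true);
        topic.connect("plain-1", false);
        topic.publish("sent while online");
        System.out.println("durable-1 receives: " + topic.drain("durable-1"));
        System.out.println("plain-1 receives: " + topic.drain("plain-1"));
    }
}
```

In real JMS code, a durable subscription would typically be set up with `Connection.setClientID` and `Session.createDurableSubscriber(topic, name)`, whereas a consumer created with `session.createConsumer(topic)` is non-durable.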
package activemq.test.topic;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import activemq.test.model.User;

public class MyListener implements MessageListener {

    private ActiveMQConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;
    private MessageProducer producer;
    private Topic topic;
    private Topic control;

    public Topic getTopic() {
        return topic;
    }

    public void setTopic(Topic topic) {
        this.topic = topic;
    }

    public Topic getControl() {
        return control;
    }

    public void setControl(Topic control) {
        this.control = control;
    }

    public ActiveMQConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public void onMessage(Message message) {
        try {
            if (checkText(message, "SHUTDOWN")) {
                // The publisher asked us to stop: close the connection.
                try {
                    connection.close();
                    System.out.println("Stopped listening for messages");
                } catch (Exception e) {
                    e.printStackTrace(System.out);
                }
            } else if (checkText(message, "REPORT")) {
                // Send a report back on the control topic.
                try {
                    System.out.println("MyListener->received a report");
                    long time = System.currentTimeMillis();
                    String msg = "MyListener->returning a report :" + time + "ms";
                    System.out.println(msg);
                    producer.send(session.createTextMessage(msg));
                } catch (Exception e) {
                    e.printStackTrace(System.out);
                }
            } else {
                // Anything else is expected to be an ObjectMessage carrying a User.
                ObjectMessage obj = (ObjectMessage) message;
                User u = (User) obj.getObject();
                System.out.println("Received messages." + u.toString());
            }
        } catch (Exception e) {
            // Report the failure instead of silently swallowing it.
            e.printStackTrace(System.out);
        }
    }

    public void run() throws JMSException {
        if (connectionFactory != null) {
            System.out.println("connectionFactory is ok");
            connection = connectionFactory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Subscribe to the data topic and register this object as the listener.
            MessageConsumer consumer = session.createConsumer(topic);
            consumer.setMessageListener(this);
            connection.start();
            // Reports are sent back on the control topic.
            producer = session.createProducer(control);
            System.out.println("Waiting for messages...");
        }
    }

    private static boolean checkText(Message m, String s) {
        try {
            return m instanceof TextMessage && ((TextMessage) m).getText().equals(s);
        } catch (JMSException e) {
            e.printStackTrace(System.out);
            return false;
        }
    }
}
package activemq.test.topic;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class StartListener {

    public static void main(String[] args) {
        /* Load the Spring configuration file */
        ApplicationContext context =
                new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");
        MyListener myListener = (MyListener) context.getBean("myListener");
        try {
            if (myListener != null) {
                System.out.println("success...");
            }
            myListener.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package activemq.test.topic;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import activemq.test.model.User;

public class MyPublisher implements MessageListener {

    private ActiveMQConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;
    private MessageProducer publisher;
    private Topic topic;
    private Topic control;
    private final Object mutex = new Object();

    public ActiveMQConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public Topic getTopic() {
        return topic;
    }

    public void setTopic(Topic topic) {
        this.topic = topic;
    }

    public Topic getControl() {
        return control;
    }

    public void setControl(Topic control) {
        this.control = control;
    }

    public void onMessage(Message message) {
        // Reports from listeners arrive on the control topic.
        synchronized (mutex) {
            System.out.println("Received report " + getReport(message));
        }
    }

    Object getReport(Message m) {
        try {
            return ((TextMessage) m).getText();
        } catch (JMSException e) {
            e.printStackTrace(System.out);
            return e.toString();
        }
    }

    public void publish() throws Exception {
        User u = new User();
        u.setAge(17);
        u.setName("yuky" + System.currentTimeMillis());
        u.setSex("female");

        // Send events: each round publishes the User object, then asks for a report.
        ObjectMessage obj = session.createObjectMessage();
        obj.setObject(u);
        for (int i = 0; i < 10; i++) {
            publisher.send(obj);
            publisher.send(session.createTextMessage("REPORT"));
        }
    }

    public void run() throws Exception {
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        publisher = session.createProducer(topic);
        publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        // Listen on the control topic for reports sent back by listeners.
        session.createConsumer(control).setMessageListener(this);
        connection.start();
    }

    public void stop() throws JMSException {
        // Tell listeners to shut down, then release the connection.
        publisher.send(session.createTextMessage("SHUTDOWN"));
        connection.stop();
        connection.close();
    }
}
package activemq.test.topic;

import javax.jms.JMSException;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class StartPublisher {

    public static void main(String[] args) throws InterruptedException {
        /* Load the Spring configuration file */
        ApplicationContext context =
                new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");
        MyPublisher publisher = (MyPublisher) context.getBean("myPublisher");
        try {
            publisher.run();
            publisher.publish();
        } catch (Exception e) {
            // On failure, ask listeners to shut down and close the connection.
            try {
                publisher.stop();
            } catch (JMSException e1) {
                e1.printStackTrace();
            }
            e.printStackTrace();
        }
    }
}