博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ActiveMQ消息队列
阅读量:6821 次
发布时间:2019-06-26

本文共 8873 字,大约阅读时间需要 29 分钟。

hot3.png

        下载地址:

        下载完成后解压缩apache-activemq-5.5.1-bin.zip,然后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。    

wrapper  | --> Wrapper Started as Consolewrapper  | Launching a JVM...jvm 1    | Wrapper (Version 3.2.3) http://wrapper.tanukisoftware.orgjvm 1    |   Copyright 1999-2006 Tanuki Software, Inc.  All Rights Reserved.jvm 1    |jvm 1    | Java Runtime: Oracle Corporation 1.7.0_51 C:\Program Files\Java\jdk1.7.0_51\jrejvm 1    |   Heap sizes: current=61440k  free=56514k  max=932352kjvm 1    |     JVM args: -Dactivemq.home=../.. -Dactivemq.base=../.. -Djavax.net.ssl.keyStorePassword=password -Djavax.net.ssl.trustStorePassword=password -Djavax.net.ssl.keyStore=../../conf/broker.ks -Djavax.net.ssl.trustStore=../../conf/broker.ts -Dcom.sun.management.jmxremote -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.util.logging.config.file=logging.properties -Dactivemq.conf=../../conf -Dactivemq.data=../../data -Djava.security.auth.login.config=../../conf/login.config -Xmx1024m -Djava.library.path=../../bin/win64 -Dwrapper.key=nHS3sBcSd6ZaDpiu -Dwrapper.port=32000 -Dwrapper.jvm.port.min=31000 -Dwrapper.jvm.port.max=31999 -Dwrapper.pid=8028 -Dwrapper.version=3.2.3 -Dwrapper.native_library=wrapper -Dwrapper.cpu.timeout=10 -Dwrapper.jvmid=1jvm 1    | Extensions classpath:jvm 1    |   [..\..\lib,..\..\lib\camel,..\..\lib\optional,..\..\lib\web,..\..\lib\extra]jvm 1    | ACTIVEMQ_HOME: ..\..jvm 1    | ACTIVEMQ_BASE: ..\..jvm 1    | ACTIVEMQ_CONF: ..\..\confjvm 1    | ACTIVEMQ_DATA: ..\..\datajvm 1    | Loading message broker from: xbean:activemq.xmljvm 1    |  INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@21f4075c: startup date [Mon Aug 24 14:09:22 CST 2015]; root of context hierarchyjvm 1    |  INFO | PListStore:[E:\apache-activemq-5.12.0\bin\win64\..\..\data\localhost\tmp_storage] startedjvm 1    |  INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[E:\apache-activemq-5.12.0\bin\win64\..\..\data\kahadb]jvm 1    |  INFO | Apache ActiveMQ 5.12.0 (localhost, ID:shy-PC-64231-1440396565339-0:1) is startingjvm 1    |  INFO | Listening for connections at: tcp://shy-PC:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600jvm 1    |  INFO | Connector openwire startedjvm 1    |  INFO | Listening for connections at: amqp://shy-PC:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600jvm 1    |  INFO | Connector amqp startedjvm 1    |  INFO | Listening for connections at: stomp://shy-PC:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600jvm 1    |  INFO | Connector stomp startedjvm 1    |  INFO | Listening for connections at: mqtt://shy-PC:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600jvm 1    |  INFO | Connector mqtt startedjvm 1    | {}jvm 1    |  INFO | Listening for connections at ws://shy-PC:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600jvm 1    |  INFO | Connector ws startedjvm 1    |  INFO | Apache ActiveMQ 5.12.0 (localhost, ID:shy-PC-64231-1440396565339-0:1) startedjvm 1    |  INFO | For help or more information please see: http://activemq.apache.orgjvm 1    |  INFO | ActiveMQ WebConsole available at http://0.0.0.0:8161/jvm 1    |  INFO | ActiveMQ Jolokia REST API available at http://0.0.0.0:8161/api/jolokia/jvm 1    |  INFO | Initializing Spring FrameworkServlet 'dispatcher'jvm 1    |  INFO | jolokia-agent: No access restrictor found at classpath:/jolokia-access.xml, access to all MBeans is allowed

        用户配置文件在apache-activemq-5.12.0\conf\users.properties,默认用户密码为admin/admin

        启动ActiveMQ以后,登陆:

        141229_DrkR_2358114.png

        打开eclipse,创建两个项目(producer、customer),将apache-activemq-5.5.1\lib下的jar拷贝到项目里并buildpath。分别在两个项目里对应建立如下类:

        producer类(producer项目

package producer;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class Producer {	private static final int SEND_NUMBER = 5;    public static void main(String[] args) {        // ConnectionFactory :连接工厂,JMS 用它创建连接        ConnectionFactory connectionFactory;        // Connection :JMS 客户端到JMS Provider 的连接        Connection connection = null;        // Session: 一个发送或接收消息的线程        Session session;        // Destination :消息的目的地;消息发送给谁.        Destination destination;        // MessageProducer:消息发送者        MessageProducer producer;        // TextMessage message;        // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar        connectionFactory = new ActiveMQConnectionFactory(                ActiveMQConnection.DEFAULT_USER,                ActiveMQConnection.DEFAULT_PASSWORD,                "tcp://localhost:61616");        try {            // 构造从工厂得到连接对象            connection = connectionFactory.createConnection();            // 启动            connection.start();            // 获取操作连接            session = connection.createSession(Boolean.TRUE,                    Session.AUTO_ACKNOWLEDGE);            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置            destination = session.createQueue("FirstQueue");            // 得到消息生成者【发送者】            producer = session.createProducer(destination);            // 设置不持久化,此处学习,实际根据项目决定            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);            // 构造消息,此处写死,项目就是参数,或者方法获取            sendMessage(session, producer);            session.commit();        } catch (Exception e) {            e.printStackTrace();        } finally {            try {                if (null != connection)                    connection.close();            } catch (Throwable ignore) {            }        }    }    public static void sendMessage(Session session, MessageProducer producer)            throws Exception {        for (int i = 1; i <= SEND_NUMBER; i++) {            TextMessage message = session                    .createTextMessage("ActiveMq 发送的消息" + i);            // 发送消息到目的地方            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);            producer.send(message);        }    }}

        customer类(customer项目

package customer;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.MessageConsumer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class Customer {	public static void main(String[] args) {        // ConnectionFactory :连接工厂,JMS 用它创建连接        ConnectionFactory connectionFactory;        // Connection :JMS 客户端到JMS Provider 的连接        Connection connection = null;        // Session: 一个发送或接收消息的线程        Session session;        // Destination :消息的目的地;消息发送给谁.        Destination destination;        // 消费者,消息接收者        MessageConsumer consumer;        connectionFactory = new ActiveMQConnectionFactory(                ActiveMQConnection.DEFAULT_USER,                ActiveMQConnection.DEFAULT_PASSWORD,                "tcp://localhost:61616");        try {            // 构造从工厂得到连接对象            connection = connectionFactory.createConnection();            // 启动            connection.start();            // 获取操作连接            session = connection.createSession(Boolean.FALSE,                    Session.AUTO_ACKNOWLEDGE);            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置            destination = session.createQueue("FirstQueue");            consumer = session.createConsumer(destination);            while (true) {                //设置接收者接收消息的时间,为了便于测试,这里谁定为100s                TextMessage message = (TextMessage) consumer.receive(100000);                if (null != message) {                    System.out.println("收到消息" + message.getText());                } else {                    break;                }            }        } catch (Exception e) {            e.printStackTrace();        } finally {            try {                if (null != connection)                    connection.close();            } catch (Throwable ignore) {            }        }    }}

        然后运行producer项目,以下为控制台输出:

发送消息:ActiveMq 发送的消息1发送消息:ActiveMq 发送的消息2发送消息:ActiveMq 发送的消息3发送消息:ActiveMq 发送的消息4发送消息:ActiveMq 发送的消息5收到消息ActiveMq 发送的消息1收到消息ActiveMq 发送的消息2收到消息ActiveMq 发送的消息3收到消息ActiveMq 发送的消息4收到消息ActiveMq 发送的消息5

       在刚刚打开的页面里找到queues,点击可以看到我们刚刚创建的queues

143233_fuSg_2358114.png

         注:有时候并不明白队列到底是被用来做什么?或者说它可以做什么?什么情况下需要做?举几个简单的例子,或许不合适,但应该能够明白:

        1、日志。当多个类实例同时产生日志的时候,因为日志文件只有一个,每次写入都只能有一个类实例能够写入,其他全部阻塞。这样显然是消耗性能的,那么如果这个时候有个中间件,类实例只需要把要写的日志传入中间件,就执行结束,然后另外一个程序不断从中间件获取日志再去写入日志,那么这样能够提升很大的性能。

        2、分布式事物。我们知道分布式项目的事物是很麻烦的,加入我们购买了一个商品,那么就需要同时去更改很多东西,比如客户信息、订单信息、发货信息等,在dao或service里一个一个去连接再写入,会导致页面长时间无反应,用户体验度非常差,而如果把这些要添加的数据写到一个中间件,其他专门的程序去读取处理,这样页面就不会出现长时间等待问题。  

        另外注意,TextMessage类的setObjectProperty只能传递原始类型、字符、map、list,其他类型会报错。

与spring集成点击此:http://my.oschina.net/shyloveliyi/blog/496489

转载于:https://my.oschina.net/shyloveliyi/blog/496288

你可能感兴趣的文章