java spring 使用 rabbitmq

2016-07-24 21:54:05   最后更新: 2016-07-24 21:54:05   访问数量:636




在项目开发的过程中,遇到了一个奇怪的问题,使用原生的 com.rabbitmq.client 包操作 rabbitmq,失败率高达近 50%,超时非常严重

经过仔细排查,发现是在创建 channel 的阶段耗时过长,那么,这究竟是为什么呢?

经过阅读源码,我们看到 com.rabbitmq.client 的 Connection 在创建 Channel 的过程中通过 synchronized 关键字使用了锁,致使同一个 connection 上的所有线程同时只能有一个执行 channel 的创建过程,而其他线程需要阻塞等待这一过程的完成,这也就造成了超时的产生

那么,如何解决这个问题呢?我们是否可以让所有的线程去共享和使用同一个 channel 呢?他们共享同一个 channel 也就不需要分别创建,因此就不会陷入对锁的争夺和等待了

通过查阅官方文档,在com.rabbitmq.client Interface Channel中提到:

While a Channel can be used by multiple threads, it's important to ensure that only one thread executes a command at once. Concurrent execution of commands will likely cause an UnexpectedFrameError to be thrown.

 

因此,与 Connection 不同,我们必须为每一个工作线程创建一个单独的 channel,那么,解决办法就是为单独的线程去缓存他唯一的 channel 了,好在 spring 为我们提供了 org.springframework.amqp.rabbit 包,他为所有的线程创建了两个分别缓存具有事务的和不具有事务的 channel 的 List,避免了这个问题的产生

通过换用 org.springframework.amqp.rabbit 包,问题也确实得以解决

本篇日志中,我们就介绍一下 org.springframework.amqp.rabbit 包的用法

 

要使用 spring 提供的 org.springframework.amqp.rabbit 包,我们首先需要配置 pom.xml 来下载相应的 jar 包

<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-amqp</artifactId> <version>1.1.1.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.1.1.RELEASE</version> </dependency> <dependency> <groupId>com.caucho</groupId> <artifactId>hessian</artifactId> <version>4.0.7</version> </dependency>

 

 

接下来我们就需要为我们的应用类注入 rabbitConnectionFactory 的对象了,因此,在此之前,我们需要先创建相应的 bean

 

xml 方式

我们可以使用 xml 方式声明 bean:

<bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory" destroy-method="destroy"> <property name="host" value="127.0.0.1"/> <property name="port" value="5672"/> <property name="username" value="user"/> <property name="password" value="psword"/> <property name="virtualHost" value="/"/> <property name="channelCacheSize" value="2"/> <!-- 事件处理线程数 --> </bean>

 

 

Java 注解方式

我们也可以通过 @Configuration 注解实现 bean 的声明,可以参看:

spring 实现基本 xml 配置注解化

package com.techlog.test.configuration; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Created by techlog on 2016/7/24. */ @Configuration public class ApplicationContext { @Bean public CachingConnectionFactory rabbitmqFactory() { CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("user"); factory.setPassword("password"); factory.setVirtualHost("/"); return factory; } }

 

 

注入成员

@Resource private CachingConnectionFactory rabbitmqFactory;

 

 

这样,我们就可以使用这个成员来操作 rabbitmq 了,当然,即使你使用的不是 spring 框架,在你的 java 代码中,你也可以使用这个包,通过上述方法创建 CachingConnectionFactory 对象即可

 

下面是一个简单的 publisher 的代码:

package com.techlog.test.service; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BasicProperties; import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.Connection; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.io.IOException; /** * Created by techlog on 2016/7/22. */ @Service public class RabbitmqPublisher { @Resource private CachingConnectionFactory rabbitmqFactory; private Connection connection = null; public void send(String exchange, String queue, String message) throws IOException { if (connection == null) { connection = rabbitmqFactory.createConnection(); } Channel channel = connection.createChannel(false); AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.priority(3).clusterId("techlog_rabbit"); channel.basicPublish(exchange, queue, builder.build(), message.getBytes()); } }

 

 

spring 创建 rabbitmq consumer 之前还需要配置 Consumer 和 listener 的 bean:

<bean id="messageReceiver" class="com.techlog.test.service.RabbitmqConsumer"></bean> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queues="queueTest" ref="messageReceiver"/> </rabbit:listener-container>

 

 

package com.techlog.test.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.io.IOException; import java.util.Arrays; /** * Created by techlog on 2016/7/24. */ @Service public class RabbitmqConsumer implements MessageListener { private Logger logger = LoggerFactory.getLogger(MessageConsumer.class); @Override public void onMessage(Message message) { logger.info("receive message:{}", message); } }

 

 

rabbitmq-api-guide -- http://www.rabbitmq.com/api-guide.html

com.rabbitmq.client Interface Channel -- http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.5.1/rabbitmq-java-client-javadoc-2.5.1/com/rabbitmq/client/Channel.html

 






技术帖      龙潭书斋      技术分享      rabbitmq      java      message queue      connection      queue      spring      exchange      channel     


京ICP备15018585号