RocketMQ SpringBoot 采坑

news/2024/7/5 9:54:22

1、项目结构:

在这里插入图片描述

2、Provider

2.1、Provider pom

<!--spring boot 版本依赖-->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.4.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.4.0</version>
    </dependency>
</dependencies>

2.2、application.yml

server:
  port: 8080
rocketmq:
  producer:
    groupName: hello_group_provider
    nameServer: 192.168.92.39
    port: 9876

2.3、RocketMQProducer

@Component
public class RocketMQProducer {
    @Value("${rocketmq.producer.groupName}")
    private String groupName;
    @Value("${rocketmq.producer.nameServer}")
    private String nameServer;
    @Value("${rocketmq.producer.port}")
    private String port;
    private DefaultMQProducer producer;
    /**
     * 功能描述 获取当前producer
     */
    @Bean
    public DefaultMQProducer getProducer() {
        //设置producer 并且开启producer
        producer = new DefaultMQProducer(groupName);
        producer.setNamesrvAddr(nameServer + ":" + port);
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        return producer;
    }
}

2.4、Service

@Service
public class ProducerService {
    @Autowired
    private DefaultMQProducer producer;
    private static final String topic = "helloTopic";
    private static final String tag = "helloTag";
    public void sendMsg() {
        Message msg = new Message(topic, tag, "hello RocketMQ".getBytes());
        try {
            SendResult send = producer.send(msg);
            System.out.println(send);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2.5、测试方法

@RunWith(SpringRunner.class)
//主application方法
@SpringBootTest(classes = RocketMQProducerApplication.class)
public class ProducerTest {
    @Autowired
    private ProducerService producerService;
    @Test
    public void testSend() {
        producerService.sendMsg();
    }
}

2.6、运行
No route info of this topic, xxxxxxxxxx
解决思路:maven版本必须和RocketMQ b版本一致
在这里插入图片描述
在这里插入图片描述

发送超时
rocketmq都会根据当前网卡选择一个IP使用,当你的机器有多块网卡时,很有可能会有问题。比如,我遇到的问题是我机器上有两个IP,一个公网IP,一个私网IP, 因此需要配置broker.conf 指定当前的公网ip, 然后重新启动broker。

#修改文件
vim  conf/broker.conf
brokerIP1=192.168.92.39
#重启broker
sh ./mqshutdown broker
nohup sh ./mqbroker -n localhost:9876  -c ../conf/broker.conf   autoCreateTopicEnable=true   >/dev/null  2>&1  &

在这里插入图片描述
在这里插入图片描述
**2,7、重新运行
在这里插入图片描述

3、Customer

3.1、pom文件同Customer

3.2、application.yml

server:
  port: 8081
rocketmq:
  consumer:
    groupName: hello_group_consumer
    nameServer: 192.168.92.39
    port: 9876

3.3、消费者

@Component
public class RocketMQCustomer {
    @Value("${rocketmq.consumer.groupName}")
    private String groupName;
    @Value("${rocketmq.consumer.nameServer}")
    private String nameServer;
    @Value("${rocketmq.consumer.port}")
    private String port;
    private static final String topic = "helloTopic";
    @Bean
    public DefaultMQPushConsumer getConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        //配置组名 IP
        consumer.setNamesrvAddr(nameServer + ":" + port);
        consumer.setConsumerGroup(groupName);
        //最后偏移量读取消息
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        try {
            //订阅主题,监听主题下哪些标签
            consumer.subscribe(topic, "*");
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                try {
                    //默认一条一条消息消费
                    Message msg = msgs.get(0);
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
                    String topic = msg.getTopic();
                    String body = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    String keys = msg.getKeys();
                    System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
                    //返回成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    //返回稍后进行投递
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            });
            //开始
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        return consumer;
    }
}

3.4、启动消费者
在这里插入图片描述


http://www.niftyadmin.cn/n/4556920.html

相关文章

MemSQL Start[c]UP 2.0 - Round 1 F - Permutation 思维+线段树维护hash值

F - Permutation 思路&#xff1a;对于当前的值x&#xff0c; 只需要知道x k, x - k这两个值是否出现在其左右两侧&#xff0c;又因为每个值只有一个&#xff0c; 所以可以转换成&#xff0c;xk, x-k在到x所在位置的时候是否都出现&#xff0c;或者都不出现&#xff0c;即出现…

什么是索引?

最常见的索引类型涉及单个列&#xff0c;将来自该列的值的副本存储在数据结构中&#xff0c;从而允许快速查找具有相应列值的行。B树数据结构让索引可以快速找到一个特定的值&#xff0c;一组值&#xff0c;或者一个范围内的值&#xff0c;对应于子句 中的运算符&#xff0c;如…

学c语言有什么用

自己查看看 ||| 基础性的语法 它适合作为系统描述语言 学学还是有必要的 为以后打基础 由于汇编语言依赖于计算机硬件 即可用来编写系统软件 C语言是国际上广泛流行的、很有发展前途的计算机高级语言 早期的操作系统等系统软件主要是用汇编语言编写的&#xff08;包括 UNIX操作…

Spire.Doc修改目录字体大小

官方论坛 官方论坛的解决方案&#xff1a; static void Main(string[] args){Document doc new Document();doc.LoadFromFile("目录.docx");foreach (Section section in doc.Sections){//遍历body下面所有对象foreach (DocumentObject obj in section.Body.ChildO…

分布式的演练、Zookeeper基本概念(一)

1、从集中式到分布式 在20世纪60年代大型主机被发明出来之后&#xff0c;凭借其超强的计算和io处理能力以及在稳定型和安全性方面的卓越表现&#xff0c;在很长的一段时间内&#xff0c;大型主机引领了计算机行业以及商业计算领域的发展&#xff0c;在大型主机的研发上最知名的…

C语言一共有哪32个关键字

声明无类型指针&#xff08;基本上就这三个作用&#xff09; default&#xff1a;开关语句中的“其他”分支 goto&#xff1a;无条件跳转语句 sizeof&#xff1a;计算数据类型长度 volatile&#xff1a;说明变量在程序执行中可被隐含地改变 do &#xff1a;循环语句的循环体 wh…

Zookeeper安装、常用命令(二)

1、安装 1.1、需要环境jdk 1.2、解压 tar -zxvf zookeeper-3.4.6.tar.gz mv zookeeper-3.4.6/ zookeeper1.3、更名 cd /usr/local/zookeeper/conf cp zoo_sample.cfg zoo.cfg1.4、创建文件 mkdir -p /usr/local/datas/zookeeper mkdir -p /usr/local/logs/zookeeper1.5、…

在linux中根据pid杀死所有子进程/后代进程

杀死所有子进程&#xff1a; pkill -P $$$$ 为 目标 pid 另一种情况是你可能想要杀死当前 shell 进程的所有后代以及直接子进程。在这种情况下&#xff0c;你可以使用下面的递归 shell 函数列出所有后代 PID&#xff0c;然后将它们作为参数传递给 kill&#xff1a; list_des…