RabbitMQ blocks when creating a new connection in a Storm topology

eivnm1vs  posted on 2021-06-25  in Storm

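I am new to Storm. My spout uses RabbitMQ to receive tuples from a queue, and a client running on another machine inserts tuples into that queue. I ran a simple RabbitMQ sample program and it worked fine, but when I use the same code in a Storm spout, it blocks at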

connection = factory.newConnection();

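This happens even though my RabbitMQ server is running, and when I run the sample code on the same machine it connects successfully. The print statements are printed up to this one: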

System.out.println(" setting host to 192.168.8.218..... ");
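A hang at newConnection() is often easier to diagnose by first checking, from the machine where the Storm worker runs, whether the broker's port can be reached at all. The sketch below uses only the JDK and a hard connect timeout. BrokerProbe and isReachable are hypothetical names, not part of Storm or the RabbitMQ client, and the port assumes RabbitMQ's default non-TLS AMQP port 5672. Note that the log line above names 192.168.8.218 while the spout code calls setHost("192.168.8.96"), so it is worth probing the address the code actually uses.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: not part of Storm or the RabbitMQ client. */
final class BrokerProbe {
    /** RabbitMQ's default AMQP port for non-TLS connections. */
    static final int DEFAULT_AMQP_PORT = 5672;

    /**
     * Returns true if a TCP connection to host:port opens within
     * timeoutMillis; returns false if it is refused, times out,
     * or the host name cannot be resolved.
     */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // ConnectException, SocketTimeoutException and
            // UnknownHostException are all IOExceptions.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the host passed to setHost(...) in the spout code.
        String host = args.length > 0 ? args[0] : "192.168.8.96";
        boolean ok = isReachable(host, DEFAULT_AMQP_PORT, 3000);
        System.out.println(host + ":" + DEFAULT_AMQP_PORT
                + (ok ? " is reachable" : " is NOT reachable"));
    }
}
```

If the probe fails from the worker machine but succeeds where the sample program ran, the problem is likely network or address related rather than Storm related. The RabbitMQ Java client's ConnectionFactory also offers setConnectionTimeout(int), in milliseconds, which may help turn a silent hang into an exception.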

Below is my complete spout class.

package storm.starter.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import java.util.Map;
import java.util.Random;
import java.net.*;
import java.io.*;
import java.lang.Exception;
import java.io.IOException;

public class RabbitmqSpout extends BaseRichSpout {
    SpoutOutputCollector _collector;
    public final static String QUEUE_NAME = "record";
    ConnectionFactory factory;
    Connection connection;
    Channel channel;
    QueueingConsumer consumer;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) 
    {
        _collector = collector;
        System.out.println(" [*] Initialization of spout..... ");

        try
        {
            factory = new ConnectionFactory();
            System.out.println(" creating connection factory..... ");
            factory.setHost("192.168.8.96");
            System.out.println(" setting host to 192.168.8.218..... ");
            connection = factory.newConnection();
            System.out.println(" creating new connection..... ");
            channel = connection.createChannel();
            System.out.println(" creating new channel..... ");
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" Declaring queue..... ");
            System.out.println(" [*] Waiting for messages. ");
        }
        catch(Exception exception)
        {
            System.out.println("Exception occurred. "+exception.getMessage());
        }

    }

    @Override
    public void nextTuple() 
    {
        System.out.println("In wait of tuples.... ");
        try
        {
            consumer = new QueueingConsumer(channel);
            System.out.println(" trying to consume..... ");
            channel.basicConsume(QUEUE_NAME, true, consumer);

            while (true) 
            {
                System.out.println(" trying to deliver..... ");
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" getting string..... ");
                System.out.println(" [x] Received '" + message + "'");
                System.out.print("emitting Rabbitmq Queue tuple");
                _collector.emit(new Values(message));
                System.out.print("emitted Rabbitmq Queue tuple");
            }   
        }

        catch(IOException io)
        {
            System.out.println("Exception occurred. ");
        }
        catch(Exception exception)
        {
            System.out.println("Exception occurred. ");
        }

    }        

    @Override
    public void ack(Object id) {
    }

    @Override
    public void fail(Object id) 
    {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) 
    {
        declarer.declare(new Fields("record"));
    }

}

hc2pp10m  answer #1

connection = factory.newConnection()

should be done in the RabbitMQConsumer() method of the RabbitMQConsumer class, because the RabbitMQ consumer takes data off the queue and in turn feeds that data to your RabbitMQ spout. Please refer to: https://github.com/ppat/storm-rabbitmq/tree/master/src/main/java/io/latent/storm/rabbitmq
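The division of labour described here, where a consumer object owns the connection and hands messages to the spout, can be sketched roughly as follows. This is not the storm-rabbitmq code: BufferingConsumer and PollingSpoutSketch are hypothetical names, the broker is replaced by an in-memory queue, and emitted tuples go into a list instead of a SpoutOutputCollector, so the example runs with the JDK alone. It also shows nextTuple() returning immediately when nothing is waiting, since Storm calls nextTuple(), ack() and fail() on the same thread and a blocking loop inside nextTuple() would stall the others.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical consumer: owns the (simulated) broker connection and buffers deliveries. */
final class BufferingConsumer {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private volatile boolean open;

    /** Stand-in for creating the connection, channel and subscription once. */
    void open() {
        open = true;
    }

    /** Stand-in for the broker's delivery callback; may run on another thread. */
    void onDelivery(String body) {
        if (!open) {
            throw new IllegalStateException("consumer is not open");
        }
        buffer.add(body);
    }

    /** Returns the next buffered message, or null immediately if none is waiting. */
    String poll() {
        return buffer.poll();
    }
}

/** Hypothetical spout-shaped class; a list stands in for the Storm collector. */
final class PollingSpoutSketch {
    private final BufferingConsumer consumer = new BufferingConsumer();
    final List<String> emitted = new ArrayList<>();

    /** Mirrors the spout's open(): set the consumer up once, not on every nextTuple(). */
    void open() {
        consumer.open();
    }

    BufferingConsumer consumer() {
        return consumer;
    }

    /** Mirrors nextTuple(): emit at most one message, then return without blocking. */
    void nextTuple() {
        String message = consumer.poll();
        if (message != null) {
            emitted.add(message);
        }
    }
}
```

In a real spout, the body of BufferingConsumer.open() is where factory.newConnection(), createChannel() and the subscription would go, and emitted.add(message) would become a call on the spout's collector.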
