rabbitmq 如何在MassTransit中制作过滤器?

cwdobuhd  于 7个月前  发布在  RabbitMQ
关注(0)|答案(1)|浏览(70)

我试图处理与MassTransit,我有一些服务器与StableDeficiency,他们接受任务,在队列中生成图像
但事实是,不同的服务器有不同的模型集,我不想为每个模型创建大量队列
我的GPU服务器如何从队列中过滤任务,并只处理它们可以执行的任务?
我的PublisherServer

internal class Program {
    static async Task Main(string[] args) {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => {
            cfg.Host(new Uri(ServerConfig.Host), h => {
                h.Username(ServerConfig.Username);
                h.Password(ServerConfig.Password);
            });

            cfg.ReceiveEndpoint("calculation_results_queue", e => {
                e.Consumer(() => new ResultConsumer());
            });
        });
        await busControl.StartAsync();
        Console.WriteLine("Publisher is running...");

        await busControl.Publish<CalculationTask>(new {
            TaskId = NewId.NextGuid().ToString(),
            Data = "Important data for calculation"
        }, context => {
            context.Headers.Set("model", "model_comic");
        });

        Console.WriteLine("Press any key to exit");
        await Task.Run(() => Console.ReadKey());
        await busControl.StopAsync();
    }
}

字符串
我的GPU工人

internal class Program {
    static async Task Main(string[] args) {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => {
            cfg.Host(new Uri(ServerConfig.Host), h => {
                h.Username(ServerConfig.Username);
                h.Password(ServerConfig.Password);
            });

            cfg.ReceiveEndpoint("calculation_task_queue", e => {
                e.Bind("headers_exchange", x => {
                    x.ExchangeType = "headers";
                    x.SetExchangeArgument("x-match", "any");
                    x.SetExchangeArgument("model", "model_real");
                    x.SetExchangeArgument("model", "model_anime");
                });

                e.Consumer(() => new CalculationTaskConsumer());
            });
        });

        await busControl.StartAsync();

        Console.WriteLine("Press any key to exit");
        await Task.Run(() => Console.ReadKey());
        await busControl.StopAsync();
    }
}


我的员工不应该接收信息,但他还是这样做了。

wpx232ag

wpx232ag1#

cfg.ReceiveEndpoint("calculation_task_queue", e => {

    e.ConfigureConsumeTopology = false;
    
    e.Bind("headers_exchange", x => {
        x.ExchangeType = "headers";
        x.SetExchangeArgument("x-match", "any");
        x.SetExchangeArgument("model", "model_real");
        x.SetExchangeArgument("model", "model_anime");
    });

    e.Consumer(() => new CalculationTaskConsumer());
});

字符串
默认的拓扑配置可能是原因所在,但您可以通过查看RabbitMQ管理控制台并查看附加绑定来了解这一点。

相关问题