Go语言 持续从Azure Service Bus订阅/队列接收消息

baubqpgj  于 5个月前  发布在  Go
关注(0)|答案(1)|浏览(58)

我希望有一个标准函数来持续侦听/接收来自Azure服务总线队列的消息。我希望有一种相当有效的方法来处理订阅/主题上没有消息的情况-而不是不断地在for循环中锤击请求。
目前,ReceiveMessages()函数在接收选项方面与其他语言的SDK相比相当有限。没有MaxWaitTime,因此要处理没有接收到消息的情况,您只需在这里建议的某种形式的循环中不断重试-How do I keep my consumer listening the messages on Azure Sevice Bus using Azure sdk for Golang v0.3.1?

client, err := azservicebus.NewClientFromConnectionString("Connection String", nil)
    
    if err != nil {
        log.Fatalf("Failed to create Service Bus Client: %s", err.Error())
    }

    receiver, err := client.NewReceiverForQueue("queue", nil)

    if err != nil {
        log.Fatalf("Failed to create Consumer: %s", err.Error())
    }

    messages, err := receiver.ReceiveMessages(context.TODO(), 10, nil)

    if err != nil {
        log.Fatalf("Failed to receive Messages: %s", err.Error())
    }

    for _, message := range messages {

        body, err := message.Body()

        if err != nil {
            log.Fatalf("Failed to parse message body: %s", err.Error())
        }

        fmt.Println("Message --->", string(body))

        err = receiver.CompleteMessage(context.TODO(), message)

        if err != nil {
            log.Fatalf("Failed to complete message: %s", err.Error())
        }

字符串
假设不断地请求消息(在可能长时间没有消息的情况下)不是最有效的倾听方式吗?
我已经考虑过实现某种形式的静态/动态等待,但是是否还有更多的“开箱即用”和/或最佳实践?

vql8enpb

vql8enpb1#

  • 使用此参考MSDOC,您可以从Azure服务总线接收消息。
  • 下面的代码连接到Azure服务总线并从队列接收消息。
  • 要安装,请运行go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func main() {
    
    connectionString := "AzureServiceBusConnectionString "

    
    client, err := azservicebus.NewClientFromConnectionString(connectionString, nil)
    if err != nil {
        panic(err)
    }

    
    queueName := "AzureServiceBusQueuename"

    
    receiver, err := client.NewReceiverForQueue(queueName, nil)
    if err != nil {
        panic(err)
    }

    
    ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second)
    defer cancel()

    messages, err := receiver.ReceiveMessages(ctx, 1, nil)
    if err != nil {
        panic(err)
    }

    for _, message := range messages {
        // The message body is a []byte. For this example, we assume it's a string.
        var body []byte = message.Body
        fmt.Printf("Message received with body: %s\n", string(body))

        
        err = receiver.CompleteMessage(context.TODO(), message, nil)
        if err != nil {
            // Handle completion error as needed
            panic(err)
        }

        fmt.Printf("Received and completed the message\n")
    }

    
    client.Close(context.TODO())
}

字符串

若要持续从Azure服务总线订阅/队列接收消息:

  • 下面的代码从Azure服务接收消息,设置信号处理程序来捕获终止信号,并使用上下文来控制消息接收循环。
package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func main() {
    // Replace 'your_connection_string' with the actual Service Bus connection string
    connectionString := "your_connection_string"

    
    client, err := azservicebus.NewClientFromConnectionString(connectionString, nil)
    if err != nil {
        panic(err)
    }

    
    queueName := "your_queue_name"

    // Create a receiver for the specified queue
    receiver, err := client.NewReceiverForQueue(queueName, nil)
    if err != nil {
        panic(err)
    }

    ctx, cancel := context.WithCancel(context.Background())

    // Use a signal handler to gracefully stop the application
    stopSignal := make(chan os.Signal, 1)
    signal.Notify(stopSignal, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        
        <-stopSignal
        fmt.Println("Received termination signal. Stopping gracefully...")
        cancel()
    }()

    
messageLoop:
    for {
        
        select {
        case <-ctx.Done():
            fmt.Println("Context canceled. Exiting the message receiving loop.")
            break messageLoop
        default:
            
            messages, err := receiver.ReceiveMessages(ctx, 1, nil)
            if err != nil {
                // Handle receive error as needed
                fmt.Printf("Error receiving messages: %v\n", err)
                continue
            }

            for _, message := range messages {
                
                var body []byte = message.Body
                fmt.Printf("Message received with body: %s\n", string(body))

            
                err = receiver.CompleteMessage(ctx, message, nil)
                if err != nil {
                    
                    fmt.Printf("Error completing message: %v\n", err)
                }

                fmt.Printf("Received and completed the message\n")
            }

        
            time.Sleep(1 * time.Second)
        }
    }

    
    client.Close(ctx)
}


x1c 0d1x的数据


  • 使用Azure servicebus为Azure go sdk发布此版本。

相关问题