与KafkaSource一起使用事件时,KNative服务不并发处理请求

oewdyzsn  于 2022-09-21  发布在  Kafka
关注(0)|答案(1)|浏览(130)

摘要:

我正在尝试使用KNative事件通过Kafka主题公开一个简单的Web应用程序。服务器应该能够同时处理多个请求,但不幸的是,当我通过Kafka发送它们时,它似乎是按顺序处理它们的。但是,当直接向服务发出简单的HTTP请求时,并发性工作得很好。

设置:

安装程序仅使用指向我的KNative ServiceKafkaSource,并使用使用bitnami/kafka舵图部署的Kafka示例。

我使用的版本是用于KNative服务和事件的v1.7.1,以及用于Kafka事件集成的v1.7.0(来自knative-sandbox/eventing-kafka)。

代码:

我尝试部署的服务是一个python FastAPI应用程序,它在接收到请求(具有某种ID)时,记录接收到的请求,休眠5秒,然后返回一条伪消息:

import asyncio
from fastapi import FastAPI
from pydantic import BaseModel
import logging

logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.DEBUG, datefmt="%Y-%m-%d %H:%M:%S",
)

app = FastAPI()

class Item(BaseModel):
    id: str

@app.post("/")
async def root(item: Item):
    logging.debug(f"Request received with ID: {item.id}")
    await asyncio.sleep(5)
    logging.debug(f"Request complete for ID: {item.id}")
    return {"message": "Hello World"}

该应用程序使用uvicorn提供服务:

FROM python:3.9-slim

RUN pip install fastapi uvicorn

ADD main.py .

ENTRYPOINT uvicorn --host 0.0.0.0 --port 8877 main:app

服务部署规范显示我设置的containerConcurrency值大于1

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: concurrency-test
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/class: "kpa.autoscaling.knative.dev"
        autoscaling.knative.dev/metric: "concurrency"
        autoscaling.knative.dev/target: "5"
    spec:
      containerConcurrency: 5
      containers:
        - name: app
          image: dev.local/concurrency-test:latest
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 8877
---
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: concurrency-test
spec:
  consumerGroup: concurrency-test-group
  bootstrapServers:
  - kafka.default.svc.cluster.local:9092
  topics:
  - concurrency-test-requests
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: concurrency-test

注:我也尝试在KafkaSource中使用spec.consumers: 2,但行为相同。

日志:

当使用HTTP直接向服务发送两个并发请求时,日志如下所示(两个请求都在6秒内完成,因此并发生效):

2022-09-12 02:14:36 DEBUG    Request received with ID: abc
2022-09-12 02:14:37 DEBUG    Request received with ID: def
2022-09-12 02:14:41 DEBUG    Request complete for ID: abc
INFO:     10.42.0.7:0 - "POST / HTTP/1.1" 200 OK
2022-09-12 02:14:42 DEBUG    Request complete for ID: def
INFO:     10.42.0.7:0 - "POST / HTTP/1.1" 200 OK

然而,当通过Kafka发送请求时,日志如下所示(请求正在一个接一个地处理):

2022-09-12 02:14:55 DEBUG    Request received with ID: 111
2022-09-12 02:15:00 DEBUG    Request complete for ID: 111
INFO:     10.42.0.7:0 - "POST / HTTP/1.1" 200 OK
2022-09-12 02:15:00 DEBUG    Request received with ID: 222
2022-09-12 02:15:05 DEBUG    Request complete for ID: 222
INFO:     10.42.0.7:0 - "POST / HTTP/1.1" 200 OK

当仅使用KafkaSource使用事件时,请让我知道这种顺序请求处理是否是预期的行为,我希望在此设置中有启用并发的方法。

kmynzznz

kmynzznz1#

Kafka提供分区内的排序(实现为分布式日志)。您可能需要更改Kafka主题上的分区数量以实现更高的并行度;您也可以使用spec.consumers值来增加吞吐量(未经测试)。

我还鼓励您将问题提交给in the eventing-kafka repo,如果您正在寻找其他行为,请添加任何额外的旋钮。

相关问题