摘要:
我正在尝试使用KNative事件通过Kafka主题公开一个简单的Web应用程序。服务器应该能够同时处理多个请求,但不幸的是,当我通过Kafka发送它们时,它似乎是按顺序处理它们的。但是,当直接向服务发出简单的HTTP请求时,并发性工作得很好。
设置:
安装程序仅使用指向我的KNative Service
的KafkaSource
,并使用使用bitnami/kafka
舵图部署的Kafka示例。
我使用的版本是用于KNative服务和事件的v1.7.1
,以及用于Kafka事件集成的v1.7.0
(来自knative-sandbox/eventing-kafka
)。
代码:
我尝试部署的服务是一个python FastAPI应用程序,它在接收到请求(具有某种ID)时,记录接收到的请求,休眠5秒,然后返回一条伪消息:
import asyncio
from fastapi import FastAPI
from pydantic import BaseModel
import logging
logging.basicConfig(
format="%(asctime)s %(levelname)-8s %(message)s",
level=logging.DEBUG, datefmt="%Y-%m-%d %H:%M:%S",
)
app = FastAPI()
class Item(BaseModel):
id: str
@app.post("/")
async def root(item: Item):
logging.debug(f"Request received with ID: {item.id}")
await asyncio.sleep(5)
logging.debug(f"Request complete for ID: {item.id}")
return {"message": "Hello World"}
该应用程序使用uvicorn提供服务:
FROM python:3.9-slim
RUN pip install fastapi uvicorn
ADD main.py .
ENTRYPOINT uvicorn --host 0.0.0.0 --port 8877 main:app
服务部署规范显示我设置的containerConcurrency
值大于1
:
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: concurrency-test
spec:
template:
metadata:
annotations:
autoscaling.knative.dev/class: "kpa.autoscaling.knative.dev"
autoscaling.knative.dev/metric: "concurrency"
autoscaling.knative.dev/target: "5"
spec:
containerConcurrency: 5
containers:
- name: app
image: dev.local/concurrency-test:latest
imagePullPolicy: IfNotPresent
ports:
- containerPort: 8877
---
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: concurrency-test
spec:
consumerGroup: concurrency-test-group
bootstrapServers:
- kafka.default.svc.cluster.local:9092
topics:
- concurrency-test-requests
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: concurrency-test
注:我也尝试在KafkaSource
中使用spec.consumers: 2
,但行为相同。
日志:
当使用HTTP直接向服务发送两个并发请求时,日志如下所示(两个请求都在6秒内完成,因此并发生效):
2022-09-12 02:14:36 DEBUG Request received with ID: abc
2022-09-12 02:14:37 DEBUG Request received with ID: def
2022-09-12 02:14:41 DEBUG Request complete for ID: abc
INFO: 10.42.0.7:0 - "POST / HTTP/1.1" 200 OK
2022-09-12 02:14:42 DEBUG Request complete for ID: def
INFO: 10.42.0.7:0 - "POST / HTTP/1.1" 200 OK
然而,当通过Kafka发送请求时,日志如下所示(请求正在一个接一个地处理):
2022-09-12 02:14:55 DEBUG Request received with ID: 111
2022-09-12 02:15:00 DEBUG Request complete for ID: 111
INFO: 10.42.0.7:0 - "POST / HTTP/1.1" 200 OK
2022-09-12 02:15:00 DEBUG Request received with ID: 222
2022-09-12 02:15:05 DEBUG Request complete for ID: 222
INFO: 10.42.0.7:0 - "POST / HTTP/1.1" 200 OK
当仅使用KafkaSource
使用事件时,请让我知道这种顺序请求处理是否是预期的行为,我希望在此设置中有启用并发的方法。
1条答案
按热度按时间kmynzznz1#
Kafka提供分区内的排序(实现为分布式日志)。您可能需要更改Kafka主题上的分区数量以实现更高的并行度;您也可以使用
spec.consumers
值来增加吞吐量(未经测试)。我还鼓励您将问题提交给in the
eventing-kafka
repo,如果您正在寻找其他行为,请添加任何额外的旋钮。