java—有没有更好的方法使用kafka实现多租户?

uxh89sit  于 2021-06-30  发布在  Java
关注(0)|答案(1)|浏览(438)

我正在尝试使用springboot实现一个多租户微服务。我已经实现了web层和持久层。在web层,我实现了一个过滤器,在原型bean中设置租户id(使用threadlocaltargetsource),在持久化层,我使用了hibernate多租户配置(每个租户的模式),它们工作正常,数据持久化在适当的模式中。目前我正在消息层上实现相同的行为,使用springkaka库,到目前为止ir的工作方式与我预期的一样,但是我想知道是否有更好的方法来实现它。
这是我的密码:
这是管理kafkamessagelistenercontainer的类:

@Component
public class MessagingListenerContainer {

    private final MessagingProperties messagingProperties;

    private KafkaMessageListenerContainer<String, String> container;

    @PostConstruct
    public void init() {
        ContainerProperties containerProps = new ContainerProperties(
                messagingProperties.getConsumer().getTopicsAsList());

        containerProps.setMessageListener(buildCustomMessageListener());

        container = createContainer(containerProps);
        container.start();
    }

    @Bean
    public MessageListener<String, String> buildCustomMessageListener() {
        return new CustomMessageListener();
    }

    private KafkaMessageListenerContainer<String, String> createContainer(
            ContainerProperties containerProps) {
        Map<String, Object> props = consumerProps();
        …
        return container;
    }

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        …
        return props;
    }

    @PreDestroy
    public void finish() {
        container.stop();
    }

}

这是custommessagelistener:

@Slf4j
public class CustomMessageListener implements MessageListener<String, String> {

    @Autowired
    private TenantStore tenantStore; // Prototype Bean

    @Autowired
    private List<ServiceListener> services;

    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        log.info(“Tenant {} | Payload: {} | Record: {}", record.key(),
                record.value(), record.toString());

        tenantStore.setTenantId(record.key()); // Currently tenant is been setting as key

        services.stream().forEach(sl -> sl.onMessage(record.value()));

    }

}

这是一个测试服务,将使用消息数据和租户:

@Slf4j
@Service
public class ConsumerService implements ServiceListener {

    private final MessagesRepository messages;
    private final TenantStore tenantStore;

    @Override
    public void onMessage(String message) {
        log.info("ConsumerService {}, tenant {}", message, tenantStore.getTenantId());
        messages.save(new Message(message));
    }

}

谢谢你的时间!

2w3kk1z5

2w3kk1z51#

只是要澄清一下(如果我错了,请纠正我):您对所有租户使用相同的主题。根据每个租户区分消息的方法是使用消息键,在您的情况下,消息键是租户id。
通过使用消息头来存储租户id而不是密钥,可以做一些轻微的改进。通过这样做,您将不局限于基于租户对消息进行分区。
虽然您描述的模型可以工作,但它有一个主要的安全问题。如果有人访问了你的主题,那么你将泄露所有租户的数据。
一种更安全的方法是使用主题命名约定和acl(访问控制列表)。你可以在这里找到一个简短的解释。简而言之,您可以使用后缀或前缀将租户的名称包含在主题名称中。e、 g:orders\u tenanta,orders\u tenantb或tenanta\u orders,tenantb\u orders
然后,使用acl可以限制哪些应用程序可以连接到这些特定主题。如果您的一个租户需要将其应用程序直接连接到您的kafka集群,那么此场景也很有用。

相关问题