apache flink中两个流左连接的正确方法

6g8kf2rb  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(394)

我正在使用apache flink开发一个欺诈检测系统,但我是一个初学者,一直在解决这个问题:
我想从两个流中进行左连接,一个流包含当前交易,另一个流已验证与银行的交易,在那里我可以发现是否有一些错误,如被盗的信用卡等。因此,我需要将它们连接起来,以了解卡是否在过去被拒绝。

DataStream<Card> currentDataStream =  getCardsStream(env, Parameters.CURRENT_SOCKET)
            .keyBy((card) -> card.getCardID);

    DataStream<Card> historicDataStream =  getCardsStream(env, Parameters.HISTORIC_SOCKET)
            .keyBy((card) -> card.getCardID());

我现在正在做的是一个richcoflatmapment函数,它在每次historicdatastream到达时更新一个名为historiclist的列表状态,并返回一个包含当前卡的元组和一个包含该id的所有关联事件的列表:

public class LeftJoin extends RichCoFlatMapFunction<Card, Card, Tuple2<Card, List<Card>> > {

    private ValueState<Card> currentValueState;
    private ListState<Card> historicListState;

    @Override
    public void open(Configuration parameters) throws Exception {
        currentValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("Current State", Card.class));
        historicListState = getRuntimeContext().getListState(new ListStateDescriptor<>("historic state", Card.class));
    }

    @Override
    public void flatMap1(Card currentCard, Collector<Tuple2<Card, List<Card>>> out) throws Exception {
        Iterable<Card> historicCardList =  historicListState.get();

        //If there is a coincidence
        if (Iterables.size(historicCardList) > 0) {
            out.collect(new Tuple2<>(currentCard, Lists.newArrayList(historicCardList) ));
        } else {
            currentValueState.update(currentCard);
            //Returning null if there are no cards for the Id
            out.collect(new Tuple2<>(currentCard, null));
        }
    }

    @Override
    public void flatMap2(Card historicCard, Collector<Tuple2<Card, List<Card>>> out) throws Exception {
        historicListState.add(historicCard); //Updates the historicListState
    }
}

问题是 List<Card> 在以后我想对照包含的卡检查规则时会给我带来很多麻烦,因为它总是会再次获得所有的卡,并且我需要一种方法来标记我已经按照规则处理的卡,类似这样:

//I don't like this list because it always gets me all the join coincidences
    for (Card card : historicList) {

        //Comparar cada regla del Broadcast state con el error que contiene el elemento card
        if (rule.getBankDecision().equals(card.getErrors())) {

            //Evaluate some rules
            for (Long stateEventTime : windowState.keys()) {
                if (isStateValueInWindow(stateEventTime, windowStartForEvent, System.currentTimeMillis())) {
                    aggregateValuesInState(stateEventTime, aggregator);
                }

            }
    }

有没有更好的方法让加入的卡片成为一个流?

2q5ifsrm

2q5ifsrm1#

我希望我能正确地理解你,如果不能,请纠正我。 private ValueState<Card> currentValueState 是redundent(在本例中,您只更新它,从不读取它的值)
如果我理解正确的话,问题是你在整个historicliststate上发布了你的规则系统,尽管你已经检查了其中的一些。为什么不从历史上删除那些已经超过规则的状态卡呢?

相关问题