Netty HttpObjectAggregator聚合消息流程详细介绍

x33g5p2x  于2021-08-17 转载在 Java  
字(9.5k)|赞(0)|评价(0)|浏览(1156)

聚合消息

前面我们讲了, 一个HTTP请求最少也会在HttpRequestDecoder里分成两次往后传递,第一次是消息行和消息头,第二次是消息体,哪怕没有消息体,也会传一个空消息体。如果发送的消息体比较大的话,可能还会分成好几个消息体来处理,往后传递多次,这样使得我们后续的处理器可能要写多个逻辑判断,比较麻烦,那能不能把消息都整合成一个完整的,再往后传递呢,当然可以,用HttpObjectAggregator

先介绍一下一些属性

HTTP有个头属性Except:100-continue用来优化服务器和客户端数据传输的,在要发送比较大的数据的时候,不会直接发送,而是会先征求下服务器意见是否可以继续发送数据,服务器可以允许也可以不允许,都应该响应一下。具体介绍可以参考这篇文章

//接受100-continue,响应状态码100
    private static final FullHttpResponse CONTINUE =//Except:100-continue的响应
            new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE, Unpooled.EMPTY_BUFFER);
            
	//不接受,响应状态码417 不支持
    private static final FullHttpResponse EXPECTATION_FAILED = new DefaultFullHttpResponse(
            HttpVersion.HTTP_1_1, HttpResponseStatus.EXPECTATION_FAILED, Unpooled.EMPTY_BUFFER);

	//不接受,响应状态码413 消息体太大而关闭连接
    private static final FullHttpResponse TOO_LARGE_CLOSE = new DefaultFullHttpResponse(
            HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, Unpooled.EMPTY_BUFFER);

	//不接受,响应状态码413 消息体太大,没关闭连接
    private static final FullHttpResponse TOO_LARGE = new DefaultFullHttpResponse(
        HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, Unpooled.EMPTY_BUFFER);

    static { //设定头消息
        EXPECTATION_FAILED.headers().set(CONTENT_LENGTH, 0);
        TOO_LARGE.headers().set(CONTENT_LENGTH, 0);

        TOO_LARGE_CLOSE.headers().set(CONTENT_LENGTH, 0);
        TOO_LARGE_CLOSE.headers().set(CONNECTION, HttpHeaderValues.CLOSE);//关闭头信息
    }

    private final boolean closeOnExpectationFailed;//如果消息过大是否关闭连接,报异常

结构

看到他有4个泛型,分别对应是聚合HTTP类型的,HTTP通用消息请求行和请求头的,HTTP消息体,HTTP完整通用消息,包括消息体
在这里插入图片描述
对应的父类的泛型就是:
在这里插入图片描述
这些类型直接会影响到后续的逻辑判断,所以要弄清楚对应的关系。

MessageAggregator

主要的逻辑代码在这里,这个是通用的模板,里面就是模板方法啦,先看下他的一些属性吧,他会把HTTP的消息体都封装成一个缓冲区,加到复合缓冲区里。

private static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;//最大复合缓冲区组件个数

    private final int maxContentLength;//最大消息图长度
    private O currentMessage;//当前消息
    private boolean handlingOversizedMessage;//是否处理过大消息

    private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;//累加组件的最大个数
    private ChannelHandlerContext ctx;//处理器上下文
    private ChannelFutureListener continueResponseWriteListener;// 100-continue响应监听器

    private boolean aggregating;//是否正在聚合

acceptInboundMessage判断类型

判断是否是泛型I类型,也就是我们HttpObjectAggregator泛型中的HttpObject类型,是才会处理,否则就不处理。然后会判断是否聚合好了,如果没开始聚合就进行聚合,如果还在聚合就继续。

@Override
    public boolean acceptInboundMessage(Object msg) throws Exception { 

        if (!super.acceptInboundMessage(msg)) { //是否是泛型I类型,比如HttpObject类型
            return false;
        }
        @SuppressWarnings("unchecked")
        I in = (I) msg;

        if (isAggregated(in)) { //是否聚合好了
            return false;
        }
        if (isStartMessage(in)) { //是否是开始聚合
            aggregating = true;//开始聚合
            return true;
        } else if (aggregating && isContentMessage(in)) { //正在内容聚合
            return true;
        }

        return false;
    }

decode真正的聚合

这个方法比较长,但是做的事情分那么几个:

  • 如果是开始消息,也就不是请求体,那就开始判断是否有Except:100-continue头信息,有的话根据长度和是否支持来判断是否要返回响应。之后判断如果前面解码失败,就直接整合消息体返回,否则就创建复合缓冲区,如果是消息体的话就添加进去,然后封装成一个完整的消息类型。
  • 如果是消息体了,就加入到复合画冲去里,然后判断是否是最后一个消息体,是的话就进行最后的整合,其实就是设置Content-Length头信息。
@Override
    protected void decode(final ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception { 
        assert aggregating;

        if (isStartMessage(msg)) { //是否是开始消息
            handlingOversizedMessage = false;//没处理超大信息
            if (currentMessage != null) { //上次的消息没释放
                currentMessage.release();
                currentMessage = null;
                throw new MessageAggregationException();
            }

            @SuppressWarnings("unchecked")
            S m = (S) msg;

            // 100-continue需要持续响应
           
            Object continueResponse = newContinueResponse(m, maxContentLength, ctx.pipeline());
            if (continueResponse != null) { //有 100-continue响应
                // Cache the write listener for reuse.
                ChannelFutureListener listener = continueResponseWriteListener;
                if (listener == null) { //不存在监听器要创建一个
                    continueResponseWriteListener = listener = new ChannelFutureListener() { 
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception { 
                            if (!future.isSuccess()) { 
                                ctx.fireExceptionCaught(future.cause());
                            }
                        }
                    };
                }


                boolean closeAfterWrite = closeAfterContinueResponse(continueResponse);
                handlingOversizedMessage = ignoreContentAfterContinueResponse(continueResponse);
                //这里会直接刷出去,所以HttpResponseEncoder需要放在这个前面,不然写出去没编码过会报错的
                final ChannelFuture future = ctx.writeAndFlush(continueResponse).addListener(listener);

                if (closeAfterWrite) { 
                    future.addListener(ChannelFutureListener.CLOSE);
                    return;
                }
                if (handlingOversizedMessage) { 
                    return;
                }
            } else if (isContentLengthInvalid(m, maxContentLength)) { //消息体长度是否超过了

                invokeHandleOversizedMessage(ctx, m);
                return;
            }
            //解码不成功
            if (m instanceof DecoderResultProvider && !((DecoderResultProvider) m).decoderResult().isSuccess()) { 
                O aggregated;
                if (m instanceof ByteBufHolder) { 
                    aggregated = beginAggregation(m, ((ByteBufHolder) m).content().retain());
                } else { 
                    aggregated = beginAggregation(m, EMPTY_BUFFER);
                }
                finishAggregation0(aggregated);
                out.add(aggregated);
                return;
            }

            CompositeByteBuf content = ctx.alloc().compositeBuffer(maxCumulationBufferComponents);//创建复合缓冲区
            if (m instanceof ByteBufHolder) { //是内容
                appendPartialContent(content, ((ByteBufHolder) m).content());
            }
            currentMessage = beginAggregation(m, content);//开始聚合
        } else if (isContentMessage(msg)) { //后面属于消息体聚合
            if (currentMessage == null) { //长度超过最大了,直接丢弃了,不处理了

                return;
            }
            //提取内容
            CompositeByteBuf content = (CompositeByteBuf) currentMessage.content();

            @SuppressWarnings("unchecked")
            final C m = (C) msg;
            // 超过最大长度了,处理过大的消息
            if (content.readableBytes() > maxContentLength - m.content().readableBytes()) { 
               
                @SuppressWarnings("unchecked")
                S s = (S) currentMessage;
                invokeHandleOversizedMessage(ctx, s);
                return;
            }

            //添加新的内容到复合缓冲区
            appendPartialContent(content, m.content());


            aggregate(currentMessage, m);//整合尾部请求头

            final boolean last;//是不是最后一次聚合
            if (m instanceof DecoderResultProvider) { //处理解码结果
                DecoderResult decoderResult = ((DecoderResultProvider) m).decoderResult(); if (!decoderResult.isSuccess()) { //没解码成功
                    if (currentMessage instanceof DecoderResultProvider) { 
                        ((DecoderResultProvider) currentMessage).setDecoderResult( DecoderResult.failure(decoderResult.cause()));
                    }
                    last = true;
                } else { 
                    last = isLastContentMessage(m);//是否是最后的内容
                }
            } else { 
                last = isLastContentMessage(m);
            }

            if (last) { //是最后的
                finishAggregation0(currentMessage);

                // All done
                out.add(currentMessage);
                currentMessage = null;
            }
        } else { 
            throw new MessageAggregationException();
        }
    }

HttpObjectAggregator的isStartMessage

HTTP来说其实就是判断是否是通用的消息行和消息头信息。

@Override
    protected boolean isStartMessage(HttpObject msg) throws Exception { 
        return msg instanceof HttpMessage;
    }

HttpObjectAggregator的isLastContentMessage

是否是最后的内容。

@Override
    protected boolean isLastContentMessage(HttpContent msg) throws Exception { 
        return msg instanceof LastHttpContent;
    }

HttpObjectAggregator的isAggregated

是否聚合好了。

@Override
    protected boolean isAggregated(HttpObject msg) throws Exception { 
        return msg instanceof FullHttpMessage;
    }

HttpObjectAggregator的newContinueResponse

如果需要100-continue响应的话,要把100-continue头设置去掉,不往后传播了。

@Override
    protected Object newContinueResponse(HttpMessage start, int maxContentLength, ChannelPipeline pipeline) { 
        Object response = continueResponse(start, maxContentLength, pipeline);
       
        if (response != null) { 
            start.headers().remove(EXPECT);//如果有100-continue响应,就不用再传播下去了
        }
        return response;
    }

HttpObjectAggregator的continueResponse

这个就是上面说的根据是否支持100-continue,是否长度超过限制等进行响应。

private static Object continueResponse(HttpMessage start, int maxContentLength, ChannelPipeline pipeline) { 
        if (HttpUtil.isUnsupportedExpectation(start)) { //不支持Expect头
           
            pipeline.fireUserEventTriggered(HttpExpectationFailedEvent.INSTANCE);
            return EXPECTATION_FAILED.retainedDuplicate();
        } else if (HttpUtil.is100ContinueExpected(start)) { //支持100-continue请求
           
            if (getContentLength(start, -1L) <= maxContentLength) { 
                return CONTINUE.retainedDuplicate();//继续
            }
            pipeline.fireUserEventTriggered(HttpExpectationFailedEvent.INSTANCE);
            return TOO_LARGE.retainedDuplicate();//消息体太大
        }

        return null;
    }

HttpObjectAggregator的closeAfterContinueResponse

是否不支持100-continue后把连接断开。

@Override
    protected boolean closeAfterContinueResponse(Object msg) { 
        return closeOnExpectationFailed && ignoreContentAfterContinueResponse(msg);
    }

HttpObjectAggregator的ignoreContentAfterContinueResponse

如果直接给他报400的话就要断开了,后面的内容就不忽略了。

@Override
    protected boolean ignoreContentAfterContinueResponse(Object msg) { 
        if (msg instanceof HttpResponse) { 
            final HttpResponse httpResponse = (HttpResponse) msg;
            return httpResponse.status().codeClass().equals(HttpStatusClass.CLIENT_ERROR);
        }
        return false;
    }

HttpObjectAggregator的beginAggregation

开始聚合就是创建一个聚合的类,根据不同情况创建请求还是响应的完整类型。

@Override
    protected FullHttpMessage beginAggregation(HttpMessage start, ByteBuf content) throws Exception { 
        assert !(start instanceof FullHttpMessage);

        HttpUtil.setTransferEncodingChunked(start, false);

        AggregatedFullHttpMessage ret;
        if (start instanceof HttpRequest) { 
            ret = new AggregatedFullHttpRequest((HttpRequest) start, content, null);//聚合请求
        } else if (start instanceof HttpResponse) { 
            ret = new AggregatedFullHttpResponse((HttpResponse) start, content, null);//聚合响应
        } else { 
            throw new Error();
        }
        return ret;
    }

appendPartialContent

这个就是将内容添加到复合缓冲区里。

private static void appendPartialContent(CompositeByteBuf content, ByteBuf partialContent) { 
        if (partialContent.isReadable()) { //可读的话就加进去
            content.addComponent(true, partialContent.retain());
        }
    }

HttpObjectAggregator的aggregate

这个就是整合尾部的头信息,因为chunk协议可能会有尾部头信息的。

@Override
    protected void aggregate(FullHttpMessage aggregated, HttpContent content) throws Exception { 
        if (content instanceof LastHttpContent) { //如果是最后的尾部内容就整合尾部头信息
            // Merge trailing headers into the message.
            ((AggregatedFullHttpMessage) aggregated).setTrailingHeaders(((LastHttpContent) content).trailingHeaders());
        }
    }

finishAggregation0

完成聚合,标志位也设置为false了,最后再坚持一遍头信息。

private void finishAggregation0(O aggregated) throws Exception { 
        aggregating = false;
        finishAggregation(aggregated);
    }

finishAggregation

最后检查下,如果没设置Content-Length头的话要设置。

@Override
    protected void finishAggregation(FullHttpMessage aggregated) throws Exception { 
        if (!HttpUtil.isContentLengthSet(aggregated)) { //没设置Content-Length头的话要设置
            aggregated.headers().set(
                    CONTENT_LENGTH,
                    String.valueOf(aggregated.content().readableBytes()));
        }
    }

基本上所有的方法都讲了,其实说白了,就是把先到的包保存下来,等最后接收完了一起传递给后面的。其他的一些异常什么的就不说了,自己看看就好了。最后要注意用的时候,这个放到HttpResponseEncoder后面,否则他出站的错误消息不经过HttpResponseEncoder响应解码器,底层传输是不支持的:
在这里插入图片描述

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

相关文章