NullPointerException from AbstractStringBuilder.ensureCapacityInternal in a Storm bolt

mutmk8jj  posted on 2021-06-24 in Storm

In a production system, a Storm bolt occasionally throws a NullPointerException, even though I believe I check for null just before line 61:

import***.KeyUtils;
import***.redis.PipelineHelper;
import***.redis.PipelinedCacheClusterClient;
import**.redis.R2mClusterClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.Map;

/**
 * RedisBolt batch operate
 */
public class RedisBolt implements IRichBolt {
    static final long serialVersionUID = 737015318988609460L;
    private static ApplicationContext applicationContext;
    private static long logEmitNumber = 0;
    private static StringBuffer totalCmds = new StringBuffer();
    private Logger logger = LoggerFactory.getLogger(getClass());
    private OutputCollector _collector;
    private R2mClusterClient r2mClusterClient;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        _collector = outputCollector;
        if (applicationContext == null) {
            applicationContext = new ClassPathXmlApplicationContext("spring/spring-config-redisbolt.xml");
        }
        if (r2mClusterClient == null) {
            r2mClusterClient = (R2mClusterClient) applicationContext.getBean("r2mClusterClient");
        }

    }

    @Override
    public void execute(Tuple tuple) {
        String log = tuple.getString(0);
        String lastCommands = tuple.getString(1);

        try {
            //log count
            if (StringUtils.isNotEmpty(log)) {
                logEmitNumber++;
            }

            if (StringUtils.isNotEmpty(lastCommands)) {
                if(totalCmds==null){
                    totalCmds = new StringBuffer();
                }
                totalCmds.append(lastCommands);//line 61
            }

            // limit on the number of logs per batch
            int numberLimit = 1;
            String flow_log_limit = r2mClusterClient.get(KeyUtils.KEY_PIPELINE_LIMIT);
            if (StringUtils.isNotEmpty(flow_log_limit)) {
                try {
                    numberLimit = Integer.parseInt(flow_log_limit);
                } catch (Exception e) {
                    numberLimit = 1;
                    logger.error("error", e);
                }
            }

            if (logEmitNumber >= numberLimit) {
                StringBuffer _totalCmds = new StringBuffer(totalCmds);
                try {
                    //pipeline submit
                    PipelinedCacheClusterClient pip = r2mClusterClient.pipelined();
                    String[] commandArray = _totalCmds.toString().split(KeyUtils.REDIS_CMD_SPILT);
                    PipelineHelper.cmd(pip, commandArray);
                    pip.sync();
                    pip.close();
                    totalCmds = new StringBuffer();
                } catch (Exception e) {
                    logger.error("error", e);
                }

                logEmitNumber = 0;
            }
        } catch (Exception e) {
            logger.error(new StringBuffer("====RedisBolt error for log=[ ").append(log).append("] \n commands=[").append(lastCommands).append("]").toString(), e);
            _collector.reportError(e);
            _collector.fail(tuple);
        }

        _collector.ack(tuple);
    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

}

Exception:
java.lang.NullPointerException
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:113)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
    at java.lang.StringBuffer.append(StringBuffer.java:237)
    at com.jd.jr.dataeye.storm.bolt.RedisBolt.execute(RedisBolt.java:61)
    at org.apache.storm.daemon.executor$fn__5044$tuple_action_fn__5046.invoke(executor.clj:727)
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__4965.invoke(executor.clj:459)
    at org.apache.storm.disruptor$clojure_handler$reify__4480.onEvent(disruptor.clj:40)
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:451)
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73)
    at org.apache.storm.daemon.executor$fn__5044$fn__5057$fn__5110.invoke(executor.clj:846)
    at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:745)

Can anyone give me some advice on how to find the cause?

f5emj3cl1#

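I think I may have found the problem.
The key is the interaction between
"StringBuffer _totalCmds = new StringBuffer(totalCmds);" and "totalCmds.append(lastCommands); //line 61".
Creating a new object involves two steps:
(1) allocate memory and return a reference
(2) initialize the object
If another thread appends after (1) but before (2), it uses a buffer that has not been initialized yet. StringBuffer.java extends AbstractStringBuilder.java, which declares: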

/**
 * The value is used for character storage.
 */
char[] value;

At that point value has not been initialized, so it is null and value.length throws:

@Override
public synchronized void ensureCapacity(int minimumCapacity) {
    if (minimumCapacity > value.length) {
        expandCapacity(minimumCapacity);
    }
}
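
To make the failure mode concrete, here is a toy model. MiniBuffer and MiniBufferDemo are hypothetical names and only imitate the shape of AbstractStringBuilder; the sketch does not reproduce the race itself, only what happens to append() once value is null.

```java
import java.util.Arrays;

// Toy model only: MiniBuffer is a hypothetical class, not the JDK's StringBuffer.
class MiniBuffer {
    char[] value; // character storage, in the role of AbstractStringBuilder.value
    int count;

    // Leaves 'value' unassigned (null), standing in for a buffer seen before initialization.
    MiniBuffer() { }

    MiniBuffer(int capacity) {
        value = new char[capacity];
    }

    MiniBuffer append(String s) {
        ensureCapacityInternal(count + s.length());
        s.getChars(0, s.length(), value, count);
        count += s.length();
        return this;
    }

    private void ensureCapacityInternal(int minimumCapacity) {
        // Reading value.length throws NullPointerException when value is null.
        if (minimumCapacity > value.length) {
            value = Arrays.copyOf(value, Math.max(minimumCapacity, value.length * 2 + 2));
        }
    }

    @Override
    public String toString() {
        return new String(value, 0, count);
    }
}

class MiniBufferDemo {
    // Returns true if appending to the given buffer fails with a NullPointerException.
    static boolean appendThrowsNpe(MiniBuffer b) {
        try {
            b.append("SET k v");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(appendThrowsNpe(new MiniBuffer()));  // value is null
        System.out.println(appendThrowsNpe(new MiniBuffer(4))); // value is allocated
    }
}
```

Note that the null check on the reference (totalCmds == null) cannot catch this case, because the reference itself is non-null; only the internal array is missing.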

This bolt has another problem: in a multi-threaded environment, some data may be lost.
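
One way to avoid losing data when several threads share a single buffer is to make "append" and "copy, then reset" atomic with respect to each other. The following is a minimal sketch under that assumption; CommandBuffer and CommandBufferDemo are hypothetical helpers, not part of Storm or of the bolt in the question.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: a command accumulator guarded by a single lock.
class CommandBuffer {
    private final Object lock = new Object();
    private StringBuilder pending = new StringBuilder();
    private long count = 0;

    /** Appends one command string and counts it. */
    void add(String commands) {
        synchronized (lock) {
            pending.append(commands);
            count++;
        }
    }

    /** Returns and clears the accumulated commands, or null if fewer than 'limit' entries are pending. */
    String drainIfAtLeast(long limit) {
        synchronized (lock) {
            if (count < limit) {
                return null;
            }
            String batch = pending.toString();
            pending = new StringBuilder(); // swapped under the same lock that add() takes
            count = 0;
            return batch;
        }
    }
}

class CommandBufferDemo {
    /** Workers add one-character commands and drain concurrently; returns the total drained length. */
    static long run(int threads, int perThread) {
        CommandBuffer buffer = new CommandBuffer();
        AtomicLong drained = new AtomicLong();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    buffer.add("x");
                    String batch = buffer.drainIfAtLeast(100);
                    if (batch != null) {
                        drained.addAndGet(batch.length());
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        drained.addAndGet(buffer.drainIfAtLeast(0).length()); // collect whatever is left
        return drained.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000)); // every added character should be accounted for
    }
}
```

Because the copy and the reset happen inside one synchronized block, a command appended by another thread cannot fall into the gap between them, which is where the original code can drop data.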

jdgnovmf2#

It is really strange that this happens. Please read the code of the two classes:
https://github.com/openjdk-mirror/jdk7u-jdk/blob/master/src/share/classes/java/lang/AbstractStringBuilder.java
https://github.com/openjdk-mirror/jdk7u-jdk/blob/master/src/share/classes/java/lang/StringBuffer.java
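AbstractStringBuilder has a no-argument constructor that does not allocate the field "value", so accessing "value" on an object built that way throws an NPE. None of the StringBuffer constructors use that constructor, though. So something odd may be happening during serialization/deserialization that unluckily leaves the "value" field in AbstractStringBuilder null.
It would probably be better to initialize totalCmds in prepare(), and you also need to consider synchronization (thread safety) between bolts. prepare() is called for each bolt instance, so instance fields are thread-safe, but class (static) fields are not.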
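
The suggestion can be sketched without any Storm dependency. BatchingBoltSketch is a hypothetical stand-in whose prepare() and execute() only mirror the IRichBolt lifecycle, and the "submitted" list stands in for the Redis pipeline; treat it as an illustration of per-instance state under those assumptions, not as a drop-in replacement for the bolt.

```java
import java.util.ArrayList;
import java.util.List;

// Storm-free stand-in for a bolt; class and method names are hypothetical.
class BatchingBoltSketch {
    // Instance fields (no 'static'): every bolt instance owns its own buffer and counter.
    private StringBuilder totalCmds;
    private long logEmitNumber;
    private final List<String> submitted = new ArrayList<>(); // stands in for the Redis pipeline

    /** Mirrors prepare(): create the per-instance state here instead of in static initializers. */
    void prepare() {
        totalCmds = new StringBuilder();
        logEmitNumber = 0;
    }

    /** Mirrors execute(): accumulate commands, then flush once numberLimit logs have been seen. */
    void execute(String log, String lastCommands, int numberLimit) {
        if (log != null && !log.isEmpty()) {
            logEmitNumber++;
        }
        if (lastCommands != null && !lastCommands.isEmpty()) {
            totalCmds.append(lastCommands);
        }
        if (logEmitNumber >= numberLimit) {
            submitted.add(totalCmds.toString());
            totalCmds.setLength(0); // reuse the same buffer instead of publishing a new object
            logEmitNumber = 0;
        }
    }

    List<String> submitted() {
        return submitted;
    }
}

class BatchingBoltSketchDemo {
    public static void main(String[] args) {
        BatchingBoltSketch a = new BatchingBoltSketch();
        BatchingBoltSketch b = new BatchingBoltSketch();
        a.prepare();
        b.prepare();
        a.execute("log1", "SET k1 v1;", 2);
        b.execute("log2", "SET k2 v2;", 2);
        a.execute("log3", "SET k3 v3;", 2);
        System.out.println(a.submitted()); // batches flushed by instance a
        System.out.println(b.submitted()); // instance b has not reached its limit
    }
}
```

Since each instance is only touched by the thread that runs it, the buffer needs no locking here, and StringBuilder can replace StringBuffer. If the batch really has to be shared across instances in one JVM, the static field needs explicit synchronization instead.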
