可以在storm中从清理方法发出吗?

lf3rwulv  于 2021-06-21  发布在  Storm
关注(0)|答案(0)|浏览(331)

我有一个bolt,它从db获取数据并对数据进行一些处理,然后将数据作为双数组发送到另一个bolt。
新的螺栓需要所有这些数组的数据才能找到值。我只需要这个值一次。应该在我刚才提到的过程之后完成。
如何确保此螺栓仅在其进程完成时发出数据?
我如何使用它的输出呢?
这是我的拓扑结构:

//Topology definition
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout    ("sample-split",new SampleSplit(), 1);
builder.setBolt     ("tuple-split", new TupleSplit(), 1)
            .shuffleGrouping("sample-split");
builder.setBolt     ("read-images", new ReadImage (), 10)
            .shuffleGrouping("tuple-split");
builder.setBolt     ("filter-image", new FilterImage (), 10)
        .shuffleGrouping("read-images");
builder.setBolt     ("wavelet-transformation", new WaveletTransformation (), 5)
        .shuffleGrouping("filter-image");
//builder.setBolt     ("PNMSTD", new PNMSTD (), 5 ).
//    shuffleGrouping ("wavelet-transformation");

我想在storm上实现以下代码:

public static double[][] PNMSTD(String MS,double []MP, double []MN,String TrainingSetFile,String WvdecFileDir,
            String TestAnnotationLevelFromorOnly,int TestAnnotationID,int TestAnnotationLevel)
    {
    BufferedReader inFile;
    int Psamples=0;
    int Nsamples=0;
    int N=0;
    try {
        inFile=new BufferedReader(
                new FileReader(TrainingSetFile));
        String StrSqls=inFile.readLine();
        String TSet[]=StrSqls.split(", ");
        String DataFile=WvdecFileDir+"/e"+TSet[1]+"_1.dat";
        double C[]=FileIO.Xread(DataFile);
        N=C.length;
        inFile.close();
    }
    catch (FileNotFoundException e)
    {
        e.printStackTrace();
    }
    catch (IOException e)
    {
        e.printStackTrace();
    }
    double []MSP=new double[N];
    double []MSN=new double[N];
    double [][] MeanStd=new double[2][N];
    for(int i=0;i<N;i++)
    {
        MSP[i]=0;
        MSN[i]=0;
    }
    try {
        inFile=new BufferedReader(
                new FileReader(TrainingSetFile));
        String StrSqls;
        while ((StrSqls=inFile.readLine() )!= null){
            //String StrSqls=inFile.readLine();
            //System.out.println(StrSqls);
            String TSet[]=StrSqls.split(", ");
            String DataFile=WvdecFileDir+"/e"+TSet[1]+"_1.dat";
            double C[]=FileIO.Xread(DataFile);  
            if(TestAnnotationLevelFromorOnly.equals("From"))
            {
                if (Integer.parseInt(TSet[TestAnnotationID+4])>=TestAnnotationLevel)
                {
                    for(int i=0;i<C.length;i++)
                    {
                        if (MS.equals("Mean")){
                        MSP[i]=MSP[i]+C[i];}
                        else{MSP[i]=MSP[i]+(C[i]-MP[i])*(C[i]-MP[i]);
                        }
                    }
                    Psamples++;
                }
                else
                {
                    for(int i=0;i<C.length;i++)
                    {
                        if (MS.equals("Mean")){MSN[i]=MSN[i]+C[i];}
                        else{MSN[i]=MSN[i]+(C[i]-MN[i])*(C[i]-MN[i]);}

                    }
                    Nsamples++;
                }
            }
            else
            {
                if (Integer.parseInt(TSet[TestAnnotationID+4])==TestAnnotationLevel)
                {
                    for(int i=0;i<C.length;i++)
                    {if (MS.equals("Mean")){MSP[i]=MSP[i]+C[i];}
                    else{MSP[i]=MSP[i]+(C[i]-MP[i])*(C[i]-MP[i]);}
                    }
                    Psamples++;
                }
                else
                {
                    for(int i=0;i<C.length;i++)
                    {
                        if (MS.equals("Mean")){MSN[i]=MSN[i]+C[i];}
                        else{MSN[i]=MSN[i]+(C[i]-MN[i])*(C[i]-MN[i]);}
                    }
                    Nsamples++;
                }
            }   
        }
        inFile.close();
    }
    catch (FileNotFoundException e)
    {
      e.printStackTrace();
    } 
    catch (IOException e)
    {
      e.printStackTrace();       
    }   
    for(int i=1;i<N;i++)
         {
             MeanStd[0][i]=MSP[i]/Psamples;
             MeanStd[1][i]=MSN[i]/Psamples; // Meaningless should be asked!!!
         }      
    return MeanStd;
}

为了使这个函数工作,我需要创建文件。前面的bolt正在处理和创建这些文件。对于下一个bolt,我需要这个bolt完成所有输入的处理(所以我可以在下一个bolt中使用输出。)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题