matlab 如何同步返回未知序列的异步进程的结果?

zbq4xfa0  于 7个月前  发布在  Matlab
关注(0)|答案(1)|浏览(60)

bounty将在6天后过期。回答此问题可获得+50声望奖励。Will希望引起更多关注此问题。

考虑一个昂贵的计算过程,无论它处理多少数据,都可以很好地扩展,需要在交互式MATLAB环境中提供,以便随时生成新数据。如果新数据到达时,过程的示例已经在进行中,我想放弃该过程并重新启动所有现有数据,以便尽早获得迄今为止所有已知数据的结果。
然而,该过程的早期示例可能由同步函数调用触发,我仍然希望在替换过程完成后返回一个值。
下面的最小示例类重新创建了这种行为:它有一个向存储数组添加元素的方法,以及一个main work方法,该方法在首先执行一些缓慢的操作后,简单地将迄今为止所有存储的数据乘以2。work方法DoWork使用parfeval异步调用,以便任何未完成的调用都可以由cancel引导,而不会影响最终的输出结果。

classdef ExampleClass < handle
    properties
        InputData
        OutputData
    end

    properties (Hidden, Transient)
        WorkFuture parallel.Future
    end
    
    methods
        function obj = ExampleClass()
            uibutton(uifigure, ButtonPushedFcn = @(~,evt) obj.AddData(evt.Source.Parent.CurrentPoint(1)));
        end

        function output = DoWork(obj)
            pause(2);
            drawnow
            output = obj.InputData * 2;
        end

        function val = StoreOutput(obj,val)
            obj.OutputData = val;
        end

        function F = AddData(obj,val)
            obj.WorkFuture.cancel;
            obj.InputData(end+1) = val;
            F = parfeval(parallel.Pool.empty,@obj.DoWork,1).afterAll(@obj.StoreOutput,1);
            obj.WorkFuture = F;
        end
    end
end

字符串
示例化类h = ExampleClass;创建了一个uifigure,它带有一个按钮,可以将新数据(鼠标X坐标)输入到对象中。连续单击它几次将始终导致存储在InputData属性中的元素数量,并在最后一次单击后的2秒内将相同数量的结果输出存储在OutputData中。
现在假设有一个同步函数需要这个进程的结果,其中包括输入值n的输出。如果它是进程的唯一客户端,它可以通过调用来获得这个结果:

result = h.AddData(n).fetchOutputs;


但是,如果在工作仍在完成时单击该按钮以添加更多输入数据,则从AddData返回的Future将被取消,因此结果是错误:

Error using parallel.Future/fetchOutputs
One or more futures resulted in an error.

Caused by:
    Execution of the future was cancelled.


错误发生后不久,包含输入n的输出的所需数据仍然存储在h中,但该信息无法到达串行进程result = h.AddData(n).fetchOutputs;
有没有办法定义一个同步调用,它具有相同的预期效果(触发一个异步进程,并最终在该进程完成时返回一个值),可以容忍它触发的初始异步进程的取消?
这可能采取Future将责任“传递”给另一个的形式,而不是简单地停止有效,或者通知串行代码可以以某种方式等待的事件,但这两个概念对我来说都没有明确的实现方式,所以任何能够做这些事情或解决问题的解决方案都是受欢迎的。

kiz8lqtg

kiz8lqtg1#

有没有办法定义一个同步调用,它具有相同的预期效果(触发一个异步进程,并最终在该进程完成时返回一个值),可以容忍它触发的初始异步进程的取消?
ExampleClass的实现可以确保同步调用等待最新的异步进程完成,即使它发起的原始进程被取消。
这可以通过维护对最新Future object的引用并使用wait until the computation completes的循环来实现:

classdef ExampleClass < handle
    properties
        InputData
        OutputData
        LatestFuture parallel.Future
    end
    
    methods
        function obj = ExampleClass()
            uibutton(uifigure, 'ButtonPushedFcn', @(~,evt) obj.AddData(evt.Source.Parent.CurrentPoint(1)));
            obj.LatestFuture = parallel.Future;
        end

        function output = DoWork(obj)
            pause(2); % Simulate a slow process
            output = obj.InputData * 2;
        end

        function StoreOutput(obj, val)
            obj.OutputData = val;
        end

        function F = AddData(obj, val)
            if ~isempty(obj.WorkFuture) && isvalid(obj.WorkFuture)
                cancel(obj.WorkFuture);
            end
            obj.InputData(end+1) = val;
            F = parfeval(@obj.DoWork, 1);
            obj.WorkFuture = F;
            obj.LatestFuture = F;
        end

        function output = SyncGetData(obj, val)
            F = obj.AddData(val);
            while ~strcmp(F.State, 'finished')
                pause(0.1); % Wait for the latest future to finish
                if ~isempty(obj.LatestFuture) && isvalid(obj.LatestFuture)
                    F = obj.LatestFuture;
                end
            end
            output = fetchOutputs(F);
        end
    end
end

字符串
LatestFuture属性跟踪最近的异步计算。AddData方法被修改为每当新计算开始时更新LatestFuture。新方法SyncGetData会等待最新计算的结果,即使原始计算被取消。

+----------------------+
| SyncGetData          |
| +------------------+ |
| |  Call AddData    |-----> Starts new async process
| +------------------+ |
| |  Wait Loop       |-----> Waits for the latest Future
| +------------------+ |       to complete
| |  Fetch Output    |-----> Retrieves result from the latest
| +------------------+ |       completed process
+----------------------+


您现在应该使用result = h.SyncGetData(n);,而不是直接调用h.AddData(n).fetchOutputs;来获得同步结果。

相关问题