Perl Mojolicious:使用all_settled限制并发

sz81bmfz  于 8个月前  发布在  Perl
关注(0)|答案(1)|浏览(61)

Mojolicious for Perl中,当使用Promise->all_settled时,是否有方法限制concurrency
在下面的例子中,我想限制concurrency=>10。我使用sleep来模拟阻塞操作:

use Mojolicious::Lite -signatures, -async_await;

helper isOdd_p => sub($self, $number)
{
    return Mojo::Promise->new(sub($resolve, $reject ) {
        Mojo::IOLoop->subprocess(
            sub {
                sleep 1;
                $number % 2;
            },
            sub ($subprocess, $err, @res ) {
                $reject->( $err ) if $err;
                $reject->( @res ) if @res && $res[0]==0; # reject Even
                $resolve->( @res );
            })
            
        });
};

any '/' => async sub ($c) {
    $c->render_later();
    my @promises = map { $c->isOdd_p($_) } (0..50);
    my @results = eval { await Mojo::Promise->all_settled(@promises) };
    #my @results = eval { await Mojo::Promise->map({concurrency=>10}, sub { $c->isOdd_p($_)}, (0..50) ) };
    if (my $err = $@) {
        $c->render(json => $@); # will this line be ever reached with all_settled??
    } else {
        $c->render(json => [ @results ] );
    }
};

app->start;
p8h8hvxi

p8h8hvxi1#

当前,您将启动所有51个任务,然后等待所有51个任务完成。
将并发性限制为10将意味着启动10个任务,等待一个完成,启动一个,等待一个完成,等等。
map做三个微小的修改就足够了。(将all的两个引用更改为all_settled,并将拒绝处理程序更改为继续。还将其从M::P方法更改为sub方法。)

sub map_all_settled {
  my ($class, $options, $cb, @items) = ('Mojo::Promise', ref $_[0] eq 'HASH' ? shift : {}, @_);
 
  return $class->all_settled(map { $_->$cb } @items) if !$options->{concurrency} || @items <= $options->{concurrency};
 
  my @start = map { $_->$cb } splice @items, 0, $options->{concurrency};
  my @wait  = map { $start[0]->clone } 0 .. $#items;
 
  my $start_next = sub {
    return () unless my $item = shift @items;
    my ($start_next, $chain) = (__SUB__, shift @wait);
    $_->$cb->then(sub { $chain->resolve(@_); $start_next->() }, sub { $chain->reject(@_); $start_next->() }) for $item;
    return ();
  };
 
  $_->then($start_next, sub { }) for @start;
 
  return $class->all_settled(@start, @wait);
}
my @results = await map_all_settled({concurrency=>10}, sub { $c->isOdd_p($_)}, (0..50) );

相关问题