NAME
    Coro::ProcessPool - an asynchronous process pool

SYNOPSIS
        use Coro::ProcessPool;
        use Coro;

        my $pool = Coro::ProcessPool->new(
            max_procs => 4,
            max_reqs  => 100,
            include   => ['/path/to/my/task/classes', '/path/to/other/packages'],
        );

        my $double = sub { $_[0] * 2 };

        #-----------------------------------------------------------------------
        # Process in sequence
        #-----------------------------------------------------------------------
        my %result;
        foreach my $i (1 .. 1000) {
            $result{$i} = $pool->process($double, [$i]);
        }

        #-----------------------------------------------------------------------
        # Process as a batch
        #-----------------------------------------------------------------------
        my @results = $pool->map($double, 1 .. 1000);

        #-----------------------------------------------------------------------
        # Defer waiting for result
        #-----------------------------------------------------------------------
        my %deferred = map { $_ => $pool->defer($double, [$_]) } 1 .. 1000;
        foreach my $i (keys %deferred) {
            print "$i = " . $deferred{$i}->() . "\n";
        }

        #-----------------------------------------------------------------------
        # Use a "task class", implementing 'new' and 'run'
        #-----------------------------------------------------------------------
        my $result = $pool->process('Task::Doubler', [21]);

        #-----------------------------------------------------------------------
        # Pipelines (work queues)
        #-----------------------------------------------------------------------
        my $pipe = $pool->pipeline;

        # Start producer thread to queue tasks
        my $producer = async {
            while (my $task = get_next_task()) {
                $pipe->queue('Some::TaskClass', $task);
            }

            # Let the pipeline know no more tasks are coming
            $pipe->shutdown;
        };

        # Collect the results of each task as they are received
        while (my $result = $pipe->next) {
            do_stuff_with($result);
        }

        $pool->shutdown;

DESCRIPTION
    Processes tasks using a pool of external Perl processes.

ATTRIBUTES
  max_procs
    The maximum number of processes to run within the process pool. Defaults
    to the number of CPUs on the system.

  max_reqs
    The maximum number of tasks a worker process may run before being
    terminated and replaced with a fresh process. This is useful for tasks
    that might leak memory over time.

  include
    An optional array ref of directory paths to prepend to the set of
    directories the worker process will use to find Perl packages.
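
    As a rough mental model (an assumption about the effect, not a
    description of the module's internals), prepending a directory for a
    worker resembles unshifting it onto @INC before a task class is loaded.
    The sketch below runs in a single process, builds a throwaway directory
    and loads a hypothetical My::Doubler class from it:

```perl
        use strict;
        use warnings;
        use File::Temp qw(tempdir);
        use File::Path qw(make_path);
        use File::Spec;

        # Throwaway directory standing in for '/path/to/my/task/classes'.
        my $dir = tempdir(CLEANUP => 1);
        make_path(File::Spec->catdir($dir, 'My'));

        # Write a hypothetical task class (My::Doubler) into that directory.
        my $file = File::Spec->catfile($dir, 'My', 'Doubler.pm');
        open my $fh, '>', $file or die "Cannot write $file: $!";
        print {$fh} join "\n",
            'package My::Doubler;',
            'sub new { my ($class, $n) = @_; return bless { n => $n }, $class }',
            'sub run { return $_[0]{n} * 2 }',
            '1;',
            '';
        close $fh or die "Cannot close $file: $!";

        # Prepend the directory to the search path, then load the class by name.
        unshift @INC, $dir;
        require My::Doubler;

        my $include_result = My::Doubler->new(21)->run;
        print "$include_result\n";
```

    With a pool, the same directory would instead be listed in the
    "include" array ref passed to the constructor.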

PRIVATE ATTRIBUTES
  procs_lock
    Semaphore used to control access to the worker processes. Its initial
    count is the number of processes ("max_procs").

  num_procs
    Count of worker processes that are currently running.

  procs
    Array holding the Coro::ProcessPool::Process objects.

  all_procs
  is_running
    Boolean which signals to the instance that the "shutdown" method has
    been called.

METHODS
  capacity
    Returns the number of free worker processes.

  shutdown
    Shuts down all processes and resets state on the process pool. After
    calling this method, the pool is effectively in a new state and may be
    used normally.

  process($f, $args)
    Processes code ref $f in a child process from the pool. If $args is
    provided, it is an array ref of arguments that will be passed to $f.
    Returns the result of calling $f->(@$args).

    Alternately, $f may be the name of a class implementing the methods
    "new" and "run", in which case the result is equivalent to calling
    $f->new(@$args)->run(). Note that the include path for worker processes
    is identical to that of the calling process.

    This call will yield until the results become available. If all
    processes are busy, this method will block until one becomes available.
    Processes are spawned as needed, up to "max_procs", from this method.
    Also note that the use of "max_reqs" can cause this method to yield
    while a new process is spawned.
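
    The task class form described above can be sketched locally, without a
    pool. The body of Task::Doubler here is hypothetical; only the method
    names "new" and "run" and the call shape $f->new(@$args)->run() come
    from this documentation:

```perl
        use strict;
        use warnings;

        # Hypothetical task class implementing the two required methods.
        package Task::Doubler {
            sub new {
                my ($class, $n) = @_;
                return bless { n => $n }, $class;
            }

            sub run {
                my ($self) = @_;
                return $self->{n} * 2;
            }
        }

        # The documented equivalent of process($f, $args) for a class name.
        my $f           = 'Task::Doubler';
        my $args        = [21];
        my $task_result = $f->new(@$args)->run();
        print "$task_result\n";
```

    With a pool, the corresponding call would be
    $pool->process('Task::Doubler', [21]), provided the class lives in a
    file the worker processes can find.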

  map($f, @args)
    Applies $f to each value in @args in turn and returns a list of the
    results. Although the order in which each argument is processed is not
    guaranteed, the results are guaranteed to be in the same order as @args,
    even if calling $f returns a list itself (in which case the results of
    that calculation are flattened into the list returned by "map").
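
    The ordering and flattening described above match what Perl's built-in
    map does, so the shape of the return value can be previewed locally.
    This sketch does not use the pool; the built-in map is only a stand-in
    for $pool->map($pair, 1 .. 3), and $pair is a made-up function:

```perl
        use strict;
        use warnings;

        # Returns a two-element list for every input value.
        my $pair = sub { ($_[0], $_[0] * 2) };

        # Each two-element result is flattened into one list, in input order.
        my @flat = map { $pair->($_) } 1 .. 3;
        print "@flat\n";
```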

  defer($f, $args)
    Similar to "process", but returns immediately. The return value is a
    code reference that, when called, returns the results of calling
    $f->(@$args).

        my $deferred = $pool->defer($coderef, [ $x, $y, $z ]);
        my $result   = $deferred->();

  pipeline
    Returns a Coro::ProcessPool::Pipeline object which can be used to pipe
    requests through to the process pool. Results then come out the other
    end of the pipe. It is up to the calling code to perform task accounting
    (for example, by passing an id in as one of the arguments to the task
    class).

        my $pipe = $pool->pipeline;

        my $producer = async {
            foreach my $args (@tasks) {
                $pipe->queue('Some::Class', $args);
            }

            $pipe->shutdown;
        };

        while (my $result = $pipe->next) {
            ...
        }

    All arguments to "pipeline()" are passed transparently to the
    constructor of Coro::ProcessPool::Pipeline. There is no limit to the
    number of pipelines which may be created for a pool.

    If the pool is shut down while the pipeline is active, any tasks pending
    in Coro::ProcessPool::Pipeline::next will fail and cause the next call
    to "next()" to croak.
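
    One way to do the task accounting mentioned above is to have the task
    return its id alongside its result. The sketch below runs locally,
    without a pipeline; Task::Square and its argument layout are
    hypothetical, and the loop only simulates results arriving out of
    order:

```perl
        use strict;
        use warnings;

        # Hypothetical task class that carries an id through to its result.
        package Task::Square {
            sub new {
                my ($class, $id, $n) = @_;
                return bless { id => $id, n => $n }, $class;
            }

            sub run {
                my ($self) = @_;
                return [ $self->{id}, $self->{n} ** 2 ];
            }
        }

        # Stand-in for results arriving from a pipeline in arbitrary order.
        my @arrived;
        for my $task_args ([c => 3], [a => 1], [b => 2]) {
            push @arrived, Task::Square->new(@$task_args)->run;
        }

        # The id lets the caller match each result to the task that produced it.
        my %square_of = map { $_->[0] => $_->[1] } @arrived;
        print join(', ', map { "$_=$square_of{$_}" } sort keys %square_of), "\n";
```

    Assuming "queue" accepts an array ref of constructor arguments in the
    same way "process" does, the pooled version would queue
    'Task::Square' with [ $id, $n ] and read the pairs back from "next".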

A NOTE ABOUT IMPORTS AND CLOSURES
    Code refs are serialized using Storable to pass them to the worker
    processes. Once deserialized in the pool process, these functions can no
    longer see the stack as it is in the parent process. Therefore, imports
    and variables external to the function are unavailable.

    Something like this will not work:

        use Foo;
        my $foo = Foo->new();

        my $result = $pool->process(sub {
            return $foo->bar; # $foo not found
        });

    Nor will this:

        use Foo;
        my $result = $pool->process(sub {
            my $foo = Foo->new; # Foo not found
            return $foo->bar;
        });

    The correct way to do this is to import from within the function:

        my $result = $pool->process(sub {
            require Foo;
            my $foo = Foo->new();
            return $foo->bar;
        });

    ...or to pass in external variables that are needed by the function:

        use Foo;
        my $foo = Foo->new();

        my $result = $pool->process(sub { $_[0]->bar }, [ $foo ]);
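
    The loss of closed-over variables can be reproduced without a pool by
    round-tripping code refs through Storable in a single process. This is
    a sketch under the assumption that code refs are handled with
    Storable's B::Deparse and eval support, enabled through the
    $Storable::Deparse and $Storable::Eval settings; the helper
    round_trip() is made up for illustration:

```perl
        use strict;
        use warnings;
        use Storable qw(freeze thaw);

        # Assumption: these Storable settings mirror how code refs are sent.
        {
            no warnings 'once';
            $Storable::Deparse = 1;
            $Storable::Eval    = 1;
        }

        # Serialize and deserialize a code ref, as if sent to another process.
        sub round_trip {
            my ($code) = @_;
            return thaw(freeze([$code]))->[0];
        }

        my $factor = 21;

        # Depends on $factor from the enclosing scope.
        my $closure = sub { $_[0] * $factor };

        # Receives everything it needs as arguments.
        my $standalone = sub { my ($n, $f) = @_; return $n * $f };

        my $good = round_trip($standalone)->(2, $factor);

        # The closure either fails to thaw or no longer sees $factor.
        my $bad = eval { round_trip($closure)->(2) };

        print "standalone: $good\n";
        print 'closure:    ', (defined $bad ? $bad : 'failed'), "\n";
```

    The standalone version still produces the expected product because
    every value it uses arrives through @_, which is the pattern
    recommended above.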

  Use versus require
    The "use" statement is run at compile time, whereas "require" is
    evaluated at runtime. Because of this, the use of "use" in code passed
    directly to the "process" method can fail: the "use" statement has
    already been evaluated in the calling process when that code was
    compiled, so it is not run again in the worker.

    This will not work:

        $pool->process(sub {
            use Foo;
            my $foo = Foo->new();
        });

    This will work:

        $pool->process(sub {
            require Foo;
            my $foo = Foo->new();
        });
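
    The timing difference can be observed in a single process by
    inspecting %INC. In this sketch, List::Util and Text::Wrap are
    arbitrary core modules chosen for illustration, and neither sub is
    sent to a pool:

```perl
        use strict;
        use warnings;

        # Never called, yet "use" has already loaded List::Util at compile time.
        my $never_called = sub { use List::Util (); return 1 };
        my $use_loaded   = exists $INC{'List/Util.pm'} ? 1 : 0;

        # "require" does nothing until the surrounding code actually runs.
        my $loader = sub { require Text::Wrap; return 1 };
        my $before = exists $INC{'Text/Wrap.pm'} ? 1 : 0;
        $loader->();
        my $after  = exists $INC{'Text/Wrap.pm'} ? 1 : 0;

        print "use: $use_loaded, require before: $before, after: $after\n";
```

    Because the "require" happens only when the sub body runs, it is the
    form that still takes effect when the body is executed in a worker.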

    If "use" is necessary (for example, to import a method or transform the
    calling code via import), it is recommended to move the code into its
    own module, which can then be called in the anonymous routine:

        package Bar;

        use Foo;

        sub dostuff {
            ...
        }

        1; # a module file must end with a true value

    Then, in your caller:

        $pool->process(sub {
            require Bar;
            Bar::dostuff();
        });

  If it's a problem...
    Use the task class method if the loading requirements are causing
    headaches:

        my $result = $pool->process('Task::Class', [@args]);

COMPATIBILITY
    "Coro::ProcessPool" will likely break on Win32 due to missing support
    for non-blocking file descriptors (Win32 can only call "select" and
    "poll" on actual network sockets). Without rewriting this as a network
    server, which would impact performance and be really annoying, it is
    likely this module will not support Win32 in the near future.

    The following modules will get you started if you wish to explore a
    synchronous process pool on Windows:

    Win32::Process
    Win32::IPC
    Win32::Pipe

AUTHOR
    Jeff Ober <jeffober@gmail.com>

COPYRIGHT AND LICENSE
    Copyright (C) 2013-2015, Jeff Ober. All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions are
    met:

    1. Redistributions of source code must retain the above copyright
    notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
    notice, this list of conditions and the following disclaimer in the
    documentation and/or other materials provided with the distribution.

    3. Neither the name of the copyright holder nor the names of its
    contributors may be used to endorse or promote products derived from
    this software without specific prior written permission.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
    IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
    TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
    PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
    HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
    SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
    TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
    PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
    LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
    NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
    SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.