Example how simple queue (channel) can be processed concurrently in Perl:


#!/usr/bin/perl

#use threads;            # threads are buggy... Use forks.
#use threads::shared;

use forks;
use forks::shared;      # deadlock => ( detect=>0, period=>2, resolve=>0 );

use feature ':5.24';
use strict;
use warnings;
use English qw( -no_match_vars );

use Const::Fast;        ## libconst-fast-perl
use Thread::Queue;
use Time::HiRes qw(sleep);

use sigtrap 'handler' => sub {
    return unless 0 == threads->tid;  ## Main process tid is 0; run this handler only in main process.
    say q{Meow};
    exit 2;
}, qw(INT);

my $CPUs = eval { require Sys::CpuAffinity and Sys::CpuAffinity::getNumCpus(); } ## libsys-cpuaffinity-perl
           || qx{getconf _NPROCESSORS_ONLN}
           || 4
;

local $OUTPUT_AUTOFLUSH=1; # local $|=1;

const my $JOBSIZE => 17 * 8;
my $workers = 3* $CPUs - 1;

my @data :shared;
my $data_count :shared = 0;
my $q :shared = Thread::Queue->new();   # Make a new empty queue.
my $qd = int rand(22);
warn "I: queue depth is $qd\n";
$q->limit = $qd;       # Set queue capacity. Random depth is just for fun. :)

## Takse one argument - Thread::Queue to fetch data from.
sub generator {
    my ($q) = @_;
    while ( my $val = $q->dequeue_timed(11) ) {
        say "T: got ", $val;

        ## crunch-crunch
        sleep rand(3);  ## rate limit; simulating slow processing.
        $val = "qqq" . $val;

        ## Locking this is very-very important;
        ## If already locked by another thread it will wait until lock can be acquired.
        lock($data_count);
        push @data, $val;
        $data_count += 1;
    } # Lock implicitly released at end of scope
    warn "T: thread "
        ,threads->tid
        ," is finished.\n"
    ;
    return;
}

## starting workers.
for (1..$workers) {
    threads->create( \&generator, $q ) or die("E: can't fork\n");
};

#say Dumper ":::", forks->list;

for (my $i=1; $i<=$JOBSIZE; $i++) {
    $q->enqueue($i);      ## push data into queue.
}

warn "I: ending queue; no more items to add.\n";
$q->end();    ## have to do this so threads could finish.

warn "I: pending items in the queue: ",
     ,$q->pending()
     ,"\n"
;

warn "I: waiting for results\n";
#cond_wait($data_count);
sleep 0.5 while $data_count < $JOBSIZE;    ## wait for workers to finish processing queue.

print "I: joining threads ";
for (threads->list()){
    if ($_->is_joinable()){
        $_->join();        ## un-fork workers: finished threads should be joined too.
        print $_->tid
             ," "
        ;
    }
}
say q{};

my $processed_count = scalar @data;
if ($data_count == $processed_count){
    say "OK: processed ", $processed_count, " items.";
}else{
    say "ERR: processed ", $processed_count, " of ", $data_count, ".";
    exit 1;
}

__END__

Donations are much appreciated.