Synchronization of multiple objects

The following program illustrates use of signals and promises in synchronization of work of multiple objects in POOL. Object of the supervisor class handles asynchronously incoming tasks and distibutes them among available worker objects. If all workers are busy, the task is buffered and executed as soon as free worker appears.
Object sender allows to adjust tasks length (prep function arguments) and frequency of sending tasks (timer instructions arguments) - you may adjust the amount of work to the performance of your computer. Too high load will cause constantly increasing number of buffered tasks.


to worker :n ;class for objects making calculations
  to do_work :inp :len
    repeat :len ["inp += (runif -0.0001 0.0001)]
    signal "done ;signal that job is finished
    op :inp      ;and output the calculated value
  "idx := :n
  signal "ready  ;signal that constructor is finished

to supervisor :n ;class for the object distributing the jobs
  to onsignaldone :turtle
    :njobs,(:idx @ :turtle) += 1 ;count jobs done on the worker
    queue :free :turtle          ;and add worker to the list of available
    if not empty? :buffer [signal "dobuffer;try to complete buffered jobs

  to onsignalwork :turtle :data  ;handle arriving data
    if empty? :free [ ;all workers busy, buffer this chunk
      queue :buffer :data
      (print "buffered count :buffer)
      op false
    ;send job to the worker, return result as a promise
    :a,(:data,3) := (do_work :data,1 :data,2) @ dequeue :free
    op true

  let "buffer []
  to onsignaldobuffer
    while not empty? :buffer [
      if not onsignalwork this dequeue :buffer [stop]
      if empty? :buffer [print "|buffered jobs done|]

  let "free []
  repeat :n [queue :free (anewobject $worker repcount)]
  (waitsignal "ready :free;wait for all worker constructors
  "njobs := (newarray :n 0)

  "t := timer [(print "|jobs done:| :njobs)] 3000

to sender ;class for the object sending data
  let "j 0
  to prep :len ;send next value from :a for processing
    let "i 1 + :j % count :a
    if number? :a,:i [ ;do anything with the value to realize promise
      (signal "work (array :a,:i :len :i))
    "j += 1
  "r1 := timer [prep 300000] 70 ;frequently send data for short jobs
  "r2 := timer [                ;and sometimes much longer job
    if random 3 < 1 [print "|start long job| prep 5000000]
  ] 500

(shared "s "a)
"a := (newarray 1000 0)
"sup := (newobject $supervisor :sys_inf,"cpu_count)
"send := newobject $sender

"p := newp "pool [
  "t := timer ["data := (histogram :a 50 (-0.2) 0.2) refresh] 1000
:pool_cfg,"min_x := -0.3
:pool_cfg,"max_x := 0.3
:pool_cfg,"min_y := 0
:pool_cfg,"max_y := 300


jobs done: {0 0 0 0}
start long job
jobs done: {11 16 16 16}
start long job
start long job
jobs done: {23 26 31 31}
start long job
start long job
start long job
start long job
jobs done: {33 28 43 36}
start long job
buffered 1
buffered jobs done
start long job
jobs done: {54 36 55 44}

See also:

Turtle - plot - object

Table of Conent